Optimization Strategy

In this tutorial we look into writing an optimization strategy. Technically, an optimization strategy is a class that implements GlobalStrategy, which defines only a single method:

fun optimize(
    query: Query,
    dataCharacteristics: DataCharacteristics,
    params: OptimizationParameters
): OptimizationResult

The query describes what the user wants, dataCharacteristics is what we (believe to) know about the incoming data, and params are further parameters that we have to respect during optimization, e.g., the available resources of the compute cluster.
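
To see where the code of this tutorial fits, here is a minimal sketch of a strategy class. The construction of the OptimizationResult at the end is an assumption made for illustration; check the actual constructor in the code base.

class MyBinaryStrategy : GlobalStrategy {
    override fun optimize(
        query: Query,
        dataCharacteristics: DataCharacteristics,
        params: OptimizationParameters
    ): OptimizationResult {
        val physicalGraph = PhysicalGraph()
        // ... build input stubs, stores, edges, and rules as shown below ...
        return OptimizationResult(physicalGraph) // constructor assumed for illustration
    }
}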

Big Picture

The optimizer takes a query and produces a PhysicalGraph. If that graph is transformed into a Storm Topology, this topology will produce a result that fits the query.

The PhysicalGraph consists of nodes, edges, and rules. The nodes represent compute units that may store data. These nodes are connected by edges, each of which has a name. The actions that a node performs are configured using rules. A rule says, for example: if a tuple arrives on edge a, store it in the local state and send its value + 1 over edge b.

The BinaryTheta Strategy

Let us walk through the BinaryTheta strategy. You can find it in the source code in the package de.unikl.dbis.clash.optimizer.constant. While the optimization algorithm itself is not that special (well, it only works for join queries over exactly two relations), it serves as an example of what the optimizer has to do in order to produce a PhysicalGraph.

The materialization tree that we are building looks like this:

         result
          /  \
         /    \
  A-Store      B-Store

At first, we create the PhysicalGraph object:

val physicalGraph = PhysicalGraph()

Inputs

Variables relA and relB contain the relations that are inputs of the query; this means the query looks like SELECT * FROM relA, relB WHERE some_predicate. This data comes from somewhere, e.g., it arrives at the system by reading a file, listening to a Kafka topic, etc. But this is irrelevant to the optimizer, so InputStubs are used here. In a later step (outside of the optimizer) these stubs are replaced by the actual input readers.

val inputA = physicalGraph.addInputStubFor(relA)
val inputB = physicalGraph.addInputStubFor(relB)

Stores

Then we add the stores for relations relA and relB.

val storeA = ThetaStore(label(relA), relA, 1)
physicalGraph.relationStores[relA] = storeA
val storeB = ThetaStore(label(relB), relB, 1)
physicalGraph.relationStores[relB] = storeB

Here we initialize both stores with a label (for convenient output), the relation that is stored inside, and the degree of parallelism (in this case 1). The stores are put into the relationStores of the physical graph; this is how a PhysicalGraph organizes the registered stores. Each store is identified by the relation it stores.
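
Because relationStores is keyed by the relation, other parts of the code can later look up a store directly via the relation it holds:

val lookedUp = physicalGraph.relationStores[relA]
// lookedUp is the ThetaStore we registered above for relA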

Wiring Things Up

Now we have two input stubs and two stores. But they are not connected yet. We now add the individual edges as well as rules for getting data to and from those edges. First, we add the edges for the tuples that are sent to the stores for storing:

val storeAEdge = addEdge(inputA, storeA, EdgeType.SHUFFLE)
inputA.addRule(RelationSendRule(relA, storeAEdge.streamName))
storeA.addRule(RelationReceiveRule(relA, storeAEdge.streamName))
val storeBEdge = addEdge(inputB, storeB, EdgeType.SHUFFLE)
inputB.addRule(RelationSendRule(relB, storeBEdge.streamName))
storeB.addRule(RelationReceiveRule(relB, storeBEdge.streamName))

The function addEdge() is responsible for adding the edge to the sending and the receiving node, and it also sets the edge type. Once we have an edge, we can use it to construct rules. The two rules for the first relation are RelationSendRule(relA, storeAEdge.streamName), which means “If you have a tuple for relation relA, send it over the edge storeAEdge.streamName”, and RelationReceiveRule(relA, storeAEdge.streamName), which means “If you receive a tuple over the edge storeAEdge.streamName, treat it as a tuple belonging to relation relA”. The former is an instance of OutRule, the latter an instance of InRule. We register the first one with inputA, because that is where the tuples are sent from, and the second one with storeA, because that is where the tuples are received. Of course, we do the same for tuples of relB.
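
Since the wiring for relA and relB is identical, a strategy of your own could factor it into a small helper. This is only a sketch: the parameter types InputStub and Relation are assumptions, substitute whatever types the actual classes have.

// Sketch of a helper that wires one input stub to its store
// (parameter types are assumptions, not taken from the code base).
fun wireInputToStore(input: InputStub, store: ThetaStore, relation: Relation) {
    val edge = addEdge(input, store, EdgeType.SHUFFLE)
    input.addRule(RelationSendRule(relation, edge.streamName))      // OutRule at the sender
    store.addRule(RelationReceiveRule(relation, edge.streamName))   // InRule at the receiver
}

wireInputToStore(inputA, storeA, relA)
wireInputToStore(inputB, storeB, relB)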

Now all arriving tuples end up at the correct stores. But we also need some joining.

val probeAEdge = addEdge(inputB, storeA, EdgeType.ALL)
val predicateEvaluationA = predicatesForStore(storeA, query.result.joinPredicates)
storeA.addRule(JoinResultRule(probeAEdge.streamName, predicateEvaluationA, query.result))
val probeBEdge = addEdge(inputA, storeB, EdgeType.ALL)
val predicateEvaluationB = predicatesForStore(storeB, query.result.joinPredicates)
storeB.addRule(JoinResultRule(probeBEdge.streamName, predicateEvaluationB, query.result))

Here we connect the B-input with the A-store and vice versa for the actual join processing. Again we add an edge, this time of type ALL. For join processing, we now need the predicates that should be evaluated. At this point we already decide on the concrete evaluation. For example, for the predicate a.x < b.x, the evaluation decides which side of the inequality is the stored tuple and which is the probe tuple. The method predicatesForStore constructs such an evaluation. With this evaluation, we can construct a JoinResultRule, meaning “If a tuple arrives from edge probeAEdge.streamName, probe it against the locally stored tuples using the predicate evaluation predicateEvaluationA. The resulting tuple belongs to relation query.result”. Again, we do the same for tuples from relA that are sent to the B-store.
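
To make the role of the predicate evaluation concrete, here is a small standalone Kotlin illustration (not code from the package) of probing the A-store with the predicate a.x < b.x: the stored side supplies a.x, the probe tuple supplies b.x.

// Standalone illustration of probing stored a-tuples with an arriving b-tuple.
data class TupleA(val x: Int)
data class TupleB(val x: Int)

fun probeAStore(stored: List<TupleA>, probe: TupleB): List<Pair<TupleA, TupleB>> =
    stored.filter { it.x < probe.x }   // stored tuple on the left of <, probe tuple on the right
          .map { it to probe }         // each match becomes one join result

fun main() {
    val aStore = listOf(TupleA(1), TupleA(5))
    println(probeAStore(aStore, TupleB(3)))   // only TupleA(x=1) satisfies 1 < 3
}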

Finalizing and Output

The graph stores so-called relation producers. These are the nodes where tuples of a certain relation originate. As the desired join result is produced at the A-store as well as at the B-store, both are registered:

physicalGraph.addRelationProducer(query.result, storeA)
physicalGraph.addRelationProducer(query.result, storeB)

Similar to the input stubs, we register an output stub as sink. The user can decide where to put the results, e.g., write them to a Kafka topic or to a file inside a distributed file system. But that is irrelevant to the optimization.

val outputStub = OutputStub(label(query.result), query.result)
physicalGraph.outputStub = outputStub
val storeAResultEdge = addEdge(storeA, outputStub, EdgeType.SHUFFLE)
storeA.addRule(RelationSendRule(query.result, storeAResultEdge.streamName))
val storeBResultEdge = addEdge(storeB, outputStub, EdgeType.SHUFFLE)
storeB.addRule(RelationSendRule(query.result, storeBResultEdge.streamName))

We create the OutputStub and add it to the physical graph. Then we connect both result-producing nodes to the stub and add the corresponding RelationSendRule to the stores. We have already seen this rule at the input stubs; this time it is invoked when a new join result is created.