// "Fixed point" is a term meaning the plan tree no longer changes after all rules have been
// applied: the loop has converged and should stop.
// Concretely, this excerpt of HepPlanner's rule-application loop stops when either
//   1. nMatches reaches programState.matchLimit (the early `return`s), or
//   2. a traversal of the graph produces no new transformation, i.e. every rule on every
//      vertex falls into the `continue` below (applyRule returned null or the same vertex),
//      so fixedPoint stays true.
// After any successful transformation the `for` over rules is left with `break`, so the next
// vertex is taken from the freshly created iterator.
booleanfixedPoint;do{Iterator<HepRelVertex>iter=getGraphIterator(programState,requireNonNull(root,"root"));fixedPoint=true;while(iter.hasNext()){HepRelVertexvertex=iter.next();for(RelOptRulerule:rules){HepRelVertexnewVertex=applyRule(rule,vertex,forceConversions);if(newVertex==null||newVertex==vertex){continue;}++nMatches;if(nMatches>=programState.matchLimit){return;}// fullRestartAfterTransformation is false for ARBITRARY or DEPTH_FIRST match order and true otherwise (BOTTOM_UP or TOP_DOWN); when true, every restart begins again from the root vertex
if(fullRestartAfterTransformation){iter=getGraphIterator(programState,requireNonNull(root,"root"));}else{// To the extent possible, pick up where we left
// off; have to create a new iterator because old
// one was invalidated by transformation.
iter=getGraphIterator(programState,newVertex);if(programState.matchOrder==HepMatchOrder.DEPTH_FIRST){nMatches=depthFirstApply(programState,iter,rules,forceConversions,nMatches);if(nMatches>=programState.matchLimit){return;}}// Remember to go around again since we're
// skipping some stuff.
fixedPoint=false;}break;}}}while(!fixedPoint);
// Excerpt of HepPlanner.applyRule: tries to fire `rule` on `vertex`.
// Returns null when the vertex is no longer in the graph, when the rule's operands do not
// match, when rule.matches(call) rejects the call, or when firing the rule produced no
// results. Otherwise it returns whatever applyTransformationResults(...) returns.
// NOTE(review): parentTrait and parents are initialized to null here; the omitted
// ConverterRule / CommonRelSubExprRule handling is presumably where they get set.
privateHepRelVertexapplyRule(RelOptRulerule,HepRelVertexvertex,booleanforceConversions){if(!graph.vertexSet().contains(vertex)){returnnull;}RelTraitparentTrait=null;List<RelNode>parents=null;// ... ConverterRule / CommonRelSubExprRule handling omitted
finalList<RelNode>bindings=newArrayList<>();finalMap<RelNode,List<RelNode>>nodeChildren=newHashMap<>();booleanmatch=matchOperands(rule.getOperand(),vertex.getCurrentRel(),bindings,nodeChildren);if(!match){returnnull;}HepRuleCallcall=newHepRuleCall(this,rule.getOperand(),bindings.toArray(newRelNode[0]),nodeChildren,parents);// Allow the rule to apply its own side-conditions.
if(!rule.matches(call)){returnnull;}fireRule(call);if(!call.getResults().isEmpty()){returnapplyTransformationResults(vertex,call,parentTrait);}returnnull;}
// Spark Catalyst example: only the PushPredicateThroughJoin rule is configured for the test below.
// Excerpt: the rule's case bodies are elided (`// ...`) and the object's closing brace is not shown.
test("joins: push to either side"){valx=testRelation.subquery("x")valy=testRelation.subquery("y")valoriginalQuery={x.join(y).where("x.b".attr===1).where("y.b".attr===2)}.select("x.b")valoptimized=Optimize.execute(originalQuery.analyze)valleft=testRelation.where($"b"===1)valright=testRelation.where($"b"===2)valcorrectAnswer=left.join(right).analyzecomparePlans(optimized,correctAnswer)}objectPushPredicateThroughJoinextendsRule[LogicalPlan]withPredicateHelper{defapply(plan:LogicalPlan):LogicalPlan=plantransformapplyLocallyvalapplyLocally:PartialFunction[LogicalPlan, LogicalPlan]={// push the where condition down into join filter
casef@Filter(filterCondition,Join(left,right,joinType,joinCondition,hint))ifcanPushThrough(joinType)=>// ...
// push down the join filter into sub query scanning if applicable
casej@Join(left,right,joinType,joinCondition,hint)ifcanPushThrough(joinType)=>// ...
}// The initial input plan tree:
'Project [unresolvedalias(x.b, None)]
+- 'Filter ('y.b = 2)
   +- 'Filter ('x.b = 1)
      +- 'Join Inner
         :- SubqueryAlias x
         :  +- LocalRelation <empty>, [a#0, b#1, c#2]
         +- SubqueryAlias y
            +- LocalRelation <empty>, [a#0, b#1, c#2]
// 第一次匹配到的 plan tree
Filter (b#1 = 1)
+- Join Inner
   :- LocalRelation <empty>, [a#0, b#1, c#2]
   +- LocalRelation <empty>, [a#10, b#11, c#12]
// 第二次匹配到的 plan tree
Filter (b#11 = 2)
+- Join Inner
   :- Filter (b#1 = 1)
   :  +- LocalRelation <empty>, [a#0, b#1, c#2]
   +- LocalRelation <empty>, [a#10, b#11, c#12]
// 第三次匹配到的 plan tree
Join Inner
:- Filter (b#1 = 1)
:  +- LocalRelation <empty>, [a#0, b#1, c#2]
+- Filter (b#11 = 2)
   +- LocalRelation <empty>, [a#10, b#11, c#12]
// apply 完毕的 plan tree
Project [x.b AS x.b#13]
+- Join Inner
   :- Filter (b#1 = 1)
   :  +- LocalRelation <empty>, [a#0, b#1, c#2]
   +- Filter (b#11 = 2)
      +- LocalRelation <empty>, [a#10, b#11, c#12]
总结一下上面的流程:catalyst 匹配 rule 的思路和 calcite 类似,都是遍历 plan tree 的每个 node,依次尝试匹配。不同之处在于,catalyst 直接用 rule 返回的新 LogicalPlan 替换原来的 plan(并作为下一条 rule 的输入),而 calcite 则先将 rule 的 apply 结果写入 HepRuleCall 的 results 中,再由 applyTransformationResults 从中选择结果并替换原节点。
/**
 * Runs every batch of rules defined by the subclass, one batch after another.
 *
 * Inside a batch the rules are applied in order, each rule receiving the plan produced by the
 * previous one. The whole batch is then repeated until either a full pass leaves the plan
 * unchanged (a fixed point, detected with `fastEquals`) or `batch.strategy.maxIterations`
 * passes have been run.
 *
 * @param plan the input plan tree
 * @return the plan tree after all batches have been executed
 */
def execute(plan: TreeType): TreeType = {
  var current = plan
  for (batch <- batches) {
    var pass = 1
    var previous = current
    var done = false
    // Repeat the batch until a fixed point is reached (or the strategy's iteration cap is hit).
    while (!done) {
      // Thread the plan through every rule of the batch; each rule's output is the next rule's input.
      current = batch.rules.foldLeft(current)((input, rule) => rule(input))
      pass += 1
      // The iteration cap ends this batch even if the plan is still changing.
      if (pass > batch.strategy.maxIterations) {
        done = true
      }
      // The plan did not change during a full pass over the rules: fixed point reached.
      if (current.fastEquals(previous)) {
        logTrace(s"Fixed point reached for batch ${batch.name} after ${pass - 1} iterations.")
        done = true
      }
      // Remember this pass's result for the next fixed-point comparison.
      previous = current
    }
  }
  current
}
/// The pattern tree to match a plan tree. It defined in `Rule` and used in `PatternMatcher`.
pubstructPattern{/// The root node predicate, not contains the children.
pubpredicate: fn(&PlanRef)-> bool,/// The children's predicate of current node.
pubchildren: PatternChildrenPredicate,}pubenumPatternChildrenPredicate{/// All children and their children are matched and will be collected as
/// `OptExprNode::PlanRef`. Currently used in one-time-applied rule.
MatchedRecursive,/// All children will be evaluated in `PatternMatcher`, if pattern match, node will collected
/// as `OptExprNode::PlanRef`. if vec is empty, it means no children are matched and
/// collected.
Predicate(Vec<Pattern>),/// We don't care the children, and them will be collected as existing nodes
/// `OptExprNode::OptExpr` in OptExpr tree.
None,}
pubfnfind_best(&mutself)-> PlanRef{letbatches=self.batches.clone().into_iter();forbatchinbatches{letmutiteration=1_usize;// fixed_point means plan tree not changed after applying all rules.
letmutfixed_point=false;// run until fix point or reach the max number of iterations as specified in the strategy.
while!fixed_point{println!("-----------------------------------------------------");println!("Start Batch: {}, iteration: {}",batch.name,iteration);fixed_point=self.apply_batch(&batch);// max_iteration check priority is higher than fixed_point.
iteration+=1;ifiteration>batch.strategy.max_iteration{println!("Max iteration {} reached for batch: {}",iteration-1,batch.name);break;}// if the plan tree not changed after applying all rules,
// it reaches fix point, should stop.
iffixed_point{println!("Fixed point reached for batch: {}, after {} iterations",batch.name,iteration-1);break;}}}self.graph.to_plan()}pubfnapply_batch(&mutself,batch: &HepBatch)-> bool{letoriginal_plan=self.graph.to_plan();// for each rule will apply each node in graph.
forruleinbatch.rules.iter(){fornode_idinself.graph.nodes_iter(batch.strategy.match_order){if!self.apply_rule(rule.clone(),node_id){// not matched, will try next rule
continue;}println!("After apply plan tree:\n{}",pretty_plan_tree_string(&*self.graph.to_plan()));// if the rule is applied, continue to try next rule in batch,
// max_iteration only controls the iteration num of a batch.
println!("Try next rule in batch ...");break;}}// Compare the two plan trees, if they are the same, it means the plan tree not changed
letnew_plan=self.graph.to_plan();letreach_fixed_point=original_plan==new_plan;println!("Batch: {} finished, reach_fixed_point: {}",batch.name,reach_fixed_point);reach_fixed_point}/// return true if the rule is applied which means the rule matched and the plan tree changed.
fnapply_rule(&mutself,rule: RuleImpl,node_id: HepNodeId)-> bool{letmatcher=HepMatcher::new(rule.pattern(),node_id,&self.graph);ifletSome(opt_expr)=matcher.match_opt_expr(){letmutsubstitute=Substitute::default();letopt_expr_root=opt_expr.root.clone();rule.apply(opt_expr,&mutsubstitute);if!substitute.opt_exprs.is_empty(){assert!(substitute.opt_exprs.len()==1);self.graph.replace_node(node_id,substitute.opt_exprs[0].clone());println!("Apply {:?} at node {:?}: {:?}",rule,node_id,opt_expr_root);returntrue;}println!("Skip {:?} at node {:?}",rule,node_id);false}else{println!("Skip {:?} at node {:?}",rule,node_id);false}}
left join:语义表明 join 结果中 left 边是全量数据,所以把 leftFilter 下推到 left 边后,对结果的影响与下推前相同,都相当于在 left 边的全量数据上做 filter。但是对于 right 边,如果下推了 filter,原本是对 join 结果做 filter,就变成了先只对 right 边做 filter:被过滤掉的 right 行不再参与匹配,而对应的 left 行仍会以 right 边补 NULL 的形式保留下来。因此结果中可能多出本应被过滤掉的行,导致结果错误。如下图:
right join:与 left join 对称,right 边是全量数据,所以只能将 rightFilter 下推到 right 边,leftFilter 和 commonFilter 都不能下推。