Paper Reading: Apache Calcite: A Foundational Framework for Optimized Query Processing Over Heterogeneous Data Sources
这篇paper是 Apache Calcite: A Foundational Framework for Optimized Query Processing Over Heterogeneous Data Sources, in SIGMOD, 2018。
对应15-721的Query Optimizer Implementation II,主要涉及数据库的查询优化器的具体实现。
Apache Calcite 是一个基础软件框架,为许多开源数据处理系统 提供 query processing、optimization、language 的支持。比如,Hive 、Flink、Storm、Druid。
Introduction
从2005年,列存储、流处理引擎、文本搜索引擎 的崛起,对于各种特定需求的处理系统,出现了两个首要问题:
- 数据系统开发者 都要遇到相同的问题:查询优化、查询语言支持(SQL 或 SQL相关扩展,如流查询)。因此,需要一个统一的框架 来处理类似的问题。
- 数据系统开发者 经常需要整合多种 异构数据源。因此,需要
优化查询系统
能够支持 多种数据源。
Apache Calcite 出现以解决上面的问题。它提供了,完整的查询处理系统,包括 query execution、optimization、query language,但不包括 data storage and management
,而是交给具体的数据引擎。
Paper
paper 整体内容介绍比较浅,因此这部分为概念介绍,代码分析为具体实现介绍
Architecture
正如上面介绍,Calcite 目的清晰,提供一个数据处理的基础框架,如下图,分为4个核心部分:
- Core:Operator expressions (关系代数表达式) + Planner (基于 Volcano/Cascades) = Query Optimizer
- External:Data storage + algorithm + catalog
- Optional:SQL parser + JDBC/ODBC drivers
- Extensible:Planner rewrite rules + statistics + cost model + algebra + UDFs
简单介绍其中几点:
- SQL Parser and Validator,将输入的 SQL 解析成 relational operations tree
- Optimizer 包含 rules、metadata providers 和 planner engines,后面会详细介绍
- 由于 Calcite 不包含 storage layer,因此它提供
adapters
概念,去定义 external storage engine 的 table schemas 和 views,后面会详细介绍 - 一些数据系统支持 SQL,但不具备 optimizer。Calcite 能够将 SQL 优化后的 relational expressions 再写回成 SQL,交回给数据系统。这个 feature 让它可以作为独立的系统 作为其它 数据系统的补充
- Calcite 不仅可以优化 SQL查询,还可以通过直接构造 operator tree 来代替 parser 后的 SQL,具体做法参考 relational expressions builder。
Query Algebra
关系代数是SQL处理与优化的核心,Calcite 除了支持常用的 filter、project、join 等,还可以方便的支持其它 operator,比如 window operator。
Traits
:Calcite 没有区分 logical operator 和 physical operator,而是使用 traits 来描述 operator 的 physical properties。这些 traits 能够帮助优化器去评估不同 plan 的 cost。
举例:在优化阶段,Calcite 会 enforce traits on relational expressions,比如,需求是对某些列的排序,则 operator 可以实现 converter
接口,来表明如何将一个 expression 的 trait 从一个值转换为另一个值。
Calling Convention trait
:这个 trait 代表 expression 将被执行的系统。引入这个 trait 的目的是,通过抽象,透明地优化 可能跨越不同引擎的 查询。
举例:如下图 splunk 和 mysql 的表做 join,scan 在各自的 数据引擎中查询。同时,splunk filter 被 pushdown 了,这个操作是 adapter-specific rule
后面会介绍。同时,还有一点能够体现这个 trait 目的,比如下图中的 join operator 改成 spark engine 去执行,则它变成 spark convention,而它的 input 变成了 jdbc-mysql 和 splunk 到 spark 的 convention。可能有点绕口,后面看代码时,会容易理解点。
Adapters
Adapter 模式是 calcite 定义如何接入不同 data source。如下图:
- model:对 accessed data source 的 physical properties 的 specification
- schema:对 model 中的 data (format / layouts) 的 definition
而 adapter 中定义了一系列的 rules 给 planner,比如:定义 logical expression 转成满足 adapter convention 的 expression。
上一节提到的 Calling Convention 去定义不同 database backend 执行的 operator。比如,adapter 必须实现的最小接口 scan operator 后,calcite 优化器就可以继续使用 client-side 的 operator (sorting / filtering / join) 去继续 evaluate 底层表。
enumerable
:calcite 定义了一种 calling convention。目的是,让 operator 可以通过 iterator 接口访问底层 tuples。
Adapter 抽象带来的优势:可以支持多个后端的优化查询。
Query Processing And Optimization
Query Optimizer 是 Calcite 的核心组件,正如之前 Cascades 介绍中提到的,calcite 实现了基于 volcano 的优化器设计
- Planner rules:calcite 包含几百条 optimization rules,同时,对于依赖 calcite 的数据系统来说,重写自己特定的 rules也是很普遍的。比如,predicate pushdown 对于不同 adapter 来说,需要根据自身类型重写自己的 rule。举例来说,data source 是否包含 partition,partitions 排序是怎么样的
- Metadata providers:提供给 rules 所需要的 信息。比如,expression 估计会返回的行数与数据大小(cardinality),当前可执行的最大并行度。同时,calcite 支持 pluggable metadata provider
- Planner engines:calcite 支持 RBO(Rule-Based Optimizer) 和 CBO(Cost-Based Optimize),这里不在重复,可参考 cascade 那篇介绍
- Materialized views:calcite 支持优化时 考虑物化视图。即,优化器中包含事先注册好的 mv,当用户的查询能够命中 mv 后,会直接去 mv 拿数据,相当于缓存。实现中需要考虑的点还有很多,比如,如何判断命中物化视图(mv包含一些filter条件时),如何进行view改写,如何进行物化视图注册(lattices,自动收集统计星型模型和雪花模型的表,自动构建部分 cube 的物化视图),具体内容参考总结部分
Embedded Calcite
calcite 提供许多灵活性,比如作为其它数据系统的 library,提供了如下功能:
- query language:面向用户的查询语言接口,如 SQL、Streaming SQL、extensions
- jdbc driver
- SQL parser and validator
- query algebra to represent operations over data
- execution engine:calcite’s operators (enumerable)
如下图,展示了 引用 Calcite 的软件,其中执行引擎部分,可以使用自己 Native 的引擎,或者外部其它系统的引擎,或者 calcite enumerable operators
Calcite Adapters
calcite 灵活性的另外一点体现,通过不同的 adapter 与不同 data source 数据交互。如下图,展示的各种 adapters。
adapter 的核心组件是 converter,用来 translate algebra expression 成目的数据系统 支持的查询语言。如,Spark Adapter,它的产出是 RDD。
代码分析
首先,需要明确 Calcite 的一些术语 和 常用类,建议先跟着官方Calcite tutorial at BOSS 2021写一遍 demo,这里也会记录其中的一些知识点。
overview
如下图,数据库的查询处理器的核心组件,不清楚的可以回顾下15721 slides,其中:
- Rules:常见的 project/filter push down。calcite 包含 100+ 的 rules
- Metadata(Cost, Statistics):cardinality 和 filter selectivity 等其它的 properties
如下图,是 calcite 的核心 components,构成了它的查询处理器,其中:
- SqlParser:接收 SQL query 解析成 SqlNode(对应AST)
- SqlValidator:接收 SqlNode,利用 CatalogReader 获取 Schema,去 validate query 里的 identifier,生成 Validated SqlNode
- SqlToRelConverter:接收 Validated SqlNode,去 convert 成 RelNode(Relational Expression),即关系代数,代表对数据的处理操作,如 sort/join/project/filter/scan
- RelOptPlanner:接收 RelNode,利用提供的 RelRule + RelMetadata Provider 将关系表达式 转换为 逻辑等价 的 lowest cost 关系表达式
- RelRunner:外部的 storage engine 接收 RelNode,并执行计划,得到 results
有了基本概念后,按照 demo 可以实现一个,基于 Lucene adapter 的查询处理器,参考
观察代码,发现其中用到 Lucene 的只有 LuceneTable,其它部分都是 calcite core 中已经实现的。使用者只有利用好其扩展性,实现具体接口,就可以快速搭建起自己的 optimizer。
回到 demo 代码,主要分为下面几个部分,会分别介绍
SqlParser&SqlValidator
一个 Query SQL 进入数据库后,第一步就是 SQL parser + validator
Parser:将 sql 转换成 ast,其中包含了 语法/词法 解析,保证 sql 的合法性。如下代码
|
|
其中,SqlParser 可以传入 config 来支持不同 database SQL 的特殊语法,比如 Lex(Oracle/Mysql)、Sql兼容模式(SQL:92/SQL:99/BigQuery/…)、caseSensitive、quoting 等等
Validator:SQL语法没问题后,就需要校验其中的 identifier 合法性。15721中用 binder 读取 system catalog 来表示这个过程,calcite 中也是类似过程。如下伪代码
|
|
到这里,一条 SQL 已经变成一个 合法的 AST,可以进入后续流程
AST to LogicalPlan
这一步,SqlNode 会经过 SqlToRelConverter 转成 LogicalPlan,如下代码
第一步,创建 Optimizer Cluster,即 RelOptCluster,包含了 planner 和 运行时环境,其中一些概念:
- RelTraitDef:代表一种 RelTrait(即,relational expression trait),目前只有三种
- ConventionTraitDef:调用约定的定义。上面 volcanoPlanner.addRelTraitDef 会将它加入 traitDefs,在 planner.addRule 时进行 contains 判断
- RelCollationTraitDef:ordering trait 定义
- RelDistributionTraitDef:distribution trait 定义
- RexBuilder:构建 RexNode,即 row-level expression. 对一行数据的处理逻辑,比如 RexInputRef、RexLiteral
|
|
第二步,构建 SqlToRelConverter,其中一些概念
- SqlToRelConverter:SqlNode 转成 relational algebra expression (包含一系列 RelNode)
- StandardConvertletTable:一系列 SqlNode 到 RelNode 的转换规则
|
|
第三步,convert SqlNode,注意其中 convertQuery 返回的是 RelRoot 类型,它的存在也很特别,通过在 SqlNode 最外层嵌套一层, 解决了一些特殊 SQL,建议看源码comments
同时,SqlToRelConverter中包含了一个成熟 Binder 的所有逻辑,可以作为很好的参考资料。
|
|
LogicalPlan to PhysicalPlan
正如,Cascades 那篇提到的,有了 LogicalPlan 之后,就需要进入 Optimizer 进行优化,还是贴一下 15721 中的 arch overview,帮助构建整体体系。
第一步,为 planner 添加 rules,如下的两种 Rules:
- CoreRules:logical transformation,即 等价的 LogicalPlan 转换
- EnumerableRules:logical plan to Enumerable calling convention
重点,这里的 enumerable calling convention,原文解释是 Family of calling conventions that return results as an org.apache.calcite.linq4j.Enumerable.
而 convention 也是 Calcite 的核心概念之一,它是一种类型的 trait,代表着 a single data source
,目前4中实现:
- Convention.Impl:空白的 convention 实现,可以用于其它 convention 继承
- JdbcConvention:继承于 Convention.Impl,代表建立在 JDBC Database 之上的 调用约定
- InterpretableConvention:返回 linq4j.Enumerable,但是不通过
codegen
执行 - BindableConvention:和 InterpretableConvention 类似,返回 linq4j.Enumerable,但是不通过
codegen
执行 - EnumerableConvention:返回 linq4j.Enumerable,通过
codegen
去执行,一般会使用这个,其它convention也可以转成它
这里,有个疑问,Bindable vs Enumerable,到底有什么区别?,可以参考作者的解释,简要来说,如下区别
- BINDABLE runs using an interpreter and returns Object[]s
- ENUMERABLE runs by generating a Java AST, compiling it, and generating an Enumerable over Object[], synthetic classes, boxed primitives (e.g. java.lang.Integer) or empty List.
- calling conventions correspond to runtime engines: The Interpreter implements BINDABLE, and the generated-Java-enumerable engine implements ENUMERABLE. However, Interpreter is not the only thing that can produce BINDABLE.
- Both engines (interpreter and enumerable) will implement 100% of the core algebra. The goals of the interpreter are simplicity of its code and correctness. Performance is not a goal.
|
|
PhysicalPlan to ExecutablePlan
这一步,需要将优化后的plan,进行执行,正如上面提到的 convention,我们可以采用 解释执行 或 编译执行 的方式,一般选用编译执行方式来减少虚函数调用,提高CPU的IPC,可以参考总结资料里的Interpretation vs Compilation
加深理解,之前几篇 paper reading 也提到过相关话题
|
|
Hybrid planning
这一部分,还是上面提到的官方demo中的内容,如何实现混合多种 DataSrouce 的 query planning,以学习如何集成一个新的 adapter
Calling Convention
Calling Convention 理解:physical format of data + how is processed and passed from one operator to another
最初所有的 nodes 都来属于 logical convention,也就是 Raw Relational Algebra,没有具体的物理实现,因此它有着 infinity cost
我们熟悉的 physical plan 也属于一种 call convention,它通过 rules 进行 node 的转换,同时一个 node 可能也包含多个 convention 去实现
Converter
为了 convention 的转换,通过在 nodes 之间插入 converter
来进行转换。如下图中的,Blue to Logical Converter 和 Green to Logical Converter
custom operators/rules
这里需要实现一个 Lucene Convention 来 filter operator,如下图,我们需要实现相应的 operator 和 rules
LuceneRel
1.定义 interface LuceneRel extends RelNode
来表示 Lucene operator
定义 Convention LUCENE = new Convention.Impl("LUCENE", LuceneRel.class);
来代表 Lucene Calling Convention
LuceneTableScan
2.实现 class LuceneTableScan extends TableScan implements LuceneRel
LuceneTableScan 代表实现 Lucene Convention 的 TableScan 类型,后续在 rule 里会定义何时使用
LuceneTableScanRule
3.实现 class LuceneTableScanRule extends ConverterRule
定义 LuceneTableScan 如何 convert
其中 rule 定义的匹配规则 如下,注意 withConversion 的定义:
- clazz:定义可以操作这条 rule 的类,比如这里的 LogicalTableScan
- in:定义这条 rule 的输入 trait,比如这里的 Convention.NONE 代表 logical convention
- out:定义这条 rule 会输出的 trait,比如这里的 LUCENE Calling Convention
|
|
至此,可以将 LuceneTableScanRule 加入 planner 观察会发生什么
|
|
如下图,LogicalTableScan 所在的 group,它的 parent nodes rel#41 EnumerableHashJoin 和 rel#27 EnumerableHashJoin 都在要求一个 Enumerable Convention 的 operator,但是我们当前没有这种,因为我们是自定义的 TableScan 返回的是 Lucene Convention
因此,需要在不同 Convention 之间加入 converter 来实现转换,在第4步介绍
LuceneToEnumerableConverterRule
4.实现 class LuceneToEnumerableConverterRule extends ConverterRule
定义 将 Lucene Convention 转成 Enumerable Convention
如下匹配规则
|
|
LuceneToEnumerableConverter
5.实现 class LuceneToEnumerableConverter extends ConverterImpl implements EnumerableRel
来实现第4步中的需要的 convert 逻辑
其中 EnumerableRel
是为了实现 codegen,compiler 能力由 embedded Java compiler (Janino) 支持
为了debug Janino的代码,可以通过如下设置
|
|
注意当前版本的 Calcite 不支持 -Dorg.codehaus.janino.source_debugging.keep=true
,因此需要在代码结尾打断点,通过debug方式运行,再去查看codegen代码
至此,Lucene Converntion 已经可以基本的运行了,下一步实现 Lucene filter operator
LuceneFilter
6.实现 class LuceneFilter extends Filter implements LuceneRel
来实现 Lucene 的 filter operator
注意这个 filter operator 其实也实现了 pushdown,逻辑在上面 LuceneToEnumerableConverter 中实现的,如下代码,将 query 最终传给 luceneEnumerable 从而达到 pushdown 效果
|
|
LuceneFilter 的主要逻辑,就是实现 LuceneRel#implement 方法,构造 lucene query,然后提供给 LuceneToEnumerableConverter 使用,如下代码
|
|
同时可以看到,LuceneTableScan 构造的 Result query 是 MatchAllDocsQuery,而为什么没有直接把 filter pushdown 放到 TableScan 中呢,猜测理论上是可以的,只是当前设计没有这么做
从上面这点看出,对于自定义的 Convention 核心是 LuceneEnumerable
,利用它,将一系列 Lucene Operators 转换成 Enumerable,在与上层 operator 结合起来使用
对应到这里, LuceneFilter 和 LuceneTableScan 合力去构造 LuceneRel#Result,然后提供给 LuceneToEnumerableConverter 进行转化
RexToLuceneTranslator
7.定义 class RexToLuceneTranslator extends RexVisitorImpl<Query>
将一个 Calcite Filter 转成 Lucene Query,来补充 LuceneFilter 里的 query 构造逻辑
它的核心逻辑,解析 RexCall 里的 colRef 和 literal,在组装成 Lucene 的 query,demo 中为了简单,只考虑了 equals 的一种情况
LuceneFilterRule
8.定义 class LuceneFilterRule extends ConverterRule
来决定何时 convert,如下匹配规则
|
|
LuceneFilterChecker
- 定义
class LuceneFilterChecker extends RexVisitorImpl<Boolean>
来实现 LuceneFilterRule 的匹配逻辑
这里也继承了 RexVisitorImpl 来去 visit RexCall
最终PhysicalPlan
|
|
总结
- 有些概念术语,对于不是 native speaker 的我们来说,确实有些难懂,比如 convention,queryable/scanable 等
- 看 paper 和看代码 是两回事,尤其是你第一次去看 calcite 代码时候。因此,对 paper 中没有理解的概念,可以通过其它渠道资料补充,这里推荐我收集的calcite-playground资料,筛选了一些作者收集的资料和demo代码
- 这一篇只是 Calcite Overview 介绍,目的让作者总览 Calcite 核心组件 与 如何集成使用。对于它的一些核心技术:Volcano Planner Internals,SubQuery Optimization,Materialized Views 会在后续继续学习