Impala 源代码目录结构

SQL 解析
Impala 的 SQL 解析与执行计划生成部分是由 impala-frontend(Java)实现的,监听端口是 21000。用户通过
Beeswax 接口 BeeswaxService.query() 提交一个请求,在 impalad 端的处理逻辑是由
void ImpalaServer::query(QueryHandle& query_handle, const Query& query) 这个函数(
ImpalaServer.h)完成的。
1 |
void ImpalaServer::query(QueryHandle& query_handle, const Query& query) {
|
2 |
VLOG_QUERY << "query(): query=" << query.query;
|
3 |
ScopedSessionState session_handle(this);
|
4 |
shared_ptr<SessionState> session;
|
6 |
session_handle.WithSession(ThriftServer::GetThreadConnectionId(), &session),
|
7 |
SQLSTATE_GENERAL_ERROR);
|
11 |
RAISE_IF_ERROR(QueryToTQueryContext(query, &query_ctx), SQLSTATE_GENERAL_ERROR);
|
15 |
shared_ptr<QueryExecState> exec_state;
|
18 |
RAISE_IF_ERROR(Execute(&query_ctx, session, &exec_state),
|
19 |
SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION);
|
21 |
exec_state->UpdateQueryState(QueryState::RUNNING);
|
24 |
exec_state->WaitAsync();
|
27 |
Status status = SetQueryInflight(session, exec_state);
|
29 |
UnregisterQuery(exec_state->query_id(), false, &status);
|
30 |
RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
|
32 |
TUniqueIdToQueryHandle(exec_state->query_id(), &query_handle);
|
其中 QueryToTQueryContext(query, &query_ctx) 将 Query 转换为 TQueryCtx。具体代码实现如下:
(ImpalaServer.h)
1 |
Status ImpalaServer::QueryToTQueryContext(const Query& query,
|
2 |
TQueryCtx* query_ctx) {
|
3 |
query_ctx->request.stmt = query.query;
|
4 |
VLOG_QUERY << "query: " << ThriftDebugString(query);
|
6 |
shared_ptr<SessionState> session;
|
7 |
const TUniqueId& session_id = ThriftServer::GetThreadConnectionId();
|
8 |
RETURN_IF_ERROR(GetSessionState(session_id, &session));
|
9 |
DCHECK(session != NULL);
|
14 |
lock_guard<mutex> l(session->lock);
|
15 |
if (session->connected_user.empty()) session->connected_user = query.hadoop_user;
|
16 |
query_ctx->request.query_options = session->default_query_options;
|
19 |
session->ToThrift(session_id, &query_ctx->session);
|
23 |
if (query.__isset.configuration) {
|
24 |
BOOST_FOREACH(const string& option, query.configuration) {
|
25 |
RETURN_IF_ERROR(ParseQueryOptions(option, &query_ctx->request.query_options));
|
27 |
VLOG_QUERY << "TClientRequest.queryOptions: "
|
28 |
<< ThriftDebugString(query_ctx->request.query_options);
|
内部调用 ImpalaServer::Execute()
(ImpalaServer.h)
函数将 TQueryCtx 转换为 TExecRequest,具体逻辑通过调用 ImpalaServer::ExecuteInternal() 实现。代码如下:
1 |
Status ImpalaServer::Execute(TQueryCtx* query_ctx, |
2 |
shared_ptr<SessionState> session_state,
|
3 |
shared_ptr<QueryExecState>* exec_state) {
|
4 |
PrepareQueryContext(query_ctx);
|
5 |
bool registered_exec_state;
|
6 |
ImpaladMetrics::IMPALA_SERVER_NUM_QUERIES->Increment(1L);
|
9 |
string stmt = replace_all_copy(query_ctx->request.stmt, "\n", " ");
|
11 |
query_ctx->request.__set_redacted_stmt((const string) stmt);
|
13 |
Status status = ExecuteInternal(*query_ctx, session_state, &registered_exec_state,
|
15 |
if (!status.ok() && registered_exec_state) {
|
16 |
UnregisterQuery((*exec_state)->query_id(), false, &status);
|
上面的函数调用 ImpalaServer::ExecuteInternal()
(ImpalaServer.h)
在这个函数里通过 JNI 接口调用 frontend.createExecRequest() 生成 TExecRequest,具体代码如下:
1 |
Status ImpalaServer::ExecuteInternal( |
2 |
const TQueryCtx& query_ctx,
|
3 |
shared_ptr<SessionState> session_state,
|
4 |
bool* registered_exec_state,
|
5 |
shared_ptr<QueryExecState>* exec_state) {
|
6 |
DCHECK(session_state != NULL);
|
7 |
*registered_exec_state = false;
|
9 |
return Status("This Impala server is offline. Please retry your query later.");
|
11 |
exec_state->reset(new QueryExecState(query_ctx, exec_env_, exec_env_->frontend(),
|
12 |
this, session_state));
|
14 |
(*exec_state)->query_events()->MarkEvent("Start execution");
|
29 |
lock_guard<mutex> l(*(*exec_state)->lock());
|
34 |
RETURN_IF_ERROR(RegisterQuery(session_state, *exec_state));
|
35 |
*registered_exec_state = true;
|
37 |
RETURN_IF_ERROR((*exec_state)->UpdateQueryStatus(
|
39 |
exec_env_->frontend()->GetExecRequest(query_ctx, &result)));
|
40 |
(*exec_state)->query_events()->MarkEvent("Planning finished");
|
41 |
(*exec_state)->summary_profile()->AddEventSequence(
|
42 |
result.timeline.name, result.timeline);
|
43 |
if (result.__isset.result_set_metadata) {
|
44 |
(*exec_state)->set_result_metadata(result.result_set_metadata);
|
47 |
VLOG(2) << "Execution request: " << ThriftDebugString(result);
|
50 |
RETURN_IF_ERROR((*exec_state)->Exec(&result));
|
51 |
if (result.stmt_type == TStmtType::DDL) {
|
52 |
Status status = UpdateCatalogMetrics();
|
54 |
VLOG_QUERY << "Couldn't update catalog metrics: " << status.GetDetail();
|
58 |
if ((*exec_state)->coord() != NULL) {
|
59 |
const unordered_set<TNetworkAddress>& unique_hosts =
|
60 |
(*exec_state)->schedule()->unique_hosts();
|
61 |
if (!unique_hosts.empty()) {
|
62 |
lock_guard<mutex> l(query_locations_lock_);
|
63 |
BOOST_FOREACH(const TNetworkAddress& port, unique_hosts) {
|
64 |
query_locations_[port].insert((*exec_state)->query_id());
|
Frontend::GetExecRequest()
(Frontend.h)
通过 JNI 接口调用 frontend.createExecRequest() 生成 TExecRequest。具体实现代码如下:
1 |
Status Frontend::GetExecRequest( |
2 |
const TQueryCtx& query_ctx, TExecRequest* result) {
|
3 |
return JniUtil::CallJniMethod(fe_, create_exec_request_id_, query_ctx, result);
|
JniUtil::CallJniMethod()
(jni-util.h)
的具体实现代码如下:
4 |
static Status CallJniMethod(const jobject& obj, const jmethodID& method, const T& arg) {
|
5 |
JNIEnv* jni_env = getJNIEnv();
|
6 |
jbyteArray request_bytes;
|
7 |
JniLocalFrame jni_frame;
|
8 |
RETURN_IF_ERROR(jni_frame.push(jni_env));
|
9 |
RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &arg, &request_bytes));
|
10 |
jni_env->CallObjectMethod(obj, method, request_bytes);
|
11 |
RETURN_ERROR_IF_EXC(jni_env);
|
至此,请求参数经 Thrift 序列化后,通过 JNI 调用转到 Java Frontend 生成执行计划树。
public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString)
(Frontend.java)
是最重要的方法,它根据提供的 TQueryCtx 创建 TExecRequest。具体代码(分析部分)如下:
2 |
* Create a populated TExecRequest corresponding to the supplied TQueryCtx.
|
4 |
public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString)
|
5 |
throws ImpalaException {
|
7 |
AnalysisContext.AnalysisResult analysisResult = analyzeStmt(queryCtx);
|
8 |
EventSequence timeline = analysisResult.getAnalyzer().getTimeline();
|
9 |
timeline.markEvent("Analysis finished");
|
首先通过调用 analyzeStmt()
(Frontend.java)
方法分析提交的 SQL 语句。analyzeStmt() 的具体实现代码如下:
2 |
* Analyzes the SQL statement included in queryCtx and returns the AnalysisResult.
|
4 |
private AnalysisContext.AnalysisResult analyzeStmt(TQueryCtx queryCtx)
|
5 |
throws AnalysisException, InternalException, AuthorizationException {
|
6 |
AnalysisContext analysisCtx = new AnalysisContext(dsqldCatalog_, queryCtx,
|
8 |
LOG.debug("analyze query " + queryCtx.request.stmt);
|
18 |
analysisCtx.analyze(queryCtx.request.stmt);
|
19 |
Preconditions.checkState(analysisCtx.getAnalyzer().getMissingTbls().isEmpty());
|
20 |
return analysisCtx.getAnalysisResult();
|
21 |
} catch (AnalysisException e) {
|
22 |
Set<TableName> missingTbls = analysisCtx.getAnalyzer().getMissingTbls();
|
24 |
if (missingTbls.isEmpty()) throw e;
|
27 |
if (!requestTblLoadAndWait(missingTbls, MISSING_TBL_LOAD_WAIT_TIMEOUT_MS)) {
|
28 |
LOG.info(String.format("Missing tables were not received in %dms. Load " +
|
29 |
"request will be retried.", MISSING_TBL_LOAD_WAIT_TIMEOUT_MS));
|
37 |
analysisCtx.getAnalyzer().authorize(getAuthzChecker());
|
AnalysisContext.AnalysisResult 中的 Analyzer 对象是个存放这个 SQL 所涉及到的所有信息
(包含 Table、conjunct、slot、slotRefMap、eqJoinConjuncts 等)的知识库,所有跟这个
SQL 有关的东西都会存到 Analyzer 对象里面。该类的定义可以查看
Analyzer.java。
AnalysisContext.analyze()
(AnalysisContext.java)
的具体实现代码如下:
2 |
* Parse and analyze 'stmt'. If 'stmt' is a nested query (i.e. query that
|
3 |
* contains subqueries), it is also rewritten by performing subquery unnesting.
|
4 |
* The transformed stmt is then re-analyzed in a new analysis context.
|
6 |
public void analyze(String stmt) throws AnalysisException {
|
7 |
Analyzer analyzer = new Analyzer(catalog_, queryCtx_, authzConfig_);
|
8 |
analyze(stmt, analyzer);
|
上面的 analyze() 函数通过调用同名的重载函数 analyze(String stmt, Analyzer analyzer)
(AnalysisContext.java)
实现具体的分析,代码如下:
2 |
* Parse and analyze 'stmt' using a specified Analyzer.
|
4 |
public void analyze(String stmt, Analyzer analyzer) throws AnalysisException {
|
5 |
SqlScanner input = new SqlScanner(new StringReader(stmt));
|
6 |
SqlParser parser = new SqlParser(input);
|
8 |
analysisResult_ = new AnalysisResult();
|
9 |
analysisResult_.analyzer_ = analyzer;
|
10 |
if (analysisResult_.analyzer_ == null) {
|
11 |
analysisResult_.analyzer_ = new Analyzer(catalog_, queryCtx_, authzConfig_);
|
13 |
analysisResult_.stmt_ = (StatementBase) parser.parse().value;
|
14 |
if (analysisResult_.stmt_ == null)
|
19 |
if (analysisResult_.stmt_ instanceof CreateTableAsSelectStmt) {
|
20 |
analysisResult_.tmpCreateTableStmt_ =
|
21 |
((CreateTableAsSelectStmt) analysisResult_.stmt_).getCreateStmt().clone();
|
24 |
analysisResult_.stmt_.analyze(analysisResult_.analyzer_);
|
25 |
boolean isExplain = analysisResult_.isExplainStmt();
|
28 |
if (analysisResult_.requiresRewrite()) {
|
29 |
StatementBase rewrittenStmt = StmtRewriter.rewrite(analysisResult_);
|
31 |
Preconditions.checkNotNull(rewrittenStmt);
|
32 |
analysisResult_ = new AnalysisResult();
|
33 |
analysisResult_.analyzer_ = new Analyzer(catalog_, queryCtx_, authzConfig_);
|
34 |
analysisResult_.stmt_ = rewrittenStmt;
|
35 |
analysisResult_.stmt_.analyze(analysisResult_.analyzer_);
|
36 |
LOG.trace("rewrittenStmt: " + rewrittenStmt.toSql());
|
38 |
analysisResult_.stmt_.setIsExplain();
|
40 |
} catch (AnalysisException e) {
|
43 |
} catch (Exception e) {
|
44 |
throw new AnalysisException(parser.getErrorMsg(stmt), e);
|
上面的函数通过调用 SqlScanner 和 SqlParser 类实现具体的分析。可以查看
sql-scanner.flex
和
sql-parser.y
分析 SQL 语句的大概流程如下:
- 处理这个 SQL 所涉及到的 Table(即TableRefs),这些 Table 是在 from 从句中提取出来的(包含关键字
from, join, on/using)。注意 JOIN 操作以及 on/using 条件是存储在参与 JOIN 操作的右边的表的 TableRef
中并分析的。依次 analyze() 每个 TableRef,向 Analyzer 注册 registerBaseTableRef(填充TupleDescriptor)。
如果对应的 TableRef 涉及到 JOIN 操作,还要 analyzeJoin()。在 analyzeJoin() 时会向 Analyzer registerConjunct()
填充 Analyzer 的一些成员变量:conjuncts,tuplePredicates(TupleId 与 conjunct 的映射),slotPredicates(SlotId
与 conjunct 的映射),eqJoinConjuncts。
- 处理 select 从句(包含关键字 select, MAX(), AVG()等聚集函数):分析这个 SQL 都 select 了哪几项,每一项都是个
Expr 类型的子类对象,把这几项填入 resultExprs 数组和 colLabels。然后把 resultExprs 里面的 Expr 都递归 analyze
一下,要分析到树的最底层,向 Analyzer 注册 SlotRef 等。
- 分析 where 从句(关键字 where),首先递归 Analyze 从句中 Expr 组成的树,然后向 Analyzer registerConjunct()
填充 Analyzer 的一些成员变量(与第一步处理 TableRef 时相同,此外还要填充 whereClauseConjuncts)。
- 处理 sort 相关信息(关键字 order by)。先是解析 aliases 和 ordinals,然后从 order by 后面的从句中提取 Expr 填入
orderingExprs,接着递归 Analyze 从句中 Expr 组成的树,最后创建 SortInfo 对象。
- 处理 aggregation 相关信息(关键字 group by, having, avg, max 等)。首先递归分析 group by 从句里的 Expr,然后如果有
having 从句就像 where 从句一样,先是 analyze having 从句中 Expr 组成的树,然后向 Analyzer registerConjunct()等。
- 处理 InlineView。
至此,词法分析和语法分析都完成了,回到 frontend.createExecRequest()
(Frontend.java)
函数,开始填充 TExecRequest 内的成员变量。代码如下(部分):
2 |
* Create a populated TExecRequest corresponding to the supplied TQueryCtx.
|
4 |
public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString)
|
5 |
throws ImpalaException {
|
7 |
AnalysisContext.AnalysisResult analysisResult = analyzeStmt(queryCtx);
|
8 |
EventSequence timeline = analysisResult.getAnalyzer().getTimeline();
|
9 |
timeline.markEvent("Analysis finished");
|
12 |
Preconditions.checkNotNull(analysisResult.getStmt());
|
13 |
TExecRequest result = new TExecRequest();
|
14 |
result.setQuery_options(queryCtx.request.getQuery_options());
|
15 |
result.setAccess_events(analysisResult.getAccessEvents());
|
16 |
result.analysis_warnings = analysisResult.getAnalyzer().getWarnings();
|
18 |
if (analysisResult.isCatalogOp()) {
|
19 |
result.stmt_type = TStmtType.DDL;
|
20 |
createCatalogOpRequest(analysisResult, result);
|
21 |
String jsonLineageGraph = analysisResult.getJsonLineageGraph();
|
22 |
if (jsonLineageGraph != null && !jsonLineageGraph.isEmpty()) {
|
23 |
result.catalog_op_request.setLineage_graph(jsonLineageGraph);
|
26 |
if (!analysisResult.isCreateTableAsSelectStmt()) return result;
|
27 |
} else if (analysisResult.isLoadDataStmt()) {
|
28 |
result.stmt_type = TStmtType.LOAD;
|
29 |
result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
|
30 |
new TColumn("summary", Type.STRING.toThrift()))));
|
31 |
result.setLoad_data_request(analysisResult.getLoadDataStmt().toThrift());
|
33 |
} else if (analysisResult.isSetStmt()) {
|
34 |
result.stmt_type = TStmtType.SET;
|
35 |
result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
|
36 |
new TColumn("option", Type.STRING.toThrift()),
|
37 |
new TColumn("value", Type.STRING.toThrift()))));
|
38 |
result.setSet_query_option_request(analysisResult.getSetStmt().toThrift());
|
如果是 DDL 命令(use, show tables, show databases, describe 等),那么调用 createCatalogOpRequest()。
如果是 Load Data 或者 Set 语句,就通过 setResult_set_metadata() 设置相应的结果集元数据,并调用语句的 toThrift() 将其转换为 Thrift 结构。
执行计划生成
另外一种情况就是 Query 或者 DML 命令,那么就得创建和填充 TQueryExecRequest 了。该部分代码如下:
2 |
* Create a populated TExecRequest corresponding to the supplied TQueryCtx.
|
4 |
public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString)
|
12 |
Preconditions.checkState(analysisResult.isQueryStmt() || analysisResult.isDmlStmt()
|
13 |
|| analysisResult.isCreateTableAsSelectStmt());
|
15 |
TQueryExecRequest queryExecRequest = new TQueryExecRequest();
|
17 |
LOG.debug("create plan");
|
18 |
Planner planner = new Planner(analysisResult, queryCtx);
|
21 |
ArrayList<PlanFragment> fragments = planner.createPlan();
|
23 |
List<ScanNode> scanNodes = Lists.newArrayList();
|
26 |
Map<PlanFragment, Integer> fragmentIdx = Maps.newHashMap();
|
28 |
for (int fragmentId = 0; fragmentId < fragments.size(); ++fragmentId) {
|
29 |
PlanFragment fragment = fragments.get(fragmentId);
|
30 |
Preconditions.checkNotNull(fragment.getPlanRoot());
|
31 |
fragment.getPlanRoot().collect(Predicates.instanceOf(ScanNode.class), scanNodes);
|
32 |
fragmentIdx.put(fragment, fragmentId);
|
上面调用的 Planner.createPlan()
(Planner.java)
的具体实现:
2 |
* Returns a list of plan fragments for executing an analyzed parse tree.
|
3 |
* May return a single-node or distributed executable plan.
|
5 |
public ArrayList<PlanFragment> createPlan() throws ImpalaException {
|
6 |
SingleNodePlanner singleNodePlanner = new SingleNodePlanner(ctx_);
|
7 |
DistributedPlanner distributedPlanner = new DistributedPlanner(ctx_);
|
9 |
PlanNode singleNodePlan = singleNodePlanner.createSingleNodePlan();
|
10 |
ctx_.getRootAnalyzer().getTimeline().markEvent("Single node plan created");
|
11 |
ArrayList<PlanFragment> fragments = null;
|
14 |
MaxRowsProcessedVisitor visitor = new MaxRowsProcessedVisitor();
|
15 |
singleNodePlan.accept(visitor);
|
16 |
long maxRowsProcessed = visitor.get() == -1 ? Long.MAX_VALUE : visitor.get();
|
17 |
boolean isSmallQuery =
|
18 |
maxRowsProcessed < ctx_.getQueryOptions().exec_single_node_rows_threshold;
|
21 |
ctx_.getQueryOptions().setNum_nodes(1);
|
22 |
ctx_.getQueryOptions().setDisable_codegen(true);
|
23 |
if (maxRowsProcessed < ctx_.getQueryOptions().batch_size ||
|
24 |
maxRowsProcessed < 1024 && ctx_.getQueryOptions().batch_size == 0) {
|
26 |
ctx_.getQueryOptions().setNum_scanner_threads(1);
|
30 |
if (ctx_.isSingleNodeExec()) {
|
32 |
fragments = Lists.newArrayList(new PlanFragment(
|
33 |
ctx_.getNextFragmentId(), singleNodePlan, DataPartition.UNPARTITIONED));
|
36 |
fragments = distributedPlanner.createPlanFragments(singleNodePlan);
|
39 |
PlanFragment rootFragment = fragments.get(fragments.size() - 1);
|
40 |
if (ctx_.isInsertOrCtas()) {
|
41 |
InsertStmt insertStmt = ctx_.getAnalysisResult().getInsertStmt();
|
42 |
if (!ctx_.isSingleNodeExec()) {
|
44 |
rootFragment = distributedPlanner.createInsertFragment(
|
45 |
rootFragment, insertStmt, ctx_.getRootAnalyzer(), fragments);
|
48 |
rootFragment.setSink(insertStmt.createDataSink());
|
51 |
ColumnLineageGraph graph = ctx_.getRootAnalyzer().getColumnLineageGraph();
|
52 |
List<Expr> resultExprs = null;
|
53 |
Table targetTable = null;
|
54 |
if (ctx_.isInsertOrCtas()) {
|
55 |
InsertStmt insertStmt = ctx_.getAnalysisResult().getInsertStmt();
|
56 |
resultExprs = insertStmt.getResultExprs();
|
57 |
targetTable = insertStmt.getTargetTable();
|
58 |
graph.addTargetColumnLabels(targetTable);
|
60 |
resultExprs = ctx_.getQueryStmt().getResultExprs();
|
61 |
graph.addTargetColumnLabels(ctx_.getQueryStmt().getColLabels());
|
63 |
resultExprs = Expr.substituteList(resultExprs,
|
64 |
rootFragment.getPlanRoot().getOutputSmap(), ctx_.getRootAnalyzer(), true);
|
65 |
rootFragment.setOutputExprs(resultExprs);
|
66 |
LOG.debug("desctbl: " + ctx_.getRootAnalyzer().getDescTbl().debugString());
|
67 |
LOG.debug("resultexprs: " + Expr.debugString(rootFragment.getOutputExprs()));
|
68 |
LOG.debug("finalize plan fragments");
|
69 |
for (PlanFragment fragment: fragments) {
|
70 |
fragment.finalize(ctx_.getRootAnalyzer());
|
73 |
Collections.reverse(fragments);
|
74 |
ctx_.getRootAnalyzer().getTimeline().markEvent("Distributed plan created");
|
76 |
if (RuntimeEnv.INSTANCE.computeLineage() || RuntimeEnv.INSTANCE.isTestEnv()) {
|
78 |
if (ctx_.isInsertOrCtas()) {
|
79 |
Preconditions.checkNotNull(targetTable);
|
80 |
List<Expr> exprs = Lists.newArrayList();
|
81 |
if (targetTable instanceof HBaseTable) {
|
82 |
exprs.addAll(resultExprs);
|
84 |
exprs.addAll(ctx_.getAnalysisResult().getInsertStmt().getPartitionKeyExprs());
|
85 |
exprs.addAll(resultExprs.subList(0,
|
86 |
targetTable.getNonClusteringColumns().size()));
|
88 |
graph.computeLineageGraph(exprs, ctx_.getRootAnalyzer());
|
90 |
graph.computeLineageGraph(resultExprs, ctx_.getRootAnalyzer());
|
92 |
LOG.trace("lineage: " + graph.debugString());
|
93 |
ctx_.getRootAnalyzer().getTimeline().markEvent("Lineage info computed");
|
createPlan 包括 createSingleNodePlan 和 createPlanFragments
两个主要部分。其中第一个生成单节点计划树,所有片段只能在一个节点 coord 上执行;第二个生成分布式执行计划树,片段可以分配到不同的节点中运行。我们先来看看 SingleNodePlanner.createSingleNodePlan()
(SingleNodePlanner.java)
该方法根据 Planner Context 中分析的语法树创建单节点执行计划树并返回根节点。该方法递归处理语法树并执行以下操作。在自上而下(top-down)阶段,对每个查询语句:
- materialize the slots required for evaluating expressions of that statement
- migrate conjuncts from parent blocks into inline views and union operands
在自下而上(bottom-up)阶段,为每个查询语句生成计划树(In the bottom-up phase generate the plan tree for every query statement):
- perform join-order optimization when generating the plan of the FROM clause of a select statement; requires that all materialized slots are known for an accurate estimate of row sizes needed for cost-based join ordering
- assign conjuncts that can be evaluated at that node and compute the stats of that node (cardinality, etc.)
- apply combined expression substitution map of child plan nodes; if a plan node re-maps its input, set a substitution map to be applied by parents
具体代码如下:
2 |
* Generates and returns the root of the single-node plan for the analyzed parse tree
|
3 |
* in the planner context.
|
5 |
public PlanNode createSingleNodePlan() throws ImpalaException {
|
6 |
QueryStmt queryStmt = ctx_.getQueryStmt();
|
9 |
Analyzer analyzer = queryStmt.getAnalyzer();
|
10 |
analyzer.computeEquivClasses();
|
11 |
analyzer.getTimeline().markEvent("Equivalence classes computed");
|
22 |
if (queryStmt.getBaseTblResultExprs() != null) {
|
23 |
analyzer.materializeSlots(queryStmt.getBaseTblResultExprs());
|
26 |
LOG.trace("desctbl: " + analyzer.getDescTbl().debugString());
|
27 |
PlanNode singleNodePlan = createQueryPlan(queryStmt, analyzer,
|
28 |
ctx_.getQueryOptions().isDisable_outermost_topn());
|
29 |
Preconditions.checkNotNull(singleNodePlan);
|
30 |
return singleNodePlan;
|
上面的函数通过调用私有的 createQueryPlan()
(SingleNodePlanner.java)
函数实现。该函数为单节点执行创建计划树。为查询语句中的
Select/Project/Join/Union [All]/Group by/Having/Order by
生成 PlanNode。具体实现代码如下:
2 |
* Create plan tree for single-node execution. Generates PlanNodes for the
|
3 |
* Select/Project/Join/Union [All]/Group by/Having/Order by clauses of the query stmt.
|
5 |
private PlanNode createQueryPlan(QueryStmt stmt, Analyzer analyzer, boolean disableTopN)
|
6 |
throws ImpalaException {
|
8 |
if (analyzer.hasEmptyResultSet()) return createEmptyNode(stmt, analyzer);
|
11 |
if (stmt instanceof SelectStmt) {
|
12 |
SelectStmt selectStmt = (SelectStmt) stmt;
|
14 |
root = createSelectPlan(selectStmt, analyzer);
|
17 |
if (((SelectStmt) stmt).getAnalyticInfo() != null) {
|
18 |
AnalyticInfo analyticInfo = selectStmt.getAnalyticInfo();
|
19 |
ArrayList<TupleId> stmtTupleIds = Lists.newArrayList();
|
20 |
stmt.getMaterializedTupleIds(stmtTupleIds);
|
21 |
AnalyticPlanner analyticPlanner =
|
22 |
new AnalyticPlanner(stmtTupleIds, analyticInfo, analyzer, ctx_);
|
23 |
List<Expr> inputPartitionExprs = Lists.newArrayList();
|
24 |
AggregateInfo aggInfo = selectStmt.getAggInfo();
|
25 |
root = analyticPlanner.createSingleNodePlan(root,
|
26 |
aggInfo != null ? aggInfo.getGroupingExprs() : null, inputPartitionExprs);
|
27 |
if (aggInfo != null && !inputPartitionExprs.isEmpty()) {
|
29 |
aggInfo.setPartitionExprs(inputPartitionExprs);
|
33 |
Preconditions.checkState(stmt instanceof UnionStmt);
|
34 |
root = createUnionPlan((UnionStmt) stmt, analyzer);
|
38 |
boolean sortHasMaterializedSlots = false;
|
39 |
if (stmt.evaluateOrderBy()) {
|
40 |
for (SlotDescriptor sortSlotDesc:
|
41 |
stmt.getSortInfo().getSortTupleDescriptor().getSlots()) {
|
42 |
if (sortSlotDesc.isMaterialized()) {
|
43 |
sortHasMaterializedSlots = true;
|
49 |
if (stmt.evaluateOrderBy() && sortHasMaterializedSlots) {
|
50 |
long limit = stmt.getLimit();
|
53 |
boolean useTopN = stmt.hasLimit() && !disableTopN;
|
55 |
root = new SortNode(ctx_.getNextNodeId(), root, stmt.getSortInfo(),
|
56 |
useTopN, stmt.getOffset());
|
57 |
Preconditions.checkState(root.hasValidStats());
|
61 |
root.setLimit(stmt.getLimit());
|
62 |
root.computeStats(analyzer);
|
SingleNodePlanner.createSelectPlan()
(SingleNodePlanner.java)
函数创建实现 select 查询语句块中
Select/Project/Join/Group by/Having 等从句的 PlanNode 树。具体实现代码如下:
2 |
* Create tree of PlanNodes that implements the Select/Project/Join/Group by/Having
|
3 |
* of the selectStmt query block.
|
5 |
private PlanNode createSelectPlan(SelectStmt selectStmt, Analyzer analyzer)
|
6 |
throws ImpalaException {
|
9 |
if (selectStmt.getTableRefs().isEmpty()) {
|
10 |
return createConstantSelectPlan(selectStmt, analyzer);
|
25 |
selectStmt.materializeRequiredSlots(analyzer);
|
27 |
ArrayList<TupleId> rowTuples = Lists.newArrayList();
|
29 |
for (TableRef tblRef: selectStmt.getTableRefs()) {
|
30 |
rowTuples.addAll(tblRef.getMaterializedTupleIds());
|
37 |
if (analyzer.hasEmptySpjResultSet()) {
|
38 |
PlanNode emptySetNode = new EmptySetNode(ctx_.getNextNodeId(), rowTuples);
|
39 |
emptySetNode.init(analyzer);
|
40 |
emptySetNode.setOutputSmap(selectStmt.getBaseTblSmap());
|
41 |
return createAggregationPlan(selectStmt, analyzer, emptySetNode);
|
46 |
List<Pair<TableRef, PlanNode>> refPlans = Lists.newArrayList();
|
47 |
for (TableRef ref: selectStmt.getTableRefs()) {
|
48 |
PlanNode plan = createTableRefNode(analyzer, ref);
|
49 |
Preconditions.checkState(plan != null);
|
50 |
refPlans.add(new Pair(ref, plan));
|
53 |
for (Pair<TableRef, PlanNode> entry: refPlans) {
|
54 |
entry.second.setAssignedConjuncts(analyzer.getAssignedConjuncts());
|
59 |
if (!selectStmt.getSelectList().isStraightJoin()) {
|
60 |
Set<ExprId> assignedConjuncts = analyzer.getAssignedConjuncts();
|
61 |
root = createCheapestJoinPlan(analyzer, refPlans);
|
62 |
if (root == null) analyzer.setAssignedConjuncts(assignedConjuncts);
|
65 |
if (selectStmt.getSelectList().isStraightJoin() || root == null) {
|
68 |
root = createFromClauseJoinPlan(analyzer, refPlans);
|
69 |
Preconditions.checkNotNull(root);
|
73 |
if (selectStmt.getAggInfo() != null) {
|
74 |
root = createAggregationPlan(selectStmt, analyzer, root);
|
上面函数中调用的主要私有方法有:
createTableRefNode()、createCheapestJoinPlan()、 createFromClauseJoinPlan()、 createAggregationPlan(),各个函数的具体实现如下:
createTableRefNode()
2 |
* Create a tree of PlanNodes for the given tblRef, which can be a BaseTableRef,
|
3 |
* CollectionTableRef or an InlineViewRef.
|
5 |
private PlanNode createTableRefNode(Analyzer analyzer, TableRef tblRef)
|
6 |
throws ImpalaException {
|
7 |
if (tblRef instanceof BaseTableRef || tblRef instanceof CollectionTableRef) {
|
9 |
return createScanNode(analyzer, tblRef);
|
10 |
} else if (tblRef instanceof InlineViewRef) {
|
12 |
return createInlineViewPlan(analyzer, (InlineViewRef) tblRef);
|
14 |
throw new InternalException(
|
15 |
"Unknown TableRef node: " + tblRef.getClass().getSimpleName());
|
createCheapestJoinPlan()
2 |
* 返回物化 join refPlans 中所有 TblRefs 开销最小的 plan
|
3 |
* 假设 refPlans 中的顺序和查询中的原始顺序相同
|
5 |
* - the plan is executable, ie, all non-cross joins have equi-join predicates
|
6 |
* - the leftmost scan is over the largest of the inputs for which we can still
|
7 |
* construct an executable plan(左边的是最大表)
|
8 |
* - all rhs's(right hand side?) are in decreasing order of selectiveness (percentage of rows they
|
10 |
* - outer/cross/semi joins: rhs serialized size is < lhs serialized size;(右边的表比左边的小)
|
11 |
* enforced via join inversion, if necessary(否则通过 join 反转实现)
|
12 |
* Returns null if we can't create an executable plan.
|
14 |
private PlanNode createCheapestJoinPlan(
|
15 |
Analyzer analyzer, List<Pair<TableRef, PlanNode>> refPlans)
|
16 |
throws ImpalaException {
|
17 |
LOG.trace("createCheapestJoinPlan");
|
18 |
if (refPlans.size() == 1) return refPlans.get(0).second;
|
22 |
ArrayList<Pair<TableRef, Long>> candidates = Lists.newArrayList();
|
23 |
for (Pair<TableRef, PlanNode> entry: refPlans) {
|
24 |
TableRef ref = entry.first;
|
25 |
JoinOperator joinOp = ref.getJoinOp();
|
35 |
if (((joinOp.isOuterJoin() || joinOp.isSemiJoin() || joinOp.isCrossJoin()) &&
|
36 |
ref != refPlans.get(1).first) || joinOp.isNullAwareLeftAntiJoin()) {
|
41 |
PlanNode plan = entry.second;
|
42 |
if (plan.getCardinality() == -1) {
|
45 |
candidates.add(new Pair(ref, new Long(0)));
|
46 |
LOG.trace("candidate " + ref.getUniqueAlias() + ": 0");
|
49 |
Preconditions.checkNotNull(ref.getDesc());
|
50 |
long materializedSize =
|
51 |
(long) Math.ceil(plan.getAvgRowSize() * (double) plan.getCardinality());
|
52 |
candidates.add(new Pair(ref, new Long(materializedSize)));
|
53 |
LOG.trace("candidate " + ref.getUniqueAlias() + ": " + Long.toString(materializedSize));
|
55 |
if (candidates.isEmpty()) return null;
|
59 |
Collections.sort(candidates,
|
60 |
new Comparator<Pair<TableRef, Long>>() {
|
61 |
public int compare(Pair<TableRef, Long> a, Pair<TableRef, Long> b) {
|
62 |
long diff = b.second - a.second;
|
63 |
return (diff < 0 ? -1 : (diff > 0 ? 1 : 0));
|
68 |
for (Pair<TableRef, Long> candidate: candidates) {
|
69 |
PlanNode result = createJoinPlan(analyzer, candidate.first, refPlans);
|
70 |
if (result != null) return result;
|
createFromClauseJoinPlan()
2 |
* 返回按照 from 语句顺序的 JoinPlan
|
4 |
private PlanNode createFromClauseJoinPlan(
|
5 |
Analyzer analyzer, List<Pair<TableRef, PlanNode>> refPlans)
|
6 |
throws ImpalaException {
|
8 |
Preconditions.checkState(!refPlans.isEmpty());
|
9 |
PlanNode root = refPlans.get(0).second;
|
10 |
for (int i = 1; i < refPlans.size(); ++i) {
|
11 |
TableRef innerRef = refPlans.get(i).first;
|
12 |
PlanNode innerPlan = refPlans.get(i).second;
|
13 |
root = createJoinNode(analyzer, root, innerPlan, null, innerRef);
|
14 |
root.setId(ctx_.getNextNodeId());
|
createAggregationPlan()
2 |
* Returns a new AggregationNode that materializes the aggregation of the given stmt.
|
3 |
* Assigns conjuncts from the Having clause to the returned node.
|
5 |
private PlanNode createAggregationPlan(SelectStmt selectStmt, Analyzer analyzer,
|
6 |
PlanNode root) throws InternalException {
|
7 |
Preconditions.checkState(selectStmt.getAggInfo() != null);
|
9 |
AggregateInfo aggInfo = selectStmt.getAggInfo();
|
10 |
root = new AggregationNode(ctx_.getNextNodeId(), root, aggInfo);
|
12 |
Preconditions.checkState(root.hasValidStats());
|
15 |
if (aggInfo.isDistinctAgg()) {
|
16 |
((AggregationNode)root).unsetNeedsFinalize();
|
18 |
((AggregationNode)root).setIntermediateTuple();
|
19 |
root = new AggregationNode(ctx_.getNextNodeId(), root,
|
20 |
aggInfo.getSecondPhaseDistinctAggInfo());
|
22 |
Preconditions.checkState(root.hasValidStats());
|
25 |
root.assignConjuncts(analyzer);
|
上面的 createCheapestJoinPlan() 和 createFromClauseJoinPlan()
方法调用了 createJoinNode() 和 createJoinPlan() 两个方法。它们的具体实现如下:
createJoinNode()
2 |
* 创建 join outer 和 inner 的 node。两者其中之一可能是一个根据 table ref 创建的 plan
|
3 |
* 但不能同时都是 plan。对应的 outer/inner tableRef 不能为空
|
5 |
private PlanNode createJoinNode(
|
6 |
Analyzer analyzer, PlanNode outer, PlanNode inner, TableRef outerRef,
|
7 |
TableRef innerRef) throws ImpalaException {
|
8 |
Preconditions.checkState(innerRef != null ^ outerRef != null);
|
9 |
TableRef tblRef = (innerRef != null) ? innerRef : outerRef;
|
11 |
List<BinaryPredicate> eqJoinConjuncts = Lists.newArrayList();
|
12 |
List<Expr> eqJoinPredicates = Lists.newArrayList();
|
15 |
if (innerRef != null) {
|
16 |
getHashLookupJoinConjuncts(
|
17 |
analyzer, outer.getTblRefIds(), innerRef, eqJoinConjuncts, eqJoinPredicates);
|
19 |
if (!innerRef.getJoinOp().isOuterJoin()) {
|
20 |
analyzer.createEquivConjuncts(outer.getTblRefIds(), innerRef.getId(),
|
24 |
getHashLookupJoinConjuncts(
|
25 |
analyzer, inner.getTblRefIds(), outerRef, eqJoinConjuncts, eqJoinPredicates);
|
27 |
if (!outerRef.getJoinOp().isOuterJoin()) {
|
28 |
analyzer.createEquivConjuncts(inner.getTblRefIds(), outerRef.getId(),
|
32 |
for (BinaryPredicate eqJoinConjunct: eqJoinConjuncts) {
|
33 |
Expr swapTmp = eqJoinConjunct.getChild(0);
|
34 |
eqJoinConjunct.setChild(0, eqJoinConjunct.getChild(1));
|
35 |
eqJoinConjunct.setChild(1, swapTmp);
|
40 |
if (eqJoinConjuncts.isEmpty()) {
|
46 |
if (tblRef.getJoinOp().isOuterJoin() ||
|
47 |
tblRef.getJoinOp().isSemiJoin()) {
|
48 |
throw new NotImplementedException(
|
49 |
String.format("%s join with '%s' without equi-join " +
|
50 |
"conjuncts is not supported.",
|
51 |
tblRef.getJoinOp().isOuterJoin() ? "Outer" : "Semi",
|
52 |
innerRef.getUniqueAlias()));
|
54 |
CrossJoinNode result =
|
55 |
new CrossJoinNode(outer, inner, tblRef, Collections.<Expr>emptyList());
|
56 |
result.init(analyzer);
|
61 |
if (tblRef.getJoinOp() == JoinOperator.CROSS_JOIN) {
|
62 |
tblRef.setJoinOp(JoinOperator.INNER_JOIN);
|
65 |
analyzer.markConjunctsAssigned(eqJoinPredicates);
|
67 |
List<Expr> otherJoinConjuncts = Lists.newArrayList();
|
68 |
if (tblRef.getJoinOp().isOuterJoin()) {
|
71 |
otherJoinConjuncts = analyzer.getUnassignedOjConjuncts(tblRef);
|
72 |
} else if (tblRef.getJoinOp().isSemiJoin()) {
|
77 |
analyzer.getUnassignedConjuncts(tblRef.getAllTupleIds(), false);
|
78 |
if (tblRef.getJoinOp().isNullAwareLeftAntiJoin()) {
|
79 |
boolean hasNullMatchingEqOperator = false;
|
83 |
Iterator<BinaryPredicate> it = eqJoinConjuncts.iterator();
|
84 |
while (it.hasNext()) {
|
85 |
BinaryPredicate conjunct = it.next();
|
86 |
if (!conjunct.isNullMatchingEq()) {
|
87 |
otherJoinConjuncts.add(conjunct);
|
91 |
Preconditions.checkState(!hasNullMatchingEqOperator);
|
92 |
hasNullMatchingEqOperator = true;
|
95 |
Preconditions.checkState(hasNullMatchingEqOperator);
|
98 |
analyzer.markConjunctsAssigned(otherJoinConjuncts);
|
100 |
HashJoinNode result =
|
101 |
new HashJoinNode(outer, inner, tblRef, eqJoinConjuncts, otherJoinConjuncts);
|
102 |
result.init(analyzer);
|
createJoinPlan()
2 |
* Returns a plan with leftmostRef's plan as its leftmost input; the joins
|
3 |
* are in decreasing order of selectiveness (percentage of rows they eliminate).
|
4 |
* The leftmostRef's join will be inverted if it is an outer/semi/cross join.
|
6 |
private PlanNode createJoinPlan(
|
7 |
Analyzer analyzer, TableRef leftmostRef, List<Pair<TableRef, PlanNode>> refPlans)
|
8 |
throws ImpalaException {
|
10 |
LOG.trace("createJoinPlan: " + leftmostRef.getUniqueAlias());
|
12 |
List<Pair<TableRef, PlanNode>> remainingRefs = Lists.newArrayList();
|
14 |
for (Pair<TableRef, PlanNode> entry: refPlans) {
|
15 |
if (entry.first == leftmostRef) {
|
18 |
remainingRefs.add(entry);
|
21 |
Preconditions.checkNotNull(root);
|
23 |
Set<TableRef> joinedRefs = Sets.newHashSet();
|
24 |
joinedRefs.add(leftmostRef);
|
27 |
boolean planHasInvertedJoin = false;
|
28 |
if (leftmostRef.getJoinOp().isOuterJoin()
|
29 |
|| leftmostRef.getJoinOp().isSemiJoin()
|
30 |
|| leftmostRef.getJoinOp().isCrossJoin()) {
|
35 |
leftmostRef.invertJoin(refPlans, analyzer);
|
36 |
planHasInvertedJoin = true;
|
41 |
while (!remainingRefs.isEmpty()) {
|
43 |
PlanNode newRoot = null;
|
44 |
Pair<TableRef, PlanNode> minEntry = null;
|
45 |
for (Pair<TableRef, PlanNode> entry: remainingRefs) {
|
46 |
TableRef ref = entry.first;
|
47 |
LOG.trace(Integer.toString(i) + " considering ref " + ref.getUniqueAlias());
|
56 |
JoinOperator joinOp = ref.getJoinOp();
|
57 |
if (joinOp.isOuterJoin() || joinOp.isSemiJoin()) {
|
58 |
List<TupleId> currentTids = Lists.newArrayList(root.getTblRefIds());
|
59 |
currentTids.add(ref.getId());
|
64 |
List<TupleId> tableRefTupleIds = ref.getAllTupleIds();
|
65 |
if (!currentTids.containsAll(tableRefTupleIds) ||
|
66 |
!tableRefTupleIds.containsAll(currentTids)) {
|
71 |
} else if (ref.getJoinOp().isCrossJoin()) {
|
72 |
if (!joinedRefs.contains(ref.getLeftTblRef())) continue;
|
75 |
PlanNode rhsPlan = entry.second;
|
76 |
analyzer.setAssignedConjuncts(root.getAssignedConjuncts());
|
78 |
boolean invertJoin = false;
|
79 |
if (joinOp.isOuterJoin() || joinOp.isSemiJoin() || joinOp.isCrossJoin()) {
|
85 |
long lhsCard = root.getCardinality();
|
86 |
long rhsCard = rhsPlan.getCardinality();
|
87 |
if (lhsCard != -1 && rhsCard != -1 &&
|
88 |
lhsCard * root.getAvgRowSize() < rhsCard * rhsPlan.getAvgRowSize() &&
|
89 |
!joinOp.isNullAwareLeftAntiJoin()) {
|
93 |
PlanNode candidate = null;
|
95 |
ref.setJoinOp(ref.getJoinOp().invert());
|
96 |
candidate = createJoinNode(analyzer, rhsPlan, root, ref, null);
|
97 |
planHasInvertedJoin = true;
|
99 |
candidate = createJoinNode(analyzer, root, rhsPlan, null, ref);
|
101 |
if (candidate == null) continue;
|
102 |
LOG.trace("cardinality=" + Long.toString(candidate.getCardinality()));
|
106 |
if (joinOp.isOuterJoin() || joinOp.isSemiJoin()) {
|
114 |
|| (candidate.getClass().equals(newRoot.getClass())
|
115 |
&& candidate.getCardinality() < newRoot.getCardinality())
|
116 |
|| (candidate instanceof HashJoinNode && newRoot instanceof CrossJoinNode)) {
|
121 |
if (newRoot == null) {
|
129 |
Preconditions.checkState(!planHasInvertedJoin);
|
135 |
long lhsCardinality = root.getCardinality();
|
136 |
long rhsCardinality = minEntry.second.getCardinality();
|
137 |
numOps += lhsCardinality + rhsCardinality;
|
138 |
LOG.debug(Integer.toString(i) + " chose " + minEntry.first.getUniqueAlias()
|
139 |
+ " #lhs=" + Long.toString(lhsCardinality)
|
140 |
+ " #rhs=" + Long.toString(rhsCardinality)
|
141 |
+ " #ops=" + Long.toString(numOps));
|
142 |
remainingRefs.remove(minEntry);
|
143 |
joinedRefs.add(minEntry.first);
|
147 |
root.setId(ctx_.getNextNodeId());
|
148 |
analyzer.setAssignedConjuncts(root.getAssignedConjuncts());
|
至此我们已经大概介绍了 createSingleNodePlan 的过程。
现在让我们回到 createPlan() 函数,来看看创建分布式执行计划树,即 createPlanFragments 的过程。
DistributedPlanner.createPlanFragments()
(Planner.java)
方法为单点计划树生成多个片段。具体代码如下:
3 |
* 片段通过 list 返回,list 中位置 i 的片段只能使用片段 j 的输出(j > i)。
|
5 |
* TODO: 考虑计划片段中的数据分片; 尤其是要比 createQueryPlan() 更加注重协调
|
6 |
* 聚集操作中 hash partitioning 以及分析计算中的 hash partitioning。
|
7 |
* (只有在相同 select 块中进行聚集和分析计算时才会发生协调)
|
9 |
public ArrayList<PlanFragment> createPlanFragments(
|
10 |
PlanNode singleNodePlan) throws ImpalaException {
|
11 |
Preconditions.checkState(!ctx_.isSingleNodeExec());
|
12 |
AnalysisContext.AnalysisResult analysisResult = ctx_.getAnalysisResult();
|
13 |
QueryStmt queryStmt = ctx_.getQueryStmt();
|
14 |
ArrayList<PlanFragment> fragments = Lists.newArrayList();
|
17 |
boolean isPartitioned = false;
|
18 |
if ((analysisResult.isInsertStmt() || analysisResult.isCreateTableAsSelectStmt())
|
19 |
&& !singleNodePlan.hasLimit()) {
|
20 |
Preconditions.checkState(!queryStmt.hasOffset());
|
23 |
LOG.debug("create plan fragments");
|
24 |
long perNodeMemLimit = ctx_.getQueryOptions().mem_limit;
|
25 |
LOG.debug("memlimit=" + Long.toString(perNodeMemLimit));
|
27 |
createPlanFragments(singleNodePlan, isPartitioned, perNodeMemLimit, fragments);
|
上面的方法调用私有成员方法 DistributedPlanner.createPlanFragments()
DistributedPlanner.java
该方法返回生成 root 结果的 fragments。具体代码如下:
2 |
* 返回生成 'root' 结果的 fragments; 递归创建所有 input fragments 到返回的 fragment
|
3 |
* 如果创建了一个新的 fragment,会被追加到 ‘fragments’,这样 fragment 就会在所有需要
|
5 |
* 如果 'isPartitioned' 为 false,那么返回的 fragment 就是 unpartitioned;
|
6 |
* 否则就可能是 partitioned, 取决于它的输入是否 partitioned;
|
7 |
* the partition function is derived from the inputs.
|
9 |
private PlanFragment createPlanFragments(
|
10 |
PlanNode root, boolean isPartitioned,
|
11 |
long perNodeMemLimit, ArrayList<PlanFragment> fragments)
|
12 |
throws InternalException, NotImplementedException {
|
13 |
ArrayList<PlanFragment> childFragments = Lists.newArrayList();
|
14 |
for (PlanNode child: root.getChildren()) {
|
18 |
boolean childIsPartitioned = !child.hasLimit();
|
22 |
child, childIsPartitioned, perNodeMemLimit, fragments));
|
25 |
PlanFragment result = null;
|
26 |
if (root instanceof ScanNode) {
|
27 |
result = createScanFragment(root);
|
28 |
fragments.add(result);
|
29 |
} else if (root instanceof HashJoinNode) {
|
30 |
Preconditions.checkState(childFragments.size() == 2);
|
31 |
result = createHashJoinFragment(
|
32 |
(HashJoinNode) root, childFragments.get(1), childFragments.get(0),
|
33 |
perNodeMemLimit, fragments);
|
34 |
} else if (root instanceof CrossJoinNode) {
|
35 |
Preconditions.checkState(childFragments.size() == 2);
|
36 |
result = createCrossJoinFragment(
|
37 |
(CrossJoinNode) root, childFragments.get(1), childFragments.get(0),
|
38 |
perNodeMemLimit, fragments);
|
39 |
} else if (root instanceof SelectNode) {
|
40 |
result = createSelectNodeFragment((SelectNode) root, childFragments);
|
41 |
} else if (root instanceof UnionNode) {
|
42 |
result = createUnionNodeFragment((UnionNode) root, childFragments, fragments);
|
43 |
} else if (root instanceof AggregationNode) {
|
44 |
result = createAggregationFragment(
|
45 |
(AggregationNode) root, childFragments.get(0), fragments);
|
46 |
} else if (root instanceof SortNode) {
|
47 |
if (((SortNode) root).isAnalyticSort()) {
|
49 |
result = createAnalyticFragment(
|
50 |
(SortNode) root, childFragments.get(0), fragments);
|
52 |
result = createOrderByFragment(
|
53 |
(SortNode) root, childFragments.get(0), fragments);
|
55 |
} else if (root instanceof AnalyticEvalNode) {
|
56 |
result = createAnalyticFragment(root, childFragments.get(0), fragments);
|
57 |
} else if (root instanceof EmptySetNode) {
|
58 |
result = new PlanFragment(
|
59 |
ctx_.getNextFragmentId(), root, DataPartition.UNPARTITIONED);
|
61 |
throw new InternalException(
|
62 |
"Cannot create plan fragment for this node type: " + root.getExplainString());
|
65 |
fragments.remove(result);
|
66 |
fragments.add(result);
|
68 |
if (!isPartitioned && result.isPartitioned()) {
|
69 |
result = createMergeFragment(result);
|
70 |
fragments.add(result);
|
上面的方法调用了大量的 create*Fragment() 私有成员方法。这些成员方法的具体实现可以查看源文件:
DistributedPlanner.java
这些成员方法都返回了 PlanFragment 实例,关于该类的具体实现可以查看源代码:
PlanFragment.java
至此,我们大概介绍了 createPlanFragments 的过程。
由于 createSingleNodePlan 和 createPlanFragments 两个 createPlan 最重要的部分都已经介绍了,
createPlan 也就介绍到这里。现在让我们回到 frontend.createExecRequest()
继续来看剩下的内容。frontend.createExecRequest() 其余代码如下:
2 |
* Create a populated TExecRequest corresponding to the supplied TQueryCtx.
|
4 |
public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString)
|
5 |
throws ImpalaException {
|
13 |
for (int i = 1; i < fragments.size(); ++i) {
|
14 |
PlanFragment dest = fragments.get(i).getDestFragment();
|
15 |
Integer idx = fragmentIdx.get(dest);
|
16 |
Preconditions.checkState(idx != null);
|
17 |
queryExecRequest.addToDest_fragment_idx(idx.intValue());
|
22 |
LOG.debug("get scan range locations");
|
23 |
Set<TTableName> tablesMissingStats = Sets.newTreeSet();
|
24 |
for (ScanNode scanNode: scanNodes) {
|
25 |
queryExecRequest.putToPer_node_scan_ranges(
|
26 |
scanNode.getId().asInt(),
|
27 |
scanNode.getScanRangeLocations());
|
28 |
if (scanNode.isTableMissingStats()) {
|
29 |
tablesMissingStats.add(scanNode.getTupleDesc().getTableName().toThrift());
|
33 |
queryExecRequest.setHost_list(analysisResult.getAnalyzer().getHostIndex().getList());
|
34 |
for (TTableName tableName: tablesMissingStats) {
|
35 |
queryCtx.addToTables_missing_stats(tableName);
|
40 |
if (queryCtx.request.query_options.isDisable_unsafe_spills()
|
41 |
&& !tablesMissingStats.isEmpty()
|
42 |
&& !analysisResult.getAnalyzer().hasPlanHints()) {
|
43 |
queryCtx.setDisable_spilling(true);
|
48 |
planner.computeResourceReqs(fragments, true, queryExecRequest);
|
49 |
} catch (Exception e) {
|
51 |
LOG.error("Failed to compute resource requirements for query\n" +
|
52 |
queryCtx.request.getStmt(), e);
|
56 |
for (PlanFragment fragment: fragments) {
|
57 |
TPlanFragment thriftFragment = fragment.toThrift();
|
58 |
queryExecRequest.addToFragments(thriftFragment);
|
62 |
TExplainLevel explainLevel = TExplainLevel.VERBOSE;
|
64 |
if (analysisResult.isExplainStmt() || RuntimeEnv.INSTANCE.isTestEnv()) {
|
65 |
explainLevel = queryCtx.request.query_options.getExplain_level();
|
69 |
queryExecRequest.setQuery_ctx(queryCtx);
|
72 |
planner.getExplainString(fragments, queryExecRequest, explainLevel));
|
73 |
queryExecRequest.setQuery_plan(explainString.toString());
|
74 |
queryExecRequest.setDesc_tbl(analysisResult.getAnalyzer().getDescTbl().toThrift());
|
76 |
String jsonLineageGraph = analysisResult.getJsonLineageGraph();
|
77 |
if (jsonLineageGraph != null && !jsonLineageGraph.isEmpty()) {
|
78 |
queryExecRequest.setLineage_graph(jsonLineageGraph);
|
81 |
if (analysisResult.isExplainStmt()) {
|
83 |
createExplainRequest(explainString.toString(), result);
|
87 |
result.setQuery_exec_request(queryExecRequest);
|
89 |
if (analysisResult.isQueryStmt()) {
|
91 |
LOG.debug("create result set metadata");
|
92 |
result.stmt_type = TStmtType.QUERY;
|
93 |
result.query_exec_request.stmt_type = result.stmt_type;
|
94 |
TResultSetMetadata metadata = new TResultSetMetadata();
|
95 |
QueryStmt queryStmt = analysisResult.getQueryStmt();
|
96 |
int colCnt = queryStmt.getColLabels().size();
|
97 |
for (int i = 0; i < colCnt; ++i) {
|
98 |
TColumn colDesc = new TColumn();
|
99 |
colDesc.columnName = queryStmt.getColLabels().get(i);
|
100 |
colDesc.columnType = queryStmt.getResultExprs().get(i).getType().toThrift();
|
101 |
metadata.addToColumns(colDesc);
|
103 |
result.setResult_set_metadata(metadata);
|
105 |
Preconditions.checkState(analysisResult.isInsertStmt() ||
|
106 |
analysisResult.isCreateTableAsSelectStmt());
|
111 |
analysisResult.isCreateTableAsSelectStmt() ? TStmtType.DDL : TStmtType.DML;
|
112 |
result.query_exec_request.stmt_type = TStmtType.DML;
|
115 |
InsertStmt insertStmt = analysisResult.getInsertStmt();
|
116 |
if (insertStmt.getTargetTable() instanceof HdfsTable) {
|
117 |
TFinalizeParams finalizeParams = new TFinalizeParams();
|
118 |
finalizeParams.setIs_overwrite(insertStmt.isOverwrite());
|
119 |
finalizeParams.setTable_name(insertStmt.getTargetTableName().getTbl());
|
120 |
finalizeParams.setTable_id(insertStmt.getTargetTable().getId().asInt());
|
121 |
String db = insertStmt.getTargetTableName().getDb();
|
122 |
finalizeParams.setTable_db(db == null ? queryCtx.session.database : db);
|
123 |
HdfsTable hdfsTable = (HdfsTable) insertStmt.getTargetTable();
|
124 |
finalizeParams.setHdfs_base_dir(hdfsTable.getHdfsBaseDir());
|
125 |
finalizeParams.setStaging_dir(
|
126 |
hdfsTable.getHdfsBaseDir() + "/_impala_insert_staging");
|
127 |
queryExecRequest.setFinalize_params(finalizeParams);
|
131 |
validateTableIds(analysisResult.getAnalyzer(), result);
|
133 |
timeline.markEvent("Planning finished");
|
134 |
result.setTimeline(analysisResult.getAnalyzer().getTimeline().toThrift());
|
至此,FE(前端)的处理结束,并将 TExecRequest 类型的对象返回给 backend 执行。
由于笔者刚开始接触 Impala,分析可能存在某些谬误,有任何疑问或建议都欢迎讨论。