Skip to content

Commit 8e2e91d

Browse files
author
xuchao
committed
初步重构 sql语句解析逻辑;
字段替换和重命名相关的逻辑在解析阶段完成; 目标:经过解析sql之后的所有sql是确定的。
1 parent 48cb34e commit 8e2e91d

File tree

3 files changed

+104
-113
lines changed

3 files changed

+104
-113
lines changed

core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java

-23
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
package com.dtstack.flink.sql.side;
2222

2323
import com.google.common.collect.HashBasedTable;
24-
import com.google.common.collect.HashBiMap;
2524
import com.google.common.collect.Maps;
2625
import org.apache.calcite.sql.JoinType;
2726
import org.apache.calcite.sql.SqlNode;
@@ -34,7 +33,6 @@
3433
* Join信息
3534
* Date: 2018/7/24
3635
* Company: www.dtstack.com
37-
*
3836
* @author xuchao
3937
*/
4038

@@ -45,8 +43,6 @@ public class JoinInfo implements Serializable {
4543
//左表是否是维表
4644
private boolean leftIsSideTable;
4745

48-
private boolean leftIsTmpTable = false;
49-
5046
//右表是否是维表
5147
private boolean rightIsSideTable;
5248

@@ -70,8 +66,6 @@ public class JoinInfo implements Serializable {
7066

7167
private JoinType joinType;
7268

73-
private HashBiMap<String, String> fieldRefInfo = HashBiMap.create();
74-
7569
/**
7670
* 左表需要查询的字段信息和output的时候对应的列名称
7771
*/
@@ -210,22 +204,6 @@ public void setJoinType(JoinType joinType) {
210204
this.joinType = joinType;
211205
}
212206

213-
public boolean isLeftIsTmpTable() {
214-
return leftIsTmpTable;
215-
}
216-
217-
public void setLeftIsTmpTable(boolean leftIsTmpTable) {
218-
this.leftIsTmpTable = leftIsTmpTable;
219-
}
220-
221-
public HashBiMap<String, String> getFieldRefInfo() {
222-
return fieldRefInfo;
223-
}
224-
225-
public void setFieldRefInfo(HashBiMap<String, String> fieldRefInfo) {
226-
this.fieldRefInfo = fieldRefInfo;
227-
}
228-
229207
public Map<String, String> getLeftSelectFieldInfo() {
230208
return leftSelectFieldInfo;
231209
}
@@ -259,7 +237,6 @@ public HashBasedTable<String, String, String> getTableFieldRef(){
259237
public String toString() {
260238
return "JoinInfo{" +
261239
"leftIsSideTable=" + leftIsSideTable +
262-
", leftIsTmpTable=" + leftIsTmpTable +
263240
", rightIsSideTable=" + rightIsSideTable +
264241
", leftTableName='" + leftTableName + '\'' +
265242
", leftTableAlias='" + leftTableAlias + '\'' +

core/src/main/java/com/dtstack/flink/sql/side/JoinNodeDealer.java

+102-90
Original file line numberDiff line numberDiff line change
@@ -81,12 +81,15 @@ public JoinNodeDealer(SideSQLParser sideSQLParser){
8181
* @param tableRef 存储构建临时表查询后源表和新表之间的关联关系
8282
* @return
8383
*/
84-
public JoinInfo dealJoinNode(SqlJoin joinNode, Set<String> sideTableSet,
85-
Queue<Object> queueInfo, SqlNode parentWhere,
84+
public JoinInfo dealJoinNode(SqlJoin joinNode,
85+
Set<String> sideTableSet,
86+
Queue<Object> queueInfo,
87+
SqlNode parentWhere,
8688
SqlNodeList parentSelectList,
8789
Set<Tuple2<String, String>> joinFieldSet,
8890
Map<String, String> tableRef,
8991
Map<String, String> fieldRef) {
92+
9093
SqlNode leftNode = joinNode.getLeft();
9194
SqlNode rightNode = joinNode.getRight();
9295
JoinType joinType = joinNode.getJoinType();
@@ -95,8 +98,8 @@ public JoinInfo dealJoinNode(SqlJoin joinNode, Set<String> sideTableSet,
9598
String leftTbAlias = "";
9699
String rightTableName = "";
97100
String rightTableAlias = "";
98-
boolean leftTbisTmp = false;
99-
//TODO 这个是啥东西
101+
102+
//TODO 含义需要更明确
100103
HashBiMap<String, String> fieldReplaceRef = HashBiMap.create();
101104

102105
//如果是连续join 判断是否已经处理过添加到执行队列
@@ -109,14 +112,12 @@ public JoinInfo dealJoinNode(SqlJoin joinNode, Set<String> sideTableSet,
109112
dealNestJoin(joinNode, sideTableSet,
110113
queueInfo, parentWhere, parentSelectList, joinFieldSet, tableRef, fieldRef, parentSelectList, fieldReplaceRef);
111114
leftNode = joinNode.getLeft();
112-
113115
}
114116

115117
if (leftNode.getKind() == AS) {
116118
AliasInfo aliasInfo = (AliasInfo) sideSQLParser.parseSql(leftNode, sideTableSet, queueInfo, parentWhere, parentSelectList);
117119
leftTbName = aliasInfo.getName();
118120
leftTbAlias = aliasInfo.getAlias();
119-
120121
}
121122

122123
boolean leftIsSide = checkIsSideTable(leftTbName, sideTableSet);
@@ -146,69 +147,20 @@ public JoinInfo dealJoinNode(SqlJoin joinNode, Set<String> sideTableSet,
146147
tableInfo.setRightTableAlias(rightTableAlias);
147148
}
148149

149-
150-
tableInfo.setLeftIsTmpTable(leftTbisTmp);
151150
tableInfo.setLeftIsSideTable(leftIsSide);
152151
tableInfo.setRightIsSideTable(rightIsSide);
153152
tableInfo.setLeftNode(leftNode);
154153
tableInfo.setRightNode(rightNode);
155154
tableInfo.setJoinType(joinType);
156-
tableInfo.setFieldRefInfo(fieldReplaceRef);
157155

158156
tableInfo.setCondition(joinNode.getCondition());
159157
TableUtils.replaceJoinFieldRefTableName(joinNode.getCondition(), fieldRef);
160158

161159
//extract 需要查询的字段信息
162-
//TODO 抽取===========================
163160
if(rightIsSide){
164-
Set<String> fromTableNameSet = Sets.newHashSet();
165-
TableUtils.getFromTableInfo(leftNode, fromTableNameSet);
166-
Set<String> extractCondition = Sets.newHashSet();
167-
extractWhereCondition(fromTableNameSet, (SqlBasicCall) parentWhere, extractCondition);
168-
Set<String> extractSelectField = extractSelectFields(parentSelectList, fromTableNameSet, tableRef);
169-
Set<String> fieldFromJoinCondition = extractSelectFieldFromJoinCondition(joinFieldSet, fromTableNameSet, tableRef);
170-
171-
extractSelectField.addAll(extractCondition);
172-
extractSelectField.addAll(fieldFromJoinCondition);
173-
174-
Set<String> rightFromTableNameSet = Sets.newHashSet();
175-
TableUtils.getFromTableInfo(rightNode, rightFromTableNameSet);
176-
Set<String> extractRightCondition = Sets.newHashSet();
177-
extractWhereCondition(rightFromTableNameSet, (SqlBasicCall) parentWhere, extractRightCondition);
178-
Set<String> rightExtractSelectField = extractSelectFields(parentSelectList, rightFromTableNameSet, tableRef);
179-
Set<String> rightFieldFromJoinCondition = extractSelectFieldFromJoinCondition(joinFieldSet, rightFromTableNameSet, tableRef);
180-
rightExtractSelectField.addAll(extractRightCondition);
181-
rightExtractSelectField.addAll(rightFieldFromJoinCondition);
182-
183-
//重命名right 中和 left 重名的
184-
Map<String, String> leftTbSelectField = Maps.newHashMap();
185-
Map<String, String> rightTbSelectField = Maps.newHashMap();
186-
String newTableName = tableInfo.getNewTableAlias();
187-
188-
for(String tmpField : extractSelectField){
189-
String[] tmpFieldSplit = StringUtils.split(tmpField, '.');
190-
leftTbSelectField.put(tmpFieldSplit[1], tmpFieldSplit[1]);
191-
fieldRef.put(tmpField, TableUtils.buildTableField(newTableName, tmpFieldSplit[1]));
192-
}
193-
194-
for(String tmpField : rightExtractSelectField){
195-
String[] tmpFieldSplit = StringUtils.split(tmpField, '.');
196-
String originalFieldName = tmpFieldSplit[1];
197-
String targetFieldName = originalFieldName;
198-
if(leftTbSelectField.containsKey(originalFieldName)){
199-
targetFieldName = ParseUtils.dealDuplicateFieldName(leftTbSelectField, originalFieldName);
200-
}
201-
202-
rightTbSelectField.put(originalFieldName, targetFieldName);
203-
fieldRef.put(tmpField, TableUtils.buildTableField(newTableName, targetFieldName));
204-
}
205-
206-
tableInfo.setLeftSelectFieldInfo(leftTbSelectField);
207-
tableInfo.setRightSelectFieldInfo(rightTbSelectField);
208-
161+
extractJoinNeedSelectField(leftNode, rightNode, parentWhere, parentSelectList, tableRef, joinFieldSet, fieldRef, tableInfo);
209162
}
210163

211-
//=========================================
212164
if(tableInfo.getLeftNode().getKind() != AS){
213165
return tableInfo;
214166
}
@@ -222,19 +174,80 @@ public JoinInfo dealJoinNode(SqlJoin joinNode, Set<String> sideTableSet,
222174
return tableInfo;
223175
}
224176

177+
/**
178+
* 获取join 之后需要查询的字段信息
179+
*/
180+
public void extractJoinNeedSelectField(SqlNode leftNode,
181+
SqlNode rightNode,
182+
SqlNode parentWhere,
183+
SqlNodeList parentSelectList,
184+
Map<String, String> tableRef,
185+
Set<Tuple2<String, String>> joinFieldSet,
186+
Map<String, String> fieldRef,
187+
JoinInfo tableInfo){
188+
Set<String> fromTableNameSet = Sets.newHashSet();
189+
TableUtils.getFromTableInfo(leftNode, fromTableNameSet);
190+
Set<String> extractCondition = Sets.newHashSet();
191+
extractWhereCondition(fromTableNameSet, (SqlBasicCall) parentWhere, extractCondition);
192+
Set<String> extractSelectField = extractSelectFields(parentSelectList, fromTableNameSet, tableRef);
193+
Set<String> fieldFromJoinCondition = extractSelectFieldFromJoinCondition(joinFieldSet, fromTableNameSet, tableRef);
194+
195+
extractSelectField.addAll(extractCondition);
196+
extractSelectField.addAll(fieldFromJoinCondition);
197+
198+
Set<String> rightFromTableNameSet = Sets.newHashSet();
199+
TableUtils.getFromTableInfo(rightNode, rightFromTableNameSet);
200+
Set<String> extractRightCondition = Sets.newHashSet();
201+
extractWhereCondition(rightFromTableNameSet, (SqlBasicCall) parentWhere, extractRightCondition);
202+
Set<String> rightExtractSelectField = extractSelectFields(parentSelectList, rightFromTableNameSet, tableRef);
203+
Set<String> rightFieldFromJoinCondition = extractSelectFieldFromJoinCondition(joinFieldSet, rightFromTableNameSet, tableRef);
204+
rightExtractSelectField.addAll(extractRightCondition);
205+
rightExtractSelectField.addAll(rightFieldFromJoinCondition);
206+
207+
//重命名right 中和 left 重名的
208+
Map<String, String> leftTbSelectField = Maps.newHashMap();
209+
Map<String, String> rightTbSelectField = Maps.newHashMap();
210+
String newTableName = tableInfo.getNewTableAlias();
211+
212+
for(String tmpField : extractSelectField){
213+
String[] tmpFieldSplit = StringUtils.split(tmpField, '.');
214+
leftTbSelectField.put(tmpFieldSplit[1], tmpFieldSplit[1]);
215+
fieldRef.put(tmpField, TableUtils.buildTableField(newTableName, tmpFieldSplit[1]));
216+
}
217+
218+
for(String tmpField : rightExtractSelectField){
219+
String[] tmpFieldSplit = StringUtils.split(tmpField, '.');
220+
String originalFieldName = tmpFieldSplit[1];
221+
String targetFieldName = originalFieldName;
222+
if(leftTbSelectField.containsKey(originalFieldName)){
223+
targetFieldName = ParseUtils.dealDuplicateFieldName(leftTbSelectField, originalFieldName);
224+
}
225+
226+
rightTbSelectField.put(originalFieldName, targetFieldName);
227+
fieldRef.put(tmpField, TableUtils.buildTableField(newTableName, targetFieldName));
228+
}
229+
230+
tableInfo.setLeftSelectFieldInfo(leftTbSelectField);
231+
tableInfo.setRightSelectFieldInfo(rightTbSelectField);
232+
}
233+
225234

226235
/**
227236
* 处理多层join
228-
* 判断左节点是否需要创建零时查询
237+
* 判断左节点是否需要创建临时查询
229238
* (1)右节点是维表
230239
* (2)左节点不是 as 节点
231240
*/
232-
private JoinInfo dealNestJoin(SqlJoin joinNode, Set<String> sideTableSet,
233-
Queue<Object> queueInfo, SqlNode parentWhere,
234-
SqlNodeList selectList, Set<Tuple2<String, String>> joinFieldSet,
235-
Map<String, String> tableRef, Map<String, String> fieldRef,
236-
SqlNodeList parentSelectList,
237-
HashBiMap<String, String> fieldReplaceRef){
241+
private JoinInfo dealNestJoin(SqlJoin joinNode,
242+
Set<String> sideTableSet,
243+
Queue<Object> queueInfo,
244+
SqlNode parentWhere,
245+
SqlNodeList selectList,
246+
Set<Tuple2<String, String>> joinFieldSet,
247+
Map<String, String> tableRef,
248+
Map<String, String> fieldRef,
249+
SqlNodeList parentSelectList,
250+
HashBiMap<String, String> fieldReplaceRef){
238251

239252
SqlJoin leftJoinNode = (SqlJoin) joinNode.getLeft();
240253
SqlNode parentRightJoinNode = joinNode.getRight();
@@ -249,7 +262,6 @@ private JoinInfo dealNestJoin(SqlJoin joinNode, Set<String> sideTableSet,
249262
boolean rightIsSide = checkIsSideTable(rightTableName, sideTableSet);
250263
SqlBasicCall buildAs = TableUtils.buildAsNodeByJoinInfo(joinInfo, null, null);
251264

252-
//TODO 抽取相同代码
253265
if(rightIsSide){
254266
addSideInfoToExeQueue(queueInfo, joinInfo, joinNode, parentSelectList, parentWhere, fieldReplaceRef, tableRef);
255267
}
@@ -262,37 +274,17 @@ private JoinInfo dealNestJoin(SqlJoin joinNode, Set<String> sideTableSet,
262274
extractTemporaryQuery(newLeftNode, leftTbAlias, (SqlBasicCall) parentWhere,
263275
parentSelectList, queueInfo, joinFieldSet, tableRef, fieldRef);
264276

265-
//记录表之间的关联关系
266-
String newLeftTableName = buildAs.getOperands()[1].toString();
267-
Set<String> fromTableNameSet = Sets.newHashSet();
268-
TableUtils.getFromTableInfo(newLeftNode, fromTableNameSet);
269-
for(String tbTmp : fromTableNameSet){
270-
tableRef.put(tbTmp, newLeftTableName);
271-
}
272-
273277
//替换leftNode 为新的查询
274278
joinNode.setLeft(buildAs);
275-
276-
//替换select field 中的对应字段
277-
for(SqlNode sqlNode : parentSelectList.getList()){
278-
for(String tbTmp : fromTableNameSet) {
279-
//TODO
280-
TableUtils.replaceSelectFieldTable(sqlNode, tbTmp, newLeftTableName, fieldReplaceRef);
281-
}
282-
}
283-
284-
//替换where 中的条件相关
285-
for(String tbTmp : fromTableNameSet){
286-
TableUtils.replaceWhereCondition(parentWhere, tbTmp, newLeftTableName);
287-
}
288-
279+
replaceSelectAndWhereField(buildAs, leftJoinNode, tableRef, parentSelectList, parentWhere, fieldReplaceRef);
289280
}
290281

291282
return joinInfo;
292283
}
293284

294285
/**
295286
* 右边表是维表需要重新构建左表的临时查询
287+
* 并将joinInfo 添加到执行队列里面
296288
* @param queueInfo
297289
* @param joinInfo
298290
* @param joinNode
@@ -319,15 +311,34 @@ public void addSideInfoToExeQueue(Queue<Object> queueInfo,
319311
//替换左表为新的表名称
320312
joinNode.setLeft(buildAs);
321313

314+
replaceSelectAndWhereField(buildAs, leftJoinNode, tableRef, parentSelectList, parentWhere, fieldReplaceRef);
315+
}
316+
317+
/**
318+
* 替换指定的查询和条件节点中的字段为新的字段
319+
* @param buildAs
320+
* @param leftJoinNode
321+
* @param tableRef
322+
* @param parentSelectList
323+
* @param parentWhere
324+
* @param fieldReplaceRef
325+
*/
326+
public void replaceSelectAndWhereField(SqlBasicCall buildAs,
327+
SqlNode leftJoinNode,
328+
Map<String, String> tableRef,
329+
SqlNodeList parentSelectList,
330+
SqlNode parentWhere,
331+
HashBiMap<String, String> fieldReplaceRef){
332+
322333
String newLeftTableName = buildAs.getOperands()[1].toString();
323334
Set<String> fromTableNameSet = Sets.newHashSet();
324335
TableUtils.getFromTableInfo(leftJoinNode, fromTableNameSet);
336+
325337
for(String tbTmp : fromTableNameSet){
326338
tableRef.put(tbTmp, newLeftTableName);
327339
}
328340

329341
//替换select field 中的对应字段
330-
//TODO
331342
for(SqlNode sqlNode : parentSelectList.getList()){
332343
for(String tbTmp : fromTableNameSet) {
333344
TableUtils.replaceSelectFieldTable(sqlNode, tbTmp, newLeftTableName, fieldReplaceRef);
@@ -351,8 +362,10 @@ public void addSideInfoToExeQueue(Queue<Object> queueInfo,
351362
* @param tableRef
352363
* @return 源自段和新生成字段之间的映射关系
353364
*/
354-
private void extractTemporaryQuery(SqlNode node, String tableAlias, SqlBasicCall parentWhere,
355-
SqlNodeList parentSelectList, Queue<Object> queueInfo,
365+
private void extractTemporaryQuery(SqlNode node, String tableAlias,
366+
SqlBasicCall parentWhere,
367+
SqlNodeList parentSelectList,
368+
Queue<Object> queueInfo,
356369
Set<Tuple2<String, String>> joinFieldSet,
357370
Map<String, String> tableRef,
358371
Map<String, String> fieldRef){
@@ -389,7 +402,6 @@ private void extractTemporaryQuery(SqlNode node, String tableAlias, SqlBasicCall
389402
queueInfo.offer(sqlBasicCall);
390403

391404
//替换select中的表结构
392-
//TODO
393405
HashBiMap<String, String> fieldReplaceRef = HashBiMap.create();
394406
for(SqlNode tmpSelect : parentSelectList.getList()){
395407
for(String tbTmp : fromTableNameSet) {

core/src/main/java/com/dtstack/flink/sql/util/TableUtils.java

+2
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,8 @@ public static void getFromTableInfo(SqlNode fromTable, Set<String> tableNameSet)
297297

298298
/**
299299
* 替换select 中的字段信息
300+
* 如果mappingTable 非空则从该参数获取字段的映射
301+
* 如果mappingTable 为空则根据是否存在新生成字段
300302
* @param selectNode
301303
* @param oldTbName
302304
* @param newTbName

0 commit comments

Comments
 (0)