Skip to content

Commit 48cb34e

Browse files
author
xuchao
committed
初步重构 sql语句解析逻辑;
字段替换和重命名相关的逻辑在解析阶段完成; 目标:经过解析sql之后的所有sql是确定的。
1 parent 728490a commit 48cb34e

File tree

4 files changed

+23
-92
lines changed

4 files changed

+23
-92
lines changed

core/src/main/java/com/dtstack/flink/sql/side/FieldReplaceInfo.java

-24
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,9 @@
2020

2121
package com.dtstack.flink.sql.side;
2222

23-
import com.dtstack.flink.sql.util.ReflectionUtils;
2423
import com.google.common.collect.HashBasedTable;
2524
import org.apache.commons.lang3.StringUtils;
2625

27-
import java.lang.reflect.Field;
28-
import java.util.HashMap;
29-
import java.util.Map;
3026

3127
/**
3228
* 用于记录转换之后的表和原来表直接字段的关联关系
@@ -89,26 +85,6 @@ public String getTargetFieldName(String tableName, String fieldName) {
8985
return targetFieldName;
9086
}
9187

92-
93-
/* Field field = ReflectionUtils.getDeclaredField(mappingTable, "backingMap");
94-
field.setAccessible(true);
95-
HashMap<String, Map<String, String>> map = null;
96-
try {
97-
map = (HashMap) field.get(mappingTable);
98-
} catch (IllegalAccessException e) {
99-
e.printStackTrace();
100-
}
101-
102-
if(map.size() == 1 && tableName.equalsIgnoreCase(targetTableAlias)){
103-
for(Map<String, String> tmp : map.values()){
104-
targetFieldName = tmp.get(fieldName);
105-
if(targetFieldName != null){
106-
return targetFieldName;
107-
}
108-
}
109-
}*/
110-
111-
11288
if(preNode == null){
11389
return null;
11490
}

core/src/main/java/com/dtstack/flink/sql/side/JoinNodeDealer.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ private JoinInfo dealNestJoin(SqlJoin joinNode, Set<String> sideTableSet,
277277
for(SqlNode sqlNode : parentSelectList.getList()){
278278
for(String tbTmp : fromTableNameSet) {
279279
//TODO
280-
TableUtils.replaceSelectFieldTable(sqlNode, tbTmp, newLeftTableName, fieldReplaceRef, null);
280+
TableUtils.replaceSelectFieldTable(sqlNode, tbTmp, newLeftTableName, fieldReplaceRef);
281281
}
282282
}
283283

@@ -330,7 +330,7 @@ public void addSideInfoToExeQueue(Queue<Object> queueInfo,
330330
//TODO
331331
for(SqlNode sqlNode : parentSelectList.getList()){
332332
for(String tbTmp : fromTableNameSet) {
333-
TableUtils.replaceSelectFieldTable(sqlNode, tbTmp, newLeftTableName, fieldReplaceRef, null);
333+
TableUtils.replaceSelectFieldTable(sqlNode, tbTmp, newLeftTableName, fieldReplaceRef);
334334
}
335335
}
336336

@@ -393,7 +393,7 @@ private void extractTemporaryQuery(SqlNode node, String tableAlias, SqlBasicCall
393393
HashBiMap<String, String> fieldReplaceRef = HashBiMap.create();
394394
for(SqlNode tmpSelect : parentSelectList.getList()){
395395
for(String tbTmp : fromTableNameSet) {
396-
TableUtils.replaceSelectFieldTable(tmpSelect, tbTmp, tableAlias, fieldReplaceRef, null);
396+
TableUtils.replaceSelectFieldTable(tmpSelect, tbTmp, tableAlias, fieldReplaceRef);
397397
}
398398
}
399399

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

+8-41
Original file line numberDiff line numberDiff line change
@@ -108,25 +108,11 @@ public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTabl
108108
Queue<Object> exeQueue = sideSQLParser.getExeQueue(sql, sideTableMap.keySet());
109109
Object pollObj = null;
110110

111-
//need clean
112-
boolean preIsSideJoin = false;
113-
List<FieldReplaceInfo> replaceInfoList = Lists.newArrayList();
114-
115111
while((pollObj = exeQueue.poll()) != null){
116112

117113
if(pollObj instanceof SqlNode){
118114
SqlNode pollSqlNode = (SqlNode) pollObj;
119115

120-
if(preIsSideJoin){
121-
preIsSideJoin = false;
122-
List<String> fieldNames = null;
123-
for(FieldReplaceInfo replaceInfo : replaceInfoList){
124-
fieldNames = Lists.newArrayList();
125-
//replaceFieldName(pollSqlNode, replaceInfo);
126-
//TODO
127-
//addAliasForFieldNode(pollSqlNode, fieldNames, replaceInfo.getMappingTable());
128-
}
129-
}
130116

131117
if(pollSqlNode.getKind() == INSERT){
132118
System.out.println("----------real exec sql-----------" );
@@ -137,7 +123,7 @@ public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTabl
137123
}
138124

139125
}else if(pollSqlNode.getKind() == AS){
140-
dealAsSourceTable(tableEnv, pollSqlNode, tableCache, replaceInfoList);
126+
dealAsSourceTable(tableEnv, pollSqlNode, tableCache);
141127

142128
} else if (pollSqlNode.getKind() == WITH_ITEM) {
143129
SqlWithItem sqlWithItem = (SqlWithItem) pollSqlNode;
@@ -166,8 +152,7 @@ public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTabl
166152
}else if (pollObj instanceof JoinInfo){
167153
System.out.println("----------exec join info----------");
168154
System.out.println(pollObj.toString());
169-
preIsSideJoin = true;
170-
joinFun(pollObj, localTableCache, sideTableMap, tableEnv, replaceInfoList);
155+
joinFun(pollObj, localTableCache, sideTableMap, tableEnv);
171156
}
172157
}
173158

@@ -401,8 +386,7 @@ public List<String> getConditionFields(SqlNode conditionNode, String specifyTabl
401386

402387
protected void dealAsSourceTable(StreamTableEnvironment tableEnv,
403388
SqlNode pollSqlNode,
404-
Map<String, Table> tableCache,
405-
List<FieldReplaceInfo> replaceInfoList) throws SqlParseException {
389+
Map<String, Table> tableCache) throws SqlParseException {
406390

407391
AliasInfo aliasInfo = parseASNode(pollSqlNode);
408392
if (localTableCache.containsKey(aliasInfo.getName())) {
@@ -424,19 +408,13 @@ protected void dealAsSourceTable(StreamTableEnvironment tableEnv,
424408
Set<String> fromTableNameSet = Sets.newHashSet();
425409
SqlNode fromNode = ((SqlBasicCall)pollSqlNode).getOperands()[0];
426410
TableUtils.getFromTableInfo(fromNode, fromTableNameSet);
427-
for(FieldReplaceInfo tmp : replaceInfoList){
428-
if(fromTableNameSet.contains(tmp.getTargetTableName())
429-
|| fromTableNameSet.contains(tmp.getTargetTableAlias())){
430-
fieldReplaceInfo.setPreNode(tmp);
431-
break;
432-
}
433-
}
434-
replaceInfoList.add(fieldReplaceInfo);
411+
435412
}
436413

437-
private void joinFun(Object pollObj, Map<String, Table> localTableCache,
438-
Map<String, SideTableInfo> sideTableMap, StreamTableEnvironment tableEnv,
439-
List<FieldReplaceInfo> replaceInfoList) throws Exception{
414+
private void joinFun(Object pollObj,
415+
Map<String, Table> localTableCache,
416+
Map<String, SideTableInfo> sideTableMap,
417+
StreamTableEnvironment tableEnv) throws Exception{
440418
JoinInfo joinInfo = (JoinInfo) pollObj;
441419

442420
JoinScope joinScope = new JoinScope();
@@ -518,17 +496,6 @@ private void joinFun(Object pollObj, Map<String, Table> localTableCache,
518496
replaceInfo.setTargetTableName(targetTableName);
519497
replaceInfo.setTargetTableAlias(targetTableAlias);
520498

521-
//判断之前是不是被替换过,被替换过则设置之前的替换信息作为上一个节点
522-
for(FieldReplaceInfo tmp : replaceInfoList){
523-
if(tmp.getTargetTableName().equalsIgnoreCase(joinInfo.getLeftTableName())
524-
||tmp.getTargetTableName().equalsIgnoreCase(joinInfo.getLeftTableAlias())){
525-
replaceInfo.setPreNode(tmp);
526-
break;
527-
}
528-
}
529-
530-
replaceInfoList.add(replaceInfo);
531-
532499
if (!tableEnv.isRegistered(joinInfo.getNewTableName())){
533500
Table joinTable = tableEnv.fromDataStream(dsOut);
534501
tableEnv.registerTable(joinInfo.getNewTableName(), joinTable);

core/src/main/java/com/dtstack/flink/sql/util/TableUtils.java

+12-24
Original file line numberDiff line numberDiff line change
@@ -256,13 +256,14 @@ public static void replaceFromNodeForJoin(JoinInfo joinInfo, SqlSelect sqlNode)
256256
Map<String, String> leftFieldMapping = fieldMapping.row(joinInfo.getLeftTableAlias());
257257
Map<String, String> rightFieldMapping = fieldMapping.row(joinInfo.getRightTableAlias());
258258

259-
for(SqlNode oneSelectNode : sqlNode.getSelectList()){
259+
/* for(SqlNode oneSelectNode : sqlNode.getSelectList()){
260260
replaceSelectFieldTable(oneSelectNode, joinInfo.getLeftTableAlias(), newAliasName, null ,leftFieldMapping);
261261
replaceSelectFieldTable(oneSelectNode, joinInfo.getRightTableAlias(), newAliasName, null , rightFieldMapping);
262-
}
262+
}*/
263263

264264
//where中的条件属性为新的表名称和字段
265-
replaceWhereCondition();
265+
FieldReplaceUtil.replaceFieldName(sqlNode, joinInfo.getLeftTableAlias(), newAliasName, leftFieldMapping);
266+
FieldReplaceUtil.replaceFieldName(sqlNode, joinInfo.getRightTableAlias(), newAliasName, rightFieldMapping);
266267
sqlNode.setFrom(sqlBasicCall);
267268
}
268269

@@ -296,22 +297,18 @@ public static void getFromTableInfo(SqlNode fromTable, Set<String> tableNameSet)
296297

297298
/**
298299
* 替换select 中的字段信息
299-
* 如果mappingTable 非空则从该参数获取字段的映射
300-
* 如果mappingTable 为空则根据是否存在新生成字段
301300
* @param selectNode
302301
* @param oldTbName
303302
* @param newTbName
304303
* @param fieldReplaceRef
305-
* @param mappingTable
306304
*/
307305
public static void replaceSelectFieldTable(SqlNode selectNode,
308306
String oldTbName,
309307
String newTbName,
310-
HashBiMap<String, String> fieldReplaceRef,
311-
Map<String, String> mappingTable) {
308+
HashBiMap<String, String> fieldReplaceRef) {
312309
if (selectNode.getKind() == AS) {
313310
SqlNode leftNode = ((SqlBasicCall) selectNode).getOperands()[0];
314-
replaceSelectFieldTable(leftNode, oldTbName, newTbName, fieldReplaceRef, mappingTable);
311+
replaceSelectFieldTable(leftNode, oldTbName, newTbName, fieldReplaceRef);
315312

316313
}else if(selectNode.getKind() == IDENTIFIER){
317314
SqlIdentifier sqlIdentifier = (SqlIdentifier) selectNode;
@@ -322,7 +319,7 @@ public static void replaceSelectFieldTable(SqlNode selectNode,
322319

323320
String fieldTableName = sqlIdentifier.names.get(0);
324321
if(oldTbName.equalsIgnoreCase(fieldTableName)){
325-
replaceOneSelectField(sqlIdentifier, newTbName, oldTbName, fieldReplaceRef, mappingTable);
322+
replaceOneSelectField(sqlIdentifier, newTbName, oldTbName, fieldReplaceRef);
326323
}
327324

328325
}else if(selectNode.getKind() == LITERAL || selectNode.getKind() == LITERAL_CHAIN){//字面含义
@@ -369,7 +366,7 @@ public static void replaceSelectFieldTable(SqlNode selectNode,
369366
continue;
370367
}
371368

372-
replaceSelectFieldTable(sqlNode, oldTbName, newTbName, fieldReplaceRef, mappingTable);
369+
replaceSelectFieldTable(sqlNode, oldTbName, newTbName, fieldReplaceRef);
373370
}
374371

375372
}else if(selectNode.getKind() == CASE){
@@ -380,16 +377,16 @@ public static void replaceSelectFieldTable(SqlNode selectNode,
380377

381378
for(int i=0; i<whenOperands.size(); i++){
382379
SqlNode oneOperand = whenOperands.get(i);
383-
replaceSelectFieldTable(oneOperand, oldTbName, newTbName, fieldReplaceRef, mappingTable);
380+
replaceSelectFieldTable(oneOperand, oldTbName, newTbName, fieldReplaceRef);
384381
}
385382

386383
for(int i=0; i<thenOperands.size(); i++){
387384
SqlNode oneOperand = thenOperands.get(i);
388-
replaceSelectFieldTable(oneOperand, oldTbName, newTbName, fieldReplaceRef, mappingTable);
385+
replaceSelectFieldTable(oneOperand, oldTbName, newTbName, fieldReplaceRef);
389386

390387
}
391388

392-
replaceSelectFieldTable(elseNode, oldTbName, newTbName, fieldReplaceRef, mappingTable);
389+
replaceSelectFieldTable(elseNode, oldTbName, newTbName, fieldReplaceRef);
393390
}else if(selectNode.getKind() == OTHER){
394391
//不处理
395392
return;
@@ -401,20 +398,11 @@ public static void replaceSelectFieldTable(SqlNode selectNode,
401398
private static void replaceOneSelectField(SqlIdentifier sqlIdentifier,
402399
String newTbName,
403400
String oldTbName,
404-
HashBiMap<String, String> fieldReplaceRef,
405-
Map<String, String> mappingTable){
401+
HashBiMap<String, String> fieldReplaceRef){
406402
SqlIdentifier newField = sqlIdentifier.setName(0, newTbName);
407403
String fieldName = sqlIdentifier.names.get(1);
408404
String fieldKey = oldTbName + "_" + fieldName;
409405

410-
if(mappingTable != null){
411-
String mappingFieldName = mappingTable.get(fieldName);
412-
Preconditions.checkNotNull(mappingFieldName, "can't get any field from mappingTable with oldFieldName " + fieldName);
413-
newField = newField.setName(1, mappingFieldName);
414-
sqlIdentifier.assignNamesFrom(newField);
415-
return;
416-
}
417-
418406
if(!fieldReplaceRef.containsKey(fieldKey)){
419407
if(fieldReplaceRef.inverse().get(fieldName) != null){
420408
//换一个名字

0 commit comments

Comments
 (0)