Skip to content

Commit bf1f766

Browse files
committed
Merge branch 'feat_1.8_fixJoinSameNameError' into '1.8_test_3.10.x'
Feat 1.8 fix join same name error See merge request dt-insight-engine/flinkStreamSQL!59
2 parents 07ac721 + 6a35aab commit bf1f766

File tree

1 file changed

+55
-9
lines changed

1 file changed

+55
-9
lines changed

core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java

+55-9
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,18 @@
2020

2121
package com.dtstack.flink.sql.parser;
2222

23-
import org.apache.calcite.config.Lex;
24-
import org.apache.calcite.sql.SqlBasicCall;
2523
import org.apache.calcite.sql.SqlInsert;
2624
import org.apache.calcite.sql.SqlJoin;
2725
import org.apache.calcite.sql.SqlKind;
28-
import org.apache.calcite.sql.SqlMatchRecognize;
2926
import org.apache.calcite.sql.SqlNode;
30-
import org.apache.calcite.sql.SqlOrderBy;
3127
import org.apache.calcite.sql.SqlSelect;
32-
import org.apache.calcite.sql.parser.SqlParseException;
33-
import org.apache.calcite.sql.parser.SqlParser;
28+
import org.apache.calcite.sql.SqlNodeList;
29+
import org.apache.calcite.sql.SqlBasicCall;
30+
import org.apache.calcite.sql.SqlMatchRecognize;
31+
import org.apache.calcite.sql.SqlOrderBy;
32+
import org.apache.calcite.sql.SqlIdentifier;
33+
import org.apache.calcite.sql.SqlAsOperator;
34+
import org.apache.calcite.sql.parser.SqlParserPos;
3435
import org.apache.commons.lang3.StringUtils;
3536
import com.google.common.collect.Lists;
3637
import org.apache.flink.table.calcite.FlinkPlannerImpl;
@@ -49,6 +50,9 @@
4950

5051
public class InsertSqlParser implements IParser {
5152

53+
// 用来标识当前解析节点的上一层节点是否为 insert 节点
54+
private static Boolean parentIsInsert = false;
55+
5256
@Override
5357
public boolean verify(String sql) {
5458
return StringUtils.isNotBlank(sql) && sql.trim().toLowerCase().startsWith("insert");
@@ -78,13 +82,19 @@ private static void parseNode(SqlNode sqlNode, SqlParseResult sqlParseResult){
7882
SqlNode sqlTarget = ((SqlInsert)sqlNode).getTargetTable();
7983
SqlNode sqlSource = ((SqlInsert)sqlNode).getSource();
8084
sqlParseResult.addTargetTable(sqlTarget.toString());
85+
parentIsInsert = true;
8186
parseNode(sqlSource, sqlParseResult);
8287
break;
8388
case SELECT:
84-
SqlNode sqlFrom = ((SqlSelect)sqlNode).getFrom();
85-
if(sqlFrom.getKind() == IDENTIFIER){
89+
SqlSelect sqlSelect = (SqlSelect) sqlNode;
90+
if (parentIsInsert) {
91+
rebuildSelectNode(sqlSelect.getSelectList(), sqlSelect);
92+
}
93+
SqlNode sqlFrom = ((SqlSelect) sqlNode).getFrom();
94+
if (sqlFrom.getKind() == IDENTIFIER) {
8695
sqlParseResult.addSourceTable(sqlFrom.toString());
87-
}else{
96+
} else {
97+
parentIsInsert = false;
8898
parseNode(sqlFrom, sqlParseResult);
8999
}
90100
break;
@@ -141,6 +151,42 @@ private static void parseNode(SqlNode sqlNode, SqlParseResult sqlParseResult){
141151
}
142152
}
143153

154+
/**
155+
* 将第一层 select 中的 sqlNode 转化为 AsNode,解决字段名冲突问题
156+
* @param selectList select Node 的 select 字段
157+
* @param sqlSelect 第一层解析出来的 selectNode
158+
*/
159+
private static void rebuildSelectNode(SqlNodeList selectList, SqlSelect sqlSelect) {
160+
SqlNodeList sqlNodes = new SqlNodeList(selectList.getParserPosition());
161+
162+
for (int index = 0; index < selectList.size(); index++) {
163+
if (selectList.get(index).getKind().equals(SqlKind.AS)) {
164+
sqlNodes.add(selectList.get(index));
165+
continue;
166+
}
167+
sqlNodes.add(transformToAsNode(selectList.get(index)));
168+
}
169+
sqlSelect.setSelectList(sqlNodes);
170+
}
171+
172+
/**
173+
* 将 sqlNode 转化为 AsNode
174+
* @param sqlNode 需要转化的 sqlNode
175+
* @return 重新构造的 AsNode
176+
*/
177+
public static SqlBasicCall transformToAsNode(SqlNode sqlNode) {
178+
String asName = "";
179+
SqlParserPos pos = new SqlParserPos(sqlNode.getParserPosition().getLineNum(),
180+
sqlNode.getParserPosition().getEndColumnNum());
181+
if (sqlNode.getKind().equals(SqlKind.IDENTIFIER)) {
182+
asName = ((SqlIdentifier) sqlNode).names.get(1);
183+
}
184+
SqlNode[] operands = new SqlNode[2];
185+
operands[0] = sqlNode;
186+
operands[1] = new SqlIdentifier(asName, null, pos);
187+
return new SqlBasicCall(new SqlAsOperator(), operands, pos);
188+
}
189+
144190
public static class SqlParseResult {
145191

146192
private List<String> sourceTableList = Lists.newArrayList();

0 commit comments

Comments
 (0)