Skip to content

Commit 2833176

Browse files
author
xuchao
committed
重构 sql语句解析逻辑;
字段替换和重命名相关的逻辑在解析阶段完成; 目标:经过解析sql之后的所有sql是确定的。
1 parent 14bc810 commit 2833176

File tree

6 files changed

+45
-11
lines changed

6 files changed

+45
-11
lines changed

core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java

+1
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ public void parseSql(String sql, SqlTree sqlTree) {
5757
.configBuilder()
5858
.setLex(Lex.MYSQL)
5959
.build();
60+
6061
SqlParser sqlParser = SqlParser.create(sql,config);
6162
SqlNode sqlNode = null;
6263
try {

core/src/main/java/com/dtstack/flink/sql/side/JoinNodeDealer.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -731,9 +731,9 @@ private boolean checkAndRemoveWhereCondition(Set<String> fromTableNameSet,
731731
Set<String> conditionRefTableNameSet = Sets.newHashSet();
732732

733733
fieldInfos.forEach(fieldInfo -> {
734-
String[] splitInfo = StringUtils.split(fieldInfo, ",");
734+
String[] splitInfo = StringUtils.split(fieldInfo, ".");
735735
if(splitInfo.length == 2){
736-
conditionRefTableNameSet.add(splitInfo[1]);
736+
conditionRefTableNameSet.add(splitInfo[0]);
737737
}
738738
});
739739

core/src/main/java/com/dtstack/flink/sql/side/SideInfo.java

+31-5
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ public abstract class SideInfo implements Serializable{
5555

5656
protected String sideSelectFields = "";
5757

58+
protected Map<Integer, String> sideSelectFieldsType = Maps.newHashMap();
59+
5860
protected JoinType joinType;
5961

6062
//key:Returns the value of the position, value: the ref field index​in the input table
@@ -84,15 +86,17 @@ public void parseSelectFields(JoinInfo joinInfo){
8486
String sideTableName = joinInfo.getSideTableName();
8587
String nonSideTableName = joinInfo.getNonSideTable();
8688
List<String> fields = Lists.newArrayList();
89+
int sideTableFieldIndex = 0;
8790

88-
int sideIndex = 0;
8991
for( int i=0; i<outFieldInfoList.size(); i++){
9092
FieldInfo fieldInfo = outFieldInfoList.get(i);
9193
if(fieldInfo.getTable().equalsIgnoreCase(sideTableName)){
92-
fields.add(sideTableInfo.getPhysicalFields().getOrDefault(fieldInfo.getFieldName(), fieldInfo.getFieldName()));
93-
sideFieldIndex.put(i, sideIndex);
94-
sideFieldNameIndex.put(i, fieldInfo.getFieldName());
95-
sideIndex++;
94+
String sideFieldName = sideTableInfo.getPhysicalFields().getOrDefault(fieldInfo.getFieldName(), fieldInfo.getFieldName());
95+
fields.add(sideFieldName);
96+
sideSelectFieldsType.put(sideTableFieldIndex, getTargetFieldType(sideFieldName));
97+
sideFieldIndex.put(i, sideTableFieldIndex);
98+
sideFieldNameIndex.put(i, sideFieldName);
99+
sideTableFieldIndex++;
96100
}else if(fieldInfo.getTable().equalsIgnoreCase(nonSideTableName)){
97101
int nonSideIndex = rowTypeInfo.getFieldIndex(fieldInfo.getFieldName());
98102
inFieldIndex.put(i, nonSideIndex);
@@ -108,6 +112,16 @@ public void parseSelectFields(JoinInfo joinInfo){
108112
sideSelectFields = String.join(",", fields);
109113
}
110114

115+
public String getTargetFieldType(String fieldName){
116+
int fieldIndex = sideTableInfo.getFieldList().indexOf(fieldName);
117+
if(fieldIndex == -1){
118+
throw new RuntimeException(sideTableInfo.getName() + "can't find field: " + fieldName);
119+
}
120+
121+
return sideTableInfo.getFieldTypes()[fieldIndex];
122+
}
123+
124+
111125
public void dealOneEqualCon(SqlNode sqlNode, String sideTableName){
112126
if(!SqlKind.COMPARISON.contains(sqlNode.getKind())){
113127
throw new RuntimeException("not compare operator.");
@@ -255,4 +269,16 @@ public Map<Integer, String> getSideFieldNameIndex() {
255269
public void setSideFieldNameIndex(Map<Integer, String> sideFieldNameIndex) {
256270
this.sideFieldNameIndex = sideFieldNameIndex;
257271
}
272+
273+
public Map<Integer, String> getSideSelectFieldsType() {
274+
return sideSelectFieldsType;
275+
}
276+
277+
public void setSideSelectFieldsType(Map<Integer, String> sideSelectFieldsType) {
278+
this.sideSelectFieldsType = sideSelectFieldsType;
279+
}
280+
281+
public String getSelectSideFieldType(int index){
282+
return sideSelectFieldsType.get(index);
283+
}
258284
}

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

-1
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,6 @@ private void joinFun(Object pollObj,
394394
HashBasedTable<String, String, String> mappingTable = ((JoinInfo) pollObj).getTableFieldRef();
395395

396396
//获取两个表的所有字段
397-
//TODO 抽取
398397
List<FieldInfo> sideJoinFieldInfo = ParserJoinField.getRowTypeInfo(joinInfo.getSelectNode(), joinScope, true);
399398
//通过join的查询字段信息过滤出需要的字段信息
400399
sideJoinFieldInfo.removeIf(tmpFieldInfo -> mappingTable.get(tmpFieldInfo.getTable(), tmpFieldInfo.getFieldName()) == null);

core/src/main/java/com/dtstack/flink/sql/util/TableUtils.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.google.common.collect.HashBasedTable;
2727
import com.google.common.collect.HashBiMap;
2828
import com.google.common.collect.Lists;
29+
import com.google.common.collect.Sets;
2930
import com.typesafe.config.ConfigException;
3031
import org.apache.calcite.sql.SqlAsOperator;
3132
import org.apache.calcite.sql.SqlBasicCall;
@@ -200,7 +201,14 @@ public static SqlBasicCall buildAsNodeByJoinInfo(JoinInfo joinInfo, SqlNode sqlN
200201
String joinLeftTableAlias = joinInfo.getLeftTableAlias();
201202
joinLeftTableName = Strings.isNullOrEmpty(joinLeftTableName) ? joinLeftTableAlias : joinLeftTableName;
202203
String newTableName = buildInternalTableName(joinLeftTableName, SPLIT, joinInfo.getRightTableName());
203-
String newTableAlias = !StringUtils.isEmpty(tableAlias) ? tableAlias : buildInternalTableName(joinInfo.getLeftTableAlias(), SPLIT, joinInfo.getRightTableAlias());
204+
String lefTbAlias = joinInfo.getLeftTableAlias();
205+
if(Strings.isNullOrEmpty(lefTbAlias)){
206+
Set<String> fromTableSet = Sets.newHashSet();
207+
TableUtils.getFromTableInfo(joinInfo.getLeftNode(), fromTableSet);
208+
lefTbAlias = StringUtils.join(fromTableSet, "_");
209+
}
210+
211+
String newTableAlias = !StringUtils.isEmpty(tableAlias) ? tableAlias : buildInternalTableName(lefTbAlias, SPLIT, joinInfo.getRightTableAlias());
204212

205213
if (null == sqlNode0) {
206214
sqlNode0 = new SqlIdentifier(newTableName, null, sqlParserPos);

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,6 @@ protected List<Row> getRows(Row inputRow, List<JsonArray> cacheContent, List<Jso
201201
public Row fillData(Row input, Object line) {
202202
JsonArray jsonArray = (JsonArray) line;
203203
Row row = new Row(sideInfo.getOutFieldInfoList().size());
204-
String[] fields = sideInfo.getSideTableInfo().getFieldTypes();
205204
for (Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()) {
206205
Object obj = input.getField(entry.getValue());
207206
boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass());
@@ -216,7 +215,8 @@ public Row fillData(Row input, Object line) {
216215
if (jsonArray == null) {
217216
row.setField(entry.getKey(), null);
218217
} else {
219-
Object object = SwitchUtil.getTarget(jsonArray.getValue(entry.getValue()), fields[entry.getValue()]);
218+
String fieldType = sideInfo.getSelectSideFieldType(entry.getValue());
219+
Object object = SwitchUtil.getTarget(jsonArray.getValue(entry.getValue()), fieldType);
220220
row.setField(entry.getKey(), object);
221221
}
222222
}

0 commit comments

Comments
 (0)