
Commit 911637a

Merge branch 'hotfix_3.9.x_23188' into 1.8_release_3.9.x

Author: gituser (committed)
Parents: 260ddcd + b251fb9

18 files changed: +1229 -750 lines

core/src/main/java/com/dtstack/flink/sql/Main.java

+13 -4

@@ -19,7 +19,6 @@
 
 package com.dtstack.flink.sql;
 
-import com.dtstack.flink.sql.config.CalciteConfig;
 import com.dtstack.flink.sql.classloader.ClassLoaderManager;
 import com.dtstack.flink.sql.enums.ClusterMode;
 import com.dtstack.flink.sql.enums.ECacheType;
@@ -29,6 +28,7 @@
 import com.dtstack.flink.sql.option.OptionParser;
 import com.dtstack.flink.sql.parser.CreateFuncParser;
 import com.dtstack.flink.sql.parser.CreateTmpTableParser;
+import com.dtstack.flink.sql.parser.FlinkPlanner;
 import com.dtstack.flink.sql.parser.InsertSqlParser;
 import com.dtstack.flink.sql.parser.SqlParser;
 import com.dtstack.flink.sql.parser.SqlTree;
@@ -60,6 +60,7 @@
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.calcite.FlinkPlannerImpl;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
 import org.slf4j.Logger;
@@ -72,7 +73,6 @@
 import java.net.URLDecoder;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 
@@ -120,6 +120,8 @@ public static void main(String[] args) throws Exception {
         StreamExecutionEnvironment env = getStreamExeEnv(confProperties, deployMode);
         StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);
         StreamQueryConfig streamQueryConfig = StreamEnvConfigManager.getStreamQueryConfig(tableEnv, confProperties);
+        // init global flinkPlanner
+        FlinkPlanner.createFlinkPlanner(tableEnv.getFrameworkConfig(), tableEnv.getPlanner(), tableEnv.getTypeFactory());
 
         List<URL> jarURList = Lists.newArrayList();
         SqlTree sqlTree = SqlParser.parseSql(sql);
@@ -149,7 +151,13 @@ public static void main(String[] args) throws Exception {
         env.execute(name);
     }
 
-    private static void sqlTranslation(String localSqlPluginPath, StreamTableEnvironment tableEnv,SqlTree sqlTree,Map<String, SideTableInfo> sideTableMap,Map<String, Table> registerTableCache, StreamQueryConfig queryConfig) throws Exception {
+    private static void sqlTranslation(String localSqlPluginPath,
+                                       StreamTableEnvironment tableEnv,
+                                       SqlTree sqlTree,
+                                       Map<String, SideTableInfo> sideTableMap,
+                                       Map<String, Table> registerTableCache,
+                                       StreamQueryConfig queryConfig) throws Exception {
+
         SideSqlExec sideSqlExec = new SideSqlExec();
         sideSqlExec.setLocalSqlPluginPath(localSqlPluginPath);
         for (CreateTmpTableParser.SqlParserResult result : sqlTree.getTmpSqlList()) {
@@ -165,8 +173,9 @@ private static void sqlTranslation(String localSqlPluginPath, StreamTableEnviron
             if (sqlTree.getTmpTableMap().containsKey(tableName)) {
                 CreateTmpTableParser.SqlParserResult tmp = sqlTree.getTmpTableMap().get(tableName);
                 String realSql = DtStringUtil.replaceIgnoreQuota(result.getExecSql(), "`", "");
+                FlinkPlannerImpl flinkPlanner = FlinkPlanner.getFlinkPlanner();
 
-                SqlNode sqlNode = org.apache.calcite.sql.parser.SqlParser.create(realSql, CalciteConfig.MYSQL_LEX_CONFIG).parseStmt();
+                SqlNode sqlNode = flinkPlanner.parse(realSql);
                 String tmpSql = ((SqlInsert) sqlNode).getSource().toString();
                 tmp.setExecSql(tmpSql);
                 sideSqlExec.exec(tmp.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, tmp);
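Note on the change above: temporary-table SQL is no longer parsed with a locally built Calcite SqlParser (which is why CalciteConfig and its MYSQL_LEX_CONFIG are deleted below); it now goes through the FlinkPlannerImpl that main() registers once from the table environment, so every statement is parsed with the same configuration the planner itself uses. A minimal sketch of that flow, using only calls visible in this diff (the wrapper class and variable names are illustrative, not part of the commit):

// Illustrative sketch only -- how the shared planner added in this commit is wired and used.
import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlNode;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.calcite.FlinkPlannerImpl;
import com.dtstack.flink.sql.parser.FlinkPlanner;

class PlannerUsageSketch {                       // hypothetical class, for illustration only
    static String sourceOfInsert(StreamTableEnvironment tableEnv, String realSql) {
        // done once in Main.main(): build the global planner from the table environment
        FlinkPlanner.createFlinkPlanner(tableEnv.getFrameworkConfig(),
                tableEnv.getPlanner(), tableEnv.getTypeFactory());

        // done per statement in sqlTranslation(): replaces
        // SqlParser.create(realSql, CalciteConfig.MYSQL_LEX_CONFIG).parseStmt()
        FlinkPlannerImpl flinkPlanner = FlinkPlanner.getFlinkPlanner();
        SqlNode sqlNode = flinkPlanner.parse(realSql);
        return ((SqlInsert) sqlNode).getSource().toString();
    }
}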

core/src/main/java/com/dtstack/flink/sql/config/CalciteConfig.java

-35
This file was deleted.

core/src/main/java/com/dtstack/flink/sql/parser/CreateTmpTableParser.java

+5 -11

@@ -21,11 +21,10 @@
 package com.dtstack.flink.sql.parser;
 
 import com.dtstack.flink.sql.util.DtStringUtil;
-import org.apache.calcite.config.Lex;
 import org.apache.calcite.sql.*;
-import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.calcite.sql.parser.SqlParser;
 import com.google.common.collect.Lists;
+import org.apache.flink.table.calcite.FlinkPlannerImpl;
+
 import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -71,17 +70,12 @@ public void parseSql(String sql, SqlTree sqlTree) {
             tableName = matcher.group(1);
             selectSql = "select " + matcher.group(2);
         }
-
-        SqlParser.Config config = SqlParser
-                .configBuilder()
-                .setLex(Lex.MYSQL)
-                .build();
-        SqlParser sqlParser = SqlParser.create(selectSql,config);
+        FlinkPlannerImpl flinkPlanner = FlinkPlanner.getFlinkPlanner();
 
         SqlNode sqlNode = null;
         try {
-            sqlNode = sqlParser.parseStmt();
-        } catch (SqlParseException e) {
+            sqlNode = flinkPlanner.parse(selectSql);
+        } catch (Exception e) {
            throw new RuntimeException("", e);
        }

core/src/main/java/com/dtstack/flink/sql/parser/FlinkPlanner.java

+52

@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.flink.sql.parser;
+
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.flink.table.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.calcite.FlinkTypeFactory;
+
+/**
+ * Date: 2020/3/31
+ * Company: www.dtstack.com
+ * @author maqi
+ */
+public class FlinkPlanner {
+
+    public static volatile FlinkPlannerImpl flinkPlanner;
+
+    private FlinkPlanner() {
+    }
+
+    public static FlinkPlannerImpl createFlinkPlanner(FrameworkConfig frameworkConfig, RelOptPlanner relOptPlanner, FlinkTypeFactory typeFactory) {
+        if (flinkPlanner == null) {
+            synchronized (FlinkPlanner.class) {
+                if (flinkPlanner == null) {
+                    flinkPlanner = new FlinkPlannerImpl(frameworkConfig, relOptPlanner, typeFactory);
+                }
+            }
+        }
+        return flinkPlanner;
+    }
+
+    public static FlinkPlannerImpl getFlinkPlanner() {
+        return flinkPlanner;
+    }
+}
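The new holder above keeps a single FlinkPlannerImpl in a public volatile field, initialised lazily with double-checked locking; getFlinkPlanner() returns whatever is stored, so it is null until Main has called createFlinkPlanner(...). A hedged sketch of a defensive accessor a caller could add (not part of the commit; the class and method names are hypothetical):

// Hypothetical helper, not in the commit: fail fast instead of handing out a null planner
// when FlinkPlanner.createFlinkPlanner(...) has not been called yet.
import org.apache.flink.table.calcite.FlinkPlannerImpl;
import com.dtstack.flink.sql.parser.FlinkPlanner;

final class PlannerGuard {
    static FlinkPlannerImpl requirePlanner() {
        FlinkPlannerImpl planner = FlinkPlanner.getFlinkPlanner();
        if (planner == null) {
            throw new IllegalStateException("FlinkPlanner.createFlinkPlanner(...) must run before parsing");
        }
        return planner;
    }
}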

core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java

+1

@@ -57,6 +57,7 @@ public void parseSql(String sql, SqlTree sqlTree) {
                 .configBuilder()
                 .setLex(Lex.MYSQL)
                 .build();
+
         SqlParser sqlParser = SqlParser.create(sql,config);
         SqlNode sqlNode = null;
         try {

core/src/main/java/com/dtstack/flink/sql/side/FieldInfo.java

+9

@@ -85,4 +85,13 @@ public boolean equals(Object o) {
     public int hashCode() {
         return Objects.hash(table, fieldName);
     }
+
+    @Override
+    public String toString() {
+        return "FieldInfo{" +
+                "table='" + table + '\'' +
+                ", fieldName='" + fieldName + '\'' +
+                ", typeInformation=" + typeInformation +
+                '}';
+    }
 }

core/src/main/java/com/dtstack/flink/sql/side/FieldReplaceInfo.java

+2 -1

@@ -23,6 +23,7 @@
 import com.google.common.collect.HashBasedTable;
 import org.apache.commons.lang3.StringUtils;
 
+
 /**
  * Records the field mapping between the converted table and the original table
  * Date: 2018/8/30
@@ -78,7 +79,7 @@ public void setTargetTableAlias(String targetTableAlias) {
      * @param fieldName
      * @return
      */
-    public String getTargetFieldName(String tableName, String fieldName){
+    public String getTargetFieldName(String tableName, String fieldName) {
        String targetFieldName = mappingTable.get(tableName, fieldName);
        if(StringUtils.isNotBlank(targetFieldName)){
            return targetFieldName;

core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java

+38 -9

@@ -20,6 +20,8 @@
 
 package com.dtstack.flink.sql.side;
 
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Maps;
 import org.apache.calcite.sql.JoinType;
 import org.apache.calcite.sql.SqlNode;
 import com.google.common.base.Strings;
@@ -31,7 +33,6 @@
  * Join information
  * Date: 2018/7/24
  * Company: www.dtstack.com
- *
  * @author xuchao
 */
 
@@ -40,9 +41,7 @@ public class JoinInfo implements Serializable {
     private static final long serialVersionUID = -1L;
 
     //whether the left table is a side (dimension) table
-    private boolean leftIsSideTable;
-
-    private boolean leftIsTmpTable = false;
+    private boolean leftIsSideTable = false;
 
     //whether the right table is a side (dimension) table
     private boolean rightIsSideTable;
@@ -67,6 +66,16 @@ public class JoinInfo implements Serializable {
 
     private JoinType joinType;
 
+    /**
+     * fields the left table needs to query and the corresponding output column names
+     */
+    private Map<String, String> leftSelectFieldInfo = Maps.newHashMap();
+
+    /**
+     * fields the right table needs to query and the corresponding output column names
+     */
+    private Map<String, String> rightSelectFieldInfo = Maps.newHashMap();
+
     public String getSideTableName(){
         if(leftIsSideTable){
             return leftTableAlias;
@@ -195,19 +204,39 @@ public void setJoinType(JoinType joinType) {
         this.joinType = joinType;
     }
 
-    public boolean isLeftIsTmpTable() {
-        return leftIsTmpTable;
+    public Map<String, String> getLeftSelectFieldInfo() {
+        return leftSelectFieldInfo;
+    }
+
+    public void setLeftSelectFieldInfo(Map<String, String> leftSelectFieldInfo) {
+        this.leftSelectFieldInfo = leftSelectFieldInfo;
     }
 
-    public void setLeftIsTmpTable(boolean leftIsTmpTable) {
-        this.leftIsTmpTable = leftIsTmpTable;
+    public Map<String, String> getRightSelectFieldInfo() {
+        return rightSelectFieldInfo;
+    }
+
+    public void setRightSelectFieldInfo(Map<String, String> rightSelectFieldInfo) {
+        this.rightSelectFieldInfo = rightSelectFieldInfo;
+    }
+
+    public HashBasedTable<String, String, String> getTableFieldRef(){
+        HashBasedTable<String, String, String> mappingTable = HashBasedTable.create();
+        getLeftSelectFieldInfo().forEach((key, value) -> {
+            mappingTable.put(getLeftTableAlias(), key, value);
+        });
+
+        getRightSelectFieldInfo().forEach((key, value) -> {
+            mappingTable.put(getRightTableAlias(), key, value);
+        });
+
+        return mappingTable;
     }
 
     @Override
     public String toString() {
         return "JoinInfo{" +
                 "leftIsSideTable=" + leftIsSideTable +
-                ", leftIsTmpTable=" + leftIsTmpTable +
                 ", rightIsSideTable=" + rightIsSideTable +
                 ", leftTableName='" + leftTableName + '\'' +
                 ", leftTableAlias='" + leftTableAlias + '\'' +
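The new getTableFieldRef() above folds the two per-side maps into a Guava HashBasedTable keyed by (table alias, source field), with the output column name as the value; that is the same shape FieldReplaceInfo.getTargetFieldName(tableName, fieldName) looks up. A small sketch of that mapping (the sample aliases and field names are invented for illustration):

// Illustrative sketch only: the (alias, field) -> output-column mapping produced by getTableFieldRef().
import com.google.common.collect.HashBasedTable;

public class TableFieldRefSketch {
    public static void main(String[] args) {
        HashBasedTable<String, String, String> mappingTable = HashBasedTable.create();
        // invented sample data: left table alias "m", side table alias "s"
        mappingTable.put("m", "id", "m_id");
        mappingTable.put("s", "name", "s_name");

        // FieldReplaceInfo.getTargetFieldName(tableName, fieldName) performs essentially this lookup
        System.out.println(mappingTable.get("m", "id"));    // m_id
        System.out.println(mappingTable.get("m", "name"));  // null -- no mapping for this pair
    }
}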
