Commit 48a12f2

Merge branch '1.8_release_3.10.x' into 1.8_test_3.10.x

Author: gituser
Parents: abcfb2b + 5296857

56 files changed: +2968 -790 lines

README.md (+2 -2)

@@ -15,8 +15,8 @@ FlinkStreamSQL
 * Custom create view syntax
 * Custom create function syntax
 * Implements joins between streams and dimension (side) tables
-* Supports all the syntax of native FLinkSQL
-* Extends input and output performance metrics to promethus
+* Supports all the syntax of native FlinkSQL
+* Extends input and output performance metrics to Task metrics
 
 ## Table of Contents
 

core/src/main/java/com/dtstack/flink/sql/environment/MyLocalStreamEnvironment.java (+27 -11)

@@ -22,6 +22,7 @@
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.minicluster.MiniCluster;
@@ -93,33 +94,48 @@ public JobExecutionResult execute(String jobName) throws Exception {
         // transform the streaming program into a JobGraph
         StreamGraph streamGraph = getStreamGraph();
         streamGraph.setJobName(jobName);
+        return execute(streamGraph);
+    }
+
+    public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
 
         JobGraph jobGraph = streamGraph.getJobGraph();
         jobGraph.setClasspaths(classpaths);
+        jobGraph.setAllowQueuedScheduling(true);
 
         Configuration configuration = new Configuration();
         configuration.addAll(jobGraph.getJobConfiguration());
-
-        configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "512M");
-        configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
+        configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
 
         // add (and override) the settings with what the user defined
         configuration.addAll(this.conf);
 
-        MiniClusterConfiguration.Builder configBuilder = new MiniClusterConfiguration.Builder();
-        configBuilder.setConfiguration(configuration);
+        if (!configuration.contains(RestOptions.BIND_PORT)) {
+            configuration.setString(RestOptions.BIND_PORT, "0");
+        }
+
+        int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
+
+        MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
+                .setConfiguration(configuration)
+                .setNumSlotsPerTaskManager(numSlotsPerTaskManager)
+                .build();
 
         if (LOG.isInfoEnabled()) {
             LOG.info("Running job on local embedded Flink mini cluster");
         }
 
-        try (MiniCluster exec = new MiniCluster(configBuilder.build());) {
-            exec.start();
-            JobExecutionResult jobExecutionResult = exec.executeJobBlocking(jobGraph);
+        MiniCluster miniCluster = new MiniCluster(cfg);
+
+        try {
+            miniCluster.start();
+            configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().get().getPort());
+
+            return miniCluster.executeJobBlocking(jobGraph);
+        }
+        finally {
             transformations.clear();
-            return jobExecutionResult;
-        } catch (Exception e) {
-            throw new RuntimeException(e);
+            miniCluster.close();
         }
     }
 }
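
The refactor splits submission in two: execute(String) now just names the StreamGraph and delegates to a new execute(StreamGraph) overload. That overload lets managed memory be sized on demand ("0" instead of a fixed 512M), takes the slot count from user configuration with the job's maximum parallelism as the fallback, binds the MiniCluster's REST endpoint to an ephemeral port (BIND_PORT "0") unless one was configured, and writes the actually bound port back into RestOptions.PORT after startup. Swapping try-with-resources for an explicit try/finally also lets job failures propagate as-is instead of being rewrapped in RuntimeException. A minimal caller-side sketch of the new overload, assuming MyLocalStreamEnvironment keeps a no-arg constructor and the DataStream API of its LocalStreamEnvironment parent (neither is shown in this diff):

    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.streaming.api.graph.StreamGraph;

    import com.dtstack.flink.sql.environment.MyLocalStreamEnvironment;

    public class LocalRunSketch {
        public static void main(String[] args) throws Exception {
            MyLocalStreamEnvironment env = new MyLocalStreamEnvironment();
            env.fromElements("a", "b", "c").print();

            // Build and name the StreamGraph ourselves, then hand it to the new
            // overload; execute(String) now does exactly this before delegating.
            StreamGraph graph = env.getStreamGraph();
            graph.setJobName("local-smoke-test");

            JobExecutionResult result = env.execute(graph);
            System.out.println("net runtime (ms): " + result.getNetRuntime());
        }
    }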

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java (+9 -3)

@@ -198,14 +198,18 @@ private static void sqlTranslation(String localSqlPluginPath,
 
         SideSqlExec sideSqlExec = new SideSqlExec();
         sideSqlExec.setLocalSqlPluginPath(localSqlPluginPath);
+
+        int scope = 0;
         for (CreateTmpTableParser.SqlParserResult result : sqlTree.getTmpSqlList()) {
-            sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, result);
+            sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, result, scope + "");
+            scope++;
         }
 
         for (InsertSqlParser.SqlParseResult result : sqlTree.getExecSqlList()) {
             if (LOG.isInfoEnabled()) {
                 LOG.info("exe-sql:\n" + result.getExecSql());
             }
+
             boolean isSide = false;
             for (String tableName : result.getTargetTableList()) {
                 if (sqlTree.getTmpTableMap().containsKey(tableName)) {
@@ -216,7 +220,7 @@ private static void sqlTranslation(String localSqlPluginPath,
                     SqlNode sqlNode = flinkPlanner.parse(realSql);
                     String tmpSql = ((SqlInsert) sqlNode).getSource().toString();
                     tmp.setExecSql(tmpSql);
-                    sideSqlExec.exec(tmp.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, tmp);
+                    sideSqlExec.exec(tmp.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, tmp, scope + "");
                 } else {
                     for (String sourceTable : result.getSourceTableList()) {
                         if (sideTableMap.containsKey(sourceTable)) {
@@ -226,7 +230,7 @@ private static void sqlTranslation(String localSqlPluginPath,
                     }
                     if (isSide) {
                         //sql-dimensional table contains the dimension table of execution
-                        sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, null);
+                        sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, null, null);
                     } else {
                         LOG.info("----------exec sql without dimension join-----------");
                         LOG.info("----------real sql exec is--------------------------\n{}", result.getExecSql());
@@ -236,6 +240,8 @@ private static void sqlTranslation(String localSqlPluginPath,
                     }
                 }
             }
+
+            scope++;
         }
     }
 }
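
The scope counter introduced here gives each translated statement its own namespace: it increments once per temporary-table statement and once per INSERT statement, and travels into SideSqlExec.exec as a string suffix, so the intermediate tables generated for one statement's joins cannot collide with another's. The helper that applies the suffix, TableUtils.buildTableNameWithScope, is only called by this commit, not defined in it; a hedged sketch of the contract its call sites imply:

    // Hypothetical sketch of TableUtils.buildTableNameWithScope; the real body
    // is outside this diff. A null or empty scope keeps the legacy, unsuffixed
    // name, which is why the plain side-join branch above can pass null.
    public static String buildTableNameWithScope(String tableName, String scope) {
        if (scope == null || scope.isEmpty()) {
            return tableName;               // legacy behavior: no suffix
        }
        return tableName + "_" + scope;     // e.g. "orders_userInfo" -> "orders_userInfo_0"
    }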

core/src/main/java/com/dtstack/flink/sql/side/BaseAsyncReqRow.java (+1 -1)

@@ -203,7 +203,7 @@ private void invokeWithCache(Map<String, Object> inputParams, CRow input, Result
                 return;
             }else if(ECacheContentType.SingleLine == val.getType()){
                 try {
-                    Row row = fillData(input.row(), val);
+                    Row row = fillData(input.row(), val.getContent());
                     resultFuture.complete(Collections.singleton(new CRow(row, input.change())));
                 } catch (Exception e) {
                     dealFillDataError(input, resultFuture, e);
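
This one-line change fixes the single-row cache-hit path of the async dimension-table join: fillData expects the cached payload, but the code was handing it the whole CacheObj wrapper, so rows rebuilt from cache were filled from the wrapper rather than from the stored side-table values. A sketch of the wrapper shape the fixed call implies (CacheObj's actual definition is not part of this diff):

    // Assumed shape of the cache wrapper used above; only getType() and
    // getContent() are exercised by the fixed code path.
    public class CacheObj {
        private final ECacheContentType type;   // e.g. SingleLine vs. MultiLine
        private final Object content;           // the cached side-table row data

        public CacheObj(ECacheContentType type, Object content) {
            this.type = type;
            this.content = content;
        }

        public ECacheContentType getType() { return type; }
        public Object getContent() { return content; }
    }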

core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java (+15 -2)

@@ -20,6 +20,7 @@
 
 package com.dtstack.flink.sql.side;
 
+import com.dtstack.flink.sql.util.TableUtils;
 import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.Maps;
 import org.apache.calcite.sql.JoinType;
@@ -66,6 +67,8 @@ public class JoinInfo implements Serializable {
 
     private JoinType joinType;
 
+    private String scope = "";
+
     /**
      * Fields the left table needs to query and the corresponding column names in the output
      */
@@ -96,12 +99,14 @@ public String getNewTableName(){
         // compatible with the case where the left table is aliased via AS
         String leftStr = leftTableName;
         leftStr = Strings.isNullOrEmpty(leftStr) ? leftTableAlias : leftStr;
-        return leftStr + "_" + rightTableName;
+        String newName = leftStr + "_" + rightTableName;
+        return TableUtils.buildTableNameWithScope(newName, scope);
     }
 
 
     public String getNewTableAlias(){
-        return leftTableAlias + "_" + rightTableAlias;
+        String newName = leftTableAlias + "_" + rightTableAlias;
+        return TableUtils.buildTableNameWithScope(newName, scope);
     }
 
     public boolean isLeftIsSideTable() {
@@ -233,6 +238,14 @@ public HashBasedTable<String, String, String> getTableFieldRef(){
         return mappingTable;
     }
 
+    public String getScope() {
+        return scope;
+    }
+
+    public void setScope(String scope) {
+        this.scope = scope;
+    }
+
     @Override
     public String toString() {
         return "JoinInfo{" +
