Skip to content

Commit 0de96e8

Browse files
author
xuchao
committed
添加对join条件中非equal 类型对支持
1 parent 33047f6 commit 0de96e8

File tree

3 files changed

+169
-28
lines changed

3 files changed

+169
-28
lines changed

core/src/main/java/com/dtstack/flink/sql/environment/MyLocalStreamEnvironment.java

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.flink.api.common.JobExecutionResult;
2323
import org.apache.flink.api.java.ExecutionEnvironment;
2424
import org.apache.flink.configuration.Configuration;
25+
import org.apache.flink.configuration.RestOptions;
2526
import org.apache.flink.configuration.TaskManagerOptions;
2627
import org.apache.flink.runtime.jobgraph.JobGraph;
2728
import org.apache.flink.runtime.minicluster.MiniCluster;
@@ -93,33 +94,48 @@ public JobExecutionResult execute(String jobName) throws Exception {
9394
// transform the streaming program into a JobGraph
9495
StreamGraph streamGraph = getStreamGraph();
9596
streamGraph.setJobName(jobName);
97+
return execute(streamGraph);
98+
}
99+
100+
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
96101

97102
JobGraph jobGraph = streamGraph.getJobGraph();
98103
jobGraph.setClasspaths(classpaths);
104+
jobGraph.setAllowQueuedScheduling(true);
99105

100106
Configuration configuration = new Configuration();
101107
configuration.addAll(jobGraph.getJobConfiguration());
102-
103-
configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "512M");
104-
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
108+
configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
105109

106110
// add (and override) the settings with what the user defined
107111
configuration.addAll(this.conf);
108112

109-
MiniClusterConfiguration.Builder configBuilder = new MiniClusterConfiguration.Builder();
110-
configBuilder.setConfiguration(configuration);
113+
if (!configuration.contains(RestOptions.BIND_PORT)) {
114+
configuration.setString(RestOptions.BIND_PORT, "0");
115+
}
116+
117+
int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
118+
119+
MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
120+
.setConfiguration(configuration)
121+
.setNumSlotsPerTaskManager(numSlotsPerTaskManager)
122+
.build();
111123

112124
if (LOG.isInfoEnabled()) {
113125
LOG.info("Running job on local embedded Flink mini cluster");
114126
}
115127

116-
try (MiniCluster exec = new MiniCluster(configBuilder.build());) {
117-
exec.start();
118-
JobExecutionResult jobExecutionResult = exec.executeJobBlocking(jobGraph);
128+
MiniCluster miniCluster = new MiniCluster(cfg);
129+
130+
try {
131+
miniCluster.start();
132+
configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().get().getPort());
133+
134+
return miniCluster.executeJobBlocking(jobGraph);
135+
}
136+
finally {
119137
transformations.clear();
120-
return jobExecutionResult;
121-
} catch (Exception e) {
122-
throw new RuntimeException(e);
138+
miniCluster.close();
123139
}
124140
}
125141
}

core/src/main/java/com/dtstack/flink/sql/side/JoinNodeDealer.java

Lines changed: 95 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -512,10 +512,53 @@ private void extractJoinField(SqlNode condition, Set<Tuple2<String, String>> joi
512512
}
513513

514514
SqlKind joinKind = condition.getKind();
515-
if( joinKind == AND || joinKind == EQUALS ){
516-
extractJoinField(((SqlBasicCall)condition).operands[0], joinFieldSet);
517-
extractJoinField(((SqlBasicCall)condition).operands[1], joinFieldSet);
518-
}else{
515+
if ( AGGREGATE.contains(condition.getKind())
516+
|| AVG_AGG_FUNCTIONS.contains(joinKind)
517+
|| COMPARISON.contains(joinKind)
518+
|| joinKind == OTHER_FUNCTION
519+
|| joinKind == DIVIDE
520+
|| joinKind == CAST
521+
|| joinKind == TRIM
522+
|| joinKind == TIMES
523+
|| joinKind == PLUS
524+
|| joinKind == NOT_IN
525+
|| joinKind == OR
526+
|| joinKind == AND
527+
|| joinKind == MINUS
528+
|| joinKind == TUMBLE
529+
|| joinKind == TUMBLE_START
530+
|| joinKind == TUMBLE_END
531+
|| joinKind == SESSION
532+
|| joinKind == SESSION_START
533+
|| joinKind == SESSION_END
534+
|| joinKind == HOP
535+
|| joinKind == HOP_START
536+
|| joinKind == HOP_END
537+
|| joinKind == BETWEEN
538+
|| joinKind == IS_NULL
539+
|| joinKind == IS_NOT_NULL
540+
|| joinKind == CONTAINS
541+
|| joinKind == TIMESTAMP_ADD
542+
|| joinKind == TIMESTAMP_DIFF
543+
|| joinKind == LIKE
544+
|| joinKind == COALESCE
545+
|| joinKind == EQUALS ){
546+
547+
SqlBasicCall sqlBasicCall = (SqlBasicCall) condition;
548+
for(int i=0; i<sqlBasicCall.getOperands().length; i++){
549+
SqlNode sqlNode = sqlBasicCall.getOperands()[i];
550+
if(sqlNode instanceof SqlLiteral){
551+
continue;
552+
}
553+
554+
if(sqlNode instanceof SqlDataTypeSpec){
555+
continue;
556+
}
557+
558+
extractJoinField(sqlNode, joinFieldSet);
559+
}
560+
561+
} else if (condition.getKind() == IDENTIFIER){
519562
Preconditions.checkState(((SqlIdentifier)condition).names.size() == 2, "join condition must be format table.field");
520563
Tuple2<String, String> tuple2 = Tuple2.of(((SqlIdentifier)condition).names.get(0), ((SqlIdentifier)condition).names.get(1));
521564
joinFieldSet.add(tuple2);
@@ -822,20 +865,57 @@ public SqlBasicCall buildEmptyCondition(){
822865
private SqlIdentifier checkAndReplaceJoinCondition(SqlNode node, Map<String, String> tableMap){
823866

824867
SqlKind joinKind = node.getKind();
825-
if( joinKind == AND || joinKind == EQUALS ){
826-
SqlIdentifier leftNode = checkAndReplaceJoinCondition(((SqlBasicCall)node).operands[0], tableMap);
827-
SqlIdentifier rightNode = checkAndReplaceJoinCondition(((SqlBasicCall)node).operands[1], tableMap);
868+
if( AGGREGATE.contains(joinKind)
869+
|| AVG_AGG_FUNCTIONS.contains(joinKind)
870+
|| COMPARISON.contains(joinKind)
871+
|| joinKind == OTHER_FUNCTION
872+
|| joinKind == DIVIDE
873+
|| joinKind == CAST
874+
|| joinKind == TRIM
875+
|| joinKind == TIMES
876+
|| joinKind == PLUS
877+
|| joinKind == NOT_IN
878+
|| joinKind == OR
879+
|| joinKind == AND
880+
|| joinKind == MINUS
881+
|| joinKind == TUMBLE
882+
|| joinKind == TUMBLE_START
883+
|| joinKind == TUMBLE_END
884+
|| joinKind == SESSION
885+
|| joinKind == SESSION_START
886+
|| joinKind == SESSION_END
887+
|| joinKind == HOP
888+
|| joinKind == HOP_START
889+
|| joinKind == HOP_END
890+
|| joinKind == BETWEEN
891+
|| joinKind == IS_NULL
892+
|| joinKind == IS_NOT_NULL
893+
|| joinKind == CONTAINS
894+
|| joinKind == TIMESTAMP_ADD
895+
|| joinKind == TIMESTAMP_DIFF
896+
|| joinKind == LIKE
897+
|| joinKind == COALESCE
898+
|| joinKind == EQUALS ){
899+
SqlBasicCall sqlBasicCall = (SqlBasicCall) node;
900+
for(int i=0; i<sqlBasicCall.getOperands().length; i++){
901+
SqlNode sqlNode = sqlBasicCall.getOperands()[i];
902+
if(sqlNode instanceof SqlLiteral){
903+
continue;
904+
}
828905

829-
if(leftNode != null){
830-
((SqlBasicCall)node).setOperand(0, leftNode);
831-
}
906+
if(sqlNode instanceof SqlDataTypeSpec){
907+
continue;
908+
}
909+
910+
SqlIdentifier replaceNode = checkAndReplaceJoinCondition(sqlNode, tableMap);
911+
if(replaceNode != null){
912+
((SqlBasicCall)node).setOperand(i, replaceNode);
913+
}
832914

833-
if(rightNode != null){
834-
((SqlBasicCall)node).setOperand(1, leftNode);
835915
}
836916

837917
return null;
838-
} else {
918+
} else if (node.getKind() == IDENTIFIER) {
839919
//replace table
840920
Preconditions.checkState(((SqlIdentifier)node).names.size() == 2, "join condition must be format table.field");
841921
String tbName = ((SqlIdentifier) node).names.get(0);
@@ -846,6 +926,8 @@ private SqlIdentifier checkAndReplaceJoinCondition(SqlNode node, Map<String, Str
846926

847927
return null;
848928
}
929+
930+
return null;
849931
}
850932

851933
/**

core/src/main/java/com/dtstack/flink/sql/util/TableUtils.java

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -441,10 +441,53 @@ public static void replaceJoinFieldRefTableName(SqlNode condition, Map<String, S
441441
return;
442442
}
443443
SqlKind joinKind = condition.getKind();
444-
if( joinKind == AND || joinKind == EQUALS ){
445-
replaceJoinFieldRefTableName(((SqlBasicCall)condition).operands[0], oldTabFieldRefNew);
446-
replaceJoinFieldRefTableName(((SqlBasicCall)condition).operands[1], oldTabFieldRefNew);
447-
}else{
444+
if( AGGREGATE.contains(joinKind)
445+
|| AVG_AGG_FUNCTIONS.contains(joinKind)
446+
|| COMPARISON.contains(joinKind)
447+
|| joinKind == OTHER_FUNCTION
448+
|| joinKind == DIVIDE
449+
|| joinKind == CAST
450+
|| joinKind == TRIM
451+
|| joinKind == TIMES
452+
|| joinKind == PLUS
453+
|| joinKind == NOT_IN
454+
|| joinKind == OR
455+
|| joinKind == AND
456+
|| joinKind == MINUS
457+
|| joinKind == TUMBLE
458+
|| joinKind == TUMBLE_START
459+
|| joinKind == TUMBLE_END
460+
|| joinKind == SESSION
461+
|| joinKind == SESSION_START
462+
|| joinKind == SESSION_END
463+
|| joinKind == HOP
464+
|| joinKind == HOP_START
465+
|| joinKind == HOP_END
466+
|| joinKind == BETWEEN
467+
|| joinKind == IS_NULL
468+
|| joinKind == IS_NOT_NULL
469+
|| joinKind == CONTAINS
470+
|| joinKind == TIMESTAMP_ADD
471+
|| joinKind == TIMESTAMP_DIFF
472+
|| joinKind == LIKE
473+
|| joinKind == COALESCE
474+
|| joinKind == EQUALS ){
475+
476+
SqlBasicCall sqlBasicCall = (SqlBasicCall) condition;
477+
for(int i=0; i<sqlBasicCall.getOperands().length; i++){
478+
SqlNode sqlNode = sqlBasicCall.getOperands()[i];
479+
if(sqlNode instanceof SqlLiteral){
480+
continue;
481+
}
482+
483+
if(sqlNode instanceof SqlDataTypeSpec){
484+
continue;
485+
}
486+
487+
replaceJoinFieldRefTableName(sqlNode, oldTabFieldRefNew);
488+
}
489+
490+
} else if (condition.getKind() == IDENTIFIER) {
448491
Preconditions.checkState(((SqlIdentifier)condition).names.size() == 2, "join condition must be format table.field");
449492
String fieldRefTable = ((SqlIdentifier)condition).names.get(0);
450493

0 commit comments

Comments
 (0)