Skip to content

Commit 1f467e8

Browse files
author
dapeng
committed
Merge remote-tracking branch 'origin/feat_1.8_rowkey' into 1.8_test_3.10.x
# Conflicts: # core/pom.xml # core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java # launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java # rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java
2 parents 48a12f2 + 75f0349 commit 1f467e8

File tree

29 files changed

+925
-83
lines changed

29 files changed

+925
-83
lines changed

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -292,9 +292,7 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment
292292

293293
RowTypeInfo typeInfo = new RowTypeInfo(adaptTable.getSchema().getFieldTypes(), adaptTable.getSchema().getFieldNames());
294294
DataStream adaptStream = tableEnv.toRetractStream(adaptTable, typeInfo)
295-
.map((Tuple2<Boolean, Row> f0) -> {
296-
return f0.f1;
297-
})
295+
.map((Tuple2<Boolean, Row> f0) -> f0.f1)
298296
.returns(typeInfo);
299297

300298
String fields = String.join(",", typeInfo.getFieldNames());

core/src/main/java/com/dtstack/flink/sql/option/OptionParser.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.commons.lang.StringUtils;
2727
import java.lang.reflect.InvocationTargetException;
2828
import java.lang.reflect.Field;
29+
import java.nio.charset.StandardCharsets;
2930
import java.util.List;
3031
import java.util.Map;
3132
import java.io.File;
@@ -102,8 +103,8 @@ public List<String> getProgramExeArgList() throws Exception {
102103
continue;
103104
} else if (OPTION_SQL.equalsIgnoreCase(key)) {
104105
File file = new File(value.toString());
105-
String content = FileUtils.readFile(file, "UTF-8");
106-
value = URLEncoder.encode(content, Charsets.UTF_8.name());
106+
String content = FileUtils.readFile(file, StandardCharsets.UTF_8.name());
107+
value = URLEncoder.encode(content, StandardCharsets.UTF_8.name());
107108
}
108109
args.add("-" + key);
109110
args.add(value.toString());

core/src/main/java/com/dtstack/flink/sql/parser/CreateTableParser.java

+13-2
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ public class CreateTableParser implements IParser {
4141

4242
private static final Pattern PATTERN = Pattern.compile(PATTERN_STR);
4343

44+
private static final Pattern PROP_PATTERN = Pattern.compile("^'\\s*(.+)\\s*'$");
45+
4446
public static CreateTableParser newInstance(){
4547
return new CreateTableParser();
4648
}
@@ -69,18 +71,27 @@ public void parseSql(String sql, SqlTree sqlTree) {
6971
}
7072

7173
private Map parseProp(String propsStr){
72-
String[] strs = propsStr.trim().split("'\\s*,");
74+
propsStr = propsStr.replaceAll("'\\s*,", "'|");
75+
String[] strs = propsStr.trim().split("\\|");
7376
Map<String, Object> propMap = Maps.newHashMap();
7477
for(int i=0; i<strs.length; i++){
7578
List<String> ss = DtStringUtil.splitIgnoreQuota(strs[i], '=');
7679
String key = ss.get(0).trim();
77-
String value = ss.get(1).trim().replaceAll("'", "").trim();
80+
String value = extractValue(ss.get(1));
7881
propMap.put(key, value);
7982
}
8083

8184
return propMap;
8285
}
8386

87+
private String extractValue(String value) {
88+
Matcher matcher = PROP_PATTERN.matcher(value);
89+
if (matcher.find()) {
90+
return matcher.group(1);
91+
}
92+
throw new RuntimeException("[" + value + "] format is invalid");
93+
}
94+
8495
public static class SqlParserResult{
8596

8697
private String tableName;

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,12 @@ public void exec(String sql,
114114

115115
SideSQLParser sideSQLParser = new SideSQLParser();
116116
sideSQLParser.setLocalTableCache(localTableCache);
117-
Queue<Object> exeQueue = sideSQLParser.getExeQueue(sql, sideTableMap.keySet(), scope);
118-
Object pollObj = null;
117+
Queue<Object> exeQueue = sideSQLParser.getExeQueue(sql, sideTableMap.keySet());
118+
Object pollObj;
119+
120+
//need clean
121+
boolean preIsSideJoin = false;
122+
List<FieldReplaceInfo> replaceInfoList = Lists.newArrayList();
119123

120124
while((pollObj = exeQueue.poll()) != null){
121125

core/src/main/java/com/dtstack/flink/sql/watermarker/WaterMarkerAssigner.java

+1-5
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,7 @@
3939
public class WaterMarkerAssigner {
4040

4141
public boolean checkNeedAssignWaterMarker(AbstractSourceTableInfo tableInfo){
42-
if(Strings.isNullOrEmpty(tableInfo.getEventTimeField())){
43-
return false;
44-
}
45-
46-
return true;
42+
return !Strings.isNullOrEmpty(tableInfo.getEventTimeField());
4743
}
4844

4945
public DataStream assignWaterMarker(DataStream<Row> dataStream, RowTypeInfo typeInfo, AbstractSourceTableInfo sourceTableInfo){

hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/RowKeyBuilder.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ public ReplaceInfo getReplaceInfo(String field){
120120
}
121121

122122
private List<ReplaceInfo> makeFormula(String formula){
123-
if (formula == null || formula.length() <= 0) {
123+
if(formula == null || formula.length() <= 0){
124124
return Lists.newArrayList();
125125
}
126126

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java

-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222

2323
import com.dtstack.flink.sql.enums.EUpdateMode;
2424
import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat;
25-
import com.google.common.collect.Lists;
2625
import com.google.common.collect.Maps;
2726
import org.apache.commons.lang3.StringUtils;
2827
import org.apache.flink.api.java.tuple.Tuple2;
@@ -41,7 +40,6 @@
4140
import org.slf4j.LoggerFactory;
4241

4342
import java.io.IOException;
44-
import java.util.List;
4543
import java.util.Map;
4644
import java.util.Set;
4745

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/RowKeyBuilder.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ public ReplaceInfo getReplaceInfo(String field){
130130

131131
private List<ReplaceInfo> makeFormula(String formula){
132132
if(formula == null || formula.length() <= 0){
133-
Lists.newArrayList();
133+
return Lists.newArrayList();
134134
}
135135
List<ReplaceInfo> result = Lists.newArrayList();
136136
for(String meta: splitIgnoreQuotaBrackets(formula, "\\+")){

launcher/src/main/java/com/dtstack/flink/sql/launcher/ClusterClientFactory.java

+43-17
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66
* to you under the Apache License, Version 2.0 (the
77
* "License"); you may not use this file except in compliance
88
* with the License. You may obtain a copy of the License at
9-
*
9+
* <p>
1010
* http://www.apache.org/licenses/LICENSE-2.0
11-
*
11+
* <p>
1212
* Unless required by applicable law or agreed to in writing, software
1313
* distributed under the License is distributed on an "AS IS" BASIS,
1414
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,18 +21,15 @@
2121
import com.dtstack.flink.sql.enums.ClusterMode;
2222
import com.dtstack.flink.sql.option.Options;
2323
import com.dtstack.flink.sql.util.PluginUtil;
24-
import com.esotericsoftware.minlog.Log;
2524
import org.apache.commons.io.Charsets;
2625
import org.apache.commons.lang.StringUtils;
2726
import org.apache.flink.client.program.ClusterClient;
28-
import org.apache.flink.client.program.MiniClusterClient;
27+
import org.apache.flink.client.program.rest.RestClusterClient;
2928
import org.apache.flink.configuration.Configuration;
3029
import org.apache.flink.configuration.GlobalConfiguration;
3130
import org.apache.flink.configuration.JobManagerOptions;
3231
import org.apache.flink.core.fs.FileSystem;
3332
import org.apache.flink.runtime.akka.AkkaUtils;
34-
import org.apache.flink.runtime.minicluster.MiniCluster;
35-
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
3633
import org.apache.flink.runtime.util.LeaderConnectionInfo;
3734
import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
3835
import org.apache.flink.yarn.YarnClusterDescriptor;
@@ -42,21 +39,35 @@
4239
import org.apache.hadoop.yarn.client.api.YarnClient;
4340
import org.apache.hadoop.yarn.conf.YarnConfiguration;
4441
import org.apache.hadoop.yarn.util.StringHelper;
42+
import org.slf4j.Logger;
43+
import org.slf4j.LoggerFactory;
4544

4645
import java.net.InetSocketAddress;
4746
import java.net.URLDecoder;
4847
import java.util.EnumSet;
4948
import java.util.HashSet;
50-
import java.util.Iterator;
49+
import java.util.Set;
5150
import java.util.List;
5251
import java.util.Properties;
53-
import java.util.Set;
52+
import java.util.Iterator;
5453

5554
/**
5655
* @author sishu.yss
5756
*/
5857
public class ClusterClientFactory {
5958

59+
private static final Logger LOG = LoggerFactory.getLogger(ClusterClientFactory.class);
60+
61+
private static final String HA_CLUSTER_ID = "high-availability.cluster-id";
62+
63+
private static final String HIGH_AVAILABILITY = "high-availability";
64+
65+
private static final String NODE = "NONE";
66+
67+
private static final String ZOOKEEPER = "zookeeper";
68+
69+
private static final String HADOOP_CONF = "fs.hdfs.hadoopconf";
70+
6071
public static ClusterClient createClusterClient(Options launcherOptions) throws Exception {
6172
String mode = launcherOptions.getMode();
6273
if (mode.equals(ClusterMode.standalone.name())) {
@@ -70,10 +81,12 @@ public static ClusterClient createClusterClient(Options launcherOptions) throws
7081
public static ClusterClient createStandaloneClient(Options launcherOptions) throws Exception {
7182
String flinkConfDir = launcherOptions.getFlinkconf();
7283
Configuration config = GlobalConfiguration.loadConfiguration(flinkConfDir);
73-
MiniClusterConfiguration.Builder configBuilder = new MiniClusterConfiguration.Builder();
74-
configBuilder.setConfiguration(config);
75-
MiniCluster miniCluster = new MiniCluster(configBuilder.build());
76-
MiniClusterClient clusterClient = new MiniClusterClient(config, miniCluster);
84+
85+
LOG.info("------------config params-------------------------");
86+
config.toMap().forEach((key, value) -> LOG.info("{}: {}", key, value));
87+
LOG.info("-------------------------------------------");
88+
89+
RestClusterClient clusterClient = new RestClusterClient<>(config, "clusterClient");
7790
LeaderConnectionInfo connectionInfo = clusterClient.getClusterConnectionInfo();
7891
InetSocketAddress address = AkkaUtils.getInetSocketAddressFromAkkaURL(connectionInfo.getAddress());
7992
config.setString(JobManagerOptions.ADDRESS, address.getAddress().getHostName());
@@ -89,18 +102,21 @@ public static ClusterClient createYarnSessionClient(Options launcherOptions) {
89102

90103
if (StringUtils.isNotBlank(yarnConfDir)) {
91104
try {
92-
config.setString("fs.hdfs.hadoopconf", yarnConfDir);
105+
boolean isHighAvailability;
106+
107+
config.setString(HADOOP_CONF, yarnConfDir);
93108
FileSystem.initialize(config);
94109

95110
YarnConfiguration yarnConf = YarnConfLoader.getYarnConf(yarnConfDir);
96111
YarnClient yarnClient = YarnClient.createYarnClient();
97112
yarnClient.init(yarnConf);
98113
yarnClient.start();
99-
ApplicationId applicationId = null;
114+
ApplicationId applicationId;
100115

101116
String yarnSessionConf = launcherOptions.getYarnSessionConf();
102117
yarnSessionConf = URLDecoder.decode(yarnSessionConf, Charsets.UTF_8.toString());
103118
Properties yarnSessionConfProperties = PluginUtil.jsonStrToObject(yarnSessionConf, Properties.class);
119+
104120
Object yid = yarnSessionConfProperties.get("yid");
105121

106122
if (null != yid) {
@@ -109,20 +125,30 @@ public static ClusterClient createYarnSessionClient(Options launcherOptions) {
109125
applicationId = getYarnClusterApplicationId(yarnClient);
110126
}
111127

112-
Log.info("applicationId={}", applicationId.toString());
128+
LOG.info("current applicationId = {}", applicationId.toString());
113129

114130
if (StringUtils.isEmpty(applicationId.toString())) {
115131
throw new RuntimeException("No flink session found on yarn cluster.");
116132
}
117133

134+
isHighAvailability = config.getString(HIGH_AVAILABILITY, NODE).equals(ZOOKEEPER);
135+
136+
if (isHighAvailability && config.getString(HA_CLUSTER_ID, null) == null) {
137+
config.setString(HA_CLUSTER_ID, applicationId.toString());
138+
}
139+
140+
LOG.info("------------config params-------------------------");
141+
config.toMap().forEach((key, value) -> LOG.info("{}: {}", key, value));
142+
LOG.info("-------------------------------------------");
143+
118144
AbstractYarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(config, yarnConf, flinkConfDir, yarnClient, false);
119145
ClusterClient clusterClient = clusterDescriptor.retrieve(applicationId);
120146
clusterClient.setDetached(true);
121147
return clusterClient;
122148
} catch (Exception e) {
123149
throw new RuntimeException(e);
124150
}
125-
}else{
151+
} else {
126152
throw new RuntimeException("yarn mode must set param of 'yarnconf'!!!");
127153
}
128154
}
@@ -158,7 +184,7 @@ private static ApplicationId getYarnClusterApplicationId(YarnClient yarnClient)
158184

159185
}
160186

161-
if (null == applicationId) {
187+
if (applicationId == null || StringUtils.isEmpty(applicationId.toString())) {
162188
throw new RuntimeException("No flink session found on yarn cluster.");
163189
}
164190
return applicationId;

launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java

+17-15
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
* limitations under the License.
1717
*/
1818

19-
2019

2120
package com.dtstack.flink.sql.launcher;
2221

@@ -55,6 +54,7 @@
5554
/**
5655
* Date: 2017/2/20
5756
* Company: www.dtstack.com
57+
*
5858
* @author xuchao
5959
*/
6060

@@ -63,19 +63,17 @@ public class LauncherMain {
6363
private static final Logger LOG = LoggerFactory.getLogger(LauncherMain.class);
6464
private static final String CORE_JAR = "core";
6565

66+
private static final Logger LOG = LoggerFactory.getLogger(LauncherMain.class);
67+
6668
private static String SP = File.separator;
6769

6870
private static String getLocalCoreJarPath(String localSqlRootJar) throws Exception {
6971
String jarPath = PluginUtil.getCoreJarFileName(localSqlRootJar, CORE_JAR);
70-
String corePath = localSqlRootJar + SP + jarPath;
71-
return corePath;
72+
return localSqlRootJar + SP + jarPath;
7273
}
7374

7475
public static void main(String[] args) throws Exception {
75-
76-
LOG.info("----start----");
77-
78-
if (args.length == 1 && args[0].endsWith(".json")){
76+
if (args.length == 1 && args[0].endsWith(".json")) {
7977
args = parseJson(args);
8078
}
8179

@@ -88,28 +86,33 @@ public static void main(String[] args) throws Exception {
8886
confProp = URLDecoder.decode(confProp, Charsets.UTF_8.toString());
8987
Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);
9088

91-
if(mode.equals(ClusterMode.local.name())) {
92-
String[] localArgs = argList.toArray(new String[argList.size()]);
89+
LOG.info("current job mode is {}", mode);
90+
91+
if (mode.equals(ClusterMode.local.name())) {
92+
String[] localArgs = argList.toArray(new String[0]);
9393
Main.main(localArgs);
9494
return;
9595
}
9696

9797
String pluginRoot = launcherOptions.getLocalSqlPluginPath();
9898
File jarFile = new File(getLocalCoreJarPath(pluginRoot));
99-
String[] remoteArgs = argList.toArray(new String[argList.size()]);
99+
String[] remoteArgs = argList.toArray(new String[0]);
100100
PackagedProgram program = new PackagedProgram(jarFile, Lists.newArrayList(), remoteArgs);
101101

102102
String savePointPath = confProperties.getProperty(ConfigConstrant.SAVE_POINT_PATH_KEY);
103-
if(StringUtils.isNotBlank(savePointPath)){
103+
if (StringUtils.isNotBlank(savePointPath)) {
104104
String allowNonRestoredState = confProperties.getOrDefault(ConfigConstrant.ALLOW_NON_RESTORED_STATE_KEY, "false").toString();
105105
program.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savePointPath, BooleanUtils.toBoolean(allowNonRestoredState)));
106106
}
107107

108-
if(mode.equals(ClusterMode.yarnPer.name())){
108+
if (mode.equals(ClusterMode.yarnPer.name())) {
109109
String flinkConfDir = launcherOptions.getFlinkconf();
110110
Configuration config = StringUtils.isEmpty(flinkConfDir) ? new Configuration() : GlobalConfiguration.loadConfiguration(flinkConfDir);
111111
JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, config, 1);
112-
PerJobSubmitter.submit(launcherOptions, jobGraph, config);
112+
113+
LOG.info("current jobID is {}", jobGraph.getJobID());
114+
115+
LOG.info("submit applicationId is {}", PerJobSubmitter.submit(launcherOptions, jobGraph, config));
113116
} else {
114117
ClusterClient clusterClient = ClusterClientFactory.createClusterClient(launcherOptions);
115118
clusterClient.run(program, 1);
@@ -127,7 +130,6 @@ private static String[] parseJson(String[] args) throws IOException {
127130
list.add("-" + entry.getKey());
128131
list.add(entry.getValue().toString());
129132
}
130-
String[] array = list.toArray(new String[list.size()]);
131-
return array;
133+
return list.toArray(new String[0]);
132134
}
133135
}

0 commit comments

Comments (0)