
Commit 45197a2

Merge branch 'v1.8.0_dev' into 'feat_1.8_rowkey'
# Conflicts:
#   hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/RowKeyBuilder.java

2 parents: 03a4913 + a91871d


88 files changed, +2851 -1335 lines


.gitlab-ci.yml

+1 -1

@@ -2,7 +2,7 @@ build:
   stage: test
   script:
     - mvn clean org.jacoco:jacoco-maven-plugin:0.7.8:prepare-agent package -Dmaven.test.failure.ignore=true -q
-    - mvn sonar:sonar -Dsonar.projectKey="dt-insight-engine/flinkStreamSQL" -Dsonar.branch.name="v1.8.0_dev" -Dsonar.login=11974c5e9a29625efa09fdc3c3fdc031efb1aab1 -Dsonar.host.url=http://172.16.100.198:9000 -Dsonar.jdbc.url=jdbc:postgresql://172.16.100.198:5432/sonar -Dsonar.java.binaries=target/sonar
+    - mvn sonar:sonar -Dsonar.projectKey="dt-insight-engine/flinkStreamSQL" -Dsonar.login=11974c5e9a29625efa09fdc3c3fdc031efb1aab1 -Dsonar.host.url=http://172.16.100.198:9000 -Dsonar.jdbc.url=jdbc:postgresql://172.16.100.198:5432/sonar -Dsonar.java.binaries=target/sonar
     - sh ci/sonar_notify.sh
   only:
     - v1.8.0_dev

README.md

+1 -23

@@ -7,34 +7,11 @@
 > > * Supports all of native FlinkSQL's syntax
 > > * Extends input and output performance metrics to Prometheus
-## New features:
-* 1. Kafka source tables support the NOT NULL syntax and string-typed time conversion.
-* 2. RDB side tables refresh the DB connection periodically to keep it from dropping; rdbsink checks the connection before writing.
-* 3. Async side tables support non-equi joins such as <>, <, >.
-* 4. Added Kafka array parsing.
-* 5. Added support for Kafka 1.0 and later.
-* 6. Added postgresql, kudu, and clickhouse side-table and result-table support.
-* 7. Plugins can be loaded as dependencies; see the pluginLoadMode parameter.
-* 8. CEP processing support.
-* 9. UDAF support.
-* 10. Predicate pushdown support.
-* 11. State TTL support.
-
-## Bug fixes:
-* 1. Fixed failure to parse ORDER BY and UNION in SQL.
-* 2. Fixed a submission failure in yarnPer mode.
-* 3. Assorted bug fixes.
-
 # Supported
 * Source tables: kafka 0.9, 0.10, 0.11, 1.x
 * Side tables: mysql, SQlServer, oracle, hbase, mongo, redis, cassandra, serversocket, kudu, postgresql, clickhouse, impala, db2, sqlserver
 * Result tables: mysql, SQlServer, oracle, hbase, elasticsearch5.x, mongo, redis, cassandra, console, kudu, postgresql, clickhouse, impala, db2, sqlserver

-# Roadmap
-* Side-table snapshots
-* Kafka Avro format
-* topN
-
 ## 1 Quick start
 ### 1.1 Run modes

@@ -205,6 +182,7 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
 * [impala result-table plugin](docs/impalaSink.md)
 * [db2 result-table plugin](docs/db2Sink.md)
 * [sqlserver result-table plugin](docs/sqlserverSink.md)
+* [kafka result-table plugin](docs/kafkaSink.md)

 ### 2.3 Side-table plugins
 * [hbase side-table plugin](docs/hbaseSide.md)

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

+1 -1

@@ -267,7 +267,7 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws SQ
                 String connInfo = "address:" + tableInfo.getAddress() + ";userName:" + tableInfo.getUserName()
                         + ",pwd:" + tableInfo.getPassword();
                 LOG.warn("get conn fail, wait for 5 sec and try again, connInfo:" + connInfo);
-                Thread.sleep(5 * 1000);
+                Thread.sleep(LOAD_DATA_ERROR_SLEEP_TIME);
             } catch (InterruptedException e1) {
                 LOG.error("", e1);
             }
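
The magic number `5 * 1000` gives way to `LOAD_DATA_ERROR_SLEEP_TIME`, a constant this commit adds to `BaseAllReqRow` (diff further down). For context, a minimal sketch of the retry-and-sleep pattern the all-cache side-table loaders follow; `RetrySketch`, `loadOnce`, and the local constants are illustrative names, not the project's API:

```java
public class RetrySketch {
    // Illustrative constants; the project's equivalent is
    // BaseAllReqRow.LOAD_DATA_ERROR_SLEEP_TIME = 5_000L.
    private static final long RETRY_SLEEP_MS = 5_000L;
    private static final int MAX_RETRIES = 3;

    void loadWithRetry() throws InterruptedException {
        for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
            try {
                loadOnce();   // hypothetical: connect and load the side table
                return;
            } catch (Exception e) {
                // Wait before the next attempt, as loadData() does above.
                Thread.sleep(RETRY_SLEEP_MS);
            }
        }
        throw new IllegalStateException("load failed after " + MAX_RETRIES + " attempts");
    }

    private void loadOnce() { /* connect and load data */ }
}
```

Naming the sleep interval lets every connector share one tuning point instead of scattering `5 * 1000` literals across plugins.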

core/pom.xml

-6

@@ -116,12 +116,6 @@
             <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
             <version>${flink.version}</version>
         </dependency>
-
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <version>4.12</version>
-        </dependency>
         <dependency>
             <groupId>com.aiweiergou</groupId>
             <artifactId>tools-logger</artifactId>

core/src/main/java/com/dtstack/flink/sql/environment/MyLocalStreamEnvironment.java

+5 -6

@@ -113,14 +113,13 @@ public JobExecutionResult execute(String jobName) throws Exception {
             LOG.info("Running job on local embedded Flink mini cluster");
         }

-        MiniCluster exec = new MiniCluster(configBuilder.build());
-        try {
+        try (MiniCluster exec = new MiniCluster(configBuilder.build())) {
             exec.start();
-            return exec.executeJobBlocking(jobGraph);
-        }
-        finally {
+            JobExecutionResult jobExecutionResult = exec.executeJobBlocking(jobGraph);
             transformations.clear();
-            exec.closeAsync();
+            return jobExecutionResult;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
         }
     }
 }
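
The rewrite leans on `MiniCluster` being `AutoCloseable`, so the explicit `finally { exec.closeAsync(); }` can go. A minimal, self-contained sketch of the try-with-resources pattern, with an assumed `Resource` type standing in for `MiniCluster`:

```java
public class TryWithResourcesSketch {

    // Stand-in for MiniCluster, which is AutoCloseable in Flink.
    static class Resource implements AutoCloseable {
        void doWork() { System.out.println("working"); }
        @Override
        public void close() { System.out.println("closed"); }
    }

    public static void main(String[] args) {
        // close() runs automatically on normal exit and on exception,
        // which is what lets the diff above drop its finally block.
        try (Resource r = new Resource()) {
            r.doWork();
        }
    }
}
```

Note two visible behavior shifts in the diff itself: cleanup now goes through `close()` rather than `closeAsync()`, and any startup or execution failure is rethrown wrapped in a `RuntimeException` instead of propagating as-is.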

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

+1 -3

@@ -282,9 +282,7 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment

         RowTypeInfo typeInfo = new RowTypeInfo(adaptTable.getSchema().getFieldTypes(), adaptTable.getSchema().getFieldNames());
         DataStream adaptStream = tableEnv.toRetractStream(adaptTable, typeInfo)
-                .map((Tuple2<Boolean, Row> f0) -> {
-                    return f0.f1;
-                })
+                .map((Tuple2<Boolean, Row> f0) -> f0.f1)
                 .returns(typeInfo);

         String fields = String.join(",", typeInfo.getFieldNames());
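
The block lambda collapses to an expression lambda; behavior is unchanged. For readers new to this API, a sketch of what the retract-stream unwrapping does in the Flink 1.8-era Table API, where `f0` is the add/retract flag and `f1` the payload; the method and names here are illustrative:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RetractStreamSketch {

    // Drop the Boolean add/retract flag and keep only the Row payload.
    static DataStream<Row> toRows(StreamTableEnvironment tableEnv, Table table) {
        return tableEnv
                .toRetractStream(table, Row.class)       // DataStream<Tuple2<Boolean, Row>>
                .map((Tuple2<Boolean, Row> t) -> t.f1)   // keep the payload
                .returns(Row.class);                     // restore type info erased by the lambda
    }
}
```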

core/src/main/java/com/dtstack/flink/sql/format/SerializationMetricWrapper.java

+6 -6

@@ -24,7 +24,7 @@
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MeterView;
-import org.apache.flink.types.Row;
+import org.apache.flink.table.runtime.types.CRow;


 /**
@@ -34,11 +34,11 @@
  * author: toutian
  * create: 2019/12/24
  */
-public class SerializationMetricWrapper implements SerializationSchema<Row> {
+public class SerializationMetricWrapper implements SerializationSchema<CRow> {

     private static final long serialVersionUID = 1L;

-    private SerializationSchema<Row> serializationSchema;
+    private SerializationSchema<CRow> serializationSchema;

     private transient RuntimeContext runtimeContext;

@@ -47,7 +47,7 @@ public class SerializationMetricWrapper implements SerializationSchema<Row> {
     protected transient Meter dtNumRecordsOutRate;


-    public SerializationMetricWrapper(SerializationSchema<Row> serializationSchema) {
+    public SerializationMetricWrapper(SerializationSchema<CRow> serializationSchema) {
         this.serializationSchema = serializationSchema;
     }

@@ -57,7 +57,7 @@ public void initMetric() {
     }

     @Override
-    public byte[] serialize(Row element) {
+    public byte[] serialize(CRow element) {
         beforeSerialize();
         byte[] row = serializationSchema.serialize(element);
         afterSerialize();
@@ -79,7 +79,7 @@ public void setRuntimeContext(RuntimeContext runtimeContext) {
         this.runtimeContext = runtimeContext;
     }

-    public SerializationSchema<Row> getSerializationSchema() {
+    public SerializationSchema<CRow> getSerializationSchema() {
         return serializationSchema;
     }
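
Apart from the `Row` → `CRow` type swap, the wrapper is untouched. For orientation, a minimal sketch of the decorator pattern the class implements, counting records around the delegate's `serialize()`; the plain `long` counter is a simplified stand-in for the real class's Flink `Counter`/`Meter` fields:

```java
import org.apache.flink.api.common.serialization.SerializationSchema;

// Sketch: wrap any SerializationSchema and count serialized records.
public class CountingSerializationSchema<T> implements SerializationSchema<T> {

    private final SerializationSchema<T> inner;
    private long numRecordsOut;  // the real class updates Flink metrics here

    public CountingSerializationSchema(SerializationSchema<T> inner) {
        this.inner = inner;
    }

    @Override
    public byte[] serialize(T element) {
        numRecordsOut++;                 // beforeSerialize(): update the metric
        return inner.serialize(element); // delegate the actual encoding
    }
}
```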

core/src/main/java/com/dtstack/flink/sql/option/OptionParser.java

+3 -4

@@ -26,12 +26,11 @@
 import org.apache.commons.lang.StringUtils;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
 import java.io.File;
-import java.io.FileInputStream;
 import java.net.URLEncoder;
-import java.util.stream.Stream;

 import org.apache.commons.codec.Charsets;
 import org.apache.flink.util.FileUtils;
@@ -104,8 +103,8 @@ public List<String> getProgramExeArgList() throws Exception {
                 continue;
             } else if (OPTION_SQL.equalsIgnoreCase(key)) {
                 File file = new File(value.toString());
-                String content = FileUtils.readFile(file, "UTF-8");
-                value = URLEncoder.encode(content, Charsets.UTF_8.name());
+                String content = FileUtils.readFile(file, StandardCharsets.UTF_8.name());
+                value = URLEncoder.encode(content, StandardCharsets.UTF_8.name());
             }
             args.add("-" + key);
             args.add(value.toString());
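
The point of the change is to replace both the `"UTF-8"` string literal and commons-codec's `Charsets` with the JDK's `java.nio.charset.StandardCharsets`. A small sketch of the resulting read-and-encode step; the file path is a placeholder:

```java
import java.io.File;
import java.io.IOException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import org.apache.flink.util.FileUtils;

public class SqlFileEncodeSketch {
    public static void main(String[] args) throws IOException {
        File file = new File("/tmp/sideSql.txt");  // placeholder path
        // Read the SQL text and URL-encode it, using only the JDK charset constant.
        String content = FileUtils.readFile(file, StandardCharsets.UTF_8.name());
        String encoded = URLEncoder.encode(content, StandardCharsets.UTF_8.name());
        System.out.println(encoded.length());
    }
}
```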

core/src/main/java/com/dtstack/flink/sql/parser/CreateTmpTableParser.java

+5 -1

@@ -22,7 +22,11 @@

 import com.dtstack.flink.sql.util.DtStringUtil;
 import org.apache.calcite.config.Lex;
-import org.apache.calcite.sql.*;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlJoin;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlSelect;
 import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.sql.parser.SqlParser;
 import com.google.common.collect.Lists;

core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java

+8 -1

@@ -21,7 +21,14 @@
 package com.dtstack.flink.sql.parser;

 import org.apache.calcite.config.Lex;
-import org.apache.calcite.sql.*;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlJoin;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlMatchRecognize;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOrderBy;
+import org.apache.calcite.sql.SqlSelect;
 import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.sql.parser.SqlParser;
 import org.apache.commons.lang3.StringUtils;
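
Both parser diffs above only expand `org.apache.calcite.sql.*` wildcards into explicit imports; no behavior changes. As a reference point, a hedged sketch of how these classes are typically used together with Calcite's `SqlParser`; the `Lex` choice and SQL string are illustrative, not taken from this commit:

```java
import org.apache.calcite.config.Lex;
import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;

public class CalciteParseSketch {
    public static void main(String[] args) throws SqlParseException {
        SqlParser parser = SqlParser.create(
                "INSERT INTO sinkTable SELECT id, name FROM sourceTable",
                SqlParser.configBuilder().setLex(Lex.MYSQL).build());
        SqlNode node = parser.parseStmt();
        if (node.getKind() == SqlKind.INSERT) {
            // Dispatch on the node kind, as InsertSqlParser does.
            SqlInsert insert = (SqlInsert) node;
            System.out.println("target table: " + insert.getTargetTable());
        }
    }
}
```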

core/src/main/java/com/dtstack/flink/sql/side/AbstractSideTableInfo.java

+2 -2

@@ -57,11 +57,11 @@ public abstract class AbstractSideTableInfo extends AbstractTableInfo implements

     public static final String ASYNC_REQ_POOL_KEY = "asyncPoolSize";

-    private String cacheType = "none";//None or LRU or ALL
+    private String cacheType = "none";

     private int cacheSize = 10000;

-    private long cacheTimeout = 60 * 1000;//
+    private long cacheTimeout = 60_000L;

     private int asyncCapacity=100;
core/src/main/java/com/dtstack/flink/sql/side/BaseAllReqRow.java

+2
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ public abstract class BaseAllReqRow extends RichFlatMapFunction<CRow, CRow> impl
4747

4848
private static final Logger LOG = LoggerFactory.getLogger(BaseAllReqRow.class);
4949

50+
public static final long LOAD_DATA_ERROR_SLEEP_TIME = 5_000L;
51+
5052
protected BaseSideInfo sideInfo;
5153

5254
private ScheduledExecutorService es;

core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java

-1

@@ -25,7 +25,6 @@
 import com.google.common.base.Strings;

 import java.io.Serializable;
-import java.util.Map;

 /**
  * Join info

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

+1 -1

@@ -121,7 +121,7 @@ public void exec(String sql, Map<String, AbstractSideTableInfo> sideTableMap, St
         SideSQLParser sideSQLParser = new SideSQLParser();
         sideSQLParser.setLocalTableCache(localTableCache);
         Queue<Object> exeQueue = sideSQLParser.getExeQueue(sql, sideTableMap.keySet());
-        Object pollObj = null;
+        Object pollObj;

         //need clean
         boolean preIsSideJoin = false;
