
Commit a6634f7

dapeng committed
Merge branch '1.8_release_3.10.x' into hotfix_1.8_3.10.x_26972
2 parents 16398a1 + 76b10af commit a6634f7

File tree: 51 files changed (+1903, -95 lines)

Some content is hidden: large commits have some content hidden by default.


ci/sonar_notify.sh

+1-1
@@ -1,7 +1,7 @@
 #!/bin/bash
 # See the DingTalk docs: https://open-doc.dingtalk.com/microapp/serverapi2/qf2nxq
 sonarreport=$(curl -s http://172.16.100.198:8082/?projectname=dt-insight-engine/flinkStreamSQL)
-curl -s "https://oapi.dingtalk.com/robot/send?access_token=71555061297a53d3ac922a6f4d94285d8e23bccdca0c00b4dc6df0a2d49da724" \
+curl -s "https://oapi.dingtalk.com/robot/send?access_token=58fd731d8bed3b17708d3aa27e49a7e2c41c7e6545f6c4be3170963a7bba7e2a" \
 -H "Content-Type: application/json" \
 -d "{
 \"msgtype\": \"markdown\",

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

+1-3
@@ -292,9 +292,7 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment
 
         RowTypeInfo typeInfo = new RowTypeInfo(adaptTable.getSchema().getFieldTypes(), adaptTable.getSchema().getFieldNames());
         DataStream adaptStream = tableEnv.toRetractStream(adaptTable, typeInfo)
-                .map((Tuple2<Boolean, Row> f0) -> {
-                    return f0.f1;
-                })
+                .map((Tuple2<Boolean, Row> f0) -> f0.f1)
                 .returns(typeInfo);
 
         String fields = String.join(",", typeInfo.getFieldNames());
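Context for the simplified lambda above: toRetractStream emits Tuple2<Boolean, Row> records (a retract flag plus the row), and the map keeps only the row. Because Java erases the lambda's generic type, the trailing .returns(typeInfo) is what preserves the field names and types on the resulting stream. Below is a minimal, self-contained sketch of the same flattening step, assuming Flink 1.8-era Table API classes; the class and variable names are illustrative and not part of this commit.

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    public class RetractToRowSketch {
        // Flatten the (isAddMessage, row) pairs of a retract stream back into a plain Row stream.
        public static DataStream<Row> toRowStream(StreamTableEnvironment tableEnv, Table table) {
            RowTypeInfo typeInfo = new RowTypeInfo(
                    table.getSchema().getFieldTypes(),
                    table.getSchema().getFieldNames());
            return tableEnv
                    .toRetractStream(table, typeInfo)          // Tuple2<Boolean, Row>
                    .map((Tuple2<Boolean, Row> t) -> t.f1)     // drop the retract flag
                    .returns(typeInfo);                        // re-attach type info lost to lambda type erasure
        }
    }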

core/src/main/java/com/dtstack/flink/sql/option/OptionParser.java

+3-2
@@ -26,6 +26,7 @@
 import org.apache.commons.lang.StringUtils;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
 import java.io.File;
@@ -102,8 +103,8 @@ public List<String> getProgramExeArgList() throws Exception {
                 continue;
             } else if (OPTION_SQL.equalsIgnoreCase(key)) {
                 File file = new File(value.toString());
-                String content = FileUtils.readFile(file, "UTF-8");
-                value = URLEncoder.encode(content, Charsets.UTF_8.name());
+                String content = FileUtils.readFile(file, StandardCharsets.UTF_8.name());
+                value = URLEncoder.encode(content, StandardCharsets.UTF_8.name());
             }
             args.add("-" + key);
             args.add(value.toString());
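The functional intent of this change is simply to take the charset name from the JDK's StandardCharsets constant instead of a string literal and Guava's Charsets. A small JDK-only sketch of the encode/decode round trip this arrangement relies on; the SQL text is made up for illustration.

    import java.net.URLDecoder;
    import java.net.URLEncoder;
    import java.nio.charset.StandardCharsets;

    public class SqlArgEncodingSketch {
        public static void main(String[] args) throws Exception {
            String sqlText = "CREATE TABLE MyTable(id INT) WITH ('type' = 'kafka');";

            // Encode the SQL so it survives being passed around as a single program argument.
            String encoded = URLEncoder.encode(sqlText, StandardCharsets.UTF_8.name());

            // Whoever consumes the argument decodes it with the same charset name.
            String decoded = URLDecoder.decode(encoded, StandardCharsets.UTF_8.name());

            System.out.println(decoded.equals(sqlText)); // true
        }
    }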

core/src/main/java/com/dtstack/flink/sql/parser/CreateTableParser.java

+13-2
@@ -41,6 +41,8 @@ public class CreateTableParser implements IParser {
 
     private static final Pattern PATTERN = Pattern.compile(PATTERN_STR);
 
+    private static final Pattern PROP_PATTERN = Pattern.compile("^'\\s*(.+)\\s*'$");
+
     public static CreateTableParser newInstance(){
         return new CreateTableParser();
     }
@@ -69,18 +71,27 @@ public void parseSql(String sql, SqlTree sqlTree) {
     }
 
     private Map parseProp(String propsStr){
-        String[] strs = propsStr.trim().split("'\\s*,");
+        propsStr = propsStr.replaceAll("'\\s*,", "'|");
+        String[] strs = propsStr.trim().split("\\|");
         Map<String, Object> propMap = Maps.newHashMap();
         for(int i=0; i<strs.length; i++){
             List<String> ss = DtStringUtil.splitIgnoreQuota(strs[i], '=');
             String key = ss.get(0).trim();
-            String value = ss.get(1).trim().replaceAll("'", "").trim();
+            String value = extractValue(ss.get(1).trim());
             propMap.put(key, value);
         }
 
         return propMap;
     }
 
+    private String extractValue(String value) {
+        Matcher matcher = PROP_PATTERN.matcher(value);
+        if (matcher.find()) {
+            return matcher.group(1);
+        }
+        throw new RuntimeException("[" + value + "] format is invalid");
+    }
+
     public static class SqlParserResult{
 
         private String tableName;
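Why the property parsing changed: the old split("'\\s*,") consumed the closing quote of each value while splitting, and the blanket replaceAll("'", "") then stripped every quote character from the value, including ones that belonged to the value itself. The new code first rewrites each "'," separator to "'|", splits on "|", and lets PROP_PATTERN pull the value out from between its single quotes, rejecting unquoted values outright. A self-contained sketch of that extraction follows; the property string is invented, and a plain split("=") stands in for DtStringUtil.splitIgnoreQuota.

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class PropExtractionSketch {
        // Same pattern as the commit: a value must sit between single quotes.
        private static final Pattern PROP_PATTERN = Pattern.compile("^'\\s*(.+)\\s*'$");

        public static void main(String[] args) {
            String propsStr = "type ='kafka', topic='order_topic', parallelism='1'";

            // Turn the "'," separators into "|" and split, as parseProp now does.
            String[] pairs = propsStr.replaceAll("'\\s*,", "'|").trim().split("\\|");

            Map<String, String> props = new LinkedHashMap<>();
            for (String pair : pairs) {
                String[] kv = pair.split("=", 2);
                Matcher matcher = PROP_PATTERN.matcher(kv[1].trim());
                if (!matcher.find()) {
                    throw new RuntimeException("[" + kv[1].trim() + "] format is invalid");
                }
                props.put(kv[0].trim(), matcher.group(1));
            }
            System.out.println(props); // {type=kafka, topic=order_topic, parallelism=1}
        }
    }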

core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java

+55-9
@@ -20,17 +20,18 @@
 
 package com.dtstack.flink.sql.parser;
 
-import org.apache.calcite.config.Lex;
-import org.apache.calcite.sql.SqlBasicCall;
 import org.apache.calcite.sql.SqlInsert;
 import org.apache.calcite.sql.SqlJoin;
 import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.SqlMatchRecognize;
 import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlOrderBy;
 import org.apache.calcite.sql.SqlSelect;
-import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlMatchRecognize;
+import org.apache.calcite.sql.SqlOrderBy;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlAsOperator;
+import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.commons.lang3.StringUtils;
 import com.google.common.collect.Lists;
 import org.apache.flink.table.calcite.FlinkPlannerImpl;
@@ -49,6 +50,9 @@
 
 public class InsertSqlParser implements IParser {
 
+    // Marks whether the parent of the node currently being parsed is an INSERT node
+    private static Boolean parentIsInsert = false;
+
     @Override
     public boolean verify(String sql) {
         return StringUtils.isNotBlank(sql) && sql.trim().toLowerCase().startsWith("insert");
@@ -78,13 +82,19 @@ private static void parseNode(SqlNode sqlNode, SqlParseResult sqlParseResult){
             SqlNode sqlTarget = ((SqlInsert)sqlNode).getTargetTable();
             SqlNode sqlSource = ((SqlInsert)sqlNode).getSource();
             sqlParseResult.addTargetTable(sqlTarget.toString());
+            parentIsInsert = true;
             parseNode(sqlSource, sqlParseResult);
             break;
         case SELECT:
-            SqlNode sqlFrom = ((SqlSelect)sqlNode).getFrom();
-            if(sqlFrom.getKind() == IDENTIFIER){
+            SqlSelect sqlSelect = (SqlSelect) sqlNode;
+            if (parentIsInsert) {
+                rebuildSelectNode(sqlSelect.getSelectList(), sqlSelect);
+            }
+            SqlNode sqlFrom = ((SqlSelect) sqlNode).getFrom();
+            if (sqlFrom.getKind() == IDENTIFIER) {
                 sqlParseResult.addSourceTable(sqlFrom.toString());
-            }else{
+            } else {
+                parentIsInsert = false;
                 parseNode(sqlFrom, sqlParseResult);
             }
             break;
@@ -141,6 +151,42 @@ private static void parseNode(SqlNode sqlNode, SqlParseResult sqlParseResult){
         }
     }
 
+    /**
+     * Convert the sqlNodes in the top-level select list into AS nodes to resolve field-name conflicts
+     * @param selectList the select fields of the select node
+     * @param sqlSelect  the top-level select node produced by parsing
+     */
+    private static void rebuildSelectNode(SqlNodeList selectList, SqlSelect sqlSelect) {
+        SqlNodeList sqlNodes = new SqlNodeList(selectList.getParserPosition());
+
+        for (int index = 0; index < selectList.size(); index++) {
+            if (selectList.get(index).getKind().equals(SqlKind.AS)) {
+                sqlNodes.add(selectList.get(index));
+                continue;
+            }
+            sqlNodes.add(transformToAsNode(selectList.get(index)));
+        }
+        sqlSelect.setSelectList(sqlNodes);
+    }
+
+    /**
+     * Convert a sqlNode into an AS node
+     * @param sqlNode the sqlNode to convert
+     * @return the rebuilt AS node
+     */
+    public static SqlBasicCall transformToAsNode(SqlNode sqlNode) {
+        String asName = "";
+        SqlParserPos pos = new SqlParserPos(sqlNode.getParserPosition().getLineNum(),
+                sqlNode.getParserPosition().getEndColumnNum());
+        if (sqlNode.getKind().equals(SqlKind.IDENTIFIER)) {
+            asName = ((SqlIdentifier) sqlNode).names.get(1);
+        }
+        SqlNode[] operands = new SqlNode[2];
+        operands[0] = sqlNode;
+        operands[1] = new SqlIdentifier(asName, null, pos);
+        return new SqlBasicCall(new SqlAsOperator(), operands, pos);
+    }
+
     public static class SqlParseResult {
 
         private List<String> sourceTableList = Lists.newArrayList();
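The effect of rebuildSelectNode is that every item in the top-level SELECT list of an INSERT statement ends up with an explicit alias: items that are already AS calls are kept, and everything else is wrapped by transformToAsNode, which for a qualified identifier such as t0.id uses the column part (id) as the alias, so two joined tables exposing the same column name no longer clash in the sink schema. Below is a small sketch of the wrapper in isolation, assuming the Calcite version bundled with the project; the identifier and the printed form are illustrative.

    import com.dtstack.flink.sql.parser.InsertSqlParser;
    import com.google.common.collect.ImmutableList;
    import org.apache.calcite.sql.SqlBasicCall;
    import org.apache.calcite.sql.SqlIdentifier;
    import org.apache.calcite.sql.parser.SqlParserPos;

    public class AsNodeSketch {
        public static void main(String[] args) {
            // A qualified select item such as t0.id ...
            SqlIdentifier item = new SqlIdentifier(ImmutableList.of("t0", "id"), SqlParserPos.ZERO);

            // ... is rewrapped the same way rebuildSelectNode does, i.e. as "t0.id AS id".
            SqlBasicCall asCall = InsertSqlParser.transformToAsNode(item);
            System.out.println(asCall); // prints roughly `t0`.`id` AS `id`, exact quoting depends on the dialect
        }
    }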

core/src/main/java/com/dtstack/flink/sql/parser/SqlParser.java

+1-1
@@ -68,7 +68,7 @@ public static SqlTree parseSql(String sql) throws Exception {
             throw new RuntimeException("need to set local sql plugin root");
         }
 
-        sql = sql.replaceAll("--.*", "")
+        sql = DtStringUtil.dealSqlComment(sql)
                 .replaceAll("\r\n", " ")
                 .replaceAll("\n", " ")
                 .replace("\t", " ").trim();
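The reason for dropping the bare regex: replaceAll("--.*", "") cannot tell a comment from a "--" that sits inside a quoted literal, so connector properties whose values contain "--" were truncated along with everything after them on that line. A tiny JDK-only sketch of the old behaviour on an invented statement; the new DtStringUtil.dealSqlComment, shown further below, keeps the quoted value intact.

    public class CommentStripSketch {
        public static void main(String[] args) {
            String sql = "CREATE TABLE MyTable(id INT) WITH ('password' = 'ab--cd')";

            // Old approach: the regex treats the "--" inside the quoted value as a comment start.
            String stripped = sql.replaceAll("--.*", "");
            System.out.println(stripped); // CREATE TABLE MyTable(id INT) WITH ('password' = 'ab
        }
    }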

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

+5-1
@@ -115,7 +115,11 @@ public void exec(String sql,
         SideSQLParser sideSQLParser = new SideSQLParser();
         sideSQLParser.setLocalTableCache(localTableCache);
         Queue<Object> exeQueue = sideSQLParser.getExeQueue(sql, sideTableMap.keySet(), scope);
-        Object pollObj = null;
+        Object pollObj;
+
+        //need clean
+        boolean preIsSideJoin = false;
+        List<FieldReplaceInfo> replaceInfoList = Lists.newArrayList();
 
         while((pollObj = exeQueue.poll()) != null){

core/src/main/java/com/dtstack/flink/sql/util/DtStringUtil.java

+51
@@ -106,6 +106,57 @@ public static String replaceIgnoreQuota(String str, String oriStr, String replac
         return str.replaceAll(splitPatternStr, replaceStr);
     }
 
+    /**
+     * Strip "--" comments from the SQL without removing content inside quotes
+     *
+     * @param sql the SQL to process
+     * @return the SQL with comments removed
+     */
+    public static String dealSqlComment(String sql) {
+        boolean inQuotes = false;
+        boolean inSingleQuotes = false;
+        int bracketLeftNum = 0;
+        StringBuilder b = new StringBuilder(sql.length());
+        char[] chars = sql.toCharArray();
+        for (int index = 0; index < chars.length; index ++) {
+            if (index == chars.length) {
+                return b.toString();
+            }
+            StringBuilder tempSb = new StringBuilder(2);
+            if (index > 1) {
+                tempSb.append(chars[index - 1]);
+                tempSb.append(chars[index]);
+            }
+
+            if (tempSb.toString().equals("--")) {
+                if (inQuotes) {
+                    b.append(chars[index]);
+                } else if (inSingleQuotes) {
+                    b.append(chars[index]);
+                } else if (bracketLeftNum > 0) {
+                    b.append(chars[index]);
+                } else {
+                    b.deleteCharAt(b.length() - 1);
+                    while (chars[index] != '\n') {
+                        // check whether the comment reaches the end of the line or the last line of the SQL
+                        if (index == chars.length - 1) {
+                            break;
+                        }
+                        index++;
+                    }
+                }
+            } else if (chars[index] == '\"' && '\\' != chars[index] && !inSingleQuotes) {
+                inQuotes = !inQuotes;
+                b.append(chars[index]);
+            } else if (chars[index] == '\'' && '\\' != chars[index] && !inQuotes) {
+                inSingleQuotes = !inSingleQuotes;
+                b.append(chars[index]);
+            } else {
+                b.append(chars[index]);
+            }
+        }
+        return b.toString();
+    }
 
     public static String col2string(Object column, String type) {
         String rowData = column.toString();

core/src/main/java/com/dtstack/flink/sql/watermarker/WaterMarkerAssigner.java

+1-5
@@ -39,11 +39,7 @@
 public class WaterMarkerAssigner {
 
     public boolean checkNeedAssignWaterMarker(AbstractSourceTableInfo tableInfo){
-        if(Strings.isNullOrEmpty(tableInfo.getEventTimeField())){
-            return false;
-        }
-
-        return true;
+        return !Strings.isNullOrEmpty(tableInfo.getEventTimeField());
     }
 
     public DataStream assignWaterMarker(DataStream<Row> dataStream, RowTypeInfo typeInfo, AbstractSourceTableInfo sourceTableInfo){
TestDtStringUtil.java (new file)

@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.flink.sql.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * @author tiezhu
+ * Date 2020/6/17 (Wednesday)
+ */
+public class TestDtStringUtil {
+    @Test
+    public void dealSqlCommentTest() {
+        String testSQLWithComment = "CREATE TABLE MyTable --this is a comment\n";
+        Assert.assertEquals("CREATE TABLE MyTable ", DtStringUtil.dealSqlComment(testSQLWithComment));
+        testSQLWithComment = "CREATE TABLE 'MyTable--' --this is a comment";
+        Assert.assertEquals("CREATE TABLE 'MyTable--' ", DtStringUtil.dealSqlComment(testSQLWithComment));
+        testSQLWithComment = "CREATE TABLE MyTable -- this is a '--comment'\n";
+        Assert.assertEquals("CREATE TABLE MyTable ", DtStringUtil.dealSqlComment(testSQLWithComment));
+    }
+}

hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/RowKeyBuilder.java

+1-1
@@ -120,7 +120,7 @@ public ReplaceInfo getReplaceInfo(String field){
     }
 
     private List<ReplaceInfo> makeFormula(String formula){
-        if (formula == null || formula.length() <= 0) {
+        if(formula == null || formula.length() <= 0){
            return Lists.newArrayList();
        }
 
hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java

-2
@@ -22,7 +22,6 @@
 
 import com.dtstack.flink.sql.enums.EUpdateMode;
 import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -41,7 +40,6 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/RowKeyBuilder.java

+1-1
@@ -130,7 +130,7 @@ public ReplaceInfo getReplaceInfo(String field){
 
     private List<ReplaceInfo> makeFormula(String formula){
         if(formula == null || formula.length() <= 0){
-            Lists.newArrayList();
+            return Lists.newArrayList();
         }
         List<ReplaceInfo> result = Lists.newArrayList();
         for(String meta: splitIgnoreQuotaBrackets(formula, "\\+")){
