Skip to content

Commit b251fb9

Browse files
committed
deal lateral
1 parent e5dbaf4 commit b251fb9

File tree

4 files changed

+119
-0
lines changed

4 files changed

+119
-0
lines changed

core/src/main/java/com/dtstack/flink/sql/side/JoinNodeDealer.java

+4
Original file line numberDiff line numberDiff line change
@@ -507,6 +507,10 @@ private Set<String> extractFieldFromGroupByList(SqlNodeList parentGroupByList,
507507
* @param joinFieldSet
508508
*/
509509
private void extractJoinField(SqlNode condition, Set<Tuple2<String, String>> joinFieldSet){
510+
if (null == condition || condition.getKind() == LITERAL) {
511+
return;
512+
}
513+
510514
SqlKind joinKind = condition.getKind();
511515
if( joinKind == AND || joinKind == EQUALS ){
512516
extractJoinField(((SqlBasicCall)condition).operands[0], joinFieldSet);

core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java

+3
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,9 @@ public Object parseSql(SqlNode sqlNode,
165165
case ORDER_BY:
166166
SqlOrderBy sqlOrderBy = (SqlOrderBy) sqlNode;
167167
parseSql(sqlOrderBy.query, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList);
168+
169+
case LITERAL:
170+
return LITERAL.toString();
168171
}
169172
return "";
170173
}

core/src/main/java/com/dtstack/flink/sql/util/TableUtils.java

+3
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,9 @@ private static void replaceOneSelectField(SqlIdentifier sqlIdentifier,
435435
* @param oldTabFieldRefNew
436436
*/
437437
public static void replaceJoinFieldRefTableName(SqlNode condition, Map<String, String> oldTabFieldRefNew){
438+
if (null == condition || condition.getKind() == LITERAL) {
439+
return;
440+
}
438441
SqlKind joinKind = condition.getKind();
439442
if( joinKind == AND || joinKind == EQUALS ){
440443
replaceJoinFieldRefTableName(((SqlBasicCall)condition).operands[0], oldTabFieldRefNew);

docs/function.md

+109
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
## 支持UDF,UDTF,UDAT:
2+
3+
### UDTF使用案例
4+
5+
1. cross join:左表的每一行数据都会关联上UDTF 产出的每一行数据,如果UDTF不产出任何数据,那么这1行不会输出。
6+
2. left join:左表的每一行数据都会关联上UDTF 产出的每一行数据,如果UDTF不产出任何数据,则这1行的UDTF的字段会用null值填充。 left join UDTF 语句后面必须接 on true参数。
7+
8+
9+
场景:将某个字段拆分为两个字段。
10+
11+
```$xslt
12+
13+
create table function UDTFOneColumnToMultiColumn with cn.todd.flink180.udflib.UDTFOneColumnToMultiColumn;
14+
15+
CREATE TABLE MyTable (
16+
userID VARCHAR ,
17+
eventType VARCHAR,
18+
productID VARCHAR)
19+
WITH (
20+
type = 'kafka11',
21+
bootstrapServers = '172.16.8.107:9092',
22+
zookeeperQuorum = '172.16.8.107:2181/kafka',
23+
offsetReset = 'latest',
24+
topic ='mqTest03',
25+
topicIsPattern = 'false'
26+
);
27+
28+
CREATE TABLE MyTable1 (
29+
channel VARCHAR ,
30+
pv VARCHAR,
31+
name VARCHAR)
32+
WITH (
33+
type = 'kafka11',
34+
bootstrapServers = '172.16.8.107:9092',
35+
zookeeperQuorum = '172.16.8.107:2181/kafka',
36+
offsetReset = 'latest',
37+
topic ='mqTest01',
38+
topicIsPattern = 'false'
39+
);
40+
41+
CREATE TABLE MyTable2 (
42+
userID VARCHAR,
43+
eventType VARCHAR,
44+
productID VARCHAR,
45+
date1 VARCHAR,
46+
time1 VARCHAR
47+
)
48+
WITH (
49+
type = 'console',
50+
bootstrapServers = '172.16.8.107:9092',
51+
zookeeperQuorum = '172.16.8.107:2181/kafka',
52+
offsetReset = 'latest',
53+
topic ='mqTest02',
54+
topicIsPattern = 'false'
55+
);
56+
57+
## 视图使用UDTF
58+
--create view udtf_table as
59+
-- select MyTable.userID,MyTable.eventType,MyTable.productID,date1,time1
60+
-- from MyTable LEFT JOIN lateral table(UDTFOneColumnToMultiColumn(productID))
61+
-- as T(date1,time1) on true;
62+
63+
64+
65+
66+
insert
67+
into
68+
MyTable2
69+
select
70+
userID,eventType,productID,date1,time1
71+
from
72+
(
73+
select MyTable.userID,MyTable.eventType,MyTable.productID,date1,time1
74+
from MyTable ,lateral table(UDTFOneColumnToMultiColumn(productID)) as T(date1,time1)) as udtf_table;
75+
76+
```
77+
一行转多列UDTFOneColumnToMultiColumn
78+
79+
```$xslt
80+
public class UDTFOneColumnToMultiColumn extends TableFunction<Row> {
81+
public void eval(String value) {
82+
String[] valueSplits = value.split("_");
83+
84+
//一行,两列
85+
Row row = new Row(2);
86+
row.setField(0, valueSplits[0]);
87+
row.setField(1, valueSplits[1]);
88+
collect(row);
89+
}
90+
91+
@Override
92+
public TypeInformation<Row> getResultType() {
93+
return new RowTypeInfo(Types.STRING, Types.STRING);
94+
}
95+
}
96+
```
97+
98+
输入输出:
99+
100+
101+
输入: {"userID": "user_5", "eventType": "browse", "productID":"product_5"}
102+
103+
输出:
104+
105+
+--------+-----------+-----------+---------+-------+
106+
| userID | eventType | productID | date1 | time1 |
107+
+--------+-----------+-----------+---------+-------+
108+
| user_5 | browse | product_5 | product | 5 |
109+
+--------+-----------+-----------+---------+-------+

0 commit comments

Comments
 (0)