|
1 |
| -# flinkStreamSQL |
2 |
| -> * 基于开源的flink,对其实时sql进行扩展 |
3 |
| -> > * 自定义create table 语法(包括源表,输出表,维表) |
4 |
| -> > * 自定义create view 语法 |
5 |
| -> > * 自定义create function 语法 |
6 |
| -> > * 实现了流与维表的join |
7 |
| -> > * 支持原生FLinkSQL所有的语法 |
8 |
| -> > * 扩展了输入和输出的性能指标到promethus |
9 |
| -
|
10 |
| - ## 新特性: |
11 |
| - * 1.kafka源表支持not null语法,支持字符串类型的时间转换。 |
12 |
| - * 2.rdb维表与DB建立连接时,周期进行连接,防止连接断开。rdbsink写入时,对连接进行检查。 |
13 |
| - * 3.异步维表支持非等值连接,比如:<>,<,>。 |
14 |
| - * 4.增加kafka数组解析 |
15 |
| - * 5.增加kafka1.0以上版本的支持 |
16 |
| - * 6.增加postgresql、kudu、clickhouse维表、结果表的支持 |
17 |
| - * 7.支持插件的依赖方式,参考pluginLoadMode参数 |
18 |
| - * 8.支持cep处理 |
19 |
| - * 9.支持udaf |
20 |
| - * 10.支持谓词下移 |
21 |
| - * 11.支持状态的ttl |
22 |
| - |
23 |
| - ## BUG修复: |
24 |
| - * 1.修复不能解析sql中orderby,union语法。 |
25 |
| - * 2.修复yarnPer模式提交失败的异常。 |
26 |
| - * 3.一些bug的修复 |
27 |
| - |
28 |
| -# 已支持 |
29 |
| - * 源表:kafka 0.9、0.10、0.11、1.x版本 |
30 |
| - * 维表:mysql, SQlServer,oracle, hbase, mongo, redis, cassandra, serversocket, kudu, postgresql, clickhouse, impala, db2, sqlserver |
31 |
| - * 结果表:mysql, SQlServer, oracle, hbase, elasticsearch5.x, mongo, redis, cassandra, console, kudu, postgresql, clickhouse, impala, db2, sqlserver |
32 |
| - |
33 |
| -# 后续开发计划 |
34 |
| - * 维表快照 |
35 |
| - * kafka avro格式 |
36 |
| - * topN |
37 |
| - |
38 |
| -## 1 快速起步 |
39 |
| -### 1.1 运行模式 |
40 |
| - |
41 |
| - |
42 |
| -* 单机模式:对应Flink集群的单机模式 |
43 |
| -* standalone模式:对应Flink集群的分布式模式 |
44 |
| -* yarn模式:对应Flink集群的yarn模式 |
45 |
| - |
46 |
| -### 1.2 执行环境 |
47 |
| - |
48 |
| -* Java: JDK8及以上 |
49 |
| -* Flink集群: 1.4,1.5,1.8(单机模式不需要安装Flink集群) |
50 |
| -* 操作系统:理论上不限 |
51 |
| -* kerberos环境需要在flink-conf.yaml配置security.kerberos.login.keytab以及security.kerberos.login.principal参数,配置案例: |
52 |
| -``` |
53 |
| -## hadoop配置文件路径 |
54 |
| -fs.hdfs.hadoopconf: /Users/maqi/tmp/hadoopconf/hadoop_250 |
55 |
| -security.kerberos.login.use-ticket-cache: true |
56 |
| -security.kerberos.login.keytab: /Users/maqi/tmp/hadoopconf/hadoop_250/maqi.keytab |
57 |
| -security.kerberos.login.principal: [email protected] |
58 |
| -security.kerberos.login.contexts: Client,KafkaClient |
59 |
| -zookeeper.sasl.service-name: zookeeper |
60 |
| -zookeeper.sasl.login-context-name: Client |
61 |
| -
|
62 |
| -``` |
63 |
| - |
64 |
| -### 1.3 打包 |
65 |
| - |
66 |
| -进入项目根目录,使用maven打包: |
67 |
| - |
68 |
| -``` |
69 |
| -mvn clean package -Dmaven.test.skip |
70 |
| -
|
71 |
| -``` |
72 |
| - |
73 |
| -打包完成后的包结构: |
74 |
| - |
75 |
| -> * dt-center-flinkStreamSQL |
76 |
| -> > * bin: 任务启动脚本 |
77 |
| -> > * lib: launcher包存储路径,是任务提交的入口 |
78 |
| -> > * plugins: 插件包存储路径 |
79 |
| -> > * ........ : core及插件代码 |
80 |
| -
|
81 |
| -### 1.4 启动 |
82 |
| - |
83 |
| -#### 1.4.1 启动命令 |
84 |
| - |
85 |
| -``` |
86 |
| -sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack/150_flinkplugin/sqlplugin -localSqlPluginPath D:\gitspace\flinkStreamSQL\plugins -addjar \["udf.jar\"\] -mode yarn -flinkconf D:\flink_home\kudu150etc -yarnconf D:\hadoop\etc\hadoopkudu -confProp \{\"time.characteristic\":\"EventTime\",\"sql.checkpoint.interval\":10000\} -yarnSessionConf \{\"yid\":\"application_1564971615273_38182\"} |
87 |
| -``` |
88 |
| - |
89 |
| -#### 1.4.2 命令行参数选项 |
90 |
| - |
91 |
| -* **mode** |
92 |
| - * 描述:执行模式,也就是flink集群的工作模式 |
93 |
| - * local: 本地模式 |
94 |
| - * standalone: 提交到独立部署模式的flink集群 |
95 |
| - * yarn: 提交到yarn模式的flink集群(即提交到已有flink集群) |
96 |
| - * yarnPer: yarn per_job模式提交(即创建新flink application) |
97 |
| - * 必选:否 |
98 |
| - * 默认值:local |
99 |
| - |
100 |
| -* **name** |
101 |
| - * 描述:flink 任务对应名称。 |
102 |
| - * 必选:是 |
103 |
| - * 默认值:无 |
104 |
| - |
105 |
| -* **sql** |
106 |
| - * 描述:执行flink sql 的主体语句。 |
107 |
| - * 必选:是 |
108 |
| - * 默认值:无 |
109 |
| - |
110 |
| -* **localSqlPluginPath** |
111 |
| - * 描述:本地插件根目录地址,也就是打包后产生的plugins目录。 |
112 |
| - * 必选:是 |
113 |
| - * 默认值:无 |
114 |
| - |
115 |
| -* **remoteSqlPluginPath** |
116 |
| - * 描述:flink执行集群上的插件根目录地址(将打包好的插件存放到各个flink节点上,如果是yarn集群需要存放到所有的nodemanager上)。 |
117 |
| - * 必选:否 |
118 |
| - * 默认值:无 |
119 |
| - |
120 |
| -* **addjar** |
121 |
| - * 描述:扩展jar路径,当前主要是UDF定义的jar; |
122 |
| - * 格式:json |
123 |
| - * 必选:否 |
124 |
| - * 默认值:无 |
| 1 | +FlinkStreamSQL |
| 2 | +============ |
| 3 | +[](https://www.apache.org/licenses/LICENSE-2.0.html) |
| 4 | + |
| 5 | +##技术交流 |
| 6 | +- 招聘 **大数据平台开发工程师 **,想了解岗位详细信息可以添加本人微信号ysqwhiletrue,注明招聘,如有意者发送简历至 [[email protected]](mailto:[email protected]) |
| 7 | +- 我们使用[钉钉](https://www.dingtalk.com/)沟通交流,可以搜索群号[**30537511**]或者扫描下面的二维码进入钉钉群 |
| 8 | +<div align=center> |
| 9 | + <img src=docs/images/streamsql_dd.jpg width=300 /> |
| 10 | +</div> |
| 11 | + |
| 12 | +##介绍 |
| 13 | + * 基于开源的flink,对其实时sql进行扩展 |
| 14 | + * 自定义create table 语法(包括源表,输出表,维表) |
| 15 | + * 自定义create view 语法 |
| 16 | + * 自定义create function 语法 |
| 17 | + * 实现了流与维表的join |
| 18 | + * 支持原生FLinkSQL所有的语法 |
| 19 | + * 扩展了输入和输出的性能指标到promethus |
125 | 20 |
|
126 |
| -* **confProp** |
127 |
| - * 描述:一些参数设置 |
128 |
| - * 格式: json |
129 |
| - * 必选:是 (如无参数填写空json即可) |
130 |
| - * 默认值:无 |
131 |
| - * 可选参数: |
132 |
| - * sql.ttl.min: 最小过期时间,大于0的整数,如1d、1h(d\D:天,h\H:小时,m\M:分钟,s\s:秒) |
133 |
| - * sql.ttl.max: 最大过期时间,大于0的整数,如2d、2h(d\D:天,h\H:小时,m\M:分钟,s\s:秒),需同时设置最小时间,且比最小时间大5分钟 |
134 |
| - * state.backend: 任务状态后端,可选为MEMORY,FILESYSTEM,ROCKSDB,默认为flinkconf中的配置。 |
135 |
| - * state.checkpoints.dir: FILESYSTEM,ROCKSDB状态后端文件系统存储路径,例如:hdfs://ns1/dtInsight/flink180/checkpoints。 |
136 |
| - * state.backend.incremental: ROCKSDB状态后端是否开启增量checkpoint,默认为true。 |
137 |
| - * sql.env.parallelism: 默认并行度设置 |
138 |
| - * sql.max.env.parallelism: 最大并行度设置 |
139 |
| - * time.characteristic: 可选值[ProcessingTime|IngestionTime|EventTime] |
140 |
| - * sql.checkpoint.interval: 设置了该参数表明开启checkpoint(ms) |
141 |
| - * sql.checkpoint.mode: 可选值[EXACTLY_ONCE|AT_LEAST_ONCE] |
142 |
| - * sql.checkpoint.timeout: 生成checkpoint的超时时间(ms) |
143 |
| - * sql.max.concurrent.checkpoints: 最大并发生成checkpoint数 |
144 |
| - * sql.checkpoint.cleanup.mode: 默认是不会将checkpoint存储到外部存储,[true(任务cancel之后会删除外部存储)|false(外部存储需要手动删除)] |
145 |
| - * flinkCheckpointDataURI: 设置checkpoint的外部存储路径,根据实际的需求设定文件路径,hdfs://, file:// |
146 |
| - * jobmanager.memory.mb: per_job模式下指定jobmanager的内存大小(单位MB, 默认值:768) |
147 |
| - * taskmanager.memory.mb: per_job模式下指定taskmanager的内存大小(单位MB, 默认值:768) |
148 |
| - * taskmanager.num: per_job模式下指定taskmanager的实例数(默认1) |
149 |
| - * taskmanager.slots:per_job模式下指定每个taskmanager对应的slot数量(默认1) |
150 |
| - * savePointPath:任务恢复点的路径(默认无) |
151 |
| - * allowNonRestoredState:指示保存点是否允许非还原状态的标志(默认false) |
152 |
| - * logLevel: 日志级别动态配置(默认info) |
153 |
| - * [prometheus 相关参数](docs/prometheus.md) per_job可指定metric写入到外部监控组件,以prometheus pushgateway举例 |
| 21 | +##目录 |
| 22 | + |
| 23 | +[ 1.1 demo]((docs/demo.md)) |
| 24 | +[ 1.2 快速开始](docs/quickStart.md) |
| 25 | +[ 1.3 参数配置](docs/config.md) |
| 26 | +[ 1.4 支持的插件介绍和demo](docs/pluginsInfo.md) |
| 27 | +[ 1.5 指标参数](docs/newMetric.md) |
| 28 | +[ 1.6 自定义函数](docs/function.md) |
| 29 | +[ 1.7 视图定义](docs/createView.md) |
| 30 | + |
154 | 31 |
|
155 |
| - |
156 |
| -* **flinkconf** |
157 |
| - * 描述:flink配置文件所在的目录(单机模式下不需要),如/hadoop/flink-1.4.0/conf |
158 |
| - * 必选:否 |
159 |
| - * 默认值:无 |
160 |
| - |
161 |
| -* **yarnconf** |
162 |
| - * 描述:Hadoop配置文件(包括hdfs和yarn)所在的目录(单机模式下不需要),如/hadoop/etc/hadoop |
163 |
| - * 必选:否 |
164 |
| - * 默认值:无 |
165 |
| - |
166 |
| -* **flinkJarPath** |
167 |
| - * 描述:per_job 模式提交需要指定本地的flink jar存放路径 |
168 |
| - * 必选:否 |
169 |
| - * 默认值:false |
170 |
| - |
171 |
| -* **queue** |
172 |
| - * 描述:per_job 模式下指定的yarn queue |
173 |
| - * 必选:否 |
174 |
| - * 默认值:false |
175 |
| - |
176 |
| -* **pluginLoadMode** |
177 |
| - * 描述:per_job 模式下的插件包加载方式。classpath:从每台机器加载插件包,shipfile:将需要插件从提交的节点上传到hdfs,不需要每台安装插件 |
178 |
| - * 必选:否 |
179 |
| - * 默认值:classpath |
180 |
| - |
181 |
| -* **yarnSessionConf** |
182 |
| - * 描述:yarn session 模式下指定的运行的一些参数,[可参考](https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/cli.html),目前只支持指定yid |
183 |
| - * 必选:否 |
184 |
| - * 默认值:false |
185 |
| - |
186 |
| - |
187 |
| -## 2 结构 |
188 |
| -### 2.1 源表插件 |
189 |
| -* [kafka 源表插件](docs/kafkaSource.md) |
190 |
| - |
191 |
| -### 2.2 结果表插件 |
192 |
| -* [elasticsearch 结果表插件](docs/elasticsearchSink.md) |
193 |
| -* [hbase 结果表插件](docs/hbaseSink.md) |
194 |
| -* [mysql 结果表插件](docs/mysqlSink.md) |
195 |
| -* [oracle 结果表插件](docs/oracleSink.md) |
196 |
| -* [mongo 结果表插件](docs/mongoSink.md) |
197 |
| -* [redis 结果表插件](docs/redisSink.md) |
198 |
| -* [cassandra 结果表插件](docs/cassandraSink.md) |
199 |
| -* [kudu 结果表插件](docs/kuduSink.md) |
200 |
| -* [postgresql 结果表插件](docs/postgresqlSink.md) |
201 |
| -* [clickhouse 结果表插件](docs/clickhouseSink.md) |
202 |
| -* [impala 结果表插件](docs/impalaSink.md) |
203 |
| -* [db2 结果表插件](docs/db2Sink.md) |
204 |
| -* [sqlserver 结果表插件](docs/sqlserverSink.md) |
205 |
| - |
206 |
| -### 2.3 维表插件 |
207 |
| -* [hbase 维表插件](docs/hbaseSide.md) |
208 |
| -* [mysql 维表插件](docs/mysqlSide.md) |
209 |
| -* [oracle 维表插件](docs/oracleSide.md) |
210 |
| -* [mongo 维表插件](docs/mongoSide.md) |
211 |
| -* [redis 维表插件](docs/redisSide.md) |
212 |
| -* [cassandra 维表插件](docs/cassandraSide.md) |
213 |
| -* [kudu 维表插件](docs/kuduSide.md) |
214 |
| -* [postgresql 维表插件](docs/postgresqlSide.md) |
215 |
| -* [clickhouse 维表插件](docs/clickhouseSide.md) |
216 |
| -* [impala 维表插件](docs/impalaSide.md) |
217 |
| -* [db2 维表插件](docs/db2Side.md) |
218 |
| -* [sqlserver 维表插件](docs/sqlserverSide.md) |
219 |
| - |
220 |
| -## 3 性能指标(新增) |
221 |
| - |
222 |
| -### kafka插件 |
223 |
| -* 业务延迟: flink_taskmanager_job_task_operator_dtEventDelay(单位s) |
224 |
| - 数据本身的时间和进入flink的当前时间的差值. |
225 |
| - |
226 |
| -* 各个输入源的脏数据:flink_taskmanager_job_task_operator_dtDirtyData |
227 |
| - 从kafka获取的数据解析失败的视为脏数据 |
228 |
| - |
229 |
| -* 各Source的数据输入TPS: flink_taskmanager_job_task_operator_dtNumRecordsInRate |
230 |
| - kafka接受的记录数(未解析前)/s |
231 |
| - |
232 |
| -* 各Source的数据输入RPS: flink_taskmanager_job_task_operator_dtNumRecordsInResolveRate |
233 |
| - kafka接受的记录数(解析后)/s |
234 |
| - |
235 |
| -* 各Source的数据输入BPS: flink_taskmanager_job_task_operator_dtNumBytesInRate |
236 |
| - kafka接受的字节数/s |
237 |
| - |
238 |
| -* Kafka作为输入源的各个分区的延迟数: flink_taskmanager_job_task_operator_topic_partition_dtTopicPartitionLag |
239 |
| - 当前kafka10,kafka11有采集该指标 |
240 |
| - |
241 |
| -* 各个输出源RPS: flink_taskmanager_job_task_operator_dtNumRecordsOutRate |
242 |
| - 写入的外部记录数/s |
243 |
| - |
244 |
| - |
245 |
| -## 4 样例 |
246 |
| - |
247 |
| -``` |
248 |
| -
|
249 |
| -CREATE (scala|table|aggregate) FUNCTION CHARACTER_LENGTH WITH com.dtstack.Kun; |
250 |
| -
|
251 |
| -
|
252 |
| -CREATE TABLE MyTable( |
253 |
| - name varchar, |
254 |
| - channel varchar, |
255 |
| - pv int, |
256 |
| - xctime bigint, |
257 |
| - CHARACTER_LENGTH(channel) AS timeLeng //自定义的函数 |
258 |
| - )WITH( |
259 |
| - type ='kafka09', |
260 |
| - bootstrapServers ='172.16.8.198:9092', |
261 |
| - zookeeperQuorum ='172.16.8.198:2181/kafka', |
262 |
| - offsetReset ='latest', |
263 |
| - topic ='nbTest1', |
264 |
| - parallelism ='1' |
265 |
| - ); |
266 |
| -
|
267 |
| -CREATE TABLE MyResult( |
268 |
| - channel varchar, |
269 |
| - pv varchar |
270 |
| - )WITH( |
271 |
| - type ='mysql', |
272 |
| - url ='jdbc:mysql://172.16.8.104:3306/test?charset=utf8', |
273 |
| - userName ='dtstack', |
274 |
| - password ='abc123', |
275 |
| - tableName ='pv2', |
276 |
| - parallelism ='1' |
277 |
| - ); |
278 |
| -
|
279 |
| -CREATE TABLE workerinfo( |
280 |
| - cast(logtime as TIMESTAMP) AS rtime, |
281 |
| - cast(logtime) AS rtime |
282 |
| - )WITH( |
283 |
| - type ='hbase', |
284 |
| - zookeeperQuorum ='rdos1:2181', |
285 |
| - tableName ='workerinfo', |
286 |
| - rowKey ='ce,de', |
287 |
| - parallelism ='1', |
288 |
| - zookeeperParent ='/hbase' |
289 |
| - ); |
290 |
| -
|
291 |
| -CREATE TABLE sideTable( |
292 |
| - cf:name varchar as name, |
293 |
| - cf:info varchar as info, |
294 |
| - PRIMARY KEY(name), |
295 |
| - PERIOD FOR SYSTEM_TIME //维表标识 |
296 |
| - )WITH( |
297 |
| - type ='hbase', |
298 |
| - zookeeperQuorum ='rdos1:2181', |
299 |
| - zookeeperParent ='/hbase', |
300 |
| - tableName ='workerinfo', |
301 |
| - cache ='LRU', |
302 |
| - cacheSize ='10000', |
303 |
| - cacheTTLMs ='60000', |
304 |
| - parallelism ='1' |
305 |
| - ); |
306 |
| -
|
307 |
| -insert |
308 |
| -into |
309 |
| - MyResult |
310 |
| - select |
311 |
| - d.channel, |
312 |
| - d.info |
313 |
| - from |
314 |
| - ( select |
315 |
| - a.*,b.info |
316 |
| - from |
317 |
| - MyTable a |
318 |
| - join |
319 |
| - sideTable b |
320 |
| - on a.channel=b.name |
321 |
| - where |
322 |
| - a.channel = 'xc2' |
323 |
| - and a.pv=10 ) as d |
324 |
| -``` |
| 32 | +## License |
325 | 33 |
|
326 |
| -# 招聘 |
327 |
| -1.大数据平台开发工程师,想了解岗位详细信息可以添加本人微信号ysqwhiletrue,注明招聘,如有意者发送简历至[email protected]。 |
328 |
| - |
| 34 | +FlinkStreamSQL is under the Apache 2.0 license. See the [LICENSE](http://www.apache.org/licenses/LICENSE-2.0) file for details. |
| 35 | + |
| 36 | + |
0 commit comments