Skip to content

Commit f8dc450

Browse files
修竹修竹
authored andcommitted
es增加用户名和密码配置
1 parent 95107d6 commit f8dc450

File tree

6 files changed

+254
-9
lines changed

6 files changed

+254
-9
lines changed

elasticsearch5/elasticsearch5-sink/pom.xml

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@
1414

1515
<dependencies>
1616
<dependency>
17-
<groupId>org.apache.flink</groupId>
18-
<artifactId>flink-connector-elasticsearch5_2.11</artifactId>
19-
<version>${flink.version}</version>
17+
<groupId>org.elasticsearch.client</groupId>
18+
<artifactId>transport</artifactId>
19+
<version>5.1.2</version>
2020
</dependency>
2121

2222
<dependency>
@@ -30,6 +30,30 @@
3030
<artifactId>logback-classic</artifactId>
3131
<version>1.1.7</version>
3232
</dependency>
33+
34+
<dependency>
35+
<groupId>org.elasticsearch.client</groupId>
36+
<artifactId>x-pack-transport</artifactId>
37+
<version>5.1.1</version>
38+
</dependency>
39+
40+
<dependency>
41+
<groupId>org.apache.logging.log4j</groupId>
42+
<artifactId>log4j-to-slf4j</artifactId>
43+
<version>2.7</version>
44+
</dependency>
45+
46+
<dependency>
47+
<groupId>org.apache.flink</groupId>
48+
<artifactId>flink-connector-elasticsearch5_2.11</artifactId>
49+
<version>${flink.version}</version>
50+
</dependency>
51+
52+
<dependency>
53+
<groupId>org.apache.flink</groupId>
54+
<artifactId>flink-connector-elasticsearch-base_2.11</artifactId>
55+
<version>${flink.version}</version>
56+
</dependency>
3357
</dependencies>
3458

3559
<build>
@@ -47,7 +71,7 @@
4771
<configuration>
4872
<artifactSet>
4973
<excludes>
50-
<exclude>org.apache.logging.log4j:log4j-to-slf4j</exclude>
74+
<!--<exclude>org.apache.logging.log4j:log4j-to-slf4j</exclude>-->
5175
</excludes>
5276
</artifactSet>
5377
<filters>

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ElasticsearchSink.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ public class ElasticsearchSink implements RetractStreamTableSink<Row>, IStreamSi
7676

7777
private int parallelism = -1;
7878

79+
private ElasticsearchTableInfo esTableInfo;
80+
7981

8082
@Override
8183
public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
@@ -130,9 +132,17 @@ private RichSinkFunction createEsSinkFunction(){
130132
}
131133
}
132134

135+
boolean authMesh = esTableInfo.isAuthMesh();
136+
if (authMesh) {
137+
String username = esTableInfo.getUserName();
138+
String password = esTableInfo.getPassword();
139+
String authPassword = esTableInfo.getUserName() + ":" + esTableInfo.getPassword();
140+
userConfig.put("xpack.security.user", authPassword);
141+
}
142+
133143
CustomerSinkFunc customerSinkFunc = new CustomerSinkFunc(index, type, Arrays.asList(fieldNames), Arrays.asList(columnTypes), idIndexList);
134144

135-
return new MetricElasticsearchSink(userConfig, transports, customerSinkFunc);
145+
return new MetricElasticsearchSink(userConfig, transports, customerSinkFunc, esTableInfo);
136146
}
137147

138148
@Override
@@ -155,6 +165,7 @@ public void setBulkFlushMaxActions(int bulkFlushMaxActions) {
155165
@Override
156166
public ElasticsearchSink genStreamSink(TargetTableInfo targetTableInfo) {
157167
ElasticsearchTableInfo elasticsearchTableInfo = (ElasticsearchTableInfo) targetTableInfo;
168+
esTableInfo = elasticsearchTableInfo;
158169
clusterName = elasticsearchTableInfo.getClusterName();
159170
String address = elasticsearchTableInfo.getAddress();
160171
String[] addr = address.split(",");
Lines changed: 135 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,138 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
118
package com.dtstack.flink.sql.sink.elasticsearch;
219

3-
public class ExtendES5ApiCallBridge {
20+
import com.dtstack.flink.sql.sink.elasticsearch.table.ElasticsearchTableInfo;
21+
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
22+
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
23+
import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUtils;
24+
import org.apache.flink.util.IOUtils;
25+
import org.apache.flink.util.Preconditions;
26+
import org.elasticsearch.action.bulk.BackoffPolicy;
27+
import org.elasticsearch.action.bulk.BulkItemResponse;
28+
import org.elasticsearch.action.bulk.BulkProcessor;
29+
import org.elasticsearch.client.transport.TransportClient;
30+
import org.elasticsearch.common.network.NetworkModule;
31+
import org.elasticsearch.common.settings.Settings;
32+
import org.elasticsearch.common.transport.TransportAddress;
33+
import org.elasticsearch.common.unit.TimeValue;
34+
import org.elasticsearch.transport.Netty3Plugin;
35+
import org.elasticsearch.transport.client.PreBuiltTransportClient;
36+
import org.elasticsearch.xpack.client.PreBuiltXPackTransportClient;
37+
import org.slf4j.Logger;
38+
import org.slf4j.LoggerFactory;
39+
40+
import javax.annotation.Nullable;
41+
import java.net.InetSocketAddress;
42+
import java.util.List;
43+
import java.util.Map;
44+
45+
/**
46+
* @date 2019/11/16
47+
* @author xiuzhu
48+
* @Company: www.dtstack.com
49+
*/
50+
51+
public class ExtendES5ApiCallBridge implements ElasticsearchApiCallBridge<TransportClient> {
52+
private static final long serialVersionUID = -5222683870097809633L;
53+
54+
private static final Logger LOG = LoggerFactory.getLogger(ExtendES5ApiCallBridge.class);
55+
56+
private final List<InetSocketAddress> transportAddresses;
57+
58+
protected ElasticsearchTableInfo esTableInfo;
59+
60+
public ExtendES5ApiCallBridge(List<InetSocketAddress> transportAddresses, ElasticsearchTableInfo esTableInfo) {
61+
Preconditions.checkArgument(transportAddresses != null && !transportAddresses.isEmpty());
62+
this.transportAddresses = transportAddresses;
63+
this.esTableInfo = esTableInfo;
64+
}
65+
66+
@Override
67+
public TransportClient createClient(Map<String, String> clientConfig) {
68+
Settings settings = Settings.builder().put(clientConfig)
69+
//.put(NetworkModule.HTTP_TYPE_KEY, Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME)
70+
//.put(NetworkModule.TRANSPORT_TYPE_KEY, Netty3Plugin.NETTY_TRANSPORT_NAME)
71+
.build();
72+
73+
TransportClient transportClient;
74+
if (esTableInfo.isAuthMesh()) {
75+
transportClient = new PreBuiltXPackTransportClient(settings);
76+
}else {
77+
transportClient = new PreBuiltTransportClient(settings);
78+
}
79+
for (TransportAddress transport : ElasticsearchUtils.convertInetSocketAddresses(transportAddresses)) {
80+
transportClient.addTransportAddress(transport);
81+
}
82+
83+
// verify that we actually are connected to a cluster
84+
if (transportClient.connectedNodes().isEmpty()) {
85+
86+
// close the transportClient here
87+
IOUtils.closeQuietly(transportClient);
88+
89+
throw new RuntimeException("Elasticsearch client is not connected to any Elasticsearch nodes!");
90+
}
91+
92+
if (LOG.isInfoEnabled()) {
93+
LOG.info("Created Elasticsearch TransportClient with connected nodes {}", transportClient.connectedNodes());
94+
}
95+
96+
return transportClient;
97+
}
98+
99+
@Override
100+
public BulkProcessor.Builder createBulkProcessorBuilder(TransportClient client, BulkProcessor.Listener listener) {
101+
return BulkProcessor.builder(client, listener);
102+
}
103+
104+
@Override
105+
public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
106+
if (!bulkItemResponse.isFailed()) {
107+
return null;
108+
} else {
109+
return bulkItemResponse.getFailure().getCause();
110+
}
111+
}
112+
113+
@Override
114+
public void configureBulkProcessorBackoff(
115+
BulkProcessor.Builder builder,
116+
@Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
117+
118+
BackoffPolicy backoffPolicy;
119+
if (flushBackoffPolicy != null) {
120+
switch (flushBackoffPolicy.getBackoffType()) {
121+
case CONSTANT:
122+
backoffPolicy = BackoffPolicy.constantBackoff(
123+
new TimeValue(flushBackoffPolicy.getDelayMillis()),
124+
flushBackoffPolicy.getMaxRetryCount());
125+
break;
126+
case EXPONENTIAL:
127+
default:
128+
backoffPolicy = BackoffPolicy.exponentialBackoff(
129+
new TimeValue(flushBackoffPolicy.getDelayMillis()),
130+
flushBackoffPolicy.getMaxRetryCount());
131+
}
132+
} else {
133+
backoffPolicy = BackoffPolicy.noBackoff();
134+
}
135+
136+
builder.setBackoffPolicy(backoffPolicy);
137+
}
4138
}

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/MetricElasticsearchSink.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,29 +19,39 @@
1919
package com.dtstack.flink.sql.sink.elasticsearch;
2020

2121
import com.dtstack.flink.sql.metric.MetricConstant;
22+
import com.dtstack.flink.sql.sink.elasticsearch.table.ElasticsearchTableInfo;
2223
import org.apache.flink.configuration.Configuration;
2324
import org.apache.flink.metrics.Counter;
2425
import org.apache.flink.metrics.Meter;
2526
import org.apache.flink.metrics.MeterView;
2627
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
28+
import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
2729

30+
import java.net.InetSocketAddress;
2831
import java.util.List;
2932
import java.util.Map;
3033

3134
/**
3235
* @Auther: jiangjunjie
3336
* @Date: 2018/11/29 14:15
3437
* @Description:
38+
*
3539
*/
36-
public class MetricElasticsearchSink extends org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink {
40+
public class MetricElasticsearchSink extends org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase {
3741

3842
protected CustomerSinkFunc customerSinkFunc;
3943

4044
protected transient Meter outRecordsRate;
4145

42-
public MetricElasticsearchSink(Map userConfig, List transportAddresses, ElasticsearchSinkFunction elasticsearchSinkFunction) {
43-
super(userConfig, transportAddresses, elasticsearchSinkFunction);
46+
protected Map userConfig;
47+
48+
49+
public MetricElasticsearchSink(Map userConfig, List transportAddresses,
50+
ElasticsearchSinkFunction elasticsearchSinkFunction,
51+
ElasticsearchTableInfo esTableInfo) {
52+
super(new ExtendES5ApiCallBridge(transportAddresses, esTableInfo), userConfig, elasticsearchSinkFunction, new NoOpFailureHandler());
4453
this.customerSinkFunc = (CustomerSinkFunc) elasticsearchSinkFunction;
54+
this.userConfig = userConfig;
4555
}
4656

4757
@Override
@@ -50,6 +60,20 @@ public void open(Configuration parameters) throws Exception {
5060
initMetric();
5161
}
5262

63+
/*public void setXPackTransportClient() throws Exception {
64+
String authPassword = esTableInfo.getUserName() + ":" + esTableInfo.getPassword();
65+
Settings settings = Settings.builder().put(userConfig).put("xpack.security.user", authPassword).build();
66+
Class clz = Class.forName("org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase");
67+
Field clientField = clz.getDeclaredField("client");
68+
clientField.setAccessible(true);
69+
PreBuiltXPackTransportClient transportClient = new PreBuiltXPackTransportClient(settings);
70+
for (TransportAddress transport : ElasticsearchUtils.convertInetSocketAddresses(transportAddresses)) {
71+
transportClient.addTransportAddress(transport);
72+
}
73+
74+
clientField.set(this, transportClient);
75+
}*/
76+
5377
public void initMetric() {
5478
Counter counter = getRuntimeContext().getMetricGroup().counter(MetricConstant.DT_NUM_RECORDS_OUT);
5579
customerSinkFunc.setOutRecords(counter);

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/table/ElasticsearchSinkParser.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323

2424
import com.dtstack.flink.sql.table.AbsTableParser;
2525
import com.dtstack.flink.sql.table.TableInfo;
26+
import com.dtstack.flink.sql.util.MathUtil;
27+
2628
import java.util.Map;
2729

2830
/**
@@ -42,6 +44,12 @@ public class ElasticsearchSinkParser extends AbsTableParser {
4244

4345
private static final String KEY_ES_ID_FIELD_INDEX_LIST = "id";
4446

47+
private static final String KEY_ES_AUTHMESH = "authMesh";
48+
49+
private static final String KEY_ES_USERNAME = "userName";
50+
51+
private static final String KEY_ES_PASSWORD = "password";
52+
4553
@Override
4654
protected boolean fieldNameNeedsUpperCase() {
4755
return false;
@@ -57,6 +65,14 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
5765
elasticsearchTableInfo.setId((String) props.get(KEY_ES_ID_FIELD_INDEX_LIST.toLowerCase()));
5866
elasticsearchTableInfo.setIndex((String) props.get(KEY_ES_INDEX.toLowerCase()));
5967
elasticsearchTableInfo.setEsType((String) props.get(KEY_ES_TYPE.toLowerCase()));
68+
69+
String authMeshStr = (String)props.get(KEY_ES_AUTHMESH.toLowerCase());
70+
if (authMeshStr != null & "true".equals(authMeshStr)) {
71+
elasticsearchTableInfo.setAuthMesh(MathUtil.getBoolean(authMeshStr));
72+
elasticsearchTableInfo.setUserName(MathUtil.getString(props.get(KEY_ES_USERNAME.toLowerCase())));
73+
elasticsearchTableInfo.setPassword(MathUtil.getString(props.get(KEY_ES_PASSWORD.toLowerCase())));
74+
}
75+
elasticsearchTableInfo.check();
6076
return elasticsearchTableInfo;
6177
}
6278
}

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/table/ElasticsearchTableInfo.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,12 @@ public class ElasticsearchTableInfo extends TargetTableInfo {
4343

4444
private String esType;
4545

46+
private boolean authMesh = false;
47+
48+
private String userName;
49+
50+
private String password;
51+
4652
public String getEsType() {
4753
return esType;
4854
}
@@ -89,6 +95,30 @@ public void setClusterName(String clusterName) {
8995
this.clusterName = clusterName;
9096
}
9197

98+
public boolean isAuthMesh() {
99+
return authMesh;
100+
}
101+
102+
public void setAuthMesh(boolean authMesh) {
103+
this.authMesh = authMesh;
104+
}
105+
106+
public String getUserName() {
107+
return userName;
108+
}
109+
110+
public void setUserName(String userName) {
111+
this.userName = userName;
112+
}
113+
114+
public String getPassword() {
115+
return password;
116+
}
117+
118+
public void setPassword(String password) {
119+
this.password = password;
120+
}
121+
92122
public ElasticsearchTableInfo() {
93123
setType(CURR_TYPE);
94124
}
@@ -100,6 +130,12 @@ public boolean check() {
100130
Preconditions.checkNotNull(esType, "elasticsearch type of type is required");
101131
Preconditions.checkNotNull(id, "elasticsearch type of id is required");
102132
Preconditions.checkNotNull(clusterName, "elasticsearch type of clusterName is required");
133+
134+
if (isAuthMesh()) {
135+
Preconditions.checkNotNull(userName, "elasticsearch type of userName is required");
136+
Preconditions.checkNotNull(password, "elasticsearch type of password is required");
137+
}
138+
103139
return true;
104140
}
105141

0 commit comments

Comments
 (0)