Skip to content

Commit 7476571

Browse files
authored
Merge pull request #2 from aiven/add-es7-support
Add ES7 support
2 parents 23fdf76 + 69970b8 commit 7476571

File tree

8 files changed

+30
-17
lines changed

8 files changed

+30
-17
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@
5858
<kafka.version>2.2.0</kafka.version>
5959
<es.version>2.4.1</es.version>
6060
<lucene.version>5.5.2</lucene.version>
61-
<jest.version>2.4.0</jest.version>
61+
<jest.version>6.3.1</jest.version>
6262

6363
<junit.version>4.12</junit.version>
6464
<hamcrest.version>1.3</hamcrest.version>

src/main/java/io/aiven/connect/elasticsearch/DataConverter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,7 @@ private Object preProcessLogicalValue(String schemaName, Object value) {
321321
}
322322

323323
private Object preProcessArrayValue(Object value, Schema schema, Schema newSchema) {
324-
Collection collection = (Collection) value;
324+
Collection<?> collection = (Collection<?>) value;
325325
List<Object> result = new ArrayList<>();
326326
for (Object element: collection) {
327327
result.add(preProcessValue(element, schema.valueSchema(), newSchema.valueSchema()));

src/main/java/io/aiven/connect/elasticsearch/ElasticsearchClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
public interface ElasticsearchClient extends AutoCloseable {
3030

3131
enum Version {
32-
ES_V1, ES_V2, ES_V5, ES_V6
32+
ES_V1, ES_V2, ES_V5, ES_V6, ES_V7
3333
}
3434

3535
/**
@@ -96,5 +96,5 @@ enum Version {
9696
/**
9797
* Shuts down the client.
9898
*/
99-
void close();
99+
void close() throws IOException;
100100
}

src/main/java/io/aiven/connect/elasticsearch/ElasticsearchSinkTask.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.slf4j.Logger;
2929
import org.slf4j.LoggerFactory;
3030

31+
import java.io.IOException;
3132
import java.util.Collection;
3233
import java.util.HashMap;
3334
import java.util.HashSet;
@@ -182,7 +183,11 @@ public void stop() throws ConnectException {
182183
writer.stop();
183184
}
184185
if (client != null) {
185-
client.close();
186+
try {
187+
client.close();
188+
} catch (IOException e) {
189+
throw new ConnectException(e);
190+
}
186191
}
187192
}
188193

src/main/java/io/aiven/connect/elasticsearch/jest/JestElasticsearchClient.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,8 @@ private Version getServerVersion() throws IOException {
198198
return Version.ES_V5;
199199
} else if (esVersion.startsWith("6.")) {
200200
return Version.ES_V6;
201+
} else if (esVersion.startsWith("7.")) {
202+
return Version.ES_V7;
201203
}
202204
return defaultVersion;
203205
}
@@ -229,7 +231,11 @@ private boolean indexExists(String index) {
229231
public void createIndices(Set<String> indices) {
230232
for (String index : indices) {
231233
if (!indexExists(index)) {
232-
CreateIndex createIndex = new CreateIndex.Builder(index).build();
234+
CreateIndex.Builder builder = new CreateIndex.Builder(index);
235+
if (version.equals(Version.ES_V7)) {
236+
builder.setParameter("include_type_name", true);
237+
}
238+
CreateIndex createIndex = builder.build();
233239
try {
234240
JestResult result = client.execute(createIndex);
235241
if (!result.isSucceeded()) {
@@ -285,7 +291,7 @@ public BulkRequest createBulkRequest(List<IndexableRecord> batch) {
285291
}
286292

287293
// visible for testing
288-
protected BulkableAction toBulkableAction(IndexableRecord record) {
294+
protected BulkableAction<?> toBulkableAction(IndexableRecord record) {
289295
// If payload is null, the record was a tombstone and we should delete from the index.
290296
return record.payload != null ? toIndexRequest(record) : toDeleteRequest(record);
291297
}
@@ -364,7 +370,7 @@ public JsonObject search(String query, String index, String type) throws IOExcep
364370
return result.getJsonObject();
365371
}
366372

367-
public void close() {
368-
client.shutdownClient();
373+
public void close() throws IOException {
374+
client.close();
369375
}
370376
}

src/test/java/io/aiven/connect/elasticsearch/DataConverterTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ public void stringKeyedMapCompactFormat() {
234234
SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build(),
235235
preProcessedSchema
236236
);
237-
HashMap newValue = (HashMap) converter.preProcessValue(origValue, origSchema, preProcessedSchema);
237+
HashMap<?, ?> newValue = (HashMap<?, ?>) converter.preProcessValue(origValue, origSchema, preProcessedSchema);
238238
assertEquals(origValue, newValue);
239239
}
240240

src/test/java/io/aiven/connect/elasticsearch/bulk/BulkProcessorTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -375,10 +375,10 @@ public void ignoreOrWarnOnMalformedDoc() throws InterruptedException {
375375
// Test both IGNORE and WARN options
376376
// There is no difference in logic between IGNORE and WARN, except for the logging.
377377
// Test to ensure they both work the same logically
378-
final List<BehaviorOnMalformedDoc> behaviorsToTest = new ArrayList<BehaviorOnMalformedDoc>() {{
379-
add(BehaviorOnMalformedDoc.WARN);
380-
add(BehaviorOnMalformedDoc.IGNORE);
381-
}};
378+
final List<BehaviorOnMalformedDoc> behaviorsToTest = Arrays.asList(
379+
BehaviorOnMalformedDoc.WARN,
380+
BehaviorOnMalformedDoc.IGNORE
381+
);
382382

383383
for(BehaviorOnMalformedDoc behaviorOnMalformedDoc : behaviorsToTest)
384384
{

src/test/java/io/aiven/connect/elasticsearch/jest/JestElasticsearchClientTest.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import io.searchbox.client.JestClient;
3232
import io.searchbox.client.JestClientFactory;
3333
import io.searchbox.client.JestResult;
34+
import io.searchbox.client.config.ElasticsearchVersion;
3435
import io.searchbox.client.config.HttpClientConfig;
3536
import io.searchbox.cluster.NodesInfo;
3637
import io.searchbox.core.BulkResult;
@@ -52,6 +53,7 @@
5253
import org.mockito.ArgumentMatcher;
5354
import org.mockito.InOrder;
5455

56+
import java.io.IOException;
5557
import java.util.ArrayList;
5658
import java.util.HashMap;
5759
import java.util.HashSet;
@@ -169,7 +171,7 @@ private ArgumentMatcher<CreateIndex> isCreateIndexForTestIndex() {
169171
@Override
170172
public boolean matches(CreateIndex createIndex) {
171173
// check the URI as the equals method on CreateIndex doesn't work
172-
return createIndex.getURI().equals(INDEX);
174+
return createIndex.getURI(ElasticsearchVersion.V2).equals(INDEX);
173175
}
174176
};
175177
}
@@ -313,11 +315,11 @@ public void searches() throws Exception {
313315
}
314316

315317
@Test
316-
public void closes() {
318+
public void closes() throws IOException {
317319
JestElasticsearchClient client = new JestElasticsearchClient(jestClient);
318320
client.close();
319321

320-
verify(jestClient).shutdownClient();
322+
verify(jestClient).close();
321323
}
322324

323325
private BulkResult createBulkResultFailure(String exception) {

0 commit comments

Comments
 (0)