package dynamok.sink;

+ import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
+ import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
+ import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.model.*;
import com.fasterxml.jackson.core.JsonParseException;
- import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonMappingException;
- import com.fasterxml.jackson.databind.ObjectMapper;
import dynamok.Version;
+ import dynamok.commons.Util;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
- import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+ import java.io.IOException;
import java.util.*;

@@ -63,92 +64,62 @@ String topAttributeName(ConnectorConfig config) {
    }

    private final Logger log = LoggerFactory.getLogger(DynamoDbSinkTask.class);
-    private ObjectMapper objectMapper = new ObjectMapper();

    private ConnectorConfig config;
-    private AmazonDynamoDBClient client;
+    private AmazonDynamoDB client;
    private KafkaProducer<String, String> producer;
-    private int remainingRetries;

    @Override
    public void start(Map<String, String> props) {
        config = new ConnectorConfig(props);
-        producer = getKafkaProducer();
+        producer = Util.getKafkaProducer(config.broker);

        if (config.accessKeyId.value().isEmpty() || config.secretKey.value().isEmpty()) {
-            client = new AmazonDynamoDBClient(InstanceProfileCredentialsProvider.getInstance());
+            client = AmazonDynamoDBClientBuilder
+                    .standard()
+                    .withCredentials(InstanceProfileCredentialsProvider.getInstance())
+                    .withRegion(config.region)
+                    .build();
            log.debug("AmazonDynamoDBStreamsClient created with DefaultAWSCredentialsProviderChain");
        } else {
            final BasicAWSCredentials awsCreds = new BasicAWSCredentials(config.accessKeyId.value(), config.secretKey.value());
-            client = new AmazonDynamoDBClient(awsCreds);
+            client = AmazonDynamoDBClientBuilder
+                    .standard()
+                    .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
+                    .withRegion(config.region)
+                    .build();
            log.debug("AmazonDynamoDBClient created with AWS credentials from connector configuration");
        }
-
-        client.configureRegion(config.region);
-        remainingRetries = config.maxRetries;
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        if (records.isEmpty()) return;

-        try {
-            if (records.size() == 1 || config.batchSize == 1) {
-                for (final SinkRecord record : records) {
-                    ProducerRecord<String, String> producerRecord = new ProducerRecord<>(config.errorKafkaTopic,
-                            "" + record.key(), record.value().toString());
-                    Map<String, Object> map;
-                    try {
-                        map = objectMapper.readValue(record.value().toString(), new TypeReference<Map<String, String>>() {
-                        });
-                        SinkRecord newRecord = new SinkRecord(record.topic(), record.kafkaPartition(), null,
-                                record.key(), null, map, record.kafkaOffset());
-                        client.putItem(tableName(record), toPutRequest(newRecord).getItem());
-                    } catch (JsonParseException | JsonMappingException e) {
-                        log.error("Exception occurred while converting JSON to Map: {}", record, e);
-                        log.info("Sending to error topic...");
-                        producer.send(producerRecord);
-                    } catch (AmazonDynamoDBException e) {
-                        log.error("Exception in writing into DynamoDB: {}", record, e);
-                        throw e;
-                    } catch (Exception e) {
-                        log.error("Unknown Exception occurred:", e);
-                        producer.send(producerRecord);
-                    }
-                }
-            } else {
-                final Iterator<SinkRecord> recordIterator = records.iterator();
-                while (recordIterator.hasNext()) {
-                    final Map<String, List<WriteRequest>> writesByTable = toWritesByTable(recordIterator);
-                    final BatchWriteItemResult batchWriteResponse = client.batchWriteItem(new BatchWriteItemRequest(writesByTable));
-                    if (!batchWriteResponse.getUnprocessedItems().isEmpty()) {
-                        throw new UnprocessedItemsException(batchWriteResponse.getUnprocessedItems());
-                    }
-                }
-            }
-        } catch (LimitExceededException | ProvisionedThroughputExceededException e) {
-            log.debug("Write failed with Limit/Throughput Exceeded exception; backing off");
-            context.timeout(config.retryBackoffMs);
-            throw new RetriableException(e);
-        } catch (AmazonDynamoDBException | UnprocessedItemsException e) {
-            log.warn("Write failed, remainingRetries={}", remainingRetries, e);
-            if (remainingRetries == 0) {
-                ArrayList<SinkRecord> list = new ArrayList<>(records);
-                log.error("Unable to process this range from: {}\n\t\t\t\t\t\t\t to: {}", list.get(0), list.get(list.size() - 1));
-                log.info("Writing to error kafka topic: {}", config.errorKafkaTopic);
-                list.forEach(record -> {
-                    ProducerRecord<String, String> producerRecord = new ProducerRecord<>(config.errorKafkaTopic,
-                            "" + record.key(), record.value().toString());
-                    producer.send(producerRecord);
-                });
-            } else {
-                remainingRetries--;
+        for (final SinkRecord record : records) {
+            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(config.errorKafkaTopic,
+                    "" + record.key(), record.value().toString());
+            try {
+                SinkRecord newRecord = new SinkRecord(record.topic(), record.kafkaPartition(), null,
+                        record.key(), null, Util.jsonToMap(record.value().toString()), record.kafkaOffset());
+                client.putItem(tableName(record), toPutRequest(newRecord).getItem());
+            } catch (JsonParseException | JsonMappingException e) {
+                log.error("Exception occurred while converting JSON to Map: {}", record, e);
+                log.info("Sending to error topic...");
+                producer.send(producerRecord);
+            } catch (LimitExceededException | ProvisionedThroughputExceededException e) {
+                log.debug("Write failed with Limit/Throughput Exceeded exception; backing off");
                context.timeout(config.retryBackoffMs);
                throw new RetriableException(e);
+            } catch (IOException e) {
+                log.error("Exception occurred in Json Parsing ", e);
+                producer.send(producerRecord);
+            } catch (AmazonDynamoDBException e) {
+                if (e.getErrorCode().equalsIgnoreCase("ValidationException")) {
+                    producer.send(producerRecord);
+                } else throw e;
            }
        }
-
-        remainingRetries = config.maxRetries;
    }

    private Map<String, List<WriteRequest>> toWritesByTable(Iterator<SinkRecord> recordIterator) {
@@ -219,15 +190,4 @@ public String version() {
        return Version.get();
    }

-    private KafkaProducer<String, String> getKafkaProducer() {
-        Properties props = new Properties();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.broker);
-        props.put(ProducerConfig.CLIENT_ID_CONFIG, "Dynamo Sink Connector Error Pipeline");
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
-                StringSerializer.class.getName());
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-                StringSerializer.class.getName());
-        return new KafkaProducer<>(props);
-    }
-
}
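
Note: dynamok.commons.Util is referenced above (Util.getKafkaProducer(config.broker) and Util.jsonToMap(...)) but is not part of this diff. Below is a minimal sketch of what it presumably provides, reconstructed from the inline code the commit removes; the method names and signatures come from the call sites in the diff, while the bodies are assumptions.

package dynamok.commons;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public final class Util {

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private Util() {
    }

    // Assumed equivalent of the removed DynamoDbSinkTask#getKafkaProducer(): a String/String
    // producer pointed at the configured broker, used only for the error-topic pipeline.
    public static KafkaProducer<String, String> getKafkaProducer(String broker) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "Dynamo Sink Connector Error Pipeline");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }

    // Assumed equivalent of the removed ObjectMapper.readValue(...) call in put(): parse the
    // record value into a Map. Jackson's parse/mapping exceptions are IOException subtypes,
    // so they surface through the caller's JsonParseException/JsonMappingException/IOException
    // catch blocks.
    public static Map<String, Object> jsonToMap(String json) throws IOException {
        return OBJECT_MAPPER.readValue(json, new TypeReference<Map<String, Object>>() {
        });
    }
}

Under that assumption, put() now processes one record at a time: parse failures and DynamoDB ValidationExceptions are forwarded to config.errorKafkaTopic, throttling errors (LimitExceededException, ProvisionedThroughputExceededException) trigger a backoff via RetriableException, and any other AmazonDynamoDBException is rethrown.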