|
20 | 20 | import com.amazonaws.auth.BasicAWSCredentials;
|
21 | 21 | import com.amazonaws.auth.InstanceProfileCredentialsProvider;
|
22 | 22 | import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
|
23 |
| -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; |
24 | 23 | import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
|
25 | 24 | import com.amazonaws.services.dynamodbv2.model.*;
|
26 | 25 | import com.fasterxml.jackson.core.JsonParseException;
|
@@ -105,17 +104,18 @@ public void put(Collection<SinkRecord> records) {
|
105 | 104 | client.putItem(tableName(record), toPutRequest(newRecord).getItem());
|
106 | 105 | } catch (JsonParseException | JsonMappingException e) {
|
107 | 106 | log.error("Exception occurred while converting JSON to Map: {}", record, e);
|
108 |
| - log.info("Sending to error topic..."); |
| 107 | + log.warn("Sending to error topic..."); |
109 | 108 | producer.send(producerRecord);
|
110 | 109 | } catch (LimitExceededException | ProvisionedThroughputExceededException e) {
|
111 | 110 | log.debug("Write failed with Limit/Throughput Exceeded exception; backing off");
|
112 | 111 | context.timeout(config.retryBackoffMs);
|
113 | 112 | throw new RetriableException(e);
|
114 | 113 | } catch (IOException e) {
|
115 |
| - log.error("Exception occurred in Json Parsing ", e); |
| 114 | + log.error("Exception occurred in Json Parsing", e); |
116 | 115 | producer.send(producerRecord);
|
117 |
| - } catch(AmazonDynamoDBException e) { |
118 |
| - if (e.getErrorCode().equalsIgnoreCase( "ValidationException")) { |
| 116 | + } catch (AmazonDynamoDBException e) { |
| 117 | + log.warn("Error in sending data to DynamoDB in record: {}", record, e); |
| 118 | + if (e.getErrorCode().equalsIgnoreCase("ValidationException")) { |
119 | 119 | producer.send(producerRecord);
|
120 | 120 | } else throw e;
|
121 | 121 | }
|
|
0 commit comments