Skip to content

Commit 26a0755

Browse files
committed
[fix][io] Make record properties configurable for kinesis source (#24495)
(cherry picked from commit e0efcbb)
1 parent 03b497b commit 26a0755

File tree

5 files changed

+222
-12
lines changed

5 files changed

+222
-12
lines changed

pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisRecord.java

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,31 +24,49 @@
2424
import java.util.HashMap;
2525
import java.util.Map;
2626
import java.util.Optional;
27+
import java.util.Set;
2728
import org.apache.pulsar.functions.api.Record;
2829
import software.amazon.awssdk.services.kinesis.model.EncryptionType;
2930
import software.amazon.kinesis.retrieval.KinesisClientRecord;
3031

3132
public class KinesisRecord implements Record<byte[]> {
32-
public static final String ARRIVAL_TIMESTAMP = "";
33-
public static final String ENCRYPTION_TYPE = "";
34-
public static final String PARTITION_KEY = "";
35-
public static final String SEQUENCE_NUMBER = "";
33+
public static final String ARRIVAL_TIMESTAMP = "kinesis.arrival.timestamp";
34+
public static final String ENCRYPTION_TYPE = "kinesis.encryption.type";
35+
public static final String PARTITION_KEY = "kinesis.partition.key";
36+
public static final String SEQUENCE_NUMBER = "kinesis.sequence.number";
37+
public static final String SHARD_ID = "kinesis.shard.id";
38+
public static final String MILLIS_BEHIND_LATEST = "kinesis.millis.behind.latest";
3639

3740
private static final CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
3841
private final Optional<String> key;
3942
private final byte[] value;
4043
private final HashMap<String, String> userProperties = new HashMap<>();
41-
public KinesisRecord(KinesisClientRecord record) {
44+
public KinesisRecord(KinesisClientRecord record, String shardId, long millisBehindLatest,
45+
Set<String> propertiesToInclude) {
4246
this.key = Optional.of(record.partitionKey());
4347
// encryption type can (annoyingly) be null, so we default to NONE
4448
EncryptionType encType = EncryptionType.NONE;
4549
if (record.encryptionType() != null) {
4650
encType = record.encryptionType();
4751
}
48-
setProperty(ARRIVAL_TIMESTAMP, record.approximateArrivalTimestamp().toString());
49-
setProperty(ENCRYPTION_TYPE, encType.toString());
50-
setProperty(PARTITION_KEY, record.partitionKey());
51-
setProperty(SEQUENCE_NUMBER, record.sequenceNumber());
52+
if (propertiesToInclude.contains(ARRIVAL_TIMESTAMP)) {
53+
setProperty(ARRIVAL_TIMESTAMP, record.approximateArrivalTimestamp().toString());
54+
}
55+
if (propertiesToInclude.contains(ENCRYPTION_TYPE)) {
56+
setProperty(ENCRYPTION_TYPE, encType.toString());
57+
}
58+
if (propertiesToInclude.contains(PARTITION_KEY)) {
59+
setProperty(PARTITION_KEY, record.partitionKey());
60+
}
61+
if (propertiesToInclude.contains(SEQUENCE_NUMBER)) {
62+
setProperty(SEQUENCE_NUMBER, record.sequenceNumber());
63+
}
64+
if (propertiesToInclude.contains(SHARD_ID)) {
65+
setProperty(SHARD_ID, shardId);
66+
}
67+
if (propertiesToInclude.contains(MILLIS_BEHIND_LATEST)) {
68+
setProperty(MILLIS_BEHIND_LATEST, String.valueOf(millisBehindLatest));
69+
}
5270

5371
if (encType == EncryptionType.NONE) {
5472
String s = null;

pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisRecordProcessor.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.io.kinesis;
2020

21+
import java.util.Set;
2122
import java.util.concurrent.LinkedBlockingQueue;
2223
import lombok.extern.slf4j.Slf4j;
2324
import software.amazon.kinesis.exceptions.InvalidStateException;
@@ -43,11 +44,13 @@ public class KinesisRecordProcessor implements ShardRecordProcessor {
4344
private final LinkedBlockingQueue<KinesisRecord> queue;
4445
private long nextCheckpointTimeInNanos;
4546
private String kinesisShardId;
47+
private final Set<String> propertiesToInclude;
4648
/**
 * Creates a record processor that hands converted records to the given queue.
 *
 * @param queue  queue that {@code processRecords} puts each created {@code KinesisRecord} onto
 * @param config source configuration; the checkpoint interval, retry count, backoff time and
 *               the set of record properties to include are read from it once, here
 */
public KinesisRecordProcessor(LinkedBlockingQueue<KinesisRecord> queue, KinesisSourceConfig config) {
4749
this.queue = queue;
4850
this.checkpointInterval = config.getCheckpointInterval();
4951
this.numRetries = config.getNumRetries();
5052
this.backoffTime = config.getBackoffTime();
53+
// NOTE(review): stored as-is (no copy, no null check); presumably non-null because
// KinesisSourceConfig.load always sets it - confirm for configs built another way.
this.propertiesToInclude = config.getPropertiesToInclude();
5154
}
5255

5356
private void checkpoint(RecordProcessorCheckpointer checkpointer) {
@@ -82,16 +85,20 @@ private void checkpoint(RecordProcessorCheckpointer checkpointer) {
8285
/**
 * Remembers the shard id carried by the initialization input and logs it together with
 * this processor's checkpointInterval, numRetries, backoffTime and propertiesToInclude.
 *
 * @param initializationInput input whose {@code shardId()} identifies the shard being processed
 */
@Override
8386
public void initialize(InitializationInput initializationInput) {
8487
kinesisShardId = initializationInput.shardId();
88+
// One-time log line so the effective per-shard settings are visible in the logs.
log.info("Initializing KinesisRecordProcessor for shard {}. Config: checkpointInterval={}ms, numRetries={}, "
89+
+ "backoffTime={}ms, propertiesToInclude={}",
90+
kinesisShardId, checkpointInterval, numRetries, backoffTime, propertiesToInclude);
8591
}
8692

8793
@Override
8894
public void processRecords(ProcessRecordsInput processRecordsInput) {
8995

9096
log.info("Processing " + processRecordsInput.records().size() + " records from " + kinesisShardId);
97+
long millisBehindLatest = processRecordsInput.millisBehindLatest();
9198

9299
for (KinesisClientRecord record : processRecordsInput.records()) {
93100
try {
94-
queue.put(new KinesisRecord(record));
101+
queue.put(new KinesisRecord(record, this.kinesisShardId, millisBehindLatest, propertiesToInclude));
95102
} catch (InterruptedException e) {
96103
log.warn("unable to create KinesisRecord ", e);
97104
}

pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSourceConfig.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,12 @@
2222
import static org.apache.commons.lang3.StringUtils.isNotBlank;
2323
import java.io.Serializable;
2424
import java.net.URI;
25+
import java.util.Arrays;
26+
import java.util.Collections;
2527
import java.util.Date;
2628
import java.util.Map;
29+
import java.util.Set;
30+
import java.util.stream.Collectors;
2731
import lombok.Data;
2832
import lombok.EqualsAndHashCode;
2933
import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin;
@@ -130,6 +134,16 @@ public class KinesisSourceConfig extends BaseKinesisConfig implements Serializab
130134
)
131135
private boolean useEnhancedFanOut = true;
132136

137+
// Raw, user-supplied comma-separated list of Kinesis metadata property names.
// The default lists four names; kinesis.shard.id and kinesis.millis.behind.latest are
// supported (see the help text) but are not part of the default.
@FieldDoc(required = false,
138+
defaultValue = "kinesis.arrival.timestamp,kinesis.encryption.type,kinesis.partition.key,"
139+
+ "kinesis.sequence.number",
140+
help = "A comma-separated list of Kinesis metadata properties to include in the Pulsar message properties."
141+
+ " The supported properties are: kinesis.arrival.timestamp, kinesis.encryption.type, "
142+
+ "kinesis.partition.key, kinesis.sequence.number, kinesis.shard.id, kinesis.millis.behind.latest")
143+
private String kinesisRecordProperties = "kinesis.arrival.timestamp,kinesis.encryption.type,"
144+
+ "kinesis.partition.key,kinesis.sequence.number";
145+
// Parsed form of kinesisRecordProperties; it has no initializer and is assigned in load(...).
// NOTE(review): being transient and only populated by load(...), this is presumably null on
// an instance obtained any other way (e.g. after deserialization) - confirm that callers
// which invoke contains(...) on it can never see such an instance.
private transient Set<String> propertiesToInclude;
146+
133147
public static KinesisSourceConfig load(Map<String, Object> config, SourceContext sourceContext) {
134148
KinesisSourceConfig kinesisSourceConfig = IOConfigUtils.loadWithSecrets(config,
135149
KinesisSourceConfig.class, sourceContext);
@@ -143,6 +157,15 @@ && isNotBlank(kinesisSourceConfig.getCloudwatchEndpoint())
143157
checkArgument((kinesisSourceConfig.getStartAtTime() != null),
144158
"When initialPositionInStream is AT_TIMESTAMP, startAtTime must be specified");
145159
}
160+
if (isNotBlank(kinesisSourceConfig.getKinesisRecordProperties())) {
161+
Set<String> properties = Arrays.stream(kinesisSourceConfig.getKinesisRecordProperties().split(","))
162+
.map(String::trim)
163+
.filter(s -> !s.isEmpty())
164+
.collect(Collectors.toSet());
165+
kinesisSourceConfig.setPropertiesToInclude(properties);
166+
} else {
167+
kinesisSourceConfig.setPropertiesToInclude(Collections.emptySet());
168+
}
146169
return kinesisSourceConfig;
147170
}
148171

pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisRecordTest.java

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.io.kinesis;
20+
21+
import static org.mockito.Mockito.when;
22+
import static org.testng.Assert.assertEquals;
23+
import static org.testng.Assert.assertFalse;
24+
import static org.testng.Assert.assertTrue;
25+
import java.nio.ByteBuffer;
26+
import java.nio.charset.StandardCharsets;
27+
import java.time.Instant;
28+
import java.util.Arrays;
29+
import java.util.Collections;
30+
import java.util.HashSet;
31+
import java.util.Map;
32+
import java.util.Set;
33+
import org.mockito.Mockito;
34+
import org.testng.annotations.BeforeMethod;
35+
import org.testng.annotations.Test;
36+
import software.amazon.awssdk.services.kinesis.model.EncryptionType;
37+
import software.amazon.kinesis.retrieval.KinesisClientRecord;
38+
39+
/**
 * Tests that the {@code KinesisRecord} constructor only exposes the metadata properties whose
 * names are present in the {@code propertiesToInclude} set passed to it.
 */
public class KinesisRecordTest {
40+
41+
// Mocked Kinesis record, re-created before every test method by setup().
private KinesisClientRecord mockRecord;
42+
// Fixed inputs handed to the KinesisRecord constructor / stubbed on the mock.
private final String shardId = "shard-001";
43+
private final long millisBehindLatest = 12345L;
44+
private final String partitionKey = "test-key";
45+
private final String sequenceNumber = "seq-123";
46+
private final Instant arrivalTimestamp = Instant.now();
47+
48+
/**
 * Builds a fresh mock record stubbed with the fixture values above, encryption type NONE
 * and a UTF-8 encoded "test-data" payload.
 */
@BeforeMethod
49+
public void setup() {
50+
mockRecord = Mockito.mock(KinesisClientRecord.class);
51+
when(mockRecord.partitionKey()).thenReturn(partitionKey);
52+
when(mockRecord.sequenceNumber()).thenReturn(sequenceNumber);
53+
when(mockRecord.approximateArrivalTimestamp()).thenReturn(arrivalTimestamp);
54+
when(mockRecord.encryptionType()).thenReturn(EncryptionType.NONE);
55+
when(mockRecord.data()).thenReturn(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8)));
56+
}
57+
58+
/**
 * With all six property names requested, the record must carry exactly six properties and
 * each value must equal the string form of the corresponding fixture value.
 */
@Test
59+
public void testAllPropertiesIncluded() {
60+
Set<String> propertiesToInclude = new HashSet<>(Arrays.asList(
61+
KinesisRecord.ARRIVAL_TIMESTAMP,
62+
KinesisRecord.ENCRYPTION_TYPE,
63+
KinesisRecord.PARTITION_KEY,
64+
KinesisRecord.SEQUENCE_NUMBER,
65+
KinesisRecord.SHARD_ID,
66+
KinesisRecord.MILLIS_BEHIND_LATEST
67+
));
68+
69+
KinesisRecord kinesisRecord = new KinesisRecord(mockRecord, shardId, millisBehindLatest, propertiesToInclude);
70+
Map<String, String> properties = kinesisRecord.getProperties();
71+
72+
// TestNG argument order: assertEquals(actual, expected).
assertEquals(properties.size(), 6);
73+
assertEquals(properties.get(KinesisRecord.SHARD_ID), shardId);
74+
assertEquals(properties.get(KinesisRecord.MILLIS_BEHIND_LATEST), String.valueOf(millisBehindLatest));
75+
assertEquals(properties.get(KinesisRecord.PARTITION_KEY), partitionKey);
76+
assertEquals(properties.get(KinesisRecord.SEQUENCE_NUMBER), sequenceNumber);
77+
assertEquals(properties.get(KinesisRecord.ARRIVAL_TIMESTAMP), arrivalTimestamp.toString());
78+
assertEquals(properties.get(KinesisRecord.ENCRYPTION_TYPE), EncryptionType.NONE.toString());
79+
}
80+
81+
/**
 * With only the shard id and sequence number requested, exactly those two keys must be
 * present and the other four must be absent.
 */
@Test
82+
public void testSomePropertiesIncluded() {
83+
Set<String> propertiesToInclude = new HashSet<>(Arrays.asList(
84+
KinesisRecord.SHARD_ID,
85+
KinesisRecord.SEQUENCE_NUMBER
86+
));
87+
88+
KinesisRecord kinesisRecord = new KinesisRecord(mockRecord, shardId, millisBehindLatest, propertiesToInclude);
89+
Map<String, String> properties = kinesisRecord.getProperties();
90+
91+
assertEquals(properties.size(), 2);
92+
assertTrue(properties.containsKey(KinesisRecord.SHARD_ID));
93+
assertTrue(properties.containsKey(KinesisRecord.SEQUENCE_NUMBER));
94+
95+
// Everything that was not requested must be left out.
assertFalse(properties.containsKey(KinesisRecord.PARTITION_KEY));
96+
assertFalse(properties.containsKey(KinesisRecord.ARRIVAL_TIMESTAMP));
97+
assertFalse(properties.containsKey(KinesisRecord.ENCRYPTION_TYPE));
98+
assertFalse(properties.containsKey(KinesisRecord.MILLIS_BEHIND_LATEST));
99+
}
100+
101+
/**
 * With an empty set of requested names, the record must expose no properties at all.
 */
@Test
102+
public void testNoPropertiesIncluded() {
103+
Set<String> propertiesToInclude = Collections.emptySet();
104+
105+
KinesisRecord kinesisRecord = new KinesisRecord(mockRecord, shardId, millisBehindLatest, propertiesToInclude);
106+
Map<String, String> properties = kinesisRecord.getProperties();
107+
108+
assertTrue(properties.isEmpty());
109+
}
110+
}

pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSourceConfigTests.java

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import static org.testng.Assert.assertEquals;
2222
import static org.testng.Assert.assertNotNull;
23+
import static org.testng.Assert.assertTrue;
2324

2425
import java.io.IOException;
2526
import java.time.ZoneOffset;
@@ -28,7 +29,7 @@
2829
import java.util.Date;
2930
import java.util.HashMap;
3031
import java.util.Map;
31-
32+
import java.util.Set;
3233
import org.apache.pulsar.io.core.SourceContext;
3334
import org.mockito.Mockito;
3435
import org.testng.annotations.Test;
@@ -160,4 +161,55 @@ public final void missCloudWatchEndpointTest() {
160161
"{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
161162
KinesisSourceConfig.load(map, Mockito.mock(SourceContext.class));
162163
}
163-
}
164+
165+
/**
 * Loads a config map that does not set "kinesisRecordProperties" and checks that the parsed
 * set contains exactly the four default property names.
 */
@Test
166+
public final void propertiesDefaultTest() {
167+
Map<String, Object> map = new HashMap<>();
168+
map.put("awsRegion", "us-west-1");
169+
map.put("awsKinesisStreamName", "my-stream");
170+
map.put("awsCredentialPluginParam", "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
171+
172+
SourceContext sourceContext = Mockito.mock(SourceContext.class);
173+
KinesisSourceConfig config = KinesisSourceConfig.load(map, sourceContext);
174+
175+
Set<String> properties = config.getPropertiesToInclude();
176+
// kinesis.shard.id and kinesis.millis.behind.latest are not asserted here: the size
// check of 4 plus the four contains checks below leave no room for them.
assertEquals(properties.size(), 4);
177+
assertTrue(properties.contains("kinesis.sequence.number"));
178+
assertTrue(properties.contains("kinesis.arrival.timestamp"));
179+
assertTrue(properties.contains("kinesis.encryption.type"));
180+
assertTrue(properties.contains("kinesis.partition.key"));
181+
}
182+
183+
/**
 * Loads a config map with an explicit two-entry "kinesisRecordProperties" value containing
 * surrounding whitespace and checks that the parsed set holds exactly the two trimmed names.
 */
@Test
184+
public final void propertiesCustomTest() {
185+
Map<String, Object> map = new HashMap<>();
186+
map.put("awsRegion", "us-west-1");
187+
map.put("awsKinesisStreamName", "my-stream");
188+
map.put("awsCredentialPluginParam", "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
189+
// Set custom properties, note the extra whitespace to test trim()
190+
map.put("kinesisRecordProperties", "kinesis.shard.id, kinesis.partition.key ");
191+
192+
SourceContext sourceContext = Mockito.mock(SourceContext.class);
193+
KinesisSourceConfig config = KinesisSourceConfig.load(map, sourceContext);
194+
195+
Set<String> properties = config.getPropertiesToInclude();
196+
assertEquals(properties.size(), 2);
197+
// The lookups use the names without whitespace, so they only succeed if load trimmed them.
assertTrue(properties.contains("kinesis.shard.id"));
198+
assertTrue(properties.contains("kinesis.partition.key"));
199+
}
200+
201+
/**
 * Loads a config map whose "kinesisRecordProperties" value is the empty string and checks
 * that the parsed set is empty (and non-null, since isEmpty() is invoked on it).
 */
@Test
202+
public final void propertiesEmptyTest() {
203+
Map<String, Object> map = new HashMap<>();
204+
map.put("awsRegion", "us-west-1");
205+
map.put("awsKinesisStreamName", "my-stream");
206+
map.put("awsCredentialPluginParam", "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
207+
// Explicit empty value: overrides the four-name default with "include nothing".
map.put("kinesisRecordProperties", "");
208+
209+
SourceContext sourceContext = Mockito.mock(SourceContext.class);
210+
KinesisSourceConfig config = KinesisSourceConfig.load(map, sourceContext);
211+
212+
Set<String> properties = config.getPropertiesToInclude();
213+
assertTrue(properties.isEmpty());
214+
}
215+
}

0 commit comments

Comments
 (0)