Skip to content

[fix][io] Make record properties configurable for kinesis source #24495

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 10, 2025

Conversation

shibd
Copy link
Member

@shibd shibd commented Jul 9, 2025

Motivation

The Kinesis source connector's handling of metadata properties was rigid and contained a critical bug.

  • Properties were not configurable: Users could not select which Kinesis metadata properties to include in Pulsar messages. This forced all properties to be included, which could be inefficient in terms of message size.

  • Key Collision Bug: A bug was discovered where all property keys were defined as an empty string (""), causing a key collision that resulted in the loss of all metadata except for the last one set (sequenceNumber).

This PR fixes these issues by introducing a configuration to control which properties are included, which also resolves the underlying bug.

Modifications

  • Added a new configuration, kinesisRecordProperties, to KinesisSourceConfig.java. This allows users to provide a comma-separated list of properties to include.
  • The default value retains all previously available properties to ensure backward compatibility.
  • As part of making the properties configurable, the empty string constants in KinesisRecord.java were replaced with unique, descriptive keys (e.g., "kinesis.arrival.timestamp"), fixing the data loss bug.

Verifying this change

  • Add unit test to covert this change

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@codecov-commenter
Copy link

codecov-commenter commented Jul 9, 2025

Codecov Report

Attention: Patch coverage is 76.92308% with 6 lines in your changes missing coverage. Please review.

Project coverage is 74.28%. Comparing base (bbc6224) to head (42fc078).
Report is 1187 commits behind head on master.

Files with missing lines Patch % Lines
...ache/pulsar/io/kinesis/KinesisRecordProcessor.java 0.00% 5 Missing ⚠️
.../apache/pulsar/io/kinesis/KinesisSourceConfig.java 87.50% 0 Missing and 1 partial ⚠️
Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #24495      +/-   ##
============================================
+ Coverage     73.57%   74.28%   +0.71%     
- Complexity    32624    32843     +219     
============================================
  Files          1877     1868       -9     
  Lines        139502   145902    +6400     
  Branches      15299    16728    +1429     
============================================
+ Hits         102638   108390    +5752     
- Misses        28908    28914       +6     
- Partials       7956     8598     +642     
Flag Coverage Δ
inttests 26.71% <ø> (+2.13%) ⬆️
systests 23.33% <0.00%> (-0.99%) ⬇️
unittests 73.77% <76.92%> (+0.92%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...va/org/apache/pulsar/io/kinesis/KinesisRecord.java 73.52% <100.00%> (+73.52%) ⬆️
.../apache/pulsar/io/kinesis/KinesisSourceConfig.java 27.50% <87.50%> (-9.76%) ⬇️
...ache/pulsar/io/kinesis/KinesisRecordProcessor.java 0.00% <0.00%> (ø)

... and 1091 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Copy link
Member

@RobertIndie RobertIndie left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@RobertIndie RobertIndie requested a review from Copilot July 10, 2025 02:20
Copy link

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR makes Kinesis metadata properties configurable and fixes a key-collision bug by giving each property a unique key. It updates the source configuration, record processor, and record class to honor a user-defined list of properties, and adds unit tests for the new behavior.

  • Introduce kinesisRecordProperties config to select which metadata properties to include
  • Replace empty-string constants in KinesisRecord with descriptive keys and conditionally set them
  • Update processor and config loading logic, and add tests for default, custom, and empty property lists

Reviewed Changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
KinesisSourceConfig.java Add kinesisRecordProperties field, parse it into a propertiesToInclude set
KinesisRecordProcessor.java Store and pass propertiesToInclude into each KinesisRecord; update logging
KinesisRecord.java Replace empty constants with descriptive keys; conditionally populate properties
KinesisSourceConfigTests.java Add tests for default, custom, and empty property configurations
KinesisRecordTest.java Add tests covering all/some/none property inclusion scenarios
Comments suppressed due to low confidence (1)

pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSourceConfigTests.java:176

  • The default properties test only verifies the total count and two keys; consider asserting all six default properties are present to ensure full coverage.
        assertEquals(properties.size(), 6);

@shibd shibd merged commit e0efcbb into apache:master Jul 10, 2025
51 checks passed
shibd added a commit that referenced this pull request Jul 11, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants