Skip to content

Adding Kafka end_time support for batch processing and SASL optionality. #607

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

lfagliano
Copy link

Tell us what you do here

  • improving, documenting, or customizing an existing source (please link an issue or describe below)

Short description

This PR enhances the Kafka source by adding an end_time parameter to the kafka_consumer function, allowing users to specify an upper time boundary for message consumption. This complements the existing start_from parameter, providing more precise control over the time range of messages to be consumed, allowing for batch processing to be idempotent.

Related Issues

Key changes:

  1. Added end_time parameter to limit message consumption to a specific time
  2. Made SASL authentication parameters optional in KafkaCredentials

Additional Context

  • The changes maintain backward compatibility - existing code using the Kafka source will continue to work without modification

@VioletM VioletM added the support an issue from SE label Apr 9, 2025
@VioletM VioletM self-requested a review April 9, 2025 16:58
Copy link
Contributor

@VioletM VioletM left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks a lot for contributing!
Left a couple of comments :)

if start_from is not None:
ts_offsets = self._consumer.offsets_for_times(parts)
# get offsets for the timestamp ranges, if given
if start_from is not None and end_time is not None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if

(start_from is not None) and (end_time is None)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm this one I am not that sure, I dont think it really makes it more readable (it is a simple AND). I made a quick check on the folder as well, and I think it all without the parenthesis (in helpers.py for instance.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, sorry, I wasn't clear. I meant what would happen in the case if start_from is specified and end_time is None?
It seems that this case is not covered

THis commit adds an end_time functionality to the kafka consumer
function which makes it more batch-processing friendly, as it allows the
user to achieve indempotency
@lfagliano lfagliano force-pushed the vdb-kafka-end-time branch from fef86c8 to 7985d47 Compare April 14, 2025 10:50
@lfagliano lfagliano requested a review from VioletM April 14, 2025 10:51
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
support an issue from SE
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants