Adding Kafka end_time support for batch processing and SASL optionality. #607
base: master
Thanks a lot for contributing!
Left a couple of comments :)
if start_from is not None:
    ts_offsets = self._consumer.offsets_for_times(parts)
# get offsets for the timestamp ranges, if given
if start_from is not None and end_time is not None:
What if (start_from is not None) and (end_time is None)?
Hmm, I'm not so sure about this one. I don't think it really makes it more readable (it is a simple AND). I also did a quick check of the folder, and I think it is all written without the parentheses (in helpers.py, for instance).
Oh, sorry, I wasn't clear. I meant: what would happen if start_from is specified and end_time is None?
It seems that this case is not covered.
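To make the concern concrete, here is a minimal sketch of how the two bounds could be resolved independently, so that a start_from without an end_time still yields a start offset. It is not the code from this PR: resolve_bounds, to_millis, fake_offset_for_time and the sample timestamps are hypothetical names used only for illustration, and the lookup callable merely stands in for a broker call such as offsets_for_times (assumed here to return the earliest offset whose timestamp is greater than or equal to the requested one).

```python
from bisect import bisect_left
from datetime import datetime, timezone
from typing import Callable, Optional, Tuple


def to_millis(ts: datetime) -> int:
    """Convert a timezone-aware datetime to epoch milliseconds."""
    return int(ts.timestamp() * 1000)


def resolve_bounds(
    start_from: Optional[datetime],
    end_time: Optional[datetime],
    offset_for_time: Callable[[int], int],
) -> Tuple[Optional[int], Optional[int]]:
    """Return (start_offset, end_offset); None means no bound on that side.

    Each bound is looked up on its own, so all four combinations of
    start_from / end_time being None are handled.
    """
    if start_from is not None and end_time is not None and start_from >= end_time:
        raise ValueError("start_from must be earlier than end_time")

    start_offset = None
    if start_from is not None:
        start_offset = offset_for_time(to_millis(start_from))

    end_offset = None
    if end_time is not None:
        end_offset = offset_for_time(to_millis(end_time))

    return start_offset, end_offset


# Hypothetical stand-in for the broker: message timestamps (ms) indexed by offset.
LOG_TIMESTAMPS_MS = [1000, 2000, 3000, 4000]


def fake_offset_for_time(ts_ms: int) -> int:
    """Earliest offset whose timestamp is >= ts_ms (len(log) if there is none)."""
    return bisect_left(LOG_TIMESTAMPS_MS, ts_ms)


def at(seconds: int) -> datetime:
    """Helper: an aware UTC datetime the given number of seconds after the epoch."""
    return datetime.fromtimestamp(seconds, tz=timezone.utc)


# The case raised above: start_from given, end_time omitted.
only_start = resolve_bounds(at(2), None, fake_offset_for_time)
# Both bounds given.
both = resolve_bounds(at(2), at(4), fake_offset_for_time)
```

Resolving each bound separately avoids a combined `start_from is not None and end_time is not None` check silently skipping the start offset when only start_from is passed.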
This commit adds end_time functionality to the Kafka consumer function, which makes it more batch-processing friendly, as it allows the user to achieve idempotency
fef86c8 to 7985d47
Tell us what you do here
Short description
This PR enhances the Kafka source by adding an end_time parameter to the kafka_consumer function, which lets users specify an upper time boundary for message consumption. It complements the existing start_from parameter, giving more precise control over the time range of messages consumed, so that batch runs over a fixed window can be idempotent.
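As a rough illustration of why an upper bound helps with idempotency, the sketch below selects messages from an in-memory list by time window. It does not use the real kafka_consumer function or a Kafka client: Message and select_batch are hypothetical names, the upper bound is assumed to be exclusive, and messages are assumed to arrive in timestamp order.

```python
from datetime import datetime, timezone
from typing import Iterable, List, NamedTuple, Optional


class Message(NamedTuple):
    offset: int
    timestamp: datetime
    value: str


def select_batch(
    messages: Iterable[Message],
    start_from: Optional[datetime] = None,
    end_time: Optional[datetime] = None,
) -> List[Message]:
    """Return messages with start_from <= timestamp < end_time.

    Assumes messages are ordered by timestamp, so reading can stop at the
    first message at or past end_time.
    """
    batch = []
    for msg in messages:
        if start_from is not None and msg.timestamp < start_from:
            continue  # before the window, skip
        if end_time is not None and msg.timestamp >= end_time:
            break  # reached the upper bound, stop reading
        batch.append(msg)
    return batch


def at(seconds: int) -> datetime:
    """Helper: an aware UTC datetime the given number of seconds after the epoch."""
    return datetime.fromtimestamp(seconds, tz=timezone.utc)


topic = [
    Message(0, at(1), "a"),
    Message(1, at(2), "b"),
    Message(2, at(3), "c"),
]

first_run = select_batch(topic, start_from=at(1), end_time=at(3))

# New data arrives after the first run; the bounded window is unaffected.
topic.append(Message(3, at(4), "d"))
second_run = select_batch(topic, start_from=at(1), end_time=at(3))

# Without end_time, the same call now returns a different batch.
unbounded_run = select_batch(topic, start_from=at(1))
```

With both bounds fixed, re-running the batch returns the same messages regardless of what has been produced since, which is the property the description refers to.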
Related Issues
Key changes:
Additional Context