Skip to content

Commit 4100319

Browse files
authored
Fix Fetch._reset_offsets_async() KeyError when fetching from multiple nodes (#2612)
1 parent 827832a commit 4100319

File tree

2 files changed

+23
-14
lines changed

2 files changed

+23
-14
lines changed

kafka/consumer/fetcher.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -418,7 +418,7 @@ def _reset_offsets_async(self, timestamps):
418418
expire_at = time.time() + self.config['request_timeout_ms'] / 1000
419419
self._subscriptions.set_reset_pending(partitions, expire_at)
420420

421-
def on_success(result):
421+
def on_success(timestamps_and_epochs, result):
422422
fetched_offsets, partitions_to_retry = result
423423
if partitions_to_retry:
424424
self._subscriptions.reset_failed(partitions_to_retry, time.time() + self.config['retry_backoff_ms'] / 1000)
@@ -428,7 +428,7 @@ def on_success(result):
428428
ts, _epoch = timestamps_and_epochs[partition]
429429
self._reset_offset_if_needed(partition, ts, offset.offset)
430430

431-
def on_failure(error):
431+
def on_failure(partitions, error):
432432
self._subscriptions.reset_failed(partitions, time.time() + self.config['retry_backoff_ms'] / 1000)
433433
self._client.cluster.request_update()
434434

@@ -439,8 +439,8 @@ def on_failure(error):
439439
log.error("Discarding error in ListOffsetResponse because another error is pending: %s", error)
440440

441441
future = self._send_list_offsets_request(node_id, timestamps_and_epochs)
442-
future.add_callback(on_success)
443-
future.add_errback(on_failure)
442+
future.add_callback(on_success, timestamps_and_epochs)
443+
future.add_errback(on_failure, partitions)
444444

445445
def _send_list_offsets_requests(self, timestamps):
446446
"""Fetch offsets for each partition in timestamps dict. This may send

test/test_fetcher.py

+19-10
Original file line numberDiff line numberDiff line change
@@ -134,18 +134,27 @@ def test_reset_offsets_if_needed(fetcher, topic, mocker):
134134

135135

136136
def test__reset_offsets_async(fetcher, mocker):
137-
tp = TopicPartition("topic", 0)
137+
tp0 = TopicPartition("topic", 0)
138+
tp1 = TopicPartition("topic", 1)
138139
fetcher._subscriptions.subscribe(topics=["topic"])
139-
fetcher._subscriptions.assign_from_subscribed([tp])
140-
fetcher._subscriptions.request_offset_reset(tp)
141-
fetched_offsets = {tp: OffsetAndTimestamp(1001, None, -1)}
140+
fetcher._subscriptions.assign_from_subscribed([tp0, tp1])
141+
fetcher._subscriptions.request_offset_reset(tp0)
142+
fetcher._subscriptions.request_offset_reset(tp1)
143+
mocker.patch.object(fetcher._client.cluster, "leader_for_partition", side_effect=[0, 1])
142144
mocker.patch.object(fetcher._client, 'ready', return_value=True)
143-
mocker.patch.object(fetcher, '_send_list_offsets_request',
144-
return_value=Future().success((fetched_offsets, set())))
145-
mocker.patch.object(fetcher._client.cluster, "leader_for_partition", return_value=0)
146-
fetcher._reset_offsets_async({tp: OffsetResetStrategy.EARLIEST})
147-
assert not fetcher._subscriptions.assignment[tp].awaiting_reset
148-
assert fetcher._subscriptions.assignment[tp].position.offset == 1001
145+
future1 = Future()
146+
future2 = Future()
147+
mocker.patch.object(fetcher, '_send_list_offsets_request', side_effect=[future1, future2])
148+
fetcher._reset_offsets_async({
149+
tp0: OffsetResetStrategy.EARLIEST,
150+
tp1: OffsetResetStrategy.EARLIEST,
151+
})
152+
future1.success(({tp0: OffsetAndTimestamp(1001, None, -1)}, set())),
153+
future2.success(({tp1: OffsetAndTimestamp(1002, None, -1)}, set())),
154+
assert not fetcher._subscriptions.assignment[tp0].awaiting_reset
155+
assert not fetcher._subscriptions.assignment[tp1].awaiting_reset
156+
assert fetcher._subscriptions.assignment[tp0].position.offset == 1001
157+
assert fetcher._subscriptions.assignment[tp1].position.offset == 1002
149158

150159

151160
def test__send_list_offsets_requests(fetcher, mocker):

0 commit comments

Comments
 (0)