-
Notifications
You must be signed in to change notification settings - Fork 14.5k
KAFKA-19476: Improve state transition handling in SharePartition #20124
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR, some comments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. A few comments for your consideration.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the changes, though it's a good progress but I have 1 doubt.
// hasn't reached a terminal state. If acquisition lock has expired by that time, the record can | ||
// be stuck in ACQUIRED state unless we run the acquisition lock task again. | ||
if (!state.isTerminalState() && state.acquisitionLockTimeoutTask.hasExpired()) { | ||
state.acquisitionLockTimeoutTask.run(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will record the metric of timeout again. The previous run of timeout task must have already issued a call to persister in background then is it not concerning us?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is a call to persister only if stateBatches
is non-empty. Since the state of the record in this situation cannot be ACQUIRED
(it will be in AVAILABLE/ACKNOWLEDGED/ARCHIVED
state because of an ongoing transition) during the first acquisition lock timeout, state batches cannot have an entry for this record state. Thus, there won't be any persister calls for this record.
Regarding "This will record the metric of timeout again.", I have a added a code change as described below to not record the metric of timeout twice.
if (!hasExpired) {
sharePartitionMetrics.recordAcquisitionLockTimeoutPerSec(lastOffset - firstOffset + 1);
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates. The terminal state is easier to follow, I think.
} | ||
|
||
long expirationMs() { | ||
return expirationMs; | ||
} | ||
|
||
boolean hasExpired() { | ||
return hasExpired; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems to me to be accessed on multiple threads. It is checked underneath the share-partition write lock to determine whether to run the task following a write error, but it can also change if the timer task runs normally.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed, I think it makes sense to make this function thread safe. I have added synchronized around hasExpired()
and run()
functions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Some minor comments.
assertTrue(sharePartition.cachedState().get(2L).batchHasOngoingStateTransition()); | ||
assertTrue(sharePartition.cachedState().get(7L).batchHasOngoingStateTransition()); | ||
|
||
// LSO is at 9. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// LSO is at 9. | |
// Move LSO to 9, so some records/offsets can be marked archived. |
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(7L).offsetState().get(7L).state()); | ||
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(7L).offsetState().get(8L).state()); | ||
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(7L).offsetState().get(9L).state()); | ||
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(7L).offsetState().get(10L).state()); | ||
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(7L).offsetState().get(11L).state()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are not being test as future2 is never compeleted, is it intended?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is intended because we are testing that the records that have been marked ARCHIVED due to LSO movement should remain ARCHIVED even though we have failures in Write State RPC for those records. Perhaps, I'll just remove the last 2 asserts to avoid any confusion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you might want to fail the future2 as well here to see that offsets past 9 are back in Acquired, due to rollback. That should test completely that some of the offsets are archived but some rollbacked. Or is that test exists, can you please point me to.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @apoorvmittal10 , while writing the test as you mentioned, I encountered another problem in the code where during write state call processing, a batch can be split into offsets. This can happen due to LSO movement etc. Hence, we should deal with that issue in a separate JIRA https://issues.apache.org/jira/browse/KAFKA-19502 and we should change the test case then
}, | ||
() -> { | ||
inFlightState.completeStateTransition(false); | ||
return null; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should add a test case when commit succeeds. It will be good to have so future refactoring do not introduce new issues.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -7467,6 +7468,164 @@ public void testNextFetchOffsetWhenOffsetsHaveOngoingTransition() { | |||
assertEquals(20, sharePartition.nextFetchOffset()); | |||
} | |||
|
|||
@Test | |||
public void testLsoMovementWithWriteStateRPCFailuresInAck() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public void testLsoMovementWithWriteStateRPCFailuresInAck() { | |
public void testLsoMovementWithWriteStateRPCFailuresInAcknowledgement() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, I ll take up the other issue.
About
The way state transition works in SharePartition has a few
problems -
of that batch/offset (example - LSO movement which causes offset/batch
to be ARCHIVED permanently), the result of pending write state RPCs for
that offset/batch should not matter for such cases
inconsistency in case a request to archive record and rollback state
transition request arrive at the same
time.
undergoing transition followed by write state RPC failure, then we can
land in a scenario where the offset stays in ACQUIRED state with no
acquisition lock timeout task.
Testing
The code has been tested with new and exiting unit tests and existing
integration tests.
Reviewers: Apoorv Mittal [email protected], Andrew Schofield
[email protected]