
KAFKA-19476: Improve state transition handling in SharePartition #20124

Open: wants to merge 10 commits into trunk
Conversation

@adixitconfluent (Contributor) commented Jul 8, 2025

About

The way state transitions work in SharePartition has a few
problems:

  1. When a batch/offset reaches a state that should be treated as the
     final state of that batch/offset (for example, LSO movement causes
     the offset/batch to be ARCHIVED permanently), the result of pending
     write state RPCs for that offset/batch should no longer matter.
  2. There is no locking for state transitions, which can lead to
     inconsistency when a request to archive a record and a rollback of a
     state transition arrive at the same time.
  3. If an acquisition lock timeout occurs while an offset/batch is
     undergoing a transition, followed by a write state RPC failure, we can
     land in a scenario where the offset stays in ACQUIRED state with no
     acquisition lock timeout task.
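The terminal-state rule in point 1 can be sketched roughly as follows. This is a hedged, minimal sketch: `InFlightState`, `archivePermanently`, and the `terminal` flag are simplified stand-ins for illustration, not the actual broker classes (only `completeStateTransition` appears in the PR diff itself).

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical record states, mirroring the states named in this PR.
enum RecordState { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

// Sketch of an in-flight record state: once a terminal state is reached
// (e.g. ARCHIVED via LSO movement), later RPC results must not roll it back.
class InFlightState {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private RecordState state = RecordState.ACQUIRED;
    private boolean terminal = false;

    // Called when LSO movement archives the record permanently.
    void archivePermanently() {
        lock.writeLock().lock();
        try {
            state = RecordState.ARCHIVED;
            terminal = true;
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Called when a pending write state RPC completes; a failed RPC rolls
    // the transition back unless the record already hit a terminal state.
    void completeStateTransition(boolean rpcSucceeded, RecordState rollbackState) {
        lock.writeLock().lock();
        try {
            if (terminal) {
                return; // terminal state wins over any pending RPC result
            }
            if (!rpcSucceeded) {
                state = rollbackState;
            }
        } finally {
            lock.writeLock().unlock();
        }
    }

    RecordState state() {
        lock.readLock().lock();
        try {
            return state;
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

Taking the write lock for both the archive path and the RPC-completion path also addresses the race described in point 2.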

Testing

The code has been tested with new and existing unit tests and existing
integration tests.

Reviewers: Apoorv Mittal [email protected], Andrew Schofield
[email protected]

@github-actions bot added the triage (PRs from the community), core (Kafka Broker), and KIP-932 (Queues for Kafka) labels Jul 8, 2025
@adixitconfluent adixitconfluent marked this pull request as ready for review July 9, 2025 08:17
@apoorvmittal10 (Contributor) left a comment:

Thanks for the PR, some comments.

@AndrewJSchofield AndrewJSchofield self-requested a review July 9, 2025 14:46
@AndrewJSchofield removed the triage (PRs from the community) label Jul 9, 2025
@AndrewJSchofield (Member) left a comment:

Thanks for the PR. A few comments for your consideration.

@apoorvmittal10 (Contributor) left a comment:

Thanks for the changes, good progress, but I have one doubt.

```java
// hasn't reached a terminal state. If acquisition lock has expired by that time, the record can
// be stuck in ACQUIRED state unless we run the acquisition lock task again.
if (!state.isTerminalState() && state.acquisitionLockTimeoutTask.hasExpired()) {
    state.acquisitionLockTimeoutTask.run();
```
Contributor:

This will record the timeout metric again. The previous run of the timeout task must have already issued a call to the persister in the background; is that not a concern?

Contributor (Author):

There is a call to the persister only if stateBatches is non-empty. Since the record cannot be in ACQUIRED state in this situation during the first acquisition lock timeout (it will be in AVAILABLE/ACKNOWLEDGED/ARCHIVED state because of an ongoing transition), the state batches cannot have an entry for this record. Thus, there won't be any persister call for this record.

Regarding "This will record the metric of timeout again": I have added a code change, as described below, so that the timeout metric is not recorded twice.

```java
if (!hasExpired) {
    sharePartitionMetrics.recordAcquisitionLockTimeoutPerSec(lastOffset - firstOffset + 1);
}
```

@AndrewJSchofield (Member) left a comment:

Thanks for the updates. The terminal state is easier to follow, I think.

```java
}

long expirationMs() {
    return expirationMs;
}

boolean hasExpired() {
    return hasExpired;
```
Member:

This seems to me to be accessed on multiple threads. It is checked underneath the share-partition write lock to determine whether to run the task following a write error, but it can also change if the timer task runs normally.

Contributor (Author):

Agreed, I think it makes sense to make this thread-safe. I have added synchronized to the hasExpired() and run() methods.
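The synchronized approach described here can be sketched roughly as below. This is a hedged sketch, not the actual broker class: the class name, constructor, and the `timeoutsRecorded` counter (standing in for `SharePartitionMetrics.recordAcquisitionLockTimeoutPerSec`) are simplified assumptions for illustration.

```java
// Sketch of an acquisition lock timeout task whose run() and hasExpired()
// are synchronized: the write-error path (which checks hasExpired() under
// the share-partition write lock) and the timer thread see a consistent
// flag, and the timeout metric is recorded at most once even if run()
// executes a second time after a write state RPC failure.
class AcquisitionLockTimeoutTask {
    private final long firstOffset;
    private final long lastOffset;
    private boolean hasExpired = false;
    // Stand-in counter so the sketch is self-contained without the real
    // SharePartitionMetrics class.
    int timeoutsRecorded = 0;

    AcquisitionLockTimeoutTask(long firstOffset, long lastOffset) {
        this.firstOffset = firstOffset;
        this.lastOffset = lastOffset;
    }

    synchronized boolean hasExpired() {
        return hasExpired;
    }

    synchronized void run() {
        // Record the metric only on the first expiry, not on a deliberate
        // re-run triggered by a write state RPC failure.
        if (!hasExpired) {
            timeoutsRecorded += (int) (lastOffset - firstOffset + 1);
        }
        hasExpired = true;
        // ... release the affected records here ...
    }
}
```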

@apoorvmittal10 (Contributor) left a comment:

LGTM! Some minor comments.

```java
assertTrue(sharePartition.cachedState().get(2L).batchHasOngoingStateTransition());
assertTrue(sharePartition.cachedState().get(7L).batchHasOngoingStateTransition());

// LSO is at 9.
```
Contributor:

Suggested change:
```diff
- // LSO is at 9.
+ // Move LSO to 9, so some records/offsets can be marked archived.
```

Comment on lines 7531 to 7535:
```java
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(7L).offsetState().get(7L).state());
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(7L).offsetState().get(8L).state());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(7L).offsetState().get(9L).state());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(7L).offsetState().get(10L).state());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(7L).offsetState().get(11L).state());
```
Contributor:

These are not being tested, as future2 is never completed. Is that intended?

Contributor (Author):

This is intended, because we are testing that records marked ARCHIVED due to LSO movement remain ARCHIVED even when the write state RPC fails for those records. Perhaps I'll just remove the last 2 asserts to avoid any confusion.

Contributor:

I think you might want to fail future2 here as well, to see that offsets past 9 are back in ACQUIRED due to rollback. That would fully test that some of the offsets are archived while others are rolled back. Or if that test already exists, can you please point me to it?

Contributor (Author):

Hi @apoorvmittal10, while writing the test you mentioned, I encountered another problem in the code: during write state call processing, a batch can be split into offsets. This can happen due to LSO movement etc. We should deal with that issue in a separate JIRA, https://issues.apache.org/jira/browse/KAFKA-19502, and change the test case then.
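The batch-into-offsets split discussed here could look roughly like the sketch below. This is a hedged illustration only: `InFlightBatch`, `maybeArchiveBelowLso`, and the `offsetState` map are simplified hypothetical names, not the actual SharePartition code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical record states, mirroring the states named in this PR.
enum RecordState { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

// Sketch: when the LSO lands inside a batch, the batch-level state is
// fanned out to per-offset states so that offsets below the LSO can be
// archived while the remaining offsets keep their original state.
class InFlightBatch {
    final long firstOffset;
    final long lastOffset;
    RecordState batchState = RecordState.ACQUIRED;
    Map<Long, RecordState> offsetState; // null until the batch is split

    InFlightBatch(long firstOffset, long lastOffset) {
        this.firstOffset = firstOffset;
        this.lastOffset = lastOffset;
    }

    void maybeArchiveBelowLso(long newLso) {
        if (newLso <= firstOffset) {
            return; // LSO at or below the batch start: nothing to archive
        }
        if (offsetState == null) {
            // First per-offset update: split the batch state into offsets.
            offsetState = new LinkedHashMap<>();
            for (long o = firstOffset; o <= lastOffset; o++) {
                offsetState.put(o, batchState);
            }
        }
        // Archive every offset strictly below the new LSO.
        for (long o = firstOffset; o <= lastOffset && o < newLso; o++) {
            offsetState.put(o, RecordState.ARCHIVED);
        }
    }
}
```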

Comment on lines +7554 to +7558:
```java
},
() -> {
    inFlightState.completeStateTransition(false);
    return null;
}
```
Contributor:

We should add a test case for when the commit succeeds. It will be good to have so that future refactoring does not introduce new issues.

Contributor (Author):

done

```java
@@ -7467,6 +7468,164 @@ public void testNextFetchOffsetWhenOffsetsHaveOngoingTransition() {
    assertEquals(20, sharePartition.nextFetchOffset());
}

@Test
public void testLsoMovementWithWriteStateRPCFailuresInAck() {
```
Contributor:

Suggested change:
```diff
- public void testLsoMovementWithWriteStateRPCFailuresInAck() {
+ public void testLsoMovementWithWriteStateRPCFailuresInAcknowledgement() {
```

@apoorvmittal10 (Contributor) left a comment:

LGTM, I'll take up the other issue.

Labels: ci-approved, core (Kafka Broker), KIP-932 (Queues for Kafka)
Projects: None yet
4 participants