Skip to content

MINOR: Throw exceptions if source topic is missing #20123

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 9, 2025

Conversation

RaidenE1
Copy link
Contributor

@RaidenE1 RaidenE1 commented Jul 8, 2025

In the old protocol, Kafka Streams used to throw a
MissingSourceTopicException when a source topic is missing. In the new
protocol, it doesn’t do that anymore, while only log the status that is
returned from the broker, which contains a status that indicates that a
source topic is missing.

This change:

  1. Throws an MissingSourceTopicException when source topic is missing
  2. Adds unit tests
  3. Modifies integration tests to fit both old and new protocols

Reviewers: Lucas Brutschy [email protected]

@github-actions github-actions bot added triage PRs from the community streams small Small PRs labels Jul 8, 2025
Copy link
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RaidenE1 Thanks for the PR! I left a few comments.

@@ -91,6 +95,11 @@ public void shouldThrowErrorAfterSourceTopicDeleted(final TestInfo testName) thr
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
streamsConfiguration.put(StreamsConfig.METADATA_MAX_AGE_CONFIG, 2000);

// set group protocol according to parameter
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I'd omit inline comments that don't add any new information

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, removed it

STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());

// set group protocol according to parameter
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I'd omit inline comments that don't add any new information

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, removed it

} else if (status.statusCode() == StreamsGroupHeartbeatResponse.Status.MISSING_SOURCE_TOPICS.code()) {
final String errorMsg = "Missing source topics";
log.error(errorMsg);
throw new MissingSourceTopicException(errorMsg);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We definitely need a better error message here. The StreamsGroupHeartbeatResponse contains a statusDetail. Could we bring it to this exception?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, append the detail at the end

@@ -3873,6 +3874,15 @@ public void testStreamsProtocolRunOnceWithoutProcessingThreads() {
));
thread.runOnceWithoutProcessingThreads();
verify(shutdownErrorHook).run();

// Test MISSING_SOURCE_TOPICS status
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please add a separate unit test for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, add a separate unit test, please check the name, don't know if it's appropriate

@@ -3932,6 +3942,15 @@ public void testStreamsProtocolRunOnceWithProcessingThreads() {
));
thread.runOnceWithProcessingThreads();
verify(shutdownErrorHook).run();

// Test MISSING_SOURCE_TOPICS status
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please add a separate unit test for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, add a separate unit test, please check the name, don't know if it's appropriate

@github-actions github-actions bot removed the small Small PRs label Jul 8, 2025
@github-actions github-actions bot removed the triage PRs from the community label Jul 9, 2025
Copy link
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks!

@lucasbru
Copy link
Member

lucasbru commented Jul 9, 2025

@RaidenE1 Can you check the CI output? It seems to say you have to run:
./gradlew :streams:integration-tests:spotlessApply

Please mention me with my github handle when another review is needed, that increases the chance that I see it quickly.

@aliehsaeedii
Copy link
Contributor

Thanks, @RaidenE1
could we please remove the internal ticket number from the PR title? Either define a new ticket in AK Jira or consider it as a subtask of an already exiting ticket.
cc: @lucasbru

@RaidenE1 RaidenE1 changed the title KSTREAMS-7492: Throw exceptions if source topic is missing Throw exceptions if source topic is missing Jul 9, 2025
@RaidenE1 RaidenE1 changed the title Throw exceptions if source topic is missing MINOR: Throw exceptions if source topic is missing Jul 9, 2025
@RaidenE1
Copy link
Contributor Author

RaidenE1 commented Jul 9, 2025

@aliehsaeedii Thanks! Already change the title

@RaidenE1
Copy link
Contributor Author

RaidenE1 commented Jul 9, 2025

@lucasbru All checks pass, I think it's ready for merge!

@lucasbru lucasbru merged commit c625b44 into apache:trunk Jul 9, 2025
23 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants