-
Notifications
You must be signed in to change notification settings - Fork 14.5k
MINOR: Throw exceptions if source topic is missing #20123
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@RaidenE1 Thanks for the PR! I left a few comments.
@@ -91,6 +95,11 @@ public void shouldThrowErrorAfterSourceTopicDeleted(final TestInfo testName) thr | |||
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); | |||
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS); | |||
streamsConfiguration.put(StreamsConfig.METADATA_MAX_AGE_CONFIG, 2000); | |||
|
|||
// set group protocol according to parameter |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I'd omit inline comments that don't add any new information
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, removed it
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID); | ||
STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); | ||
|
||
// set group protocol according to parameter |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I'd omit inline comments that don't add any new information
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, removed it
} else if (status.statusCode() == StreamsGroupHeartbeatResponse.Status.MISSING_SOURCE_TOPICS.code()) { | ||
final String errorMsg = "Missing source topics"; | ||
log.error(errorMsg); | ||
throw new MissingSourceTopicException(errorMsg); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We definitely need a better error message here. The StreamsGroupHeartbeatResponse
contains a statusDetail
. Could we bring it to this exception?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, append the detail at the end
@@ -3873,6 +3874,15 @@ public void testStreamsProtocolRunOnceWithoutProcessingThreads() { | |||
)); | |||
thread.runOnceWithoutProcessingThreads(); | |||
verify(shutdownErrorHook).run(); | |||
|
|||
// Test MISSING_SOURCE_TOPICS status |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please add a separate unit test for this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, add a separate unit test, please check the name, don't know if it's appropriate
@@ -3932,6 +3942,15 @@ public void testStreamsProtocolRunOnceWithProcessingThreads() { | |||
)); | |||
thread.runOnceWithProcessingThreads(); | |||
verify(shutdownErrorHook).run(); | |||
|
|||
// Test MISSING_SOURCE_TOPICS status |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please add a separate unit test for this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, add a separate unit test, please check the name, don't know if it's appropriate
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks!
@RaidenE1 Can you check the CI output? It seems to say you have to run: Please mention me with my github handle when another review is needed, that increases the chance that I see it quickly. |
@aliehsaeedii Thanks! Already change the title |
@lucasbru All checks pass, I think it's ready for merge! |
In the old protocol, Kafka Streams used to throw a
MissingSourceTopicException
when a source topic is missing. In the newprotocol, it doesn’t do that anymore, while only log the status that is
returned from the broker, which contains a status that indicates that a
source topic is missing.
This change:
MissingSourceTopicException
when source topic is missingReviewers: Lucas Brutschy [email protected]