Skip to content

MINOR: Improve docs for retries & cleanup #19595

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 13, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,9 @@ public class CommonClientConfigs {
"If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms.";

public static final String RETRIES_CONFIG = "retries";
public static final String RETRIES_DOC = "Setting a value greater than zero will cause the client to resend any request that fails with a potentially transient error." +
" It is recommended to set the value to either zero or `MAX_VALUE` and use corresponding timeout parameters to control how long a client should retry a request.";
public static final String RETRIES_DOC = "It is recommended to set the value to either `MAX_VALUE` or zero, and use corresponding timeout parameters to control how long a client should retry a request." +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: We can change '`' to <code>, so we can have syntax on document.

Suggested change
public static final String RETRIES_DOC = "It is recommended to set the value to either `MAX_VALUE` or zero, and use corresponding timeout parameters to control how long a client should retry a request." +
public static final String RETRIES_DOC = "It is recommended to set the value to either <code>MAX_VALUE</code> or zero, and use corresponding timeout parameters to control how long a client should retry a request." +

before:
Screenshot 2025-04-30 at 3 24 22 PM

after:
Screenshot 2025-04-30 at 3 22 56 PM

" Setting a value greater than zero will cause the client to resend any request that fails with a potentially transient error." +
" Setting a value of zero will lead to transient errors not being retried, and they will be propagated to the application to be handled.";

public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
public static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to retry a failed request to a given topic partition. " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Seriali
this.transactionManager = configureTransactionState(config, logContext);
// There is no need to do work required for adaptive partitioning, if we use a custom partitioner.
boolean enableAdaptivePartitioning = partitionerPlugin.get() == null &&
config.getBoolean(ProducerConfig.PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG);
Copy link
Collaborator

@mingyen066 mingyen066 Apr 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@m1a2st found that this is public interface and I found someone has previously renamed this config. The earlier conclusion was that we should submit a trivial KIP to deprecate this config.
#17908

Copy link
Member Author

@lianetm lianetm Apr 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uh totally, good catch. Removed all the changes on this

config.getBoolean(ProducerConfig.PARTITIONER_ADAPTIVE_PARTITIONING_ENABLE_CONFIG);
RecordAccumulator.PartitionerConfig partitionerConfig = new RecordAccumulator.PartitionerConfig(
enableAdaptivePartitioning,
config.getLong(ProducerConfig.PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG)
Expand Down Expand Up @@ -1502,11 +1502,6 @@ String getClientId() {
return clientId;
}

// Visible for testing
TransactionManager getTransactionManager() {
return transactionManager;
}

private static class ClusterAndWaitTime {
final Cluster cluster;
final long waitedOnMetadataMs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public class ProducerConfig extends AbstractConfig {
+ "similar or lower producer latency despite the increased linger.";

/** <code>partitioner.adaptive.partitioning.enable</code> */
public static final String PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG = "partitioner.adaptive.partitioning.enable";
public static final String PARTITIONER_ADAPTIVE_PARTITIONING_ENABLE_CONFIG = "partitioner.adaptive.partitioning.enable";
private static final String PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_DOC =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also typo.

Suggested change
private static final String PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_DOC =
private static final String PARTITIONER_ADAPTIVE_PARTITIONING_ENABLE_DOC =

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll leave of the adaptive typo changes out, it probably makes sense to address them together when the public config can be fixed.

"When set to 'true', the producer will try to adapt to broker performance and produce more messages to partitions hosted on faster brokers. "
+ "If 'false', the producer will try to distribute messages uniformly. Note: this setting has no effect if a custom partitioner is used.";
Expand All @@ -110,7 +110,7 @@ public class ProducerConfig extends AbstractConfig {
private static final String PARTITIONER_AVAILABILITY_TIMEOUT_MS_DOC =
"If a broker cannot process produce requests from a partition for <code>" + PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG + "</code> time, "
+ "the partitioner treats that partition as not available. If the value is 0, this logic is disabled. "
+ "Note: this setting has no effect if a custom partitioner is used or <code>" + PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG
+ "Note: this setting has no effect if a custom partitioner is used or <code>" + PARTITIONER_ADAPTIVE_PARTITIONING_ENABLE_CONFIG
+ "</code> is set to 'false'.";

/** <code>partitioner.ignore.keys</code> */
Expand Down Expand Up @@ -276,6 +276,7 @@ public class ProducerConfig extends AbstractConfig {
public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG;
private static final String RETRIES_DOC = "Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error."
+ " Note that this retry is no different than if the client resent the record upon receiving the error."
+ " Setting a value of zero will make that transient errors won't be retried, and will be propagated to the application to be handled."
+ " Produce requests will be failed before the number of retries has been exhausted if the timeout configured by"
+ " <code>" + DELIVERY_TIMEOUT_MS_CONFIG + "</code> expires first before successful acknowledgement. Users should generally"
+ " prefer to leave this config unset and instead use <code>" + DELIVERY_TIMEOUT_MS_CONFIG + "</code> to control"
Expand Down Expand Up @@ -390,7 +391,7 @@ public class ProducerConfig extends AbstractConfig {
.define(COMPRESSION_LZ4_LEVEL_CONFIG, Type.INT, CompressionType.LZ4.defaultLevel(), CompressionType.LZ4.levelValidator(), Importance.MEDIUM, COMPRESSION_LZ4_LEVEL_DOC)
.define(COMPRESSION_ZSTD_LEVEL_CONFIG, Type.INT, CompressionType.ZSTD.defaultLevel(), CompressionType.ZSTD.levelValidator(), Importance.MEDIUM, COMPRESSION_ZSTD_LEVEL_DOC)
.define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
.define(PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG, Type.BOOLEAN, true, Importance.LOW, PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_DOC)
.define(PARTITIONER_ADAPTIVE_PARTITIONING_ENABLE_CONFIG, Type.BOOLEAN, true, Importance.LOW, PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_DOC)
.define(PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG, Type.LONG, 0, atLeast(0), Importance.LOW, PARTITIONER_AVAILABILITY_TIMEOUT_MS_DOC)
.define(PARTITIONER_IGNORE_KEYS_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM, PARTITIONER_IGNORE_KEYS_DOC)
.define(LINGER_MS_CONFIG, Type.LONG, 5, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC)
Expand Down Expand Up @@ -667,10 +668,6 @@ public ProducerConfig(Map<String, Object> props) {
super(CONFIG, props);
}

ProducerConfig(Map<?, ?> props, boolean doLog) {
super(CONFIG, props, doLog);
}

public static Set<String> configNames() {
return CONFIG.names();
}
Expand Down
Loading