-
Notifications
You must be signed in to change notification settings - Fork 14.5k
MINOR: Improve docs for retries & cleanup #19595
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -88,8 +88,9 @@ public class CommonClientConfigs { | |||||
"If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms."; | ||||||
|
||||||
public static final String RETRIES_CONFIG = "retries"; | ||||||
public static final String RETRIES_DOC = "Setting a value greater than zero will cause the client to resend any request that fails with a potentially transient error." + | ||||||
" It is recommended to set the value to either zero or `MAX_VALUE` and use corresponding timeout parameters to control how long a client should retry a request."; | ||||||
public static final String RETRIES_DOC = "It is recommended to set the value to either `MAX_VALUE` or zero, and use corresponding timeout parameters to control how long a client should retry a request." + | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: We can change '`' to
Suggested change
|
||||||
" Setting a value greater than zero will cause the client to resend any request that fails with a potentially transient error." + | ||||||
" Setting a value of zero will lead to transient errors not being retried, and they will be propagated to the application to be handled."; | ||||||
|
||||||
public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms"; | ||||||
public static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to retry a failed request to a given topic partition. " + | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -415,7 +415,7 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Seriali | |
this.transactionManager = configureTransactionState(config, logContext); | ||
// There is no need to do work required for adaptive partitioning, if we use a custom partitioner. | ||
boolean enableAdaptivePartitioning = partitionerPlugin.get() == null && | ||
config.getBoolean(ProducerConfig.PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. uh totally, good catch. Removed all the changes on this |
||
config.getBoolean(ProducerConfig.PARTITIONER_ADAPTIVE_PARTITIONING_ENABLE_CONFIG); | ||
RecordAccumulator.PartitionerConfig partitionerConfig = new RecordAccumulator.PartitionerConfig( | ||
enableAdaptivePartitioning, | ||
config.getLong(ProducerConfig.PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG) | ||
|
@@ -1502,11 +1502,6 @@ String getClientId() { | |
return clientId; | ||
} | ||
|
||
// Visible for testing | ||
TransactionManager getTransactionManager() { | ||
return transactionManager; | ||
} | ||
|
||
private static class ClusterAndWaitTime { | ||
final Cluster cluster; | ||
final long waitedOnMetadataMs; | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -100,7 +100,7 @@ public class ProducerConfig extends AbstractConfig { | |||||
+ "similar or lower producer latency despite the increased linger."; | ||||||
|
||||||
/** <code>partitioner.adaptive.partitioning.enable</code> */ | ||||||
public static final String PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG = "partitioner.adaptive.partitioning.enable"; | ||||||
public static final String PARTITIONER_ADAPTIVE_PARTITIONING_ENABLE_CONFIG = "partitioner.adaptive.partitioning.enable"; | ||||||
private static final String PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_DOC = | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also typo.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll leave of the adaptive typo changes out, it probably makes sense to address them together when the public config can be fixed. |
||||||
"When set to 'true', the producer will try to adapt to broker performance and produce more messages to partitions hosted on faster brokers. " | ||||||
+ "If 'false', the producer will try to distribute messages uniformly. Note: this setting has no effect if a custom partitioner is used."; | ||||||
|
@@ -110,7 +110,7 @@ public class ProducerConfig extends AbstractConfig { | |||||
private static final String PARTITIONER_AVAILABILITY_TIMEOUT_MS_DOC = | ||||||
"If a broker cannot process produce requests from a partition for <code>" + PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG + "</code> time, " | ||||||
+ "the partitioner treats that partition as not available. If the value is 0, this logic is disabled. " | ||||||
+ "Note: this setting has no effect if a custom partitioner is used or <code>" + PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG | ||||||
+ "Note: this setting has no effect if a custom partitioner is used or <code>" + PARTITIONER_ADAPTIVE_PARTITIONING_ENABLE_CONFIG | ||||||
+ "</code> is set to 'false'."; | ||||||
|
||||||
/** <code>partitioner.ignore.keys</code> */ | ||||||
|
@@ -276,6 +276,7 @@ public class ProducerConfig extends AbstractConfig { | |||||
public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG; | ||||||
private static final String RETRIES_DOC = "Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error." | ||||||
+ " Note that this retry is no different than if the client resent the record upon receiving the error." | ||||||
+ " Setting a value of zero will make that transient errors won't be retried, and will be propagated to the application to be handled." | ||||||
mjsax marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
+ " Produce requests will be failed before the number of retries has been exhausted if the timeout configured by" | ||||||
+ " <code>" + DELIVERY_TIMEOUT_MS_CONFIG + "</code> expires first before successful acknowledgement. Users should generally" | ||||||
+ " prefer to leave this config unset and instead use <code>" + DELIVERY_TIMEOUT_MS_CONFIG + "</code> to control" | ||||||
|
@@ -390,7 +391,7 @@ public class ProducerConfig extends AbstractConfig { | |||||
.define(COMPRESSION_LZ4_LEVEL_CONFIG, Type.INT, CompressionType.LZ4.defaultLevel(), CompressionType.LZ4.levelValidator(), Importance.MEDIUM, COMPRESSION_LZ4_LEVEL_DOC) | ||||||
.define(COMPRESSION_ZSTD_LEVEL_CONFIG, Type.INT, CompressionType.ZSTD.defaultLevel(), CompressionType.ZSTD.levelValidator(), Importance.MEDIUM, COMPRESSION_ZSTD_LEVEL_DOC) | ||||||
.define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC) | ||||||
.define(PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG, Type.BOOLEAN, true, Importance.LOW, PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_DOC) | ||||||
.define(PARTITIONER_ADAPTIVE_PARTITIONING_ENABLE_CONFIG, Type.BOOLEAN, true, Importance.LOW, PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_DOC) | ||||||
.define(PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG, Type.LONG, 0, atLeast(0), Importance.LOW, PARTITIONER_AVAILABILITY_TIMEOUT_MS_DOC) | ||||||
.define(PARTITIONER_IGNORE_KEYS_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM, PARTITIONER_IGNORE_KEYS_DOC) | ||||||
.define(LINGER_MS_CONFIG, Type.LONG, 5, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC) | ||||||
|
@@ -667,10 +668,6 @@ public ProducerConfig(Map<String, Object> props) { | |||||
super(CONFIG, props); | ||||||
} | ||||||
|
||||||
ProducerConfig(Map<?, ?> props, boolean doLog) { | ||||||
super(CONFIG, props, doLog); | ||||||
} | ||||||
|
||||||
public static Set<String> configNames() { | ||||||
return CONFIG.names(); | ||||||
} | ||||||
|
Uh oh!
There was an error while loading. Please reload this page.