Skip to content

core: updates the backoff range as per the A6 redefinition #11858

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/src/main/java/io/grpc/internal/RetriableStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -1066,7 +1066,8 @@ private RetryPlan makeRetryDecision(Status status, Metadata trailer) {
if (pushbackMillis == null) {
if (isRetryableStatusCode) {
shouldRetry = true;
backoffNanos = (long) (nextBackoffIntervalNanos * random.nextDouble());
// Apply jitter by multiplying with a random factor between 0.8 and 1.2
backoffNanos = (long) (nextBackoffIntervalNanos * (0.8 + random.nextDouble() * 0.4));
nextBackoffIntervalNanos = Math.min(
(long) (nextBackoffIntervalNanos * retryPolicy.backoffMultiplier),
retryPolicy.maxBackoffNanos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3875,7 +3875,7 @@ public double nextDouble() {
Status.UNAVAILABLE, PROCESSED, new Metadata());

// in backoff
timer.forwardTime(5, TimeUnit.SECONDS);
timer.forwardTime(6, TimeUnit.SECONDS);
assertThat(timer.getPendingTasks()).hasSize(1);
verify(mockStream2, never()).start(any(ClientStreamListener.class));

Expand All @@ -3894,7 +3894,7 @@ public double nextDouble() {
assertEquals("Channel shutdown invoked", statusCaptor.getValue().getDescription());

// backoff ends
timer.forwardTime(5, TimeUnit.SECONDS);
timer.forwardTime(6, TimeUnit.SECONDS);
assertThat(timer.getPendingTasks()).isEmpty();
verify(mockStream2).start(streamListenerCaptor.capture());
verify(mockLoadBalancer, never()).shutdown();
Expand Down
89 changes: 48 additions & 41 deletions core/src/test/java/io/grpc/internal/RetriableStreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,27 @@ public double nextDouble() {
private final ChannelBufferMeter channelBufferUsed = new ChannelBufferMeter();
private final FakeClock fakeClock = new FakeClock();

private static double calculateJitterFactor() {
return (0.8 + FAKE_RANDOM * 0.4);
}

private static long calculateInitialBackoff() {
return (long) (INITIAL_BACKOFF_IN_SECONDS * calculateJitterFactor());
}

private static long calculateBackoff() {
return (long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * calculateJitterFactor());
}

private static long calculateBackoffSquared() {
return (long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * BACKOFF_MULTIPLIER
* calculateJitterFactor());
}

private static long calculateMaxBackoff() {
return (long) (MAX_BACKOFF_IN_SECONDS * calculateJitterFactor());
}

private final class RecordedRetriableStream extends RetriableStream<String> {
RecordedRetriableStream(MethodDescriptor<String, ?> method, Metadata headers,
ChannelBufferMeter channelBufferUsed, long perRpcBufferLimit, long channelBufferLimit,
Expand Down Expand Up @@ -307,7 +328,7 @@ public Void answer(InvocationOnMock in) {
retriableStream.sendMessage("msg1 during backoff1");
retriableStream.sendMessage("msg2 during backoff1");

fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM) - 1L, TimeUnit.SECONDS);
fakeClock.forwardTime(calculateInitialBackoff() - 1L, TimeUnit.SECONDS);
inOrder.verifyNoMoreInteractions();
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(1L, TimeUnit.SECONDS);
Expand Down Expand Up @@ -364,9 +385,7 @@ public Void answer(InvocationOnMock in) {
retriableStream.sendMessage("msg2 during backoff2");
retriableStream.sendMessage("msg3 during backoff2");

fakeClock.forwardTime(
(long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * FAKE_RANDOM) - 1L,
TimeUnit.SECONDS);
fakeClock.forwardTime(calculateBackoff() - 1L, TimeUnit.SECONDS);
inOrder.verifyNoMoreInteractions();
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(1L, TimeUnit.SECONDS);
Expand Down Expand Up @@ -459,7 +478,7 @@ public void retry_headersRead_cancel() {
sublistenerCaptor1.getValue().closed(
Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS);

ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
Expand Down Expand Up @@ -518,7 +537,7 @@ public void retry_headersRead_closed() {
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
sublistenerCaptor1.getValue().closed(
Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS);

ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
Expand Down Expand Up @@ -584,7 +603,7 @@ public void retry_cancel_closed() {
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
sublistenerCaptor1.getValue().closed(
Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS);

ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
Expand Down Expand Up @@ -687,7 +706,7 @@ public void retry_unretriableClosed_cancel() {
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
sublistenerCaptor1.getValue().closed(
Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS);

ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
Expand Down Expand Up @@ -821,7 +840,7 @@ public boolean isReady() {
// send more requests during backoff
retriableStream.request(789);

fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS);

inOrder.verify(mockStream2).start(sublistenerCaptor2.get());
inOrder.verify(mockStream2).request(3);
Expand Down Expand Up @@ -875,7 +894,7 @@ public void request(int numMessages) {
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
sublistenerCaptor1.getValue().closed(
Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS);

inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
inOrder.verify(mockStream2).request(3);
Expand Down Expand Up @@ -920,7 +939,7 @@ public void start(ClientStreamListener listener) {
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
sublistenerCaptor1.getValue().closed(
Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS);

inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
inOrder.verify(retriableStreamRecorder).postCommit();
Expand Down Expand Up @@ -1028,7 +1047,7 @@ public boolean isReady() {
retriableStream.request(789);
readiness.add(retriableStream.isReady()); // expected false b/c in backoff

fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS);

verify(mockStream2).start(any(ClientStreamListener.class));
readiness.add(retriableStream.isReady()); // expected true
Expand Down Expand Up @@ -1110,7 +1129,7 @@ public void addPrevRetryAttemptsToRespHeaders() {
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
sublistenerCaptor1.getValue().closed(
Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS);

ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
Expand Down Expand Up @@ -1160,13 +1179,12 @@ public void start(ClientStreamListener listener) {
listener1.closed(
Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS);
assertEquals(1, fakeClock.numPendingTasks());

// send requests during backoff
retriableStream.request(3);
fakeClock.forwardTime(
(long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * FAKE_RANDOM), TimeUnit.SECONDS);
fakeClock.forwardTime(calculateBackoff(), TimeUnit.SECONDS);

retriableStream.request(1);
verify(mockStream1, never()).request(anyInt());
Expand Down Expand Up @@ -1207,7 +1225,7 @@ public void start(ClientStreamListener listener) {
// retry
listener1.closed(
Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS);

verify(mockStream2).start(any(ClientStreamListener.class));
verify(retriableStreamRecorder).postCommit();
Expand Down Expand Up @@ -1260,7 +1278,7 @@ public void perRpcBufferLimitExceededDuringBackoff() {
bufferSizeTracer.outboundWireSize(2);
verify(retriableStreamRecorder, never()).postCommit();

fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS);
verify(mockStream2).start(any(ClientStreamListener.class));
verify(mockStream2).isReady();

Expand Down Expand Up @@ -1332,7 +1350,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() {
sublistenerCaptor1.getValue().closed(
Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM) - 1L, TimeUnit.SECONDS);
fakeClock.forwardTime(calculateInitialBackoff() - 1L, TimeUnit.SECONDS);
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(1L, TimeUnit.SECONDS);
assertEquals(0, fakeClock.numPendingTasks());
Expand All @@ -1347,9 +1365,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() {
sublistenerCaptor2.getValue().closed(
Status.fromCode(RETRIABLE_STATUS_CODE_2), PROCESSED, new Metadata());
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(
(long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * FAKE_RANDOM) - 1L,
TimeUnit.SECONDS);
fakeClock.forwardTime(calculateBackoff() - 1L, TimeUnit.SECONDS);
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(1L, TimeUnit.SECONDS);
assertEquals(0, fakeClock.numPendingTasks());
Expand All @@ -1364,10 +1380,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() {
sublistenerCaptor3.getValue().closed(
Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(
(long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * BACKOFF_MULTIPLIER * FAKE_RANDOM)
- 1L,
TimeUnit.SECONDS);
fakeClock.forwardTime(calculateBackoffSquared() - 1L, TimeUnit.SECONDS);
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(1L, TimeUnit.SECONDS);
assertEquals(0, fakeClock.numPendingTasks());
Expand All @@ -1382,7 +1395,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() {
sublistenerCaptor4.getValue().closed(
Status.fromCode(RETRIABLE_STATUS_CODE_2), PROCESSED, new Metadata());
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime((long) (MAX_BACKOFF_IN_SECONDS * FAKE_RANDOM) - 1L, TimeUnit.SECONDS);
fakeClock.forwardTime(calculateMaxBackoff() - 1L, TimeUnit.SECONDS);
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(1L, TimeUnit.SECONDS);
assertEquals(0, fakeClock.numPendingTasks());
Expand All @@ -1397,7 +1410,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() {
sublistenerCaptor5.getValue().closed(
Status.fromCode(RETRIABLE_STATUS_CODE_2), PROCESSED, new Metadata());
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime((long) (MAX_BACKOFF_IN_SECONDS * FAKE_RANDOM) - 1L, TimeUnit.SECONDS);
fakeClock.forwardTime(calculateMaxBackoff() - 1L, TimeUnit.SECONDS);
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(1L, TimeUnit.SECONDS);
assertEquals(0, fakeClock.numPendingTasks());
Expand Down Expand Up @@ -1480,7 +1493,7 @@ public void pushback() {
sublistenerCaptor3.getValue().closed(
Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM) - 1L, TimeUnit.SECONDS);
fakeClock.forwardTime(calculateInitialBackoff() - 1L, TimeUnit.SECONDS);
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(1L, TimeUnit.SECONDS);
assertEquals(0, fakeClock.numPendingTasks());
Expand All @@ -1495,9 +1508,7 @@ public void pushback() {
sublistenerCaptor4.getValue().closed(
Status.fromCode(RETRIABLE_STATUS_CODE_2), PROCESSED, new Metadata());
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(
(long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * FAKE_RANDOM) - 1L,
TimeUnit.SECONDS);
fakeClock.forwardTime(calculateBackoff() - 1L, TimeUnit.SECONDS);
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(1L, TimeUnit.SECONDS);
assertEquals(0, fakeClock.numPendingTasks());
Expand All @@ -1512,10 +1523,7 @@ public void pushback() {
sublistenerCaptor5.getValue().closed(
Status.fromCode(RETRIABLE_STATUS_CODE_2), PROCESSED, new Metadata());
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(
(long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * BACKOFF_MULTIPLIER * FAKE_RANDOM)
- 1L,
TimeUnit.SECONDS);
fakeClock.forwardTime(calculateBackoffSquared() - 1L, TimeUnit.SECONDS);
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(1L, TimeUnit.SECONDS);
assertEquals(0, fakeClock.numPendingTasks());
Expand Down Expand Up @@ -1804,7 +1812,7 @@ public void transparentRetry_onlyOnceOnRefused() {
.closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), REFUSED, new Metadata());

assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS);
inOrder.verify(retriableStreamRecorder).newSubstream(1);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor3 =
ArgumentCaptor.forClass(ClientStreamListener.class);
Expand Down Expand Up @@ -1907,7 +1915,7 @@ public void normalRetry_thenNoTransparentRetry_butNormalRetry() {
.closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());

assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS);
inOrder.verify(retriableStreamRecorder).newSubstream(1);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
Expand All @@ -1923,8 +1931,7 @@ public void normalRetry_thenNoTransparentRetry_butNormalRetry() {
.closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), REFUSED, new Metadata());

assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(
(long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * FAKE_RANDOM), TimeUnit.SECONDS);
fakeClock.forwardTime(calculateBackoff(), TimeUnit.SECONDS);
inOrder.verify(retriableStreamRecorder).newSubstream(2);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor3 =
ArgumentCaptor.forClass(ClientStreamListener.class);
Expand Down Expand Up @@ -1960,7 +1967,7 @@ public void normalRetry_thenNoTransparentRetry_andNoMoreRetry() {
.closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());

assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS);
inOrder.verify(retriableStreamRecorder).newSubstream(1);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ public void retryUntilBufferLimitExceeded() throws Exception {
serverCall.close(
Status.UNAVAILABLE.withDescription("original attempt failed"),
new Metadata());
elapseBackoff(10, SECONDS);
elapseBackoff(12, SECONDS);
// 2nd attempt received
serverCall = serverCalls.poll(5, SECONDS);
serverCall.request(2);
Expand Down Expand Up @@ -348,7 +348,7 @@ public void statsRecorded() throws Exception {
Status.UNAVAILABLE.withDescription("original attempt failed"),
new Metadata());
assertRpcStatusRecorded(Status.Code.UNAVAILABLE, 1000, 1);
elapseBackoff(10, SECONDS);
elapseBackoff(12, SECONDS);
assertRpcStartedRecorded();
assertOutboundMessageRecorded();
serverCall = serverCalls.poll(5, SECONDS);
Expand All @@ -366,7 +366,7 @@ public void statsRecorded() throws Exception {
call.request(1);
assertInboundMessageRecorded();
assertInboundWireSizeRecorded(1);
assertRpcStatusRecorded(Status.Code.OK, 12000, 2);
assertRpcStatusRecorded(Status.Code.OK, 14000, 2);
assertRetryStatsRecorded(1, 0, 0);
}

Expand Down Expand Up @@ -418,7 +418,7 @@ public void streamClosed(Status status) {
Status.UNAVAILABLE.withDescription("original attempt failed"),
new Metadata());
assertRpcStatusRecorded(Code.UNAVAILABLE, 5000, 1);
elapseBackoff(10, SECONDS);
elapseBackoff(12, SECONDS);
assertRpcStartedRecorded();
assertOutboundMessageRecorded();
serverCall = serverCalls.poll(5, SECONDS);
Expand All @@ -431,7 +431,7 @@ public void streamClosed(Status status) {
streamClosedLatch.countDown();
// The call listener is closed.
verify(mockCallListener, timeout(5000)).onClose(any(Status.class), any(Metadata.class));
assertRpcStatusRecorded(Code.CANCELLED, 17_000, 1);
assertRpcStatusRecorded(Code.CANCELLED, 19_000, 1);
assertRetryStatsRecorded(1, 0, 0);
}

Expand Down
Loading