Skip to content

Commit 3abab95

Browse files
authored
core: Provide DEADLINE_EXCEEDED insights for context deadline
We provided extra details when the RPC is killed by CallOptions' Deadline, but didn't do the same for Context. To avoid duplicating code, things were restructured, including the threading. There are more code flows now, but I think the multi-threading came out more obvious and less error-prone. I didn't change the status when the deadline is already expired, because the text is shared with DelayedClientCall and AbstractInteropTest doesn't distinguish between the two cases. This is a roll-forward that avoids a NPE when cancel() is called without an earlier call to start(). As seen at b/300991330
1 parent 51f811d commit 3abab95

File tree

3 files changed

+98
-95
lines changed

3 files changed

+98
-95
lines changed

core/src/main/java/io/grpc/internal/ClientCallImpl.java

+78-92
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import static io.grpc.internal.GrpcUtil.CONTENT_LENGTH_KEY;
2929
import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY;
3030
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
31-
import static java.lang.Math.max;
3231

3332
import com.google.common.annotations.VisibleForTesting;
3433
import com.google.common.base.MoreObjects;
@@ -62,6 +61,7 @@
6261
import java.util.concurrent.ScheduledExecutorService;
6362
import java.util.concurrent.ScheduledFuture;
6463
import java.util.concurrent.TimeUnit;
64+
import java.util.concurrent.TimeoutException;
6565
import java.util.logging.Level;
6666
import java.util.logging.Logger;
6767
import javax.annotation.Nullable;
@@ -82,16 +82,13 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
8282
private final boolean callExecutorIsDirect;
8383
private final CallTracer channelCallsTracer;
8484
private final Context context;
85-
private volatile ScheduledFuture<?> deadlineCancellationFuture;
85+
private CancellationHandler cancellationHandler;
8686
private final boolean unaryRequest;
8787
private CallOptions callOptions;
8888
private ClientStream stream;
89-
private volatile boolean cancelListenersShouldBeRemoved;
9089
private boolean cancelCalled;
9190
private boolean halfCloseCalled;
9291
private final ClientStreamProvider clientStreamProvider;
93-
private final ContextCancellationListener cancellationListener =
94-
new ContextCancellationListener();
9592
private final ScheduledExecutorService deadlineCancellationExecutor;
9693
private boolean fullStreamDecompression;
9794
private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance();
@@ -128,13 +125,6 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
128125
PerfMark.event("ClientCall.<init>", tag);
129126
}
130127

131-
private final class ContextCancellationListener implements CancellationListener {
132-
@Override
133-
public void cancelled(Context context) {
134-
stream.cancel(statusFromCancelled(context));
135-
}
136-
}
137-
138128
/**
139129
* Provider of {@link ClientStream}s.
140130
*/
@@ -252,21 +242,21 @@ public void runInContext() {
252242
prepareHeaders(headers, decompressorRegistry, compressor, fullStreamDecompression);
253243

254244
Deadline effectiveDeadline = effectiveDeadline();
255-
boolean deadlineExceeded = effectiveDeadline != null && effectiveDeadline.isExpired();
245+
boolean contextIsDeadlineSource = effectiveDeadline != null
246+
&& effectiveDeadline.equals(context.getDeadline());
247+
cancellationHandler = new CancellationHandler(effectiveDeadline, contextIsDeadlineSource);
248+
boolean deadlineExceeded = effectiveDeadline != null && cancellationHandler.remainingNanos <= 0;
256249
if (!deadlineExceeded) {
257-
logIfContextNarrowedTimeout(
258-
effectiveDeadline, context.getDeadline(), callOptions.getDeadline());
259250
stream = clientStreamProvider.newStream(method, callOptions, headers, context);
260251
} else {
261252
ClientStreamTracer[] tracers =
262253
GrpcUtil.getClientStreamTracers(callOptions, headers, 0, false);
263-
String deadlineName =
264-
isFirstMin(callOptions.getDeadline(), context.getDeadline()) ? "CallOptions" : "Context";
254+
String deadlineName = contextIsDeadlineSource ? "Context" : "CallOptions";
265255
Long nameResolutionDelay = callOptions.getOption(NAME_RESOLUTION_DELAYED);
266256
String description = String.format(
267257
"ClientCall started after %s deadline was exceeded %.9f seconds ago. "
268258
+ "Name resolution delay %.9f seconds.", deadlineName,
269-
effectiveDeadline.timeRemaining(TimeUnit.NANOSECONDS) / NANO_TO_SECS,
259+
cancellationHandler.remainingNanos / NANO_TO_SECS,
270260
nameResolutionDelay == null ? 0 : nameResolutionDelay / NANO_TO_SECS);
271261
stream = new FailingClientStream(DEADLINE_EXCEEDED.withDescription(description), tracers);
272262
}
@@ -298,21 +288,7 @@ public void runInContext() {
298288
// they receive cancel before start. Issue #1343 has more details
299289

300290
// Propagate later Context cancellation to the remote side.
301-
context.addListener(cancellationListener, directExecutor());
302-
if (effectiveDeadline != null
303-
// If the context has the effective deadline, we don't need to schedule an extra task.
304-
&& !effectiveDeadline.equals(context.getDeadline())
305-
// If the channel has been terminated, we don't need to schedule an extra task.
306-
&& deadlineCancellationExecutor != null) {
307-
deadlineCancellationFuture = startDeadlineTimer(effectiveDeadline);
308-
}
309-
if (cancelListenersShouldBeRemoved) {
310-
// Race detected! ClientStreamListener.closed may have been called before
311-
// deadlineCancellationFuture was set / context listener added, thereby preventing the future
312-
// and listener from being cancelled. Go ahead and cancel again, just to be sure it
313-
// was cancelled.
314-
removeContextListenerAndCancelDeadlineFuture();
315-
}
291+
cancellationHandler.setUp();
316292
}
317293

318294
private void applyMethodConfig() {
@@ -354,75 +330,96 @@ private void applyMethodConfig() {
354330
}
355331
}
356332

357-
private static void logIfContextNarrowedTimeout(
358-
Deadline effectiveDeadline, @Nullable Deadline outerCallDeadline,
359-
@Nullable Deadline callDeadline) {
360-
if (!log.isLoggable(Level.FINE) || effectiveDeadline == null
361-
|| !effectiveDeadline.equals(outerCallDeadline)) {
362-
return;
333+
private final class CancellationHandler implements Runnable, CancellationListener {
334+
private final boolean contextIsDeadlineSource;
335+
private final boolean hasDeadline;
336+
private final long remainingNanos;
337+
private volatile ScheduledFuture<?> deadlineCancellationFuture;
338+
private volatile boolean tearDownCalled;
339+
340+
CancellationHandler(Deadline deadline, boolean contextIsDeadlineSource) {
341+
this.contextIsDeadlineSource = contextIsDeadlineSource;
342+
if (deadline == null) {
343+
hasDeadline = false;
344+
remainingNanos = 0;
345+
} else {
346+
hasDeadline = true;
347+
remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS);
348+
}
363349
}
364350

365-
long effectiveTimeout = max(0, effectiveDeadline.timeRemaining(TimeUnit.NANOSECONDS));
366-
StringBuilder builder = new StringBuilder(String.format(
367-
Locale.US,
368-
"Call timeout set to '%d' ns, due to context deadline.", effectiveTimeout));
369-
if (callDeadline == null) {
370-
builder.append(" Explicit call timeout was not set.");
371-
} else {
372-
long callTimeout = callDeadline.timeRemaining(TimeUnit.NANOSECONDS);
373-
builder.append(String.format(Locale.US, " Explicit call timeout was '%d' ns.", callTimeout));
351+
void setUp() {
352+
if (tearDownCalled) {
353+
return;
354+
}
355+
if (hasDeadline
356+
// If the context has the effective deadline, we don't need to schedule an extra task.
357+
&& !contextIsDeadlineSource
358+
// If the channel has been terminated, we don't need to schedule an extra task.
359+
&& deadlineCancellationExecutor != null) {
360+
deadlineCancellationFuture = deadlineCancellationExecutor.schedule(
361+
new LogExceptionRunnable(this), remainingNanos, TimeUnit.NANOSECONDS);
362+
}
363+
context.addListener(this, directExecutor());
364+
if (tearDownCalled) {
365+
// Race detected! Re-run to make sure the future is cancelled and context listener removed
366+
tearDown();
367+
}
374368
}
375369

376-
log.fine(builder.toString());
377-
}
378-
379-
private void removeContextListenerAndCancelDeadlineFuture() {
380-
context.removeListener(cancellationListener);
381-
ScheduledFuture<?> f = deadlineCancellationFuture;
382-
if (f != null) {
383-
f.cancel(false);
370+
// May be called multiple times, and race with setUp()
371+
void tearDown() {
372+
tearDownCalled = true;
373+
ScheduledFuture<?> deadlineCancellationFuture = this.deadlineCancellationFuture;
374+
if (deadlineCancellationFuture != null) {
375+
deadlineCancellationFuture.cancel(false);
376+
}
377+
context.removeListener(this);
384378
}
385-
}
386379

387-
private class DeadlineTimer implements Runnable {
388-
private final long remainingNanos;
389-
390-
DeadlineTimer(long remainingNanos) {
391-
this.remainingNanos = remainingNanos;
380+
@Override
381+
public void cancelled(Context context) {
382+
if (hasDeadline && contextIsDeadlineSource
383+
&& context.cancellationCause() instanceof TimeoutException) {
384+
stream.cancel(formatDeadlineExceededStatus());
385+
return;
386+
}
387+
stream.cancel(statusFromCancelled(context));
392388
}
393389

394390
@Override
395391
public void run() {
396-
InsightBuilder insight = new InsightBuilder();
397-
stream.appendTimeoutInsight(insight);
392+
stream.cancel(formatDeadlineExceededStatus());
393+
}
394+
395+
Status formatDeadlineExceededStatus() {
398396
// DelayedStream.cancel() is safe to call from a thread that is different from where the
399397
// stream is created.
400398
long seconds = Math.abs(remainingNanos) / TimeUnit.SECONDS.toNanos(1);
401399
long nanos = Math.abs(remainingNanos) % TimeUnit.SECONDS.toNanos(1);
402400

403401
StringBuilder buf = new StringBuilder();
404-
buf.append("deadline exceeded after ");
402+
buf.append(contextIsDeadlineSource ? "Context" : "CallOptions");
403+
buf.append(" deadline exceeded after ");
405404
if (remainingNanos < 0) {
406405
buf.append('-');
407406
}
408407
buf.append(seconds);
409408
buf.append(String.format(Locale.US, ".%09d", nanos));
410409
buf.append("s. ");
411410
Long nsDelay = callOptions.getOption(NAME_RESOLUTION_DELAYED);
412-
buf.append(String.format(Locale.US, "Name resolution delay %.9f seconds. ",
411+
buf.append(String.format(Locale.US, "Name resolution delay %.9f seconds.",
413412
nsDelay == null ? 0 : nsDelay / NANO_TO_SECS));
414-
buf.append(insight);
415-
stream.cancel(DEADLINE_EXCEEDED.augmentDescription(buf.toString()));
413+
if (stream != null) {
414+
InsightBuilder insight = new InsightBuilder();
415+
stream.appendTimeoutInsight(insight);
416+
buf.append(" ");
417+
buf.append(insight);
418+
}
419+
return DEADLINE_EXCEEDED.withDescription(buf.toString());
416420
}
417421
}
418422

419-
private ScheduledFuture<?> startDeadlineTimer(Deadline deadline) {
420-
long remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS);
421-
return deadlineCancellationExecutor.schedule(
422-
new LogExceptionRunnable(
423-
new DeadlineTimer(remainingNanos)), remainingNanos, TimeUnit.NANOSECONDS);
424-
}
425-
426423
@Nullable
427424
private Deadline effectiveDeadline() {
428425
// Call options and context are immutable, so we don't need to cache the deadline.
@@ -440,16 +437,6 @@ private static Deadline min(@Nullable Deadline deadline0, @Nullable Deadline dea
440437
return deadline0.minimum(deadline1);
441438
}
442439

443-
private static boolean isFirstMin(@Nullable Deadline deadline0, @Nullable Deadline deadline1) {
444-
if (deadline0 == null) {
445-
return false;
446-
}
447-
if (deadline1 == null) {
448-
return true;
449-
}
450-
return deadline0.isBefore(deadline1);
451-
}
452-
453440
@Override
454441
public void request(int numMessages) {
455442
try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.request")) {
@@ -493,7 +480,10 @@ private void cancelInternal(@Nullable String message, @Nullable Throwable cause)
493480
stream.cancel(status);
494481
}
495482
} finally {
496-
removeContextListenerAndCancelDeadlineFuture();
483+
// start() might not have been called
484+
if (cancellationHandler != null) {
485+
cancellationHandler.tearDown();
486+
}
497487
}
498488
}
499489

@@ -699,10 +689,7 @@ private void closedInternal(
699689
// description. Since our timer may be delayed in firing, we double-check the deadline and
700690
// turn the failure into the likely more helpful DEADLINE_EXCEEDED status.
701691
if (deadline.isExpired()) {
702-
InsightBuilder insight = new InsightBuilder();
703-
stream.appendTimeoutInsight(insight);
704-
status = DEADLINE_EXCEEDED.augmentDescription(
705-
"ClientCall was cancelled at or after deadline. " + insight);
692+
status = cancellationHandler.formatDeadlineExceededStatus();
706693
// Replace trailers to prevent mixing sources of status and trailers.
707694
trailers = new Metadata();
708695
}
@@ -725,6 +712,7 @@ public void runInContext() {
725712
}
726713

727714
private void runInternal() {
715+
cancellationHandler.tearDown();
728716
Status status = savedStatus;
729717
Metadata trailers = savedTrailers;
730718
if (exceptionStatus != null) {
@@ -737,11 +725,9 @@ private void runInternal() {
737725
// Replace trailers to prevent mixing sources of status and trailers.
738726
trailers = new Metadata();
739727
}
740-
cancelListenersShouldBeRemoved = true;
741728
try {
742729
closeObserver(observer, status, trailers);
743730
} finally {
744-
removeContextListenerAndCancelDeadlineFuture();
745731
channelCallsTracer.reportCallEnded(status.isOk());
746732
}
747733
}

core/src/test/java/io/grpc/internal/ClientCallImplTest.java

+19-2
Original file line numberDiff line numberDiff line change
@@ -926,7 +926,7 @@ public void expiredDeadlineCancelsStream_CallOptions() {
926926
verify(stream, times(1)).cancel(statusCaptor.capture());
927927
assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode());
928928
assertThat(statusCaptor.getValue().getDescription())
929-
.matches("deadline exceeded after [0-9]+\\.[0-9]+s. "
929+
.matches("CallOptions deadline exceeded after [0-9]+\\.[0-9]+s. "
930930
+ "Name resolution delay 0.000000000 seconds. \\[remote_addr=127\\.0\\.0\\.1:443\\]");
931931
}
932932

@@ -954,7 +954,24 @@ public void expiredDeadlineCancelsStream_Context() {
954954

955955
verify(stream, times(1)).cancel(statusCaptor.capture());
956956
assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode());
957-
assertThat(statusCaptor.getValue().getDescription()).isEqualTo("context timed out");
957+
assertThat(statusCaptor.getValue().getDescription())
958+
.matches("Context deadline exceeded after [0-9]+\\.[0-9]+s. "
959+
+ "Name resolution delay 0.000000000 seconds. \\[remote_addr=127\\.0\\.0\\.1:443\\]");
960+
}
961+
962+
@Test
963+
public void cancelWithoutStart() {
964+
fakeClock.forwardTime(System.nanoTime(), TimeUnit.NANOSECONDS);
965+
966+
ClientCallImpl<Void, Void> call = new ClientCallImpl<>(
967+
method,
968+
MoreExecutors.directExecutor(),
969+
baseCallOptions.withDeadline(Deadline.after(1, TimeUnit.SECONDS)),
970+
clientStreamProvider,
971+
deadlineCancellationExecutor,
972+
channelCallTracer, configSelector);
973+
// Nothing happens as a result, but it shouldn't throw
974+
call.cancel("canceled", null);
958975
}
959976

960977
@Test

interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1190,7 +1190,7 @@ public void deadlineExceeded() throws Exception {
11901190
assertTrue(desc,
11911191
// There is a race between client and server-side deadline expiration.
11921192
// If client expires first, it'd generate this message
1193-
Pattern.matches("deadline exceeded after .*s. \\[.*\\]", desc)
1193+
Pattern.matches("CallOptions deadline exceeded after .*s. \\[.*\\]", desc)
11941194
// If server expires first, it'd reset the stream and client would generate a different
11951195
// message
11961196
|| desc.startsWith("ClientCall was cancelled at or after deadline."));

0 commit comments

Comments
 (0)