28
28
import static io .grpc .internal .GrpcUtil .CONTENT_LENGTH_KEY ;
29
29
import static io .grpc .internal .GrpcUtil .MESSAGE_ACCEPT_ENCODING_KEY ;
30
30
import static io .grpc .internal .GrpcUtil .MESSAGE_ENCODING_KEY ;
31
- import static java .lang .Math .max ;
32
31
33
32
import com .google .common .annotations .VisibleForTesting ;
34
33
import com .google .common .base .MoreObjects ;
62
61
import java .util .concurrent .ScheduledExecutorService ;
63
62
import java .util .concurrent .ScheduledFuture ;
64
63
import java .util .concurrent .TimeUnit ;
64
+ import java .util .concurrent .TimeoutException ;
65
65
import java .util .logging .Level ;
66
66
import java .util .logging .Logger ;
67
67
import javax .annotation .Nullable ;
@@ -82,16 +82,13 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
82
82
private final boolean callExecutorIsDirect ;
83
83
private final CallTracer channelCallsTracer ;
84
84
private final Context context ;
85
- private volatile ScheduledFuture <?> deadlineCancellationFuture ;
85
+ private CancellationHandler cancellationHandler ;
86
86
private final boolean unaryRequest ;
87
87
private CallOptions callOptions ;
88
88
private ClientStream stream ;
89
- private volatile boolean cancelListenersShouldBeRemoved ;
90
89
private boolean cancelCalled ;
91
90
private boolean halfCloseCalled ;
92
91
private final ClientStreamProvider clientStreamProvider ;
93
- private final ContextCancellationListener cancellationListener =
94
- new ContextCancellationListener ();
95
92
private final ScheduledExecutorService deadlineCancellationExecutor ;
96
93
private boolean fullStreamDecompression ;
97
94
private DecompressorRegistry decompressorRegistry = DecompressorRegistry .getDefaultInstance ();
@@ -128,13 +125,6 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
128
125
PerfMark .event ("ClientCall.<init>" , tag );
129
126
}
130
127
131
- private final class ContextCancellationListener implements CancellationListener {
132
- @ Override
133
- public void cancelled (Context context ) {
134
- stream .cancel (statusFromCancelled (context ));
135
- }
136
- }
137
-
138
128
/**
139
129
* Provider of {@link ClientStream}s.
140
130
*/
@@ -252,21 +242,21 @@ public void runInContext() {
252
242
prepareHeaders (headers , decompressorRegistry , compressor , fullStreamDecompression );
253
243
254
244
Deadline effectiveDeadline = effectiveDeadline ();
255
- boolean deadlineExceeded = effectiveDeadline != null && effectiveDeadline .isExpired ();
245
+ boolean contextIsDeadlineSource = effectiveDeadline != null
246
+ && effectiveDeadline .equals (context .getDeadline ());
247
+ cancellationHandler = new CancellationHandler (effectiveDeadline , contextIsDeadlineSource );
248
+ boolean deadlineExceeded = effectiveDeadline != null && cancellationHandler .remainingNanos <= 0 ;
256
249
if (!deadlineExceeded ) {
257
- logIfContextNarrowedTimeout (
258
- effectiveDeadline , context .getDeadline (), callOptions .getDeadline ());
259
250
stream = clientStreamProvider .newStream (method , callOptions , headers , context );
260
251
} else {
261
252
ClientStreamTracer [] tracers =
262
253
GrpcUtil .getClientStreamTracers (callOptions , headers , 0 , false );
263
- String deadlineName =
264
- isFirstMin (callOptions .getDeadline (), context .getDeadline ()) ? "CallOptions" : "Context" ;
254
+ String deadlineName = contextIsDeadlineSource ? "Context" : "CallOptions" ;
265
255
Long nameResolutionDelay = callOptions .getOption (NAME_RESOLUTION_DELAYED );
266
256
String description = String .format (
267
257
"ClientCall started after %s deadline was exceeded %.9f seconds ago. "
268
258
+ "Name resolution delay %.9f seconds." , deadlineName ,
269
- effectiveDeadline . timeRemaining ( TimeUnit . NANOSECONDS ) / NANO_TO_SECS ,
259
+ cancellationHandler . remainingNanos / NANO_TO_SECS ,
270
260
nameResolutionDelay == null ? 0 : nameResolutionDelay / NANO_TO_SECS );
271
261
stream = new FailingClientStream (DEADLINE_EXCEEDED .withDescription (description ), tracers );
272
262
}
@@ -298,21 +288,7 @@ public void runInContext() {
298
288
// they receive cancel before start. Issue #1343 has more details
299
289
300
290
// Propagate later Context cancellation to the remote side.
301
- context .addListener (cancellationListener , directExecutor ());
302
- if (effectiveDeadline != null
303
- // If the context has the effective deadline, we don't need to schedule an extra task.
304
- && !effectiveDeadline .equals (context .getDeadline ())
305
- // If the channel has been terminated, we don't need to schedule an extra task.
306
- && deadlineCancellationExecutor != null ) {
307
- deadlineCancellationFuture = startDeadlineTimer (effectiveDeadline );
308
- }
309
- if (cancelListenersShouldBeRemoved ) {
310
- // Race detected! ClientStreamListener.closed may have been called before
311
- // deadlineCancellationFuture was set / context listener added, thereby preventing the future
312
- // and listener from being cancelled. Go ahead and cancel again, just to be sure it
313
- // was cancelled.
314
- removeContextListenerAndCancelDeadlineFuture ();
315
- }
291
+ cancellationHandler .setUp ();
316
292
}
317
293
318
294
private void applyMethodConfig () {
@@ -354,75 +330,96 @@ private void applyMethodConfig() {
354
330
}
355
331
}
356
332
357
- private static void logIfContextNarrowedTimeout (
358
- Deadline effectiveDeadline , @ Nullable Deadline outerCallDeadline ,
359
- @ Nullable Deadline callDeadline ) {
360
- if (!log .isLoggable (Level .FINE ) || effectiveDeadline == null
361
- || !effectiveDeadline .equals (outerCallDeadline )) {
362
- return ;
333
+ private final class CancellationHandler implements Runnable , CancellationListener {
334
+ private final boolean contextIsDeadlineSource ;
335
+ private final boolean hasDeadline ;
336
+ private final long remainingNanos ;
337
+ private volatile ScheduledFuture <?> deadlineCancellationFuture ;
338
+ private volatile boolean tearDownCalled ;
339
+
340
+ CancellationHandler (Deadline deadline , boolean contextIsDeadlineSource ) {
341
+ this .contextIsDeadlineSource = contextIsDeadlineSource ;
342
+ if (deadline == null ) {
343
+ hasDeadline = false ;
344
+ remainingNanos = 0 ;
345
+ } else {
346
+ hasDeadline = true ;
347
+ remainingNanos = deadline .timeRemaining (TimeUnit .NANOSECONDS );
348
+ }
363
349
}
364
350
365
- long effectiveTimeout = max (0 , effectiveDeadline .timeRemaining (TimeUnit .NANOSECONDS ));
366
- StringBuilder builder = new StringBuilder (String .format (
367
- Locale .US ,
368
- "Call timeout set to '%d' ns, due to context deadline." , effectiveTimeout ));
369
- if (callDeadline == null ) {
370
- builder .append (" Explicit call timeout was not set." );
371
- } else {
372
- long callTimeout = callDeadline .timeRemaining (TimeUnit .NANOSECONDS );
373
- builder .append (String .format (Locale .US , " Explicit call timeout was '%d' ns." , callTimeout ));
351
+ void setUp () {
352
+ if (tearDownCalled ) {
353
+ return ;
354
+ }
355
+ if (hasDeadline
356
+ // If the context has the effective deadline, we don't need to schedule an extra task.
357
+ && !contextIsDeadlineSource
358
+ // If the channel has been terminated, we don't need to schedule an extra task.
359
+ && deadlineCancellationExecutor != null ) {
360
+ deadlineCancellationFuture = deadlineCancellationExecutor .schedule (
361
+ new LogExceptionRunnable (this ), remainingNanos , TimeUnit .NANOSECONDS );
362
+ }
363
+ context .addListener (this , directExecutor ());
364
+ if (tearDownCalled ) {
365
+ // Race detected! Re-run to make sure the future is cancelled and context listener removed
366
+ tearDown ();
367
+ }
374
368
}
375
369
376
- log . fine ( builder . toString ());
377
- }
378
-
379
- private void removeContextListenerAndCancelDeadlineFuture () {
380
- context . removeListener ( cancellationListener );
381
- ScheduledFuture <?> f = deadlineCancellationFuture ;
382
- if ( f != null ) {
383
- f . cancel ( false );
370
+ // May be called multiple times, and race with setUp()
371
+ void tearDown () {
372
+ tearDownCalled = true ;
373
+ ScheduledFuture <?> deadlineCancellationFuture = this . deadlineCancellationFuture ;
374
+ if ( deadlineCancellationFuture != null ) {
375
+ deadlineCancellationFuture . cancel ( false ) ;
376
+ }
377
+ context . removeListener ( this );
384
378
}
385
- }
386
379
387
- private class DeadlineTimer implements Runnable {
388
- private final long remainingNanos ;
389
-
390
- DeadlineTimer (long remainingNanos ) {
391
- this .remainingNanos = remainingNanos ;
380
+ @ Override
381
+ public void cancelled (Context context ) {
382
+ if (hasDeadline && contextIsDeadlineSource
383
+ && context .cancellationCause () instanceof TimeoutException ) {
384
+ stream .cancel (formatDeadlineExceededStatus ());
385
+ return ;
386
+ }
387
+ stream .cancel (statusFromCancelled (context ));
392
388
}
393
389
394
390
@ Override
395
391
public void run () {
396
- InsightBuilder insight = new InsightBuilder ();
397
- stream .appendTimeoutInsight (insight );
392
+ stream .cancel (formatDeadlineExceededStatus ());
393
+ }
394
+
395
+ Status formatDeadlineExceededStatus () {
398
396
// DelayedStream.cancel() is safe to call from a thread that is different from where the
399
397
// stream is created.
400
398
long seconds = Math .abs (remainingNanos ) / TimeUnit .SECONDS .toNanos (1 );
401
399
long nanos = Math .abs (remainingNanos ) % TimeUnit .SECONDS .toNanos (1 );
402
400
403
401
StringBuilder buf = new StringBuilder ();
404
- buf .append ("deadline exceeded after " );
402
+ buf .append (contextIsDeadlineSource ? "Context" : "CallOptions" );
403
+ buf .append (" deadline exceeded after " );
405
404
if (remainingNanos < 0 ) {
406
405
buf .append ('-' );
407
406
}
408
407
buf .append (seconds );
409
408
buf .append (String .format (Locale .US , ".%09d" , nanos ));
410
409
buf .append ("s. " );
411
410
Long nsDelay = callOptions .getOption (NAME_RESOLUTION_DELAYED );
412
- buf .append (String .format (Locale .US , "Name resolution delay %.9f seconds. " ,
411
+ buf .append (String .format (Locale .US , "Name resolution delay %.9f seconds." ,
413
412
nsDelay == null ? 0 : nsDelay / NANO_TO_SECS ));
414
- buf .append (insight );
415
- stream .cancel (DEADLINE_EXCEEDED .augmentDescription (buf .toString ()));
413
+ if (stream != null ) {
414
+ InsightBuilder insight = new InsightBuilder ();
415
+ stream .appendTimeoutInsight (insight );
416
+ buf .append (" " );
417
+ buf .append (insight );
418
+ }
419
+ return DEADLINE_EXCEEDED .withDescription (buf .toString ());
416
420
}
417
421
}
418
422
419
- private ScheduledFuture <?> startDeadlineTimer (Deadline deadline ) {
420
- long remainingNanos = deadline .timeRemaining (TimeUnit .NANOSECONDS );
421
- return deadlineCancellationExecutor .schedule (
422
- new LogExceptionRunnable (
423
- new DeadlineTimer (remainingNanos )), remainingNanos , TimeUnit .NANOSECONDS );
424
- }
425
-
426
423
@ Nullable
427
424
private Deadline effectiveDeadline () {
428
425
// Call options and context are immutable, so we don't need to cache the deadline.
@@ -440,16 +437,6 @@ private static Deadline min(@Nullable Deadline deadline0, @Nullable Deadline dea
440
437
return deadline0 .minimum (deadline1 );
441
438
}
442
439
443
- private static boolean isFirstMin (@ Nullable Deadline deadline0 , @ Nullable Deadline deadline1 ) {
444
- if (deadline0 == null ) {
445
- return false ;
446
- }
447
- if (deadline1 == null ) {
448
- return true ;
449
- }
450
- return deadline0 .isBefore (deadline1 );
451
- }
452
-
453
440
@ Override
454
441
public void request (int numMessages ) {
455
442
try (TaskCloseable ignore = PerfMark .traceTask ("ClientCall.request" )) {
@@ -493,7 +480,10 @@ private void cancelInternal(@Nullable String message, @Nullable Throwable cause)
493
480
stream .cancel (status );
494
481
}
495
482
} finally {
496
- removeContextListenerAndCancelDeadlineFuture ();
483
+ // start() might not have been called
484
+ if (cancellationHandler != null ) {
485
+ cancellationHandler .tearDown ();
486
+ }
497
487
}
498
488
}
499
489
@@ -699,10 +689,7 @@ private void closedInternal(
699
689
// description. Since our timer may be delayed in firing, we double-check the deadline and
700
690
// turn the failure into the likely more helpful DEADLINE_EXCEEDED status.
701
691
if (deadline .isExpired ()) {
702
- InsightBuilder insight = new InsightBuilder ();
703
- stream .appendTimeoutInsight (insight );
704
- status = DEADLINE_EXCEEDED .augmentDescription (
705
- "ClientCall was cancelled at or after deadline. " + insight );
692
+ status = cancellationHandler .formatDeadlineExceededStatus ();
706
693
// Replace trailers to prevent mixing sources of status and trailers.
707
694
trailers = new Metadata ();
708
695
}
@@ -725,6 +712,7 @@ public void runInContext() {
725
712
}
726
713
727
714
private void runInternal () {
715
+ cancellationHandler .tearDown ();
728
716
Status status = savedStatus ;
729
717
Metadata trailers = savedTrailers ;
730
718
if (exceptionStatus != null ) {
@@ -737,11 +725,9 @@ private void runInternal() {
737
725
// Replace trailers to prevent mixing sources of status and trailers.
738
726
trailers = new Metadata ();
739
727
}
740
- cancelListenersShouldBeRemoved = true ;
741
728
try {
742
729
closeObserver (observer , status , trailers );
743
730
} finally {
744
- removeContextListenerAndCancelDeadlineFuture ();
745
731
channelCallsTracer .reportCallEnded (status .isOk ());
746
732
}
747
733
}
0 commit comments