Skip to content

Commit 3a6be9c

Browse files
authored
Detect transport executors with no remaining threads (#11503)
Detect misconfigured transport executors with too few threads that could further throttle the transport. Fixes #11271
1 parent b8c1aa5 commit 3a6be9c

File tree

2 files changed

+55
-0
lines changed

2 files changed

+55
-0
lines changed

okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java

+33
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,13 @@
8383
import java.util.Locale;
8484
import java.util.Map;
8585
import java.util.Random;
86+
import java.util.concurrent.BrokenBarrierException;
8687
import java.util.concurrent.CountDownLatch;
88+
import java.util.concurrent.CyclicBarrier;
8789
import java.util.concurrent.Executor;
8890
import java.util.concurrent.ScheduledExecutorService;
91+
import java.util.concurrent.TimeUnit;
92+
import java.util.concurrent.TimeoutException;
8993
import java.util.logging.Level;
9094
import java.util.logging.Logger;
9195
import javax.annotation.Nullable;
@@ -499,8 +503,15 @@ public Runnable start(Listener listener) {
499503
outboundFlow = new OutboundFlowController(this, frameWriter);
500504
}
501505
final CountDownLatch latch = new CountDownLatch(1);
506+
final CountDownLatch latchForExtraThread = new CountDownLatch(1);
507+
// The transport needs up to two threads to function once started,
508+
// but only needs one during handshaking. Start another thread during handshaking
509+
// to make sure there's still a free thread available. If the number of threads is exhausted,
510+
// it is better to kill the transport than for all the transports to hang unable to send.
511+
CyclicBarrier barrier = new CyclicBarrier(2);
502512
// Connecting in the serializingExecutor, so that some stream operations like synStream
503513
// will be executed after connected.
514+
504515
serializingExecutor.execute(new Runnable() {
505516
@Override
506517
public void run() {
@@ -510,8 +521,14 @@ public void run() {
510521
// initial preface.
511522
try {
512523
latch.await();
524+
barrier.await(1000, TimeUnit.MILLISECONDS);
513525
} catch (InterruptedException e) {
514526
Thread.currentThread().interrupt();
527+
} catch (TimeoutException | BrokenBarrierException e) {
528+
startGoAway(0, ErrorCode.INTERNAL_ERROR, Status.UNAVAILABLE
529+
.withDescription("Timed out waiting for second handshake thread. "
530+
+ "The transport executor pool may have run out of threads"));
531+
return;
515532
}
516533
// Use closed source on failure so that the reader immediately shuts down.
517534
BufferedSource source = Okio.buffer(new Source() {
@@ -575,6 +592,7 @@ sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort()
575592
return;
576593
} finally {
577594
clientFrameHandler = new ClientFrameHandler(variant.newReader(source, true));
595+
latchForExtraThread.countDown();
578596
}
579597
synchronized (lock) {
580598
socket = Preconditions.checkNotNull(sock, "socket");
@@ -584,6 +602,21 @@ sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort()
584602
}
585603
}
586604
});
605+
606+
executor.execute(new Runnable() {
607+
@Override
608+
public void run() {
609+
try {
610+
barrier.await(1000, TimeUnit.MILLISECONDS);
611+
latchForExtraThread.await();
612+
} catch (BrokenBarrierException | TimeoutException e) {
613+
// Something bad happened, maybe too few threads available!
614+
// This will be handled in the handshake thread.
615+
} catch (InterruptedException e) {
616+
Thread.currentThread().interrupt();
617+
}
618+
}
619+
});
587620
// Schedule to send connection preface & settings before any other write.
588621
try {
589622
sendConnectionPrefaceAndSettings();

okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java

+22
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,28 @@ public void testToString() throws Exception {
247247
assertTrue("Unexpected: " + s, s.contains(address.toString()));
248248
}
249249

250+
@Test
251+
public void testTransportExecutorWithTooFewThreads() throws Exception {
252+
ExecutorService fixedPoolExecutor = Executors.newFixedThreadPool(1);
253+
channelBuilder.transportExecutor(fixedPoolExecutor);
254+
InetSocketAddress address = InetSocketAddress.createUnresolved("hostname", 31415);
255+
clientTransport = new OkHttpClientTransport(
256+
channelBuilder.buildTransportFactory(),
257+
address,
258+
"hostname",
259+
null,
260+
EAG_ATTRS,
261+
NO_PROXY,
262+
tooManyPingsRunnable);
263+
clientTransport.start(transportListener);
264+
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
265+
verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(statusCaptor.capture());
266+
Status capturedStatus = statusCaptor.getValue();
267+
assertEquals("Timed out waiting for second handshake thread. "
268+
+ "The transport executor pool may have run out of threads",
269+
capturedStatus.getDescription());
270+
}
271+
250272
/**
251273
* Test logging is functioning correctly for client received Http/2 frames. Not intended to test
252274
* actual frame content being logged.

0 commit comments

Comments
 (0)