Skip to content

binder: Add a connection timeout #11255

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Jun 5, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@
import android.os.DeadObjectException;
import android.os.Parcel;
import android.os.RemoteException;
import androidx.core.content.ContextCompat;
import androidx.test.core.app.ApplicationProvider;
import androidx.test.ext.junit.runners.AndroidJUnit4;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.Empty;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.ClientStreamTracer;
import io.grpc.Metadata;
Expand All @@ -36,13 +35,10 @@
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.binder.AndroidComponentAddress;
import io.grpc.binder.BindServiceFlags;
import io.grpc.binder.BinderChannelCredentials;
import io.grpc.binder.BinderServerBuilder;
import io.grpc.binder.HostServices;
import io.grpc.binder.InboundParcelablePolicy;
import io.grpc.binder.SecurityPolicies;
import io.grpc.binder.SecurityPolicy;
import io.grpc.binder.internal.OneWayBinderProxies.BlackHoleOneWayBinderProxy;
import io.grpc.binder.internal.OneWayBinderProxies.BlockingBinderDecorator;
import io.grpc.binder.internal.OneWayBinderProxies.ThrowingOneWayBinderProxy;
import io.grpc.internal.ClientStream;
Expand All @@ -59,9 +55,11 @@
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.junit.After;
Expand All @@ -77,6 +75,8 @@
*/
@RunWith(AndroidJUnit4.class)
public final class BinderClientTransportTest {
private static final long TIMEOUT_SECONDS = 5;

private static final ClientStreamTracer[] tracers = new ClientStreamTracer[] {
new ClientStreamTracer() {}
};
Expand All @@ -100,9 +100,12 @@ public final class BinderClientTransportTest {

AndroidComponentAddress serverAddress;
BinderTransport.BinderClientTransport transport;
BlockingSecurityPolicy blockingSecurityPolicy = new BlockingSecurityPolicy();

private final ObjectPool<ScheduledExecutorService> executorServicePool =
new FixedObjectPool<>(Executors.newScheduledThreadPool(1));
private final ObjectPool<ScheduledExecutorService> offloadServicePool =
new FixedObjectPool<>(Executors.newScheduledThreadPool(1));
private final TestTransportListener transportListener = new TestTransportListener();
private final TestStreamListener streamListener = new TestStreamListener();

Expand Down Expand Up @@ -146,7 +149,7 @@ private class BinderClientTransportBuilder {
final BinderClientTransportFactory.Builder factoryBuilder = new BinderClientTransportFactory.Builder()
.setSourceContext(appContext)
.setScheduledExecutorPool(executorServicePool)
.setOffloadExecutorPool(executorServicePool);
.setOffloadExecutorPool(offloadServicePool);

public BinderClientTransportBuilder setSecurityPolicy(SecurityPolicy securityPolicy) {
factoryBuilder.setSecurityPolicy(securityPolicy);
Expand All @@ -159,6 +162,11 @@ public BinderClientTransportBuilder setBinderDecorator(
return this;
}

public BinderClientTransportBuilder setReadyTimeoutMillis(int timeoutMillis) {
factoryBuilder.setReadyTimeoutMillis(timeoutMillis);
return this;
}

public BinderTransport.BinderClientTransport build() {
return factoryBuilder.buildClientTransportFactory()
.newClientTransport(serverAddress, new ClientTransportOptions(), null);
Expand All @@ -167,9 +175,19 @@ public BinderTransport.BinderClientTransport build() {

@After
public void tearDown() throws Exception {
blockingSecurityPolicy.provideNextCheckAuthorizationResult(Status.ABORTED);
transport.shutdownNow(Status.OK);
HostServices.awaitServiceShutdown();
executorServicePool.getObject().shutdownNow();
shutdownAndTerminate(executorServicePool.getObject());
shutdownAndTerminate(offloadServicePool.getObject());
}

private static void shutdownAndTerminate(ExecutorService executorService)
throws InterruptedException {
executorService.shutdownNow();
if (!executorService.awaitTermination(TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
throw new AssertionError("executor failed to terminate promptly");
}
}

@Test
Expand Down Expand Up @@ -261,23 +279,22 @@ public void testMessageProducerClosedAfterStream_b169313545() throws Exception {
}

@Test
public void testNewStreamBeforeTransportReadyFails() throws InterruptedException {
public void testNewStreamBeforeTransportReadyFails() throws Exception {
// Use a special SecurityPolicy that lets us act before the transport is setup/ready.
BlockingSecurityPolicy bsp = new BlockingSecurityPolicy();
transport = new BinderClientTransportBuilder().setSecurityPolicy(bsp).build();
transport = new BinderClientTransportBuilder().setSecurityPolicy(blockingSecurityPolicy).build();
transport.start(transportListener).run();
ClientStream stream =
transport.newStream(streamingMethodDesc, new Metadata(), CallOptions.DEFAULT, tracers);
stream.start(streamListener);
assertThat(streamListener.awaitClose().getCode()).isEqualTo(Code.INTERNAL);

// Unblock the SETUP_TRANSPORT handshake and make sure it becomes ready in the usual way.
bsp.provideNextCheckAuthorizationResult(Status.OK);
blockingSecurityPolicy.provideNextCheckAuthorizationResult(Status.OK);
transportListener.awaitReady();
}

@Test
public void testTxnFailureDuringSetup() throws InterruptedException {
public void testTxnFailureDuringSetup() throws Exception {
BlockingBinderDecorator<ThrowingOneWayBinderProxy> decorator = new BlockingBinderDecorator<>();
transport = new BinderClientTransportBuilder()
.setBinderDecorator(decorator)
Expand All @@ -304,7 +321,7 @@ public void testTxnFailureDuringSetup() throws InterruptedException {
}

@Test
public void testTxnFailurePostSetup() throws InterruptedException {
public void testTxnFailurePostSetup() throws Exception {
BlockingBinderDecorator<ThrowingOneWayBinderProxy> decorator = new BlockingBinderDecorator<>();
transport = new BinderClientTransportBuilder()
.setBinderDecorator(decorator)
Expand Down Expand Up @@ -332,59 +349,82 @@ public void testTxnFailurePostSetup() throws InterruptedException {
assertThat(streamStatus.getCause()).isSameInstanceAs(doe);
}

@Test
public void testBlackHoleEndpointConnectTimeout() throws Exception {
BlockingBinderDecorator<BlackHoleOneWayBinderProxy> decorator = new BlockingBinderDecorator<>();
transport = new BinderClientTransportBuilder()
.setBinderDecorator(decorator)
.setReadyTimeoutMillis(1_234)
.build();
transport.start(transportListener).run();
BlackHoleOneWayBinderProxy endpointBinder = new BlackHoleOneWayBinderProxy(
decorator.takeNextRequest());
endpointBinder.dropAllTransactions(true);
decorator.putNextResult(endpointBinder);
Status transportStatus = transportListener.awaitShutdown();
assertThat(transportStatus.getCode()).isEqualTo(Code.DEADLINE_EXCEEDED);
assertThat(transportStatus.getDescription()).contains("1234");
transportListener.awaitTermination();
}

@Test
public void testBlackHoleSecurityPolicyConnectTimeout() throws Exception {
transport = new BinderClientTransportBuilder()
.setSecurityPolicy(blockingSecurityPolicy)
.setReadyTimeoutMillis(1_234)
.build();
transport.start(transportListener).run();
Status transportStatus = transportListener.awaitShutdown();
assertThat(transportStatus.getCode()).isEqualTo(Code.DEADLINE_EXCEEDED);
assertThat(transportStatus.getDescription()).contains("1234");
transportListener.awaitTermination();
blockingSecurityPolicy.provideNextCheckAuthorizationResult(Status.OK);
}

private static void startAndAwaitReady(
BinderTransport.BinderClientTransport transport, TestTransportListener transportListener) {
BinderTransport.BinderClientTransport transport, TestTransportListener transportListener)
throws Exception {
transport.start(transportListener).run();
transportListener.awaitReady();
}

private static final class TestTransportListener implements ManagedClientTransport.Listener {
@GuardedBy("this")
private boolean ready;

public boolean inUse;
@Nullable public Status shutdownStatus;
public boolean terminated;
private final SettableFuture<Boolean> isReady = SettableFuture.create();
private final SettableFuture<Status> shutdownStatus = SettableFuture.create();
private final SettableFuture<Boolean> isTerminated = SettableFuture.create();

@Override
public synchronized void transportShutdown(Status shutdownStatus) {
this.shutdownStatus = shutdownStatus;
notifyAll();
public void transportShutdown(Status shutdownStatus) {
if (!this.shutdownStatus.set(shutdownStatus)) {
throw new IllegalStateException("transportShutdown() already called");
}
}

public synchronized Status awaitShutdown() throws InterruptedException {
while (shutdownStatus == null) {
wait();
}
return shutdownStatus;
public Status awaitShutdown() throws Exception {
return shutdownStatus.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
}

@Override
public synchronized void transportTerminated() {
terminated = true;
notifyAll();
public void transportTerminated() {
if (!isTerminated.set(true)) {
throw new IllegalStateException("isTerminated() already called");
}
}

public synchronized void awaitTermination() throws InterruptedException {
while (!terminated) {
wait();
}
public void awaitTermination() throws Exception {
isTerminated.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
}

@Override
public synchronized void transportReady() {
ready = true;
notifyAll();
public void transportReady() {
if (!isReady.set(true)) {
throw new IllegalStateException("isTerminated() already called");
}
}

public synchronized void awaitReady() {
while (!ready) {
try {
wait();
} catch (InterruptedException inte) {
throw new AssertionError("Interrupted waiting for ready");
}
}
public void awaitReady() throws Exception {
isReady.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,37 @@ public void transact(int code, ParcelHolder data) throws RemoteException {
}
}

/**
* A {@link OneWayBinderProxy} decorator whose transact method can be configured to silently drop.
*/
public static final class BlackHoleOneWayBinderProxy extends OneWayBinderProxy {

private final OneWayBinderProxy wrapped;
private boolean dropAllTransactions;

BlackHoleOneWayBinderProxy(OneWayBinderProxy wrapped) {
super(wrapped.getDelegate());
this.wrapped = wrapped;
}

/**
* Causes all future invocations of transact to be silently dropped.
*
* <p>Users are responsible for ensuring their calls "happen-before" the relevant calls to
* {@link #transact(int, ParcelHolder)}.
*/
public void dropAllTransactions(boolean dropAllTransactions) {
this.dropAllTransactions = dropAllTransactions;
}

@Override
public void transact(int code, ParcelHolder data) throws RemoteException {
if (!dropAllTransactions) {
wrapped.transact(code, data);
}
}
}

// Cannot be instantiated.
private OneWayBinderProxies() {};
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public final class BinderClientTransportFactory implements ClientTransportFactor
final BindServiceFlags bindServiceFlags;
final InboundParcelablePolicy inboundParcelablePolicy;
final OneWayBinderProxy.Decorator binderDecorator;
final long readyTimeoutMillis;

ScheduledExecutorService executorService;
Executor offloadExecutor;
Expand All @@ -74,6 +75,7 @@ private BinderClientTransportFactory(Builder builder) {
bindServiceFlags = checkNotNull(builder.bindServiceFlags);
inboundParcelablePolicy = checkNotNull(builder.inboundParcelablePolicy);
binderDecorator = checkNotNull(builder.binderDecorator);
readyTimeoutMillis = builder.readyTimeoutMillis;

executorService = scheduledExecutorPool.getObject();
offloadExecutor = offloadExecutorPool.getObject();
Expand Down Expand Up @@ -129,6 +131,7 @@ public static final class Builder implements ClientTransportFactoryBuilder {
BindServiceFlags bindServiceFlags = BindServiceFlags.DEFAULTS;
InboundParcelablePolicy inboundParcelablePolicy = InboundParcelablePolicy.DEFAULT;
OneWayBinderProxy.Decorator binderDecorator = OneWayBinderProxy.IDENTITY_DECORATOR;
long readyTimeoutMillis = -1; // TODO(jdcormie) Set an non-infinite default in a separate PR.

@Override
public BinderClientTransportFactory buildClientTransportFactory() {
Expand Down Expand Up @@ -191,5 +194,31 @@ public Builder setBinderDecorator(OneWayBinderProxy.Decorator binderDecorator) {
this.binderDecorator = checkNotNull(binderDecorator, "binderDecorator");
return this;
}

/**
* Limits how long it can take to for a new transport to become ready after being started.
*
* <p>This process currently includes:
* <ul>
* <li>Creating an Android binding.
* <li>Waiting for Android to create the server process.
* <li>Waiting for the remote Service to be created and handle onBind().
* <li>Exchanging handshake transactions according to the wire protocol.
* <li>Evaluating a {@link SecurityPolicy} on both sides.
* </ul>
*
* <p>This setting doesn't change the need for deadlines at the call level. It merely ensures
* that gRPC features like
* <a href="https://github.com/grpc/grpc/blob/master/doc/load-balancing.md">load balancing</a>
* and <a href="https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md">fail-fast</a>
* work as expected despite certain edge cases that could otherwise stall the transport
* indefinitely.
*
* <p>Optional. Use a negative value to wait indefinitely.
*/
public Builder setReadyTimeoutMillis(long readyTimeoutMillis) {
this.readyTimeoutMillis = readyTimeoutMillis;
return this;
}
}
}
Loading
Loading