Skip to content

Move all test helper classes out of AbstractTransportTest so they can be used elsewhere #12125

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Jun 5, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package io.grpc.binder.internal;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
Expand All @@ -29,11 +28,9 @@
import android.os.Parcel;
import com.google.common.collect.ImmutableList;
import io.grpc.Attributes;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.internal.FixedObjectPool;
import io.grpc.internal.ServerStream;
import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.MockServerTransportListener;
import java.util.concurrent.ScheduledExecutorService;
import org.junit.Before;
import org.junit.Rule;
Expand All @@ -55,7 +52,7 @@ public final class BinderServerTransportTest {
@Rule public MockitoRule mocks = MockitoJUnit.rule();

private final ScheduledExecutorService executorService = new MainThreadScheduledExecutorService();
private final TestTransportListener transportListener = new TestTransportListener();
private MockServerTransportListener transportListener;

@Mock IBinder mockBinder;

Expand All @@ -70,6 +67,7 @@ public void setUp() throws Exception {
ImmutableList.of(),
OneWayBinderProxy.IDENTITY_DECORATOR,
mockBinder);
transportListener = new MockServerTransportListener(transport);
}

@Test
Expand All @@ -82,34 +80,6 @@ public void testSetupTransactionFailureCausesMultipleShutdowns_b153460678() thro
transport.shutdownNow(Status.UNKNOWN.withDescription("reasons"));
shadowOf(Looper.getMainLooper()).idle();

assertThat(transportListener.terminated).isTrue();
}

private static final class TestTransportListener implements ServerTransportListener {

public boolean ready;
public boolean terminated;

/**
* Called when a new stream was created by the remote client.
*
* @param stream the newly created stream.
* @param method the fully qualified method name being called on the server.
* @param headers containing metadata for the call.
*/
@Override
public void streamCreated(ServerStream stream, String method, Metadata headers) {}

@Override
public Attributes transportReady(Attributes attributes) {
ready = true;
return attributes;
}

@Override
public void transportTerminated() {
checkState(!terminated, "Terminated twice");
terminated = true;
}
assertThat(transportListener.isTerminated()).isTrue();
}
}
360 changes: 78 additions & 282 deletions core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright 2025 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.internal;

import static org.junit.Assert.fail;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Metadata;
import io.grpc.Status;
import java.io.InputStream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ClientStreamListenerBase implements ClientStreamListener {
public final BlockingQueue<InputStream> messageQueue = new LinkedBlockingQueue<>();
// Would have used Void instead of Object, but null elements are not allowed
private final BlockingQueue<Object> readyQueue = new LinkedBlockingQueue<>();
private final SettableFuture<Metadata> headers = SettableFuture.create();
private final SettableFuture<Metadata> trailers = SettableFuture.create();
private final SettableFuture<Status> status = SettableFuture.create();

/**
* Returns the stream's status or throws {@link java.util.concurrent.TimeoutException} if it isn't
* closed before the timeout.
*/
public Status awaitClose(int timeout, TimeUnit unit) throws Exception {
return status.get(timeout, unit);
}

/**
* Returns response headers from the server or throws {@link
* java.util.concurrent.TimeoutException} if they aren't delivered before the timeout.
*
* <p>Callers must not modify the returned object.
*/
public Metadata awaitHeaders(int timeout, TimeUnit unit) throws Exception {
return headers.get(timeout, unit);
}

/**
* Returns response trailers from the server or throws {@link
* java.util.concurrent.TimeoutException} if they aren't delivered before the timeout.
*
* <p>Callers must not modify the returned object.
*/
public Metadata awaitTrailers(int timeout, TimeUnit unit) throws Exception {
return trailers.get(timeout, unit);
}

public boolean awaitOnReady(int timeout, TimeUnit unit) throws Exception {
return readyQueue.poll(timeout, unit) != null;
}

public boolean awaitOnReadyAndDrain(int timeout, TimeUnit unit) throws Exception {
if (!awaitOnReady(timeout, unit)) {
return false;
}
// Throw the rest away
readyQueue.drainTo(Lists.newArrayList());
return true;
}

@Override
public void messagesAvailable(MessageProducer producer) {
if (status.isDone()) {
fail("messagesAvailable invoked after closed");
}
InputStream message;
while ((message = producer.next()) != null) {
messageQueue.add(message);
}
}

@Override
public void onReady() {
if (status.isDone()) {
fail("onReady invoked after closed");
}
readyQueue.add(new Object());
}

@Override
public void headersRead(Metadata headers) {
if (status.isDone()) {
fail("headersRead invoked after closed");
}
this.headers.set(headers);
}

@Override
public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
if (this.status.isDone()) {
fail("headersRead invoked after closed");
}
this.status.set(status);
this.trailers.set(trailers);
}

/** Returns true iff response headers have been received from the server. */
public boolean hasHeaders() {
return headers.isDone();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2025 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.internal;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import com.google.common.util.concurrent.SettableFuture;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
* A {@link ServerListener} that helps you write blocking unit tests.
*
* <p>TODO: Rename, since this is not actually a mock:
* https://testing.googleblog.com/2013/07/testing-on-toilet-know-your-test-doubles.html
*/
public class MockServerListener implements ServerListener {
private final BlockingQueue<MockServerTransportListener> listeners = new LinkedBlockingQueue<>();
private final SettableFuture<?> shutdown = SettableFuture.create();
private final ServerTransportListenerFactory serverTransportListenerFactory;

/**
* Lets you customize the {@link MockServerTransportListener} installed on newly created
* {@link ServerTransport}s.
*/
public interface ServerTransportListenerFactory {
MockServerTransportListener create(ServerTransport transport);
}

public MockServerListener(ServerTransportListenerFactory serverTransportListenerFactory) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to other reviewers: This is the new logic apart from simple refactoring changes, to take a factory for the transport listener whereas the original inner class was using a hardcoded implementation (MockServerTransportListener) by itself.

this.serverTransportListenerFactory = serverTransportListenerFactory;
}

public MockServerListener() {
this(MockServerTransportListener::new);
}

@Override
public ServerTransportListener transportCreated(ServerTransport transport) {
MockServerTransportListener listener = serverTransportListenerFactory.create(transport);
listeners.add(listener);
return listener;
}

@Override
public void serverShutdown() {
assertTrue(shutdown.set(null));
}

public boolean waitForShutdown(long timeout, TimeUnit unit) throws InterruptedException {
return AbstractTransportTest.waitForFuture(shutdown, timeout, unit);
}

public MockServerTransportListener takeListenerOrFail(long timeout, TimeUnit unit)
throws InterruptedException {
MockServerTransportListener listener = listeners.poll(timeout, unit);
if (listener == null) {
fail("Timed out waiting for server transport");
}
return listener;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 2025 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.internal;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Attributes;
import io.grpc.Metadata;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
* A {@link ServerTransportListener} that helps you write blocking unit tests.
*
* <p>TODO: Rename, since this is not actually a mock:
* https://testing.googleblog.com/2013/07/testing-on-toilet-know-your-test-doubles.html
*/
public class MockServerTransportListener implements ServerTransportListener {
public final ServerTransport transport;
private final BlockingQueue<StreamCreation> streams = new LinkedBlockingQueue<>();
private final SettableFuture<?> terminated = SettableFuture.create();

public MockServerTransportListener(ServerTransport transport) {
this.transport = transport;
}

@Override
public void streamCreated(ServerStream stream, String method, Metadata headers) {
ServerStreamListenerBase listener = new ServerStreamListenerBase();
streams.add(new StreamCreation(stream, method, headers, listener));
stream.setListener(listener);
}

@Override
public Attributes transportReady(Attributes attributes) {
assertFalse(terminated.isDone());
return attributes;
}

@Override
public void transportTerminated() {
assertTrue(terminated.set(null));
}

public boolean waitForTermination(long timeout, TimeUnit unit) throws InterruptedException {
return AbstractTransportTest.waitForFuture(terminated, timeout, unit);
}

public boolean isTerminated() {
return terminated.isDone();
}

public StreamCreation takeStreamOrFail(long timeout, TimeUnit unit) throws InterruptedException {
StreamCreation stream = streams.poll(timeout, unit);
if (stream == null) {
fail("Timed out waiting for server stream");
}
return stream;
}

public static class StreamCreation {
public final ServerStream stream;
public final String method;
public final Metadata headers;
public final ServerStreamListenerBase listener;

public StreamCreation(
ServerStream stream, String method, Metadata headers, ServerStreamListenerBase listener) {
this.stream = stream;
this.method = method;
this.headers = headers;
this.listener = listener;
}
}
}
Loading