From 61d5e632e6c64e273ac064150af91e0a814979dd Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Fri, 16 Oct 2020 15:00:20 +0000 Subject: [PATCH 1/6] Make refreshing channel compatible with BigtableDataClientFactory --- .../data/v2/BigtableDataClientFactory.java | 4 +- .../v2/stub/EnhancedBigtableStubSettings.java | 13 ++ .../v2/BigtableDataClientFactoryTest.java | 115 +++++++++++++++++- 3 files changed, 126 insertions(+), 6 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactory.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactory.java index b08613cc69..19bfcf703b 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactory.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactory.java @@ -24,6 +24,7 @@ import com.google.api.gax.rpc.FixedTransportChannelProvider; import com.google.api.gax.rpc.FixedWatchdogProvider; import com.google.api.gax.rpc.StubSettings; +import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings; import java.io.IOException; import javax.annotation.Nonnull; @@ -189,8 +190,9 @@ public BigtableDataClient createForInstance( } // Update stub settings to use shared resources in this factory - private void patchStubSettings(StubSettings.Builder stubSettings) { + private void patchStubSettings(EnhancedBigtableStubSettings.Builder stubSettings) { stubSettings + .setRefreshingChannel(false) .setTransportChannelProvider( FixedTransportChannelProvider.create(sharedClientContext.getTransportChannel())) .setCredentialsProvider( diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java index 9b9c71ea8c..6a284276fe 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java @@ -42,6 +42,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Set; @@ -806,6 +807,18 @@ public EnhancedBigtableStubSettings build() { Preconditions.checkArgument( getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider, "refreshingChannel only works with InstantiatingGrpcChannelProviders"); + Preconditions.checkArgument(appProfileId != null, + "refreshingChannel only works when appProfileId is set"); + InstantiatingGrpcChannelProvider.Builder channelProviderBuilder = + ((InstantiatingGrpcChannelProvider) getTransportChannelProvider()).toBuilder(); + try { + channelProviderBuilder.setChannelPrimer( + BigtableChannelPrimer.create(getCredentialsProvider().getCredentials(), + projectId, instanceId, appProfileId, primedTableIds)); + } catch (IOException e) { + throw new RuntimeException(e); + } + this.setTransportChannelProvider(channelProviderBuilder.build()); } return new EnhancedBigtableStubSettings(this); } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java index d322654b81..f900fb82b6 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java @@ -18,19 +18,36 @@ import static com.google.common.truth.Truth.assertThat; import com.google.api.core.ApiClock; +import com.google.api.core.ApiFunction; import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.core.ExecutorProvider; +import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.api.gax.rpc.WatchdogProvider; import com.google.bigtable.v2.BigtableGrpc; import com.google.bigtable.v2.MutateRowRequest; import com.google.bigtable.v2.MutateRowResponse; +import com.google.bigtable.v2.ReadRowsRequest; +import com.google.bigtable.v2.ReadRowsResponse; +import com.google.bigtable.v2.RowFilter; +import com.google.bigtable.v2.RowSet; import com.google.cloud.bigtable.data.v2.internal.NameUtil; import com.google.cloud.bigtable.data.v2.models.RowMutation; import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; +import io.grpc.Attributes; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.ServerTransportFilter; import io.grpc.stub.StreamObserver; import java.io.IOException; import java.lang.reflect.Method; +import java.net.ServerSocket; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -51,7 +68,7 @@ public class BigtableDataClientFactoryTest { private static final String DEFAULT_INSTANCE_ID = "fake-instance"; private static final String DEFAULT_APP_PROFILE_ID = "fake-app-profile"; - private FakeServiceHelper serviceHelper; + private Server fakeServer; private FakeBigtableService service; private TransportChannelProvider transportChannelProvider; @@ -60,16 +77,38 @@ public class BigtableDataClientFactoryTest { private WatchdogProvider watchdogProvider; private ApiClock apiClock; private BigtableDataSettings defaultSettings; + private int port; + + private final BlockingQueue attributes = new LinkedBlockingDeque<>(); + private final AtomicInteger terminationCount = new AtomicInteger(); @Before public void setUp() throws IOException { service = new FakeBigtableService(); - serviceHelper = new FakeServiceHelper(service); - serviceHelper.start(); + try (ServerSocket ss = new ServerSocket(0)) { + port = ss.getLocalPort(); + } + + fakeServer = ServerBuilder.forPort(port).addService(service) + .addTransportFilter(new ServerTransportFilter() { + @Override + public Attributes transportReady(Attributes transportAttrs) { + attributes.add(transportAttrs); + return super.transportReady(transportAttrs); + } + + @Override + public void transportTerminated(Attributes transportAttrs) { + terminationCount.incrementAndGet(); + super.transportTerminated(transportAttrs); + } + }).build(); + + fakeServer.start(); BigtableDataSettings.Builder builder = - BigtableDataSettings.newBuilderForEmulator(serviceHelper.getPort()) + BigtableDataSettings.newBuilderForEmulator(port) .setProjectId(DEFAULT_PROJECT_ID) .setInstanceId(DEFAULT_INSTANCE_ID) .setAppProfileId(DEFAULT_APP_PROFILE_ID); @@ -114,7 +153,7 @@ public void setUp() throws IOException { @After public void tearDown() { - serviceHelper.shutdown(); + fakeServer.shutdown(); } @Test @@ -191,8 +230,73 @@ public void testCreateForInstanceWithAppProfileHasCorrectSettings() throws Excep assertThat(service.lastRequest.getAppProfileId()).isEqualTo("other-app-profile"); } + @Test + public void testCreateWithRefreshingChannel() throws Exception { + String[] tableIds = {"fake-table1", "fake-table2"}; + int poolSize = 3; + BigtableDataSettings.Builder builder = BigtableDataSettings.newBuilderForEmulator(port) + .setProjectId(DEFAULT_PROJECT_ID) + .setInstanceId(DEFAULT_INSTANCE_ID) + .setAppProfileId(DEFAULT_APP_PROFILE_ID) + .setPrimingTableIds(tableIds) + .setRefreshingChannel(true); + InstantiatingGrpcChannelProvider channelProvider = + (InstantiatingGrpcChannelProvider) builder.stubSettings().getTransportChannelProvider(); + InstantiatingGrpcChannelProvider.Builder channelProviderBuilder = channelProvider.toBuilder(); + channelProviderBuilder.setPoolSize(poolSize); + builder.stubSettings().setTransportChannelProvider(channelProviderBuilder.build()); + + BigtableDataClientFactory factory = BigtableDataClientFactory.create(builder.build()); + factory.createDefault(); + factory.createForAppProfile("other-appprofile"); + factory.createForInstance("other-project", "other-instance"); + + // Make sure that the clients are sharing the same ChannelPool + assertThat(attributes).hasSize(poolSize); + // Make sure that prime requests were sent only once per table per connection + assertThat(service.readRowsRequests).hasSize(poolSize * tableIds.length); + List expectedRequests = new LinkedList<>(); + for (String tableId : tableIds) { + for (int i = 0; i < poolSize; i++) { + expectedRequests.add( + ReadRowsRequest.newBuilder() + .setTableName(String.format("projects/%s/instances/%s/tables/%s", + DEFAULT_PROJECT_ID, DEFAULT_INSTANCE_ID, tableId)) + .setAppProfileId(DEFAULT_APP_PROFILE_ID) + .setRows(RowSet.newBuilder().addRowKeys(ByteString.copyFromUtf8("nonexistent-priming-row"))) + .setFilter(RowFilter.newBuilder().setBlockAllFilter(true).build()) + .setRowsLimit(1) + .build() + ); + } + } + assertThat(service.readRowsRequests).containsExactly(expectedRequests.toArray()); + factory.close(); + assertThat(terminationCount.get()).isEqualTo(poolSize); + } + private static class FakeBigtableService extends BigtableGrpc.BigtableImplBase { volatile MutateRowRequest lastRequest; + BlockingQueue readRowsRequests = new LinkedBlockingDeque<>(); + private ApiFunction readRowsCallback = + new ApiFunction() { + @Override + public ReadRowsResponse apply(ReadRowsRequest readRowsRequest) { + return ReadRowsResponse.getDefaultInstance(); + } + }; + + @Override + public void readRows( + ReadRowsRequest request, StreamObserver responseObserver) { + try { + readRowsRequests.add(request); + responseObserver.onNext(readRowsCallback.apply(request)); + responseObserver.onCompleted(); + } catch (RuntimeException e) { + responseObserver.onError(e); + } + } @Override public void mutateRow( @@ -204,6 +308,7 @@ public void mutateRow( } private static class BuilderAnswer implements Answer { + private final Class targetClass; private T targetInstance; From 45673db1fec87c67863ad0b1d243c7091d3bd27d Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Fri, 16 Oct 2020 23:30:12 +0000 Subject: [PATCH 2/6] Fix formatting --- .../data/v2/BigtableDataClientFactory.java | 1 - .../v2/stub/EnhancedBigtableStubSettings.java | 12 ++-- .../v2/BigtableDataClientFactoryTest.java | 64 +++++++++++-------- 3 files changed, 44 insertions(+), 33 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactory.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactory.java index 19bfcf703b..e5dbb54bbb 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactory.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactory.java @@ -23,7 +23,6 @@ import com.google.api.gax.rpc.FixedHeaderProvider; import com.google.api.gax.rpc.FixedTransportChannelProvider; import com.google.api.gax.rpc.FixedWatchdogProvider; -import com.google.api.gax.rpc.StubSettings; import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings; import java.io.IOException; import javax.annotation.Nonnull; diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java index 6a284276fe..9ad52f91dc 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java @@ -807,14 +807,18 @@ public EnhancedBigtableStubSettings build() { Preconditions.checkArgument( getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider, "refreshingChannel only works with InstantiatingGrpcChannelProviders"); - Preconditions.checkArgument(appProfileId != null, - "refreshingChannel only works when appProfileId is set"); + Preconditions.checkArgument( + appProfileId != null, "refreshingChannel only works when appProfileId is set"); InstantiatingGrpcChannelProvider.Builder channelProviderBuilder = ((InstantiatingGrpcChannelProvider) getTransportChannelProvider()).toBuilder(); try { channelProviderBuilder.setChannelPrimer( - BigtableChannelPrimer.create(getCredentialsProvider().getCredentials(), - projectId, instanceId, appProfileId, primedTableIds)); + BigtableChannelPrimer.create( + getCredentialsProvider().getCredentials(), + projectId, + instanceId, + appProfileId, + primedTableIds)); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java index f900fb82b6..579f0d9fb4 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java @@ -90,20 +90,24 @@ public void setUp() throws IOException { port = ss.getLocalPort(); } - fakeServer = ServerBuilder.forPort(port).addService(service) - .addTransportFilter(new ServerTransportFilter() { - @Override - public Attributes transportReady(Attributes transportAttrs) { - attributes.add(transportAttrs); - return super.transportReady(transportAttrs); - } - - @Override - public void transportTerminated(Attributes transportAttrs) { - terminationCount.incrementAndGet(); - super.transportTerminated(transportAttrs); - } - }).build(); + fakeServer = + ServerBuilder.forPort(port) + .addService(service) + .addTransportFilter( + new ServerTransportFilter() { + @Override + public Attributes transportReady(Attributes transportAttrs) { + attributes.add(transportAttrs); + return super.transportReady(transportAttrs); + } + + @Override + public void transportTerminated(Attributes transportAttrs) { + terminationCount.incrementAndGet(); + super.transportTerminated(transportAttrs); + } + }) + .build(); fakeServer.start(); @@ -234,12 +238,13 @@ public void testCreateForInstanceWithAppProfileHasCorrectSettings() throws Excep public void testCreateWithRefreshingChannel() throws Exception { String[] tableIds = {"fake-table1", "fake-table2"}; int poolSize = 3; - BigtableDataSettings.Builder builder = BigtableDataSettings.newBuilderForEmulator(port) - .setProjectId(DEFAULT_PROJECT_ID) - .setInstanceId(DEFAULT_INSTANCE_ID) - .setAppProfileId(DEFAULT_APP_PROFILE_ID) - .setPrimingTableIds(tableIds) - .setRefreshingChannel(true); + BigtableDataSettings.Builder builder = + BigtableDataSettings.newBuilderForEmulator(port) + .setProjectId(DEFAULT_PROJECT_ID) + .setInstanceId(DEFAULT_INSTANCE_ID) + .setAppProfileId(DEFAULT_APP_PROFILE_ID) + .setPrimingTableIds(tableIds) + .setRefreshingChannel(true); InstantiatingGrpcChannelProvider channelProvider = (InstantiatingGrpcChannelProvider) builder.stubSettings().getTransportChannelProvider(); InstantiatingGrpcChannelProvider.Builder channelProviderBuilder = channelProvider.toBuilder(); @@ -260,14 +265,17 @@ public void testCreateWithRefreshingChannel() throws Exception { for (int i = 0; i < poolSize; i++) { expectedRequests.add( ReadRowsRequest.newBuilder() - .setTableName(String.format("projects/%s/instances/%s/tables/%s", - DEFAULT_PROJECT_ID, DEFAULT_INSTANCE_ID, tableId)) - .setAppProfileId(DEFAULT_APP_PROFILE_ID) - .setRows(RowSet.newBuilder().addRowKeys(ByteString.copyFromUtf8("nonexistent-priming-row"))) - .setFilter(RowFilter.newBuilder().setBlockAllFilter(true).build()) - .setRowsLimit(1) - .build() - ); + .setTableName( + String.format( + "projects/%s/instances/%s/tables/%s", + DEFAULT_PROJECT_ID, DEFAULT_INSTANCE_ID, tableId)) + .setAppProfileId(DEFAULT_APP_PROFILE_ID) + .setRows( + RowSet.newBuilder() + .addRowKeys(ByteString.copyFromUtf8("nonexistent-priming-row"))) + .setFilter(RowFilter.newBuilder().setBlockAllFilter(true).build()) + .setRowsLimit(1) + .build()); } } assertThat(service.readRowsRequests).containsExactly(expectedRequests.toArray()); From 139254dd248d95fbdea3de99f61920aa5ccd6c6a Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Sun, 18 Oct 2020 01:48:01 +0000 Subject: [PATCH 3/6] Fix test case --- .../data/v2/stub/EnhancedBigtableStubSettings.java | 2 -- .../data/v2/BigtableDataClientFactoryTest.java | 10 ---------- 2 files changed, 12 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java index 9ad52f91dc..600e4b3330 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java @@ -807,8 +807,6 @@ public EnhancedBigtableStubSettings build() { Preconditions.checkArgument( getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider, "refreshingChannel only works with InstantiatingGrpcChannelProviders"); - Preconditions.checkArgument( - appProfileId != null, "refreshingChannel only works when appProfileId is set"); InstantiatingGrpcChannelProvider.Builder channelProviderBuilder = ((InstantiatingGrpcChannelProvider) getTransportChannelProvider()).toBuilder(); try { diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java index 579f0d9fb4..b9d8d733a5 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java @@ -47,7 +47,6 @@ import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.atomic.AtomicInteger; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -80,7 +79,6 @@ public class BigtableDataClientFactoryTest { private int port; private final BlockingQueue attributes = new LinkedBlockingDeque<>(); - private final AtomicInteger terminationCount = new AtomicInteger(); @Before public void setUp() throws IOException { @@ -100,12 +98,6 @@ public Attributes transportReady(Attributes transportAttrs) { attributes.add(transportAttrs); return super.transportReady(transportAttrs); } - - @Override - public void transportTerminated(Attributes transportAttrs) { - terminationCount.incrementAndGet(); - super.transportTerminated(transportAttrs); - } }) .build(); @@ -279,8 +271,6 @@ public void testCreateWithRefreshingChannel() throws Exception { } } assertThat(service.readRowsRequests).containsExactly(expectedRequests.toArray()); - factory.close(); - assertThat(terminationCount.get()).isEqualTo(poolSize); } private static class FakeBigtableService extends BigtableGrpc.BigtableImplBase { From 213112311e3d47587f79255d75fb13784dd53213 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Mon, 19 Oct 2020 17:32:06 +0000 Subject: [PATCH 4/6] Add ServerTransportFilter in FakeServiceHelper --- .../v2/BigtableDataClientFactoryTest.java | 37 +++++++------------ .../bigtable/data/v2/FakeServiceHelper.java | 12 ++++++ 2 files changed, 25 insertions(+), 24 deletions(-) diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java index b9d8d733a5..f4bb847d5e 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java @@ -36,13 +36,10 @@ import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; import io.grpc.Attributes; -import io.grpc.Server; -import io.grpc.ServerBuilder; import io.grpc.ServerTransportFilter; import io.grpc.stub.StreamObserver; import java.io.IOException; import java.lang.reflect.Method; -import java.net.ServerSocket; import java.util.LinkedList; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -67,7 +64,7 @@ public class BigtableDataClientFactoryTest { private static final String DEFAULT_INSTANCE_ID = "fake-instance"; private static final String DEFAULT_APP_PROFILE_ID = "fake-app-profile"; - private Server fakeServer; + private FakeServiceHelper serviceHelper; private FakeBigtableService service; private TransportChannelProvider transportChannelProvider; @@ -83,25 +80,17 @@ public class BigtableDataClientFactoryTest { @Before public void setUp() throws IOException { service = new FakeBigtableService(); - - try (ServerSocket ss = new ServerSocket(0)) { - port = ss.getLocalPort(); - } - - fakeServer = - ServerBuilder.forPort(port) - .addService(service) - .addTransportFilter( - new ServerTransportFilter() { - @Override - public Attributes transportReady(Attributes transportAttrs) { - attributes.add(transportAttrs); - return super.transportReady(transportAttrs); - } - }) - .build(); - - fakeServer.start(); + ServerTransportFilter transportFilter = + new ServerTransportFilter() { + @Override + public Attributes transportReady(Attributes transportAttrs) { + attributes.add(transportAttrs); + return super.transportReady(transportAttrs); + } + }; + serviceHelper = new FakeServiceHelper(null, transportFilter, service); + port = serviceHelper.getPort(); + serviceHelper.start(); BigtableDataSettings.Builder builder = BigtableDataSettings.newBuilderForEmulator(port) @@ -149,7 +138,7 @@ public Attributes transportReady(Attributes transportAttrs) { @After public void tearDown() { - fakeServer.shutdown(); + serviceHelper.shutdown(); } @Test diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/FakeServiceHelper.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/FakeServiceHelper.java index abd5569702..9ec5e59cb7 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/FakeServiceHelper.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/FakeServiceHelper.java @@ -19,6 +19,7 @@ import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.ServerInterceptor; +import io.grpc.ServerTransportFilter; import java.io.IOException; import java.net.ServerSocket; @@ -33,6 +34,14 @@ public FakeServiceHelper(BindableService... services) throws IOException { public FakeServiceHelper(ServerInterceptor interceptor, BindableService... services) throws IOException { + this(interceptor, null, services); + } + + public FakeServiceHelper( + ServerInterceptor interceptor, + ServerTransportFilter transportFilter, + BindableService... services) + throws IOException { try (ServerSocket ss = new ServerSocket(0)) { port = ss.getLocalPort(); } @@ -40,6 +49,9 @@ public FakeServiceHelper(ServerInterceptor interceptor, BindableService... servi if (interceptor != null) { builder = builder.intercept(interceptor); } + if (transportFilter != null) { + builder = builder.addTransportFilter(transportFilter); + } for (BindableService service : services) { builder = builder.addService(service); } From 2ff40075d061197b3fa3fead5dc8d3c48ddb5f8e Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Wed, 21 Oct 2020 17:01:32 +0000 Subject: [PATCH 5/6] Fix credentials provider settings add added test to verify it --- .../data/v2/BigtableDataClientFactory.java | 2 + .../v2/stub/EnhancedBigtableStubSettings.java | 24 +++++--- .../v2/BigtableDataClientFactoryTest.java | 25 ++++++++ .../EnhancedBigtableStubSettingsTest.java | 61 ++++++++++++++++++- 4 files changed, 101 insertions(+), 11 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactory.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactory.java index e5dbb54bbb..d4561ab4df 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactory.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactory.java @@ -191,6 +191,8 @@ public BigtableDataClient createForInstance( // Update stub settings to use shared resources in this factory private void patchStubSettings(EnhancedBigtableStubSettings.Builder stubSettings) { stubSettings + // Channel refreshing will be configured in the shared ClientContext. Derivative clients + // won't be able to reconfigure the refreshing logic .setRefreshingChannel(false) .setTransportChannelProvider( FixedTransportChannelProvider.create(sharedClientContext.getTransportChannel())) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java index 600e4b3330..2806420bb4 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java @@ -20,6 +20,7 @@ import com.google.api.gax.batching.BatchingSettings; import com.google.api.gax.batching.FlowControlSettings; import com.google.api.gax.batching.FlowController.LimitExceededBehavior; +import com.google.api.gax.core.FixedCredentialsProvider; import com.google.api.gax.core.GoogleCredentialsProvider; import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; import com.google.api.gax.retrying.RetrySettings; @@ -29,6 +30,7 @@ import com.google.api.gax.rpc.StubSettings; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.api.gax.rpc.UnaryCallSettings; +import com.google.auth.Credentials; import com.google.cloud.bigtable.Version; import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation; import com.google.cloud.bigtable.data.v2.models.KeyOffset; @@ -809,17 +811,19 @@ public EnhancedBigtableStubSettings build() { "refreshingChannel only works with InstantiatingGrpcChannelProviders"); InstantiatingGrpcChannelProvider.Builder channelProviderBuilder = ((InstantiatingGrpcChannelProvider) getTransportChannelProvider()).toBuilder(); - try { - channelProviderBuilder.setChannelPrimer( - BigtableChannelPrimer.create( - getCredentialsProvider().getCredentials(), - projectId, - instanceId, - appProfileId, - primedTableIds)); - } catch (IOException e) { - throw new RuntimeException(e); + Credentials credentials = null; + if (getCredentialsProvider() != null) { + try { + credentials = getCredentialsProvider().getCredentials(); + } catch (IOException e) { + throw new RuntimeException(e); + } } + // Use shared credentials + this.setCredentialsProvider(FixedCredentialsProvider.create(credentials)); + channelProviderBuilder.setChannelPrimer( + BigtableChannelPrimer.create( + credentials, projectId, instanceId, appProfileId, primedTableIds)); this.setTransportChannelProvider(channelProviderBuilder.build()); } return new EnhancedBigtableStubSettings(this); diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java index f4bb847d5e..792ecdba6a 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java @@ -87,6 +87,11 @@ public Attributes transportReady(Attributes transportAttrs) { attributes.add(transportAttrs); return super.transportReady(transportAttrs); } + + @Override + public void transportTerminated(Attributes transportAttrs) { + attributes.add(transportAttrs); + } }; serviceHelper = new FakeServiceHelper(null, transportFilter, service); port = serviceHelper.getPort(); @@ -226,6 +231,11 @@ public void testCreateWithRefreshingChannel() throws Exception { .setAppProfileId(DEFAULT_APP_PROFILE_ID) .setPrimingTableIds(tableIds) .setRefreshingChannel(true); + builder + .stubSettings() + .setCredentialsProvider(credentialsProvider) + .setStreamWatchdogProvider(watchdogProvider) + .setExecutorProvider(executorProvider); InstantiatingGrpcChannelProvider channelProvider = (InstantiatingGrpcChannelProvider) builder.stubSettings().getTransportChannelProvider(); InstantiatingGrpcChannelProvider.Builder channelProviderBuilder = channelProvider.toBuilder(); @@ -237,8 +247,14 @@ public void testCreateWithRefreshingChannel() throws Exception { factory.createForAppProfile("other-appprofile"); factory.createForInstance("other-project", "other-instance"); + // Make sure that only 1 instance is created by each provider + Mockito.verify(credentialsProvider, Mockito.times(1)).getCredentials(); + Mockito.verify(executorProvider, Mockito.times(1)).getExecutor(); + Mockito.verify(watchdogProvider, Mockito.times(1)).getWatchdog(); + // Make sure that the clients are sharing the same ChannelPool assertThat(attributes).hasSize(poolSize); + // Make sure that prime requests were sent only once per table per connection assertThat(service.readRowsRequests).hasSize(poolSize * tableIds.length); List expectedRequests = new LinkedList<>(); @@ -260,9 +276,18 @@ public void testCreateWithRefreshingChannel() throws Exception { } } assertThat(service.readRowsRequests).containsExactly(expectedRequests.toArray()); + attributes.clear(); + + // Wait for all the connections to close asynchronously + factory.close(); + long sleepTimeMs = 1000; + Thread.sleep(sleepTimeMs); + // Verify that all the channels are closed + assertThat(attributes).hasSize(poolSize); } private static class FakeBigtableService extends BigtableGrpc.BigtableImplBase { + volatile MutateRowRequest lastRequest; BlockingQueue readRowsRequests = new LinkedBlockingDeque<>(); private ApiFunction readRowsCallback = diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java index 9a3eb874d1..d9273b5fd7 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java @@ -19,20 +19,27 @@ import com.google.api.gax.batching.BatchingSettings; import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.core.FixedCredentialsProvider; import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.ServerStreamingCallSettings; import com.google.api.gax.rpc.StatusCode.Code; import com.google.api.gax.rpc.UnaryCallSettings; import com.google.api.gax.rpc.WatchdogProvider; +import com.google.auth.Credentials; import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation; import com.google.cloud.bigtable.data.v2.models.KeyOffset; import com.google.cloud.bigtable.data.v2.models.Query; import com.google.cloud.bigtable.data.v2.models.Row; import com.google.cloud.bigtable.data.v2.models.RowMutation; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Range; +import java.io.IOException; +import java.net.URI; +import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Set; import org.junit.Test; import org.junit.runner.RunWith; @@ -61,7 +68,7 @@ public void settingsAreNotLostTest() { String projectId = "my-project"; String instanceId = "my-instance"; String appProfileId = "my-app-profile-id"; - boolean isRefreshingChannel = true; + boolean isRefreshingChannel = false; String endpoint = "some.other.host:123"; CredentialsProvider credentialsProvider = Mockito.mock(CredentialsProvider.class); WatchdogProvider watchdogProvider = Mockito.mock(WatchdogProvider.class); @@ -612,4 +619,56 @@ public void isRefreshingChannelFalseValueTest() { assertThat(builder.build().isRefreshingChannel()).isFalse(); assertThat(builder.build().toBuilder().isRefreshingChannel()).isFalse(); } + + @Test + public void refreshingChannelSetFixedCredentialProvider() throws Exception { + String dummyProjectId = "my-project"; + String dummyInstanceId = "my-instance"; + CredentialsProvider credentialsProvider = Mockito.mock(CredentialsProvider.class); + FakeCredentials expectedCredentials = new FakeCredentials(); + Mockito.when(credentialsProvider.getCredentials()) + .thenReturn(expectedCredentials, new FakeCredentials(), new FakeCredentials()); + EnhancedBigtableStubSettings.Builder builder = + EnhancedBigtableStubSettings.newBuilder() + .setProjectId(dummyProjectId) + .setInstanceId(dummyInstanceId) + .setRefreshingChannel(true) + .setCredentialsProvider(credentialsProvider); + assertThat(builder.isRefreshingChannel()).isTrue(); + // Verify that isRefreshing setting is not lost and stubSettings will always return the same + // credential + EnhancedBigtableStubSettings stubSettings = builder.build(); + assertThat(stubSettings.isRefreshingChannel()).isTrue(); + assertThat(stubSettings.getCredentialsProvider()).isInstanceOf(FixedCredentialsProvider.class); + assertThat(stubSettings.getCredentialsProvider().getCredentials()) + .isEqualTo(expectedCredentials); + assertThat(stubSettings.toBuilder().isRefreshingChannel()).isTrue(); + assertThat(stubSettings.toBuilder().getCredentialsProvider().getCredentials()) + .isEqualTo(expectedCredentials); + } + + private static class FakeCredentials extends Credentials { + @Override + public String getAuthenticationType() { + return "fake"; + } + + @Override + public Map> getRequestMetadata(URI uri) throws IOException { + return ImmutableMap.of("my-header", Arrays.asList("fake-credential")); + } + + @Override + public boolean hasRequestMetadata() { + return true; + } + + @Override + public boolean hasRequestMetadataOnly() { + return true; + } + + @Override + public void refresh() throws IOException {} + } } From d26e17a64445146c6fe81463e003324196a04ede Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Thu, 22 Oct 2020 16:59:04 +0000 Subject: [PATCH 6/6] Clean up test cases and comments --- .../data/v2/BigtableDataClientFactoryTest.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java index 792ecdba6a..25c341d650 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java @@ -75,7 +75,8 @@ public class BigtableDataClientFactoryTest { private BigtableDataSettings defaultSettings; private int port; - private final BlockingQueue attributes = new LinkedBlockingDeque<>(); + private final BlockingQueue setUpAttributes = new LinkedBlockingDeque<>(); + private final BlockingQueue terminateAttributes = new LinkedBlockingDeque<>(); @Before public void setUp() throws IOException { @@ -84,13 +85,13 @@ public void setUp() throws IOException { new ServerTransportFilter() { @Override public Attributes transportReady(Attributes transportAttrs) { - attributes.add(transportAttrs); + setUpAttributes.add(transportAttrs); return super.transportReady(transportAttrs); } @Override public void transportTerminated(Attributes transportAttrs) { - attributes.add(transportAttrs); + terminateAttributes.add(transportAttrs); } }; serviceHelper = new FakeServiceHelper(null, transportFilter, service); @@ -247,13 +248,13 @@ public void testCreateWithRefreshingChannel() throws Exception { factory.createForAppProfile("other-appprofile"); factory.createForInstance("other-project", "other-instance"); - // Make sure that only 1 instance is created by each provider + // Make sure that only 1 instance is created for all clients Mockito.verify(credentialsProvider, Mockito.times(1)).getCredentials(); Mockito.verify(executorProvider, Mockito.times(1)).getExecutor(); Mockito.verify(watchdogProvider, Mockito.times(1)).getWatchdog(); // Make sure that the clients are sharing the same ChannelPool - assertThat(attributes).hasSize(poolSize); + assertThat(setUpAttributes).hasSize(poolSize); // Make sure that prime requests were sent only once per table per connection assertThat(service.readRowsRequests).hasSize(poolSize * tableIds.length); @@ -276,14 +277,13 @@ public void testCreateWithRefreshingChannel() throws Exception { } } assertThat(service.readRowsRequests).containsExactly(expectedRequests.toArray()); - attributes.clear(); // Wait for all the connections to close asynchronously factory.close(); long sleepTimeMs = 1000; Thread.sleep(sleepTimeMs); // Verify that all the channels are closed - assertThat(attributes).hasSize(poolSize); + assertThat(terminateAttributes).hasSize(poolSize); } private static class FakeBigtableService extends BigtableGrpc.BigtableImplBase {