1818import static com .google .common .truth .Truth .assertThat ;
1919
2020import com .google .api .core .ApiClock ;
21+ import com .google .api .core .ApiFunction ;
2122import com .google .api .gax .core .CredentialsProvider ;
2223import com .google .api .gax .core .ExecutorProvider ;
24+ import com .google .api .gax .grpc .InstantiatingGrpcChannelProvider ;
2325import com .google .api .gax .rpc .TransportChannelProvider ;
2426import com .google .api .gax .rpc .WatchdogProvider ;
2527import com .google .bigtable .v2 .BigtableGrpc ;
2628import com .google .bigtable .v2 .MutateRowRequest ;
2729import com .google .bigtable .v2 .MutateRowResponse ;
30+ import com .google .bigtable .v2 .ReadRowsRequest ;
31+ import com .google .bigtable .v2 .ReadRowsResponse ;
32+ import com .google .bigtable .v2 .RowFilter ;
33+ import com .google .bigtable .v2 .RowSet ;
2834import com .google .cloud .bigtable .data .v2 .internal .NameUtil ;
2935import com .google .cloud .bigtable .data .v2 .models .RowMutation ;
3036import com .google .common .base .Preconditions ;
37+ import com .google .protobuf .ByteString ;
38+ import io .grpc .Attributes ;
39+ import io .grpc .ServerTransportFilter ;
3140import io .grpc .stub .StreamObserver ;
3241import java .io .IOException ;
3342import java .lang .reflect .Method ;
43+ import java .util .LinkedList ;
44+ import java .util .List ;
45+ import java .util .concurrent .BlockingQueue ;
46+ import java .util .concurrent .LinkedBlockingDeque ;
3447import org .junit .After ;
3548import org .junit .Before ;
3649import org .junit .Rule ;
@@ -60,16 +73,33 @@ public class BigtableDataClientFactoryTest {
6073 private WatchdogProvider watchdogProvider ;
6174 private ApiClock apiClock ;
6275 private BigtableDataSettings defaultSettings ;
76+ private int port ;
77+
78+ private final BlockingQueue <Attributes > setUpAttributes = new LinkedBlockingDeque <>();
79+ private final BlockingQueue <Attributes > terminateAttributes = new LinkedBlockingDeque <>();
6380
6481 @ Before
6582 public void setUp () throws IOException {
6683 service = new FakeBigtableService ();
67-
68- serviceHelper = new FakeServiceHelper (service );
84+ ServerTransportFilter transportFilter =
85+ new ServerTransportFilter () {
86+ @ Override
87+ public Attributes transportReady (Attributes transportAttrs ) {
88+ setUpAttributes .add (transportAttrs );
89+ return super .transportReady (transportAttrs );
90+ }
91+
92+ @ Override
93+ public void transportTerminated (Attributes transportAttrs ) {
94+ terminateAttributes .add (transportAttrs );
95+ }
96+ };
97+ serviceHelper = new FakeServiceHelper (null , transportFilter , service );
98+ port = serviceHelper .getPort ();
6999 serviceHelper .start ();
70100
71101 BigtableDataSettings .Builder builder =
72- BigtableDataSettings .newBuilderForEmulator (serviceHelper . getPort () )
102+ BigtableDataSettings .newBuilderForEmulator (port )
73103 .setProjectId (DEFAULT_PROJECT_ID )
74104 .setInstanceId (DEFAULT_INSTANCE_ID )
75105 .setAppProfileId (DEFAULT_APP_PROFILE_ID );
@@ -191,8 +221,94 @@ public void testCreateForInstanceWithAppProfileHasCorrectSettings() throws Excep
191221 assertThat (service .lastRequest .getAppProfileId ()).isEqualTo ("other-app-profile" );
192222 }
193223
224+ @ Test
225+ public void testCreateWithRefreshingChannel () throws Exception {
226+ String [] tableIds = {"fake-table1" , "fake-table2" };
227+ int poolSize = 3 ;
228+ BigtableDataSettings .Builder builder =
229+ BigtableDataSettings .newBuilderForEmulator (port )
230+ .setProjectId (DEFAULT_PROJECT_ID )
231+ .setInstanceId (DEFAULT_INSTANCE_ID )
232+ .setAppProfileId (DEFAULT_APP_PROFILE_ID )
233+ .setPrimingTableIds (tableIds )
234+ .setRefreshingChannel (true );
235+ builder
236+ .stubSettings ()
237+ .setCredentialsProvider (credentialsProvider )
238+ .setStreamWatchdogProvider (watchdogProvider )
239+ .setExecutorProvider (executorProvider );
240+ InstantiatingGrpcChannelProvider channelProvider =
241+ (InstantiatingGrpcChannelProvider ) builder .stubSettings ().getTransportChannelProvider ();
242+ InstantiatingGrpcChannelProvider .Builder channelProviderBuilder = channelProvider .toBuilder ();
243+ channelProviderBuilder .setPoolSize (poolSize );
244+ builder .stubSettings ().setTransportChannelProvider (channelProviderBuilder .build ());
245+
246+ BigtableDataClientFactory factory = BigtableDataClientFactory .create (builder .build ());
247+ factory .createDefault ();
248+ factory .createForAppProfile ("other-appprofile" );
249+ factory .createForInstance ("other-project" , "other-instance" );
250+
251+ // Make sure that only 1 instance is created for all clients
252+ Mockito .verify (credentialsProvider , Mockito .times (1 )).getCredentials ();
253+ Mockito .verify (executorProvider , Mockito .times (1 )).getExecutor ();
254+ Mockito .verify (watchdogProvider , Mockito .times (1 )).getWatchdog ();
255+
256+ // Make sure that the clients are sharing the same ChannelPool
257+ assertThat (setUpAttributes ).hasSize (poolSize );
258+
259+ // Make sure that prime requests were sent only once per table per connection
260+ assertThat (service .readRowsRequests ).hasSize (poolSize * tableIds .length );
261+ List <ReadRowsRequest > expectedRequests = new LinkedList <>();
262+ for (String tableId : tableIds ) {
263+ for (int i = 0 ; i < poolSize ; i ++) {
264+ expectedRequests .add (
265+ ReadRowsRequest .newBuilder ()
266+ .setTableName (
267+ String .format (
268+ "projects/%s/instances/%s/tables/%s" ,
269+ DEFAULT_PROJECT_ID , DEFAULT_INSTANCE_ID , tableId ))
270+ .setAppProfileId (DEFAULT_APP_PROFILE_ID )
271+ .setRows (
272+ RowSet .newBuilder ()
273+ .addRowKeys (ByteString .copyFromUtf8 ("nonexistent-priming-row" )))
274+ .setFilter (RowFilter .newBuilder ().setBlockAllFilter (true ).build ())
275+ .setRowsLimit (1 )
276+ .build ());
277+ }
278+ }
279+ assertThat (service .readRowsRequests ).containsExactly (expectedRequests .toArray ());
280+
281+ // Wait for all the connections to close asynchronously
282+ factory .close ();
283+ long sleepTimeMs = 1000 ;
284+ Thread .sleep (sleepTimeMs );
285+ // Verify that all the channels are closed
286+ assertThat (terminateAttributes ).hasSize (poolSize );
287+ }
288+
194289 private static class FakeBigtableService extends BigtableGrpc .BigtableImplBase {
290+
195291 volatile MutateRowRequest lastRequest ;
292+ BlockingQueue <ReadRowsRequest > readRowsRequests = new LinkedBlockingDeque <>();
293+ private ApiFunction <ReadRowsRequest , ReadRowsResponse > readRowsCallback =
294+ new ApiFunction <ReadRowsRequest , ReadRowsResponse >() {
295+ @ Override
296+ public ReadRowsResponse apply (ReadRowsRequest readRowsRequest ) {
297+ return ReadRowsResponse .getDefaultInstance ();
298+ }
299+ };
300+
301+ @ Override
302+ public void readRows (
303+ ReadRowsRequest request , StreamObserver <ReadRowsResponse > responseObserver ) {
304+ try {
305+ readRowsRequests .add (request );
306+ responseObserver .onNext (readRowsCallback .apply (request ));
307+ responseObserver .onCompleted ();
308+ } catch (RuntimeException e ) {
309+ responseObserver .onError (e );
310+ }
311+ }
196312
197313 @ Override
198314 public void mutateRow (
@@ -204,6 +320,7 @@ public void mutateRow(
204320 }
205321
206322 private static class BuilderAnswer <T > implements Answer {
323+
207324 private final Class <T > targetClass ;
208325 private T targetInstance ;
209326
0 commit comments