Skip to content
This repository was archived by the owner on Sep 26, 2023. It is now read-only.

Commit 06dbf12

Browse files
authored
fix: fix watchdog NPE red herring (#1344)
* fix: fix watchdog NPE red herring * adding a test * checking a few more things in the test * make innercallable volatile
1 parent 32240eb commit 06dbf12

File tree

2 files changed

+32
-1
lines changed

2 files changed

+32
-1
lines changed

gax/src/main/java/com/google/api/gax/rpc/Watchdog.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ class WatchdogStream<ResponseT> extends StateCheckingResponseObserver<ResponseT>
183183
private boolean autoAutoFlowControl = true;
184184

185185
private final ResponseObserver<ResponseT> outerResponseObserver;
186-
private StreamController innerController;
186+
private volatile StreamController innerController;
187187

188188
@GuardedBy("lock")
189189
private State state = State.IDLE;
@@ -296,6 +296,12 @@ public void onCompleteImpl() {
296296
* @return True if the stream was canceled.
297297
*/
298298
boolean cancelIfStale() {
299+
// If the stream hasn't started yet, innerController will be null. Skip the check this time
300+
// and return false so the stream is still watched.
301+
if (innerController == null) {
302+
return false;
303+
}
304+
299305
Throwable myError = null;
300306

301307
synchronized (lock) {

gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,31 @@ public void testIdleTimeout() throws InterruptedException {
128128
assertThat(actualError).isInstanceOf(WatchdogTimeoutException.class);
129129
}
130130

131+
@Test
132+
public void testTimedOutBeforeStart() throws InterruptedException {
133+
MockServerStreamingCallable<String, String> callable1 = new MockServerStreamingCallable<>();
134+
AccumulatingObserver<String> downstreamObserver1 = new AccumulatingObserver<>();
135+
ResponseObserver observer = watchdog.watch(downstreamObserver1, waitTime, idleTime);
136+
clock.incrementNanoTime(idleTime.toNanos() + 1);
137+
// This should not remove callable1 from watched list
138+
watchdog.run();
139+
assertThat(downstreamObserver1.done.isDone()).isFalse();
140+
141+
callable1.call("request", observer);
142+
// This should cancel callable1
143+
watchdog.run();
144+
MockServerStreamingCall<String, String> call1 = callable1.popLastCall();
145+
assertThat(call1.getController().isCancelled()).isTrue();
146+
call1.getController().getObserver().onError(new CancellationException("User cancelled"));
147+
Throwable error = null;
148+
try {
149+
downstreamObserver1.done.get();
150+
} catch (ExecutionException t) {
151+
error = t.getCause();
152+
}
153+
assertThat(error).isInstanceOf(WatchdogTimeoutException.class);
154+
}
155+
131156
@Test
132157
public void testMultiple() throws Exception {
133158
// Start stream1

0 commit comments

Comments
 (0)