Skip to content

test: add tests for session not found retries for async runner #545

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -725,14 +725,9 @@ public void close() {
@Override
public TransactionContext begin() {
this.delegate = session.get().transactionManager();
while (true) {
try {
return internalBegin();
} catch (SessionNotFoundException e) {
session = sessionPool.replaceSession(e, session);
delegate = session.get().delegate.transactionManager();
}
}
// This cannot throw a SessionNotFoundException, as it does not call the BeginTransaction RPC.
// Instead, the BeginTransaction will be included with the first statement of the transaction.
return internalBegin();
}

private TransactionContext internalBegin() {
Expand All @@ -743,7 +738,8 @@ private TransactionContext internalBegin() {

private SpannerException handleSessionNotFound(SessionNotFoundException notFound) {
session = sessionPool.replaceSession(notFound, session);
delegate = session.get().delegate.transactionManager();
PooledSession pooledSession = session.get();
delegate = pooledSession.delegate.transactionManager();
restartedAfterSessionNotFound = true;
return SpannerExceptionFactory.newSpannerException(
ErrorCode.ABORTED, notFound.getMessage(), notFound);
Expand Down Expand Up @@ -784,7 +780,8 @@ public TransactionContext resetForRetry() {
}
} catch (SessionNotFoundException e) {
session = sessionPool.replaceSession(e, session);
delegate = session.get().delegate.transactionManager();
PooledSession pooledSession = session.get();
delegate = pooledSession.delegate.transactionManager();
restartedAfterSessionNotFound = true;
}
}
Expand Down Expand Up @@ -852,7 +849,8 @@ public <T> T run(TransactionCallable<T> callable) {
break;
} catch (SessionNotFoundException e) {
session = sessionPool.replaceSession(e, session);
runner = session.get().delegate.readWriteTransaction();
PooledSession ps = session.get();
runner = ps.delegate.readWriteTransaction();
}
}
session.get().markUsed();
Expand Down Expand Up @@ -893,33 +891,43 @@ public <R> ApiFuture<R> runAsync(final AsyncWork<R> work, Executor executor) {
new Runnable() {
@Override
public void run() {
SpannerException se = null;
SpannerException exception = null;
R r = null;
AsyncRunner runner = null;
while (true) {
SpannerException se = null;
try {
runner = session.get().runAsync();
r = runner.runAsync(work, MoreExecutors.directExecutor()).get();
break;
} catch (ExecutionException e) {
se = SpannerExceptionFactory.newSpannerException(e.getCause());
se = SpannerExceptionFactory.asSpannerException(e.getCause());
} catch (InterruptedException e) {
se = SpannerExceptionFactory.propagateInterrupt(e);
} catch (Throwable t) {
se = SpannerExceptionFactory.newSpannerException(t);
} finally {
if (se != null && se instanceof SessionNotFoundException) {
session = sessionPool.replaceSession((SessionNotFoundException) se, session);
if (se instanceof SessionNotFoundException) {
try {
// The replaceSession method will re-throw the SessionNotFoundException if the
// session cannot be replaced with a new one.
session = sessionPool.replaceSession((SessionNotFoundException) se, session);
se = null;
} catch (SessionNotFoundException e) {
exception = e;
break;
}
} else {
exception = se;
break;
}
}
}
session.get().markUsed();
session.close();
setCommitTimestamp(runner);
if (se != null) {
res.setException(se);
if (exception != null) {
res.setException(exception);
} else {
res.set(r);
}
Expand Down Expand Up @@ -1023,7 +1031,8 @@ public ReadContext singleUse() {
new Function<PooledSessionFuture, ReadContext>() {
@Override
public ReadContext apply(PooledSessionFuture session) {
return session.get().delegate.singleUse();
PooledSession ps = session.get();
return ps.delegate.singleUse();
}
},
SessionPool.this,
Expand All @@ -1042,7 +1051,8 @@ public ReadContext singleUse(final TimestampBound bound) {
new Function<PooledSessionFuture, ReadContext>() {
@Override
public ReadContext apply(PooledSessionFuture session) {
return session.get().delegate.singleUse(bound);
PooledSession ps = session.get();
return ps.delegate.singleUse(bound);
}
},
SessionPool.this,
Expand All @@ -1060,7 +1070,8 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction() {
new Function<PooledSessionFuture, ReadOnlyTransaction>() {
@Override
public ReadOnlyTransaction apply(PooledSessionFuture session) {
return session.get().delegate.singleUseReadOnlyTransaction();
PooledSession ps = session.get();
return ps.delegate.singleUseReadOnlyTransaction();
}
},
true);
Expand All @@ -1072,7 +1083,8 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction(final TimestampBound bou
new Function<PooledSessionFuture, ReadOnlyTransaction>() {
@Override
public ReadOnlyTransaction apply(PooledSessionFuture session) {
return session.get().delegate.singleUseReadOnlyTransaction(bound);
PooledSession ps = session.get();
return ps.delegate.singleUseReadOnlyTransaction(bound);
}
},
true);
Expand All @@ -1084,7 +1096,8 @@ public ReadOnlyTransaction readOnlyTransaction() {
new Function<PooledSessionFuture, ReadOnlyTransaction>() {
@Override
public ReadOnlyTransaction apply(PooledSessionFuture session) {
return session.get().delegate.readOnlyTransaction();
PooledSession ps = session.get();
return ps.delegate.readOnlyTransaction();
}
},
false);
Expand All @@ -1096,7 +1109,8 @@ public ReadOnlyTransaction readOnlyTransaction(final TimestampBound bound) {
new Function<PooledSessionFuture, ReadOnlyTransaction>() {
@Override
public ReadOnlyTransaction apply(PooledSessionFuture session) {
return session.get().delegate.readOnlyTransaction(bound);
PooledSession ps = session.get();
return ps.delegate.readOnlyTransaction(bound);
}
},
false);
Expand Down
Loading