Skip to content

feat: support multiplexed session for blind write with single use transaction #3229

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat(spanner): support multiplexed session for blind write with singl…
…e use transaction.
  • Loading branch information
pratickchokhani committed Jul 30, 2024
commit 3cf0cd404a7f54ad67708391ebc7985f393722a6
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,7 @@ public CommitResponse writeWithOptions(Iterable<Mutation> mutations, Transaction

@Override
public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerException {
throw new UnsupportedOperationException();
}

@Override
public CommitResponse writeAtLeastOnceWithOptions(
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException {
throw new UnsupportedOperationException();
return writeAtLeastOnceWithOptions(mutations).getCommitTimestamp();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,28 @@ class DatabaseClientImpl implements DatabaseClient {
@VisibleForTesting final SessionPool pool;
@VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient;

final boolean useMultiplexedSessionForBlindWrite;

@VisibleForTesting
DatabaseClientImpl(SessionPool pool, TraceWrapper tracer) {
this("", pool, /* multiplexedSessionDatabaseClient = */ null, tracer);
this("", pool, /* multiplexedSessionDatabaseClient = */ null, false, tracer);
}

@VisibleForTesting
DatabaseClientImpl(String clientId, SessionPool pool, TraceWrapper tracer) {
this(clientId, pool, /* multiplexedSessionDatabaseClient = */ null, tracer);
this(clientId, pool, /* multiplexedSessionDatabaseClient = */ null, false, tracer);
}

DatabaseClientImpl(
String clientId,
SessionPool pool,
@Nullable MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient,
boolean useMultiplexedSessionForBlindWrite,
TraceWrapper tracer) {
this.clientId = clientId;
this.pool = pool;
this.multiplexedSessionDatabaseClient = multiplexedSessionDatabaseClient;
this.useMultiplexedSessionForBlindWrite = useMultiplexedSessionForBlindWrite;
this.tracer = tracer;
}

Expand Down Expand Up @@ -112,6 +116,16 @@ public Timestamp writeAtLeastOnce(final Iterable<Mutation> mutations) throws Spa
public CommitResponse writeAtLeastOnceWithOptions(
final Iterable<Mutation> mutations, final TransactionOption... options)
throws SpannerException {
if (useMultiplexedSessionForBlindWrite) {
return getMultiplexedSession().writeAtLeastOnceWithOptions(mutations, options);
} else {
return writeAtLeastOnceWithSession(mutations, options);
}
}

public CommitResponse writeAtLeastOnceWithSession(
final Iterable<Mutation> mutations, final TransactionOption... options)
throws SpannerException {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
try (IScope s = tracer.withSpan(span)) {
return runWithSessionRetry(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import com.google.api.core.ApiFutures;
import com.google.cloud.spanner.DelayedReadContext.DelayedReadOnlyTransaction;
import com.google.cloud.spanner.MultiplexedSessionDatabaseClient.MultiplexedSessionTransaction;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ExecutionException;

/**
* Represents a delayed execution of a transaction on a multiplexed session. The execution is
Expand Down Expand Up @@ -119,4 +121,27 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
.readOnlyTransaction(bound),
MoreExecutors.directExecutor()));
}

@Override
public CommitResponse writeAtLeastOnceWithOptions(
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException {
try {
return ApiFutures.transform(
this.sessionFuture,
sessionReference ->
new MultiplexedSessionTransaction(
client, span, sessionReference, NO_CHANNEL_HINT, false)
.writeAtLeastOnceWithOptions(mutations, options),
MoreExecutors.directExecutor()).get();
} catch (ExecutionException executionException) {
// Propagate the underlying exception as a RuntimeException (SpannerException is also a
// RuntimeException).
if (executionException.getCause() instanceof RuntimeException) {
throw (RuntimeException) executionException.getCause();
}
throw SpannerExceptionFactory.asSpannerException(executionException.getCause());
} catch (InterruptedException interruptedException) {
throw SpannerExceptionFactory.propagateInterrupt(interruptedException);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.SessionClient.SessionConsumer;
import com.google.cloud.spanner.SpannerException.ResourceNotFoundException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
Expand Down Expand Up @@ -358,6 +360,12 @@ private int getSingleUseChannelHint() {
}
}

@Override
public CommitResponse writeAtLeastOnceWithOptions(
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException {
return createMultiplexedSessionTransaction(true)
.writeAtLeastOnceWithOptions(mutations, options);
}
@Override
public ReadContext singleUse() {
return createMultiplexedSessionTransaction(true).singleUse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2185,15 +2185,13 @@ public CommitResponse writeWithOptions(

@Override
public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerException {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.UNIMPLEMENTED, "Unimplemented with Multiplexed Session");
return this.delegate.writeAtLeastOnce(mutations);
}

@Override
public CommitResponse writeAtLeastOnceWithOptions(
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.UNIMPLEMENTED, "Unimplemented with Multiplexed Session");
return this.delegate.writeAtLeastOnceWithOptions(mutations, options);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public class SessionPoolOptions {

private final boolean useMultiplexedSession;

private final boolean useMultiplexedSessionForBlindWrite;

// TODO: Change to use java.time.Duration.
private final Duration multiplexedSessionMaintenanceDuration;

Expand Down Expand Up @@ -108,6 +110,12 @@ private SessionPoolOptions(Builder builder) {
(useMultiplexedSessionFromEnvVariable != null)
? useMultiplexedSessionFromEnvVariable
: builder.useMultiplexedSession;
// useMultiplexedSessionForBlindWrite priority => Environment var > private setter > client default
Boolean useMultiplexedSessionBlindWriteFromEnvVariable = getUseMultiplexedSessionBlindWriteFromEnvVariable();
this.useMultiplexedSessionForBlindWrite =
(useMultiplexedSessionBlindWriteFromEnvVariable != null)
? useMultiplexedSessionBlindWriteFromEnvVariable
: builder.useMultiplexedSessionForBlindWrite;
this.multiplexedSessionMaintenanceDuration = builder.multiplexedSessionMaintenanceDuration;
}

Expand Down Expand Up @@ -307,6 +315,28 @@ public boolean getUseMultiplexedSession() {
return useMultiplexedSession;
}

@VisibleForTesting
@InternalApi
public boolean getUseMultiplexedSessionForBlindWrite() {
return useMultiplexedSessionForBlindWrite;
}

private static Boolean getUseMultiplexedSessionBlindWriteFromEnvVariable() {
String useMultiplexedSessionFromEnvVariable =
System.getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_BLIND_WRITES");
if (useMultiplexedSessionFromEnvVariable != null
&& useMultiplexedSessionFromEnvVariable.length() > 0) {
if ("true".equalsIgnoreCase(useMultiplexedSessionFromEnvVariable)
|| "false".equalsIgnoreCase(useMultiplexedSessionFromEnvVariable)) {
return Boolean.parseBoolean(useMultiplexedSessionFromEnvVariable);
} else {
throw new IllegalArgumentException(
"GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_BLIND_WRITES should be either true or false.");
}
}
return null;
}

private static Boolean getUseMultiplexedSessionFromEnvVariable() {
String useMultiplexedSessionFromEnvVariable =
System.getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS");
Expand Down Expand Up @@ -529,6 +559,11 @@ public static class Builder {
// Set useMultiplexedSession to true to make multiplexed session the default.
private boolean useMultiplexedSession = false;

// This field controls the default behavior of session management in Java client.
// Set useMultiplexedSessionForBlindWrite to true to make multiplexed session the default for
// blind writes.
private boolean useMultiplexedSessionForBlindWrite = false;

private Duration multiplexedSessionMaintenanceDuration = Duration.ofDays(7);
private Clock poolMaintainerClock = Clock.INSTANCE;

Expand Down Expand Up @@ -570,6 +605,7 @@ private Builder(SessionPoolOptions options) {
this.randomizePositionQPSThreshold = options.randomizePositionQPSThreshold;
this.inactiveTransactionRemovalOptions = options.inactiveTransactionRemovalOptions;
this.useMultiplexedSession = options.useMultiplexedSession;
this.useMultiplexedSessionForBlindWrite = options.useMultiplexedSessionForBlindWrite;
this.multiplexedSessionMaintenanceDuration = options.multiplexedSessionMaintenanceDuration;
this.poolMaintainerClock = options.poolMaintainerClock;
}
Expand Down Expand Up @@ -757,6 +793,22 @@ Builder setUseMultiplexedSession(boolean useMultiplexedSession) {
return this;
}

/**
* Sets whether the client should use multiplexed session or not for writeAtLeastOnce. If set to
* true, the client optimises and runs multiple applicable requests concurrently on a single
* session. A single multiplexed session is sufficient to handle all concurrent traffic.
*
* <p>When set to false, the client uses the regular session cached in the session pool for
* running 1 concurrent transaction per session. We require to provision sufficient sessions by
* making use of {@link SessionPoolOptions#minSessions} and {@link
* SessionPoolOptions#maxSessions} based on the traffic load. Failing to do so will result in
* higher latencies.
*/
Builder setUseMultiplexedSessionForBlindWrite(boolean useMultiplexedSessionForBlindWrite) {
this.useMultiplexedSessionForBlindWrite = useMultiplexedSessionForBlindWrite;
return this;
}

@VisibleForTesting
Builder setMultiplexedSessionMaintenanceDuration(
Duration multiplexedSessionMaintenanceDuration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,8 @@ public DatabaseClient getDatabaseClient(DatabaseId db) {

boolean useMultiplexedSession =
getOptions().getSessionPoolOptions().getUseMultiplexedSession();
boolean useMultiplexedSessionForBlindWrite =
getOptions().getSessionPoolOptions().getUseMultiplexedSessionForBlindWrite();
MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient =
useMultiplexedSession
? new MultiplexedSessionDatabaseClient(SpannerImpl.this.getSessionClient(db))
Expand All @@ -300,7 +302,8 @@ public DatabaseClient getDatabaseClient(DatabaseId db) {
numMultiplexedSessionsReleased);
pool.maybeWaitOnMinSessions();
DatabaseClientImpl dbClient =
createDatabaseClient(clientId, pool, multiplexedSessionDatabaseClient);
createDatabaseClient(clientId, pool, multiplexedSessionDatabaseClient,
useMultiplexedSessionForBlindWrite);
dbClients.put(db, dbClient);
return dbClient;
}
Expand All @@ -311,8 +314,10 @@ public DatabaseClient getDatabaseClient(DatabaseId db) {
DatabaseClientImpl createDatabaseClient(
String clientId,
SessionPool pool,
@Nullable MultiplexedSessionDatabaseClient multiplexedSessionClient) {
return new DatabaseClientImpl(clientId, pool, multiplexedSessionClient, tracer);
@Nullable MultiplexedSessionDatabaseClient multiplexedSessionClient,
boolean multiplexedSessionForBlindWrite) {
return new DatabaseClientImpl(clientId, pool, multiplexedSessionClient,
multiplexedSessionForBlindWrite, tracer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ private static class SpannerWithClosedSessionsImpl extends SpannerImpl {

@Override
DatabaseClientImpl createDatabaseClient(
String clientId, SessionPool pool, MultiplexedSessionDatabaseClient ignore) {
String clientId, SessionPool pool, MultiplexedSessionDatabaseClient ignore,
boolean multiplexedSessionForBlindWrite) {
return new DatabaseClientWithClosedSessionImpl(clientId, pool, tracer);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public void createSpannerInstance() {
.setSessionPoolOption(
SessionPoolOptions.newBuilder()
.setUseMultiplexedSession(true)
.setUseMultiplexedSessionForBlindWrite(true)
// Set the maintainer to loop once every 1ms
.setMultiplexedSessionMaintenanceLoopFrequency(Duration.ofMillis(1L))
// Set multiplexed sessions to be replaced once every 1ms
Expand Down