Skip to content

feat: support isolation level REPEATABLE_READ for R/W transactions #3670

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Mar 17, 2025
Next Next commit
feat(spanner): Support REPEATABLE_READ for RW transaction
  • Loading branch information
shobhitsg committed Mar 17, 2025
commit 23494f56bcecdf2b9a088c5761addc19a3dc6654
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ default String getDatabaseRole() {
* applied to any other requests on the transaction.
* <li>{@link Options#commitStats()}: Request that the server includes commit statistics in the
* {@link CommitResponse}.
* <li>{@link Options#repeatableReadIsolationLevel()}: Request Repeatable Read Isolation Level
* from the backend.
* <li>{@link Options#serializableIsolationLevel()}: Request Serializable Isolation Level from
* the backend.
* </ul>
*
* @return a response with the timestamp at which the write was committed
Expand Down Expand Up @@ -186,6 +190,10 @@ CommitResponse writeWithOptions(Iterable<Mutation> mutations, TransactionOption.
* applied to any other requests on the transaction.
* <li>{@link Options#commitStats()}: Request that the server includes commit statistics in the
* {@link CommitResponse}.
* <li>{@link Options#repeatableReadIsolationLevel()}: Request Repeatable Read Isolation Level
* from the backend.
* <li>{@link Options#serializableIsolationLevel()}: Request Serializable Isolation Level from
* the backend.
* </ul>
*
* @return a response with the timestamp at which the write was committed
Expand Down Expand Up @@ -414,6 +422,10 @@ ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
* applied to any other requests on the transaction.
* <li>{@link Options#commitStats()}: Request that the server includes commit statistics in the
* {@link CommitResponse}.
* <li>{@link Options#repeatableReadIsolationLevel()}: Request Repeatable Read Isolation Level
* from the backend.
* <li>{@link Options#serializableIsolationLevel()}: Request Serializable Isolation Level from
* the backend.
* </ul>
*/
TransactionRunner readWriteTransaction(TransactionOption... options);
Expand Down Expand Up @@ -454,6 +466,10 @@ ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
* applied to any other requests on the transaction.
* <li>{@link Options#commitStats()}: Request that the server includes commit statistics in the
* {@link CommitResponse}.
* <li>{@link Options#repeatableReadIsolationLevel()}: Request Repeatable Read Isolation Level
* from the backend.
* <li>{@link Options#serializableIsolationLevel()}: Request Serializable Isolation Level from
* the backend.
* </ul>
*/
TransactionManager transactionManager(TransactionOption... options);
Expand Down Expand Up @@ -494,6 +510,10 @@ ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
* applied to any other requests on the transaction.
* <li>{@link Options#commitStats()}: Request that the server includes commit statistics in the
* {@link CommitResponse}.
* <li>{@link Options#repeatableReadIsolationLevel()}: Request Repeatable Read Isolation Level
* from the backend.
* <li>{@link Options#serializableIsolationLevel()}: Request Serializable Isolation Level from
* the backend.
* </ul>
*/
AsyncRunner runAsync(TransactionOption... options);
Expand Down Expand Up @@ -548,6 +568,10 @@ ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
* applied to any other requests on the transaction.
* <li>{@link Options#commitStats()}: Request that the server includes commit statistics in the
* {@link CommitResponse}.
* <li>{@link Options#repeatableReadIsolationLevel()}: Request Repeatable Read Isolation Level
* from the backend.
* <li>{@link Options#serializableIsolationLevel()}: Request Serializable Isolation Level from
* the backend.
* </ul>
*/
AsyncTransactionManager transactionManagerAsync(TransactionOption... options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@
import com.google.spanner.v1.ReadRequest.LockHint;
import com.google.spanner.v1.ReadRequest.OrderBy;
import com.google.spanner.v1.RequestOptions.Priority;
import com.google.spanner.v1.TransactionOptions.IsolationLevel;
import java.io.Serializable;
import java.time.Duration;
import java.util.Arrays;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.stream.Stream;

/** Specifies options for various spanner operations */
public final class Options implements Serializable {
Expand Down Expand Up @@ -131,7 +135,29 @@ public interface UpdateAdminApiOption extends AdminApiOption {}
public interface QueryOption {}

/** Marker interface to mark options applicable to write operations */
public interface TransactionOption {}
public interface TransactionOption {
Predicate<TransactionOption> isValidIsolationLevelOption =
txnOption ->
txnOption instanceof RepeatableReadOption || txnOption instanceof SerializableOption;

/**
* Combines two arrays of TransactionOption, with primaryOptions taking precedence in case of
* conflicts. Note that {@link
* com.google.cloud.spanner.SpannerOptions.Builder.TransactionOptions} supports only the {@link
* IsolationLevel} TransactionOption, meaning spannerOptions will contain a maximum of one
* TransactionOption.
*/
static TransactionOption[] combine(
TransactionOption[] primaryOptions, TransactionOption[] spannerOptions) {
if (spannerOptions == null
|| Arrays.stream(primaryOptions).anyMatch(isValidIsolationLevelOption)) {
return primaryOptions;
} else {
return Stream.concat(Arrays.stream(primaryOptions), Arrays.stream(spannerOptions))
.toArray(TransactionOption[]::new);
}
}
}

/** Marker interface to mark options applicable to update operation. */
public interface UpdateOption {}
Expand Down Expand Up @@ -159,6 +185,22 @@ public static TransactionOption optimisticLock() {
return OPTIMISTIC_LOCK_OPTION;
}

/**
* Specifying this instructs the transaction to request Repeatable Read Isolation Level from the
* backend.
*/
public static TransactionOption repeatableReadIsolationLevel() {
return REPEATABLE_READ_OPTION;
}

/**
* Specifying this instructs the transaction to request Serializable Isolation Level from the
* backend.
*/
public static TransactionOption serializableIsolationLevel() {
return SERIALIZABLE_OPTION;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of creating two methods, one for each isolation level, this should be one method that takes an IsolationLevel input argument. This means that any future isolation levels that we add can be used without any changes.

Copy link
Contributor Author

@shobhitsg shobhitsg Mar 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I completely agree with your point. However, IMO, including request options that are not yet fully supported(when we add more IsolationLevel) by the library, even for short duration(e.g. development if that's even required?), can create confusion among lib users. Though the backend will reject unsupported options, their presence without clear documentation could mislead developers who often rely on request options(as it's a valid contract) to determine functionality. Please let me know if I've misunderstood anything and I'd like to know your POV. Here, I've assumed that the proto will always be merged in the main branch before any development could start.


/**
* Specifying this instructs the transaction to be excluded from being recorded in change streams
* with the DDL option `allow_txn_exclusion=true`. This does not exclude the transaction from
Expand Down Expand Up @@ -490,6 +532,26 @@ void appendToOptions(Options options) {
}
}

/** Option to request REPEATABLE READ isolation level for read/write transactions. */
static final class RepeatableReadOption extends InternalOption implements TransactionOption {
@Override
void appendToOptions(Options options) {
options.isolationLevel = IsolationLevel.REPEATABLE_READ;
}
}

static final RepeatableReadOption REPEATABLE_READ_OPTION = new RepeatableReadOption();

/** Option to request SERIALIZABLE isolation level for read/write transactions. */
static final class SerializableOption extends InternalOption implements TransactionOption {
@Override
void appendToOptions(Options options) {
options.isolationLevel = IsolationLevel.SERIALIZABLE;
}
}

static final SerializableOption SERIALIZABLE_OPTION = new SerializableOption();

private boolean withCommitStats;

private Duration maxCommitDelay;
Expand All @@ -512,6 +574,7 @@ void appendToOptions(Options options) {
private RpcOrderBy orderBy;
private RpcLockHint lockHint;
private Boolean lastStatement;
private IsolationLevel isolationLevel;

// Construction is via factory methods below.
private Options() {}
Expand Down Expand Up @@ -664,6 +727,10 @@ LockHint lockHint() {
return lockHint == null ? null : lockHint.proto;
}

IsolationLevel isolationLevel() {
return isolationLevel;
}

@Override
public String toString() {
StringBuilder b = new StringBuilder();
Expand Down Expand Up @@ -726,6 +793,9 @@ public String toString() {
if (lockHint != null) {
b.append("lockHint: ").append(lockHint).append(' ');
}
if (isolationLevel != null) {
b.append("isolationLevel: ").append(isolationLevel).append(' ');
}
return b.toString();
}

Expand Down Expand Up @@ -767,7 +837,8 @@ public boolean equals(Object o) {
&& Objects.equals(directedReadOptions(), that.directedReadOptions())
&& Objects.equals(orderBy(), that.orderBy())
&& Objects.equals(isLastStatement(), that.isLastStatement())
&& Objects.equals(lockHint(), that.lockHint());
&& Objects.equals(lockHint(), that.lockHint())
&& Objects.equals(isolationLevel(), that.isolationLevel());
}

@Override
Expand Down Expand Up @@ -833,6 +904,9 @@ public int hashCode() {
if (lockHint != null) {
result = 31 * result + lockHint.hashCode();
}
if (isolationLevel != null) {
result = 31 * result + isolationLevel.hashCode();
}
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ static TransactionOptions createReadWriteTransactionOptions(
&& previousTransactionId != com.google.protobuf.ByteString.EMPTY) {
readWrite.setMultiplexedSessionPreviousTransactionId(previousTransactionId);
}
if (options.isolationLevel() != null) {
transactionOptions.setIsolationLevel(options.isolationLevel());
}
transactionOptions.setReadWrite(readWrite);
return transactionOptions.build();
}
Expand Down Expand Up @@ -239,7 +242,10 @@ public CommitResponse writeAtLeastOnceWithOptions(
setActive(null);
List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
Mutation.toProtoAndReturnRandomMutation(mutations, mutationsProto);
Options options = Options.fromTransactionOptions(transactionOptions);
Options options =
Options.fromTransactionOptions(
TransactionOption.combine(
transactionOptions, this.spanner.getOptions().getTransactionOptions()));
final CommitRequest.Builder requestBuilder =
CommitRequest.newBuilder()
.setSession(getName())
Expand All @@ -252,6 +258,9 @@ public CommitResponse writeAtLeastOnceWithOptions(
if (options.withExcludeTxnFromChangeStreams() == Boolean.TRUE) {
transactionOptionsBuilder.setExcludeTxnFromChangeStreams(true);
}
if (options.isolationLevel() != null) {
transactionOptionsBuilder.setIsolationLevel(options.isolationLevel());
}
requestBuilder.setSingleUseTransaction(transactionOptionsBuilder);

if (options.hasMaxCommitDelay()) {
Expand Down Expand Up @@ -396,22 +405,37 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {

@Override
public TransactionRunner readWriteTransaction(TransactionOption... options) {
return setActive(new TransactionRunnerImpl(this, options));
return setActive(
new TransactionRunnerImpl(
this,
TransactionOption.combine(options, this.spanner.getOptions().getTransactionOptions())));
}

@Override
public AsyncRunner runAsync(TransactionOption... options) {
return new AsyncRunnerImpl(setActive(new TransactionRunnerImpl(this, options)));
return new AsyncRunnerImpl(
setActive(
new TransactionRunnerImpl(
this,
TransactionOption.combine(
options, this.spanner.getOptions().getTransactionOptions()))));
}

@Override
public TransactionManager transactionManager(TransactionOption... options) {
return new TransactionManagerImpl(this, currentSpan, tracer, options);
return new TransactionManagerImpl(
this,
currentSpan,
tracer,
TransactionOption.combine(options, this.spanner.getOptions().getTransactionOptions()));
}

@Override
public AsyncTransactionManagerImpl transactionManagerAsync(TransactionOption... options) {
return new AsyncTransactionManagerImpl(this, currentSpan, options);
return new AsyncTransactionManagerImpl(
this,
currentSpan,
TransactionOption.combine(options, this.spanner.getOptions().getTransactionOptions()));
}

@Override
Expand Down
Loading