Skip to content

feat: add support for BatchWriteAtLeastOnce #2520

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 28 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
7ff7cb2
feat: add support for BatchWriteAtleastOnce
rajatbhatta Jun 5, 2023
43c4f63
test: add batchwrite() support to MockSpannerServiceImpl
rajatbhatta Jun 5, 2023
abedd15
test: add commit timestamp to proto
rajatbhatta Jun 5, 2023
21d8f67
test: add commit timestamp to proto
rajatbhatta Jun 5, 2023
4858965
test: add commit timestamp to proto
rajatbhatta Jun 12, 2023
81743a7
consume the stream in tests
rajatbhatta Jun 12, 2023
65abc96
refactor tests
rajatbhatta Jun 12, 2023
8d76346
refactor tests
rajatbhatta Jun 14, 2023
a3569ae
test if mutations are correctly applied
rajatbhatta Jun 20, 2023
4cb0ada
null check
rajatbhatta Jun 20, 2023
5401b7d
skip for emulator
rajatbhatta Jun 20, 2023
5103b5c
Merge branch 'googleapis:main' into batch-write
rajatbhatta Jun 20, 2023
95c3326
add method documentation
rajatbhatta Jun 20, 2023
5678207
add method documentation
rajatbhatta Jun 20, 2023
347e9c3
add method documentation
rajatbhatta Jun 20, 2023
4c7251f
Merge branch 'googleapis:main' into batch-write
rajatbhatta Jul 6, 2023
1deaa29
remove autogenerated code
rajatbhatta Jul 6, 2023
acece36
remove autogenerated tests
rajatbhatta Jul 6, 2023
5f04154
batchWriteAtleastOnce -> batchWriteAtLeastOnce
rajatbhatta Jul 6, 2023
8e15c5c
batchWriteAtleastOnceWithOptions -> batchWriteAtLeastOnceWithOptions
rajatbhatta Jul 6, 2023
56f3929
changes based on updated batch write API
rajatbhatta Aug 17, 2023
356849e
add copyright and doc
rajatbhatta Aug 17, 2023
07f3bfb
Merge branch 'main' into batch-write-review
rajatbhatta Aug 17, 2023
1ca0d31
address review comments
rajatbhatta Aug 22, 2023
c3e3d7b
address review comments
rajatbhatta Aug 22, 2023
a036ced
add more documentation
rajatbhatta Aug 25, 2023
d8ef791
Merge branch 'googleapis:main' into batch-write-review
rajatbhatta Sep 18, 2023
7c4c73d
Merge branch 'googleapis:main' into batch-write-review
arpan14 Sep 26, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
changes based on updated batch write API
  • Loading branch information
rajatbhatta committed Aug 17, 2023
commit 56f39293e4ea80a4b2a9fab9207068aed7a2d426
Original file line number Diff line number Diff line change
Expand Up @@ -194,75 +194,58 @@ CommitResponse writeAtLeastOnceWithOptions(
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException;

/**
* Batches the supplied mutations in a collection of efficient transactions. The mutations are
* applied non-atomically in an unspecified order and thus, they must be independent of each
* other. Partial failure is possible, i.e., some mutations may have been applied successfully,
* while some may have failed. The results of individual batches are streamed into the response as
* and when the batches are applied.
* Applies batch of mutation groups in a collection of efficient transactions. The mutation groups
* are applied non-atomically in an unspecified order and thus, they must be independent of each
* other. Partial failure is possible, i.e., some mutation groups may have been applied
* successfully, while some may have failed. The results of individual batches are streamed into
* the response as and when the batches are applied.
*
* <p>Since this method does not feature replay protection, it may attempt to apply {@code
* mutations} more than once; if the mutations are not idempotent, this may lead to a failure
* being reported when the mutation was applied once. For example, an insert may fail with {@link
* ErrorCode#ALREADY_EXISTS} even though the row did not exist before this method was called. For
* this reason, most users of the library will prefer to use {@link #write(Iterable)} instead.
* However, {@code batchWriteAtLeastOnce()} method may be appropriate for non-atomically
* committing multiple mutations in a single RPC with low latency.
* mutation groups} more than once; if the mutation groups are not idempotent, this may lead to a
* failure being reported when the mutation group was applied once. For example, an insert may
* fail with {@link ErrorCode#ALREADY_EXISTS} even though the row did not exist before this method
* was called. For this reason, most users of the library will prefer to use {@link
* #write(Iterable)} instead. However, {@code batchWriteAtLeastOnce()} method may be appropriate
* for non-atomically committing multiple mutation groups in a single RPC with low latency.
*
* <p>Example of BatchWriteAtleastOnce
* <p>Example of BatchWriteAtLeastOnce
*
* <pre>{@code
* long singerId = my_singer_id;
* Mutation mutation = Mutation.newInsertBuilder("Singers")
* .set("SingerId")
* .to(singerId)
* .set("FirstName")
* .to("Billy")
* .set("LastName")
* .to("Joel")
* .build();
* ServerStream<BatchWriteResponse> responses =
* dbClient.batchWriteAtLeastOnce(Collections.singletonList(mutation));
* dbClient.batchWriteAtLeastOnceWithOptions(
* ImmutableList.of(MUTATION_GROUP1, MUTATION_GROUP2));
* for (BatchWriteResponse response : responses) {
* // Do something when a response is received.
* }
* }</pre>
*
* @return ServerStream\<BatchWriteResponse>
*/
ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(Iterable<Mutation> mutations)
ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(Iterable<MutationGroup> mutationGroups)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When should I create multiple MutationGroups instead of just one large mutation group? What's the tradeoff between a large number of small MutationGroups vs a few large MutationGroups? Is there any limits on the number of mutations that I may have in a MutationGroup?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When should I create multiple MutationGroups instead of just one large mutation group?

Dependent mutations (to be committed together) should be part of a single MutationGroup.

For example, the following should be part of same MutationGroup:

  • Inserting rows into both parent and child tables. If they're part of different MutationGroups, it will result in a failure when a commit on the child table’s row is attempted first.
  • Inserting rows into tables T1, T2; T2 has a foreign key pointing to T1. If they're part of different MutationGroups, it will result in a failure when a commit on T2 is attempted first.

If there's a need to create just one large MutationGroup, it's better to use the existing Commit RPC for it.

What's the tradeoff between a large number of small MutationGroups vs a few large MutationGroups?

Depends on the use case. The bottomline is we should only put mutations meant to be committed together in a group, otherwise, should prefer a different group.

Is there any limits on the number of mutations that I may have in a MutationGroup?

Considering that in the base case, each mutation group could be committed into its own transaction, which has a limit of 40k mutations, the same limit would apply to the MutationGroup also.

throws SpannerException;

/**
* Batches the supplied mutations in a collection of efficient transactions. The mutations are
* applied non-atomically in an unspecified order and thus, they must be independent of each
* other. Partial failure is possible, i.e., some mutations may have been applied successfully,
* while some may have failed. The results of individual batches are streamed into the response as
* and when the batches are applied.
* Applies batch of mutation groups in a collection of efficient transactions. The mutation groups
* are applied non-atomically in an unspecified order and thus, they must be independent of each
* other. Partial failure is possible, i.e., some mutation groups may have been applied
* successfully, while some may have failed. The results of individual batches are streamed into
* the response as and when the batches are applied.
*
* <p>Since this method does not feature replay protection, it may attempt to apply {@code
* mutations} more than once; if the mutations are not idempotent, this may lead to a failure
* being reported when the mutation was applied once. For example, an insert may fail with {@link
* ErrorCode#ALREADY_EXISTS} even though the row did not exist before this method was called. For
* this reason, most users of the library will prefer to use {@link #write(Iterable)} instead.
* However, {@code batchWriteAtLeastOnce()} method may be appropriate for non-atomically
* committing multiple mutations in a single RPC with low latency.
* mutation groups} more than once; if the mutation groups are not idempotent, this may lead to a
* failure being reported when the mutation group was applied once. For example, an insert may
* fail with {@link ErrorCode#ALREADY_EXISTS} even though the row did not exist before this method
* was called. For this reason, most users of the library will prefer to use {@link
* #write(Iterable)} instead. However, {@code batchWriteAtLeastOnce()} method may be appropriate
* for non-atomically committing multiple mutation groups in a single RPC with low latency.
*
* <p>Example of BatchWriteAtleastOnceWithOptions
* <p>Example of BatchWriteAtLeastOnceWithOptions
*
* <pre>{@code
* long singerId = my_singer_id;
* Mutation mutation = Mutation.newInsertBuilder("Singers")
* .set("SingerId")
* .to(singerId)
* .set("FirstName")
* .to("Billy")
* .set("LastName")
* .to("Joel")
* .build();
* ServerStream<BatchWriteResponse> responses =
* dbClient.batchWriteAtLeastOnce(
* Collections.singletonList(mutation),
* Options.priority(RpcPriority.LOW));
* dbClient.batchWriteAtLeastOnceWithOptions(
* ImmutableList.of(MUTATION_GROUP1, MUTATION_GROUP2),
* Options.tag("batch-write-tag"));
* for (BatchWriteResponse response : responses) {
* // Do something when a response is received.
* }
Expand All @@ -279,7 +262,7 @@ ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(Iterable<Mutation> mutati
* @return ServerStream\<BatchWriteResponse>
*/
ServerStream<BatchWriteResponse> batchWriteAtLeastOnceWithOptions(
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException;
Iterable<MutationGroup> mutationGroups, TransactionOption... options) throws SpannerException;

/**
* Returns a context in which a single read can be performed using {@link TimestampBound#strong()}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,19 +109,19 @@ public CommitResponse writeAtLeastOnceWithOptions(
}

@Override
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(final Iterable<Mutation> mutations)
throws SpannerException {
return batchWriteAtLeastOnceWithOptions(mutations);
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
final Iterable<MutationGroup> mutationGroups) throws SpannerException {
return batchWriteAtLeastOnceWithOptions(mutationGroups);
}

@Override
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnceWithOptions(
final Iterable<Mutation> mutations, final TransactionOption... options)
final Iterable<MutationGroup> mutationGroups, final TransactionOption... options)
throws SpannerException {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return runWithSessionRetry(
session -> session.batchWriteAtLeastOnceWithOptions(mutations, options));
session -> session.batchWriteAtLeastOnceWithOptions(mutationGroups, options));
} catch (RuntimeException e) {
TraceUtil.setWithFailure(span, e);
throw e;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.google.cloud.spanner;

import com.google.common.base.Preconditions;
import com.google.spanner.v1.BatchWriteRequest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MutationGroup {
private final List<Mutation> mutations;

private MutationGroup(List<Mutation> mutations) {
this.mutations = mutations;
}

public static MutationGroup of(Mutation... mutations) {
Preconditions.checkArgument(mutations.length > 0, "Should pass in at least one mutation.");
return new MutationGroup(Arrays.asList(mutations));
}

public List<Mutation> getMutations() {
return mutations;
}

static BatchWriteRequest.MutationGroup toProto(final MutationGroup mutationGroup) {
List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
Mutation.toProto(mutationGroup.getMutations(), mutationsProto);
return BatchWriteRequest.MutationGroup.newBuilder().addAllMutations(mutationsProto).build();
}

static List<BatchWriteRequest.MutationGroup> toListProto(
final Iterable<MutationGroup> mutationGroups) {
List<BatchWriteRequest.MutationGroup> mutationGroupsProto = new ArrayList<>();
for (MutationGroup group : mutationGroups) {
mutationGroupsProto.add(toProto(group));
}
return mutationGroupsProto;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,21 +199,21 @@ public CommitResponse writeAtLeastOnceWithOptions(
}

@Override
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(Iterable<Mutation> mutations)
throws SpannerException {
return batchWriteAtLeastOnceWithOptions(mutations);
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
Iterable<MutationGroup> mutationGroups) throws SpannerException {
return batchWriteAtLeastOnceWithOptions(mutationGroups);
}

@Override
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnceWithOptions(
Iterable<Mutation> mutations, TransactionOption... transactionOptions)
Iterable<MutationGroup> mutationGroups, TransactionOption... transactionOptions)
throws SpannerException {
setActive(null);
Options batchWriteRequestOptions = Options.fromTransactionOptions(transactionOptions);
List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
Mutation.toProto(mutations, mutationsProto);
List<BatchWriteRequest.MutationGroup> mutationGroupsProto =
MutationGroup.toListProto(mutationGroups);
final BatchWriteRequest.Builder requestBuilder =
BatchWriteRequest.newBuilder().setSession(name).addAllMutations(mutationsProto);
BatchWriteRequest.newBuilder().setSession(name).addAllMutationGroups(mutationGroupsProto);
if (batchWriteRequestOptions.hasPriority() || batchWriteRequestOptions.hasTag()) {
RequestOptions.Builder requestOptionsBuilder = RequestOptions.newBuilder();
if (batchWriteRequestOptions.hasPriority()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1173,16 +1173,17 @@ public CommitResponse writeAtLeastOnceWithOptions(
}

@Override
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(Iterable<Mutation> mutations)
throws SpannerException {
return batchWriteAtLeastOnceWithOptions(mutations);
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
Iterable<MutationGroup> mutationGroups) throws SpannerException {
return batchWriteAtLeastOnceWithOptions(mutationGroups);
}

@Override
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnceWithOptions(
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException {
Iterable<MutationGroup> mutationGroups, TransactionOption... options)
throws SpannerException {
try {
return get().batchWriteAtLeastOnceWithOptions(mutations, options);
return get().batchWriteAtLeastOnceWithOptions(mutationGroups, options);
} finally {
close();
}
Expand Down Expand Up @@ -1436,17 +1437,18 @@ public CommitResponse writeAtLeastOnceWithOptions(
}

@Override
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(Iterable<Mutation> mutations)
throws SpannerException {
return batchWriteAtLeastOnceWithOptions(mutations);
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
Iterable<MutationGroup> mutationGroups) throws SpannerException {
return batchWriteAtLeastOnceWithOptions(mutationGroups);
}

@Override
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnceWithOptions(
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException {
Iterable<MutationGroup> mutationGroups, TransactionOption... options)
throws SpannerException {
try {
markUsed();
return delegate.batchWriteAtLeastOnceWithOptions(mutations, options);
return delegate.batchWriteAtLeastOnceWithOptions(mutationGroups, options);
} catch (SpannerException e) {
throw lastException = e;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,19 @@ public class DatabaseClientImplTest {
private static final long UPDATE_COUNT = 1L;
private static final com.google.rpc.Status STATUS_OK =
com.google.rpc.Status.newBuilder().setCode(com.google.rpc.Code.OK_VALUE).build();
private static final Iterable<Mutation> MUTATIONS =
private static final Iterable<MutationGroup> MUTATION_GROUPS =
ImmutableList.of(
Mutation.newInsertBuilder("FOO1").set("ID").to(1L).set("NAME").to("Bar1").build(),
Mutation.newInsertBuilder("FOO2").set("ID").to(2L).set("NAME").to("Bar2").build(),
Mutation.newInsertBuilder("FOO3").set("ID").to(3L).set("NAME").to("Bar3").build(),
Mutation.newInsertBuilder("FOO4").set("ID").to(4L).set("NAME").to("Bar4").build());
MutationGroup.of(
Mutation.newInsertBuilder("FOO1").set("ID").to(1L).set("NAME").to("Bar1").build(),
Mutation.newInsertBuilder("FOO2").set("ID").to(2L).set("NAME").to("Bar2").build()),
MutationGroup.of(
Mutation.newInsertBuilder("FOO3").set("ID").to(3L).set("NAME").to("Bar3").build(),
Mutation.newInsertBuilder("FOO4").set("ID").to(4L).set("NAME").to("Bar4").build()),
MutationGroup.of(
Mutation.newInsertBuilder("FOO4").set("ID").to(4L).set("NAME").to("Bar4").build(),
Mutation.newInsertBuilder("FOO5").set("ID").to(5L).set("NAME").to("Bar5").build()),
MutationGroup.of(
Mutation.newInsertBuilder("FOO6").set("ID").to(6L).set("NAME").to("Bar6").build()));
private static final Iterable<BatchWriteResponse> BATCH_WRITE_RESPONSES =
ImmutableList.of(
BatchWriteResponse.newBuilder()
Expand Down Expand Up @@ -339,7 +346,7 @@ public void testBatchWriteAtLeastOnce() {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));

ServerStream<BatchWriteResponse> responseStream = client.batchWriteAtLeastOnce(MUTATIONS);
ServerStream<BatchWriteResponse> responseStream = client.batchWriteAtLeastOnce(MUTATION_GROUPS);
int idx = 0;
for (BatchWriteResponse response : responseStream) {
assertEquals(
Expand All @@ -353,7 +360,7 @@ public void testBatchWriteAtLeastOnce() {
List<BatchWriteRequest> requests = mockSpanner.getRequestsOfType(BatchWriteRequest.class);
assertEquals(requests.size(), 1);
BatchWriteRequest request = requests.get(0);
assertEquals(request.getMutationsCount(), 4);
assertEquals(request.getMutationGroupsCount(), 4);
assertEquals(request.getRequestOptions().getPriority(), Priority.PRIORITY_UNSPECIFIED);
}

Expand All @@ -362,14 +369,14 @@ public void testBatchWriteAtLeastOnceWithOptions() {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
ServerStream<BatchWriteResponse> responseStream =
client.batchWriteAtLeastOnceWithOptions(MUTATIONS, Options.priority(RpcPriority.LOW));
client.batchWriteAtLeastOnceWithOptions(MUTATION_GROUPS, Options.priority(RpcPriority.LOW));
for (BatchWriteResponse response : responseStream) {}

assertNotNull(responseStream);
List<BatchWriteRequest> requests = mockSpanner.getRequestsOfType(BatchWriteRequest.class);
assertEquals(requests.size(), 1);
BatchWriteRequest request = requests.get(0);
assertEquals(request.getMutationsCount(), 4);
assertEquals(request.getMutationGroupsCount(), 4);
assertEquals(request.getRequestOptions().getPriority(), Priority.PRIORITY_LOW);
}

Expand All @@ -378,14 +385,15 @@ public void testBatchWriteAtLeastOnceWithTagOptions() {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
ServerStream<BatchWriteResponse> responseStream =
client.batchWriteAtLeastOnceWithOptions(MUTATIONS, Options.tag("app=spanner,env=test"));
client.batchWriteAtLeastOnceWithOptions(
MUTATION_GROUPS, Options.tag("app=spanner,env=test"));
for (BatchWriteResponse response : responseStream) {}

assertNotNull(responseStream);
List<BatchWriteRequest> requests = mockSpanner.getRequestsOfType(BatchWriteRequest.class);
assertEquals(requests.size(), 1);
BatchWriteRequest request = requests.get(0);
assertEquals(request.getMutationsCount(), 4);
assertEquals(request.getMutationGroupsCount(), 4);
assertEquals(request.getRequestOptions().getTransactionTag(), "app=spanner,env=test");
assertThat(request.getRequestOptions().getRequestTag()).isEmpty();
}
Expand Down
Loading