Skip to content

feat: add support for new cloud client test framework in google-cloud-spanner-executor #2217

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 61 commits into from
Jan 10, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
2652512
Add Spanner executor proto and implementation for Read/Mutation and a…
gyang-google Oct 10, 2022
6131cff
Add Spanner executor proto and implementation for Read/Mutation and a…
gyang-google Oct 13, 2022
11c42e1
Add Spanner executor proto and implementation for Read/Mutation and a…
gyang-google Oct 13, 2022
090fadd
Add Spanner executor proto and implementation for Read/Mutation and a…
gyang-google Oct 13, 2022
26317d9
Add Spanner executor proto and implementation for Read/Mutation and a…
gyang-google Oct 13, 2022
2a9ef63
Fix error code mapping.
gyang-google Oct 13, 2022
ec686fe
Remove read_timestamp field in read result as it will never use it.(f…
gyang-google Oct 13, 2022
df4a9eb
Mark all fields in spanner action outcome as optional.
gyang-google Oct 13, 2022
8296ed9
Refactored existing protos according to api dev lint.
gyang-google Nov 18, 2022
2b3dfa5
Added DML/batch DML/query features.
gyang-google Nov 18, 2022
a1a4121
Added support to Write.
gyang-google Nov 18, 2022
921370c
Update batch txn related protos.
gyang-google Nov 22, 2022
86d8968
Add support for batch txn feature.
gyang-google Nov 23, 2022
97d11eb
Add metadata for batch read.
gyang-google Nov 30, 2022
7f5671b
Implement the support for change stream.
gyang-google Nov 30, 2022
4c94983
Update change stream proto fields.
gyang-google Nov 30, 2022
fdcc520
Fix create cloud backup bug.
gyang-google Dec 1, 2022
35f6476
Fix numeric related issue.
gyang-google Dec 6, 2022
3297cce
Make all calls to streamObserver synchronized to prevent concurrency …
gyang-google Dec 6, 2022
57e784c
Rebase onto java-spanner changes.
gyang-google Oct 10, 2022
16a1548
Add Spanner executor proto and implementation for Read/Mutation and a…
gyang-google Oct 13, 2022
3104630
Add Spanner executor proto and implementation for Read/Mutation and a…
gyang-google Oct 13, 2022
06c199d
Add Spanner executor proto and implementation for Read/Mutation and a…
gyang-google Oct 13, 2022
d8e54b6
Add Spanner executor proto and implementation for Read/Mutation and a…
gyang-google Oct 13, 2022
1bd26fb
Fix error code mapping.
gyang-google Oct 13, 2022
11725b8
Remove read_timestamp field in read result as it will never use it.(f…
gyang-google Oct 13, 2022
3ea8e28
Mark all fields in spanner action outcome as optional.
gyang-google Oct 13, 2022
2f83657
Refactored existing protos according to api dev lint.
gyang-google Nov 18, 2022
9c7e4fe
Added DML/batch DML/query features.
gyang-google Nov 18, 2022
00a7133
Added support to Write.
gyang-google Nov 18, 2022
d75ef89
Update batch txn related protos.
gyang-google Nov 22, 2022
491bdc1
Add support for batch txn feature.
gyang-google Nov 23, 2022
2a04be1
Add metadata for batch read.
gyang-google Nov 30, 2022
132c7c7
Implement the support for change stream.
gyang-google Nov 30, 2022
b70dbb1
Update change stream proto fields.
gyang-google Nov 30, 2022
4a16ad1
Fix create cloud backup bug.
gyang-google Dec 1, 2022
bbdd51e
Fix numeric related issue.
gyang-google Dec 6, 2022
ccd2c14
Make all calls to streamObserver synchronized to prevent concurrency …
gyang-google Dec 6, 2022
196360b
Replaces protos with the final released version.
gyang-google Dec 13, 2022
734e5b7
Apply maven-protobuf-plugin to generate grpc stubs, remove grpc folde…
gyang-google Dec 14, 2022
5f4e06e
Remove partitionTestUtil since partitionToken can be fetched now.
gyang-google Dec 14, 2022
e450aca
Merge remote-tracking branch 'git-fork/spanner-executor' into spanner…
gyang-google Dec 14, 2022
bf98484
Remove partitionTestUtil since partitionToken can be fetched now.
gyang-google Dec 14, 2022
b853576
Cleanup.
gyang-google Dec 14, 2022
500a936
Fix pageToken for all the list operations.
gyang-google Dec 15, 2022
52807ec
Rephrase some comments.
gyang-google Dec 15, 2022
3dcc60b
Fix key flag issue.
gyang-google Dec 15, 2022
401285d
Fix some admin actions regarding instanceId args.
gyang-google Dec 16, 2022
51b2b42
Remove unused stuff in metadata.
gyang-google Dec 17, 2022
58089b7
Remove debug message.
gyang-google Dec 17, 2022
d979668
Merge remote-tracking branch 'git-fork/spanner-executor' into spanner…
gyang-google Dec 21, 2022
69069ef
Remove debug message.
gyang-google Dec 21, 2022
6b96f14
Update build files to accommodate with proto dependency changes.
gyang-google Jan 3, 2023
6289715
Merge branch 'googleapis:main' into spanner-executor
gyang-google Jan 3, 2023
57d6596
Update pom file.
gyang-google Jan 3, 2023
5bb0184
Update pom file.
gyang-google Jan 4, 2023
2e6e171
fix: update comments
gyang-google Jan 5, 2023
6d59274
fix: clean up dependencies.
gyang-google Jan 5, 2023
e702f7a
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jan 5, 2023
bd0241f
fix: addressed comments.
gyang-google Jan 7, 2023
ba63018
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jan 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add support for batch txn feature.
  • Loading branch information
gyang-google committed Nov 23, 2022
commit 86d89685b73c8e67ffef451315a18d910c20910f
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import com.google.cloud.NoCredentials;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Backup;
import com.google.cloud.spanner.BatchClient;
import com.google.cloud.spanner.BatchReadOnlyTransaction;
import com.google.cloud.spanner.BatchTransactionId;
import com.google.cloud.spanner.Database;
import com.google.cloud.spanner.DatabaseAdminClient;
import com.google.cloud.spanner.DatabaseClient;
Expand All @@ -45,6 +47,8 @@
import com.google.cloud.spanner.KeySet;
import com.google.cloud.spanner.Mutation.WriteBuilder;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.Partition;
import com.google.cloud.spanner.PartitionOptions;
import com.google.cloud.spanner.ReadContext;
import com.google.cloud.spanner.ReadOnlyTransaction;
import com.google.cloud.spanner.ReplicaInfo;
Expand Down Expand Up @@ -73,7 +77,9 @@
import com.google.spanner.executor.v1.AdminAction;
import com.google.spanner.executor.v1.AdminResult;
import com.google.spanner.executor.v1.BatchDmlAction;
import com.google.spanner.executor.v1.BatchPartition;
import com.google.spanner.executor.v1.CancelOperationAction;
import com.google.spanner.executor.v1.CloseBatchTransactionAction;
import com.google.spanner.executor.v1.CloudBackupResponse;
import com.google.spanner.executor.v1.CloudDatabaseResponse;
import com.google.spanner.executor.v1.CloudInstanceConfigResponse;
Expand All @@ -86,6 +92,9 @@
import com.google.spanner.executor.v1.DeleteCloudInstanceAction;
import com.google.spanner.executor.v1.DmlAction;
import com.google.spanner.executor.v1.DropCloudDatabaseAction;
import com.google.spanner.executor.v1.ExecutePartitionAction;
import com.google.spanner.executor.v1.GenerateDbPartitionsForQueryAction;
import com.google.spanner.executor.v1.GenerateDbPartitionsForReadAction;
import com.google.spanner.executor.v1.GetCloudBackupAction;
import com.google.spanner.executor.v1.GetCloudDatabaseAction;
import com.google.spanner.executor.v1.GetCloudInstanceAction;
Expand All @@ -103,6 +112,7 @@
import com.google.spanner.executor.v1.OperationResponse;
import com.google.spanner.executor.v1.QueryAction;
import com.google.spanner.executor.v1.RestoreCloudDatabaseAction;
import com.google.spanner.executor.v1.StartBatchTransactionAction;
import com.google.spanner.executor.v1.UpdateCloudBackupAction;
import com.google.spanner.executor.v1.UpdateCloudDatabaseDdlAction;
import com.google.spanner.executor.v1.UpdateCloudInstanceAction;
Expand All @@ -122,6 +132,10 @@
import com.google.spanner.executor.v1.SpannerAsyncActionRequest;
import com.google.spanner.executor.v1.SpannerAsyncActionResponse;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.math.BigDecimal;
import java.text.ParseException;
import java.util.ArrayList;
Expand Down Expand Up @@ -500,6 +514,43 @@ public synchronized void startReadWriteTxn(DatabaseClient dbClient, Metadata met
rwTxn.startRWTransaction();
}

/** Start a batch transaction. */
public synchronized Status startBatchTxn(
StartBatchTransactionAction action, BatchClient batchClient, OutcomeSender sender) {
try {
if ((rwTxn != null) || (roTxn != null) || (batchTxn != null)) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INVALID_ARGUMENT, "Already in a transaction");
}

if (action.hasBatchTxnTime()) {
TimestampBound timestampBound =
TimestampBound.ofReadTimestamp(Timestamp.fromProto(action.getBatchTxnTime()));
batchTxn = batchClient.batchReadOnlyTransaction(timestampBound);
} else if (action.hasTid()) {
BatchTransactionId tId = unmarshall(action.getTid());
batchTxn = batchClient.batchReadOnlyTransaction(tId);
} else {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INVALID_ARGUMENT, "Either timestamp or tid must be set");
}
SpannerActionOutcome outcome =
SpannerActionOutcome.newBuilder()
.setStatus(toProto(Status.OK))
.setBatchTxnId(marshall(batchTxn.getBatchTransactionId()))
.build();
initReadState();
return sender.sendOutcome(outcome);
} catch (SpannerException e) {
return sender.finishWithError(toStatus(e));
} catch (Exception e) {
return sender.finishWithError(
toStatus(
SpannerExceptionFactory.newSpannerException(
ErrorCode.INVALID_ARGUMENT, "Unexpected error: " + e.getMessage())));
}
}

/** Increase the read count when a read/query is issued. */
public synchronized void startRead() {
++numPendingReads;
Expand Down Expand Up @@ -771,6 +822,25 @@ private Status executeAction(
} else if (action.hasWrite()) {
return executeMutation(
action.getWrite().getMutation(), outcomeSender, executionContext, /*isWrite=*/ true);
} else if (action.hasStartBatchTxn()) {
if (dbPath == null) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INVALID_ARGUMENT, "database path must be set for this action");
}
BatchClient batchClient = getClient().getBatchClient(getDatabaseId(dbPath));
return executeStartBatchTxn(
action.getStartBatchTxn(), batchClient, outcomeSender, executionContext);
} else if (action.hasGenerateDbPartitionsRead()) {
return executeGenerateDbPartitionsRead(
action.getGenerateDbPartitionsRead(), outcomeSender, executionContext);
} else if (action.hasGenerateDbPartitionsQuery()) {
return executeGenerateDbPartitionsQuery(
action.getGenerateDbPartitionsQuery(), outcomeSender, executionContext);
} else if (action.hasExecutePartition()) {
return executeExecutePartition(
action.getExecutePartition(), outcomeSender, executionContext);
} else if (action.hasCloseBatchTxn()) {
return executeCloseBatchTxn(action.getCloseBatchTxn(), outcomeSender, executionContext);
} else {
return toStatus(
SpannerExceptionFactory.newSpannerException(
Expand Down Expand Up @@ -1631,6 +1701,168 @@ private Status executeCancelOperation(CancelOperationAction action, OutcomeSende
}
}

/** Execute action that start a batch transaction. */
private Status executeStartBatchTxn(
StartBatchTransactionAction action,
BatchClient batchClient,
OutcomeSender sender,
ExecutionFlowContext executionContext) {
LOGGER.log(Level.INFO, "Starting batch transaction");
return executionContext.startBatchTxn(action, batchClient, sender);
}

/** Execute action that finish a batch transaction. */
private Status executeCloseBatchTxn(
CloseBatchTransactionAction action,
OutcomeSender sender,
ExecutionFlowContext executionContext) {
try {
LOGGER.log(Level.INFO, "Closing batch transaction");
if (action.getCleanup()) {
executionContext.closeBatchTxn();
}
return sender.finishWithOK();
} catch (SpannerException e) {
return sender.finishWithError(toStatus(e));
}
}

/** Execute action that generate database partitions for the given read. */
private Status executeGenerateDbPartitionsRead(
GenerateDbPartitionsForReadAction action,
OutcomeSender sender,
ExecutionFlowContext executionContext) {
try {
BatchReadOnlyTransaction batchTxn = executionContext.getBatchTxn();
ReadAction request = action.getRead();

List<com.google.spanner.v1.Type> typeList = new ArrayList<>();
for (int i = 0; i < request.getColumnCount(); ++i) {
typeList.add(executionContext.getColumnType(request.getTable(), request.getColumn(i)));
}
KeySet keySet = keySetProtoToCloudKeySet(request.getKeys(), typeList);
PartitionOptions.Builder partitionOptionsBuilder = PartitionOptions.newBuilder();
if (action.hasDesiredBytesPerPartition() && action.getDesiredBytesPerPartition() > 0) {
partitionOptionsBuilder.setPartitionSizeBytes(action.getDesiredBytesPerPartition());
}
if (action.hasMaxPartitionCount()) {
partitionOptionsBuilder.setMaxPartitions(action.getMaxPartitionCount());
}
List<Partition> parts;
if (request.hasIndex()) {
parts =
batchTxn.partitionReadUsingIndex(
partitionOptionsBuilder.build(),
request.getTable(),
request.getIndex(),
keySet,
request.getColumnList());
} else {
parts =
batchTxn.partitionRead(
partitionOptionsBuilder.build(),
request.getTable(),
keySet,
request.getColumnList());
}
List<BatchPartition> batchPartitions = new ArrayList<>();
for (Partition part : parts) {
batchPartitions.add(
BatchPartition.newBuilder()
.setPartition(marshall(part))
.setTable(request.getTable())
.setIndex(request.getIndex())
.build());
}

SpannerActionOutcome outcome =
SpannerActionOutcome.newBuilder()
.setStatus(toProto(Status.OK))
.addAllDbPartition(batchPartitions)
.build();
return sender.sendOutcome(outcome);
} catch (SpannerException e) {
LOGGER.log(Level.WARNING, String.format("GenerateDbPartitionsRead failed for %s", action));
return sender.finishWithError(toStatus(e));
} catch (Exception e) {
return sender.finishWithError(
toStatus(
SpannerExceptionFactory.newSpannerException(
ErrorCode.INVALID_ARGUMENT, "Unexpected error: " + e.getMessage())));
}
}

/** Execute action that generate database partitions for the given query. */
private Status executeGenerateDbPartitionsQuery(
GenerateDbPartitionsForQueryAction action,
OutcomeSender sender,
ExecutionFlowContext executionContext) {
try {
BatchReadOnlyTransaction batchTxn = executionContext.getBatchTxn();
Statement.Builder stmt = Statement.newBuilder(action.getQuery().getSql());
for (int i = 0; i < action.getQuery().getParamsCount(); ++i) {
stmt.bind(action.getQuery().getParams(i).getName())
.to(
valueProtoToCloudValue(
action.getQuery().getParams(i).getType(),
action.getQuery().getParams(i).getValue()));
}
PartitionOptions partitionOptions =
PartitionOptions.newBuilder()
.setPartitionSizeBytes(action.getDesiredBytesPerPartition())
.build();
List<Partition> parts = batchTxn.partitionQuery(partitionOptions, stmt.build());
List<BatchPartition> batchPartitions = new ArrayList<>();
for (Partition part : parts) {
batchPartitions.add(BatchPartition.newBuilder().setPartition(marshall(part)).build());
}

SpannerActionOutcome outcome =
SpannerActionOutcome.newBuilder()
.setStatus(toProto(Status.OK))
.addAllDbPartition(batchPartitions)
.build();
return sender.sendOutcome(outcome);
} catch (SpannerException e) {
LOGGER.log(Level.WARNING, String.format("GenerateDbPartitionsQuery failed for %s", action));
return sender.finishWithError(toStatus(e));
} catch (Exception e) {
return sender.finishWithError(
toStatus(
SpannerExceptionFactory.newSpannerException(
ErrorCode.INVALID_ARGUMENT, "Unexpected error: " + e.getMessage())));
}
}

/** Execute a read or query for the given partitions. */
private Status executeExecutePartition(
ExecutePartitionAction action, OutcomeSender sender, ExecutionFlowContext executionContext) {
try {
BatchReadOnlyTransaction batchTxn = executionContext.getBatchTxn();
ByteString partitionBinary = action.getPartition().getPartition();
if (partitionBinary.size() == 0) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INVALID_ARGUMENT, "Invalid batchPartition " + action);
}
if (action.getPartition().hasTable()) {
sender.initForRead(action.getPartition().getTable(), action.getPartition().getIndex());
} else {
sender.initForQuery();
}
Partition partition = unmarshall(partitionBinary);
executionContext.startRead();
ResultSet result = batchTxn.execute(partition);
return processResults(result, 0, sender, executionContext);
} catch (SpannerException e) {
return sender.finishWithError(toStatus(e));
} catch (Exception e) {
return sender.finishWithError(
toStatus(
SpannerExceptionFactory.newSpannerException(
ErrorCode.INVALID_ARGUMENT, "Unexpected error: " + e.getMessage())));
}
}

/**
* Execute action that start a read-write or read-only transaction. For read-write transaction,
* see {@link ReadWriteTransaction}.
Expand All @@ -1639,8 +1871,7 @@ private Status executeStartTxn(
StartTransactionAction action,
DatabaseClient dbClient,
OutcomeSender sender,
ExecutionFlowContext executionContext)
throws Exception {
ExecutionFlowContext executionContext) {
try {
executionContext.updateTransactionSeed(action.getTransactionSeed());
Metadata metadata = new Metadata(action.getTableList());
Expand All @@ -1663,6 +1894,11 @@ private Status executeStartTxn(
return sender.finishWithOK();
} catch (SpannerException e) {
return sender.finishWithError(toStatus(e));
} catch (Exception e) {
return sender.finishWithError(
toStatus(
SpannerExceptionFactory.newSpannerException(
ErrorCode.INVALID_ARGUMENT, "Unexpected error: " + e.getMessage())));
}
}

Expand Down Expand Up @@ -2768,6 +3004,23 @@ private static com.google.spanner.v1.Type cloudTypeToTypeProto(@NotNull Type clo
}
}

/** Unmarshall ByteString to serializable object. */
private <T extends Serializable> T unmarshall(ByteString input)
throws IOException, ClassNotFoundException {
ObjectInputStream objectInputStream = new ObjectInputStream(input.newInput());
return (T) objectInputStream.readObject();
}

/** Marshall a serializable object into ByteString. */
private <T extends Serializable> ByteString marshall(T object) throws IOException {
ByteString.Output output = ByteString.newOutput();
ObjectOutputStream objectOutputStream = new ObjectOutputStream(output);
objectOutputStream.writeObject(object);
objectOutputStream.flush();
objectOutputStream.close();
return output.toByteString();
}

/** Build Timestamp from micros. */
private Timestamp timestampFromMicros(long micros) {
long seconds = TimeUnit.MICROSECONDS.toSeconds(micros);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -828,14 +828,12 @@ message CloseBatchTransactionAction {
message GenerateDbPartitionsForReadAction {
// Read to generate partitions for.
ReadAction read = 1;
// Metadata related to the tables involved in the read.
repeated TableMetadata table = 2;
// Desired size of data in each partition. Spanner doesn't guarantee to
// respect this value.
optional int64 desired_bytes_per_partition = 3;
optional int64 desired_bytes_per_partition = 2;
// If set, the desired max number of partitions. Spanner doesn't guarantee to
// respect this value.
optional int64 max_partition_count = 4;
optional int64 max_partition_count = 3;
}

// Generate database partitions for the given query. Successful outcomes will
Expand Down