feat: PostgreSQL dialect databases #1673


Merged · 10 commits · Feb 10, 2022
test: add parameterization for ITBatchReadTest
olavloite committed Feb 10, 2022
commit 5318ec46ec07bff7f4a2715b53a849d07f443ad3
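
Background for this commit: JUnit 4's Parameterized runner calls the static @Parameters method once, creates the test class anew for each returned value, injects that value into the @Parameter field, and runs every @Test method once per parameter. A minimal, self-contained sketch of the pattern this commit applies (class and parameter names below are illustrative, not taken from this repo):

import static org.junit.Assert.assertNotNull;

import java.util.Arrays;
import java.util.List;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;

@RunWith(Parameterized.class)
public class DialectPatternTest {

  // Called once by the runner; each element produces one instance of this
  // class, and every @Test method runs once per instance.
  @Parameters(name = "dialect = {0}")
  public static List<String> data() {
    return Arrays.asList("GOOGLE_STANDARD_SQL", "POSTGRESQL");
  }

  // The runner injects the current parameter before running the tests.
  @Parameter public String dialect;

  @Test
  public void dialectIsInjected() {
    assertNotNull(dialect);
  }
}

With the name = "Dialect = {0}" template used in the diff below, each run is labeled with the parameter's toString(), which is how the GOOGLE_STANDARD_SQL and POSTGRESQL variants show up separately in test reports.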
google-cloud-spanner/clirr-ignored-differences.xml · 1 change: 0 additions & 1 deletion
@@ -1,7 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
 <differences>
-  8001: com.google.cloud.spanner.connection.StatementParser: Class com.google.cloud.spanner.connection.StatementParser removed
   <difference>
     <differenceType>7012</differenceType>
     <className>com/google/cloud/spanner/connection/Connection</className>
google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITBatchReadTest.java
@@ -16,6 +16,7 @@
 
 package com.google.cloud.spanner.it;
 
+import static com.google.cloud.spanner.testing.EmulatorSpannerHelper.isUsingEmulator;
 import static com.google.common.truth.Truth.assertThat;
 
 import com.google.cloud.ByteArray;
@@ -25,6 +26,7 @@
 import com.google.cloud.spanner.BatchTransactionId;
 import com.google.cloud.spanner.Database;
 import com.google.cloud.spanner.DatabaseClient;
+import com.google.cloud.spanner.Dialect;
 import com.google.cloud.spanner.IntegrationTestEnv;
 import com.google.cloud.spanner.KeySet;
 import com.google.cloud.spanner.Mutation;
@@ -34,6 +36,7 @@
 import com.google.cloud.spanner.ResultSet;
 import com.google.cloud.spanner.Statement;
 import com.google.cloud.spanner.TimestampBound;
+import com.google.common.collect.ImmutableList;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hashing;
 import java.util.ArrayList;
@@ -50,32 +53,46 @@
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
 
 /**
  * Integration test reading large amounts of data using the Batch APIs. The size of data ensures
  * that multiple partitions are returned by the server.
  */
 @Category(ParallelIntegrationTest.class)
-@RunWith(JUnit4.class)
+@RunWith(Parameterized.class)
 public class ITBatchReadTest {
   private static int numRows;
 
   private static final int WRITE_BATCH_SIZE = 1 << 20;
   private static final String TABLE_NAME = "BatchTestTable";
   private static final String INDEX_NAME = "TestIndexByValue";
-  private static final long STALENESS_MILLISEC = 1 * 1000;
+  private static final long STALENESS_MILLISEC = 1000;
 
   @ClassRule public static IntegrationTestEnv env = new IntegrationTestEnv();
 
   private static Database db;
   private static HashFunction hasher;
-  private static DatabaseClient dbClient;
-  private static BatchClient client;
+  private static BatchClient googleStandardSQLBatchClient;
+  private static BatchClient postgreSQLBatchClient;
   private static final Random RANDOM = new Random();
 
   private BatchReadOnlyTransaction batchTxn;
 
+  @Parameters(name = "Dialect = {0}")
+  public static List<DialectTestParameter> data() {
+    List<DialectTestParameter> params = new ArrayList<>();
+    params.add(new DialectTestParameter(Dialect.GOOGLE_STANDARD_SQL));
+    // PG dialect tests are not supported by the emulator
+    if (!isUsingEmulator()) {
+      params.add(new DialectTestParameter(Dialect.POSTGRESQL));
+    }
+    return params;
+  }
+
+  @Parameter public DialectTestParameter dialect;
+
   // Generate a large number of rows to allow multiple read/query partitions.
   private static List<Integer> manyRows() {
     List<Integer> rows = new ArrayList<>();
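
Note: DialectTestParameter is an existing helper in this repo's test sources and is not part of this diff. Inferred from its call sites here (a constructor taking a Dialect, and the public dialect field read by getBatchClient() further down), a hypothetical minimal form would be roughly:

import com.google.cloud.spanner.Dialect;

/** Hypothetical sketch of the helper, inferred only from its call sites in this diff. */
public class DialectTestParameter {

  // Public field, matching the dialect.dialect access in getBatchClient() below.
  public final Dialect dialect;

  public DialectTestParameter(Dialect dialect) {
    this.dialect = dialect;
  }

  @Override
  public String toString() {
    // Labels each run via @Parameters(name = "Dialect = {0}").
    return this.dialect.name();
  }
}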
@@ -88,7 +105,7 @@ private static List<Integer> manyRows() {
 
   @BeforeClass
   public static void setUpDatabase() throws Exception {
-    db =
+    Database googleStandardDatabase =
         env.getTestHelper()
             .createTestDatabase(
                 "CREATE TABLE "
@@ -101,46 +118,81 @@ public static void setUpDatabase() throws Exception {
                     + ") PRIMARY KEY (Key)",
                 "CREATE INDEX " + INDEX_NAME + " ON " + TABLE_NAME + "(Fingerprint)");
     hasher = Hashing.goodFastHash(64);
-    dbClient = env.getTestHelper().getDatabaseClient(db);
-    client = env.getTestHelper().getBatchClient(db);
-
-    List<Mutation> mutations = new ArrayList<>();
-    int totalSize = 0;
-    int i = 0;
-    for (int row : manyRows()) {
-      numRows++;
-      byte[] data = new byte[row];
-      RANDOM.nextBytes(data);
-      mutations.add(
-          Mutation.newInsertOrUpdateBuilder(TABLE_NAME)
-              .set("Key")
-              .to(i)
-              .set("Data")
-              .to(ByteArray.copyFrom(data))
-              .set("Fingerprint")
-              .to(hasher.hashBytes(data).asLong())
-              .set("Size")
-              .to(row)
-              .build());
-      totalSize += row;
-      i++;
-      if (totalSize >= WRITE_BATCH_SIZE) {
-        dbClient.write(mutations);
-        mutations.clear();
-        totalSize = 0;
+    googleStandardSQLBatchClient = env.getTestHelper().getBatchClient(googleStandardDatabase);
+
+    List<DatabaseClient> databaseClients = new ArrayList<>();
+    databaseClients.add(env.getTestHelper().getDatabaseClient(googleStandardDatabase));
+
+    if (!isUsingEmulator()) {
+      Database postgreSQLDatabase =
+          env.getTestHelper().createTestDatabase(Dialect.POSTGRESQL, Collections.emptyList());
+      env.getTestHelper()
+          .getClient()
+          .getDatabaseAdminClient()
+          .updateDatabaseDdl(
+              env.getTestHelper().getInstanceId().getInstance(),
+              postgreSQLDatabase.getId().getDatabase(),
+              ImmutableList.of(
+                  "CREATE TABLE "
+                      + TABLE_NAME
+                      + " ("
+                      + " Key bigint not null primary key,"
+                      + " Data bytea,"
+                      + " Fingerprint bigint,"
+                      + " Size bigint"
+                      + ")",
+                  "CREATE INDEX " + INDEX_NAME + " ON " + TABLE_NAME + "(Fingerprint)"),
+              null);
+      postgreSQLBatchClient = env.getTestHelper().getBatchClient(postgreSQLDatabase);
+      databaseClients.add(env.getTestHelper().getDatabaseClient(postgreSQLDatabase));
+    }
+
+    for (DatabaseClient dbClient : databaseClients) {
+      List<Mutation> mutations = new ArrayList<>();
+      int totalSize = 0;
+      int i = 0;
+      for (int row : manyRows()) {
+        numRows++;
+        byte[] data = new byte[row];
+        RANDOM.nextBytes(data);
+        mutations.add(
+            Mutation.newInsertOrUpdateBuilder(TABLE_NAME)
+                .set("Key")
+                .to(i)
+                .set("Data")
+                .to(ByteArray.copyFrom(data))
+                .set("Fingerprint")
+                .to(hasher.hashBytes(data).asLong())
+                .set("Size")
+                .to(row)
+                .build());
+        totalSize += row;
+        i++;
+        if (totalSize >= WRITE_BATCH_SIZE) {
+          dbClient.write(mutations);
+          mutations.clear();
+          totalSize = 0;
+        }
       }
+      dbClient.write(mutations);
     }
-    dbClient.write(mutations);
     // Our read/queries are executed with some staleness.
     Thread.sleep(2 * STALENESS_MILLISEC);
   }
 
+  private BatchClient getBatchClient() {
+    if (dialect.dialect == Dialect.POSTGRESQL) {
+      return postgreSQLBatchClient;
+    }
+    return googleStandardSQLBatchClient;
+  }
+
   @Test
   public void read() {
     BitSet seenRows = new BitSet(numRows);
     TimestampBound bound = getRandomBound();
     PartitionOptions partitionParams = getRandomPartitionOptions();
-    batchTxn = client.batchReadOnlyTransaction(bound);
+    batchTxn = getBatchClient().batchReadOnlyTransaction(bound);
     List<Partition> partitions =
         batchTxn.partitionRead(
             partitionParams,
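
Worth noting in the hunk above: the PostgreSQL-dialect database is created empty and its schema is applied afterwards via updateDatabaseDdl, with PostgreSQL types (bigint, bytea) and an inline primary key. For comparison, a hedged side-by-side as Java constants; the GoogleSQL statement is collapsed in this diff, so that half is an inference from the column names, not a quote from the file:

/** Hypothetical schema comparison; see the caveat above. */
public class SchemaComparison {
  // GoogleSQL version: inferred, since the original statement is collapsed in this diff.
  static final String GOOGLE_SQL_DDL =
      "CREATE TABLE BatchTestTable ("
          + " Key INT64 NOT NULL,"
          + " Data BYTES(MAX),"
          + " Fingerprint INT64,"
          + " Size INT64"
          + ") PRIMARY KEY (Key)";

  // PostgreSQL version, quoted from the hunk above: the primary key is declared
  // inline on the column, and there is no trailing PRIMARY KEY clause.
  static final String POSTGRESQL_DDL =
      "CREATE TABLE BatchTestTable ("
          + " Key bigint not null primary key,"
          + " Data bytea,"
          + " Fingerprint bigint,"
          + " Size bigint"
          + ")";
}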
@@ -155,7 +207,7 @@ public void read() {
   public void readUsingIndex() {
     TimestampBound bound = getRandomBound();
     PartitionOptions partitionParams = getRandomPartitionOptions();
-    batchTxn = client.batchReadOnlyTransaction(bound);
+    batchTxn = getBatchClient().batchReadOnlyTransaction(bound);
     List<Partition> partitions =
         batchTxn.partitionReadUsingIndex(
             partitionParams,
@@ -166,7 +218,8 @@ public void readUsingIndex() {
     BatchTransactionId txnID = batchTxn.getBatchTransactionId();
     int numRowsRead = 0;
     for (Partition p : partitions) {
-      BatchReadOnlyTransaction batchTxnOnEachWorker = client.batchReadOnlyTransaction(txnID);
+      BatchReadOnlyTransaction batchTxnOnEachWorker =
+          getBatchClient().batchReadOnlyTransaction(txnID);
       try (ResultSet result = batchTxnOnEachWorker.execute(p)) {
         while (result.next()) {
           numRowsRead++;
@@ -188,7 +241,7 @@ public void query() {
     BitSet seenRows = new BitSet(numRows);
     TimestampBound bound = getRandomBound();
     PartitionOptions partitionParams = getRandomPartitionOptions();
-    batchTxn = client.batchReadOnlyTransaction(bound);
+    batchTxn = getBatchClient().batchReadOnlyTransaction(bound);
     List<Partition> partitions =
         batchTxn.partitionQuery(
             partitionParams,
@@ -227,7 +280,8 @@ private TimestampBound getRandomBound() {
   private void fetchAndValidateRows(
       List<Partition> partitions, BatchTransactionId txnID, BitSet seenRows) {
     for (Partition p : partitions) {
-      BatchReadOnlyTransaction batchTxnOnEachWorker = client.batchReadOnlyTransaction(txnID);
+      BatchReadOnlyTransaction batchTxnOnEachWorker =
+          getBatchClient().batchReadOnlyTransaction(txnID);
       try (ResultSet result = batchTxnOnEachWorker.execute(p)) {
         // validate no duplicate rows; verify all columns read.
         validate(result, seenRows);
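
For readers new to the API under test, the partitioned-read flow these tests exercise looks roughly like the sketch below (a minimal sketch, assuming an existing database; the project, instance, and database IDs are placeholders and error handling is omitted). One BatchReadOnlyTransaction is partitioned server-side, and because its BatchTransactionId is serializable, each Partition can be executed by a different worker, which is what fetchAndValidateRows() simulates with a fresh transaction per partition:

import com.google.cloud.spanner.BatchClient;
import com.google.cloud.spanner.BatchReadOnlyTransaction;
import com.google.cloud.spanner.BatchTransactionId;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.Partition;
import com.google.cloud.spanner.PartitionOptions;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.TimestampBound;
import java.util.List;

public class BatchReadSketch {
  public static void main(String[] args) {
    Spanner spanner = SpannerOptions.newBuilder().build().getService();
    BatchClient batchClient =
        spanner.getBatchClient(DatabaseId.of("my-project", "my-instance", "my-database"));

    // One read-only snapshot for the whole batch; strong() could instead be a
    // bounded-staleness bound, as getRandomBound() chooses in the test above.
    BatchReadOnlyTransaction txn =
        batchClient.batchReadOnlyTransaction(TimestampBound.strong());

    // Ask the server to split the query into independent partitions.
    List<Partition> partitions =
        txn.partitionQuery(
            PartitionOptions.getDefaultInstance(),
            Statement.of("SELECT Key, Fingerprint FROM BatchTestTable"));

    // The transaction id is serializable, so each partition could be shipped
    // to a different worker that reconstructs the same snapshot.
    BatchTransactionId txnId = txn.getBatchTransactionId();
    for (Partition partition : partitions) {
      BatchReadOnlyTransaction workerTxn = batchClient.batchReadOnlyTransaction(txnId);
      try (ResultSet resultSet = workerTxn.execute(partition)) {
        while (resultSet.next()) {
          // Process the row.
        }
      }
    }
    spanner.close();
  }
}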