Skip to content

Commit df10b39

Browse files
mairbekjsimonweb
authored and committed
Updated Spanner Dataflow connector samples (GoogleCloudPlatform#1059)
* Updated Spanner Dataflow connector samples * Make checkstyle happy * First test * Updated the test * Added tests * Makes checkstyle happy * Use system properties * Add add system property * New lines
1 parent ebdc07f commit df10b39

File tree

10 files changed

+931
-87
lines changed

10 files changed

+931
-87
lines changed

dataflow/spanner-io/pom.xml

+10-7
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,16 @@
4848
<artifactId>maven-compiler-plugin</artifactId>
4949
<version>3.7.0</version>
5050
</plugin>
51+
<plugin>
52+
<groupId>org.apache.maven.plugins</groupId>
53+
<artifactId>maven-failsafe-plugin</artifactId>
54+
<version>2.19.1</version>
55+
<configuration>
56+
<systemPropertyVariables>
57+
<spanner.test.instance>default-instance</spanner.test.instance>
58+
</systemPropertyVariables>
59+
</configuration>
60+
</plugin>
5161
</plugins>
5262
</build>
5363

@@ -84,13 +94,6 @@
8494
<version>${apache_beam.version}</version>
8595
</dependency>
8696

87-
<!-- Google Cloud -->
88-
<dependency>
89-
<groupId>com.google.cloud</groupId>
90-
<artifactId>google-cloud-spanner</artifactId>
91-
<version>0.34.0-beta</version>
92-
</dependency>
93-
9497
<!-- Misc -->
9598
<dependency>
9699
<groupId>org.slf4j</groupId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Copyright 2018 Google Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.example.dataflow;
18+
19+
import com.google.cloud.spanner.Struct;
20+
import org.apache.beam.sdk.transforms.DoFn;
21+
import org.apache.beam.sdk.transforms.PTransform;
22+
import org.apache.beam.sdk.transforms.ParDo;
23+
import org.apache.beam.sdk.values.PCollection;
24+
25+
/**
26+
* Estimates the size of the {@code Struct}.
27+
*/
28+
public class EstimateSize extends PTransform<PCollection<Struct>, PCollection<Long>> {
29+
30+
public static EstimateSize create() {
31+
return new EstimateSize();
32+
}
33+
34+
private EstimateSize() {
35+
}
36+
37+
@Override
38+
public PCollection<Long> expand(PCollection<Struct> input) {
39+
return input.apply(ParDo.of(new EstimateStructSizeFn()));
40+
}
41+
42+
/**
43+
* Estimates the size of a Spanner row. For simplicity, arrays and structs aren't supported.
44+
*/
45+
public static class EstimateStructSizeFn extends DoFn<Struct, Long> {
46+
47+
@ProcessElement
48+
public void processElement(ProcessContext c) throws Exception {
49+
Struct row = c.element();
50+
long sum = 0;
51+
for (int i = 0; i < row.getColumnCount(); i++) {
52+
if (row.isNull(i)) {
53+
continue;
54+
}
55+
56+
switch (row.getColumnType(i).getCode()) {
57+
case BOOL:
58+
sum += 1;
59+
break;
60+
case INT64:
61+
case FLOAT64:
62+
sum += 8;
63+
break;
64+
case TIMESTAMP:
65+
case DATE:
66+
sum += 12;
67+
break;
68+
case BYTES:
69+
sum += row.getBytes(i).length();
70+
break;
71+
case STRING:
72+
sum += row.getString(i).length();
73+
break;
74+
case ARRAY:
75+
throw new IllegalArgumentException("Arrays are not supported :(");
76+
case STRUCT:
77+
throw new IllegalArgumentException("Structs are not supported :(");
78+
default:
79+
throw new IllegalArgumentException("Unsupported type :(");
80+
}
81+
}
82+
c.output(sum);
83+
}
84+
}
85+
86+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Copyright 2018 Google Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.example.dataflow;
18+
19+
import com.google.cloud.Timestamp;
20+
import com.google.cloud.spanner.Mutation;
21+
import com.google.common.base.Charsets;
22+
import com.google.common.hash.Hashing;
23+
import org.apache.beam.sdk.Pipeline;
24+
import org.apache.beam.sdk.io.TextIO;
25+
import org.apache.beam.sdk.io.gcp.spanner.MutationGroup;
26+
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
27+
import org.apache.beam.sdk.options.Description;
28+
import org.apache.beam.sdk.options.PipelineOptions;
29+
import org.apache.beam.sdk.options.PipelineOptionsFactory;
30+
import org.apache.beam.sdk.options.Validation;
31+
import org.apache.beam.sdk.transforms.MapElements;
32+
import org.apache.beam.sdk.transforms.SimpleFunction;
33+
import org.apache.beam.sdk.values.PCollection;
34+
35+
/**
36+
* This sample demonstrates how to group together mutations when writing to the Cloud Spanner
37+
* database.
38+
*/
39+
public class SpannerGroupWrite {
40+
public interface Options extends PipelineOptions {
41+
42+
@Description("Spanner instance ID to write to")
43+
@Validation.Required
44+
String getInstanceId();
45+
46+
void setInstanceId(String value);
47+
48+
@Description("Spanner database name to write to")
49+
@Validation.Required
50+
String getDatabaseId();
51+
52+
void setDatabaseId(String value);
53+
54+
@Description("Singers output filename in the format: singer_id\tfirst_name\tlast_name")
55+
@Validation.Required
56+
String getSuspiciousUsersFile();
57+
58+
void setSuspiciousUsersFile(String value);
59+
60+
}
61+
62+
public static void main(String[] args) {
63+
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
64+
Pipeline p = Pipeline.create(options);
65+
66+
String instanceId = options.getInstanceId();
67+
String databaseId = options.getDatabaseId();
68+
69+
String usersIdFile = options.getSuspiciousUsersFile();
70+
71+
PCollection<String> suspiciousUserIds = p.apply(TextIO.read().from(usersIdFile));
72+
73+
final Timestamp timestamp = Timestamp.now();
74+
75+
// [START spanner_dataflow_writegroup]
76+
PCollection<MutationGroup> mutations = suspiciousUserIds
77+
.apply(MapElements.via(new SimpleFunction<String, MutationGroup>() {
78+
79+
@Override
80+
public MutationGroup apply(String userId) {
81+
// Immediately block the user.
82+
Mutation userMutation = Mutation.newUpdateBuilder("Users")
83+
.set("id").to(userId)
84+
.set("state").to("BLOCKED")
85+
.build();
86+
long generatedId = Hashing.sha1().newHasher()
87+
.putString(userId, Charsets.UTF_8)
88+
.putLong(timestamp.getSeconds())
89+
.putLong(timestamp.getNanos())
90+
.hash()
91+
.asLong();
92+
93+
// Add an entry to pending review requests.
94+
Mutation pendingReview = Mutation.newInsertOrUpdateBuilder("PendingReviews")
95+
.set("id").to(generatedId) // Must be deterministically generated.
96+
.set("userId").to(userId)
97+
.set("action").to("REVIEW ACCOUNT")
98+
.set("note").to("Suspicious activity detected.")
99+
.build();
100+
101+
return MutationGroup.create(userMutation, pendingReview);
102+
}
103+
}));
104+
105+
mutations.apply(SpannerIO.write()
106+
.withInstanceId(instanceId)
107+
.withDatabaseId(databaseId)
108+
.grouped());
109+
// [END spanner_dataflow_writegroup]
110+
111+
p.run().waitUntilFinish();
112+
113+
}
114+
115+
}

dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerRead.java

+11-55
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,6 @@
2424
import org.apache.beam.sdk.options.PipelineOptions;
2525
import org.apache.beam.sdk.options.PipelineOptionsFactory;
2626
import org.apache.beam.sdk.options.Validation;
27-
import org.apache.beam.sdk.transforms.Count;
28-
import org.apache.beam.sdk.transforms.DoFn;
29-
import org.apache.beam.sdk.transforms.PTransform;
30-
import org.apache.beam.sdk.transforms.ParDo;
3127
import org.apache.beam.sdk.transforms.Sum;
3228
import org.apache.beam.sdk.transforms.ToString;
3329
import org.apache.beam.sdk.values.PCollection;
@@ -87,73 +83,33 @@ public interface Options extends PipelineOptions {
8783
void setOutput(String value);
8884
}
8985

90-
/**
91-
* Estimates the size of a Spanner row. For simplicity, arrays and structs aren't supported.
92-
*/
93-
public static class EstimateStructSizeFn extends DoFn<Struct, Long> {
94-
95-
@ProcessElement
96-
public void processElement(ProcessContext c) throws Exception {
97-
Struct row = c.element();
98-
long sum = 0;
99-
for (int i = 0; i < row.getColumnCount(); i++) {
100-
if (row.isNull(i)) {
101-
continue;
102-
}
103-
104-
switch (row.getColumnType(i).getCode()) {
105-
case BOOL:
106-
sum += 1;
107-
break;
108-
case INT64:
109-
case FLOAT64:
110-
sum += 8;
111-
break;
112-
case TIMESTAMP:
113-
case DATE:
114-
sum += 12;
115-
break;
116-
case BYTES:
117-
sum += row.getBytes(i).length();
118-
break;
119-
case STRING:
120-
sum += row.getString(i).length();
121-
break;
122-
case ARRAY:
123-
throw new IllegalArgumentException("Arrays are not supported :(");
124-
case STRUCT:
125-
throw new IllegalArgumentException("Structs are not supported :(");
126-
default:
127-
throw new IllegalArgumentException("Unsupported type :(");
128-
}
129-
}
130-
c.output(sum);
131-
}
132-
}
13386

13487
public static void main(String[] args) {
13588
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
13689
Pipeline p = Pipeline.create(options);
13790

13891
String instanceId = options.getInstanceId();
13992
String databaseId = options.getDatabaseId();
140-
String query = "SELECT * FROM " + options.getTable();
141-
142-
PCollection<Long> tableEstimatedSize = p
143-
// Query for all the columns and rows in the specified Spanner table
144-
.apply(SpannerIO.read()
93+
// [START spanner_dataflow_read]
94+
// Query for all the columns and rows in the specified Spanner table
95+
PCollection<Struct> records = p.apply(
96+
SpannerIO.read()
14597
.withInstanceId(instanceId)
14698
.withDatabaseId(databaseId)
147-
.withQuery(query))
99+
.withQuery("SELECT * FROM " + options.getTable()));
100+
// [START spanner_dataflow_read]
101+
102+
103+
PCollection<Long> tableEstimatedSize = records
148104
// Estimate the size of every row
149-
.apply(ParDo.of(new EstimateStructSizeFn()))
105+
.apply(EstimateSize.create())
150106
// Sum all the row sizes to get the total estimated size of the table
151107
.apply(Sum.longsGlobally());
152108

153109
// Write the total size to a file
154110
tableEstimatedSize
155111
.apply(ToString.elements())
156-
.apply(TextIO.write().to(options.getOutput()));
112+
.apply(TextIO.write().to(options.getOutput()).withoutSharding());
157113

158114
p.run().waitUntilFinish();
159115
}

0 commit comments

Comments
 (0)