Skip to content

Commit e8aff30

Browse files
mairbeklesv
authored andcommitted
Cloud Spanner Dataflow read api sample (GoogleCloudPlatform#1068)
* Cloud Spanner Dataflow read api sample * Fix the tagging * Fix the tags
1 parent df10b39 commit e8aff30

File tree

3 files changed

+100
-1
lines changed

3 files changed

+100
-1
lines changed

dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerRead.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public static void main(String[] args) {
9797
.withInstanceId(instanceId)
9898
.withDatabaseId(databaseId)
9999
.withQuery("SELECT * FROM " + options.getTable()));
100-
// [START spanner_dataflow_read]
100+
// [END spanner_dataflow_read]
101101

102102

103103
PCollection<Long> tableEstimatedSize = records
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright 2017 Google Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.example.dataflow;
18+
19+
import com.google.cloud.spanner.Struct;
20+
import org.apache.beam.sdk.Pipeline;
21+
import org.apache.beam.sdk.io.TextIO;
22+
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
23+
import org.apache.beam.sdk.options.Description;
24+
import org.apache.beam.sdk.options.PipelineOptions;
25+
import org.apache.beam.sdk.options.PipelineOptionsFactory;
26+
import org.apache.beam.sdk.options.Validation;
27+
import org.apache.beam.sdk.transforms.Sum;
28+
import org.apache.beam.sdk.transforms.ToString;
29+
import org.apache.beam.sdk.values.PCollection;
30+
31+
/**
32+
* This sample demonstrates how to read from a Spanner table using the Read API.
33+
*/
34+
public class SpannerReadApi {
35+
36+
public interface Options extends PipelineOptions {
37+
38+
@Description("Spanner instance ID to query from")
39+
@Validation.Required
40+
String getInstanceId();
41+
42+
void setInstanceId(String value);
43+
44+
@Description("Spanner database name to query from")
45+
@Validation.Required
46+
String getDatabaseId();
47+
48+
void setDatabaseId(String value);
49+
50+
@Description("Output filename for records size")
51+
@Validation.Required
52+
String getOutput();
53+
54+
void setOutput(String value);
55+
}
56+
57+
58+
public static void main(String[] args) {
59+
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
60+
Pipeline p = Pipeline.create(options);
61+
62+
String instanceId = options.getInstanceId();
63+
String databaseId = options.getDatabaseId();
64+
// [START spanner_dataflow_readapi]
65+
// Query for all the columns and rows in the specified Spanner table
66+
PCollection<Struct> records = p.apply(
67+
SpannerIO.read()
68+
.withInstanceId(instanceId)
69+
.withDatabaseId(databaseId)
70+
.withTable("Singers")
71+
.withColumns("singerId", "firstName", "lastName"));
72+
// [END spanner_dataflow_readapi]
73+
74+
75+
PCollection<Long> tableEstimatedSize = records
76+
// Estimate the size of every row
77+
.apply(EstimateSize.create())
78+
// Sum all the row sizes to get the total estimated size of the table
79+
.apply(Sum.longsGlobally());
80+
81+
// Write the total size to a file
82+
tableEstimatedSize
83+
.apply(ToString.elements())
84+
.apply(TextIO.write().to(options.getOutput()).withoutSharding());
85+
86+
p.run().waitUntilFinish();
87+
}
88+
}

dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerReadIT.java

+11
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,17 @@ public void readTableEndToEnd() throws Exception {
156156
assertEquals("53", content);
157157
}
158158

159+
@Test
160+
public void readApiEndToEnd() throws Exception {
161+
Path outPath = Files.createTempFile("out", "txt");
162+
SpannerReadApi.main(new String[] { "--instanceId=" + instanceId, "--databaseId=" + databaseId,
163+
"--output=" + outPath, "--runner=DirectRunner" });
164+
165+
String content = Files.readAllLines(outPath).stream().collect(Collectors.joining("\n"));
166+
167+
assertEquals("79", content);
168+
}
169+
159170
@Test
160171
public void reaTransactionalReadEndToEnd() throws Exception {
161172
Path singersPath = Files.createTempFile("singers", "txt");

0 commit comments

Comments
 (0)