Commit f658c3d

nielm authored and Shabirmean committed
Add sample code for reading table with Index
1 parent e598586 commit f658c3d

File tree: 2 files changed, +197 −5 lines changed
dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerReadApiWithIndex.java

Lines changed: 140 additions & 0 deletions

@@ -0,0 +1,140 @@
/*
 * Copyright 2017 Google Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.example.dataflow;

import com.google.cloud.spanner.Dialect;
import com.google.cloud.spanner.Struct;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Default.Enum;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.ToString;
import org.apache.beam.sdk.values.PCollection;

/**
 * This sample demonstrates how to read from a Spanner table using the Read API, reading from a
 * secondary index.
 */
public class SpannerReadApiWithIndex {

  public interface Options extends PipelineOptions {

    @Description("Spanner instance ID to query from")
    @Validation.Required
    String getInstanceId();

    void setInstanceId(String value);

    @Description("Spanner database name to query from")
    @Validation.Required
    String getDatabaseId();

    void setDatabaseId(String value);

    @Description("Dialect of the database that is used")
    @Default
    @Enum("GOOGLE_STANDARD_SQL")
    Dialect getDialect();

    void setDialect(Dialect dialect);

    @Description("Output filename for records size")
    @Validation.Required
    String getOutput();

    void setOutput(String value);
  }

  public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = Pipeline.create(options);

    String instanceId = options.getInstanceId();
    String databaseId = options.getDatabaseId();
    Dialect dialect = options.getDialect();
    PCollection<Struct> records;
    if (dialect == Dialect.POSTGRESQL) {
      records = postgreSqlRead(instanceId, databaseId, pipeline);
    } else {
      records = googleSqlRead(instanceId, databaseId, pipeline);
    }

    PCollection<Long> tableEstimatedSize =
        records
            // Estimate the size of every row
            .apply(EstimateSize.create())
            // Sum all the row sizes to get the total estimated size of the table
            .apply(Sum.longsGlobally());

    // Write the total size to a file
    tableEstimatedSize
        .apply(ToString.elements())
        .apply(TextIO.write().to(options.getOutput()).withoutSharding());

    pipeline.run().waitUntilFinish();
  }

  /**
   * GoogleSQL databases retain the casing of table and column names. It is therefore common to use
   * CamelCase for identifiers.
   */
  static PCollection<Struct> googleSqlRead(
      String instanceId, String databaseId, Pipeline pipeline) {
    // [START spanner_dataflow_readapi_withindex]
    // Query for the given columns, for all rows in the specified Spanner table
    PCollection<Struct> records =
        pipeline.apply(
            SpannerIO.read()
                .withInstanceId(instanceId)
                .withDatabaseId(databaseId)
                .withTable("Songs")
                .withIndex("SongsBySongName")
                // Can only read columns that are either indexed, STORED in the index, or
                // part of the primary key of the Songs table.
                .withColumns("SingerId", "AlbumId", "TrackId", "SongName"));
    // [END spanner_dataflow_readapi_withindex]
    return records;
  }

  /**
   * PostgreSQL databases automatically fold identifiers to lower case. It is therefore common to
   * use all lower case identifiers with underscores to separate multiple words in an identifier.
   */
  static PCollection<Struct> postgreSqlRead(
      String instanceId, String databaseId, Pipeline pipeline) {
    // [START spanner_pg_dataflow_readapi_withindex]
    // Query for the given columns, for all rows in the specified Spanner table
    PCollection<Struct> records =
        pipeline.apply(
            SpannerIO.read()
                .withInstanceId(instanceId)
                .withDatabaseId(databaseId)
                .withTable("Songs")
                .withIndex("SongsBySongName")
                // Can only read columns that are either indexed, STORED in the index, or
                // part of the primary key of the songs table.
                .withColumns("singer_id", "album_id", "track_id", "song_name"));
    // [END spanner_pg_dataflow_readapi_withindex]
    return records;
  }
}
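
For comparison, the same index-backed read could also be phrased through SpannerIO's query API, using a GoogleSQL FORCE_INDEX table hint instead of the Read API. A minimal sketch, not part of this commit (the method name is hypothetical; instanceId, databaseId, and pipeline are the same parameters the sample's own read methods take):

  // Hypothetical alternative for comparison; this commit uses the Read API above.
  static PCollection<Struct> googleSqlReadViaQuery(
      String instanceId, String databaseId, Pipeline pipeline) {
    return pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            // Force the same secondary index through a GoogleSQL table hint
            .withQuery(
                "SELECT SingerId, AlbumId, TrackId, SongName "
                    + "FROM Songs@{FORCE_INDEX=SongsBySongName}"));
  }

Unlike the Read API, a query is not limited to columns covered by the index, because Spanner can back-join to the base table for any remaining columns.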

dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerReadIT.java

Lines changed: 57 additions & 5 deletions
@@ -52,8 +52,7 @@
 @RunWith(Parameterized.class)
 public class SpannerReadIT {

-  @Parameter
-  public Dialect dialect;
+  @Parameter public Dialect dialect;

   @Parameters(name = "dialect = {0}")
   public static List<Object[]> data() {
@@ -104,7 +103,11 @@ public void setUp() throws InterruptedException, ExecutionException {
             + "(singer_id bigint NOT NULL primary key, first_name varchar NOT NULL, "
             + "last_name varchar NOT NULL)",
         "CREATE TABLE Albums (singer_id bigint NOT NULL, album_id bigint NOT NULL, "
-            + "album_title varchar NOT NULL, PRIMARY KEY (singer_id, album_id))"),
+            + "album_title varchar NOT NULL, PRIMARY KEY (singer_id, album_id))",
+        "CREATE TABLE Songs (singer_id bigint NOT NULL, album_id bigint NOT NULL, "
+            + "track_id bigint NOT NULL, song_name varchar, duration bigint, "
+            + "song_genre varchar, PRIMARY KEY(singer_id, album_id, track_id))",
+        "CREATE INDEX SongsBySongName ON Songs(song_name)"),
     null)
     .get();
 } else {
@@ -117,7 +120,11 @@ public void setUp() throws InterruptedException, ExecutionException {
             + "(SingerId INT64 NOT NULL, FirstName STRING(MAX) NOT NULL, "
             + "LastName STRING(MAX) NOT NULL,) PRIMARY KEY (SingerId)",
         "CREATE TABLE Albums (SingerId INT64 NOT NULL, AlbumId INT64 NOT NULL, "
-            + "AlbumTitle STRING(MAX) NOT NULL,) PRIMARY KEY (SingerId, AlbumId)"))
+            + "AlbumTitle STRING(MAX) NOT NULL,) PRIMARY KEY (SingerId, AlbumId)",
+        "CREATE TABLE Songs (SingerId INT64 NOT NULL, AlbumId INT64 NOT NULL, "
+            + "TrackId INT64 NOT NULL, SongName STRING(MAX), Duration INT64, "
+            + "SongGenre STRING(25)) PRIMARY KEY(SingerId, AlbumId, TrackId)",
+        "CREATE INDEX SongsBySongName ON Songs(SongName)"))
     .get();
 }
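
As the sample's comments note, a Read API call through withIndex(...) can only return columns the index covers. A hypothetical DDL variant, not part of this commit, assuming you also wanted the non-key Duration column readable through the index (GoogleSQL stores extra columns with STORING; the PostgreSQL dialect uses INCLUDE):

  // Hypothetical index definitions; the committed test creates the index without them.
  String googleSqlIndexWithStoredColumn =
      "CREATE INDEX SongsBySongName ON Songs(SongName) STORING (Duration)";
  String postgreSqlIndexWithStoredColumn =
      "CREATE INDEX SongsBySongName ON Songs(song_name) INCLUDE (duration)";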
@@ -163,13 +170,41 @@ public void setUp() throws InterruptedException, ExecutionException {
             .set(formatColumnName("AlbumTitle", dialect))
             .to("Imagine")
             .build(),
+        Mutation.newInsertBuilder("Songs")
+            .set(formatColumnName("SingerId", dialect))
+            .to(1L)
+            .set(formatColumnName("AlbumId", dialect))
+            .to(1L)
+            .set(formatColumnName("TrackId", dialect))
+            .to(1L)
+            .set(formatColumnName("SongName", dialect))
+            .to("Imagine")
+            .set(formatColumnName("Duration", dialect))
+            .to(181L)
+            .set(formatColumnName("SongGenre", dialect))
+            .to("Rock/Pop")
+            .build(),
         Mutation.newInsertBuilder("Albums")
             .set(formatColumnName("SingerId", dialect))
             .to(2L)
             .set(formatColumnName("AlbumId", dialect))
             .to(1L)
             .set(formatColumnName("AlbumTitle", dialect))
             .to("Pipes of Peace")
+            .build(),
+        Mutation.newInsertBuilder("Songs")
+            .set(formatColumnName("SingerId", dialect))
+            .to(2L)
+            .set(formatColumnName("AlbumId", dialect))
+            .to(1L)
+            .set(formatColumnName("TrackId", dialect))
+            .to(1L)
+            .set(formatColumnName("SongName", dialect))
+            .to("Pipes of Peace")
+            .set(formatColumnName("Duration", dialect))
+            .to(236L)
+            .set(formatColumnName("SongGenre", dialect))
+            .to("Rock/Pop")
             .build());

     DatabaseClient dbClient = getDbClient();
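
The mutations above use a formatColumnName helper that sits outside this diff's context lines. Presumably it maps the GoogleSQL CamelCase names to the snake_case identifiers of the PostgreSQL-dialect schema; a minimal sketch under that assumption (the body is a hypothetical reconstruction, not the committed helper):

  // Hypothetical reconstruction; the real helper lives elsewhere in SpannerReadIT.
  static String formatColumnName(String camelCase, Dialect dialect) {
    if (dialect != Dialect.POSTGRESQL) {
      return camelCase; // the GoogleSQL schema keeps CamelCase identifiers
    }
    // "SingerId" -> "singer_id", "SongGenre" -> "song_genre"
    return camelCase.replaceAll("([a-z])([A-Z])", "$1_$2").toLowerCase();
  }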
@@ -222,7 +257,7 @@ public void readDbEndToEnd() throws Exception {

     String content = Files.readAllLines(outPath).stream().collect(Collectors.joining("\n"));

-    assertEquals("132", content);
+    assertEquals("233", content);
   }

   @Test
@@ -259,6 +294,23 @@ public void readApiEndToEnd() throws Exception {
     assertEquals("79", content);
   }

+  @Test
+  public void readApiWithIndexEndToEnd() throws Exception {
+    Path outPath = Files.createTempFile("out", "txt");
+    SpannerReadApiWithIndex.main(
+        new String[] {
+          "--instanceId=" + instanceId,
+          "--databaseId=" + databaseId,
+          "--output=" + outPath,
+          "--runner=DirectRunner",
+          "--dialect=" + dialect
+        });
+
+    String content = Files.readAllLines(outPath).stream().collect(Collectors.joining("\n"));
+
+    assertEquals("69", content);
+  }
+
   @Test
   public void readTransactionalReadEndToEnd() throws Exception {
     Path singersPath = Files.createTempFile("singers", "txt");
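
The size assertions ("233" above and the new test's "69") come from EstimateSize, an existing helper in this sample project that this commit does not touch. A minimal sketch of a per-row estimate that reproduces those numbers, assuming 8 bytes per INT64 and one byte per character of each STRING (the real helper may differ):

  // Sketch only; EstimateSize itself is not part of this diff.
  static class EstimateStructSizeFn extends DoFn<Struct, Long> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      Struct row = c.element();
      long size = 0;
      for (int i = 0; i < row.getColumnCount(); i++) {
        if (row.isNull(i)) {
          continue; // NULL values assumed to add nothing
        }
        switch (row.getColumnType(i).getCode()) {
          case INT64:
            size += 8; // fixed-width integer
            break;
          case STRING:
            size += row.getString(i).length(); // byte count for ASCII strings
            break;
          default:
            break; // other Spanner types omitted from this sketch
        }
      }
      c.output(size);
    }
  }

Under those assumptions, the two Songs rows read through the index contribute (3 * 8 + "Imagine".length()) + (3 * 8 + "Pipes of Peace".length()) = 31 + 38 = 69, matching the new test's expected output.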
