Skip to content

Commit bbfe6c4

Browse files
authored
[BEAM-12609] Enable projection pushdown in SchemaIO. (apache#15216)
* [BEAM-12609] Enable projection pushdown in SchemaIO. Introduce PushdownProjector interface and connect it to SchemaIOTableProviderWrapper. * Throw errors if pushdown is unsupported.
1 parent 9673d8b commit bbfe6c4

File tree

5 files changed

+335
-1
lines changed

5 files changed

+335
-1
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.schemas.io;
19+
20+
import org.apache.beam.sdk.annotations.Experimental;
21+
import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
22+
import org.apache.beam.sdk.transforms.PTransform;
23+
import org.apache.beam.sdk.values.PCollection;
24+
import org.apache.beam.sdk.values.PInput;
25+
import org.apache.beam.sdk.values.Row;
26+
27+
/**
28+
* Factory for creating a {@link PTransform} that can execute a projection.
29+
*
30+
* <p>Typically this interface will be implemented by a reader {@link PTransform} that is capable of
31+
* pushing down projection to an external source. For example, {@link SchemaIO#buildReader()} may
32+
* return a {@link PushdownProjector} to which a projection may be applied later.
33+
*/
34+
@Experimental
35+
public interface PushdownProjector {
36+
/**
37+
* Returns a {@link PTransform} that will execute the projection specified by the {@link
38+
* FieldAccessDescriptor}.
39+
*/
40+
PTransform<? extends PInput, PCollection<Row>> withProjectionPushdown(
41+
FieldAccessDescriptor fieldAccessDescriptor);
42+
43+
/**
44+
* Returns true if this instance can do a projection that returns fields in a different order than
45+
* the projection's inputs.
46+
*/
47+
boolean supportsFieldReordering();
48+
}

sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/DefaultTableFilter.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
public final class DefaultTableFilter implements BeamSqlTableFilter {
2828
private final List<RexNode> filters;
2929

30-
DefaultTableFilter(List<RexNode> filters) {
30+
/** Creates a filter holding the given Calcite {@link RexNode} filter expressions. */
public DefaultTableFilter(List<RexNode> filters) {
3131
this.filters = filters;
3232
}
3333

sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapper.java

+45
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,22 @@
2222
import com.alibaba.fastjson.JSONObject;
2323
import com.fasterxml.jackson.core.JsonProcessingException;
2424
import java.io.Serializable;
25+
import java.util.List;
2526
import org.apache.beam.sdk.annotations.Experimental;
2627
import org.apache.beam.sdk.annotations.Internal;
2728
import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
2829
import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable;
2930
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
31+
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
32+
import org.apache.beam.sdk.extensions.sql.meta.DefaultTableFilter;
33+
import org.apache.beam.sdk.extensions.sql.meta.ProjectSupport;
3034
import org.apache.beam.sdk.extensions.sql.meta.Table;
3135
import org.apache.beam.sdk.options.PipelineOptions;
36+
import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
3237
import org.apache.beam.sdk.schemas.Schema;
3338
import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
3439
import org.apache.beam.sdk.schemas.io.InvalidSchemaException;
40+
import org.apache.beam.sdk.schemas.io.PushdownProjector;
3541
import org.apache.beam.sdk.schemas.io.SchemaIO;
3642
import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
3743
import org.apache.beam.sdk.transforms.PTransform;
@@ -118,6 +124,45 @@ public PCollection<Row> buildIOReader(PBegin begin) {
118124
return begin.apply(readerTransform);
119125
}
120126

127+
@Override
128+
public PCollection<Row> buildIOReader(
129+
PBegin begin, BeamSqlTableFilter filters, List<String> fieldNames) {
130+
PTransform<PBegin, PCollection<Row>> readerTransform = schemaIO.buildReader();
131+
if (!(filters instanceof DefaultTableFilter)) {
132+
throw new UnsupportedOperationException(
133+
String.format(
134+
"Filter pushdown is not yet supported in %s. BEAM-12663",
135+
SchemaIOTableWrapper.class));
136+
}
137+
if (!fieldNames.isEmpty()) {
138+
if (readerTransform instanceof PushdownProjector) {
139+
PushdownProjector pushdownProjector = (PushdownProjector) readerTransform;
140+
FieldAccessDescriptor fieldAccessDescriptor =
141+
FieldAccessDescriptor.withFieldNames(fieldNames);
142+
// The pushdown must return a PTransform that can be applied to a PBegin, or this cast
143+
// will fail.
144+
readerTransform =
145+
(PTransform<PBegin, PCollection<Row>>)
146+
pushdownProjector.withProjectionPushdown(fieldAccessDescriptor);
147+
} else {
148+
throw new UnsupportedOperationException(
149+
String.format("%s does not support projection pushdown.", this.getClass()));
150+
}
151+
}
152+
return begin.apply(readerTransform);
153+
}
154+
155+
@Override
156+
public ProjectSupport supportsProjects() {
157+
PTransform<PBegin, PCollection<Row>> readerTransform = schemaIO.buildReader();
158+
if (readerTransform instanceof PushdownProjector) {
159+
return ((PushdownProjector) readerTransform).supportsFieldReordering()
160+
? ProjectSupport.WITH_FIELD_REORDERING
161+
: ProjectSupport.WITHOUT_FIELD_REORDERING;
162+
}
163+
return ProjectSupport.NONE;
164+
}
165+
121166
@Override
122167
public POutput buildIOWriter(PCollection<Row> input) {
123168
PTransform<PCollection<Row>, ? extends POutput> writerTransform = schemaIO.buildWriter();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.extensions.sql.meta.provider;
19+
20+
import com.alibaba.fastjson.JSON;
21+
import java.util.List;
22+
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
23+
import org.apache.beam.sdk.extensions.sql.meta.DefaultTableFilter;
24+
import org.apache.beam.sdk.extensions.sql.meta.Table;
25+
import org.apache.beam.sdk.schemas.Schema;
26+
import org.apache.beam.sdk.testing.PAssert;
27+
import org.apache.beam.sdk.testing.TestPipeline;
28+
import org.apache.beam.sdk.values.PCollection;
29+
import org.apache.beam.sdk.values.Row;
30+
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
31+
import org.junit.BeforeClass;
32+
import org.junit.Rule;
33+
import org.junit.Test;
34+
import org.junit.runner.RunWith;
35+
import org.junit.runners.JUnit4;
36+
37+
/**
38+
* Tests {@link org.apache.beam.sdk.extensions.sql.meta.provider.SchemaIOTableProviderWrapper} using
39+
* {@link org.apache.beam.sdk.extensions.sql.meta.provider.TestSchemaIOTableProviderWrapper}.
40+
*/
41+
@RunWith(JUnit4.class)
42+
public class SchemaIOTableProviderWrapperTest {
43+
@Rule public TestPipeline pipeline = TestPipeline.create();
44+
45+
private static final Schema inputSchema =
46+
Schema.builder()
47+
.addStringField("f_string")
48+
.addInt64Field("f_long")
49+
.addBooleanField("f_bool")
50+
.build();
51+
private static final List<Row> rows =
52+
ImmutableList.of(
53+
Row.withSchema(inputSchema).addValues("zero", 0L, false).build(),
54+
Row.withSchema(inputSchema).addValues("one", 1L, true).build());
55+
private final Table testTable =
56+
Table.builder()
57+
.name("table")
58+
.comment("table")
59+
.schema(inputSchema)
60+
.properties(JSON.parseObject("{}"))
61+
.type("test")
62+
.build();
63+
64+
@BeforeClass
65+
public static void setUp() {
66+
TestSchemaIOTableProviderWrapper.addRows(rows.stream().toArray(Row[]::new));
67+
}
68+
69+
@Test
70+
public void testBuildIOReader() {
71+
TestSchemaIOTableProviderWrapper provider = new TestSchemaIOTableProviderWrapper();
72+
BeamSqlTable beamSqlTable = provider.buildBeamSqlTable(testTable);
73+
74+
PCollection<Row> result = beamSqlTable.buildIOReader(pipeline.begin());
75+
PAssert.that(result).containsInAnyOrder(rows);
76+
77+
pipeline.run();
78+
}
79+
80+
@Test
81+
public void testBuildIOReader_withProjectionPushdown() {
82+
TestSchemaIOTableProviderWrapper provider = new TestSchemaIOTableProviderWrapper();
83+
BeamSqlTable beamSqlTable = provider.buildBeamSqlTable(testTable);
84+
85+
PCollection<Row> result =
86+
beamSqlTable.buildIOReader(
87+
pipeline.begin(),
88+
new DefaultTableFilter(ImmutableList.of()),
89+
ImmutableList.of("f_long"));
90+
Schema outputSchema = Schema.builder().addInt64Field("f_long").build();
91+
PAssert.that(result)
92+
.containsInAnyOrder(
93+
Row.withSchema(outputSchema).addValues(0L).build(),
94+
Row.withSchema(outputSchema).addValues(1L).build());
95+
96+
pipeline.run();
97+
}
98+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.extensions.sql.meta.provider;
19+
20+
import java.util.ArrayList;
21+
import java.util.Arrays;
22+
import java.util.List;
23+
import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
24+
import org.apache.beam.sdk.schemas.Schema;
25+
import org.apache.beam.sdk.schemas.io.PushdownProjector;
26+
import org.apache.beam.sdk.schemas.io.SchemaIO;
27+
import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
28+
import org.apache.beam.sdk.schemas.transforms.Select;
29+
import org.apache.beam.sdk.transforms.Create;
30+
import org.apache.beam.sdk.transforms.PTransform;
31+
import org.apache.beam.sdk.values.PBegin;
32+
import org.apache.beam.sdk.values.PCollection;
33+
import org.apache.beam.sdk.values.PInput;
34+
import org.apache.beam.sdk.values.POutput;
35+
import org.apache.beam.sdk.values.Row;
36+
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
37+
import org.checkerframework.checker.nullness.qual.Nullable;
38+
39+
/**
40+
* A mock {@link org.apache.beam.sdk.extensions.sql.meta.provider.SchemaIOTableProviderWrapper} that
41+
* reads in-memory data for testing.
42+
*/
43+
@VisibleForTesting
44+
public class TestSchemaIOTableProviderWrapper extends SchemaIOTableProviderWrapper {
45+
private static final List<Row> rows = new ArrayList<>();
46+
47+
@Override
48+
public SchemaIOProvider getSchemaIOProvider() {
49+
return new TestSchemaIOProvider();
50+
}
51+
52+
public static void addRows(Row... newRows) {
53+
rows.addAll(Arrays.asList(newRows));
54+
}
55+
56+
private class TestSchemaIOProvider implements SchemaIOProvider {
57+
@Override
58+
public String identifier() {
59+
return "TestSchemaIOProvider";
60+
}
61+
62+
@Override
63+
public Schema configurationSchema() {
64+
return Schema.of();
65+
}
66+
67+
@Override
68+
public SchemaIO from(String location, Row configuration, @Nullable Schema dataSchema) {
69+
return new TestSchemaIO(dataSchema);
70+
}
71+
72+
@Override
73+
public boolean requiresDataSchema() {
74+
return true;
75+
}
76+
77+
@Override
78+
public PCollection.IsBounded isBounded() {
79+
return PCollection.IsBounded.BOUNDED;
80+
}
81+
}
82+
83+
private class TestSchemaIO implements SchemaIO {
84+
private final Schema schema;
85+
86+
TestSchemaIO(Schema schema) {
87+
this.schema = schema;
88+
}
89+
90+
@Override
91+
public Schema schema() {
92+
return schema;
93+
}
94+
95+
@Override
96+
public PTransform<PBegin, PCollection<Row>> buildReader() {
97+
// Read all fields by default.
98+
return new TestPushdownProjector(schema, FieldAccessDescriptor.withAllFields());
99+
}
100+
101+
@Override
102+
public PTransform<PCollection<Row>, ? extends POutput> buildWriter() {
103+
throw new UnsupportedOperationException();
104+
}
105+
}
106+
107+
/**
108+
* {@link PTransform} that reads in-memory data for testing. Simulates projection pushdown using
109+
* {@link Select}.
110+
*/
111+
private class TestPushdownProjector extends PTransform<PBegin, PCollection<Row>>
112+
implements PushdownProjector {
113+
/** The schema of the input data. */
114+
private final Schema schema;
115+
/** The fields to be projected. */
116+
private final FieldAccessDescriptor fieldAccessDescriptor;
117+
118+
TestPushdownProjector(Schema schema, FieldAccessDescriptor fieldAccessDescriptor) {
119+
this.schema = schema;
120+
this.fieldAccessDescriptor = fieldAccessDescriptor;
121+
}
122+
123+
@Override
124+
public PTransform<? extends PInput, PCollection<Row>> withProjectionPushdown(
125+
FieldAccessDescriptor fieldAccessDescriptor) {
126+
return new TestPushdownProjector(schema, fieldAccessDescriptor);
127+
}
128+
129+
@Override
130+
public boolean supportsFieldReordering() {
131+
return true;
132+
}
133+
134+
@Override
135+
public PCollection<Row> expand(PBegin input) {
136+
// Simulate projection pushdown using Select. In a real IO, projection would be pushed down to
137+
// the source.
138+
return input
139+
.apply(Create.of(rows).withRowSchema(schema))
140+
.apply(Select.fieldAccess(fieldAccessDescriptor));
141+
}
142+
}
143+
}

0 commit comments

Comments
 (0)