Skip to content

Commit 46c3f26

Browse files
authored
Remove ZetaSQL from nexmark and tpcds benchmark suites (#34590)
* Move SQL dependencies into sql/ package for nexmark
1 parent b9fcd80 commit 46c3f26

24 files changed

+82
-232
lines changed
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
1-
{}
1+
{
2+
"modification": 1
3+
}
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run",
3-
"runFor": "#33146"
3+
"modification": 1
44
}

Diff for: .github/workflows/beam_PostCommit_Java_Nexmark_Dataflow.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ jobs:
8686
job_name: [beam_PostCommit_Java_Nexmark_Dataflow]
8787
job_phrase: [Run Dataflow Runner Nexmark Tests]
8888
streaming: [false, true]
89-
queryLanguage: [sql, zetasql, none]
89+
queryLanguage: [sql, none]
9090
if: |
9191
github.event_name == 'workflow_dispatch' ||
9292
github.event_name == 'pull_request_target' ||

Diff for: .github/workflows/beam_PostCommit_Java_Nexmark_Direct.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ jobs:
8181
job_name: [beam_PostCommit_Java_Nexmark_Direct]
8282
job_phrase: [Run Direct Runner Nexmark Tests]
8383
streaming: [false, true]
84-
queryLanguage: [sql, zetasql, none]
84+
queryLanguage: [sql, none]
8585
if: |
8686
github.event_name == 'workflow_dispatch' ||
8787
github.event_name == 'pull_request_target' ||

Diff for: .github/workflows/beam_PostCommit_Java_Nexmark_Flink.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ jobs:
8080
job_name: [beam_PostCommit_Java_Nexmark_Flink]
8181
job_phrase: [Run Flink Runner Nexmark Tests]
8282
streaming: [false, true]
83-
queryLanguage: [sql, zetasql, none]
83+
queryLanguage: [sql, none]
8484
if: |
8585
github.event_name == 'workflow_dispatch' ||
8686
github.event_name == 'pull_request_target' ||

Diff for: .github/workflows/beam_PostCommit_Java_Nexmark_Spark.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ jobs:
8080
job_name: [beam_PostCommit_Java_Nexmark_Spark]
8181
job_phrase: [Run Spark Runner Nexmark Tests]
8282
runner: [SparkRunner, SparkStructuredStreamingRunner --skipQueries=3]
83-
queryLanguage: [sql, zetasql, none]
83+
queryLanguage: [sql, none]
8484
if: |
8585
github.event_name == 'workflow_dispatch' ||
8686
github.event_name == 'pull_request_target' ||

Diff for: sdks/java/testing/nexmark/build.gradle

+1-2
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ dependencies {
7070
implementation project(":sdks:java:extensions:avro")
7171
implementation project(":sdks:java:extensions:google-cloud-platform-core")
7272
implementation project(":sdks:java:extensions:sql")
73-
implementation project(":sdks:java:extensions:sql:zetasql")
7473
implementation project(":sdks:java:io:kafka")
7574
implementation project(":sdks:java:testing:test-utils")
7675
implementation library.java.google_api_client
@@ -87,7 +86,7 @@ dependencies {
8786
implementation library.java.kafka_clients
8887
compileOnly library.java.error_prone_annotations
8988
testRuntimeOnly library.java.slf4j_jdk14
90-
testImplementation project(path: ":sdks:java:io:google-cloud-platform", configuration: "testRuntimeMigration")
89+
testImplementation project(path: ":sdks:java:io:google-cloud-platform")
9190
testImplementation project(path: ":sdks:java:testing:test-utils")
9291
gradleRun project(project.path)
9392
gradleRun project(path: nexmarkRunnerDependency, configuration: runnerConfiguration)

Diff for: sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java

+3-71
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static org.apache.beam.sdk.nexmark.NexmarkQueryName.PORTABILITY_BATCH;
2121
import static org.apache.beam.sdk.nexmark.NexmarkUtils.PubSubMode.COMBINED;
22+
import static org.apache.beam.sdk.nexmark.queries.sql.SqlQueryUtils.createSqlQueries;
2223
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
2324
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
2425

@@ -82,12 +83,6 @@
8283
import org.apache.beam.sdk.nexmark.queries.Query9Model;
8384
import org.apache.beam.sdk.nexmark.queries.SessionSideInputJoin;
8485
import org.apache.beam.sdk.nexmark.queries.SessionSideInputJoinModel;
85-
import org.apache.beam.sdk.nexmark.queries.sql.SqlBoundedSideInputJoin;
86-
import org.apache.beam.sdk.nexmark.queries.sql.SqlQuery0;
87-
import org.apache.beam.sdk.nexmark.queries.sql.SqlQuery1;
88-
import org.apache.beam.sdk.nexmark.queries.sql.SqlQuery2;
89-
import org.apache.beam.sdk.nexmark.queries.sql.SqlQuery3;
90-
import org.apache.beam.sdk.nexmark.queries.sql.SqlQuery7;
9186
import org.apache.beam.sdk.testing.PAssert;
9287
import org.apache.beam.sdk.testutils.metrics.MetricsReader;
9388
import org.apache.beam.sdk.transforms.DoFn;
@@ -125,9 +120,6 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
125120
/** Command line parameter value for query language. */
126121
private static final String SQL = "sql";
127122

128-
/** Command line parameter value for zetasql language. */
129-
private static final String ZETA_SQL = "zetasql";
130-
131123
/** Minimum number of samples needed for 'stead-state' rate calculation. */
132124
private static final int MIN_SAMPLES = 9;
133125
/** Minimum length of time over which to consider samples for 'steady-state' rate calculation. */
@@ -1254,10 +1246,6 @@ private boolean isSql() {
12541246
return SQL.equalsIgnoreCase(options.getQueryLanguage());
12551247
}
12561248

1257-
private boolean isZetaSql() {
1258-
return ZETA_SQL.equalsIgnoreCase(options.getQueryLanguage());
1259-
}
1260-
12611249
private NexmarkQueryModel getNexmarkQueryModel() {
12621250
return models.get(configuration.query);
12631251
}
@@ -1267,7 +1255,7 @@ private NexmarkQuery<?> getNexmarkQuery() {
12671255
}
12681256

12691257
private Map<NexmarkQueryName, NexmarkQueryModel> createQueryModels() {
1270-
return (isSql() || isZetaSql()) ? createSqlQueryModels() : createJavaQueryModels();
1258+
return isSql() ? createSqlQueryModels() : createJavaQueryModels();
12711259
}
12721260

12731261
private Map<NexmarkQueryName, NexmarkQueryModel> createSqlQueryModels() {
@@ -1294,9 +1282,7 @@ private Map<NexmarkQueryName, NexmarkQueryModel> createJavaQueryModels() {
12941282
private Map<NexmarkQueryName, NexmarkQuery> createQueries() {
12951283
Map<NexmarkQueryName, NexmarkQuery> defaultQueries;
12961284
if (isSql()) {
1297-
defaultQueries = createSqlQueries();
1298-
} else if (isZetaSql()) {
1299-
defaultQueries = createZetaSqlQueries();
1285+
defaultQueries = createSqlQueries(configuration);
13001286
} else {
13011287
defaultQueries = createJavaQueries();
13021288
}
@@ -1317,60 +1303,6 @@ private Set<NexmarkQueryName> getSkippableQueries() {
13171303
return skipQueries;
13181304
}
13191305

1320-
private Map<NexmarkQueryName, NexmarkQuery> createSqlQueries() {
1321-
return ImmutableMap.<NexmarkQueryName, NexmarkQuery>builder()
1322-
.put(
1323-
NexmarkQueryName.PASSTHROUGH,
1324-
new NexmarkQuery(configuration, SqlQuery0.calciteSqlQuery0()))
1325-
.put(NexmarkQueryName.CURRENCY_CONVERSION, new NexmarkQuery(configuration, new SqlQuery1()))
1326-
.put(
1327-
NexmarkQueryName.SELECTION,
1328-
new NexmarkQuery(configuration, SqlQuery2.calciteSqlQuery2(configuration.auctionSkip)))
1329-
.put(
1330-
NexmarkQueryName.LOCAL_ITEM_SUGGESTION,
1331-
new NexmarkQuery(configuration, SqlQuery3.calciteSqlQuery3(configuration)))
1332-
1333-
// SqlQuery5 is disabled for now, uses non-equi-joins,
1334-
// never worked right, was giving incorrect results.
1335-
// Gets rejected after PR/8301, causing failures.
1336-
//
1337-
// See:
1338-
// https://github.com/apache/beam/issues/19541
1339-
// https://github.com/apache/beam/pull/8301
1340-
// https://github.com/apache/beam/pull/8422#issuecomment-487676350
1341-
//
1342-
// .put(
1343-
// NexmarkQueryName.HOT_ITEMS,
1344-
// new NexmarkQuery(configuration, new SqlQuery5(configuration)))
1345-
.put(
1346-
NexmarkQueryName.HIGHEST_BID,
1347-
new NexmarkQuery(configuration, new SqlQuery7(configuration)))
1348-
.put(
1349-
NexmarkQueryName.BOUNDED_SIDE_INPUT_JOIN,
1350-
new NexmarkQuery(
1351-
configuration,
1352-
SqlBoundedSideInputJoin.calciteSqlBoundedSideInputJoin(configuration)))
1353-
.build();
1354-
}
1355-
1356-
private Map<NexmarkQueryName, NexmarkQuery> createZetaSqlQueries() {
1357-
return ImmutableMap.<NexmarkQueryName, NexmarkQuery>builder()
1358-
.put(
1359-
NexmarkQueryName.PASSTHROUGH,
1360-
new NexmarkQuery(configuration, SqlQuery0.zetaSqlQuery0()))
1361-
.put(
1362-
NexmarkQueryName.SELECTION,
1363-
new NexmarkQuery(configuration, SqlQuery2.zetaSqlQuery2(configuration.auctionSkip)))
1364-
.put(
1365-
NexmarkQueryName.LOCAL_ITEM_SUGGESTION,
1366-
new NexmarkQuery(configuration, SqlQuery3.zetaSqlQuery3(configuration)))
1367-
.put(
1368-
NexmarkQueryName.BOUNDED_SIDE_INPUT_JOIN,
1369-
new NexmarkQuery(
1370-
configuration, SqlBoundedSideInputJoin.zetaSqlBoundedSideInputJoin(configuration)))
1371-
.build();
1372-
}
1373-
13741306
private Map<NexmarkQueryName, NexmarkQuery> createJavaQueries() {
13751307
return ImmutableMap.<NexmarkQueryName, NexmarkQuery>builder()
13761308
.put(NexmarkQueryName.PASSTHROUGH, new NexmarkQuery(configuration, new Query0()))

Diff for: sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java

+1-4
Original file line numberDiff line numberDiff line change
@@ -414,10 +414,7 @@ void setPubsubMessageSerializationMethod(
414414

415415
void setRunningTimeMinutes(Long value);
416416

417-
@Description(
418-
"Specify 'sql' to use Calcite SQL queries "
419-
+ "or 'zetasql' to use ZetaSQL queries."
420-
+ "Otherwise Java transforms will be used")
417+
@Description("Specify 'sql' to use Calcite SQL queries. Otherwise Java transforms will be used")
421418
@Nullable
422419
String getQueryLanguage();
423420

Diff for: sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlBoundedSideInputJoin.java

-21
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.beam.sdk.extensions.sql.SqlTransform;
2323
import org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner;
2424
import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner;
25-
import org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner;
2625
import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
2726
import org.apache.beam.sdk.nexmark.model.Bid;
2827
import org.apache.beam.sdk.nexmark.model.Event;
@@ -81,26 +80,6 @@ public static SqlBoundedSideInputJoin calciteSqlBoundedSideInputJoin(
8180
+ " WHERE bid_with_side.side_id = side.id");
8281
}
8382

84-
public static SqlBoundedSideInputJoin zetaSqlBoundedSideInputJoin(
85-
NexmarkConfiguration configuration) {
86-
// Differences from Calcite SQL:
87-
// - INT64 instead of BIGINT
88-
// - no column list for WITH table alias
89-
return new SqlBoundedSideInputJoin(
90-
configuration,
91-
ZetaSQLQueryPlanner.class,
92-
"WITH bid_with_side AS (%n"
93-
+ " SELECT *, CAST(MOD(bidder, %d) AS INT64) side_id FROM bid%n"
94-
+ ")%n"
95-
+ " SELECT bid_with_side.auction%n"
96-
+ ", bid_with_side.bidder%n"
97-
+ ", bid_with_side.price%n"
98-
+ ", bid_with_side.dateTime%n"
99-
+ ", side.extra%n"
100-
+ " FROM bid_with_side, side%n"
101-
+ " WHERE bid_with_side.side_id = side.id");
102-
}
103-
10483
@Override
10584
public boolean needsSideInput() {
10685
return true;

Diff for: sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery0.java

-5
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.apache.beam.sdk.extensions.sql.SqlTransform;
2525
import org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner;
2626
import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner;
27-
import org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner;
2827
import org.apache.beam.sdk.metrics.Counter;
2928
import org.apache.beam.sdk.metrics.Metrics;
3029
import org.apache.beam.sdk.nexmark.model.Bid;
@@ -61,10 +60,6 @@ private SqlQuery0(Class<? extends QueryPlanner> plannerClass) {
6160
this.plannerClass = plannerClass;
6261
}
6362

64-
public static SqlQuery0 zetaSqlQuery0() {
65-
return new SqlQuery0(ZetaSQLQueryPlanner.class);
66-
}
67-
6863
public static SqlQuery0 calciteSqlQuery0() {
6964
return new SqlQuery0(CalciteQueryPlanner.class);
7065
}

Diff for: sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery2.java

-5
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.apache.beam.sdk.extensions.sql.SqlTransform;
2121
import org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner;
2222
import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner;
23-
import org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner;
2423
import org.apache.beam.sdk.nexmark.model.AuctionPrice;
2524
import org.apache.beam.sdk.nexmark.model.Event;
2625
import org.apache.beam.sdk.nexmark.model.sql.SelectEvent;
@@ -57,10 +56,6 @@ public static SqlQuery2 calciteSqlQuery2(long skipFactor) {
5756
return new SqlQuery2("SqlQuery2", skipFactor, CalciteQueryPlanner.class);
5857
}
5958

60-
public static SqlQuery2 zetaSqlQuery2(long skipFactor) {
61-
return new SqlQuery2("ZetaSqlQuery2", skipFactor, ZetaSQLQueryPlanner.class);
62-
}
63-
6459
@Override
6560
public PCollection<AuctionPrice> expand(PCollection<Event> allEvents) {
6661
return allEvents

Diff for: sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery3.java

-5
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.apache.beam.sdk.extensions.sql.SqlTransform;
2121
import org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner;
2222
import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner;
23-
import org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner;
2423
import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
2524
import org.apache.beam.sdk.nexmark.model.Auction;
2625
import org.apache.beam.sdk.nexmark.model.Event;
@@ -93,10 +92,6 @@ public static SqlQuery3 calciteSqlQuery3(NexmarkConfiguration configuration) {
9392
return new SqlQuery3("SqlQuery3", configuration, CalciteQueryPlanner.class);
9493
}
9594

96-
public static SqlQuery3 zetaSqlQuery3(NexmarkConfiguration configuration) {
97-
return new SqlQuery3("ZetaSqlQuery3", configuration, ZetaSQLQueryPlanner.class);
98-
}
99-
10095
@Override
10196
public PCollection<NameCityStateId> expand(PCollection<Event> allEvents) {
10297
PCollection<Event> windowed =
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.nexmark.queries.sql;
19+
20+
import java.util.Map;
21+
import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
22+
import org.apache.beam.sdk.nexmark.NexmarkQueryName;
23+
import org.apache.beam.sdk.nexmark.queries.NexmarkQuery;
24+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
25+
26+
@SuppressWarnings({
27+
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
28+
})
29+
public class SqlQueryUtils {
30+
public static Map<NexmarkQueryName, NexmarkQuery> createSqlQueries(
31+
NexmarkConfiguration configuration) {
32+
return ImmutableMap.<NexmarkQueryName, NexmarkQuery>builder()
33+
.put(
34+
NexmarkQueryName.PASSTHROUGH,
35+
new NexmarkQuery(configuration, SqlQuery0.calciteSqlQuery0()))
36+
.put(NexmarkQueryName.CURRENCY_CONVERSION, new NexmarkQuery(configuration, new SqlQuery1()))
37+
.put(
38+
NexmarkQueryName.SELECTION,
39+
new NexmarkQuery(configuration, SqlQuery2.calciteSqlQuery2(configuration.auctionSkip)))
40+
.put(
41+
NexmarkQueryName.LOCAL_ITEM_SUGGESTION,
42+
new NexmarkQuery(configuration, SqlQuery3.calciteSqlQuery3(configuration)))
43+
44+
// SqlQuery5 is disabled for now, uses non-equi-joins,
45+
// never worked right, was giving incorrect results.
46+
// Gets rejected after PR/8301, causing failures.
47+
//
48+
// See:
49+
// https://github.com/apache/beam/issues/19541
50+
// https://github.com/apache/beam/pull/8301
51+
// https://github.com/apache/beam/pull/8422#issuecomment-487676350
52+
//
53+
// .put(
54+
// NexmarkQueryName.HOT_ITEMS,
55+
// new NexmarkQuery(configuration, new SqlQuery5(configuration)))
56+
.put(
57+
NexmarkQueryName.HIGHEST_BID,
58+
new NexmarkQuery(configuration, new SqlQuery7(configuration)))
59+
.put(
60+
NexmarkQueryName.BOUNDED_SIDE_INPUT_JOIN,
61+
new NexmarkQuery(
62+
configuration,
63+
SqlBoundedSideInputJoin.calciteSqlBoundedSideInputJoin(configuration)))
64+
.build();
65+
}
66+
}

0 commit comments

Comments
 (0)