diff --git a/build.gradle.kts b/build.gradle.kts
index 8dcdc14f04e7..a665d04b527e 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -244,6 +244,8 @@ tasks.register("javaPreCommit") {
   dependsOn(":beam-validate-runner:build")
   dependsOn(":examples:java:build")
   dependsOn(":examples:java:preCommit")
+  dependsOn(":examples:java:sql:build")
+  dependsOn(":examples:java:sql:preCommit")
   dependsOn(":examples:java:twitter:build")
   dependsOn(":examples:java:twitter:preCommit")
   dependsOn(":examples:multi-language:build")
diff --git a/examples/java/sql/README.md b/examples/java/sql/README.md
new file mode 100644
index 000000000000..d27b8ce7025c
--- /dev/null
+++ b/examples/java/sql/README.md
@@ -0,0 +1,51 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Example Pipelines for Beam SQL and Schema Transforms
+
+The examples included in this module serve to demonstrate the basic
+functionality of Apache Beam SQL, and act as starting points for
+the development of more complex pipelines.
+
+## SQL transform
+
+An example that leverages the powerful SQL syntax of Beam [SqlTransform](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/sql/SqlTransform.html) directly in your Beam pipeline.
+
+
+[`SqlTransformExample`](https://github.com/apache/beam/blob/master/examples/java/sql/src/main/java/org/apache/beam/examples/sql/SqlTransformExample.java) is a simple pipeline that calculates multiple metrics per key: Min, Max, and Sum.
+
+
+## Schema transform
+
+[Beam Schemas](https://beam.apache.org/documentation/programming-guide/#schemas) offer a flexible way of writing in code the same operations that are easy to express in SQL.
+
+[`SchemaTransformExample`](https://github.com/apache/beam/blob/master/examples/java/sql/src/main/java/org/apache/beam/examples/sql/SchemaTransformExample.java) is a simple pipeline that calculates multiple metrics per key: Min, Max, and Sum.
+
+## Running Examples
+
+See [Apache Beam WordCount Example](https://beam.apache.org/get-started/wordcount-example/) for information on running these examples.
+
+The Gradle command line can be similar to:
+
+`./gradlew clean :examples:java:sql:execute --args="--runner=DirectRunner" -Pdirect-runner -PmainClass=org.apache.beam.examples.sql.SqlTransformExample`
+
+## Beyond SQL and Schemas
+
+Both SQL and Schema transforms leverage the Row type.
+The same results can be achieved directly with Beam transforms on a KV input PCollection; see the [Composed Combiners example](https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/CoCombineTransformExample.java).
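For orientation before the file listings: the core of the SQL example the README describes is a single `SqlTransform.query` call, shown in full in `SqlTransformExample.java` below. A minimal sketch, assuming an input `PCollection<Row>` named `input` whose schema has integer fields `k` and `n`:

```java
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

// PCOLLECTION is the implicit table name SqlTransform gives its input.
PCollection<Row> result =
    input.apply(
        SqlTransform.query(
            "select k, min(n) as min_n, max(n) as max_n, sum(n) as sum_n from PCOLLECTION group by k"));
```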
diff --git a/examples/java/sql/build.gradle b/examples/java/sql/build.gradle
new file mode 100644
index 000000000000..466e2d0f429d
--- /dev/null
+++ b/examples/java/sql/build.gradle
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import groovy.json.JsonOutput
+
+plugins {
+  id 'java'
+  id 'org.apache.beam.module'
+  id 'com.github.johnrengelman.shadow'
+}
+
+applyJavaNature(
+  exportJavadoc: false,
+  automaticModuleName: 'org.apache.beam.examples.sql',
+)
+provideIntegrationTestingDependencies()
+enableJavaPerformanceTesting()
+
+description = "Apache Beam :: Examples :: Java :: SQL"
+ext.summary = """Apache Beam SDK provides a simple, Java-based
+interface for processing virtually any size data. This
+artifact includes the Apache Beam Java SDK SQL examples."""
+
+/** Define the list of runners which execute a precommit test.
+ * Some runners are run from separate projects, see the preCommit task below
+ * for details.
+ */
+def preCommitRunners = ["directRunner", "flinkRunner", "sparkRunner"]
+for (String runner : preCommitRunners) {
+  configurations.create(runner + "PreCommit")
+}
+configurations.sparkRunnerPreCommit {
+  // Ban certain dependencies to prevent a StackOverflow within Spark
+  // because JUL -> SLF4J -> JUL, and similarly JDK14 -> SLF4J -> JDK14
+  exclude group: "org.slf4j", module: "jul-to-slf4j"
+  exclude group: "org.slf4j", module: "slf4j-jdk14"
+}
+
+dependencies {
+  implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
+  implementation project(path: ":sdks:java:core", configuration: "shadow")
+  implementation project(":sdks:java:extensions:sql")
+  implementation library.java.slf4j_api
+  runtimeOnly project(path: ":runners:direct-java", configuration: "shadow")
+  runtimeOnly library.java.hadoop_client
+  runtimeOnly library.java.bigdataoss_gcs_connector
+  testImplementation project(path: ":runners:direct-java", configuration: "shadow")
+  testImplementation library.java.hamcrest
+  testImplementation library.java.junit
+  testImplementation library.java.mockito_core
+  testImplementation library.java.testcontainers_gcloud
+
+  // Add dependencies for the PreCommit configurations.
+  // For each runner, add a project-level dependency on the examples project.
+  for (String runner : preCommitRunners) {
+    delegate.add(runner + "PreCommit", project(path: ":examples:java", configuration: "testRuntimeMigration"))
+  }
+  directRunnerPreCommit project(path: ":runners:direct-java", configuration: "shadow")
+  flinkRunnerPreCommit project(":runners:flink:${project.ext.latestFlinkVersion}")
+  sparkRunnerPreCommit project(":runners:spark:3")
+  sparkRunnerPreCommit project(":sdks:java:io:hadoop-file-system")
+
+  // Add dependency if requested on command line for runner
+  if (project.hasProperty("runnerDependency")) {
+    runtimeOnly project(path: project.getProperty("runnerDependency"))
+  }
+}
+
+/*
+ * Create a ${runner}PreCommit task for each runner which runs a set
+ * of integration tests.
+ */
+def preCommitRunnerClass = [
+  directRunner: "org.apache.beam.runners.direct.DirectRunner",
+  flinkRunner: "org.apache.beam.runners.flink.TestFlinkRunner",
+  sparkRunner: "org.apache.beam.runners.spark.TestSparkRunner",
+]
+def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
+def gcsTempRoot = project.findProperty('gcsTempRoot') ?: 'gs://temp-storage-for-end-to-end-tests/'
+
+for (String runner : preCommitRunners) {
+  tasks.create(name: runner + "PreCommit", type: Test) {
+    def preCommitBeamTestPipelineOptions = [
+      "--project=${gcpProject}",
+      "--tempRoot=${gcsTempRoot}",
+      "--runner=" + preCommitRunnerClass[runner],
+    ]
+    classpath = configurations."${runner}PreCommit"
+    forkEvery 1
+    maxParallelForks 4
+    systemProperty "beamTestPipelineOptions", JsonOutput.toJson(preCommitBeamTestPipelineOptions)
+  }
+}
+
+/* Define a common precommit task which depends on all the individual precommits. */
+task preCommit() {
+  for (String runner : preCommitRunners) {
+    dependsOn runner + "PreCommit"
+  }
+}
+
+tasks.create(name: "execute", type: JavaExec) {
+  main = project.hasProperty("mainClass") ? project.getProperty("mainClass") : "NONE"
+  classpath = sourceSets.main.runtimeClasspath
+  systemProperties System.getProperties()
+  args project.hasProperty("exec.args") ? project.getProperty("exec.args").split() : []
+}
\ No newline at end of file
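A note on the `execute` task defined above: `-PmainClass` supplies the Gradle project property the task reads (falling back to `"NONE"`), and Gradle's built-in `--args` option for `JavaExec` forwards the pipeline options to the selected main class. This is the task the README's `./gradlew clean :examples:java:sql:execute ...` command invokes.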
diff --git a/examples/java/sql/src/main/java/org/apache/beam/examples/sql/SchemaTransformExample.java b/examples/java/sql/src/main/java/org/apache/beam/examples/sql/SchemaTransformExample.java
new file mode 100644
index 000000000000..d50490c91b03
--- /dev/null
+++ b/examples/java/sql/src/main/java/org/apache/beam/examples/sql/SchemaTransformExample.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.sql;
+
+// beam-playground:
+//   name: Group.ByFields
+//   description: Demonstration of Schema transform usage.
+//   multifile: false
+//   default_example: false
+//   context_line: 60
+//   categories:
+//     - Schemas
+//     - Combiners
+//   complexity: BASIC
+//   tags:
+//     - transforms
+//     - numbers
+
+// gradle clean execute -PmainClass=org.apache.beam.examples.sql.SchemaTransformExample
+//   --args="--runner=DirectRunner" -Pdirect-runner
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.Group;
+import org.apache.beam.sdk.schemas.transforms.Select;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Max;
+import org.apache.beam.sdk.transforms.Min;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An example that uses Schema transforms to apply multiple combiners (Sum, Min, Max) on the input
+ * PCollection.
+ *
+ * <p>For detailed documentation of Schemas, see
+ * https://beam.apache.org/documentation/programming-guide/#schemas
+ */
+public class SchemaTransformExample {
+  public static void main(String[] args) {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    Pipeline pipeline = Pipeline.create(options);
+    // [START main_section]
+    // define the input row schema
+    Schema inputSchema = Schema.builder().addInt32Field("k").addInt32Field("n").build();
+    // Create input
+    PCollection<Row> input =
+        pipeline
+            .apply(
+                Create.of(
+                    Row.withSchema(inputSchema).addValues(1, 1).build(),
+                    Row.withSchema(inputSchema).addValues(1, 5).build(),
+                    Row.withSchema(inputSchema).addValues(2, 10).build(),
+                    Row.withSchema(inputSchema).addValues(2, 20).build(),
+                    Row.withSchema(inputSchema).addValues(3, 1).build()))
+            .setRowSchema(inputSchema);
+
+    PCollection<Row> result =
+        input
+            .apply(Select.fieldNames("n", "k"))
+            .apply(
+                Group.<Row>byFieldNames("k")
+                    .aggregateField("n", Min.ofIntegers(), "min_n")
+                    .aggregateField("n", Max.ofIntegers(), "max_n")
+                    .aggregateField("n", Sum.ofIntegers(), "sum_n"));
+    // [END main_section]
+    // Log values
+    result.apply(ParDo.of(new LogOutput<>("PCollection values after Schema transform: ")));
+    pipeline.run();
+  }
+
+  static class LogOutput<T> extends DoFn<T, T> {
+    private static final Logger LOG = LoggerFactory.getLogger(LogOutput.class);
+    private final String prefix;
+
+    public LogOutput(String prefix) {
+      this.prefix = prefix;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      LOG.info(prefix + c.element());
+      c.output(c.element());
+    }
+  }
+}
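A note on the schema example's output shape, which the file above does not spell out: per the Beam programming guide, `Group.byFieldNames(...).aggregateField(...)` emits rows that nest the grouping fields under a `key` sub-row and the aggregates under a `value` sub-row. A hedged sketch of unpacking one such row — this field layout is an assumption to verify against your Beam version:

```java
import org.apache.beam.sdk.values.Row;

// Hypothetical helper: reads one aggregated row emitted by the example above,
// assuming the key/value nesting described in the Beam programming guide.
static void printAggregates(Row outputRow) {
  Row key = outputRow.getRow("key"); // holds the grouping field k
  Row value = outputRow.getRow("value"); // holds min_n, max_n and sum_n
  System.out.printf(
      "k=%d min=%d max=%d sum=%d%n",
      key.getInt32("k"),
      value.getInt32("min_n"),
      value.getInt32("max_n"),
      value.getInt32("sum_n"));
}
```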
diff --git a/examples/java/sql/src/main/java/org/apache/beam/examples/sql/SqlTransformExample.java b/examples/java/sql/src/main/java/org/apache/beam/examples/sql/SqlTransformExample.java
new file mode 100644
index 000000000000..1f8ae1973b4e
--- /dev/null
+++ b/examples/java/sql/src/main/java/org/apache/beam/examples/sql/SqlTransformExample.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.sql;
+
+// beam-playground:
+//   name: SqlTransform
+//   description: Demonstration of SQL transform usage.
+//   multifile: false
+//   default_example: false
+//   context_line: 60
+//   categories:
+//     - Beam SQL
+//     - Combiners
+//   complexity: BASIC
+//   tags:
+//     - transforms
+//     - numbers
+
+// gradle clean execute -PmainClass=org.apache.beam.examples.sql.SqlTransformExample
+//   --args="--runner=DirectRunner" -Pdirect-runner
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.sql.SqlTransform;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An example that uses a Beam SQL transform to apply multiple combiners (Min, Max, Sum) on the
+ * input PCollection.
+ *
+ * <p>It uses SQL syntax to define a transform that can be integrated in a Java pipeline.
+ *
+ * <p>For detailed documentation of Beam SQL, see
+ * https://beam.apache.org/documentation/dsls/sql/overview/
+ */
+public class SqlTransformExample {
+  public static void main(String[] args) {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    Pipeline pipeline = Pipeline.create(options);
+    // [START main_section]
+    // define the input row format
+    Schema inputSchema = Schema.builder().addInt32Field("k").addInt32Field("n").build();
+    // Create input
+    PCollection<Row> input =
+        pipeline
+            .apply(
+                Create.of(
+                    Row.withSchema(inputSchema).addValues(1, 1).build(),
+                    Row.withSchema(inputSchema).addValues(1, 5).build(),
+                    Row.withSchema(inputSchema).addValues(2, 10).build(),
+                    Row.withSchema(inputSchema).addValues(2, 20).build(),
+                    Row.withSchema(inputSchema).addValues(3, 1).build()))
+            .setRowSchema(inputSchema);
+
+    PCollection<Row> result =
+        input.apply(
+            SqlTransform.query(
+                "select k, min(n) as min_n, max(n) as max_n, sum(n) as sum_n from PCOLLECTION group by k"));
+    // [END main_section]
+    // Log values
+    result.apply(ParDo.of(new LogOutput<>("PCollection values after SQL transform: ")));
+    pipeline.run();
+  }
+
+  static class LogOutput<T> extends DoFn<T, T> {
+    private static final Logger LOG = LoggerFactory.getLogger(LogOutput.class);
+    private final String prefix;
+
+    public LogOutput(String prefix) {
+      this.prefix = prefix;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      LOG.info(prefix + c.element());
+      c.output(c.element());
+    }
+  }
+}
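Neither example ships with a unit test in this PR; a minimal `PAssert` check for the SQL pipeline above could look like the sketch below. The flat output schema (all four fields as INT32) is an assumption about what the SQL planner produces for this query, so verify the field types before relying on it:

```java
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.values.Row;

// Assumed output schema for "select k, min(n), max(n), sum(n) ... group by k".
Schema outputSchema =
    Schema.builder()
        .addInt32Field("k")
        .addInt32Field("min_n")
        .addInt32Field("max_n")
        .addInt32Field("sum_n")
        .build();
PAssert.that(result)
    .containsInAnyOrder(
        Row.withSchema(outputSchema).addValues(1, 1, 5, 6).build(),
        Row.withSchema(outputSchema).addValues(2, 10, 20, 30).build(),
        Row.withSchema(outputSchema).addValues(3, 1, 1, 1).build());
pipeline.run().waitUntilFinish();
```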
diff --git a/examples/java/src/main/java/org/apache/beam/examples/CoCombineTransformExample.java b/examples/java/src/main/java/org/apache/beam/examples/CoCombineTransformExample.java
new file mode 100644
index 000000000000..3b91b29e9f28
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/CoCombineTransformExample.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples;
+
+// beam-playground:
+//   name: CombineFns.ComposedCombineFn
+//   description: Demonstration of Composed Combine transform usage.
+//   multifile: false
+//   default_example: false
+//   context_line: 143
+//   categories:
+//     - Schemas
+//     - Combiners
+//   complexity: MEDIUM
+//   tags:
+//     - transforms
+//     - numbers
+
+// gradle clean execute -DmainClass=org.apache.beam.examples.CoCombineTransformExample
+//   --args="--runner=DirectRunner" -Pdirect-runner
+
+import java.util.ArrayList;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.CombineFns;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Max;
+import org.apache.beam.sdk.transforms.Min;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An example that uses Composed combiners to apply multiple combiners (Sum, Min, Max) on the input
+ * PCollection.
+ *
+ * <p>For detailed documentation of Composed Combines, see
+ * https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/CombineFns.html
+ *
+ * <p>Note: the combiners are wrapped in a DropNullFn because, when combining, the input usually
+ * contains null values that need to be handled by the combiner.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class CoCombineTransformExample {
+
+  /**
+   * A wrapper for combiners that drops null elements before applying the wrapped combiner. Similar
+   * to the private DropNullFn in
+   * org.apache.beam.sdk.extensions.sql.impl.transform.BeamBuiltinAggregations.
+   */
+  public static class DropNullFn<InputT, AccumT, OutputT>
+      extends Combine.CombineFn<InputT, AccumT, OutputT> {
+
+    protected final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
+
+    public DropNullFn(Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+      this.combineFn = combineFn;
+    }
+
+    @Override
+    public AccumT createAccumulator() {
+      return null;
+    }
+
+    @Override
+    public AccumT addInput(AccumT accumulator, InputT input) {
+      if (input == null) {
+        return accumulator;
+      }
+
+      if (accumulator == null) {
+        accumulator = combineFn.createAccumulator();
+      }
+      return combineFn.addInput(accumulator, input);
+    }
+
+    @Override
+    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
+      // filter out nulls
+      accumulators = Iterables.filter(accumulators, Predicates.notNull());
+
+      // if all accumulators were null, the filtered iterable is empty
+      if (!accumulators.iterator().hasNext()) {
+        return null;
+      }
+
+      return combineFn.mergeAccumulators(accumulators);
+    }
+
+    @Override
+    public OutputT extractOutput(AccumT accumulator) {
+      if (accumulator == null) {
+        return null;
+      }
+      return combineFn.extractOutput(accumulator);
+    }
+
+    @Override
+    public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<InputT> inputCoder)
+        throws CannotProvideCoderException {
+      Coder<AccumT> coder = combineFn.getAccumulatorCoder(registry, inputCoder);
+      if (coder instanceof NullableCoder) {
+        return coder;
+      }
+      return NullableCoder.of(coder);
+    }
+  }
+
+  public static void main(String[] args) {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    Pipeline pipeline = Pipeline.create(options);
+    // [START main_section]
+    // Create input
+    PCollection<KV<Long, Long>> inputKV =
+        pipeline.apply(
+            Create.of(KV.of(1L, 1L), KV.of(1L, 5L), KV.of(2L, 10L), KV.of(2L, 20L), KV.of(3L, 1L)));
+    /**
+     * Define the function used to filter elements before sending them to the Combiner. With
+     * identityFn, all elements (here, per key) will be combined.
+     */
+    SimpleFunction<Long, Long> identityFn =
+        new SimpleFunction<Long, Long>() {
+          @Override
+          public Long apply(Long input) {
+            return input;
+          }
+        };
+
+    // tuple tags to identify the outputs of the Composed Combine
+    TupleTag<Long> sumTag = new TupleTag<Long>("sum_n");
+    TupleTag<Long> minTag = new TupleTag<Long>("min_n");
+    TupleTag<Long> maxTag = new TupleTag<Long>("max_n");
+
+    CombineFns.ComposedCombineFn<Long> composedCombine =
+        CombineFns.compose()
+            .with(
+                identityFn,
+                new DropNullFn<Long, long[], Long>(Sum.ofLongs()),
+                sumTag) // elements filtered by identityFn are combined in a Sum and the
+            // output is tagged with sumTag
+            .with(identityFn, new DropNullFn<Long, long[], Long>(Min.ofLongs()), minTag)
+            .with(identityFn, new DropNullFn<Long, long[], Long>(Max.ofLongs()), maxTag);
+
+    PCollection<KV<Long, CombineFns.CoCombineResult>> combinedData =
+        inputKV.apply("Combine all", Combine.perKey(composedCombine));
+
+    // transform the CoCombineResult output into a KV format, simpler to use for printing
+    PCollection<KV<Long, ArrayList<KV<String, Long>>>> result =
+        combinedData.apply(
+            ParDo.of(
+                new DoFn<
+                    KV<Long, CombineFns.CoCombineResult>,
+                    KV<Long, ArrayList<KV<String, Long>>>>() {
+                  @ProcessElement
+                  public void processElement(ProcessContext c) throws Exception {
+                    CombineFns.CoCombineResult e = c.element().getValue();
+                    ArrayList<KV<String, Long>> o = new ArrayList<KV<String, Long>>();
+                    o.add(KV.of(minTag.getId(), e.get(minTag)));
+                    o.add(KV.of(maxTag.getId(), e.get(maxTag)));
+                    o.add(KV.of(sumTag.getId(), e.get(sumTag)));
+                    c.output(KV.of(c.element().getKey(), o));
+                  }
+                }));
+    // [END main_section]
+    // Log values
+    result.apply(ParDo.of(new LogOutput<>("PCollection values after CoCombine transform: ")));
+    pipeline.run();
+  }
+
+  static class LogOutput<T> extends DoFn<T, T> {
+    private static final Logger LOG = LoggerFactory.getLogger(LogOutput.class);
+    private final String prefix;
+
+    public LogOutput(String prefix) {
+      this.prefix = prefix;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      LOG.info(prefix + c.element());
+      c.output(c.element());
+    }
+  }
+}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 9bf86c9bf2f1..9dd77f9e18f4 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -78,6 +78,7 @@ include(":examples:java:cdap:hubspot")
 include(":examples:java:cdap:salesforce")
 include(":examples:java:cdap:servicenow")
 include(":examples:java:cdap:zendesk")
+include(":examples:java:sql")
 include(":examples:java:webapis")
 include(":examples:kotlin")
 include(":examples:multi-language")
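To see what `DropNullFn` does in isolation, here is a hedged sketch (editorial, not part of the change; it assumes `Sum.ofLongs()` accumulates into a `long[]`, which is how Beam's `Combine.BinaryCombineLongFn` is declared):

```java
import org.apache.beam.examples.CoCombineTransformExample.DropNullFn;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Sum;

// Wrap Sum so null inputs are skipped instead of breaking the combine.
Combine.CombineFn<Long, long[], Long> sumIgnoringNulls = new DropNullFn<>(Sum.ofLongs());

long[] acc = sumIgnoringNulls.addInput(null, 5L); // first input lazily creates the accumulator
acc = sumIgnoringNulls.addInput(acc, null); // null input: accumulator is returned unchanged
Long total = sumIgnoringNulls.extractOutput(acc); // 5
```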