
Add 3 examples in playground: SQL transform, Schema transform and Composite Combine #34322

Merged: 13 commits, Apr 8, 2025
2 changes: 2 additions & 0 deletions build.gradle.kts
@@ -244,6 +244,8 @@ tasks.register("javaPreCommit") {
dependsOn(":beam-validate-runner:build")
dependsOn(":examples:java:build")
dependsOn(":examples:java:preCommit")
dependsOn(":examples:java:sql:build")
dependsOn(":examples:java:sql:preCommit")
dependsOn(":examples:java:twitter:build")
dependsOn(":examples:java:twitter:preCommit")
dependsOn(":examples:multi-language:build")
51 changes: 51 additions & 0 deletions examples/java/sql/README.md
@@ -0,0 +1,51 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Example Pipelines for Beam SQL and Schema Transforms

The examples in this module demonstrate the basic functionality of Apache Beam
SQL and Schema transforms, and act as starting points for developing more
complex pipelines.

## SQL transform

An example that leverages the SQL syntax of Beam's [SqlTransform](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/sql/SqlTransform.html) directly in a Beam pipeline.

[`SqlTransformExample`](https://github.com/apache/beam/blob/master/examples/java/sql/src/main/java/org/apache/beam/examples/sql/SqlTransformExample.java) is a simple pipeline that calculates multiple metrics per key: Min, Max, and Sum.
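
The core of the pipeline is a single query against the implicit `PCOLLECTION` table, excerpted from the full example below (`input` is a `PCollection<Row>` with integer fields `k` and `n`):

```java
PCollection<Row> result =
    input.apply(
        SqlTransform.query(
            "select k, min(n) as min_n, max(n) as max_n, sum(n) as sum_n from PCOLLECTION group by k"));
```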


## Schema transform

[Beam Schemas](https://beam.apache.org/documentation/programming-guide/#schemas) offer a flexible way to express in code the same operations that are easy to write in SQL.

[`SchemaTransformExample`](https://github.com/apache/beam/blob/master/examples/java/sql/src/main/java/org/apache/beam/examples/sql/SchemaTransformExample.java) is a simple pipeline that calculates multiple metrics per key: Min, Max, and Sum.
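
The core of the pipeline selects the relevant fields, groups rows by `k`, and attaches one aggregation per metric, excerpted from the full example below:

```java
PCollection<Row> result =
    input
        .apply(Select.fieldNames("n", "k"))
        .apply(
            Group.<Row>byFieldNames("k")
                .aggregateField("n", Min.ofIntegers(), "min_n")
                .aggregateField("n", Max.ofIntegers(), "max_n")
                .aggregateField("n", Sum.ofIntegers(), "sum_n"));
```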

## Running Examples

See [Apache Beam WordCount Example](https://beam.apache.org/get-started/wordcount-example/) for information on running these examples.

The Gradle command line can be similar to:

`./gradlew clean :examples:java:sql:execute --args="--runner=DirectRunner" -Pdirect-runner -PmainClass=org.apache.beam.examples.sql.SqlTransformExample`
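
Swapping the main class runs the Schema transform example the same way:

`./gradlew clean :examples:java:sql:execute --args="--runner=DirectRunner" -Pdirect-runner -PmainClass=org.apache.beam.examples.sql.SchemaTransformExample`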

## Beyond SQL and Schemas

Both SQL and Schema transforms leverage the Row type.
The same results can be achieved directly with Beam transforms on a KV input PCollection; see the [Composed Combiners example](https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/CoCombineTransformExample.java), sketched below.
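
A minimal sketch of that direct approach, assuming the same data expressed as `KV<Integer, Integer>` pairs (the class name, tag names, and `IdentityFn` helper here are illustrative; the linked example is the authoritative version):

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFns;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;

public class ComposedCombineSketch {
  // Illustrative helper: each combiner consumes the value unchanged.
  static class IdentityFn extends SimpleFunction<Integer, Integer> {
    @Override
    public Integer apply(Integer n) {
      return n;
    }
  }

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Same data as the Row-based examples, expressed as KV pairs.
    PCollection<KV<Integer, Integer>> input =
        pipeline.apply(
            Create.of(KV.of(1, 1), KV.of(1, 5), KV.of(2, 10), KV.of(2, 20), KV.of(3, 1)));

    // Tags identify each combiner's slot in the per-key CoCombineResult.
    TupleTag<Integer> minTag = new TupleTag<>("min_n");
    TupleTag<Integer> maxTag = new TupleTag<>("max_n");
    TupleTag<Integer> sumTag = new TupleTag<>("sum_n");

    // Compose Min, Max and Sum into a single per-key combine, matching the
    // min_n/max_n/sum_n columns produced by the SQL and Schema examples.
    PCollection<KV<Integer, CombineFns.CoCombineResult>> result =
        input.apply(
            Combine.perKey(
                CombineFns.compose()
                    .with(new IdentityFn(), Min.ofIntegers(), minTag)
                    .with(new IdentityFn(), Max.ofIntegers(), maxTag)
                    .with(new IdentityFn(), Sum.ofIntegers(), sumTag)));

    pipeline.run().waitUntilFinish();
  }
}
```

Each per-key `CoCombineResult` then yields the individual aggregates via `get(minTag)`, `get(maxTag)`, and `get(sumTag)`.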
122 changes: 122 additions & 0 deletions examples/java/sql/build.gradle
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import groovy.json.JsonOutput

plugins {
id 'java'
id 'org.apache.beam.module'
id 'com.github.johnrengelman.shadow'
}

applyJavaNature(
exportJavadoc: false,
automaticModuleName: 'org.apache.beam.examples.sql',
)
provideIntegrationTestingDependencies()
enableJavaPerformanceTesting()

description = "Apache Beam :: Examples :: Java :: SQL"
ext.summary = """Apache Beam SDK provides a simple, Java-based
interface for processing virtually any size data. This
artifact includes the Apache Beam Java SDK SQL and Schema transform examples."""

/** Define the list of runners which execute a precommit test.
* Some runners are run from separate projects, see the preCommit task below
* for details.
*/
def preCommitRunners = ["directRunner", "flinkRunner", "sparkRunner"]
for (String runner : preCommitRunners) {
configurations.create(runner + "PreCommit")
}
configurations.sparkRunnerPreCommit {
// Ban certain dependencies to prevent a StackOverflow within Spark
// because JUL -> SLF4J -> JUL, and similarly JDK14 -> SLF4J -> JDK14
exclude group: "org.slf4j", module: "jul-to-slf4j"
exclude group: "org.slf4j", module: "slf4j-jdk14"
}

dependencies {
implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(":sdks:java:extensions:sql")
implementation library.java.slf4j_api
runtimeOnly project(path: ":runners:direct-java", configuration: "shadow")
runtimeOnly library.java.hadoop_client
runtimeOnly library.java.bigdataoss_gcs_connector
testImplementation project(path: ":runners:direct-java", configuration: "shadow")
testImplementation library.java.hamcrest
testImplementation library.java.junit
testImplementation library.java.mockito_core
testImplementation library.java.testcontainers_gcloud

// Add dependencies for the PreCommit configurations.
// For each runner, add a project-level dependency on the examples project.
for (String runner : preCommitRunners) {
delegate.add(runner + "PreCommit", project(path: ":examples:java", configuration: "testRuntimeMigration"))
}
directRunnerPreCommit project(path: ":runners:direct-java", configuration: "shadow")
flinkRunnerPreCommit project(":runners:flink:${project.ext.latestFlinkVersion}")
sparkRunnerPreCommit project(":runners:spark:3")
sparkRunnerPreCommit project(":sdks:java:io:hadoop-file-system")

// Add dependency if requested on command line for runner
if (project.hasProperty("runnerDependency")) {
runtimeOnly project(path: project.getProperty("runnerDependency"))
}
}

/*
 * Create a ${runner}PreCommit task for each runner which runs the
 * module's integration tests.
 */
def preCommitRunnerClass = [
directRunner: "org.apache.beam.runners.direct.DirectRunner",
flinkRunner: "org.apache.beam.runners.flink.TestFlinkRunner",
sparkRunner: "org.apache.beam.runners.spark.TestSparkRunner",
]
def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
def gcsTempRoot = project.findProperty('gcsTempRoot') ?: 'gs://temp-storage-for-end-to-end-tests/'

for (String runner : preCommitRunners) {
tasks.create(name: runner + "PreCommit", type: Test) {
def preCommitBeamTestPipelineOptions = [
"--project=${gcpProject}",
"--tempRoot=${gcsTempRoot}",
"--runner=" + preCommitRunnerClass[runner],
]
classpath = configurations."${runner}PreCommit"
forkEvery 1
maxParallelForks 4
systemProperty "beamTestPipelineOptions", JsonOutput.toJson(preCommitBeamTestPipelineOptions)
}
}

/* Define a common precommit task which depends on all the individual precommits. */
task preCommit() {
for (String runner : preCommitRunners) {
dependsOn runner + "PreCommit"
}
}

tasks.create(name:"execute", type:JavaExec) {
main = project.hasProperty("mainClass") ? project.getProperty("mainClass") : "NONE"
classpath = sourceSets.main.runtimeClasspath
systemProperties System.getProperties()
args project.hasProperty("exec.args") ? project.getProperty("exec.args").split() : []
}
109 changes: 109 additions & 0 deletions examples/java/sql/src/main/java/org/apache/beam/examples/sql/SchemaTransformExample.java
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.examples.sql;

// beam-playground:
// name: Group.ByFields
// description: Demonstration of Schema transform usage.
// multifile: false
// default_example: false
// context_line: 60
// categories:
// - Schemas
// - Combiners
// complexity: BASIC
// tags:
// - transforms
// - numbers

// gradle clean execute -DmainClass=org.apache.beam.examples.sql.SchemaTransformExample
// --args="--runner=DirectRunner" -Pdirect-runner

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.Group;
import org.apache.beam.sdk.schemas.transforms.Select;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An example that uses Schema transforms to apply multiple combiners (Sum, Min, Max) to the
 * input PCollection.
 *
 * <p>For detailed documentation of Schemas, see <a
 * href="https://beam.apache.org/documentation/programming-guide/#schemas">
 * https://beam.apache.org/documentation/programming-guide/#schemas </a>
*/
public class SchemaTransformExample {
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline pipeline = Pipeline.create(options);
// [START main_section]
// define the input row schema
Schema inputSchema = Schema.builder().addInt32Field("k").addInt32Field("n").build();
// Create input
PCollection<Row> input =
pipeline
.apply(
Create.of(
Row.withSchema(inputSchema).addValues(1, 1).build(),
Row.withSchema(inputSchema).addValues(1, 5).build(),
Row.withSchema(inputSchema).addValues(2, 10).build(),
Row.withSchema(inputSchema).addValues(2, 20).build(),
Row.withSchema(inputSchema).addValues(3, 1).build()))
.setRowSchema(inputSchema);

PCollection<Row> result =
input
.apply(Select.fieldNames("n", "k"))
.apply(
Group.<Row>byFieldNames("k")
.aggregateField("n", Min.ofIntegers(), "min_n")
.aggregateField("n", Max.ofIntegers(), "max_n")
.aggregateField("n", Sum.ofIntegers(), "sum_n"));
// [END main_section]
// Log values
result.apply(ParDo.of(new LogOutput<>("PCollection values after Schema transform: ")));
pipeline.run();
}

static class LogOutput<T> extends DoFn<T, T> {
private static final Logger LOG = LoggerFactory.getLogger(LogOutput.class);
private final String prefix;

public LogOutput(String prefix) {
this.prefix = prefix;
}

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
LOG.info(prefix + c.element());
c.output(c.element());
}
}
}
103 changes: 103 additions & 0 deletions examples/java/sql/src/main/java/org/apache/beam/examples/sql/SqlTransformExample.java
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.examples.sql;

// beam-playground:
// name: SqlTransform
// description: Demonstration of SQL transform usage.
// multifile: false
// default_example: false
// context_line: 60
// categories:
// - Beam SQL
// - Combiners
// complexity: BASIC
// tags:
// - transforms
// - numbers

// gradle clean execute -DmainClass=org.apache.beam.examples.sql.SqlTransformExample
// --args="--runner=DirectRunner" -Pdirect-runner

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An example that uses Beam SQL to apply multiple combiners (Min, Max, Sum) to the
 * input PCollection.
 *
 * <p>SQL syntax defines a transform that can be integrated into a Java pipeline.
 *
 * <p>For detailed documentation of Beam SQL, see <a
 * href="https://beam.apache.org/documentation/dsls/sql/overview/">
 * https://beam.apache.org/documentation/dsls/sql/overview/ </a>
*/
public class SqlTransformExample {
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline pipeline = Pipeline.create(options);
// [START main_section]
// define the input row schema
Schema inputSchema = Schema.builder().addInt32Field("k").addInt32Field("n").build();
// Create input
PCollection<Row> input =
pipeline
.apply(
Create.of(
Row.withSchema(inputSchema).addValues(1, 1).build(),
Row.withSchema(inputSchema).addValues(1, 5).build(),
Row.withSchema(inputSchema).addValues(2, 10).build(),
Row.withSchema(inputSchema).addValues(2, 20).build(),
Row.withSchema(inputSchema).addValues(3, 1).build()))
.setRowSchema(inputSchema);

PCollection<Row> result =
input.apply(
SqlTransform.query(
"select k, min(n) as min_n, max(n) as max_n, sum(n) as sum_n from PCOLLECTION group by k"));
// [END main_section]
// Log values
result.apply(ParDo.of(new LogOutput<>("PCollection values after SQL transform: ")));
pipeline.run();
}

static class LogOutput<T> extends DoFn<T, T> {
private static final Logger LOG = LoggerFactory.getLogger(LogOutput.class);
private final String prefix;

public LogOutput(String prefix) {
this.prefix = prefix;
}

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
LOG.info(prefix + c.element());
c.output(c.element());
}
}
}