Commit 4e76385

ahmedabu98 authored and twosom committed
cherry-pick from apache#32385
1 parent 13f4ae0 commit 4e76385

6 files changed: +468 -1 lines changed
sdks/java/io/expansion-service/build.gradle (+2)

@@ -50,6 +50,8 @@ dependencies {
   permitUnusedDeclared project(":sdks:java:io:kafka") // BEAM-11761
   implementation project(":sdks:java:io:kafka:upgrade")
   permitUnusedDeclared project(":sdks:java:io:kafka:upgrade") // BEAM-11761
+  implementation project(":sdks:java:io:mqtt")
+  permitUnusedDeclared project(":sdks:java:io:mqtt") // BEAM-11761
 
   // **** IcebergIO runtime dependencies ****
   runtimeOnly library.java.hadoop_client

sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java (+7)

@@ -37,7 +37,10 @@
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;

@@ -205,13 +208,17 @@ public static <InputT> Write<InputT> dynamicWrite() {
   private MqttIO() {}
 
   /** A POJO describing a MQTT connection. */
+  @DefaultSchema(AutoValueSchema.class)
   @AutoValue
   public abstract static class ConnectionConfiguration implements Serializable {
 
+    @SchemaFieldDescription("The MQTT broker URI.")
     abstract String getServerUri();
 
+    @SchemaFieldDescription("The MQTT topic pattern.")
     abstract @Nullable String getTopic();
 
+    @SchemaFieldDescription("The client ID prefix, which is used to construct a unique client ID.")
     abstract @Nullable String getClientId();
 
     abstract @Nullable String getUsername();
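
The @DefaultSchema and @SchemaFieldDescription annotations above let Beam infer a Row schema for ConnectionConfiguration, so it can be embedded in the new transform configurations added below. For orientation, a minimal sketch of building the same POJO through the factory method MqttIO already exposes (the broker URI and topic are placeholder values, not part of this commit):

    // Sketch only. Placeholder broker address and topic; create(serverUri, topic) is the
    // pre-existing MqttIO.ConnectionConfiguration factory used by both new transforms.
    MqttIO.ConnectionConfiguration connectionConfiguration =
        MqttIO.ConnectionConfiguration.create("tcp://localhost:1883", "my_topic");
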
sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttReadSchemaTransformProvider.java (+136, new file)

@@ -0,0 +1,136 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.io.mqtt;

import static org.apache.beam.sdk.io.mqtt.MqttIO.ConnectionConfiguration;
import static org.apache.beam.sdk.io.mqtt.MqttReadSchemaTransformProvider.ReadConfiguration;

import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.io.Serializable;
import javax.annotation.Nullable;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.joda.time.Duration;

@AutoService(SchemaTransformProvider.class)
public class MqttReadSchemaTransformProvider
    extends TypedSchemaTransformProvider<ReadConfiguration> {
  @DefaultSchema(AutoValueSchema.class)
  @AutoValue
  public abstract static class ReadConfiguration implements Serializable {
    public static Builder builder() {
      return new AutoValue_MqttReadSchemaTransformProvider_ReadConfiguration.Builder();
    }

    @SchemaFieldDescription("Configuration options to set up the MQTT connection.")
    public abstract ConnectionConfiguration getConnectionConfiguration();

    @SchemaFieldDescription(
        "The max number of records to receive. Setting this will result in a bounded PCollection.")
    @Nullable
    public abstract Long getMaxNumRecords();

    @SchemaFieldDescription(
        "The maximum time for this source to read messages. Setting this will result in a bounded PCollection.")
    @Nullable
    public abstract Long getMaxReadTimeSeconds();

    @AutoValue.Builder
    public abstract static class Builder {
      public abstract Builder setConnectionConfiguration(
          ConnectionConfiguration connectionConfiguration);

      public abstract Builder setMaxNumRecords(Long maxNumRecords);

      public abstract Builder setMaxReadTimeSeconds(Long maxReadTimeSeconds);

      public abstract ReadConfiguration build();
    }
  }

  @Override
  public String identifier() {
    return "beam:schematransform:org.apache.beam:mqtt_read:v1";
  }

  @Override
  protected SchemaTransform from(ReadConfiguration configuration) {
    return new MqttReadSchemaTransform(configuration);
  }

  private static class MqttReadSchemaTransform extends SchemaTransform {
    private final ReadConfiguration config;

    MqttReadSchemaTransform(ReadConfiguration configuration) {
      this.config = configuration;
    }

    @Override
    public PCollectionRowTuple expand(PCollectionRowTuple input) {
      Preconditions.checkState(
          input.getAll().isEmpty(),
          "Expected zero input PCollections for this source, but found: %s",
          input.getAll().keySet());

      MqttIO.Read<byte[]> readTransform =
          MqttIO.read().withConnectionConfiguration(config.getConnectionConfiguration());

      Long maxRecords = config.getMaxNumRecords();
      Long maxReadTime = config.getMaxReadTimeSeconds();
      if (maxRecords != null) {
        readTransform = readTransform.withMaxNumRecords(maxRecords);
      }
      if (maxReadTime != null) {
        readTransform = readTransform.withMaxReadTime(Duration.standardSeconds(maxReadTime));
      }

      Schema outputSchema = Schema.builder().addByteArrayField("bytes").build();

      PCollection<Row> outputRows =
          input
              .getPipeline()
              .apply(readTransform)
              .apply(
                  "Wrap in Beam Rows",
                  ParDo.of(
                      new DoFn<byte[], Row>() {
                        @ProcessElement
                        public void processElement(
                            @Element byte[] data, OutputReceiver<Row> outputReceiver) {
                          outputReceiver.output(
                              Row.withSchema(outputSchema).addValue(data).build());
                        }
                      }))
              .setRowSchema(outputSchema);

      return PCollectionRowTuple.of("output", outputRows);
    }
  }
}
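
A hedged usage sketch for the provider above, assuming an existing Pipeline named pipeline and the imports already present in this file (plus org.apache.beam.sdk.Pipeline). It goes through the public Row-based entry point, since the typed from(ReadConfiguration) override is protected; the field names are the ones AutoValueSchema derives from the getters, nullable fields left unset are assumed to default to null, and the broker address and topic are placeholders:

    // Sketch only: configure and apply the MQTT read SchemaTransform via its Row configuration.
    SchemaTransformProvider readProvider = new MqttReadSchemaTransformProvider();

    // Nested Row for the "connectionConfiguration" field, using the schema the provider reports.
    Schema connectionSchema =
        readProvider
            .configurationSchema()
            .getField("connectionConfiguration")
            .getType()
            .getRowSchema();
    Row connectionRow =
        Row.withSchema(connectionSchema)
            .withFieldValue("serverUri", "tcp://localhost:1883") // placeholder broker
            .withFieldValue("topic", "sensors") // placeholder topic
            .build();

    Row readConfig =
        Row.withSchema(readProvider.configurationSchema())
            .withFieldValue("connectionConfiguration", connectionRow)
            .withFieldValue("maxNumRecords", 100L) // bounds the otherwise unbounded source
            .build();

    // The transform emits a single "output" PCollection of one-field ("bytes") rows.
    PCollection<Row> messages =
        PCollectionRowTuple.empty(pipeline).apply(readProvider.from(readConfig)).get("output");
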
sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttWriteSchemaTransformProvider.java (+123, new file)

@@ -0,0 +1,123 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.io.mqtt;

import static org.apache.beam.sdk.io.mqtt.MqttIO.ConnectionConfiguration;
import static org.apache.beam.sdk.io.mqtt.MqttWriteSchemaTransformProvider.WriteConfiguration;

import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.io.Serializable;
import javax.annotation.Nullable;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;

@AutoService(SchemaTransformProvider.class)
public class MqttWriteSchemaTransformProvider
    extends TypedSchemaTransformProvider<WriteConfiguration> {
  @DefaultSchema(AutoValueSchema.class)
  @AutoValue
  public abstract static class WriteConfiguration implements Serializable {
    public static Builder builder() {
      return new AutoValue_MqttWriteSchemaTransformProvider_WriteConfiguration.Builder();
    }

    @SchemaFieldDescription("Configuration options to set up the MQTT connection.")
    public abstract ConnectionConfiguration getConnectionConfiguration();

    @SchemaFieldDescription(
        "Whether or not the publish message should be retained by the messaging engine. "
            + "When a subscriber connects, it gets the latest retained message. "
            + "Defaults to `False`, which will clear the retained message from the server.")
    @Nullable
    public abstract Boolean getRetained();

    @AutoValue.Builder
    public abstract static class Builder {
      public abstract Builder setConnectionConfiguration(ConnectionConfiguration foo);

      public abstract Builder setRetained(Boolean retained);

      public abstract WriteConfiguration build();
    }
  }

  @Override
  public String identifier() {
    return "beam:schematransform:org.apache.beam:mqtt_write:v1";
  }

  @Override
  protected SchemaTransform from(WriteConfiguration configuration) {
    return new MqttWriteSchemaTransform(configuration);
  }

  private static class MqttWriteSchemaTransform extends SchemaTransform {
    private final WriteConfiguration config;

    MqttWriteSchemaTransform(WriteConfiguration configuration) {
      this.config = configuration;
    }

    @Override
    public PCollectionRowTuple expand(PCollectionRowTuple input) {
      PCollection<Row> inputRows = input.getSinglePCollection();

      Preconditions.checkState(
          inputRows.getSchema().getFieldCount() == 1
              && inputRows.getSchema().getField(0).getType().equals(Schema.FieldType.BYTES),
          "Expected only one Schema field containing bytes, but instead received: %s",
          inputRows.getSchema());

      MqttIO.Write<byte[]> writeTransform =
          MqttIO.write().withConnectionConfiguration(config.getConnectionConfiguration());
      Boolean retained = config.getRetained();
      if (retained != null) {
        writeTransform = writeTransform.withRetained(retained);
      }

      inputRows
          .apply(
              "Extract bytes",
              ParDo.of(
                  new DoFn<Row, byte[]>() {
                    @ProcessElement
                    public void processElement(
                        @Element Row row, OutputReceiver<byte[]> outputReceiver) {
                      outputReceiver.output(
                          org.apache.beam.sdk.util.Preconditions.checkStateNotNull(
                              row.getBytes(0)));
                    }
                  }))
          .apply(writeTransform);

      return PCollectionRowTuple.empty(inputRows.getPipeline());
    }
  }
}
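
A matching hedged sketch for the write side, under the same assumptions as the read sketch (an existing Pipeline named pipeline, the imports from this file, plus Create, MapElements, TypeDescriptors from the core SDK and java.nio.charset.StandardCharsets). The transform expects rows with a single byte-array field, so the snippet wraps UTF-8 payloads in that shape before applying the transform through the public Row-based entry point; broker address and topic are placeholders:

    // Sketch only: single-field "bytes" rows in, published to MQTT by the transform above.
    SchemaTransformProvider writeProvider = new MqttWriteSchemaTransformProvider();

    Schema connectionSchema =
        writeProvider
            .configurationSchema()
            .getField("connectionConfiguration")
            .getType()
            .getRowSchema();
    Row connectionRow =
        Row.withSchema(connectionSchema)
            .withFieldValue("serverUri", "tcp://localhost:1883") // placeholder broker
            .withFieldValue("topic", "payloads") // placeholder topic
            .build();

    Row writeConfig =
        Row.withSchema(writeProvider.configurationSchema())
            .withFieldValue("connectionConfiguration", connectionRow)
            .withFieldValue("retained", false)
            .build();

    Schema payloadSchema = Schema.builder().addByteArrayField("bytes").build();
    PCollection<Row> payloads =
        pipeline
            .apply(Create.of("hello", "world"))
            .apply(
                "To single-field rows",
                MapElements.into(TypeDescriptors.rows())
                    .via(
                        (String s) ->
                            Row.withSchema(payloadSchema)
                                .addValue(s.getBytes(StandardCharsets.UTF_8))
                                .build()))
            .setRowSchema(payloadSchema);

    PCollectionRowTuple.of("input", payloads).apply(writeProvider.from(writeConfig));
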

0 commit comments