Skip to content

Commit 65b368b

Browse files
committed
added methods for support ValueProvider
1 parent eaa3b56 commit 65b368b

File tree

1 file changed

+67
-16
lines changed
  • sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt

1 file changed

+67
-16
lines changed

sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java

+67-16
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.beam.sdk.coders.SerializableCoder;
3636
import org.apache.beam.sdk.io.UnboundedSource;
3737
import org.apache.beam.sdk.options.PipelineOptions;
38+
import org.apache.beam.sdk.options.ValueProvider;
3839
import org.apache.beam.sdk.transforms.DoFn;
3940
import org.apache.beam.sdk.transforms.PTransform;
4041
import org.apache.beam.sdk.transforms.ParDo;
@@ -123,29 +124,29 @@ private MqttIO() {}
123124
@AutoValue
124125
public abstract static class ConnectionConfiguration implements Serializable {
125126

126-
abstract String getServerUri();
127+
abstract ValueProvider<String> getServerUri();
127128

128-
abstract String getTopic();
129+
abstract ValueProvider<String> getTopic();
129130

130-
abstract @Nullable String getClientId();
131+
abstract @Nullable ValueProvider<String> getClientId();
131132

132-
abstract @Nullable String getUsername();
133+
abstract @Nullable ValueProvider<String> getUsername();
133134

134-
abstract @Nullable String getPassword();
135+
abstract @Nullable ValueProvider<String> getPassword();
135136

136137
abstract Builder builder();
137138

138139
@AutoValue.Builder
139140
abstract static class Builder {
140-
abstract Builder setServerUri(String serverUri);
141+
abstract Builder setServerUri(ValueProvider<String> serverUri);
141142

142-
abstract Builder setTopic(String topic);
143+
abstract Builder setTopic(ValueProvider<String> topic);
143144

144-
abstract Builder setClientId(String clientId);
145+
abstract Builder setClientId(ValueProvider<String> clientId);
145146

146-
abstract Builder setUsername(String username);
147+
abstract Builder setUsername(ValueProvider<String> username);
147148

148-
abstract Builder setPassword(String password);
149+
abstract Builder setPassword(ValueProvider<String> password);
149150

150151
abstract ConnectionConfiguration build();
151152
}
@@ -161,6 +162,23 @@ abstract static class Builder {
161162
public static ConnectionConfiguration create(String serverUri, String topic) {
162163
checkArgument(serverUri != null, "serverUri can not be null");
163164
checkArgument(topic != null, "topic can not be null");
165+
return create(
166+
ValueProvider.StaticValueProvider.of(serverUri),
167+
ValueProvider.StaticValueProvider.of(topic));
168+
}
169+
170+
/**
171+
* Describe a connection configuration to the MQTT broker. This method creates a unique random
172+
* MQTT client ID.
173+
*
174+
* @param serverUri The MQTT broker URI.
175+
* @param topic The MQTT getTopic pattern.
176+
* @return A connection configuration to the MQTT broker.
177+
*/
178+
public static ConnectionConfiguration create(
179+
ValueProvider<String> serverUri, ValueProvider<String> topic) {
180+
checkArgument(serverUri != null, "serverUri can not be null");
181+
checkArgument(topic != null, "topic can not be null");
164182
return new AutoValue_MqttIO_ConnectionConfiguration.Builder()
165183
.setServerUri(serverUri)
166184
.setTopic(topic)
@@ -169,28 +187,56 @@ public static ConnectionConfiguration create(String serverUri, String topic) {
169187

170188
/** Set up the MQTT broker URI. */
171189
public ConnectionConfiguration withServerUri(String serverUri) {
190+
checkArgument(serverUri != null, "serverUri can not be null");
191+
return withServerUri(ValueProvider.StaticValueProvider.of(serverUri));
192+
}
193+
194+
/** Set up the MQTT broker URI. */
195+
public ConnectionConfiguration withServerUri(ValueProvider<String> serverUri) {
172196
checkArgument(serverUri != null, "serverUri can not be null");
173197
return builder().setServerUri(serverUri).build();
174198
}
175199

176200
/** Set up the MQTT getTopic pattern. */
177201
public ConnectionConfiguration withTopic(String topic) {
202+
checkArgument(topic != null, "topic can not be null");
203+
return withTopic(ValueProvider.StaticValueProvider.of(topic));
204+
}
205+
206+
/** Set up the MQTT getTopic pattern. */
207+
public ConnectionConfiguration withTopic(ValueProvider<String> topic) {
178208
checkArgument(topic != null, "topic can not be null");
179209
return builder().setTopic(topic).build();
180210
}
181211

182212
/** Set up the client ID prefix, which is used to construct a unique client ID. */
183213
public ConnectionConfiguration withClientId(String clientId) {
214+
checkArgument(clientId != null, "clientId can not be null");
215+
return withClientId(ValueProvider.StaticValueProvider.of(clientId));
216+
}
217+
218+
/** Set up the client ID prefix, which is used to construct a unique client ID. */
219+
public ConnectionConfiguration withClientId(ValueProvider<String> clientId) {
184220
checkArgument(clientId != null, "clientId can not be null");
185221
return builder().setClientId(clientId).build();
186222
}
187223

188224
public ConnectionConfiguration withUsername(String username) {
225+
checkArgument(username != null, "username can not be null");
226+
return withUsername(ValueProvider.StaticValueProvider.of(username));
227+
}
228+
229+
public ConnectionConfiguration withUsername(ValueProvider<String> username) {
189230
checkArgument(username != null, "username can not be null");
190231
return builder().setUsername(username).build();
191232
}
192233

193234
public ConnectionConfiguration withPassword(String password) {
235+
checkArgument(password != null, "password can not be null");
236+
return withPassword(ValueProvider.StaticValueProvider.of(password));
237+
}
238+
239+
public ConnectionConfiguration withPassword(ValueProvider<String> password) {
194240
checkArgument(password != null, "password can not be null");
195241
return builder().setPassword(password).build();
196242
}
@@ -205,12 +251,15 @@ private void populateDisplayData(DisplayData.Builder builder) {
205251
private MQTT createClient() throws Exception {
206252
LOG.debug("Creating MQTT client to {}", getServerUri());
207253
MQTT client = new MQTT();
208-
client.setHost(getServerUri());
254+
client.setHost(getServerUri().get());
209255
if (getUsername() != null) {
210-
LOG.debug("MQTT client uses username {}", getUsername());
211-
client.setUserName(getUsername());
212-
client.setPassword(getPassword());
256+
LOG.debug("MQTT client uses username {}", getUsername().get());
257+
client.setUserName(getUsername().get());
213258
}
259+
if (getPassword() != null) {
260+
client.setPassword(getPassword().get());
261+
}
262+
214263
if (getClientId() != null) {
215264
String clientId = getClientId() + "-" + UUID.randomUUID().toString();
216265
clientId =
@@ -434,7 +483,9 @@ public boolean start() throws IOException {
434483
connection = client.blockingConnection();
435484
connection.connect();
436485
connection.subscribe(
437-
new Topic[] {new Topic(spec.connectionConfiguration().getTopic(), QoS.AT_LEAST_ONCE)});
486+
new Topic[] {
487+
new Topic(spec.connectionConfiguration().getTopic().get(), QoS.AT_LEAST_ONCE)
488+
});
438489
return advance();
439490
} catch (Exception e) {
440491
throw new IOException(e);
@@ -578,7 +629,7 @@ public void processElement(ProcessContext context) throws Exception {
578629
byte[] payload = context.element();
579630
LOG.debug("Sending message {}", new String(payload, StandardCharsets.UTF_8));
580631
connection.publish(
581-
spec.connectionConfiguration().getTopic(), payload, QoS.AT_LEAST_ONCE, false);
632+
spec.connectionConfiguration().getTopic().get(), payload, QoS.AT_LEAST_ONCE, false);
582633
}
583634

584635
@Teardown

0 commit comments

Comments
 (0)