Skip to content

Commit e9f4626

Browse files
idivanshuDivanshu PrajapatfdelbrayelleMilosPaunovic
authored
feat(fs-tcp, fs-udp): add TCP & UDP Send and RealtimeTrigger plugins … (#254)
* feat(fs-tcp, fs-udp): add TCP & UDP Send and RealtimeTrigger plugins with tests * chore(fs-udp): update plugin category from STORAGE to MESSAGING Co-authored-by: François Delbrayelle <fdelbrayelle@gmail.com> * Update src/main/java/io/kestra/plugin/fs/tcp/package-info.java Co-authored-by: François Delbrayelle <fdelbrayelle@gmail.com> * style(fs-tcp, fs-udp): address review comments and align example YAML formatting * Apply suggestion from @fdelbrayelle --------- Co-authored-by: Divanshu Prajapat <honex@Divanshus-MacBook-Air.local> Co-authored-by: François Delbrayelle <fdelbrayelle@gmail.com> Co-authored-by: Miloš Paunović <paun992@hotmail.com>
1 parent 748dc9f commit e9f4626

17 files changed

+1331
-0
lines changed
Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
package io.kestra.plugin.fs.tcp;
2+
3+
import io.kestra.core.models.annotations.Example;
4+
import io.kestra.core.models.annotations.Plugin;
5+
import io.kestra.core.models.conditions.ConditionContext;
6+
import io.kestra.core.models.executions.Execution;
7+
import io.kestra.core.models.property.Property;
8+
import io.kestra.core.models.triggers.*;
9+
import io.kestra.core.runners.RunContext;
10+
import io.swagger.v3.oas.annotations.media.Schema;
11+
import jakarta.inject.Inject;
12+
import jakarta.validation.constraints.NotNull;
13+
import lombok.*;
14+
import lombok.experimental.SuperBuilder;
15+
import org.reactivestreams.Publisher;
16+
import org.slf4j.Logger;
17+
import reactor.core.publisher.Flux;
18+
19+
import java.io.BufferedReader;
20+
import java.io.InputStreamReader;
21+
import java.net.InetSocketAddress;
22+
import java.net.ServerSocket;
23+
import java.net.Socket;
24+
import java.net.SocketException;
25+
import java.nio.charset.StandardCharsets;
26+
import java.time.Instant;
27+
import java.util.concurrent.atomic.AtomicBoolean;
28+
29+
30+
@SuperBuilder
31+
@ToString
32+
@EqualsAndHashCode
33+
@Getter
34+
@NoArgsConstructor
35+
@Schema(
36+
title = "Trigger a flow when a TCP message is received in real-time."
37+
)
38+
@Plugin(
39+
examples = {
40+
@Example(
41+
full = true,
42+
title = "Trigger a flow whenever a TCP message is received.",
43+
code = """
44+
id: tcp_realtime
45+
namespace: dev
46+
47+
triggers:
48+
- id: tcp_listener
49+
type: io.kestra.plugin.fs.tcp.RealtimeTrigger
50+
host: 0.0.0.0
51+
port: 9090
52+
53+
tasks:
54+
- id: log_message
55+
type: io.kestra.plugin.core.log.Log
56+
message: "Received {{ trigger.payload }} from {{ trigger.sourceIp }}"
57+
"""
58+
)
59+
}
60+
)
61+
public class RealtimeTrigger extends AbstractTrigger
62+
implements RealtimeTriggerInterface, TriggerOutput<RealtimeTrigger.Output> {
63+
64+
@Inject
65+
@Builder.Default
66+
private TcpService tcpService = TcpService.getInstance();
67+
68+
@Schema(title = "The interface to bind.", defaultValue = "0.0.0.0")
69+
@Builder.Default
70+
private Property<String> host = Property.ofValue("0.0.0.0");
71+
72+
@Schema(title = "The TCP port to listen on.")
73+
@NotNull
74+
private Property<Integer> port;
75+
76+
@Schema(title = "Encoding for incoming data.", defaultValue = "UTF-8")
77+
@Builder.Default
78+
private Property<String> encoding = Property.ofValue(StandardCharsets.UTF_8.name());
79+
80+
private transient final AtomicBoolean active = new AtomicBoolean(false);
81+
private transient ServerSocket serverSocket;
82+
83+
@Override
84+
public Publisher<Execution> evaluate(ConditionContext conditionContext, TriggerContext context) throws Exception {
85+
RunContext runContext = conditionContext.getRunContext();
86+
Logger logger = runContext.logger();
87+
88+
89+
String rHost = runContext.render(this.host).as(String.class).orElse("0.0.0.0");
90+
Integer rPort = runContext.render(this.port).as(Integer.class)
91+
.orElseThrow(() -> new IllegalArgumentException("`port` is required"));
92+
String rEncoding = runContext.render(this.encoding).as(String.class).orElse(StandardCharsets.UTF_8.name());
93+
94+
active.set(true);
95+
96+
return Flux.<Execution>create(emitter -> {
97+
try {
98+
serverSocket = new ServerSocket();
99+
serverSocket.bind(new InetSocketAddress(rHost, rPort));
100+
logger.info("TCP RealtimeTrigger listening on {}:{}", rHost, rPort);
101+
102+
while (active.get()) {
103+
try (Socket client = serverSocket.accept()) {
104+
if (!active.get()) break;
105+
106+
String sourceIp = client.getInetAddress().getHostAddress();
107+
int sourcePort = client.getPort();
108+
109+
BufferedReader reader = new BufferedReader(
110+
new InputStreamReader(client.getInputStream(), rEncoding)
111+
);
112+
113+
StringBuilder sb = new StringBuilder();
114+
String line;
115+
while ((line = reader.readLine()) != null) {
116+
sb.append(line);
117+
}
118+
119+
String message = sb.toString().trim();
120+
if (message.isEmpty()) continue;
121+
122+
logger.info("Received TCP message from {}:{} -> {}", sourceIp, sourcePort, message);
123+
124+
Output output = Output.builder()
125+
.payload(message)
126+
.timestamp(Instant.now())
127+
.sourceIp(sourceIp)
128+
.sourcePort(sourcePort)
129+
.build();
130+
131+
emitter.next(TriggerService.generateRealtimeExecution(this, conditionContext, context, output));
132+
} catch (SocketException se) {
133+
if (active.get()) {
134+
logger.warn("Socket exception: {}", se.getMessage());
135+
}
136+
break;
137+
} catch (Exception e) {
138+
if (active.get()) {
139+
logger.error("Error handling TCP connection: {}", e.getMessage(), e);
140+
}
141+
}
142+
}
143+
} catch (Exception e) {
144+
logger.error("TCP listener stopped due to error: {}", e.getMessage(), e);
145+
emitter.error(e);
146+
} finally {
147+
closeServerSocket(logger, rPort);
148+
emitter.complete();
149+
}
150+
}).doOnCancel(() -> {
151+
logger.info("TCP RealtimeTrigger cancelled by flow change or stop request.");
152+
stop();
153+
});
154+
}
155+
156+
@Override
157+
public void stop() {
158+
if (active.compareAndSet(true, false)) {
159+
try {
160+
if (serverSocket != null && !serverSocket.isClosed()) {
161+
serverSocket.close();
162+
}
163+
System.out.println("[TcpRealtimeTrigger] Socket closed via stop()");
164+
} catch (Exception e) {
165+
System.out.println("[TcpRealtimeTrigger] Error closing socket: " + e.getMessage());
166+
}
167+
}
168+
}
169+
170+
@Override
171+
public void kill() {
172+
System.out.println("[TcpRealtimeTrigger] Kill signal received");
173+
stop();
174+
}
175+
176+
private void closeServerSocket(Logger logger, Integer port) {
177+
try {
178+
if (serverSocket != null && !serverSocket.isClosed()) {
179+
serverSocket.close();
180+
if (logger != null && port != null) {
181+
logger.info("TCP RealtimeTrigger stopped listening on port {}", port);
182+
} else {
183+
System.out.println("[TcpRealtimeTrigger] Stopped listening on port " + port);
184+
}
185+
}
186+
} catch (Exception e) {
187+
if (logger != null) {
188+
logger.warn("Error closing server socket: {}", e.getMessage());
189+
} else {
190+
System.out.println("[TcpRealtimeTrigger] Error closing socket: " + e.getMessage());
191+
}
192+
}
193+
}
194+
195+
@Builder
196+
@Getter
197+
public static class Output implements io.kestra.core.models.tasks.Output {
198+
@Schema(title = "The received TCP payload.")
199+
private final String payload;
200+
201+
@Schema(title = "The timestamp when the message was received.")
202+
private final Instant timestamp;
203+
204+
@Schema(title = "The IP address of the sender.")
205+
private final String sourceIp;
206+
207+
@Schema(title = "The port of the sender.")
208+
private final Integer sourcePort;
209+
}
210+
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
package io.kestra.plugin.fs.tcp;
2+
3+
import io.kestra.core.models.annotations.Example;
4+
import io.kestra.core.models.annotations.Plugin;
5+
import io.kestra.core.models.property.Property;
6+
import io.kestra.core.models.tasks.RunnableTask;
7+
import io.kestra.core.models.tasks.Task;
8+
import io.kestra.core.runners.RunContext;
9+
import io.swagger.v3.oas.annotations.media.Schema;
10+
import jakarta.inject.Inject;
11+
import jakarta.validation.constraints.NotNull;
12+
import lombok.*;
13+
import lombok.experimental.SuperBuilder;
14+
import org.slf4j.Logger;
15+
16+
import java.io.*;
17+
import java.net.*;
18+
import java.nio.charset.StandardCharsets;
19+
import java.time.Duration;
20+
21+
22+
@SuperBuilder
23+
@ToString
24+
@EqualsAndHashCode
25+
@Getter
26+
@NoArgsConstructor
27+
@Schema(
28+
title = "Send a payload to a TCP server."
29+
)
30+
@Plugin(
31+
examples = {
32+
@Example(
33+
full = true,
34+
title = "Send a UTF-8 message to a TCP server.",
35+
code = """
36+
id: tcp_send_example
37+
namespace: dev
38+
39+
tasks:
40+
- id: send_tcp
41+
type: io.kestra.plugin.fs.tcp.Send
42+
host: 127.0.0.1
43+
port: 9090
44+
payload: \"Hello from Kestra\"
45+
"""
46+
)
47+
}
48+
)
49+
public class Send extends Task implements RunnableTask<Send.Output> {
50+
51+
@Inject
52+
@Builder.Default
53+
private TcpService tcpService = TcpService.getInstance();
54+
55+
@Schema(title = "The target host or IP address.")
56+
@NotNull
57+
private Property<String> host;
58+
59+
@Schema(title = "The target port number.")
60+
@NotNull
61+
private Property<Integer> port;
62+
63+
@Schema(title = "The payload to send over TCP.")
64+
@NotNull
65+
private Property<String> payload;
66+
67+
@Schema(title = "Encoding for the payload. Use 'BASE64' for binary data.", defaultValue = "UTF-8")
68+
@Builder.Default
69+
private Property<String> encoding = Property.ofValue(StandardCharsets.UTF_8.name());
70+
71+
@Schema(title = "Optional timeout for the socket connection.")
72+
private Property<Duration> timeout;
73+
74+
@Override
75+
public Output run(RunContext runContext) throws Exception {
76+
Logger logger = runContext.logger();
77+
78+
String rHost = runContext.render(this.host).as(String.class).orElseThrow(() -> new IllegalArgumentException("`host` cannot be null or empty"));
79+
Integer rPort = runContext.render(this.port).as(Integer.class).orElseThrow(() -> new IllegalArgumentException("`port` cannot be null or empty"));
80+
String rPayload = runContext.render(this.payload).as(String.class).orElseThrow(() -> new IllegalArgumentException("`payload` cannot be null or empty"));
81+
String rEncoding = runContext.render(this.encoding).as(String.class).orElse(StandardCharsets.UTF_8.name());
82+
Duration rTimeout = runContext.render(this.timeout).as(Duration.class).orElse(Duration.ofSeconds(10));
83+
84+
logger.info("Connecting to {}:{} (encoding: {}, timeout: {}).", rHost, rPort, rEncoding, rTimeout);
85+
86+
int sentBytes;
87+
try (Socket socket = tcpService.connect(rHost, rPort, rTimeout)) {
88+
byte[] bytes = tcpService.encodePayload(rPayload, rEncoding);
89+
try (OutputStream os = socket.getOutputStream()) {
90+
os.write(bytes);
91+
os.flush();
92+
sentBytes = bytes.length;
93+
logger.info("Sent {} bytes to {}:{} successfully.", sentBytes, rHost, rPort);
94+
}
95+
} catch (IOException e) {
96+
logger.error("Failed to send TCP message to {}:{} - {}", rHost, rPort, e.getMessage(), e);
97+
throw e;
98+
}
99+
100+
return Output.builder()
101+
.host(rHost)
102+
.port(rPort)
103+
.sentBytes(sentBytes)
104+
.build();
105+
}
106+
107+
@Builder
108+
@Getter
109+
public static class Output implements io.kestra.core.models.tasks.Output {
110+
@Schema(title = "The target host.")
111+
private final String host;
112+
113+
@Schema(title = "The target port.")
114+
private final Integer port;
115+
116+
@Schema(title = "Number of bytes sent.")
117+
private final Integer sentBytes;
118+
}
119+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package io.kestra.plugin.fs.tcp;
2+
3+
import jakarta.inject.Singleton;
4+
5+
import java.io.*;
6+
import java.net.*;
7+
import java.nio.charset.Charset;
8+
import java.time.Duration;
9+
import java.util.Base64;
10+
11+
12+
@Singleton
13+
public class TcpService {
14+
15+
private static TcpService instance;
16+
17+
private TcpService() {}
18+
19+
public static TcpService getInstance() {
20+
if (instance == null) {
21+
instance = new TcpService();
22+
}
23+
return instance;
24+
}
25+
26+
public Socket connect(String host, int port, Duration timeout) throws IOException {
27+
Socket socket = new Socket();
28+
if (timeout != null) {
29+
socket.connect(new InetSocketAddress(host, port), (int) timeout.toMillis());
30+
socket.setSoTimeout((int) timeout.toMillis());
31+
} else {
32+
socket.connect(new InetSocketAddress(host, port));
33+
}
34+
return socket;
35+
}
36+
37+
public byte[] encodePayload(String payload, String encoding) {
38+
if ("BASE64".equalsIgnoreCase(encoding)) {
39+
return Base64.getDecoder().decode(payload);
40+
}
41+
Charset charset = Charset.forName(encoding);
42+
return payload.getBytes(charset);
43+
}
44+
45+
public String decodePayload(byte[] bytes, String encoding) {
46+
if ("BASE64".equalsIgnoreCase(encoding)) {
47+
return Base64.getEncoder().encodeToString(bytes);
48+
}
49+
Charset charset = Charset.forName(encoding);
50+
return new String(bytes, charset);
51+
}
52+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
@io.kestra.core.models.annotations.PluginSubGroup(
2+
title = "TCP",
3+
description = "This sub-group of plugins provides tasks and triggers for interacting with TCP socket connections — including sending and receiving data over TCP.",
4+
categories = io.kestra.core.models.annotations.PluginSubGroup.PluginCategory.MESSAGING
5+
)
6+
package io.kestra.plugin.fs.tcp;

0 commit comments

Comments
 (0)