Skip to content

Commit 01159ec

Browse files
authored
[Feature] Support listening for message delayed events in cdc source (apache#6634)
1 parent f1601e3 commit 01159ec

File tree

11 files changed

+281
-26
lines changed

11 files changed

+281
-26
lines changed

Diff for: seatunnel-api/src/main/java/org/apache/seatunnel/api/event/EventType.java

+1
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,5 @@ public enum EventType {
2929
LIFECYCLE_READER_OPEN,
3030
LIFECYCLE_READER_CLOSE,
3131
LIFECYCLE_WRITER_CLOSE,
32+
READER_MESSAGE_DELAYED,
3233
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.api.source.event;
19+
20+
import org.apache.seatunnel.api.event.Event;
21+
import org.apache.seatunnel.api.event.EventType;
22+
23+
import lombok.AllArgsConstructor;
24+
import lombok.Getter;
25+
import lombok.NoArgsConstructor;
26+
import lombok.Setter;
27+
import lombok.ToString;
28+
29+
@Getter
30+
@Setter
31+
@ToString
32+
@AllArgsConstructor
33+
@NoArgsConstructor
34+
public class MessageDelayedEvent implements Event {
35+
private long createdTime;
36+
private String jobId;
37+
private EventType eventType = EventType.READER_MESSAGE_DELAYED;
38+
39+
private long delayTime;
40+
private String record;
41+
42+
public MessageDelayedEvent(long delayTime) {
43+
this(delayTime, null);
44+
}
45+
46+
public MessageDelayedEvent(long delayTime, String record) {
47+
this.delayTime = delayTime;
48+
this.record = record;
49+
this.createdTime = System.currentTimeMillis();
50+
}
51+
}

Diff for: seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java

+10
Original file line numberDiff line numberDiff line change
@@ -21,20 +21,23 @@
2121
import org.apache.seatunnel.api.event.EventListener;
2222
import org.apache.seatunnel.api.source.Collector;
2323
import org.apache.seatunnel.api.source.SourceReader;
24+
import org.apache.seatunnel.api.source.event.MessageDelayedEvent;
2425
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
2526
import org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotPhaseEvent;
2627
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
2728
import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
2829
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceRecords;
2930
import org.apache.seatunnel.connectors.cdc.base.source.split.state.IncrementalSplitState;
3031
import org.apache.seatunnel.connectors.cdc.base.source.split.state.SourceSplitStateBase;
32+
import org.apache.seatunnel.connectors.cdc.base.utils.MessageDelayedEventLimiter;
3133
import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
3234
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordEmitter;
3335

3436
import org.apache.kafka.connect.source.SourceRecord;
3537

3638
import lombok.extern.slf4j.Slf4j;
3739

40+
import java.time.Duration;
3841
import java.util.HashMap;
3942
import java.util.Iterator;
4043
import java.util.Map;
@@ -71,6 +74,8 @@ public class IncrementalSourceRecordEmitter<T>
7174
protected final Counter recordFetchDelay;
7275
protected final Counter recordEmitDelay;
7376
protected final EventListener eventListener;
77+
protected final MessageDelayedEventLimiter delayedEventLimiter =
78+
new MessageDelayedEventLimiter(Duration.ofSeconds(1), 0.5d);
7479

7580
public IncrementalSourceRecordEmitter(
7681
DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
@@ -113,6 +118,11 @@ protected void reportMetrics(SourceRecord element) {
113118
// report emit delay
114119
long emitDelay = now - messageTimestamp;
115120
recordEmitDelay.set(emitDelay > 0 ? emitDelay : 0);
121+
122+
// limit the emit event frequency
123+
if (delayedEventLimiter.acquire(messageTimestamp)) {
124+
eventListener.onEvent(new MessageDelayedEvent(emitDelay, element.toString()));
125+
}
116126
}
117127
}
118128

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.cdc.base.utils;
19+
20+
import org.apache.seatunnel.shade.com.google.common.util.concurrent.RateLimiter;
21+
22+
import lombok.AllArgsConstructor;
23+
24+
import java.time.Duration;
25+
26+
@AllArgsConstructor
27+
public class MessageDelayedEventLimiter {
28+
private final long delayMs;
29+
private final RateLimiter eventRateLimiter;
30+
31+
public MessageDelayedEventLimiter(Duration delayThreshold) {
32+
this(delayThreshold, 1);
33+
}
34+
35+
public MessageDelayedEventLimiter(Duration delayThreshold, double permitsPerSecond) {
36+
this.delayMs = delayThreshold.toMillis();
37+
this.eventRateLimiter = RateLimiter.create(permitsPerSecond);
38+
}
39+
40+
public boolean acquire(long messageCreateTime) {
41+
if (isDelayed(messageCreateTime)) {
42+
return eventRateLimiter.tryAcquire();
43+
}
44+
return false;
45+
}
46+
47+
private boolean isDelayed(long messageCreateTime) {
48+
return delayMs != 0 && System.currentTimeMillis() - messageCreateTime >= delayMs;
49+
}
50+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.cdc.base.utils;
19+
20+
import org.junit.jupiter.api.Assertions;
21+
import org.junit.jupiter.api.Test;
22+
23+
import java.time.Duration;
24+
import java.util.concurrent.TimeUnit;
25+
26+
public class MessageDelayedEventLimiterTest {
27+
28+
@Test
29+
public void testAcquire() throws InterruptedException {
30+
double permitsPerSecond = 0.5;
31+
Duration delayThreshold = Duration.ofMillis(1000);
32+
MessageDelayedEventLimiter delayedEventLimiter =
33+
new MessageDelayedEventLimiter(delayThreshold, permitsPerSecond);
34+
35+
long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10);
36+
long actualAcquiredCount = 0;
37+
while (System.currentTimeMillis() < endTime) {
38+
boolean acquired =
39+
delayedEventLimiter.acquire(
40+
System.currentTimeMillis() - (delayThreshold.toMillis() * 10));
41+
if (acquired) {
42+
actualAcquiredCount++;
43+
}
44+
Thread.sleep(1);
45+
}
46+
long expectedAcquiredCount = (long) (TimeUnit.SECONDS.toSeconds(10) * permitsPerSecond);
47+
48+
Assertions.assertTrue(expectedAcquiredCount >= actualAcquiredCount);
49+
}
50+
51+
@Test
52+
public void testNoAcquire() throws InterruptedException {
53+
double permitsPerSecond = 0.5;
54+
Duration delayThreshold = Duration.ofMillis(1000);
55+
MessageDelayedEventLimiter delayedEventLimiter =
56+
new MessageDelayedEventLimiter(delayThreshold, permitsPerSecond);
57+
58+
long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10);
59+
long actualAcquiredCount = 0;
60+
while (System.currentTimeMillis() < endTime) {
61+
boolean acquired = delayedEventLimiter.acquire(System.currentTimeMillis());
62+
if (acquired) {
63+
actualAcquiredCount++;
64+
}
65+
Thread.sleep(1);
66+
}
67+
68+
Assertions.assertTrue(actualAcquiredCount == 0);
69+
}
70+
}

Diff for: seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java

+1
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ private static boolean isSystemThread(String s) {
250250
|| s.contains("Java2D Disposer")
251251
|| s.contains("OkHttp ConnectionPool")
252252
|| s.startsWith("http-report-event-scheduler")
253+
|| s.startsWith("event-forwarder")
253254
|| s.contains(
254255
"org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner")
255256
|| s.startsWith("Log4j2-TF-")

Diff for: seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java

+55
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
package org.apache.seatunnel.engine.server;
1919

2020
import org.apache.seatunnel.api.common.metrics.MetricTags;
21+
import org.apache.seatunnel.api.event.Event;
2122
import org.apache.seatunnel.common.utils.ExceptionUtils;
23+
import org.apache.seatunnel.common.utils.RetryUtils;
2224
import org.apache.seatunnel.common.utils.StringFormatUtils;
2325
import org.apache.seatunnel.engine.common.Constant;
2426
import org.apache.seatunnel.engine.common.config.ConfigProvider;
@@ -28,6 +30,7 @@
2830
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
2931
import org.apache.seatunnel.engine.core.classloader.ClassLoaderService;
3032
import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
33+
import org.apache.seatunnel.engine.server.event.JobEventReportOperation;
3134
import org.apache.seatunnel.engine.server.exception.TaskGroupContextNotFoundException;
3235
import org.apache.seatunnel.engine.server.execution.ExecutionState;
3336
import org.apache.seatunnel.engine.server.execution.ProgressState;
@@ -46,10 +49,12 @@
4649
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
4750
import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
4851
import org.apache.seatunnel.engine.server.task.operation.NotifyTaskStatusOperation;
52+
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
4953

5054
import org.apache.commons.collections4.CollectionUtils;
5155

5256
import com.google.common.collect.Lists;
57+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
5358
import com.hazelcast.instance.impl.NodeState;
5459
import com.hazelcast.internal.metrics.DynamicMetricsProvider;
5560
import com.hazelcast.internal.metrics.MetricDescriptor;
@@ -68,13 +73,15 @@
6873

6974
import java.io.IOException;
7075
import java.net.URL;
76+
import java.util.ArrayList;
7177
import java.util.Collection;
7278
import java.util.HashMap;
7379
import java.util.HashSet;
7480
import java.util.List;
7581
import java.util.Map;
7682
import java.util.Set;
7783
import java.util.UUID;
84+
import java.util.concurrent.ArrayBlockingQueue;
7885
import java.util.concurrent.BlockingQueue;
7986
import java.util.concurrent.CancellationException;
8087
import java.util.concurrent.CompletableFuture;
@@ -140,6 +147,9 @@ public class TaskExecutionService implements DynamicMetricsProvider {
140147

141148
private final ServerConnectorPackageClient serverConnectorPackageClient;
142149

150+
private final BlockingQueue<Event> eventBuffer;
151+
private final ExecutorService eventForwardService;
152+
143153
public TaskExecutionService(
144154
ClassLoaderService classLoaderService,
145155
NodeEngineImpl nodeEngine,
@@ -165,6 +175,43 @@ public TaskExecutionService(
165175

166176
serverConnectorPackageClient =
167177
new ServerConnectorPackageClient(nodeEngine, seaTunnelConfig);
178+
179+
eventBuffer = new ArrayBlockingQueue<>(2048);
180+
eventForwardService =
181+
Executors.newSingleThreadExecutor(
182+
new ThreadFactoryBuilder().setNameFormat("event-forwarder-%d").build());
183+
eventForwardService.submit(
184+
() -> {
185+
List<Event> events = new ArrayList<>();
186+
RetryUtils.RetryMaterial retryMaterial =
187+
new RetryUtils.RetryMaterial(2, true, e -> true);
188+
while (!Thread.currentThread().isInterrupted()) {
189+
try {
190+
events.clear();
191+
192+
Event first = eventBuffer.take();
193+
events.add(first);
194+
195+
eventBuffer.drainTo(events, 500);
196+
JobEventReportOperation operation = new JobEventReportOperation(events);
197+
198+
RetryUtils.retryWithException(
199+
() ->
200+
NodeEngineUtil.sendOperationToMasterNode(
201+
nodeEngine, operation)
202+
.join(),
203+
retryMaterial);
204+
205+
logger.fine("Event forward success, events " + events.size());
206+
} catch (InterruptedException e) {
207+
Thread.currentThread().interrupt();
208+
logger.info("Event forward thread interrupted");
209+
} catch (Throwable t) {
210+
logger.warning(
211+
"Event forward failed, discard events " + events.size(), t);
212+
}
213+
}
214+
});
168215
}
169216

170217
public void start() {
@@ -175,6 +222,7 @@ public void shutdown() {
175222
isRunning = false;
176223
executorService.shutdownNow();
177224
scheduledExecutorService.shutdown();
225+
eventForwardService.shutdownNow();
178226
}
179227

180228
public TaskGroupContext getExecutionContext(TaskGroupLocation taskGroupLocation) {
@@ -619,6 +667,13 @@ public void printTaskExecutionRuntimeInfo() {
619667
}
620668
}
621669

670+
public void reportEvent(Event e) {
671+
while (!eventBuffer.offer(e)) {
672+
eventBuffer.poll();
673+
logger.warning("Event buffer is full, discard the oldest event");
674+
}
675+
}
676+
622677
private final class BlockingWorker implements Runnable {
623678

624679
private final TaskTracker tracker;

Diff for: seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/event/JobEventHttpReportHandler.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.seatunnel.engine.server.event;
1919

2020
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
21+
import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
2122
import org.apache.seatunnel.shade.com.google.common.util.concurrent.ThreadFactoryBuilder;
2223

2324
import org.apache.seatunnel.api.event.Event;
@@ -104,7 +105,8 @@ public void handle(Event event) {
104105
completionStage.toCompletableFuture().join();
105106
}
106107

107-
private void report() throws IOException {
108+
@VisibleForTesting
109+
synchronized void report() throws IOException {
108110
long headSequence = ringbuffer.headSequence();
109111
if (headSequence > committedEventIndex) {
110112
log.warn(

Diff for: seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/event/JobEventListener.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public class JobEventListener implements EventListener {
3232
@Override
3333
public void onEvent(Event event) {
3434
event.setJobId(String.valueOf(taskLocation.getJobId()));
35-
JobEventReportOperation evenCollectOperation = new JobEventReportOperation(event);
36-
taskExecutionContext.sendToMaster(evenCollectOperation).join();
35+
36+
taskExecutionContext.getTaskExecutionService().reportEvent(event);
3737
}
3838
}

0 commit comments

Comments
 (0)