Skip to content

Commit d30cf4d

Browse files
authored
[FLINK-38932][runtime] Fix incorrect scheduled timestamp in ProcessingTimeCallback with scheduleWithFixedDelay (#27429)
1 parent 6c75724 commit d30cf4d

File tree

2 files changed

+89
-8
lines changed

2 files changed

+89
-8
lines changed

flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ public ScheduledFuture<?> scheduleWithFixedDelay(
141141
private ScheduledFuture<?> scheduleRepeatedly(
142142
ProcessingTimeCallback callback, long initialDelay, long period, boolean fixedDelay) {
143143
final long nextTimestamp = getCurrentProcessingTime() + initialDelay;
144-
final Runnable task = wrapOnTimerCallback(callback, nextTimestamp, period);
144+
final Runnable task = wrapOnTimerCallback(callback, nextTimestamp, period, fixedDelay);
145145

146146
// we directly try to register the timer and only react to the status on exception
147147
// that way we save unnecessary volatile accesses for each timer
@@ -281,12 +281,13 @@ interface ExceptionHandler {
281281
}
282282

283283
private Runnable wrapOnTimerCallback(ProcessingTimeCallback callback, long timestamp) {
284-
return new ScheduledTask(status, exceptionHandler, callback, timestamp, 0);
284+
return new ScheduledTask(status, exceptionHandler, callback, timestamp, 0, false);
285285
}
286286

287287
private Runnable wrapOnTimerCallback(
288-
ProcessingTimeCallback callback, long nextTimestamp, long period) {
289-
return new ScheduledTask(status, exceptionHandler, callback, nextTimestamp, period);
288+
ProcessingTimeCallback callback, long nextTimestamp, long period, boolean fixedDelay) {
289+
return new ScheduledTask(
290+
status, exceptionHandler, callback, nextTimestamp, period, fixedDelay);
290291
}
291292

292293
private static final class ScheduledTask implements Runnable {
@@ -296,18 +297,21 @@ private static final class ScheduledTask implements Runnable {
296297

297298
private long nextTimestamp;
298299
private final long period;
300+
private final boolean fixedDelay;
299301

300302
ScheduledTask(
301303
AtomicInteger serviceStatus,
302304
ExceptionHandler exceptionHandler,
303305
ProcessingTimeCallback callback,
304306
long timestamp,
305-
long period) {
307+
long period,
308+
boolean fixedDelay) {
306309
this.serviceStatus = serviceStatus;
307310
this.exceptionHandler = exceptionHandler;
308311
this.callback = callback;
309312
this.nextTimestamp = timestamp;
310313
this.period = period;
314+
this.fixedDelay = fixedDelay;
311315
}
312316

313317
@Override
@@ -320,7 +324,8 @@ public void run() {
320324
} catch (Exception ex) {
321325
exceptionHandler.handleException(ex);
322326
}
323-
nextTimestamp += period;
327+
nextTimestamp =
328+
fixedDelay ? System.currentTimeMillis() + period : nextTimestamp + period;
324329
}
325330
}
326331
}

flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java

Lines changed: 78 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.flink.core.testutils.OneShotLatch;
2222
import org.apache.flink.util.Preconditions;
2323

24+
import org.assertj.core.data.Offset;
2425
import org.junit.jupiter.api.Test;
2526
import org.junit.jupiter.api.Timeout;
2627

@@ -43,21 +44,92 @@ class SystemProcessingTimeServiceTest {
4344

4445
/**
4546
* Tests that SystemProcessingTimeService#scheduleAtFixedRate is actually triggered multiple
46-
* times.
47+
* times with the expected scheduled timestamps.
4748
*/
4849
@Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
4950
@Test
5051
void testScheduleAtFixedRate() throws Exception {
5152
final AtomicReference<Throwable> errorRef = new AtomicReference<>();
53+
final long initialDelay = 10L;
5254
final long period = 10L;
55+
final long executionDelay = 10L;
5356
final int countDown = 3;
5457

5558
final SystemProcessingTimeService timer = createSystemProcessingTimeService(errorRef);
5659

5760
final CountDownLatch countDownLatch = new CountDownLatch(countDown);
5861

5962
try {
60-
timer.scheduleAtFixedRate(timestamp -> countDownLatch.countDown(), 0L, period);
63+
final long initialTimestamp = timer.getCurrentProcessingTime() + initialDelay;
64+
timer.scheduleAtFixedRate(
65+
timestamp -> {
66+
try {
67+
long executionTimes = countDown - countDownLatch.getCount();
68+
assertThat(timestamp)
69+
.isCloseTo(
70+
initialTimestamp + executionTimes * period,
71+
Offset.offset(period));
72+
Thread.sleep(executionDelay);
73+
} catch (Error e) {
74+
System.out.println(e.getMessage());
75+
throw new Error(e);
76+
}
77+
countDownLatch.countDown();
78+
},
79+
initialDelay,
80+
period);
81+
82+
countDownLatch.await();
83+
84+
assertThat(errorRef.get()).isNull();
85+
} finally {
86+
timer.shutdownService();
87+
}
88+
}
89+
90+
/**
91+
* Tests that SystemProcessingTimeService#testScheduleAtFixedDelay is actually triggered
92+
* multiple times with the expected scheduled timestamps.
93+
*/
94+
@Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
95+
@Test
96+
void testScheduleAtFixedDelay() throws Exception {
97+
final AtomicReference<Throwable> errorRef = new AtomicReference<>();
98+
final long initialDelay = 10L;
99+
final long period = 10L;
100+
final long executionDelay = 10L;
101+
final int countDown = 3;
102+
103+
final SystemProcessingTimeService timer = createSystemProcessingTimeService(errorRef);
104+
105+
final CountDownLatch countDownLatch = new CountDownLatch(countDown);
106+
107+
final LastExecutionTimeWrapper lastExecutionTimeWrapper = new LastExecutionTimeWrapper();
108+
109+
try {
110+
final long initialTimestamp = timer.getCurrentProcessingTime() + initialDelay;
111+
timer.scheduleWithFixedDelay(
112+
timestamp -> {
113+
try {
114+
if (countDownLatch.getCount() == countDown) {
115+
assertThat(timestamp)
116+
.isCloseTo(initialTimestamp, Offset.offset(period));
117+
} else {
118+
assertThat(timestamp)
119+
.isCloseTo(
120+
lastExecutionTimeWrapper.ts + period,
121+
Offset.offset(period));
122+
}
123+
Thread.sleep(executionDelay);
124+
lastExecutionTimeWrapper.ts = timer.getCurrentProcessingTime();
125+
} catch (Error e) {
126+
System.out.println(e.getMessage());
127+
throw new Error(e);
128+
}
129+
countDownLatch.countDown();
130+
},
131+
initialDelay,
132+
period);
61133

62134
countDownLatch.await();
63135

@@ -67,6 +139,10 @@ void testScheduleAtFixedRate() throws Exception {
67139
}
68140
}
69141

142+
private static class LastExecutionTimeWrapper {
143+
private long ts;
144+
}
145+
70146
/**
71147
* Tests that shutting down the SystemProcessingTimeService will also cancel the scheduled at
72148
* fix rate future.

0 commit comments

Comments
 (0)