Skip to content

Commit b24a022

Browse files
authored
[Fix][Zeta] Adjust the timing of invoking the enumerator open method (#9092)
1 parent c1adbf6 commit b24a022

File tree

3 files changed

+125
-2
lines changed

3 files changed

+125
-2
lines changed

Diff for: seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,8 @@ public void restoreState(List<ActionSubtaskState> actionStateList) throws Except
200200
} else {
201201
this.enumerator = this.source.getSource().createEnumerator(enumeratorContext);
202202
}
203+
enumerator.open();
204+
enumeratorContext.getEventListener().onEvent(new EnumeratorOpenEvent());
203205
restoreComplete.complete(null);
204206
log.debug("restoreState split enumerator [{}] finished", actionStateList);
205207
}
@@ -311,8 +313,6 @@ private void stateProcess() throws Exception {
311313
case READY_START:
312314
if (startCalled && readerRegisterComplete) {
313315
currState = STARTING;
314-
enumerator.open();
315-
enumeratorContext.getEventListener().onEvent(new EnumeratorOpenEvent());
316316
} else {
317317
Thread.sleep(100);
318318
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.engine.server.task;
19+
20+
import org.apache.seatunnel.api.source.SeaTunnelSource;
21+
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
22+
import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
23+
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
24+
import org.apache.seatunnel.engine.server.TaskExecutionService;
25+
import org.apache.seatunnel.engine.server.execution.TaskExecutionContext;
26+
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
27+
import org.apache.seatunnel.engine.server.execution.TaskLocation;
28+
29+
import org.junit.jupiter.api.Assertions;
30+
import org.junit.jupiter.api.Test;
31+
import org.mockito.Mockito;
32+
33+
import com.hazelcast.cluster.Address;
34+
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
35+
36+
import java.util.ArrayList;
37+
import java.util.Collections;
38+
import java.util.HashSet;
39+
import java.util.concurrent.atomic.AtomicLong;
40+
41+
public class SourceSplitEnumeratorTaskTest {
42+
43+
@Test
44+
void testOpenShouldBeforeReaderRegister() throws Exception {
45+
46+
SeaTunnelSource source = Mockito.mock(SeaTunnelSource.class);
47+
SourceSplitEnumerator enumerator = Mockito.mock(SourceSplitEnumerator.class);
48+
Mockito.when(source.createEnumerator(Mockito.any())).thenReturn(enumerator);
49+
50+
AtomicLong openTime = new AtomicLong(0);
51+
Mockito.doAnswer(
52+
answer -> {
53+
openTime.set(System.currentTimeMillis());
54+
return null;
55+
})
56+
.when(enumerator)
57+
.open();
58+
59+
AtomicLong registerReaderTime = new AtomicLong(0);
60+
Mockito.doAnswer(
61+
answer -> {
62+
registerReaderTime.set(System.currentTimeMillis());
63+
return null;
64+
})
65+
.when(enumerator)
66+
.registerReader(Mockito.anyInt());
67+
68+
SourceAction action =
69+
new SourceAction<>(1, "fake", source, new HashSet<>(), Collections.emptySet());
70+
SourceSplitEnumeratorTask enumeratorTask =
71+
new SourceSplitEnumeratorTask<>(
72+
1, new TaskLocation(new TaskGroupLocation(1, 1, 1), 1, 1), action);
73+
74+
TaskExecutionContext context = Mockito.mock(TaskExecutionContext.class);
75+
InvocationFuture future = Mockito.mock(InvocationFuture.class);
76+
Mockito.when(context.getOrCreateMetricsContext(Mockito.any())).thenReturn(null);
77+
Mockito.when(context.sendToMaster(Mockito.any())).thenReturn(future);
78+
Mockito.when(future.join()).thenReturn(null);
79+
TaskExecutionService taskExecutionService = Mockito.mock(TaskExecutionService.class);
80+
Mockito.when(context.getTaskExecutionService()).thenReturn(taskExecutionService);
81+
82+
enumeratorTask.setTaskExecutionContext(context);
83+
84+
// re-order the method call to test the open() should be called before receivedReader()
85+
CompletableFuture.runAsync(
86+
() -> {
87+
try {
88+
Thread.sleep(1000);
89+
enumeratorTask.receivedReader(
90+
new TaskLocation(new TaskGroupLocation(1, 1, 1), 1, 1),
91+
Address.createUnresolvedAddress("localhost", 5701));
92+
} catch (Exception e) {
93+
throw new RuntimeException(e);
94+
}
95+
});
96+
enumeratorTask.init();
97+
enumeratorTask.restoreState(new ArrayList<>());
98+
99+
while (openTime.get() == 0 || registerReaderTime.get() == 0) {
100+
enumeratorTask.call();
101+
}
102+
103+
Assertions.assertTrue(openTime.get() < registerReaderTime.get());
104+
}
105+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
mock-maker-inline

0 commit comments

Comments
 (0)