Skip to content

Commit 4051b6d

Browse files
backup
1 parent de3b520 commit 4051b6d

File tree

5 files changed

+57
-40
lines changed

5 files changed

+57
-40
lines changed

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/disruptor/JobEvent.java

+17
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,20 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
118
package org.apache.seatunnel.engine.server.disruptor;
219

320
import org.apache.seatunnel.api.event.Event;

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/disruptor/JobEventDisruptor.java

+17
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,20 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
118
package org.apache.seatunnel.engine.server.disruptor;
219

320
import org.apache.seatunnel.api.event.Event;

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/event/JobEventReportOperation.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,10 @@ public void run() throws Exception {
5252
for (Event event : events) {
5353
processor.process(event);
5454
JobMaster jobMaster = coordinatorService.getJobMaster(Long.parseLong(event.getJobId()));
55-
// jobMaster.getEvents().add(event);
56-
// jobMaster.getHistoryEvents().add(event);
55+
// jobMaster.getEvents().add(event);
56+
// jobMaster.getHistoryEvents().add(event);
5757
jobMaster.getJobEventDisruptor().publish(event);
58-
System.out.println("put 成功,"+event);
58+
System.out.println("put 成功," + event);
5959
}
6060
}
6161

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java

-4
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,6 @@ public class JobMaster {
143143

144144
private SeaTunnelServer seaTunnelServer;
145145

146-
// @Getter private ArrayBlockingQueue<Event> events;
147-
// @Getter private ArrayBlockingQueue<Event> historyEvents;
148146
@Getter private JobEventDisruptor jobEventDisruptor;
149147
/**
150148
* we need store slot used by task in Hazelcast IMap and release or reuse it when a new master
@@ -218,8 +216,6 @@ public JobMaster(
218216
this.metricsImap = metricsImap;
219217
this.seaTunnelServer = seaTunnelServer;
220218
this.releasedSlotWhenTaskGroupFinished = new ConcurrentHashMap<>();
221-
// this.events = new ArrayBlockingQueue<>(engineConfig.getEventQueueSize());
222-
// this.historyEvents = new ArrayBlockingQueue<>(engineConfig.getEventQueueSize());
223219
this.jobEventDisruptor = new JobEventDisruptor(engineConfig.getEventQueueSize());
224220
}
225221

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetEventOperation.java

+20-33
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.List;
4040
import java.util.Optional;
4141
import java.util.concurrent.ExecutionException;
42+
import java.util.concurrent.atomic.AtomicInteger;
4243

4344
@Slf4j
4445
public class GetEventOperation extends Operation implements IdentifiedDataSerializable {
@@ -47,13 +48,11 @@ public class GetEventOperation extends Operation implements IdentifiedDataSerial
4748

4849
private Boolean isAll;
4950

50-
private List<Event> events = new ArrayList<>();
51-
5251
public GetEventOperation() {}
5352

5453
private Data response;
5554

56-
private long nextSequence;
55+
private AtomicInteger nextSequence;
5756

5857
/**
5958
* @param jobId job id
@@ -62,7 +61,7 @@ public GetEventOperation() {}
6261
public GetEventOperation(Long jobId, boolean isAll) {
6362
this.jobId = jobId;
6463
this.isAll = isAll;
65-
this.nextSequence = 0;
64+
this.nextSequence = new AtomicInteger(0);
6665
}
6766

6867
@Override
@@ -93,35 +92,9 @@ private Data retrieveEvents(CoordinatorService coordinatorService) {
9392

9493
if (jobMaster != null) {
9594
log.debug("Retrieving events for active job {}, isAll: {}", jobId, isAll);
96-
if (isAll) {
97-
RingBuffer<JobEvent> ringBuffer = jobMaster.getJobEventDisruptor().getRingBuffer();
98-
long nextSequence = 0;
99-
while (ringBuffer.getCursor() >= nextSequence) {
100-
try {
101-
JobEvent jobEvent = ringBuffer.get(nextSequence);
102-
events.add(jobEvent.getEvent());
103-
nextSequence++;
104-
} catch (Exception e) {
105-
e.printStackTrace();
106-
}
107-
}
108-
//
109-
// Optional.ofNullable(jobMaster.getHistoryEvents()).ifPresent(events::addAll);
110-
} else {
111-
RingBuffer<JobEvent> ringBuffer = jobMaster.getJobEventDisruptor().getRingBuffer();
112-
while (ringBuffer.getCursor() >= nextSequence) {
113-
try {
114-
JobEvent jobEvent = ringBuffer.get(nextSequence);
115-
System.out.println("add Event:" + jobEvent.getEvent());
116-
events.add(jobEvent.getEvent());
117-
nextSequence++;
118-
} catch (Exception e) {
119-
e.printStackTrace();
120-
}
121-
}
122-
// Optional.ofNullable(jobMaster.getEvents()).ifPresent(q ->
123-
// q.drainTo(events));
124-
}
95+
RingBuffer<JobEvent> ringBuffer = jobMaster.getJobEventDisruptor().getRingBuffer();
96+
AtomicInteger sequenceToUse = isAll ? new AtomicInteger(0) : nextSequence;
97+
collectEvents(ringBuffer, sequenceToUse, events);
12598
} else {
12699
log.debug("Job {} not active, retrieving from history", jobId);
127100
Optional.ofNullable(
@@ -135,6 +108,20 @@ private Data retrieveEvents(CoordinatorService coordinatorService) {
135108
return this.getNodeEngine().toData(events);
136109
}
137110

111+
private void collectEvents(
112+
RingBuffer<JobEvent> ringBuffer, AtomicInteger sequence, List<Event> events) {
113+
114+
while (ringBuffer.getCursor() >= sequence.get()) {
115+
try {
116+
JobEvent jobEvent = ringBuffer.get(sequence.get());
117+
events.add(jobEvent.getEvent());
118+
sequence.addAndGet(1);
119+
} catch (Exception e) {
120+
e.printStackTrace();
121+
}
122+
}
123+
}
124+
138125
@Override
139126
public Object getResponse() {
140127
return response;

0 commit comments

Comments
 (0)