Skip to content

Commit 2324092

Browse files
fix
1 parent 4051b6d commit 2324092

File tree

7 files changed

+6
-141
lines changed

7 files changed

+6
-141
lines changed

docs/en/seatunnel-engine/rest-api-v1.md

-42
Original file line numberDiff line numberDiff line change
@@ -843,46 +843,4 @@ Returns a list of logs from the requested node.
843843
To get a list of logs from the current node: `http://localhost:5801/hazelcast/rest/maps/log`
844844
To get the content of a log file: `http://localhost:5801/hazelcast/rest/maps/log/job-898380162133917698.log`
845845

846-
</details>
847-
848-
------------------------------------------------------------------------------------------
849-
850-
### Return Job Event Information
851-
852-
<details>
853-
<summary><code>GET</code> <code><b>/hazelcast/rest/maps/event/:jobId</b></code> <code>(Returns the list of job events)</code></summary>
854-
855-
#### Parameters
856-
857-
> | Parameter Name | Required | Type | Description |
858-
> |----------------|----------|---------|-------------------|
859-
> | jobId | Yes | string | The Job ID |
860-
861-
#### Response
862-
863-
```json
864-
[
865-
{
866-
"createdTime": 1739501227166,
867-
"eventType": "LIFECYCLE_READER_OPEN"
868-
},
869-
{
870-
"createdTime": 1739501227232,
871-
"eventType": "LIFECYCLE_ENUMERATOR_OPEN"
872-
},
873-
{
874-
"createdTime": 1739501227457,
875-
"eventType": "LIFECYCLE_ENUMERATOR_CLOSE"
876-
},
877-
{
878-
"createdTime": 1739501227516,
879-
"eventType": "LIFECYCLE_WRITER_CLOSE"
880-
},
881-
{
882-
"createdTime": 1739501228305,
883-
"eventType": "LIFECYCLE_READER_CLOSE"
884-
}
885-
]
886-
```
887-
888846
</details>

docs/zh/seatunnel-engine/rest-api-v1.md

-43
Original file line numberDiff line numberDiff line change
@@ -850,46 +850,3 @@ network:
850850
获取日志文件内容:`http://localhost:5801/hazelcast/rest/maps/log/job-898380162133917698.log`
851851

852852
</details>
853-
854-
855-
------------------------------------------------------------------------------------------
856-
857-
### 返回作业的事件信息
858-
859-
<details>
860-
<summary><code>GET</code> <code><b>/hazelcast/rest/maps/event/:jobId</b></code> <code>(返回作业事件列表)</code></summary>
861-
862-
#### 参数
863-
864-
> | 参数名称 | 是否必传 | 参数类型 | 参数描述 |
865-
> |-------|------|--------|------|
866-
> | jobId || string | Job ID |
867-
868-
#### 响应
869-
870-
```json
871-
[
872-
{
873-
"createdTime": 1739501227166,
874-
"eventType": "LIFECYCLE_READER_OPEN"
875-
},
876-
{
877-
"createdTime": 1739501227232,
878-
"eventType": "LIFECYCLE_ENUMERATOR_OPEN"
879-
},
880-
{
881-
"createdTime": 1739501227457,
882-
"eventType": "LIFECYCLE_ENUMERATOR_CLOSE"
883-
},
884-
{
885-
"createdTime": 1739501227516,
886-
"eventType": "LIFECYCLE_WRITER_CLOSE"
887-
},
888-
{
889-
"createdTime": 1739501228305,
890-
"eventType": "LIFECYCLE_READER_CLOSE"
891-
}
892-
]
893-
```
894-
895-
</details>

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java

-5
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import org.apache.seatunnel.api.common.metrics.JobMetrics;
2323
import org.apache.seatunnel.api.common.metrics.RawJobMetrics;
24-
import org.apache.seatunnel.api.event.Event;
2524
import org.apache.seatunnel.api.event.EventHandler;
2625
import org.apache.seatunnel.api.event.EventProcessor;
2726
import org.apache.seatunnel.api.tracing.MDCExecutorService;
@@ -80,7 +79,6 @@
8079
import com.hazelcast.spi.impl.NodeEngineImpl;
8180
import lombok.Getter;
8281
import lombok.NonNull;
83-
import lombok.Setter;
8482
import scala.Tuple2;
8583

8684
import java.util.ArrayList;
@@ -92,7 +90,6 @@
9290
import java.util.Objects;
9391
import java.util.Optional;
9492
import java.util.Set;
95-
import java.util.concurrent.ArrayBlockingQueue;
9693
import java.util.concurrent.ConcurrentHashMap;
9794
import java.util.concurrent.ExecutorService;
9895
import java.util.concurrent.Executors;
@@ -156,8 +153,6 @@ public class CoordinatorService {
156153
*/
157154
@Getter private final Map<Long, JobMaster> runningJobMasterMap = new ConcurrentHashMap<>();
158155

159-
@Getter @Setter
160-
private Map<Long, ArrayBlockingQueue<Event>> jobEventMap = new ConcurrentHashMap<>();
161156
/**
162157
* key: job id; <br>
163158
* value: job master;

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/disruptor/JobEventDisruptor.java

+3-8
Original file line numberDiff line numberDiff line change
@@ -84,18 +84,13 @@ public void close() throws IOException {
8484
disruptor.shutdown();
8585
}
8686

87-
/** */
8887
public ArrayBlockingQueue<Event> storeJobHistory() {
8988
ArrayBlockingQueue<Event> events = new ArrayBlockingQueue<>(eventQueueSize);
9089
long nextSequence = 0;
9190
while (ringBuffer.getCursor() >= nextSequence) {
92-
try {
93-
JobEvent event = ringBuffer.get(nextSequence);
94-
events.add(event.getEvent());
95-
nextSequence++;
96-
} catch (Exception e) {
97-
e.printStackTrace();
98-
}
91+
JobEvent event = ringBuffer.get(nextSequence);
92+
events.add(event.getEvent());
93+
nextSequence++;
9994
}
10095
try {
10196
this.close();

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/event/JobEventReportOperation.java

-3
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,7 @@ public void run() throws Exception {
5252
for (Event event : events) {
5353
processor.process(event);
5454
JobMaster jobMaster = coordinatorService.getJobMaster(Long.parseLong(event.getJobId()));
55-
// jobMaster.getEvents().add(event);
56-
// jobMaster.getHistoryEvents().add(event);
5755
jobMaster.getJobEventDisruptor().publish(event);
58-
System.out.println("put 成功," + event);
5956
}
6057
}
6158

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetEventOperation.java

+3-7
Original file line numberDiff line numberDiff line change
@@ -112,13 +112,9 @@ private void collectEvents(
112112
RingBuffer<JobEvent> ringBuffer, AtomicInteger sequence, List<Event> events) {
113113

114114
while (ringBuffer.getCursor() >= sequence.get()) {
115-
try {
116-
JobEvent jobEvent = ringBuffer.get(sequence.get());
117-
events.add(jobEvent.getEvent());
118-
sequence.addAndGet(1);
119-
} catch (Exception e) {
120-
e.printStackTrace();
121-
}
115+
JobEvent jobEvent = ringBuffer.get(sequence.get());
116+
events.add(jobEvent.getEvent());
117+
sequence.addAndGet(1);
122118
}
123119
}
124120

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java

-33
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.seatunnel.engine.server.NodeExtension;
2424
import org.apache.seatunnel.engine.server.log.FormatType;
2525
import org.apache.seatunnel.engine.server.log.Log4j2HttpGetCommandProcessor;
26-
import org.apache.seatunnel.engine.server.rest.service.EventService;
2726
import org.apache.seatunnel.engine.server.rest.service.JobInfoService;
2827
import org.apache.seatunnel.engine.server.rest.service.LogService;
2928
import org.apache.seatunnel.engine.server.rest.service.OverviewService;
@@ -55,7 +54,6 @@
5554
import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_500;
5655
import static org.apache.seatunnel.engine.server.rest.RestConstant.CONTEXT_PATH;
5756
import static org.apache.seatunnel.engine.server.rest.RestConstant.INSTANCE_CONTEXT_PATH;
58-
import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_EVENT;
5957
import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_FINISHED_JOBS;
6058
import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_GET_ALL_LOG_NAME;
6159
import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_JOB_INFO;
@@ -81,7 +79,6 @@ public class RestHttpGetCommandProcessor extends HttpCommandProcessor<HttpGetCom
8179
private ThreadDumpService threadDumpService;
8280
private RunningThreadService runningThreadService;
8381
private LogService logService;
84-
private EventService eventService;
8582

8683
public RestHttpGetCommandProcessor(TextCommandService textCommandService) {
8784

@@ -109,7 +106,6 @@ public RestHttpGetCommandProcessor(
109106
this.threadDumpService = new ThreadDumpService(nodeEngine);
110107
this.runningThreadService = new RunningThreadService(nodeEngine);
111108
this.logService = new LogService(nodeEngine);
112-
this.eventService = new EventService(nodeEngine);
113109
}
114110

115111
@Override
@@ -142,8 +138,6 @@ public void handle(HttpGetCommand httpGetCommand) {
142138
getAllNodeLog(httpGetCommand, uri);
143139
} else if (uri.startsWith(CONTEXT_PATH + REST_URL_LOG)) {
144140
getCurrentNodeLog(httpGetCommand, uri);
145-
} else if (uri.startsWith(CONTEXT_PATH + REST_URL_EVENT)) {
146-
getEventInfo(httpGetCommand);
147141
} else {
148142
original.handle(httpGetCommand);
149143
}
@@ -164,33 +158,6 @@ public void handleRejection(HttpGetCommand httpGetCommand) {
164158
handle(httpGetCommand);
165159
}
166160

167-
public void getEventInfo(HttpGetCommand command) {
168-
String uri = StringUtil.stripTrailingSlash(command.getURI());
169-
170-
int lastSlashIndex = uri.lastIndexOf('/');
171-
if (lastSlashIndex == -1 || lastSlashIndex == uri.length() - 1) {
172-
prepareResponse(SC_400, command, "Invalid event URI format");
173-
return;
174-
}
175-
176-
String jobIdStr = uri.substring(lastSlashIndex + 1);
177-
if (StringUtils.isBlank(jobIdStr)) {
178-
prepareResponse(SC_400, command, "Missing job ID in URI");
179-
return;
180-
}
181-
182-
try {
183-
long jobId = Long.parseLong(jobIdStr);
184-
this.prepareResponse(command, eventService.getEventInfoJson(jobId));
185-
} catch (NumberFormatException e) {
186-
logger.warning("Invalid job ID format: " + jobIdStr);
187-
prepareResponse(SC_400, command, "Job ID must be a numeric value");
188-
} catch (Exception e) {
189-
logger.severe("Failed to get event info for job " + jobIdStr + ": " + e.getMessage());
190-
prepareResponse(SC_500, command, "Internal server error");
191-
}
192-
}
193-
194161
public void overView(HttpGetCommand command, String uri) {
195162
uri = StringUtil.stripTrailingSlash(uri);
196163
String tagStr;

0 commit comments

Comments
 (0)