diff --git a/config/seatunnel.yaml b/config/seatunnel.yaml index 79a713a71e0..b87c6019b4b 100644 --- a/config/seatunnel.yaml +++ b/config/seatunnel.yaml @@ -23,6 +23,8 @@ seatunnel: queue-type: blockingqueue print-execution-info-interval: 60 print-job-metrics-info-interval: 60 + print-job-event-info-interval: 30 + job-event-queue-size: 128 slot-service: dynamic-slot: true checkpoint: diff --git a/docs/en/seatunnel-engine/rest-api-v2.md b/docs/en/seatunnel-engine/rest-api-v2.md index 72eb9044431..67b85642802 100644 --- a/docs/en/seatunnel-engine/rest-api-v2.md +++ b/docs/en/seatunnel-engine/rest-api-v2.md @@ -907,4 +907,48 @@ To get the metrics, you need to open `Telemetry` first, or you will get an empty More information about `Telemetry` can be found in the [Telemetry](telemetry.md) documentation. - \ No newline at end of file + + + + +------------------------------------------------------------------------------------------ + +### Return Job Event Information + +
+ GET /event/:jobId (Returns the list of job events) + +#### Parameters + +> | Parameter Name | Required | Type | Description | +> |----------------|----------|---------|-------------------| +> | jobId | Yes | string | The Job ID | + +#### Response + +```json +[ + { + "createdTime": 1739501227166, + "eventType": "LIFECYCLE_READER_OPEN" + }, + { + "createdTime": 1739501227232, + "eventType": "LIFECYCLE_ENUMERATOR_OPEN" + }, + { + "createdTime": 1739501227457, + "eventType": "LIFECYCLE_ENUMERATOR_CLOSE" + }, + { + "createdTime": 1739501227516, + "eventType": "LIFECYCLE_WRITER_CLOSE" + }, + { + "createdTime": 1739501228305, + "eventType": "LIFECYCLE_READER_CLOSE" + } +] +``` + +
diff --git a/docs/zh/seatunnel-engine/rest-api-v2.md b/docs/zh/seatunnel-engine/rest-api-v2.md index ed3c6803c7a..d28ff429131 100644 --- a/docs/zh/seatunnel-engine/rest-api-v2.md +++ b/docs/zh/seatunnel-engine/rest-api-v2.md @@ -895,4 +895,46 @@ curl --location 'http://127.0.0.1:8080/submit-job/upload' --form 'config_file=@" 更多关于`Telemetry`的信息可以在[Telemetry](telemetry.md)文档中找到。 - \ No newline at end of file + + +------------------------------------------------------------------------------------------ + +### 返回作业的事件信息 + +
+ GET /event/:jobId (返回作业事件列表) + +#### 参数 + +> | 参数名称 | 是否必传 | 参数类型 | 参数描述 | +> |-------|------|--------|------| +> | jobId | 是 | string | Job ID | + +#### 响应 + +```json +[ + { + "createdTime": 1739501227166, + "eventType": "LIFECYCLE_READER_OPEN" + }, + { + "createdTime": 1739501227232, + "eventType": "LIFECYCLE_ENUMERATOR_OPEN" + }, + { + "createdTime": 1739501227457, + "eventType": "LIFECYCLE_ENUMERATOR_CLOSE" + }, + { + "createdTime": 1739501227516, + "eventType": "LIFECYCLE_WRITER_CLOSE" + }, + { + "createdTime": 1739501228305, + "eventType": "LIFECYCLE_READER_CLOSE" + } +] +``` + +
diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java index bb7fe67a874..bb15aa217d0 100644 --- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java +++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.shade.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.seatunnel.api.event.Event; import org.apache.seatunnel.common.utils.DateTimeUtils; import org.apache.seatunnel.common.utils.StringFormatUtils; import org.apache.seatunnel.core.starter.command.Command; @@ -29,6 +30,7 @@ import org.apache.seatunnel.engine.client.SeaTunnelClient; import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment; import org.apache.seatunnel.engine.client.job.ClientJobProxy; +import org.apache.seatunnel.engine.client.job.JobEventRunner; import org.apache.seatunnel.engine.client.job.JobMetricsRunner; import org.apache.seatunnel.engine.client.job.JobStatusRunner; import org.apache.seatunnel.engine.common.Constant; @@ -56,6 +58,7 @@ import java.time.LocalDateTime; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.Random; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -84,6 +87,7 @@ public void execute() throws CommandExecuteException { LocalDateTime startTime = LocalDateTime.now(); LocalDateTime endTime = LocalDateTime.now(); SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig(); + Long jobId = null; try { String clusterName = clientCommandArgs.getClusterName(); ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); @@ -158,7 +162,6 @@ public void execute() throws CommandExecuteException { ? Long.parseLong(clientCommandArgs.getCustomJobId()) : null); } - // get job start time startTime = LocalDateTime.now(); // create job proxy @@ -189,11 +192,11 @@ public void execute() throws CommandExecuteException { } })); // get job id - long jobId = clientJobProxy.getJobId(); + jobId = clientJobProxy.getJobId(); JobMetricsRunner jobMetricsRunner = new JobMetricsRunner(engineClient, jobId); executorService = Executors.newScheduledThreadPool( - 2, + 3, new ThreadFactoryBuilder() .setNameFormat("job-metrics-runner-%d") .setDaemon(true) @@ -204,6 +207,15 @@ public void execute() throws CommandExecuteException { seaTunnelConfig.getEngineConfig().getPrintJobMetricsInfoInterval(), TimeUnit.SECONDS); + JobEventRunner jobEventRunner = + new JobEventRunner(engineClient.getJobClient(), jobId); + executorService.scheduleAtFixedRate( + jobEventRunner, + 0, + seaTunnelConfig.getEngineConfig().getPrintJobEventInfoInterval(), + TimeUnit.SECONDS); + log.info("Init job event runner , job id : {}", jobId); + if (!isLocalMode) { // LOCAL mode does not require running the job status runner executorService.schedule( @@ -227,6 +239,9 @@ public void execute() throws CommandExecuteException { } catch (Exception e) { throw new CommandExecuteException("SeaTunnel job executed failed", e); } finally { + + printEvent(jobId); + if (jobMetricsSummary != null) { // print job statistics information when job finished log.info( @@ -252,6 +267,25 @@ public void execute() throws CommandExecuteException { } } + private void printEvent(Long jobId) { + if (jobId == null) { + jobId = + Optional.ofNullable(clientCommandArgs.getJobId()) + .map(Long::parseLong) + .orElseGet( + () -> + Optional.ofNullable(clientCommandArgs.getCustomJobId()) + .map(Long::parseLong) + .orElse(null)); + } + if (jobId != null) { + List event = engineClient.getJobClient().getEvent(jobId); + event.forEach(e -> log.info("EventType: {}", e.getEventType())); + } else { + log.warn("JobId is null, can not get job event."); + } + } + private void closeClient() { if (engineClient != null) { engineClient.close(); diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobEventIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobEventIT.java new file mode 100644 index 00000000000..7815fae061b --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobEventIT.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.e2e; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.Container; + +import java.io.IOException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class JobEventIT extends SeaTunnelEngineContainer { + + private static final String TEST_CONFIG_FILE = "/fakesource_to_console.conf"; + private static final String LIFECYCLE_EVENT = "LIFECYCLE_READER_OPEN"; + private static final String JOB_ID_PATTERN = "job id:\\s*(\\d+)"; + private static final String EVENT_API_ENDPOINT = "localhost:8080/event/"; + + @Test + public void testJobRestoreApplyResources() throws IOException, InterruptedException { + // Execute job and verify output + Container.ExecResult execResult = executeJob(server, TEST_CONFIG_FILE); + String stdout = execResult.getStdout(); + Assertions.assertNotNull(stdout, "Job execution output should not be null"); + Assertions.assertTrue( + stdout.contains(LIFECYCLE_EVENT), "Job output should contain lifecycle event"); + + // Extract job ID from logs + String jobId = extractJobId(server.getLogs()); + Assertions.assertNotNull(jobId, "Failed to extract job ID from logs"); + + // Query job event API + Container.ExecResult eventResult = queryJobEvent(jobId); + String eventOutput = eventResult.getStdout(); + Assertions.assertNotNull(eventOutput, "Event API response should not be null"); + Assertions.assertTrue( + eventOutput.contains(LIFECYCLE_EVENT), + "Event API response should contain lifecycle event"); + } + + private String extractJobId(String logs) { + Pattern pattern = Pattern.compile(JOB_ID_PATTERN); + Matcher matcher = pattern.matcher(logs); + return matcher.find() ? matcher.group(1) : null; + } + + private Container.ExecResult queryJobEvent(String jobId) + throws InterruptedException, IOException { + return server.execInContainer( + "sh", "-c", String.format("curl %s%s", EVENT_API_ENDPOINT, jobId)); + } +} diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java index 2421829ea9e..cddd0187455 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java +++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java @@ -22,6 +22,7 @@ import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode; import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.seatunnel.api.event.Event; import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient; import org.apache.seatunnel.engine.client.util.ContentFormatUtil; import org.apache.seatunnel.engine.common.Constant; @@ -32,6 +33,7 @@ import org.apache.seatunnel.engine.core.job.JobStatus; import org.apache.seatunnel.engine.core.job.JobStatusData; import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelCancelJobCodec; +import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetEventCodec; import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobCheckpointCodec; import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobDetailStatusCodec; import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobInfoCodec; @@ -180,4 +182,13 @@ public List getCheckpointData(Long jobId) { SeaTunnelGetJobCheckpointCodec.encodeRequest(jobId), SeaTunnelGetJobCheckpointCodec::decodeResponse)); } + + public List getEvent(Long jobId) { + return hazelcastClient + .getSerializationService() + .toObject( + hazelcastClient.requestOnMasterAndDecodeResponse( + SeaTunnelGetEventCodec.encodeRequest(jobId, false), + SeaTunnelGetEventCodec::decodeResponse)); + } } diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobEventRunner.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobEventRunner.java new file mode 100644 index 00000000000..5fcce2068e6 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobEventRunner.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.client.job; + +import org.apache.seatunnel.api.event.Event; + +import lombok.extern.slf4j.Slf4j; + +import java.util.List; + +@Slf4j +public class JobEventRunner implements Runnable { + + private final JobClient jobClient; + private final Long jobId; + + public JobEventRunner(JobClient jobClient, Long jobId) { + this.jobClient = jobClient; + this.jobId = jobId; + } + + @Override + public void run() { + Thread.currentThread().setName("job-event-runner-" + jobId); + List event = jobClient.getEvent(jobId); + event.forEach(e -> log.info("EventType: {}", e.getEventType())); + } +} diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java index 60551202e83..9b9aadb153c 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java @@ -51,6 +51,8 @@ public class Constant { public static final String IMAP_FINISHED_JOB_VERTEX_INFO = "engine_finishedJobVertexInfo"; + public static final String IMAP_FINISHED_JOB_EVENT = "engine_finishedJobEvent"; + public static final String IMAP_STATE_TIMESTAMPS = "engine_stateTimestamps"; public static final String IMAP_OWNED_SLOT_PROFILES = "engine_ownedSlotProfilesIMap"; diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java index 8e0be8b83e9..7cf9c20e342 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java @@ -51,6 +51,13 @@ public class EngineConfig { ServerConfigOptions.MasterServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL .defaultValue(); + private int printJobEventInfoInterval = + ServerConfigOptions.MasterServerConfigOptions.PRINT_JOB_EVENT_INFO_INTERVAL + .defaultValue(); + + private int eventQueueSize = + ServerConfigOptions.MasterServerConfigOptions.JOB_EVENT_QUEUE_SIZE.defaultValue(); + private int jobMetricsBackupInterval = ServerConfigOptions.MasterServerConfigOptions.JOB_METRICS_BACKUP_INTERVAL .defaultValue(); @@ -120,6 +127,22 @@ public void setPrintJobMetricsInfoInterval(int printJobMetricsInfoInterval) { this.printJobMetricsInfoInterval = printJobMetricsInfoInterval; } + public void setPrintJobEventInfoInterval(int printJobEventInfoInterval) { + checkPositive( + printJobEventInfoInterval, + ServerConfigOptions.MasterServerConfigOptions.PRINT_JOB_EVENT_INFO_INTERVAL + + " must be > 0"); + this.printJobEventInfoInterval = printJobEventInfoInterval; + } + + public void setEventQueueSize(int eventQueueSize) { + checkPositive( + eventQueueSize, + ServerConfigOptions.MasterServerConfigOptions.JOB_EVENT_QUEUE_SIZE + + " must be > 0"); + this.eventQueueSize = eventQueueSize; + } + public void setJobMetricsBackupInterval(int jobMetricsBackupInterval) { checkPositive( jobMetricsBackupInterval, diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java index 7b4c968dd2a..7e3b817546b 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java @@ -170,6 +170,23 @@ private void parseEngineConfig(Node engineNode, SeaTunnelConfig config) { .PRINT_JOB_METRICS_INFO_INTERVAL .key(), getTextContent(node))); + } else if (ServerConfigOptions.MasterServerConfigOptions.PRINT_JOB_EVENT_INFO_INTERVAL + .key() + .equals(name)) { + engineConfig.setPrintJobEventInfoInterval( + getIntegerValue( + ServerConfigOptions.MasterServerConfigOptions + .PRINT_JOB_EVENT_INFO_INTERVAL + .key(), + getTextContent(node))); + } else if (ServerConfigOptions.MasterServerConfigOptions.JOB_EVENT_QUEUE_SIZE + .key() + .equals(name)) { + engineConfig.setEventQueueSize( + getIntegerValue( + ServerConfigOptions.MasterServerConfigOptions.JOB_EVENT_QUEUE_SIZE + .key(), + getTextContent(node))); } else if (ServerConfigOptions.MasterServerConfigOptions.JOB_METRICS_BACKUP_INTERVAL .key() .equals(name)) { diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java index 6aab8dfa79a..587450dd5fd 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java @@ -85,6 +85,18 @@ public static class MasterServerConfigOptions { .defaultValue(60) .withDescription("The interval (in seconds) of job print metrics info"); + public static final Option PRINT_JOB_EVENT_INFO_INTERVAL = + Options.key("print-job-event-info-interval") + .intType() + .defaultValue(30) + .withDescription("The interval (in seconds) of job print event info"); + + public static final Option JOB_EVENT_QUEUE_SIZE = + Options.key("job-event-queue-size") + .intType() + .defaultValue((int) Math.pow(2, 7)) + .withDescription("Queue size for storing events, default: 128"); + public static final Option JOB_METRICS_BACKUP_INTERVAL = Options.key("job-metrics-backup-interval") .intType() diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelGetEventCodec.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelGetEventCodec.java new file mode 100644 index 00000000000..8217d09b993 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelGetEventCodec.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.core.protocol.codec; + +import com.hazelcast.client.impl.protocol.ClientMessage; +import com.hazelcast.client.impl.protocol.Generated; +import com.hazelcast.client.impl.protocol.codec.builtin.DataCodec; + +import static com.hazelcast.client.impl.protocol.ClientMessage.PARTITION_ID_FIELD_OFFSET; +import static com.hazelcast.client.impl.protocol.ClientMessage.RESPONSE_BACKUP_ACKS_FIELD_OFFSET; +import static com.hazelcast.client.impl.protocol.ClientMessage.TYPE_FIELD_OFFSET; +import static com.hazelcast.client.impl.protocol.ClientMessage.UNFRAGMENTED_MESSAGE; +import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.BOOLEAN_SIZE_IN_BYTES; +import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.BYTE_SIZE_IN_BYTES; +import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.INT_SIZE_IN_BYTES; +import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.LONG_SIZE_IN_BYTES; +import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.decodeBoolean; +import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.decodeLong; +import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeBoolean; +import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeInt; +import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeLong; + +/* + * This file is auto-generated by the Hazelcast Client Protocol Code Generator. + * To change this file, edit the templates or the protocol + * definitions on the https://github.com/hazelcast/hazelcast-client-protocol + * and regenerate it. + */ +@Generated("5550421723954a31969aef56dbef506a") +public final class SeaTunnelGetEventCodec { + // hex: 0xDE1000 + public static final int REQUEST_MESSAGE_TYPE = 14553088; + // hex: 0xDE1001 + public static final int RESPONSE_MESSAGE_TYPE = 14553089; + private static final int REQUEST_JOB_ID_FIELD_OFFSET = + PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES; + private static final int REQUEST_IS_ALL_FIELD_OFFSET = + REQUEST_JOB_ID_FIELD_OFFSET + LONG_SIZE_IN_BYTES; + private static final int REQUEST_INITIAL_FRAME_SIZE = + REQUEST_IS_ALL_FIELD_OFFSET + BOOLEAN_SIZE_IN_BYTES; + private static final int RESPONSE_INITIAL_FRAME_SIZE = + RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES; + + private SeaTunnelGetEventCodec() {} + + public static class RequestParameters { + + /** */ + public long jobId; + + /** */ + public boolean isAll; + } + + public static ClientMessage encodeRequest(long jobId, boolean isAll) { + ClientMessage clientMessage = ClientMessage.createForEncode(); + clientMessage.setRetryable(true); + clientMessage.setOperationName("SeaTunnel.GetEvent"); + ClientMessage.Frame initialFrame = + new ClientMessage.Frame(new byte[REQUEST_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE); + encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, REQUEST_MESSAGE_TYPE); + encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1); + encodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET, jobId); + encodeBoolean(initialFrame.content, REQUEST_IS_ALL_FIELD_OFFSET, isAll); + clientMessage.add(initialFrame); + return clientMessage; + } + + public static SeaTunnelGetEventCodec.RequestParameters decodeRequest( + ClientMessage clientMessage) { + ClientMessage.ForwardFrameIterator iterator = clientMessage.frameIterator(); + RequestParameters request = new RequestParameters(); + ClientMessage.Frame initialFrame = iterator.next(); + request.jobId = decodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET); + request.isAll = decodeBoolean(initialFrame.content, REQUEST_IS_ALL_FIELD_OFFSET); + return request; + } + + public static ClientMessage encodeResponse(com.hazelcast.internal.serialization.Data event) { + ClientMessage clientMessage = ClientMessage.createForEncode(); + ClientMessage.Frame initialFrame = + new ClientMessage.Frame( + new byte[RESPONSE_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE); + encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, RESPONSE_MESSAGE_TYPE); + clientMessage.add(initialFrame); + + DataCodec.encode(clientMessage, event); + return clientMessage; + } + + /** */ + public static com.hazelcast.internal.serialization.Data decodeResponse( + ClientMessage clientMessage) { + ClientMessage.ForwardFrameIterator iterator = clientMessage.frameIterator(); + // empty initial frame + iterator.next(); + return DataCodec.decode(iterator); + } +} diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml index da23e66e426..43240db6f4a 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml +++ b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml @@ -308,6 +308,31 @@ methods: response: params: - name: response + type: Data + nullable: false + since: 2.0 + doc: '' + - id: 16 + name: getEvent + since: 2.0 + doc: '' + request: + retryable: true + partitionIdentifier: -1 + params: + - name: jobId + type: long + nullable: false + since: 2.0 + doc: '' + - name: isAll + type: boolean + nullable: false + since: 2.0 + doc: '' + response: + params: + - name: event type: Data nullable: false since: 2.0 diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java index 5a65f248ae2..47ff6ee82e2 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java @@ -77,6 +77,7 @@ import com.hazelcast.map.IMap; import com.hazelcast.ringbuffer.Ringbuffer; import com.hazelcast.spi.impl.NodeEngineImpl; +import lombok.Getter; import lombok.NonNull; import scala.Tuple2; @@ -150,7 +151,7 @@ public class CoordinatorService { * key: job id;
* value: job master; */ - private final Map runningJobMasterMap = new ConcurrentHashMap<>(); + @Getter private final Map runningJobMasterMap = new ConcurrentHashMap<>(); /** * key: job id;
@@ -411,6 +412,7 @@ private void initCoordinatorService() { nodeEngine .getHazelcastInstance() .getMap(Constant.IMAP_FINISHED_JOB_VERTEX_INFO), + nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_EVENT), engineConfig.getHistoryJobExpireMinutes()); eventProcessor = createJobEventProcessor( diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/JettyService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/JettyService.java index 4d9b75abf55..ba218ecf810 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/JettyService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/JettyService.java @@ -29,6 +29,7 @@ import org.apache.seatunnel.engine.server.rest.servlet.AllNodeLogServlet; import org.apache.seatunnel.engine.server.rest.servlet.CurrentNodeLogServlet; import org.apache.seatunnel.engine.server.rest.servlet.EncryptConfigServlet; +import org.apache.seatunnel.engine.server.rest.servlet.EventServlet; import org.apache.seatunnel.engine.server.rest.servlet.FinishedJobsServlet; import org.apache.seatunnel.engine.server.rest.servlet.JobInfoServlet; import org.apache.seatunnel.engine.server.rest.servlet.MetricsServlet; @@ -57,6 +58,7 @@ import java.util.EnumSet; import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_ENCRYPT_CONFIG; +import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_EVENT; import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_FINISHED_JOBS; import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_GET_ALL_LOG_NAME; import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_JOB_INFO; @@ -132,6 +134,7 @@ public void createJettyServer() { ServletHolder stopJobHolder = new ServletHolder(new StopJobServlet(nodeEngine)); ServletHolder stopJobsHolder = new ServletHolder(new StopJobsServlet(nodeEngine)); ServletHolder encryptConfigHolder = new ServletHolder(new EncryptConfigServlet(nodeEngine)); + ServletHolder eventHolder = new ServletHolder(new EventServlet(nodeEngine)); ServletHolder updateTagsHandler = new ServletHolder(new UpdateTagsServlet(nodeEngine)); ServletHolder runningThreadsHolder = @@ -145,6 +148,7 @@ public void createJettyServer() { ServletHolder metricsServlet = new ServletHolder(new MetricsServlet(nodeEngine)); + context.addServlet(eventHolder, convertUrlToPath(REST_URL_EVENT)); context.addServlet(overviewHolder, convertUrlToPath(REST_URL_OVERVIEW)); context.addServlet(runningJobsHolder, convertUrlToPath(REST_URL_RUNNING_JOBS)); context.addServlet(finishedJobsHolder, convertUrlToPath(REST_URL_FINISHED_JOBS)); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/disruptor/JobEvent.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/disruptor/JobEvent.java new file mode 100644 index 00000000000..65521c5148f --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/disruptor/JobEvent.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.disruptor; + +import org.apache.seatunnel.api.event.Event; + +import com.lmax.disruptor.EventFactory; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class JobEvent { + + private Event event; + + public static final EventFactory FACTORY = JobEvent::new; +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/disruptor/JobEventDisruptor.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/disruptor/JobEventDisruptor.java new file mode 100644 index 00000000000..1567f9e6482 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/disruptor/JobEventDisruptor.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.disruptor; + +import org.apache.seatunnel.api.event.Event; + +import com.lmax.disruptor.BlockingWaitStrategy; +import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; +import com.lmax.disruptor.util.DaemonThreadFactory; +import lombok.Getter; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadFactory; + +public class JobEventDisruptor implements Closeable { + + private volatile Disruptor disruptor; + + private boolean isClosed = false; + + @Getter private RingBuffer ringBuffer; + + private final int eventQueueSize; + + public JobEventDisruptor(int eventQueueSize) { + this.eventQueueSize = findNextPowerOfTwo(eventQueueSize); + ThreadFactory threadFactory = DaemonThreadFactory.INSTANCE; + this.disruptor = + new Disruptor<>( + JobEvent.FACTORY, + this.eventQueueSize, + threadFactory, + ProducerType.SINGLE, + new BlockingWaitStrategy()); + + disruptor.start(); + this.ringBuffer = disruptor.getRingBuffer(); + } + + private int findNextPowerOfTwo(int value) { + return 1 << (32 - Integer.numberOfLeadingZeros(value - 1)); + } + + public boolean publish(Event event) { + if (isClosed()) { + return false; + } + long sequence = ringBuffer.next(); + try { + JobEvent jobEvent = ringBuffer.get(sequence); + jobEvent.setEvent(event); + } finally { + ringBuffer.publish(sequence); + } + return true; + } + + public boolean isClosed() { + return isClosed; + } + + @Override + public void close() throws IOException { + isClosed = true; + disruptor.shutdown(); + } + + public ArrayBlockingQueue storeJobHistory() { + ArrayBlockingQueue events = new ArrayBlockingQueue<>(eventQueueSize); + long nextSequence = 0; + while (ringBuffer.getCursor() >= nextSequence) { + JobEvent event = ringBuffer.get(nextSequence); + events.add(event.getEvent()); + nextSequence++; + } + try { + this.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + return events; + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/event/JobEventReportOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/event/JobEventReportOperation.java index 9e3da06cf05..017f18af9d3 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/event/JobEventReportOperation.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/event/JobEventReportOperation.java @@ -19,7 +19,9 @@ import org.apache.seatunnel.api.event.Event; import org.apache.seatunnel.api.event.EventProcessor; +import org.apache.seatunnel.engine.server.CoordinatorService; import org.apache.seatunnel.engine.server.SeaTunnelServer; +import org.apache.seatunnel.engine.server.master.JobMaster; import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook; import com.hazelcast.nio.ObjectDataInput; @@ -45,9 +47,12 @@ public class JobEventReportOperation extends Operation implements IdentifiedData @Override public void run() throws Exception { SeaTunnelServer server = getService(); - EventProcessor processor = server.getCoordinatorService().getEventProcessor(); + CoordinatorService coordinatorService = server.getCoordinatorService(); + EventProcessor processor = coordinatorService.getEventProcessor(); for (Event event : events) { processor.process(event); + JobMaster jobMaster = coordinatorService.getJobMaster(Long.parseLong(event.getJobId())); + jobMaster.getJobEventDisruptor().publish(event); } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java index c3d8fc3b6ee..3e1a19d5a9e 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java @@ -23,6 +23,7 @@ import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.seatunnel.api.common.metrics.JobMetrics; +import org.apache.seatunnel.api.event.Event; import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException; import org.apache.seatunnel.engine.core.job.ExecutionAddress; import org.apache.seatunnel.engine.core.job.JobDAGInfo; @@ -55,6 +56,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -97,6 +99,8 @@ public class JobHistoryService { private final IMap finishedJobMetricsImap; + @Getter private final IMap> finishedJobEventImap; + private final ObjectMapper objectMapper; private final int finishedJobExpireTime; @@ -110,6 +114,7 @@ public JobHistoryService( IMap finishedJobStateImap, IMap finishedJobMetricsImap, IMap finishedJobVertexInfoImap, + IMap> finishedJobEventImap, int finishedJobExpireTime) { this.nodeEngine = nodeEngine; this.runningJobStateIMap = runningJobStateIMap; @@ -120,6 +125,7 @@ public JobHistoryService( this.finishedJobMetricsImap = finishedJobMetricsImap; this.finishedJobDAGInfoImap = finishedJobVertexInfoImap; this.finishedJobDAGInfoImap.addEntryListener(new JobInfoExpiredListener(), true); + this.finishedJobEventImap = finishedJobEventImap; this.objectMapper = new ObjectMapper(); this.objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); this.finishedJobExpireTime = finishedJobExpireTime; @@ -215,6 +221,10 @@ public void storeFinishedJobState(JobMaster jobMaster) { finishedJobStateImap.put(jobState.jobId, jobState, finishedJobExpireTime, TimeUnit.MINUTES); } + public void storeFinishedJobEvent(Long jobId, ArrayBlockingQueue events) { + finishedJobEventImap.put(jobId, events, finishedJobExpireTime, TimeUnit.MINUTES); + } + public void storeFinishedPipelineMetrics(long jobId, JobMetrics metrics) { finishedJobMetricsImap.computeIfAbsent(jobId, key -> JobMetrics.of(new HashMap<>())); JobMetrics newMetrics = finishedJobMetricsImap.get(jobId).merge(metrics); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java index 709263b11f8..3df6da06339 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java @@ -63,6 +63,7 @@ import org.apache.seatunnel.engine.server.dag.physical.PlanUtils; import org.apache.seatunnel.engine.server.dag.physical.ResourceUtils; import org.apache.seatunnel.engine.server.dag.physical.SubPlan; +import org.apache.seatunnel.engine.server.disruptor.JobEventDisruptor; import org.apache.seatunnel.engine.server.execution.TaskExecutionState; import org.apache.seatunnel.engine.server.execution.TaskGroupLocation; import org.apache.seatunnel.engine.server.execution.TaskLocation; @@ -142,6 +143,7 @@ public class JobMaster { private SeaTunnelServer seaTunnelServer; + @Getter private JobEventDisruptor jobEventDisruptor; /** * we need store slot used by task in Hazelcast IMap and release or reuse it when a new master * node active. @@ -214,6 +216,7 @@ public JobMaster( this.metricsImap = metricsImap; this.seaTunnelServer = seaTunnelServer; this.releasedSlotWhenTaskGroupFinished = new ConcurrentHashMap<>(); + this.jobEventDisruptor = new JobEventDisruptor(engineConfig.getEventQueueSize()); } public synchronized void init(long initializationTimestamp, boolean restart) throws Exception { @@ -713,6 +716,8 @@ public void cleanJob() { checkpointManager.clearCheckpointIfNeed(physicalPlan.getJobStatus()); jobHistoryService.storeJobInfo(jobImmutableInformation.getJobId(), getJobDAGInfo()); jobHistoryService.storeFinishedJobState(this); + jobHistoryService.storeFinishedJobEvent( + jobImmutableInformation.getJobId(), jobEventDisruptor.storeJobHistory()); removeJobIMap(); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetEventOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetEventOperation.java new file mode 100644 index 00000000000..6b39fbba86f --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetEventOperation.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.operation; + +import org.apache.seatunnel.api.event.Event; +import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException; +import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture; +import org.apache.seatunnel.engine.server.CoordinatorService; +import org.apache.seatunnel.engine.server.SeaTunnelServer; +import org.apache.seatunnel.engine.server.disruptor.JobEvent; +import org.apache.seatunnel.engine.server.master.JobMaster; +import org.apache.seatunnel.engine.server.serializable.ResourceDataSerializerHook; + +import com.hazelcast.internal.serialization.Data; +import com.hazelcast.nio.ObjectDataInput; +import com.hazelcast.nio.ObjectDataOutput; +import com.hazelcast.nio.serialization.IdentifiedDataSerializable; +import com.hazelcast.spi.impl.operationservice.Operation; +import com.lmax.disruptor.RingBuffer; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; + +@Slf4j +public class GetEventOperation extends Operation implements IdentifiedDataSerializable { + + private Long jobId; + + private Boolean isAll; + + public GetEventOperation() {} + + private Data response; + + private AtomicInteger nextSequence; + + /** + * @param jobId job id + * @param isAll When isAll is true, retrieve all events; otherwise, retrieve the latest event + */ + public GetEventOperation(Long jobId, boolean isAll) { + this.jobId = jobId; + this.isAll = isAll; + this.nextSequence = new AtomicInteger(0); + } + + @Override + public void run() throws Exception { + if (jobId == null) { + throw new SeaTunnelEngineException("JobId cannot be null"); + } + SeaTunnelServer server = getService(); + CoordinatorService coordinatorService = server.getCoordinatorService(); + + try { + response = + CompletableFuture.supplyAsync( + () -> retrieveEvents(coordinatorService), + getNodeEngine() + .getExecutionService() + .getExecutor("get_event_operation")) + .get(); + } catch (InterruptedException | ExecutionException e) { + log.error("Failed to retrieve events for job " + jobId, e); + throw new SeaTunnelEngineException("Failed to retrieve events: " + e.getMessage(), e); + } + } + + private Data retrieveEvents(CoordinatorService coordinatorService) { + List events = new ArrayList<>(); + JobMaster jobMaster = coordinatorService.getJobMaster(jobId); + + if (jobMaster != null) { + log.debug("Retrieving events for active job {}, isAll: {}", jobId, isAll); + RingBuffer ringBuffer = jobMaster.getJobEventDisruptor().getRingBuffer(); + AtomicInteger sequenceToUse = isAll ? new AtomicInteger(0) : nextSequence; + collectEvents(ringBuffer, sequenceToUse, events); + } else { + log.debug("Job {} not active, retrieving from history", jobId); + Optional.ofNullable( + coordinatorService + .getJobHistoryService() + .getFinishedJobEventImap() + .get(jobId)) + .ifPresent(events::addAll); + } + + return this.getNodeEngine().toData(events); + } + + private void collectEvents( + RingBuffer ringBuffer, AtomicInteger sequence, List events) { + + while (ringBuffer.getCursor() >= sequence.get()) { + JobEvent jobEvent = ringBuffer.get(sequence.get()); + events.add(jobEvent.getEvent()); + sequence.addAndGet(1); + } + } + + @Override + public Object getResponse() { + return response; + } + + @Override + public int getFactoryId() { + return ResourceDataSerializerHook.FACTORY_ID; + } + + @Override + public int getClassId() { + return ResourceDataSerializerHook.GET_EVENT_TYPE; + } + + @Override + public String getServiceName() { + return SeaTunnelServer.SERVICE_NAME; + } + + @Override + protected void writeInternal(ObjectDataOutput out) throws IOException { + super.writeInternal(out); + out.writeLong(jobId); + out.writeBoolean(isAll); + } + + @Override + protected void readInternal(ObjectDataInput in) throws IOException { + super.readInternal(in); + jobId = in.readLong(); + isAll = in.readBoolean(); + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetEventTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetEventTask.java new file mode 100644 index 00000000000..1eb32f1bda5 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetEventTask.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.protocol.task; + +import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetEventCodec; +import org.apache.seatunnel.engine.server.operation.GetEventOperation; + +import com.hazelcast.client.impl.protocol.ClientMessage; +import com.hazelcast.instance.impl.Node; +import com.hazelcast.internal.nio.Connection; +import com.hazelcast.internal.serialization.Data; +import com.hazelcast.spi.impl.operationservice.Operation; + +public class GetEventTask + extends AbstractSeaTunnelMessageTask { + + protected GetEventTask(ClientMessage clientMessage, Node node, Connection connection) { + super( + clientMessage, + node, + connection, + SeaTunnelGetEventCodec::decodeRequest, + SeaTunnelGetEventCodec::encodeResponse); + } + + @Override + protected Operation prepareOperation() { + return new GetEventOperation(parameters.jobId, parameters.isAll); + } + + @Override + public String getMethodName() { + return "getEvent"; + } + + @Override + public Object[] getParameters() { + return new Object[0]; + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java index e3d50aedb2f..d46ad1a37d5 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelCancelJobCodec; import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetClusterHealthMetricsCodec; +import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetEventCodec; import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobCheckpointCodec; import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobDetailStatusCodec; import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobInfoCodec; @@ -107,5 +108,8 @@ private void initFactories() { SeaTunnelGetJobCheckpointCodec.REQUEST_MESSAGE_TYPE, (clientMessage, connection) -> new GetJobCheckpointTask(clientMessage, node, connection)); + factories.put( + SeaTunnelGetEventCodec.REQUEST_MESSAGE_TYPE, + (clientMessage, connection) -> new GetEventTask(clientMessage, node, connection)); } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java index 810e08453eb..7d50660c87e 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java @@ -67,6 +67,7 @@ public class RestConstant { public static final String REST_URL_RUNNING_JOBS = "/running-jobs"; @Deprecated public static final String REST_URL_RUNNING_JOB = "/running-job"; public static final String REST_URL_JOB_INFO = "/job-info"; + public static final String REST_URL_EVENT = "/event"; public static final String REST_URL_FINISHED_JOBS = "/finished-jobs"; public static final String REST_URL_ENCRYPT_CONFIG = "/encrypt-config"; public static final String REST_URL_THREAD_DUMP = "/thread-dump"; diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/EventService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/EventService.java new file mode 100644 index 00000000000..9f077ee3c6f --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/EventService.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.rest.service; + +import org.apache.seatunnel.api.event.Event; +import org.apache.seatunnel.engine.common.Constant; + +import com.hazelcast.internal.json.JsonArray; +import com.hazelcast.internal.json.JsonObject; +import com.hazelcast.spi.impl.NodeEngineImpl; + +import java.util.concurrent.ArrayBlockingQueue; + +public class EventService extends BaseService { + + public EventService(NodeEngineImpl nodeEngine) { + super(nodeEngine); + } + + public JsonArray getEventInfoJson(Long jobId) { + try { + ArrayBlockingQueue events = + nodeEngine + .getHazelcastInstance() + .>getMap( + Constant.IMAP_FINISHED_JOB_EVENT) + .get(jobId); + + if (events == null || events.isEmpty()) { + return new JsonArray(); + } + + return events.stream() + .map(this::buildEventJson) + .collect(JsonArray::new, JsonArray::add, JsonArray::add); + } catch (ClassCastException e) { + + return new JsonArray(); + } + } + + private JsonObject buildEventJson(Event event) { + JsonObject eventJson = new JsonObject(); + eventJson.add("createdTime", event.getCreatedTime()); + eventJson.add("eventType", event.getEventType().toString()); + return eventJson; + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/EventServlet.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/EventServlet.java new file mode 100644 index 00000000000..0ecc07c0940 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/EventServlet.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.rest.servlet; + +import org.apache.seatunnel.engine.server.rest.service.EventService; + +import com.hazelcast.spi.impl.NodeEngineImpl; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import java.io.IOException; +import java.util.Optional; + +public class EventServlet extends BaseServlet { + + private static final long serialVersionUID = 1L; + + private final EventService eventService; + + public EventServlet(NodeEngineImpl nodeEngine) { + super(nodeEngine); + this.eventService = new EventService(nodeEngine); + } + + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) + throws ServletException, IOException { + + String[] pathSegments = + Optional.ofNullable(req.getPathInfo()) + .orElseThrow(() -> new IllegalArgumentException("Path cannot be null")) + .split("/"); + + Long jobId = + Long.parseLong( + Optional.of(pathSegments) + .filter(p -> p.length > 1) + .map(p -> p[1]) + .orElseThrow( + () -> + new IllegalArgumentException( + "Job ID must be provided in the path"))); + + writeJson(resp, eventService.getEventInfoJson(jobId)); + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java index 09f9a4550c0..0b9c5bb3aa9 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java @@ -53,6 +53,8 @@ public class ResourceDataSerializerHook implements DataSerializerHook { public static final int REQUEST_SLOT_INFO_TYPE = 9; + public static final int GET_EVENT_TYPE = 10; + public static final int FACTORY_ID = FactoryIdHelper.getFactoryId( SeaTunnelFactoryIdConstant.SEATUNNEL_RESOURCE_DATA_SERIALIZER_FACTORY,