From b6cf59a70032ebb2dfcfc47b9a880e01e9343928 Mon Sep 17 00:00:00 2001 From: jxm Date: Wed, 5 Jun 2024 15:29:38 +0800 Subject: [PATCH 1/3] Optimize the number of statistical files during binary transmission as the count file size as the byte --- .../seatunnel/api/table/type/BinaryRow.java | 36 ++++++++++++++++ .../api/table/type/BinaryRowType.java | 41 +++++++++++++++++++ .../api/table/type/SeaTunnelRow.java | 30 ++++++++++++++ .../seatunnel/api/table/type/SqlType.java | 1 + .../file/sink/writer/BinaryWriteStrategy.java | 12 +++--- .../source/reader/BinaryReadStrategy.java | 13 +++--- .../command/ClientExecuteCommand.java | 4 ++ .../engine/client/job/JobClient.java | 14 ++++++- .../engine/client/job/JobMetricsRunner.java | 2 + .../server/task/SeaTunnelSourceCollector.java | 4 +- .../server/task/flow/SinkFlowLifeCycle.java | 6 ++- 11 files changed, 146 insertions(+), 17 deletions(-) create mode 100644 seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BinaryRow.java create mode 100644 seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BinaryRowType.java diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BinaryRow.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BinaryRow.java new file mode 100644 index 00000000000..c1b779c9cd0 --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BinaryRow.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.api.table.type; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class BinaryRow implements Serializable { + + private String relativePath; + + private byte[] data; + + private long partIndex; +} diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BinaryRowType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BinaryRowType.java new file mode 100644 index 00000000000..a2602b333bf --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BinaryRowType.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.api.table.type; + +public class BinaryRowType implements SeaTunnelDataType { + + public static final BinaryRowType INSTANCE = new BinaryRowType(); + + @Override + public Class getTypeClass() { + return BinaryRow.class; + } + + @Override + public SqlType getSqlType() { + return SqlType.BINARY; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + return obj instanceof PrimitiveByteArrayType; + } +} diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java index 1e507cb1fa4..2532326eee8 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java @@ -98,6 +98,19 @@ public boolean isNullAt(int pos) { return this.fields[pos] == null; } + public int getCount(SeaTunnelRowType rowType) { + int count = 1; + if (rowType.getFieldType(0) != null) { + switch (rowType.getFieldType(0).getSqlType()) { + case BINARY: + return ((BinaryRow) fields[0]).getPartIndex() == 0 ? 1 : 0; + default: + return count; + } + } + return count; + } + public int getBytesSize(SeaTunnelRowType rowType) { if (size == 0) { int s = 0; @@ -181,6 +194,9 @@ private int getBytesForValue(Object v, SeaTunnelDataType dataType) { rowSize += getBytesForValue(row.fields[i], types[i]); } return rowSize; + case BINARY: + BinaryRow binaryObject = (BinaryRow) v; + return binaryObject.getData().length; default: throw new UnsupportedOperationException("Unsupported type: " + sqlType); } @@ -224,6 +240,20 @@ private int getArrayNotNullSize(Object[] values) { return c; } + public int getCount() { + int count = 1; + if (fields[0] != null) { + String clazz = (fields[0]).getClass().getSimpleName(); + switch (clazz) { + case "BinaryRow": + return ((BinaryRow) fields[0]).getPartIndex() == 0 ? 1 : 0; + default: + return count; + } + } + return count; + } + public int getBytesSize() { if (size == 0) { int s = 0; diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java index 838a384809e..82c7eb96b17 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java @@ -36,5 +36,6 @@ public enum SqlType { TIME, TIMESTAMP, ROW, + BINARY, MULTIPLE_ROW; } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/BinaryWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/BinaryWriteStrategy.java index 7f496b2927d..c70db95813a 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/BinaryWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/BinaryWriteStrategy.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.connectors.seatunnel.file.sink.writer; +import org.apache.seatunnel.api.table.type.BinaryRow; +import org.apache.seatunnel.api.table.type.BinaryRowType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.exception.CommonError; @@ -24,7 +26,6 @@ import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode; import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig; -import org.apache.seatunnel.connectors.seatunnel.file.source.reader.BinaryReadStrategy; import org.apache.hadoop.fs.FSDataOutputStream; @@ -48,7 +49,7 @@ public BinaryWriteStrategy(FileSinkConfig fileSinkConfig) { @Override public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) { super.setSeaTunnelRowTypeInfo(seaTunnelRowType); - if (!seaTunnelRowType.equals(BinaryReadStrategy.binaryRowType)) { + if (!(seaTunnelRowType.getFieldType(0) instanceof BinaryRowType)) { throw new FileConnectorException( FileConnectorErrorCode.FORMAT_NOT_SUPPORT, "BinaryWriteStrategy only supports binary format, please read file with `BINARY` format, and do not change schema in the transform."); @@ -57,9 +58,10 @@ public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) { @Override public void write(SeaTunnelRow seaTunnelRow) throws FileConnectorException { - byte[] data = (byte[]) seaTunnelRow.getField(0); - String relativePath = (String) seaTunnelRow.getField(1); - long partIndex = (long) seaTunnelRow.getField(2); + BinaryRow binaryRow = (BinaryRow) seaTunnelRow.getField(0); + byte[] data = binaryRow.getData(); + String relativePath = binaryRow.getRelativePath(); + long partIndex = binaryRow.getPartIndex(); String filePath = getOrCreateFilePathBeingWritten(relativePath); FSDataOutputStream fsDataOutputStream = getOrCreateOutputStream(filePath); if (partIndex - 1 != partIndexMap.get(filePath)) { diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java index 3bbb90c774b..add8b7465ee 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java @@ -18,8 +18,8 @@ package org.apache.seatunnel.connectors.seatunnel.file.source.reader; import org.apache.seatunnel.api.source.Collector; -import org.apache.seatunnel.api.table.type.BasicType; -import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; +import org.apache.seatunnel.api.table.type.BinaryRow; +import org.apache.seatunnel.api.table.type.BinaryRowType; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; @@ -37,10 +37,7 @@ public class BinaryReadStrategy extends AbstractReadStrategy { public static SeaTunnelRowType binaryRowType = new SeaTunnelRowType( - new String[] {"data", "relativePath", "partIndex"}, - new SeaTunnelDataType[] { - PrimitiveByteArrayType.INSTANCE, BasicType.STRING_TYPE, BasicType.LONG_TYPE - }); + new String[] {"binary"}, new SeaTunnelDataType[] {BinaryRowType.INSTANCE}); private File basePath; @@ -75,7 +72,9 @@ public void read(String path, String tableId, Collector output) if (readSize != maxSize) { buffer = Arrays.copyOf(buffer, readSize); } - SeaTunnelRow row = new SeaTunnelRow(new Object[] {buffer, relativePath, partIndex}); + SeaTunnelRow row = + new SeaTunnelRow( + new Object[] {new BinaryRow(relativePath, buffer, partIndex)}); buffer = new byte[1024]; output.collect(row); partIndex++; diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java index d1e8b780099..f9a999c716f 100644 --- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java +++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java @@ -220,8 +220,12 @@ public void execute() throws CommandExecuteException { Duration.between(startTime, endTime).getSeconds(), "Total Read Count", jobMetricsSummary.getSourceReadCount(), + "Total Read Bytes", + jobMetricsSummary.getSourceReadBytes(), "Total Write Count", jobMetricsSummary.getSinkWriteCount(), + "Total Write Bytes", + jobMetricsSummary.getSinkWriteBytes(), "Total Failed Count", jobMetricsSummary.getSourceReadCount() - jobMetricsSummary.getSinkWriteCount())); diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java index c4a4f68a698..92b405354f2 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java +++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java @@ -151,22 +151,32 @@ public JobDAGInfo getJobInfo(Long jobId) { public JobMetricsRunner.JobMetricsSummary getJobMetricsSummary(Long jobId) { long sourceReadCount = 0L; + long sourceReadBytes = 0L; long sinkWriteCount = 0L; + long sinkWriteBytes = 0L; String jobMetrics = getJobMetrics(jobId); try { JsonNode jsonNode = OBJECT_MAPPER.readTree(jobMetrics); JsonNode sourceReaders = jsonNode.get("SourceReceivedCount"); + JsonNode sourceReaderBytes = jsonNode.get("SourceReceivedBytes"); JsonNode sinkWriters = jsonNode.get("SinkWriteCount"); + JsonNode sinkWriterBytes = jsonNode.get("SinkWriteBytes"); for (int i = 0; i < sourceReaders.size(); i++) { JsonNode sourceReader = sourceReaders.get(i); + JsonNode sourceBytes = sourceReaderBytes.get(i); JsonNode sinkWriter = sinkWriters.get(i); + JsonNode sinkBytes = sinkWriterBytes.get(i); sourceReadCount += sourceReader.get("value").asLong(); + sourceReadBytes += sourceBytes.get("value").asLong(); sinkWriteCount += sinkWriter.get("value").asLong(); + sinkWriteBytes += sinkBytes.get("value").asLong(); } - return new JobMetricsRunner.JobMetricsSummary(sourceReadCount, sinkWriteCount); + return new JobMetricsRunner.JobMetricsSummary( + sourceReadCount, sinkWriteCount, sourceReadBytes, sinkWriteBytes); // Add NullPointerException because of metrics information can be empty like {} } catch (JsonProcessingException | NullPointerException e) { - return new JobMetricsRunner.JobMetricsSummary(sourceReadCount, sinkWriteCount); + return new JobMetricsRunner.JobMetricsSummary( + sourceReadCount, sinkWriteCount, sourceReadBytes, sinkWriteBytes); } } } diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java index f413a5bfc52..c66ba59cf8b 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java +++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java @@ -82,5 +82,7 @@ public void run() { public static class JobMetricsSummary { private long sourceReadCount; private long sinkWriteCount; + private long sourceReadBytes; + private long sinkWriteBytes; } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java index f5d4aed1ab4..700e7ff4b5a 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java @@ -99,9 +99,11 @@ public SeaTunnelSourceCollector( @Override public void collect(T row) { try { + int count = 1; if (row instanceof SeaTunnelRow) { int size; if (rowType instanceof SeaTunnelRowType) { + count = ((SeaTunnelRow) row).getCount((SeaTunnelRowType) rowType); size = ((SeaTunnelRow) row).getBytesSize((SeaTunnelRowType) rowType); } else if (rowType instanceof MultipleRowType) { size = @@ -118,7 +120,7 @@ public void collect(T row) { } sendRecordToNext(new Record<>(row)); emptyThisPollNext = false; - sourceReceivedCount.inc(); + sourceReceivedCount.inc(count); sourceReceivedQPS.markEvent(); } catch (IOException e) { throw new RuntimeException(e); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java index 48c530a0c36..e7609a60d5f 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java @@ -250,13 +250,15 @@ public void received(Record record) { return; } writer.write((T) record.getData()); - sinkWriteCount.inc(); - sinkWriteQPS.markEvent(); + int count = 1; if (record.getData() instanceof SeaTunnelRow) { long size = ((SeaTunnelRow) record.getData()).getBytesSize(); + count = ((SeaTunnelRow) record.getData()).getCount(); sinkWriteBytes.inc(size); sinkWriteBytesPerSeconds.markEvent(size); } + sinkWriteCount.inc(count); + sinkWriteQPS.markEvent(); } } catch (Exception e) { throw new RuntimeException(e); From 543924a88cbdfadcc1c44f2a8de79b6b7130fcc2 Mon Sep 17 00:00:00 2001 From: gitjxm <41097032+gitjxm@users.noreply.github.com> Date: Wed, 5 Jun 2024 16:15:09 +0800 Subject: [PATCH 2/3] Create main.yml --- .github/workflows/main.yml | 1 + 1 file changed, 1 insertion(+) create mode 100644 .github/workflows/main.yml diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml new file mode 100644 index 00000000000..8b137891791 --- /dev/null +++ b/.github/workflows/main.yml @@ -0,0 +1 @@ + From 60377f4af8766b06405f8b0c1f6872b7f3b8b794 Mon Sep 17 00:00:00 2001 From: gitjxm <41097032+gitjxm@users.noreply.github.com> Date: Wed, 5 Jun 2024 16:42:39 +0800 Subject: [PATCH 3/3] Delete .github/workflows/main.yml --- .github/workflows/main.yml | 1 - 1 file changed, 1 deletion(-) delete mode 100644 .github/workflows/main.yml diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml deleted file mode 100644 index 8b137891791..00000000000 --- a/.github/workflows/main.yml +++ /dev/null @@ -1 +0,0 @@ -