diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BinaryRow.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BinaryRow.java new file mode 100644 index 00000000000..c1b779c9cd0 --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BinaryRow.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.api.table.type; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class BinaryRow implements Serializable { + + private String relativePath; + + private byte[] data; + + private long partIndex; +} diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BinaryRowType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BinaryRowType.java new file mode 100644 index 00000000000..a2602b333bf --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BinaryRowType.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.api.table.type; + +public class BinaryRowType implements SeaTunnelDataType { + + public static final BinaryRowType INSTANCE = new BinaryRowType(); + + @Override + public Class getTypeClass() { + return BinaryRow.class; + } + + @Override + public SqlType getSqlType() { + return SqlType.BINARY; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + return obj instanceof PrimitiveByteArrayType; + } +} diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java index 1e507cb1fa4..2532326eee8 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java @@ -98,6 +98,19 @@ public boolean isNullAt(int pos) { return this.fields[pos] == null; } + public int getCount(SeaTunnelRowType rowType) { + int count = 1; + if (rowType.getFieldType(0) != null) { + switch (rowType.getFieldType(0).getSqlType()) { + case BINARY: + return ((BinaryRow) fields[0]).getPartIndex() == 0 ? 1 : 0; + default: + return count; + } + } + return count; + } + public int getBytesSize(SeaTunnelRowType rowType) { if (size == 0) { int s = 0; @@ -181,6 +194,9 @@ private int getBytesForValue(Object v, SeaTunnelDataType dataType) { rowSize += getBytesForValue(row.fields[i], types[i]); } return rowSize; + case BINARY: + BinaryRow binaryObject = (BinaryRow) v; + return binaryObject.getData().length; default: throw new UnsupportedOperationException("Unsupported type: " + sqlType); } @@ -224,6 +240,20 @@ private int getArrayNotNullSize(Object[] values) { return c; } + public int getCount() { + int count = 1; + if (fields[0] != null) { + String clazz = (fields[0]).getClass().getSimpleName(); + switch (clazz) { + case "BinaryRow": + return ((BinaryRow) fields[0]).getPartIndex() == 0 ? 1 : 0; + default: + return count; + } + } + return count; + } + public int getBytesSize() { if (size == 0) { int s = 0; diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java index 838a384809e..82c7eb96b17 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java @@ -36,5 +36,6 @@ public enum SqlType { TIME, TIMESTAMP, ROW, + BINARY, MULTIPLE_ROW; } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/BinaryWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/BinaryWriteStrategy.java index 7f496b2927d..c70db95813a 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/BinaryWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/BinaryWriteStrategy.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.connectors.seatunnel.file.sink.writer; +import org.apache.seatunnel.api.table.type.BinaryRow; +import org.apache.seatunnel.api.table.type.BinaryRowType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.exception.CommonError; @@ -24,7 +26,6 @@ import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode; import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig; -import org.apache.seatunnel.connectors.seatunnel.file.source.reader.BinaryReadStrategy; import org.apache.hadoop.fs.FSDataOutputStream; @@ -48,7 +49,7 @@ public BinaryWriteStrategy(FileSinkConfig fileSinkConfig) { @Override public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) { super.setSeaTunnelRowTypeInfo(seaTunnelRowType); - if (!seaTunnelRowType.equals(BinaryReadStrategy.binaryRowType)) { + if (!(seaTunnelRowType.getFieldType(0) instanceof BinaryRowType)) { throw new FileConnectorException( FileConnectorErrorCode.FORMAT_NOT_SUPPORT, "BinaryWriteStrategy only supports binary format, please read file with `BINARY` format, and do not change schema in the transform."); @@ -57,9 +58,10 @@ public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) { @Override public void write(SeaTunnelRow seaTunnelRow) throws FileConnectorException { - byte[] data = (byte[]) seaTunnelRow.getField(0); - String relativePath = (String) seaTunnelRow.getField(1); - long partIndex = (long) seaTunnelRow.getField(2); + BinaryRow binaryRow = (BinaryRow) seaTunnelRow.getField(0); + byte[] data = binaryRow.getData(); + String relativePath = binaryRow.getRelativePath(); + long partIndex = binaryRow.getPartIndex(); String filePath = getOrCreateFilePathBeingWritten(relativePath); FSDataOutputStream fsDataOutputStream = getOrCreateOutputStream(filePath); if (partIndex - 1 != partIndexMap.get(filePath)) { diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java index 3bbb90c774b..add8b7465ee 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java @@ -18,8 +18,8 @@ package org.apache.seatunnel.connectors.seatunnel.file.source.reader; import org.apache.seatunnel.api.source.Collector; -import org.apache.seatunnel.api.table.type.BasicType; -import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; +import org.apache.seatunnel.api.table.type.BinaryRow; +import org.apache.seatunnel.api.table.type.BinaryRowType; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; @@ -37,10 +37,7 @@ public class BinaryReadStrategy extends AbstractReadStrategy { public static SeaTunnelRowType binaryRowType = new SeaTunnelRowType( - new String[] {"data", "relativePath", "partIndex"}, - new SeaTunnelDataType[] { - PrimitiveByteArrayType.INSTANCE, BasicType.STRING_TYPE, BasicType.LONG_TYPE - }); + new String[] {"binary"}, new SeaTunnelDataType[] {BinaryRowType.INSTANCE}); private File basePath; @@ -75,7 +72,9 @@ public void read(String path, String tableId, Collector output) if (readSize != maxSize) { buffer = Arrays.copyOf(buffer, readSize); } - SeaTunnelRow row = new SeaTunnelRow(new Object[] {buffer, relativePath, partIndex}); + SeaTunnelRow row = + new SeaTunnelRow( + new Object[] {new BinaryRow(relativePath, buffer, partIndex)}); buffer = new byte[1024]; output.collect(row); partIndex++; diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java index 4527d98d660..9453f4bf2b1 100644 --- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java +++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java @@ -223,8 +223,12 @@ public void execute() throws CommandExecuteException { Duration.between(startTime, endTime).getSeconds(), "Total Read Count", jobMetricsSummary.getSourceReadCount(), + "Total Read Bytes", + jobMetricsSummary.getSourceReadBytes(), "Total Write Count", jobMetricsSummary.getSinkWriteCount(), + "Total Write Bytes", + jobMetricsSummary.getSinkWriteBytes(), "Total Failed Count", jobMetricsSummary.getSourceReadCount() - jobMetricsSummary.getSinkWriteCount())); diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java index c4a4f68a698..92b405354f2 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java +++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java @@ -151,22 +151,32 @@ public JobDAGInfo getJobInfo(Long jobId) { public JobMetricsRunner.JobMetricsSummary getJobMetricsSummary(Long jobId) { long sourceReadCount = 0L; + long sourceReadBytes = 0L; long sinkWriteCount = 0L; + long sinkWriteBytes = 0L; String jobMetrics = getJobMetrics(jobId); try { JsonNode jsonNode = OBJECT_MAPPER.readTree(jobMetrics); JsonNode sourceReaders = jsonNode.get("SourceReceivedCount"); + JsonNode sourceReaderBytes = jsonNode.get("SourceReceivedBytes"); JsonNode sinkWriters = jsonNode.get("SinkWriteCount"); + JsonNode sinkWriterBytes = jsonNode.get("SinkWriteBytes"); for (int i = 0; i < sourceReaders.size(); i++) { JsonNode sourceReader = sourceReaders.get(i); + JsonNode sourceBytes = sourceReaderBytes.get(i); JsonNode sinkWriter = sinkWriters.get(i); + JsonNode sinkBytes = sinkWriterBytes.get(i); sourceReadCount += sourceReader.get("value").asLong(); + sourceReadBytes += sourceBytes.get("value").asLong(); sinkWriteCount += sinkWriter.get("value").asLong(); + sinkWriteBytes += sinkBytes.get("value").asLong(); } - return new JobMetricsRunner.JobMetricsSummary(sourceReadCount, sinkWriteCount); + return new JobMetricsRunner.JobMetricsSummary( + sourceReadCount, sinkWriteCount, sourceReadBytes, sinkWriteBytes); // Add NullPointerException because of metrics information can be empty like {} } catch (JsonProcessingException | NullPointerException e) { - return new JobMetricsRunner.JobMetricsSummary(sourceReadCount, sinkWriteCount); + return new JobMetricsRunner.JobMetricsSummary( + sourceReadCount, sinkWriteCount, sourceReadBytes, sinkWriteBytes); } } } diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java index f413a5bfc52..c66ba59cf8b 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java +++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java @@ -82,5 +82,7 @@ public void run() { public static class JobMetricsSummary { private long sourceReadCount; private long sinkWriteCount; + private long sourceReadBytes; + private long sinkWriteBytes; } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java index f5d4aed1ab4..700e7ff4b5a 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java @@ -99,9 +99,11 @@ public SeaTunnelSourceCollector( @Override public void collect(T row) { try { + int count = 1; if (row instanceof SeaTunnelRow) { int size; if (rowType instanceof SeaTunnelRowType) { + count = ((SeaTunnelRow) row).getCount((SeaTunnelRowType) rowType); size = ((SeaTunnelRow) row).getBytesSize((SeaTunnelRowType) rowType); } else if (rowType instanceof MultipleRowType) { size = @@ -118,7 +120,7 @@ public void collect(T row) { } sendRecordToNext(new Record<>(row)); emptyThisPollNext = false; - sourceReceivedCount.inc(); + sourceReceivedCount.inc(count); sourceReceivedQPS.markEvent(); } catch (IOException e) { throw new RuntimeException(e); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java index 48c530a0c36..e7609a60d5f 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java @@ -250,13 +250,15 @@ public void received(Record record) { return; } writer.write((T) record.getData()); - sinkWriteCount.inc(); - sinkWriteQPS.markEvent(); + int count = 1; if (record.getData() instanceof SeaTunnelRow) { long size = ((SeaTunnelRow) record.getData()).getBytesSize(); + count = ((SeaTunnelRow) record.getData()).getCount(); sinkWriteBytes.inc(size); sinkWriteBytesPerSeconds.markEvent(size); } + sinkWriteCount.inc(count); + sinkWriteQPS.markEvent(); } } catch (Exception e) { throw new RuntimeException(e);