Skip to content

[Improve] Binary transmission statistics #6946

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.table.type;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

/**
 * One chunk of a file transported in binary form.
 *
 * <p>A value carries the chunk's bytes, the path of the file the chunk belongs to, and the
 * position of the chunk within that file. Getters, setters, {@code equals}, {@code hashCode},
 * {@code toString} and both constructors are generated by Lombok.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class BinaryRow implements Serializable {

    // Pin the serialized form so that unrelated recompilations do not change the
    // compiler-derived UID and break deserialization of rows already in flight.
    private static final long serialVersionUID = 1L;

    /** Path of the file this chunk belongs to, relative to a base directory. */
    private String relativePath;

    /** Raw bytes of this chunk. */
    private byte[] data;

    /** Position of this chunk within its file; the first chunk has index 0. */
    private long partIndex;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.table.type;

/**
 * {@link SeaTunnelDataType} for fields whose value is a {@link BinaryRow}.
 *
 * <p>The type carries no state, so every instance is interchangeable; {@link #INSTANCE} is
 * provided as a shared instance.
 */
public class BinaryRowType implements SeaTunnelDataType<BinaryRow> {

    public static final BinaryRowType INSTANCE = new BinaryRowType();

    /** @return the Java class of values described by this type, {@code BinaryRow.class} */
    @Override
    public Class<BinaryRow> getTypeClass() {
        return BinaryRow.class;
    }

    /** @return {@link SqlType#BINARY} */
    @Override
    public SqlType getSqlType() {
        return SqlType.BINARY;
    }

    /**
     * Two binary row types are always equal because the type has no parameters.
     *
     * @param obj the object to compare with
     * @return {@code true} if {@code obj} is a {@code BinaryRowType}
     */
    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        // Compare against this type itself; comparing against another data type would make
        // distinct BinaryRowType instances unequal and break the symmetry of equals.
        return obj instanceof BinaryRowType;
    }

    /** @return a constant hash, consistent with {@link #equals(Object)} */
    @Override
    public int hashCode() {
        return BinaryRowType.class.hashCode();
    }
}
Comment on lines +18 to +41
Copy link
Member

@Hisoka-X Hisoka-X Jun 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we add a new type, we have to be careful, because it affects how every Sink/Transform has to handle it. However, the only purpose of adding this new type in this PR is to make the metrics more accurate, so I think we need to discuss it in depth. cc @hailin0 @EricJoy2048

Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,19 @@ public boolean isNullAt(int pos) {
return this.fields[pos] == null;
}

/**
 * Returns how many records this row contributes to record-count metrics.
 *
 * <p>A row counts as one record, except when its first field is declared as {@link
 * SqlType#BINARY} and holds a {@link BinaryRow}: such a row is one chunk of a file, and only
 * the chunk with part index 0 is counted so that a file split into many chunks is counted
 * once.
 *
 * @param rowType the row type describing this row's fields
 * @return 0 for a non-first binary chunk, otherwise 1
 */
public int getCount(SeaTunnelRowType rowType) {
    // A row without fields cannot carry a binary chunk; count it as a single record.
    if (rowType == null || fields == null || fields.length == 0) {
        return 1;
    }
    boolean declaredBinary =
            rowType.getFieldType(0) != null
                    && rowType.getFieldType(0).getSqlType() == SqlType.BINARY;
    // instanceof also rejects a null first field, which would otherwise fail on the cast.
    if (declaredBinary && fields[0] instanceof BinaryRow) {
        return ((BinaryRow) fields[0]).getPartIndex() == 0 ? 1 : 0;
    }
    return 1;
}

public int getBytesSize(SeaTunnelRowType rowType) {
if (size == 0) {
int s = 0;
Expand Down Expand Up @@ -181,6 +194,9 @@ private int getBytesForValue(Object v, SeaTunnelDataType<?> dataType) {
rowSize += getBytesForValue(row.fields[i], types[i]);
}
return rowSize;
case BINARY:
BinaryRow binaryObject = (BinaryRow) v;
return binaryObject.getData().length;
default:
throw new UnsupportedOperationException("Unsupported type: " + sqlType);
}
Expand Down Expand Up @@ -224,6 +240,20 @@ private int getArrayNotNullSize(Object[] values) {
return c;
}

/**
 * Returns how many records this row contributes to record-count metrics when no row type is
 * available.
 *
 * <p>A row counts as one record, except when its first field holds a {@link BinaryRow}: such
 * a row is one chunk of a file, and only the chunk with part index 0 is counted.
 *
 * @return 0 for a non-first binary chunk, otherwise 1
 */
public int getCount() {
    // Use a real type check rather than comparing simple class names: it is null-safe and
    // cannot be fooled by an unrelated class that happens to be called "BinaryRow".
    if (fields != null && fields.length > 0 && fields[0] instanceof BinaryRow) {
        return ((BinaryRow) fields[0]).getPartIndex() == 0 ? 1 : 0;
    }
    return 1;
}

public int getBytesSize() {
if (size == 0) {
int s = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,6 @@ public enum SqlType {
TIME,
TIMESTAMP,
ROW,
BINARY,
MULTIPLE_ROW;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@

package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;

import org.apache.seatunnel.api.table.type.BinaryRow;
import org.apache.seatunnel.api.table.type.BinaryRowType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.BinaryReadStrategy;

import org.apache.hadoop.fs.FSDataOutputStream;

Expand All @@ -48,7 +49,7 @@ public BinaryWriteStrategy(FileSinkConfig fileSinkConfig) {
@Override
public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
super.setSeaTunnelRowTypeInfo(seaTunnelRowType);
if (!seaTunnelRowType.equals(BinaryReadStrategy.binaryRowType)) {
if (!(seaTunnelRowType.getFieldType(0) instanceof BinaryRowType)) {
throw new FileConnectorException(
FileConnectorErrorCode.FORMAT_NOT_SUPPORT,
"BinaryWriteStrategy only supports binary format, please read file with `BINARY` format, and do not change schema in the transform.");
Expand All @@ -57,9 +58,10 @@ public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {

@Override
public void write(SeaTunnelRow seaTunnelRow) throws FileConnectorException {
byte[] data = (byte[]) seaTunnelRow.getField(0);
String relativePath = (String) seaTunnelRow.getField(1);
long partIndex = (long) seaTunnelRow.getField(2);
BinaryRow binaryRow = (BinaryRow) seaTunnelRow.getField(0);
byte[] data = binaryRow.getData();
String relativePath = binaryRow.getRelativePath();
long partIndex = binaryRow.getPartIndex();
String filePath = getOrCreateFilePathBeingWritten(relativePath);
FSDataOutputStream fsDataOutputStream = getOrCreateOutputStream(filePath);
if (partIndex - 1 != partIndexMap.get(filePath)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.seatunnel.connectors.seatunnel.file.source.reader;

import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.BinaryRow;
import org.apache.seatunnel.api.table.type.BinaryRowType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
Expand All @@ -37,10 +37,7 @@ public class BinaryReadStrategy extends AbstractReadStrategy {

public static SeaTunnelRowType binaryRowType =
new SeaTunnelRowType(
new String[] {"data", "relativePath", "partIndex"},
new SeaTunnelDataType[] {
PrimitiveByteArrayType.INSTANCE, BasicType.STRING_TYPE, BasicType.LONG_TYPE
});
new String[] {"binary"}, new SeaTunnelDataType[] {BinaryRowType.INSTANCE});

private File basePath;

Expand Down Expand Up @@ -75,7 +72,9 @@ public void read(String path, String tableId, Collector<SeaTunnelRow> output)
if (readSize != maxSize) {
buffer = Arrays.copyOf(buffer, readSize);
}
SeaTunnelRow row = new SeaTunnelRow(new Object[] {buffer, relativePath, partIndex});
SeaTunnelRow row =
new SeaTunnelRow(
new Object[] {new BinaryRow(relativePath, buffer, partIndex)});
buffer = new byte[1024];
output.collect(row);
partIndex++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,12 @@ public void execute() throws CommandExecuteException {
Duration.between(startTime, endTime).getSeconds(),
"Total Read Count",
jobMetricsSummary.getSourceReadCount(),
"Total Read Bytes",
jobMetricsSummary.getSourceReadBytes(),
"Total Write Count",
jobMetricsSummary.getSinkWriteCount(),
"Total Write Bytes",
jobMetricsSummary.getSinkWriteBytes(),
"Total Failed Count",
jobMetricsSummary.getSourceReadCount()
- jobMetricsSummary.getSinkWriteCount()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,22 +151,32 @@ public JobDAGInfo getJobInfo(Long jobId) {

/**
 * Builds a summary of read/write record counts and byte totals for a job.
 *
 * <p>The job metrics are fetched as JSON and the {@code value} of every entry under {@code
 * SourceReceivedCount}, {@code SourceReceivedBytes}, {@code SinkWriteCount} and {@code
 * SinkWriteBytes} is summed. Each metric is summed independently, so a job with a different
 * number of sources and sinks, or with one metric missing, still reports the others. Metrics
 * that are absent (for example when the metrics document is {@code {}}) or unparsable are
 * reported as 0.
 *
 * @param jobId id of the job whose metrics are summarized
 * @return the summed counts and byte totals; never {@code null}
 */
public JobMetricsRunner.JobMetricsSummary getJobMetricsSummary(Long jobId) {
    long sourceReadCount = 0L;
    long sourceReadBytes = 0L;
    long sinkWriteCount = 0L;
    long sinkWriteBytes = 0L;
    String jobMetrics = getJobMetrics(jobId);
    if (jobMetrics != null) {
        try {
            JsonNode jsonNode = OBJECT_MAPPER.readTree(jobMetrics);
            sourceReadCount = sumMetricValues(jsonNode, "SourceReceivedCount");
            sourceReadBytes = sumMetricValues(jsonNode, "SourceReceivedBytes");
            sinkWriteCount = sumMetricValues(jsonNode, "SinkWriteCount");
            sinkWriteBytes = sumMetricValues(jsonNode, "SinkWriteBytes");
        } catch (JsonProcessingException ignored) {
            // Best effort: unparsable metrics are reported as an all-zero summary.
        }
    }
    return new JobMetricsRunner.JobMetricsSummary(
            sourceReadCount, sinkWriteCount, sourceReadBytes, sinkWriteBytes);
}

/** Sums the {@code value} of every entry of the named metric; missing parts count as 0. */
private static long sumMetricValues(JsonNode root, String metricName) {
    JsonNode entries = root == null ? null : root.get(metricName);
    if (entries == null) {
        return 0L;
    }
    long total = 0L;
    for (int i = 0; i < entries.size(); i++) {
        JsonNode entry = entries.get(i);
        JsonNode value = entry == null ? null : entry.get("value");
        if (value != null) {
            total += value.asLong();
        }
    }
    return total;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,5 +82,7 @@ public void run() {
/**
 * Totals reported for a job: record counts and byte volumes on the source and sink side.
 *
 * <p>NOTE(review): instances are created positionally as (sourceReadCount, sinkWriteCount,
 * sourceReadBytes, sinkWriteBytes); presumably the constructor is generated from the field
 * order by annotations outside this view, so keep the declaration order stable. TODO confirm.
 */
public static class JobMetricsSummary {
// Number of records read by the sources.
private long sourceReadCount;
// Number of records written by the sinks.
private long sinkWriteCount;
// Number of bytes read by the sources.
private long sourceReadBytes;
// Number of bytes written by the sinks.
private long sinkWriteBytes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,11 @@ public SeaTunnelSourceCollector(
@Override
public void collect(T row) {
try {
int count = 1;
if (row instanceof SeaTunnelRow) {
int size;
if (rowType instanceof SeaTunnelRowType) {
count = ((SeaTunnelRow) row).getCount((SeaTunnelRowType) rowType);
size = ((SeaTunnelRow) row).getBytesSize((SeaTunnelRowType) rowType);
} else if (rowType instanceof MultipleRowType) {
size =
Expand All @@ -118,7 +120,7 @@ public void collect(T row) {
}
sendRecordToNext(new Record<>(row));
emptyThisPollNext = false;
sourceReceivedCount.inc();
sourceReceivedCount.inc(count);
sourceReceivedQPS.markEvent();
} catch (IOException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,13 +250,15 @@ public void received(Record<?> record) {
return;
}
writer.write((T) record.getData());
sinkWriteCount.inc();
sinkWriteQPS.markEvent();
int count = 1;
if (record.getData() instanceof SeaTunnelRow) {
long size = ((SeaTunnelRow) record.getData()).getBytesSize();
count = ((SeaTunnelRow) record.getData()).getCount();
sinkWriteBytes.inc(size);
sinkWriteBytesPerSeconds.markEvent(size);
}
sinkWriteCount.inc(count);
sinkWriteQPS.markEvent();
}
} catch (Exception e) {
throw new RuntimeException(e);
Expand Down
Loading