Skip to content

Commit b6cf59a

Browse files
committed
Optimize the number of statistical files during binary transmission as the count file size as the byte
1 parent c140178 commit b6cf59a

File tree

11 files changed

+146
-17
lines changed

11 files changed

+146
-17
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.api.table.type;
19+
20+
import lombok.AllArgsConstructor;
21+
import lombok.Data;
22+
import lombok.NoArgsConstructor;
23+
24+
import java.io.Serializable;
25+
26+
@Data
27+
@AllArgsConstructor
28+
@NoArgsConstructor
29+
public class BinaryRow implements Serializable {
30+
31+
private String relativePath;
32+
33+
private byte[] data;
34+
35+
private long partIndex;
36+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.api.table.type;
19+
20+
public class BinaryRowType implements SeaTunnelDataType<BinaryRow> {
21+
22+
public static final BinaryRowType INSTANCE = new BinaryRowType();
23+
24+
@Override
25+
public Class<BinaryRow> getTypeClass() {
26+
return BinaryRow.class;
27+
}
28+
29+
@Override
30+
public SqlType getSqlType() {
31+
return SqlType.BINARY;
32+
}
33+
34+
@Override
35+
public boolean equals(Object obj) {
36+
if (obj == this) {
37+
return true;
38+
}
39+
return obj instanceof PrimitiveByteArrayType;
40+
}
41+
}

Diff for: seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java

+30
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,19 @@ public boolean isNullAt(int pos) {
9898
return this.fields[pos] == null;
9999
}
100100

101+
public int getCount(SeaTunnelRowType rowType) {
102+
int count = 1;
103+
if (rowType.getFieldType(0) != null) {
104+
switch (rowType.getFieldType(0).getSqlType()) {
105+
case BINARY:
106+
return ((BinaryRow) fields[0]).getPartIndex() == 0 ? 1 : 0;
107+
default:
108+
return count;
109+
}
110+
}
111+
return count;
112+
}
113+
101114
public int getBytesSize(SeaTunnelRowType rowType) {
102115
if (size == 0) {
103116
int s = 0;
@@ -181,6 +194,9 @@ private int getBytesForValue(Object v, SeaTunnelDataType<?> dataType) {
181194
rowSize += getBytesForValue(row.fields[i], types[i]);
182195
}
183196
return rowSize;
197+
case BINARY:
198+
BinaryRow binaryObject = (BinaryRow) v;
199+
return binaryObject.getData().length;
184200
default:
185201
throw new UnsupportedOperationException("Unsupported type: " + sqlType);
186202
}
@@ -224,6 +240,20 @@ private int getArrayNotNullSize(Object[] values) {
224240
return c;
225241
}
226242

243+
public int getCount() {
244+
int count = 1;
245+
if (fields[0] != null) {
246+
String clazz = (fields[0]).getClass().getSimpleName();
247+
switch (clazz) {
248+
case "BinaryRow":
249+
return ((BinaryRow) fields[0]).getPartIndex() == 0 ? 1 : 0;
250+
default:
251+
return count;
252+
}
253+
}
254+
return count;
255+
}
256+
227257
public int getBytesSize() {
228258
if (size == 0) {
229259
int s = 0;

Diff for: seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java

+1
Original file line numberDiff line numberDiff line change
@@ -36,5 +36,6 @@ public enum SqlType {
3636
TIME,
3737
TIMESTAMP,
3838
ROW,
39+
BINARY,
3940
MULTIPLE_ROW;
4041
}

Diff for: seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/BinaryWriteStrategy.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,15 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
1919

20+
import org.apache.seatunnel.api.table.type.BinaryRow;
21+
import org.apache.seatunnel.api.table.type.BinaryRowType;
2022
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2123
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
2224
import org.apache.seatunnel.common.exception.CommonError;
2325
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
2426
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
2527
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
2628
import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
27-
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.BinaryReadStrategy;
2829

2930
import org.apache.hadoop.fs.FSDataOutputStream;
3031

@@ -48,7 +49,7 @@ public BinaryWriteStrategy(FileSinkConfig fileSinkConfig) {
4849
@Override
4950
public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
5051
super.setSeaTunnelRowTypeInfo(seaTunnelRowType);
51-
if (!seaTunnelRowType.equals(BinaryReadStrategy.binaryRowType)) {
52+
if (!(seaTunnelRowType.getFieldType(0) instanceof BinaryRowType)) {
5253
throw new FileConnectorException(
5354
FileConnectorErrorCode.FORMAT_NOT_SUPPORT,
5455
"BinaryWriteStrategy only supports binary format, please read file with `BINARY` format, and do not change schema in the transform.");
@@ -57,9 +58,10 @@ public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
5758

5859
@Override
5960
public void write(SeaTunnelRow seaTunnelRow) throws FileConnectorException {
60-
byte[] data = (byte[]) seaTunnelRow.getField(0);
61-
String relativePath = (String) seaTunnelRow.getField(1);
62-
long partIndex = (long) seaTunnelRow.getField(2);
61+
BinaryRow binaryRow = (BinaryRow) seaTunnelRow.getField(0);
62+
byte[] data = binaryRow.getData();
63+
String relativePath = binaryRow.getRelativePath();
64+
long partIndex = binaryRow.getPartIndex();
6365
String filePath = getOrCreateFilePathBeingWritten(relativePath);
6466
FSDataOutputStream fsDataOutputStream = getOrCreateOutputStream(filePath);
6567
if (partIndex - 1 != partIndexMap.get(filePath)) {

Diff for: seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java

+6-7
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
1919

2020
import org.apache.seatunnel.api.source.Collector;
21-
import org.apache.seatunnel.api.table.type.BasicType;
22-
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
21+
import org.apache.seatunnel.api.table.type.BinaryRow;
22+
import org.apache.seatunnel.api.table.type.BinaryRowType;
2323
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
2424
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2525
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -37,10 +37,7 @@ public class BinaryReadStrategy extends AbstractReadStrategy {
3737

3838
public static SeaTunnelRowType binaryRowType =
3939
new SeaTunnelRowType(
40-
new String[] {"data", "relativePath", "partIndex"},
41-
new SeaTunnelDataType[] {
42-
PrimitiveByteArrayType.INSTANCE, BasicType.STRING_TYPE, BasicType.LONG_TYPE
43-
});
40+
new String[] {"binary"}, new SeaTunnelDataType[] {BinaryRowType.INSTANCE});
4441

4542
private File basePath;
4643

@@ -75,7 +72,9 @@ public void read(String path, String tableId, Collector<SeaTunnelRow> output)
7572
if (readSize != maxSize) {
7673
buffer = Arrays.copyOf(buffer, readSize);
7774
}
78-
SeaTunnelRow row = new SeaTunnelRow(new Object[] {buffer, relativePath, partIndex});
75+
SeaTunnelRow row =
76+
new SeaTunnelRow(
77+
new Object[] {new BinaryRow(relativePath, buffer, partIndex)});
7978
buffer = new byte[1024];
8079
output.collect(row);
8180
partIndex++;

Diff for: seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java

+4
Original file line numberDiff line numberDiff line change
@@ -220,8 +220,12 @@ public void execute() throws CommandExecuteException {
220220
Duration.between(startTime, endTime).getSeconds(),
221221
"Total Read Count",
222222
jobMetricsSummary.getSourceReadCount(),
223+
"Total Read Bytes",
224+
jobMetricsSummary.getSourceReadBytes(),
223225
"Total Write Count",
224226
jobMetricsSummary.getSinkWriteCount(),
227+
"Total Write Bytes",
228+
jobMetricsSummary.getSinkWriteBytes(),
225229
"Total Failed Count",
226230
jobMetricsSummary.getSourceReadCount()
227231
- jobMetricsSummary.getSinkWriteCount()));

Diff for: seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java

+12-2
Original file line numberDiff line numberDiff line change
@@ -151,22 +151,32 @@ public JobDAGInfo getJobInfo(Long jobId) {
151151

152152
public JobMetricsRunner.JobMetricsSummary getJobMetricsSummary(Long jobId) {
153153
long sourceReadCount = 0L;
154+
long sourceReadBytes = 0L;
154155
long sinkWriteCount = 0L;
156+
long sinkWriteBytes = 0L;
155157
String jobMetrics = getJobMetrics(jobId);
156158
try {
157159
JsonNode jsonNode = OBJECT_MAPPER.readTree(jobMetrics);
158160
JsonNode sourceReaders = jsonNode.get("SourceReceivedCount");
161+
JsonNode sourceReaderBytes = jsonNode.get("SourceReceivedBytes");
159162
JsonNode sinkWriters = jsonNode.get("SinkWriteCount");
163+
JsonNode sinkWriterBytes = jsonNode.get("SinkWriteBytes");
160164
for (int i = 0; i < sourceReaders.size(); i++) {
161165
JsonNode sourceReader = sourceReaders.get(i);
166+
JsonNode sourceBytes = sourceReaderBytes.get(i);
162167
JsonNode sinkWriter = sinkWriters.get(i);
168+
JsonNode sinkBytes = sinkWriterBytes.get(i);
163169
sourceReadCount += sourceReader.get("value").asLong();
170+
sourceReadBytes += sourceBytes.get("value").asLong();
164171
sinkWriteCount += sinkWriter.get("value").asLong();
172+
sinkWriteBytes += sinkBytes.get("value").asLong();
165173
}
166-
return new JobMetricsRunner.JobMetricsSummary(sourceReadCount, sinkWriteCount);
174+
return new JobMetricsRunner.JobMetricsSummary(
175+
sourceReadCount, sinkWriteCount, sourceReadBytes, sinkWriteBytes);
167176
// Add NullPointerException because of metrics information can be empty like {}
168177
} catch (JsonProcessingException | NullPointerException e) {
169-
return new JobMetricsRunner.JobMetricsSummary(sourceReadCount, sinkWriteCount);
178+
return new JobMetricsRunner.JobMetricsSummary(
179+
sourceReadCount, sinkWriteCount, sourceReadBytes, sinkWriteBytes);
170180
}
171181
}
172182
}

Diff for: seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java

+2
Original file line numberDiff line numberDiff line change
@@ -82,5 +82,7 @@ public void run() {
8282
public static class JobMetricsSummary {
8383
private long sourceReadCount;
8484
private long sinkWriteCount;
85+
private long sourceReadBytes;
86+
private long sinkWriteBytes;
8587
}
8688
}

Diff for: seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -99,9 +99,11 @@ public SeaTunnelSourceCollector(
9999
@Override
100100
public void collect(T row) {
101101
try {
102+
int count = 1;
102103
if (row instanceof SeaTunnelRow) {
103104
int size;
104105
if (rowType instanceof SeaTunnelRowType) {
106+
count = ((SeaTunnelRow) row).getCount((SeaTunnelRowType) rowType);
105107
size = ((SeaTunnelRow) row).getBytesSize((SeaTunnelRowType) rowType);
106108
} else if (rowType instanceof MultipleRowType) {
107109
size =
@@ -118,7 +120,7 @@ public void collect(T row) {
118120
}
119121
sendRecordToNext(new Record<>(row));
120122
emptyThisPollNext = false;
121-
sourceReceivedCount.inc();
123+
sourceReceivedCount.inc(count);
122124
sourceReceivedQPS.markEvent();
123125
} catch (IOException e) {
124126
throw new RuntimeException(e);

Diff for: seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -250,13 +250,15 @@ public void received(Record<?> record) {
250250
return;
251251
}
252252
writer.write((T) record.getData());
253-
sinkWriteCount.inc();
254-
sinkWriteQPS.markEvent();
253+
int count = 1;
255254
if (record.getData() instanceof SeaTunnelRow) {
256255
long size = ((SeaTunnelRow) record.getData()).getBytesSize();
256+
count = ((SeaTunnelRow) record.getData()).getCount();
257257
sinkWriteBytes.inc(size);
258258
sinkWriteBytesPerSeconds.markEvent(size);
259259
}
260+
sinkWriteCount.inc(count);
261+
sinkWriteQPS.markEvent();
260262
}
261263
} catch (Exception e) {
262264
throw new RuntimeException(e);

0 commit comments

Comments
 (0)