Skip to content

Commit 9426b7b

Browse files
authored
[Fix][Connector-V2] Fix maxcompute write with multi parallelism (#9089)
1 parent 8041d59 commit 9426b7b

File tree

1 file changed

+2
-3
lines changed
  • seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink

1 file changed

+2
-3
lines changed

seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ public class MaxcomputeWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
4747
private RecordWriter recordWriter;
4848
private final TableTunnel.UploadSession session;
4949
private final TableSchema tableSchema;
50-
private static final Long BLOCK_0 = 0L;
5150
private final SeaTunnelRowType rowType;
5251

5352
public MaxcomputeWriter(ReadonlyConfig readonlyConfig, SeaTunnelRowType rowType) {
@@ -68,7 +67,7 @@ public MaxcomputeWriter(ReadonlyConfig readonlyConfig, SeaTunnelRowType rowType)
6867
tunnel.createUploadSession(
6968
readonlyConfig.get(PROJECT), readonlyConfig.get(TABLE_NAME));
7069
}
71-
this.recordWriter = session.openRecordWriter(BLOCK_0);
70+
this.recordWriter = session.openBufferedWriter();
7271
log.info("open record writer success");
7372
} catch (Exception e) {
7473
throw new MaxcomputeConnectorException(
@@ -89,7 +88,7 @@ public void close() throws IOException {
8988
if (recordWriter != null) {
9089
recordWriter.close();
9190
try {
92-
session.commit(new Long[] {BLOCK_0});
91+
session.commit();
9392
} catch (Exception e) {
9493
throw new MaxcomputeConnectorException(
9594
CommonErrorCodeDeprecated.WRITER_OPERATION_FAILED, e);

0 commit comments

Comments
 (0)