[Bug] [Zeta] NegativeArraySizeException in SeaTunnelRow when executing Shuffle task #10826

@lm-ylj

Description

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

When synchronizing multiple tables from MySQL-CDC to Kafka in SeaTunnel, the job fails with a NegativeArraySizeException if a table has more than 127 fields.
The Shuffle task fails immediately with the following exception:

task: [pipeline-1 [Shuffle [Source[0]-MySQL-CDC]] -> xxx -> pipeline-1 [Sink[0]-Kafka-xxx]-ShuffleTask (1/1)] end with state FAILED and Exception: com.hazelcast.nio.serialization.HazelcastSerializationException: java.lang.NegativeArraySizeException
	at com.hazelcast.internal.serialization.impl.SerializationUtil.handleException(SerializationUtil.java:111)
	at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toObject(AbstractSerializationService.java:274)
	at com.hazelcast.spi.impl.NodeEngineImpl.toObject(NodeEngineImpl.java:385)
	at com.hazelcast.collection.impl.queue.QueueProxyImpl.drainTo(QueueProxyImpl.java:134)
	at org.apache.seatunnel.engine.server.task.flow.ShuffleSourceFlowLifeCycle.collect(ShuffleSourceFlowLifeCycle.java:94)
	at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
	at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78)
	at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:721)
	at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1043)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.NegativeArraySizeException
	at org.apache.seatunnel.api.table.type.SeaTunnelRow.<init>(SeaTunnelRow.java:38)
	at org.apache.seatunnel.engine.server.serializable.RecordSerializer.read(RecordSerializer.java:81)
	at org.apache.seatunnel.engine.server.serializable.RecordSerializer.read(RecordSerializer.java:33)
	at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:44)
	at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toObject(AbstractSerializationService.java:268)
	... 13 more

Root Cause Analysis
The root cause is that RecordSerializer uses the byte data type to write and read the field count (arity).
A Java byte is signed and can only hold values from -128 to 127. When the number of fields exceeds 127, writeByte truncates the value and readByte returns a negative number, which is then passed to the SeaTunnelRow constructor as the array size and triggers the NegativeArraySizeException.
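The overflow is easy to reproduce in isolation. The following minimal demo (plain Java, no SeaTunnel or Hazelcast dependencies; class and variable names are illustrative) shows that casting 128 to byte yields -128, and that using that value as an array size throws exactly the exception seen in the trace:

```java
public class ByteArityOverflow {
    public static void main(String[] args) {
        int arity = 128;            // one field past the signed-byte maximum
        byte stored = (byte) arity; // what writeByte/readByte effectively do
        System.out.println(stored); // prints -128

        try {
            // mirrors new SeaTunnelRow(arity), whose constructor allocates the fields array
            Object[] fields = new Object[stored];
        } catch (NegativeArraySizeException e) {
            System.out.println("NegativeArraySizeException for size " + stored);
        }
    }
}
```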

@Override
public void write(ObjectDataOutput out, Record record) throws IOException {
	Object data = record.getData();
	if (data instanceof CheckpointBarrier) {
		CheckpointBarrier checkpointBarrier = (CheckpointBarrier) data;
		out.writeByte(RecordDataType.CHECKPOINT_BARRIER.ordinal());
		out.writeLong(checkpointBarrier.getId());
		out.writeLong(checkpointBarrier.getTimestamp());
		out.writeString(checkpointBarrier.getCheckpointType().getName());
		out.writeObject(checkpointBarrier.getPrepareCloseTasks());
		out.writeObject(checkpointBarrier.getClosedTasks());
	} else if (data instanceof SeaTunnelRow) {
		SeaTunnelRow row = (SeaTunnelRow) data;
		out.writeByte(RecordDataType.SEATUNNEL_ROW.ordinal());
		out.writeString(row.getTableId());
		out.writeByte(row.getRowKind().toByteValue());
		// This line: arity is truncated to a single signed byte; values > 127 overflow
		out.writeByte(row.getArity());
		for (Object field : row.getFields()) {
			out.writeObject(field);
		}
	} else {
		throw new UnsupportedEncodingException(
				"Unsupported serialize class: " + data.getClass());
	}
}

@Override
public Record read(ObjectDataInput in) throws IOException {
	Object data;
	byte dataType = in.readByte();
	if (dataType == RecordDataType.CHECKPOINT_BARRIER.ordinal()) {
		data =
				new CheckpointBarrier(
						in.readLong(),
						in.readLong(),
						CheckpointType.fromName(in.readString()),
						in.readObject(),
						in.readObject());
	} else if (dataType == RecordDataType.SEATUNNEL_ROW.ordinal()) {
		String tableId = in.readString();
		byte rowKind = in.readByte();
		// This line: an overflowed arity is read back as a negative byte
		byte arity = in.readByte();
		SeaTunnelRow row = new SeaTunnelRow(arity);
		row.setTableId(tableId);
		row.setRowKind(RowKind.fromByteValue(rowKind));
		for (int i = 0; i < arity; i++) {
			row.setField(i, in.readObject());
		}
		data = row;
	} else {
		throw new UnsupportedEncodingException(
				"Unsupported deserialize data type: " + dataType);
	}
	return new Record(data);
}
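One possible fix (a sketch, not the actual SeaTunnel patch) is to serialize the arity with writeInt instead of writeByte and read it back with readInt. The round trip below uses java.io.DataOutputStream/DataInputStream, which expose the same writeInt/readInt contract that Hazelcast's ObjectDataOutput/ObjectDataInput inherit from DataOutput/DataInput:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class ArityRoundTrip {
    public static void main(String[] args) throws IOException {
        int arity = 128; // more fields than a signed byte can represent

        // Buggy path: casting to byte keeps only the low 8 bits
        byte truncated = (byte) arity;

        // Fixed path: writeInt preserves the full value (4 bytes on the wire)
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
            out.writeInt(arity);
        }
        int restored;
        try (DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(buf.toByteArray()))) {
            restored = in.readInt();
        }

        System.out.println("readByte would yield: " + truncated); // -128
        System.out.println("readInt yields:       " + restored);  // 128
    }
}
```

Note that widening the field from one byte to four changes the wire format, so all cluster nodes would need to upgrade together for the shuffle queue to stay readable.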

SeaTunnel Version

2.3.7

SeaTunnel Config

env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 60000
  checkpoint.timeout = 300000
}

source {
  MySQL-CDC {
    base-url = "jdbc:mysql://xxx:xxx"
    username = "xxx"
    password = "xxx"
    // aaa.bbb: 127 fields and ccc.ddd: 128 fields
    table-names = ["aaa.bbb", "ccc.ddd"]
  }
}

sink {
  Kafka {
    topic = "xxx"
    bootstrap.servers = "xxx"
  }
}

Running Command

seatunnel.sh -c test.config

Error Exception

(Same stack trace as shown under "What happened" above.)

Zeta or Flink or Spark Version

2.3.7

Java or Scala Version

1.8

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
