diff --git a/s3stream/src/main/java/com/automq/stream/s3/DefaultUploadWriteAheadLogTask.java b/s3stream/src/main/java/com/automq/stream/s3/DefaultUploadWriteAheadLogTask.java index 227ca5b097..f592b286a3 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/DefaultUploadWriteAheadLogTask.java +++ b/s3stream/src/main/java/com/automq/stream/s3/DefaultUploadWriteAheadLogTask.java @@ -141,17 +141,35 @@ private void upload0(long objectId) { for (Long streamId : streamIds) { List streamRecords = streamRecordsMap.get(streamId); int streamSize = streamRecords.stream().mapToInt(StreamRecordBatch::size).sum(); - if (forceSplit || streamSize >= streamSplitSizeThreshold) { - streamObjectCfList.add(writeStreamObject(streamRecords, streamSize).thenAccept(so -> { - synchronized (request) { - request.addStreamObject(so); - } - })); + if (streamSize >= 0) { + if (forceSplit || streamSize >= streamSplitSizeThreshold) { + streamObjectCfList.add(writeStreamObject(streamRecords, streamSize).thenAccept(so -> { + synchronized (request) { + request.addStreamObject(so); + } + })); + } else { + streamSetWriteCfList.add(acquireLimiter(streamSize).thenAccept(nil -> streamSetObject.write(streamId, streamRecords))); + long startOffset = streamRecords.get(0).getBaseOffset(); + long endOffset = streamRecords.get(streamRecords.size() - 1).getLastOffset(); + request.addStreamRange(new ObjectStreamRange(streamId, -1L, startOffset, endOffset, streamSize)); + } } else { - streamSetWriteCfList.add(acquireLimiter(streamSize).thenAccept(nil -> streamSetObject.write(streamId, streamRecords))); - long startOffset = streamRecords.get(0).getBaseOffset(); - long endOffset = streamRecords.get(streamRecords.size() - 1).getLastOffset(); - request.addStreamRange(new ObjectStreamRange(streamId, -1L, startOffset, endOffset, streamSize)); + streamRecords.stream().map(Collections::singletonList).forEach(streamRecord -> { + int singleStreamSize = streamRecord.stream().mapToInt(StreamRecordBatch::size).sum(); + if (forceSplit || singleStreamSize >= streamSplitSizeThreshold) { + streamObjectCfList.add(writeStreamObject(streamRecord, singleStreamSize).thenAccept(so -> { + synchronized (request) { + request.addStreamObject(so); + } + })); + } else { + streamSetWriteCfList.add(acquireLimiter(singleStreamSize).thenAccept(nil -> streamSetObject.write(streamId, streamRecord))); + long startOffset = streamRecord.get(0).getBaseOffset(); + long endOffset = streamRecord.get(0).getLastOffset(); + request.addStreamRange(new ObjectStreamRange(streamId, -1L, startOffset, endOffset, singleStreamSize)); + } + }); } } request.setObjectId(objectId);