Commit a2fdf26

[improve][logging] Log uploads per partition instead of entire batch (streamnative#617)
1 parent: 1de24da

1 file changed: +14 -7 lines


src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java

```diff
@@ -246,7 +246,8 @@ private void unsafeFlush() {
         final Map<String, List<Record<GenericRecord>>> recordsToInsertByTopic =
                 recordsToInsert.stream().collect(Collectors.groupingBy(record -> record.getTopicName().get()));
 
-        for (List<Record<GenericRecord>> singleTopicRecordsToInsert : recordsToInsertByTopic.values()) {
+        for (Map.Entry<String, List<Record<GenericRecord>>> entry : recordsToInsertByTopic.entrySet()) {
+            List<Record<GenericRecord>> singleTopicRecordsToInsert = entry.getValue();
             Record<GenericRecord> firstRecord = singleTopicRecordsToInsert.get(0);
             Schema<GenericRecord> schema;
             try {
@@ -269,21 +270,27 @@ private void unsafeFlush() {
                 final Iterator<Record<GenericRecord>> iter = singleTopicRecordsToInsert.iterator();
                 filepath = buildPartitionPath(firstRecord, partitioner, format, timeStampForPartitioning);
                 ByteBuffer payload = bindValue(iter, format);
-                log.info("Uploading blob {} currentBatchSize {} currentBatchBytes {}", filepath, currentBatchSize.get(),
-                        currentBatchBytes.get());
+                int uploadSize = singleTopicRecordsToInsert.size();
+                long uploadBytes = getBytesSum(singleTopicRecordsToInsert);
+                log.info("Uploading blob {} from topic {} uploadSize {} out of currentBatchSize {} "
+                                + " uploadBytes {} out of currcurrentBatchBytes {}",
+                        filepath, entry.getKey(),
+                        uploadSize, currentBatchSize.get(),
+                        uploadBytes, currentBatchBytes.get());
                 long elapsedMs = System.currentTimeMillis();
                 uploadPayload(payload, filepath);
                 elapsedMs = System.currentTimeMillis() - elapsedMs;
                 log.debug("Uploading blob {} elapsed time in ms: {}", filepath, elapsedMs);
                 singleTopicRecordsToInsert.forEach(Record::ack);
-                currentBatchBytes.addAndGet(-1 * getBytesSum(singleTopicRecordsToInsert));
-                currentBatchSize.addAndGet(-1 * singleTopicRecordsToInsert.size());
+                currentBatchBytes.addAndGet(-1 * uploadBytes);
+                currentBatchSize.addAndGet(-1 * uploadSize);
                 if (sinkContext != null) {
                     sinkContext.recordMetric(METRICS_TOTAL_SUCCESS, singleTopicRecordsToInsert.size());
                     sinkContext.recordMetric(METRICS_LATEST_UPLOAD_ELAPSED_TIME, elapsedMs);
                 }
-                log.info("Successfully uploaded blob {} currentBatchSize {} currentBatchBytes {}", filepath,
-                        currentBatchSize.get(), currentBatchBytes.get());
+                log.info("Successfully uploaded blob {} from topic {} uploadSize {} uploadBytes {}",
+                        filepath, entry.getKey(),
+                        uploadSize, uploadBytes);
             } catch (Exception e) {
                 if (e instanceof ContainerNotFoundException) {
                     log.error("Blob {} is not found", filepath, e);
```
