@@ -294,14 +294,19 @@ public LogAppendInfo putAsLeader(KvRecordBatch kvRecords, @Nullable int[] target
                     short latestSchemaId = (short) schemaInfo.getSchemaId();
                     validateSchemaId(kvRecords.schemaId(), latestSchemaId);
 
+                    // we only support ADD COLUMN, so targetColumns can be used directly
                     RowMerger currentMerger =
                             rowMerger.configureTargetColumns(
                                     targetColumns, latestSchemaId, latestSchema);
 
                     RowType latestRowType = latestSchema.getRowType();
                     WalBuilder walBuilder = createWalBuilder(latestSchemaId, latestRowType);
                     walBuilder.setWriterState(kvRecords.writerId(), kvRecords.batchSequence());
+                    // we only support ADD COLUMN LAST, so the BinaryRow produced by the
+                    // RowMerger can only be missing trailing columns; pad nulls at the
+                    // end of the BinaryRow to get a row in the latest schema.
                     PaddingRow latestSchemaRow = new PaddingRow(latestRowType.getFieldCount());
+                    // remember the log end offset to track the offset of each kv record
                     long logEndOffsetOfPrevBatch = logTablet.localLogEndOffset();
 
                     try {
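The null-padding trick described above is easy to see in isolation. Below is a minimal sketch of the idea behind `PaddingRow` — the `Row` interface and `NullPaddingRow` class are hypothetical stand-ins, not Fluss's actual types. Because columns can only be added at the end, any field position beyond the wrapped row's arity must be a newly added column and can safely read as null:

```java
// Minimal sketch (hypothetical types): view a row written under an older
// schema as a row of the latest schema by answering null for the trailing,
// newly added columns.
interface Row {
    int getFieldCount();

    boolean isNullAt(int pos);

    Object getField(int pos);
}

final class NullPaddingRow implements Row {
    private final int latestFieldCount; // field count of the latest schema
    private Row inner; // row encoded with an older (shorter) schema

    NullPaddingRow(int latestFieldCount) {
        this.latestFieldCount = latestFieldCount;
    }

    NullPaddingRow replaceRow(Row inner) {
        this.inner = inner; // reuse one wrapper instance across records
        return this;
    }

    @Override
    public int getFieldCount() {
        return latestFieldCount;
    }

    @Override
    public boolean isNullAt(int pos) {
        // positions past the inner row's arity are the columns added later
        return pos >= inner.getFieldCount() || inner.isNullAt(pos);
    }

    @Override
    public Object getField(int pos) {
        return pos >= inner.getFieldCount() ? null : inner.getField(pos);
    }
}
```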
@@ -325,6 +330,8 @@ public LogAppendInfo putAsLeader(KvRecordBatch kvRecords, @Nullable int[] target
                         // the CDC log offset by 1.
                         LogAppendInfo logAppendInfo = logTablet.appendAsLeader(walBuilder.build());
 
+                        // if the batch is duplicated, truncate the kvPreWriteBuffer to drop
+                        // the entries already written for this batch
                         if (logAppendInfo.duplicated()) {
                             kvPreWriteBuffer.truncateTo(
                                     logEndOffsetOfPrevBatch, TruncateReason.DUPLICATED);
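The duplicate handling follows a capture-and-rollback pattern: record the log end offset before the append, and if the log layer reports the batch as a duplicate of an earlier append, discard every buffered entry at or after that watermark. A generic sketch, with a hypothetical `PreWriteBuffer` standing in for `KvPreWriteBuffer`:

```java
import java.util.TreeMap;

// Hypothetical stand-in for KvPreWriteBuffer, keyed by the log offset
// assigned to each entry.
final class PreWriteBuffer {
    private final TreeMap<Long, byte[]> pending = new TreeMap<>();

    void put(long logOffset, byte[] value) {
        pending.put(logOffset, value);
    }

    // drop every entry whose offset is >= the watermark, e.g. because the
    // batch that produced those entries was a duplicate of an append that
    // was already accepted earlier
    void truncateTo(long offsetWatermark) {
        pending.tailMap(offsetWatermark, true).clear();
    }
}
```

With `logEndOffsetOfPrevBatch` as the watermark, a duplicated batch leaves the buffer exactly as if it had never been processed.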
@@ -340,13 +347,15 @@ public LogAppendInfo putAsLeader(KvRecordBatch kvRecords, @Nullable int[] target
                         kvPreWriteBuffer.truncateTo(logEndOffsetOfPrevBatch, TruncateReason.ERROR);
                         throw t;
                     } finally {
+                        // deallocate the memory and the arrow writer used by the wal builder
                         walBuilder.deallocate();
                     }
                 });
     }
 
     private void validateSchemaId(short schemaIdOfNewData, short latestSchemaId) {
         if (schemaIdOfNewData > latestSchemaId || schemaIdOfNewData < 0) {
+            // TODO: we may need to throw a retriable exception here
             throw new SchemaNotExistException(
                     "Invalid schema id: "
                             + schemaIdOfNewData
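Note that the check admits any schema id in `[0, latestSchemaId]`, so writers still on an older (but existing) schema keep working. The boundary cases, with the condition extracted into a hypothetical predicate for illustration (run with `java -ea` so the asserts fire):

```java
// The (negated) condition from validateSchemaId as a pure predicate.
public class SchemaIdCheck {
    static boolean isValid(short schemaIdOfNewData, short latestSchemaId) {
        return schemaIdOfNewData >= 0 && schemaIdOfNewData <= latestSchemaId;
    }

    public static void main(String[] args) {
        assert isValid((short) 3, (short) 3); // writer already on the latest schema
        assert isValid((short) 1, (short) 3); // writer still on an older, existing schema
        assert !isValid((short) 4, (short) 3); // schema id that doesn't exist yet
        assert !isValid((short) -1, (short) 3); // negative ids are never valid
    }
}
```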
@@ -355,7 +364,7 @@ private void validateSchemaId(short schemaIdOfNewData, short latestSchemaId) {
         }
     }
 
-    private long processKvRecords(
+    private void processKvRecords(
             KvRecordBatch kvRecords,
             short schemaIdOfNewData,
             RowMerger currentMerger,
@@ -365,6 +374,7 @@ private long processKvRecords(
             throws Exception {
         long logOffset = startLogOffset;
 
+        // TODO: reuse the read context and the decoder
         KvRecordBatch.ReadContext readContext =
                 KvRecordReadContext.createReadContext(kvFormat, schemaGetter);
         ValueDecoder valueDecoder = new ValueDecoder(schemaGetter, kvFormat);
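One possible shape for that TODO, sketched with illustrative stub types rather than the real Fluss classes: both objects depend only on the kv format and the schema getter, so they could live as fields and be constructed once per tablet instead of once per batch.

```java
// Illustrative stubs only; the point is the lifetime change.
final class KvBatchProcessor {
    interface ReadContext {}

    interface ValueDecoder {}

    // created once with the processor, reused by every processKvRecords call
    private final ReadContext readContext = new ReadContext() {};
    private final ValueDecoder valueDecoder = new ValueDecoder() {};

    void processKvRecords(/* batch */) {
        // decode records with this.readContext / this.valueDecoder here
    }
}
```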
@@ -396,8 +406,6 @@ private long processKvRecords(
                             logOffset);
                 }
             }
-
-        return logOffset;
     }
 
     private long processDeletion(
@@ -410,6 +418,7 @@ private long processDeletion(
             throws Exception {
         DeleteBehavior deleteBehavior = currentMerger.deleteBehavior();
         if (deleteBehavior == DeleteBehavior.IGNORE) {
+            // skip the delete row if the merger doesn't support deletion yet
             return logOffset;
         } else if (deleteBehavior == DeleteBehavior.DISABLE) {
             throw new DeletionDisabledException(
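The dispatch distinguishes three delete behaviours. A self-contained sketch of the same branch structure, with a stand-in enum and a generic exception in place of `DeletionDisabledException`:

```java
// Stand-ins for Fluss's DeleteBehavior and DeletionDisabledException.
enum DeleteBehavior { ALLOW, IGNORE, DISABLE }

final class DeleteDispatch {
    static long onDelete(DeleteBehavior behavior, long logOffset) {
        switch (behavior) {
            case IGNORE:
                // the merger can't express deletes yet: skip the record
                // and leave the log offset untouched
                return logOffset;
            case DISABLE:
                // deletes are explicitly rejected for this table
                throw new UnsupportedOperationException("DELETE is disabled");
            default:
                // ALLOW: proceed to the real delete/merge path, which
                // advances the offset for the emitted change log
                return logOffset + 1;
        }
    }
}
```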
@@ -428,6 +437,7 @@ private long processDeletion(
         BinaryValue oldValue = valueDecoder.decodeValue(oldValueBytes);
         BinaryValue newValue = currentMerger.delete(oldValue);
 
+        // if newValue is null, it means the row should be deleted
         if (newValue == null) {
             return applyDelete(key, oldValue, walBuilder, latestSchemaRow, logOffset);
         } else {
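The contract relied on here is that `delete` may answer in two ways: null means the row is removed outright (the `applyDelete` path), while a non-null value means the merger retracted something but the row survives and is rewritten. A sketch with a hypothetical `Merger` interface, not the actual `RowMerger` API:

```java
// Hypothetical interface illustrating the delete contract.
interface Merger<V> {
    /** Returns null if the row should be deleted, otherwise the value to keep. */
    V delete(V oldValue);
}

// A "full row" merger deletes outright: null signals the applyDelete path.
final class FullRowMerger implements Merger<String> {
    @Override
    public String delete(String oldValue) {
        return null;
    }
}
```

An aggregating or partial-update merger, by contrast, may return a retracted value and keep the row.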