40
40
import io .netty .buffer .CompositeByteBuf ;
41
41
import io .netty .buffer .Unpooled ;
42
42
import java .io .ByteArrayInputStream ;
43
+ import java .io .ByteArrayOutputStream ;
43
44
import java .io .DataInputStream ;
44
45
import java .io .IOException ;
45
46
import java .time .Clock ;
46
47
import java .util .ArrayDeque ;
47
48
import java .util .ArrayList ;
48
49
import java .util .Collections ;
49
50
import java .util .Comparator ;
51
+ import java .util .Enumeration ;
50
52
import java .util .HashMap ;
51
53
import java .util .HashSet ;
52
54
import java .util .Iterator ;
75
77
import lombok .Getter ;
76
78
import lombok .NoArgsConstructor ;
77
79
import lombok .ToString ;
80
+ import org .apache .bookkeeper .client .AsyncCallback ;
78
81
import org .apache .bookkeeper .client .AsyncCallback .CloseCallback ;
79
82
import org .apache .bookkeeper .client .AsyncCallback .OpenCallback ;
80
83
import org .apache .bookkeeper .client .BKException ;
@@ -244,6 +247,8 @@ public class ManagedCursorImpl implements ManagedCursor {
244
247
// active state cache in ManagedCursor. It should be in sync with the state in activeCursors in ManagedLedger.
245
248
private volatile boolean isActive = false ;
246
249
250
+ protected int maxPositionChunkSize = 1024 * 1024 ;
251
+
247
252
static class MarkDeleteEntry {
248
253
final PositionImpl newPosition ;
249
254
final MarkDeleteCallback callback ;
@@ -581,71 +586,9 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
581
586
582
587
// Read the last entry in the ledger
583
588
long lastEntryInLedger = lh .getLastAddConfirmed ();
584
-
585
- if (lastEntryInLedger < 0 ) {
586
- log .warn ("[{}] Error reading from metadata ledger {} for cursor {}: No entries in ledger" ,
587
- ledger .getName (), ledgerId , name );
588
- // Rewind to last cursor snapshot available
589
- initialize (getRollbackPosition (info ), Collections .emptyMap (), cursorProperties , callback );
590
- return ;
591
- }
592
-
593
- lh .asyncReadEntries (lastEntryInLedger , lastEntryInLedger , (rc1 , lh1 , seq , ctx1 ) -> {
594
- if (log .isDebugEnabled ()) {
595
- log .debug ("[{}} readComplete rc={} entryId={}" , ledger .getName (), rc1 , lh1 .getLastAddConfirmed ());
596
- }
597
- if (isBkErrorNotRecoverable (rc1 )) {
598
- log .error ("[{}] Error reading from metadata ledger {} for cursor {}: {}" , ledger .getName (),
599
- ledgerId , name , BKException .getMessage (rc1 ));
600
- // Rewind to oldest entry available
601
- initialize (getRollbackPosition (info ), Collections .emptyMap (), cursorProperties , callback );
602
- return ;
603
- } else if (rc1 != BKException .Code .OK ) {
604
- log .warn ("[{}] Error reading from metadata ledger {} for cursor {}: {}" , ledger .getName (),
605
- ledgerId , name , BKException .getMessage (rc1 ));
606
-
607
- callback .operationFailed (createManagedLedgerException (rc1 ));
608
- return ;
609
- }
610
-
611
- LedgerEntry entry = seq .nextElement ();
612
- mbean .addReadCursorLedgerSize (entry .getLength ());
613
- PositionInfo positionInfo ;
614
- try {
615
- byte [] data = entry .getEntry ();
616
- data = decompressDataIfNeeded (data , lh );
617
- positionInfo = PositionInfo .parseFrom (data );
618
- } catch (InvalidProtocolBufferException e ) {
619
- callback .operationFailed (new ManagedLedgerException (e ));
620
- return ;
621
- }
622
- log .info ("[{}] Cursor {} recovered to position {}" , ledger .getName (), name , positionInfo );
623
-
624
- Map <String , Long > recoveredProperties = Collections .emptyMap ();
625
- if (positionInfo .getPropertiesCount () > 0 ) {
626
- // Recover properties map
627
- recoveredProperties = new HashMap <>();
628
- for (int i = 0 ; i < positionInfo .getPropertiesCount (); i ++) {
629
- LongProperty property = positionInfo .getProperties (i );
630
- recoveredProperties .put (property .getName (), property .getValue ());
631
- }
632
- }
633
-
634
- log .info ("[{}] Cursor {} recovered with recoveredProperties {}, individualDeletedMessagesCount {}" ,
635
- ledger .getName (), name , recoveredProperties , positionInfo .getIndividualDeletedMessagesCount ());
636
-
637
- PositionImpl position = new PositionImpl (positionInfo );
638
- if (positionInfo .getIndividualDeletedMessagesCount () > 0 ) {
639
- recoverIndividualDeletedMessages (positionInfo .getIndividualDeletedMessagesList ());
640
- }
641
- if (getConfig ().isDeletionAtBatchIndexLevelEnabled ()
642
- && positionInfo .getBatchedEntryDeletionIndexInfoCount () > 0 ) {
643
- recoverBatchDeletedIndexes (positionInfo .getBatchedEntryDeletionIndexInfoList ());
644
- }
645
- recoveredCursor (position , recoveredProperties , cursorProperties , lh );
646
- callback .operationComplete ();
647
- }, null );
589
+ recoverFromLedgerByEntryId (info , callback , lh , lastEntryInLedger );
648
590
};
591
+
649
592
try {
650
593
bookkeeper .asyncOpenLedger (ledgerId , digestType , getConfig ().getPassword (), openCallback ,
651
594
null );
@@ -656,6 +599,101 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
656
599
}
657
600
}
658
601
602
+ private void recoverFromLedgerByEntryId (ManagedCursorInfo info ,
603
+ VoidCallback callback ,
604
+ LedgerHandle lh ,
605
+ long entryId ) {
606
+ long ledgerId = lh .getId ();
607
+
608
+ if (entryId < 0 ) {
609
+ log .warn ("[{}] Error reading from metadata ledger {} for cursor {}: No valid entries in ledger" ,
610
+ ledger .getName (), ledgerId , name );
611
+ // Rewind to last cursor snapshot available
612
+ initialize (getRollbackPosition (info ), Collections .emptyMap (), cursorProperties , callback );
613
+ return ;
614
+ }
615
+
616
+ lh .asyncReadEntries (entryId , entryId , (rc1 , lh1 , seq , ctx1 ) -> {
617
+ if (log .isDebugEnabled ()) {
618
+ log .debug ("[{}} readComplete rc={} entryId={}" , ledger .getName (), rc1 , lh1 .getLastAddConfirmed ());
619
+ }
620
+ if (isBkErrorNotRecoverable (rc1 )) {
621
+ log .error ("[{}] Error reading from metadata ledger {} for cursor {}: {}" , ledger .getName (),
622
+ ledgerId , name , BKException .getMessage (rc1 ));
623
+ // Rewind to oldest entry available
624
+ initialize (getRollbackPosition (info ), Collections .emptyMap (), cursorProperties , callback );
625
+ return ;
626
+ } else if (rc1 != BKException .Code .OK ) {
627
+ log .warn ("[{}] Error reading from metadata ledger {} for cursor {}: {}" , ledger .getName (),
628
+ ledgerId , name , BKException .getMessage (rc1 ));
629
+
630
+ callback .operationFailed (createManagedLedgerException (rc1 ));
631
+ return ;
632
+ }
633
+
634
+ LedgerEntry entry = seq .nextElement ();
635
+ byte [] data = entry .getEntry ();
636
+ try {
637
+ ChunkSequenceFooter chunkSequenceFooter = parseChunkSequenceFooter (data );
638
+ if (chunkSequenceFooter .numParts > 0 ) {
639
+ readChunkSequence (callback , lh , entryId , chunkSequenceFooter );
640
+ } else {
641
+ Throwable res = tryCompleteCursorRecovery (lh , data );
642
+ if (res == null ) {
643
+ callback .operationComplete ();
644
+ } else {
645
+ log .warn ("[{}] Error recovering from metadata ledger {} entry {} for cursor {}. "
646
+ + "Will try recovery from previous entry." ,
647
+ ledger .getName (), ledgerId , entryId , name , res );
648
+ //try recovery from previous entry
649
+ recoverFromLedgerByEntryId (info , callback , lh , entryId - 1 );
650
+ }
651
+ }
652
+ } catch (IOException error ) {
653
+ log .error ("Cannot parse footer" , error );
654
+ log .warn ("[{}] Error recovering from metadata ledger {} entry {} for cursor {}, cannot parse footer. "
655
+ + "Will try recovery from previous entry." ,
656
+ ledger .getName (), ledgerId , entryId , name , error );
657
+ recoverFromLedgerByEntryId (info , callback , lh , entryId - 1 );
658
+ }
659
+ }, null );
660
+ }
661
+
662
+ private void readChunkSequence (VoidCallback callback , LedgerHandle lh ,
663
+ long footerPosition , ChunkSequenceFooter chunkSequenceFooter ) {
664
+ long startPos = footerPosition - chunkSequenceFooter .numParts ;
665
+ long endPos = footerPosition - 1 ;
666
+ log .info ("readChunkSequence from pos {}, num parts {}, startPos {}, endPos {}" ,
667
+ footerPosition , chunkSequenceFooter .numParts , startPos , endPos );
668
+ lh .asyncReadEntries (startPos , endPos , new AsyncCallback .ReadCallback () {
669
+ @ Override
670
+ public void readComplete (int rc , LedgerHandle lh , Enumeration <LedgerEntry > entries , Object ctx ) {
671
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream ();
672
+ entries .asIterator ().forEachRemaining (entry -> {
673
+ log .info ("pos {} len {} bytes " , entry .getEntryId (), entry .getLength ());
674
+ try {
675
+ buffer .write (entry .getEntry ());
676
+ } catch (IOException err ) {
677
+ throw new RuntimeException (err );
678
+ }
679
+ });
680
+ byte [] result = buffer .toByteArray ();
681
+ log .info ("Read {} chunks, total of {} bytes, expected {} bytes" , chunkSequenceFooter .numParts ,
682
+ result .length , chunkSequenceFooter .length );
683
+ if (result .length != chunkSequenceFooter .length ) {
684
+ callback .operationFailed (ManagedLedgerException .getManagedLedgerException (new IOException (
685
+ "Expected " + chunkSequenceFooter .length + " bytes but read " + result .length + " bytes" )));
686
+ }
687
+ Throwable res = tryCompleteCursorRecovery (lh , result );
688
+ if (res == null ) {
689
+ callback .operationComplete ();
690
+ } else {
691
+ callback .operationFailed (new ManagedLedgerException (res ));
692
+ }
693
+ }
694
+ }, null );
695
+ }
696
+
659
697
@ AllArgsConstructor
660
698
@ NoArgsConstructor
661
699
@ Getter
@@ -675,16 +713,15 @@ private ChunkSequenceFooter parseChunkSequenceFooter(byte[] data) throws IOExcep
675
713
return ObjectMapperFactory .getMapper ().getObjectMapper ().readValue (data , ChunkSequenceFooter .class );
676
714
}
677
715
678
- private void completeCursorRecovery ( VoidCallback callback , LedgerHandle lh , byte [] data ) {
716
+ private Throwable tryCompleteCursorRecovery ( LedgerHandle lh , byte [] data ) {
679
717
mbean .addReadCursorLedgerSize (data .length );
680
718
681
719
try {
682
720
data = decompressDataIfNeeded (data , lh );
683
721
} catch (Throwable e ) {
684
722
log .error ("[{}] Failed to decompress position info from ledger {} for cursor {}: {}" , ledger .getName (),
685
723
lh .getId (), name , e );
686
- callback .operationFailed (new ManagedLedgerException (e ));
687
- return ;
724
+ return e ;
688
725
}
689
726
690
727
PositionInfo positionInfo ;
@@ -693,8 +730,7 @@ private void completeCursorRecovery(VoidCallback callback, LedgerHandle lh, byt
693
730
} catch (InvalidProtocolBufferException e ) {
694
731
log .error ("[{}] Failed to parse position info from ledger {} for cursor {}: {}" , ledger .getName (),
695
732
lh .getId (), name , e );
696
- callback .operationFailed (new ManagedLedgerException (e ));
697
- return ;
733
+ return e ;
698
734
}
699
735
700
736
Map <String , Long > recoveredProperties = Collections .emptyMap ();
@@ -716,7 +752,7 @@ private void completeCursorRecovery(VoidCallback callback, LedgerHandle lh, byt
716
752
recoverBatchDeletedIndexes (positionInfo .getBatchedEntryDeletionIndexInfoList ());
717
753
}
718
754
recoveredCursor (position , recoveredProperties , cursorProperties , lh );
719
- callback . operationComplete () ;
755
+ return null ;
720
756
}
721
757
722
758
private void recoverIndividualDeletedMessages (List <MLDataFormats .MessageRange > individualDeletedMessagesList ) {
@@ -3282,6 +3318,7 @@ private void buildBatchEntryDeletionIndexInfoList(
3282
3318
}
3283
3319
3284
3320
void persistPositionToLedger (final LedgerHandle lh , MarkDeleteEntry mdEntry , final VoidCallback callback ) {
3321
+ checkArgument (maxPositionChunkSize > 0 , "maxPositionChunkSize mus be greater than zero" );
3285
3322
long now = System .nanoTime ();
3286
3323
PositionImpl position = mdEntry .newPosition ;
3287
3324
@@ -3302,10 +3339,9 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin
3302
3339
3303
3340
long endCompress = System .nanoTime ();
3304
3341
3305
- int maxSize = 1024 * 1024 ;
3306
3342
int offset = 0 ;
3307
3343
final int len = data .readableBytes ();
3308
- int numParts = 1 + (len / maxSize );
3344
+ int numParts = 1 + (len / maxPositionChunkSize );
3309
3345
3310
3346
if (log .isDebugEnabled ()) {
3311
3347
log .debug ("[{}] Cursor {} Appending to ledger={} position={} data size {} bytes, numParts {}" ,
@@ -3328,7 +3364,7 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin
3328
3364
int part = 0 ;
3329
3365
while (part != numParts ) {
3330
3366
int remaining = len - offset ;
3331
- int currentLen = Math .min (maxSize , remaining );
3367
+ int currentLen = Math .min (maxPositionChunkSize , remaining );
3332
3368
boolean isLast = part == numParts - 1 ;
3333
3369
3334
3370
if (log .isDebugEnabled ()) {
0 commit comments