 import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.Unpooled;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.time.Clock;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -636,7 +634,7 @@ private void recoverFromLedgerByEntryId(ManagedCursorInfo info,
         }

         LedgerEntry entry = seq.nextElement();
-        byte[] data = entry.getEntry();
+        ByteBuf data = entry.getEntryBuffer();
         try {
             ChunkSequenceFooter chunkSequenceFooter = parseChunkSequenceFooter(data);
             if (chunkSequenceFooter.numParts > 0) {
@@ -672,23 +670,28 @@ private void readChunkSequence(VoidCallback callback, LedgerHandle lh,
         lh.asyncReadEntries(startPos, endPos, new AsyncCallback.ReadCallback() {
             @Override
             public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> entries, Object ctx) {
-                ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+                CompositeByteBuf buffer = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
+
+                AtomicInteger readableBytes = new AtomicInteger(0);
                 entries.asIterator().forEachRemaining(entry -> {
-                    log.info("pos {} len {} bytes", entry.getEntryId(), entry.getLength());
-                    try {
-                        buffer.write(entry.getEntry());
-                    } catch (IOException err) {
-                        throw new RuntimeException(err);
+                    if (log.isDebugEnabled()) {
+                        log.debug("pos {} len {} bytes", entry.getEntryId(), entry.getLength());
                     }
+                    ByteBuf part = entry.getEntryBuffer();
+                    buffer.addComponent(part);
+                    readableBytes.addAndGet(part.readableBytes());
                 });
-                byte[] result = buffer.toByteArray();
+                buffer.writerIndex(readableBytes.get())
+                        .readerIndex(0);
+
                 log.info("Read {} chunks, total of {} bytes, expected {} bytes", chunkSequenceFooter.numParts,
-                        result.length, chunkSequenceFooter.length);
-                if (result.length != chunkSequenceFooter.length) {
+                        buffer.readableBytes(), chunkSequenceFooter.length);
+                if (buffer.readableBytes() != chunkSequenceFooter.length) {
                     callback.operationFailed(ManagedLedgerException.getManagedLedgerException(new IOException(
-                            "Expected " + chunkSequenceFooter.length + " bytes but read " + result.length + " bytes")));
+                            "Expected " + chunkSequenceFooter.length + " bytes but read "
+                                    + buffer.readableBytes() + " bytes")));
                 }
-                Throwable res = tryCompleteCursorRecovery(lh, result);
+                Throwable res = tryCompleteCursorRecovery(lh, buffer);
                 if (res == null) {
                     callback.operationComplete();
                 } else {
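Note on the hunk above: CompositeByteBuf.addComponent() does not advance the composite's writer index, which is why the patch tracks the accumulated size in a separate counter and sets writerIndex afterwards. A minimal standalone sketch of that pattern, using Unpooled and made-up chunk contents instead of PulsarByteBufAllocator and real ledger entries:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.charset.StandardCharsets;

public class CompositeAssemblySketch {
    public static void main(String[] args) {
        CompositeByteBuf buffer = Unpooled.compositeBuffer();
        int readableBytes = 0;
        for (String chunk : new String[]{"chunk-1|", "chunk-2|", "chunk-3"}) {
            ByteBuf part = Unpooled.copiedBuffer(chunk, StandardCharsets.UTF_8);
            buffer.addComponent(part);          // appends without copying; writerIndex stays where it was
            readableBytes += part.readableBytes();
        }
        buffer.writerIndex(readableBytes).readerIndex(0);   // expose all accumulated bytes for reading
        System.out.println(buffer.readableBytes() + " bytes: " + buffer.toString(StandardCharsets.UTF_8));
        buffer.release();                       // also releases the added components
    }
}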
@@ -709,32 +712,42 @@ public static final class ChunkSequenceFooter {
         private int length;
     }

-    private ChunkSequenceFooter parseChunkSequenceFooter(byte[] data) throws IOException {
-        if (data.length == 0 || data[0] != '{') {
+    private ChunkSequenceFooter parseChunkSequenceFooter(ByteBuf data) throws IOException {
+        // getByte() doesn't move the reader index
+        if (data.readableBytes() == 0 || data.getByte(0) != '{') {
             // this is not JSON
             return ChunkSequenceFooter.NOT_CHUNKED;
         }
-        return ObjectMapperFactory.getMapper().getObjectMapper().readValue(data, ChunkSequenceFooter.class);
+
+        try {
+            return ObjectMapperFactory.getMapper().getObjectMapper()
+                    .readValue(data.toString(StandardCharsets.UTF_8), ChunkSequenceFooter.class);
+        } catch (JsonProcessingException e) {
+            return ChunkSequenceFooter.NOT_CHUNKED;
+        }
     }

-    private Throwable tryCompleteCursorRecovery(LedgerHandle lh, byte[] data) {
-        mbean.addReadCursorLedgerSize(data.length);
+    private Throwable tryCompleteCursorRecovery(LedgerHandle lh, ByteBuf data) {
+        mbean.addReadCursorLedgerSize(data.readableBytes());

         try {
             data = decompressDataIfNeeded(data, lh);
         } catch (Throwable e) {
+            data.release();
             log.error("[{}] Failed to decompress position info from ledger {} for cursor {}: {}", ledger.getName(),
                     lh.getId(), name, e);
             return e;
         }

         PositionInfo positionInfo;
         try {
-            positionInfo = PositionInfo.parseFrom(data);
+            positionInfo = PositionInfo.parseFrom(data.nioBuffer());
         } catch (InvalidProtocolBufferException e) {
             log.error("[{}] Failed to parse position info from ledger {} for cursor {}: {}", ledger.getName(),
                     lh.getId(), name, e);
             return e;
+        } finally {
+            data.release();
         }

         Map<String, Long> recoveredProperties = Collections.emptyMap();
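For the footer parsing above, the ByteBuf accessors matter: getByte(index) is an absolute peek and toString(Charset) decodes the readable bytes, and neither moves readerIndex, so the same buffer can be checked for a leading '{' and then handed to Jackson in full. A small sketch with an illustrative footer payload:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.charset.StandardCharsets;

public class FooterPeekSketch {
    public static void main(String[] args) {
        // Made-up footer JSON, just to show the index behavior.
        ByteBuf data = Unpooled.copiedBuffer("{\"numParts\":3,\"length\":42}", StandardCharsets.UTF_8);
        boolean looksLikeJson = data.readableBytes() > 0 && data.getByte(0) == '{'; // peek, readerIndex untouched
        String json = data.toString(StandardCharsets.UTF_8);                        // still sees every byte
        System.out.println(looksLikeJson + " readerIndex=" + data.readerIndex() + " footer=" + json);
        data.release();
    }
}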
@@ -3492,42 +3505,39 @@ private ByteBuf compressDataIfNeeded(ByteBuf data, LedgerHandle lh) {
             result.readerIndex(0)
                     .writerIndex(4 + compressedSize);

-            int ratio = (int) (compressedSize * 100.0 / uncompressedSize);
-            log.info("[{}] Cursor {} Compressed data size {} bytes (with {}, original size {} bytes, ratio {}%)",
-                    ledger.getName(), name, compressedSize, pulsarCursorInfoCompressionString, uncompressedSize, ratio);
+            if (log.isInfoEnabled()) {
+                int ratio = (int) (compressedSize * 100.0 / uncompressedSize);
+                log.info("[{}] Cursor {} Compressed data size {} bytes (with {}, original size {} bytes, ratio {}%)",
+                        ledger.getName(), name, compressedSize, pulsarCursorInfoCompressionString,
+                        uncompressedSize, ratio);
+            }
             return result;
         } finally {
             data.release();
         }
     }

-    static byte[] decompressDataIfNeeded(byte[] data, LedgerHandle lh) {
+    static ByteBuf decompressDataIfNeeded(ByteBuf data, LedgerHandle lh) {
         byte[] pulsarCursorInfoCompression =
                 lh.getCustomMetadata().get(METADATA_PROPERTY_CURSOR_COMPRESSION_TYPE);
         if (pulsarCursorInfoCompression != null) {
             String pulsarCursorInfoCompressionString = new String(pulsarCursorInfoCompression);
             if (log.isDebugEnabled()) {
                 log.debug("Ledger {} compression {} decompressing {} bytes, full {}",
-                        lh.getId(), pulsarCursorInfoCompressionString, data.length,
+                        lh.getId(), pulsarCursorInfoCompressionString, data.readableBytes(),
                         ByteBufUtil.prettyHexDump(Unpooled.wrappedBuffer(data)));
             }
-            ByteArrayInputStream input = new ByteArrayInputStream(data);
-            DataInputStream dataInputStream = new DataInputStream(input);
             try {
-                int uncompressedSize = dataInputStream.readInt();
-                byte[] compressedData = dataInputStream.readAllBytes();
+                // this moves readerIndex
+                int uncompressedSize = data.readInt();
                 CompressionCodec compressionCodec = CompressionCodecProvider.getCompressionCodec(
                         CompressionType.valueOf(pulsarCursorInfoCompressionString));
-                ByteBuf decode = compressionCodec.decode(Unpooled.wrappedBuffer(compressedData), uncompressedSize);
-                try {
-                    return ByteBufUtil.getBytes(decode);
-                } finally {
-                    decode.release();
-                }
+                ByteBuf decode = compressionCodec.decode(data, uncompressedSize);
+                return decode;
             } catch (IOException | MalformedInputException error) {
                 log.error("Cannot decompress cursor position using {}. Payload is {}",
                         pulsarCursorInfoCompressionString,
-                        ByteBufUtil.prettyHexDump(Unpooled.wrappedBuffer(data)), error);
+                        ByteBufUtil.prettyHexDump(data), error);
                 throw new RuntimeException(error);
             }
         } else {
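The decompression path relies on the length-prefixed layout produced by compressDataIfNeeded: a 4-byte uncompressed-size header followed by the compressed payload. Since readInt() consumes the header, the codec receives only the payload bytes that remain readable. A minimal sketch of that index arithmetic with a fake payload (no real codec involved):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class LengthPrefixSketch {
    public static void main(String[] args) {
        byte[] fakeCompressedPayload = {1, 2, 3, 4, 5};
        ByteBuf data = Unpooled.buffer()
                .writeInt(1024)                     // uncompressed-size header
                .writeBytes(fakeCompressedPayload); // compressed bytes follow
        int uncompressedSize = data.readInt();      // advances readerIndex past the header
        System.out.println("uncompressedSize=" + uncompressedSize
                + " remainingCompressedBytes=" + data.readableBytes());
        data.release();
    }
}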