@@ -20,7 +20,9 @@ import (
2020 "bytes"
2121 "compress/gzip"
2222 "compress/zlib"
23+ "errors"
2324 "fmt"
25+ "math"
2426 "os"
2527 "strings"
2628 "sync/atomic"
@@ -44,6 +46,8 @@ import (
4446
4547const (
4648 truncatedSuffix = "[Truncated...]"
49+ truncationReductionPercent = 90
50+ truncationCompressionMaxAttempts = 10
4751)
4852
4953const (
@@ -463,11 +467,14 @@ func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface
463467 data = append (data , []byte ("\n " )... )
464468 }
465469
470+ // max truncation size
471+ maxDataSize := maximumRecordSize - partitionKeyLen
472+
466473 switch outputPlugin .compression {
467474 case CompressionZlib :
468- data , err = zlibCompress ( data )
475+ data , err = compressThenTruncate ( zlibCompress , data , maxDataSize , [] byte ( truncatedSuffix ), * outputPlugin )
469476 case CompressionGzip :
470- data , err = gzipCompress ( data )
477+ data , err = compressThenTruncate ( gzipCompress , data , maxDataSize , [] byte ( truncatedSuffix ), * outputPlugin )
471478 default :
472479 }
473480 if err != nil {
@@ -476,7 +483,7 @@ func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface
476483
477484 if len (data )+ partitionKeyLen > maximumRecordSize {
478485 logrus .Warnf ("[kinesis %d] Found record with %d bytes, truncating to 1MB, stream=%s\n " , outputPlugin .PluginID , len (data )+ partitionKeyLen , outputPlugin .stream )
479- data = data [:maximumRecordSize - partitionKeyLen - len (truncatedSuffix )]
486+ data = data [:maxDataSize - len (truncatedSuffix )]
480487 data = append (data , []byte (truncatedSuffix )... )
481488 }
482489
@@ -597,6 +604,9 @@ func (outputPlugin *OutputPlugin) getPartitionKey(record map[interface{}]interfa
597604 return "" , false
598605}
599606
607+ // CompressorFunc is a function that compresses a byte slice
608+ type CompressorFunc func ([]byte ) ([]byte , error )
609+
600610func zlibCompress (data []byte ) ([]byte , error ) {
601611 var b bytes.Buffer
602612
@@ -637,6 +647,90 @@ func gzipCompress(data []byte) ([]byte, error) {
637647 return b .Bytes (), nil
638648}
639649
650+ // Compress Then Truncate
651+ // compresses data with CompressorFunction and iteratively truncates data
652+ // adding the truncation suffix if the CompressorFunction output exceeds maxOutLen.
653+ // The output is compressed and possibly truncated data whose length guaranteed to
654+ // be less than or equal to maxOutLen.
655+ func compressThenTruncate (compressorFunc CompressorFunc , data []byte , maxOutLen int , truncatedSuffix []byte , outputPlugin OutputPlugin ) ([]byte , error ) {
656+ var compressedData []byte
657+ var truncationBuffer []byte
658+ var originalCompressedLen int
659+ var compressedLen int
660+ var err error
661+
662+ /* Iterative approach to truncation */
663+ isTruncated := false
664+ compressedLen = math .MaxInt64
665+ truncatedInLen := len (data )
666+ truncationBuffer = data
667+ truncationCompressionAttempts := 0
668+ for (compressedLen > maxOutLen ) {
669+ compressedData , err = compressorFunc (truncationBuffer )
670+ if err != nil {
671+ return nil , err
672+ }
673+ compressedLen = len (compressedData )
674+
675+ /* Truncation needed */
676+ if (compressedLen > maxOutLen ) {
677+ truncationCompressionAttempts ++
678+ logrus .Debugf ("[kinesis %d] iterative truncation round stream=%s\n " ,
679+ outputPlugin .PluginID , outputPlugin .stream )
680+
681+ /* Base case: input compressed empty string, output still too large */
682+ if (truncatedInLen == 0 ) {
683+ logrus .Errorf ("[kinesis %d] truncation failed, compressed empty input too " +
684+ "large stream=%s\n " , outputPlugin .PluginID , outputPlugin .stream )
685+ return nil , errors .New ("compressed empty to large" );
686+ }
687+
688+ /* Base case: too many attempts - just to be extra safe */
689+ if (truncationCompressionAttempts > truncationCompressionMaxAttempts ) {
690+ logrus .Errorf ("[kinesis %d] truncation failed, too many compression attempts " +
691+ "stream=%s\n " , outputPlugin .PluginID , outputPlugin .stream )
692+ return nil , errors .New ("too many compression attempts" );
693+ }
694+
695+ /* Calculate corrected input size */
696+ truncatedInLenPrev := truncatedInLen ;
697+ truncatedInLen = (maxOutLen * truncatedInLen ) / compressedLen ;
698+ truncatedInLen = (truncatedInLen * truncationReductionPercent ) / 100 ;
699+
700+ /* Ensure working down */
701+ if (truncatedInLen >= truncatedInLenPrev ) {
702+ truncatedInLen = truncatedInLenPrev - 1 ;
703+ }
704+
705+ /* Allocate truncation buffer */
706+ if (! isTruncated ) {
707+ isTruncated = true ;
708+ originalCompressedLen = compressedLen
709+ truncationBuffer = make ([]byte , truncatedInLen )
710+ copy (truncationBuffer , data [:truncatedInLen ])
711+ }
712+
713+ /* Slap on truncation suffix */
714+ if (truncatedInLen < len (truncatedSuffix )) {
715+ /* No room for the truncation suffix. Terminal error */
716+ logrus .Errorf ("[kinesis %d] truncation failed, no room for suffix " +
717+ "stream=%s\n " , outputPlugin .PluginID , outputPlugin .stream )
718+ return nil , errors .New ("no room for suffix" );
719+ }
720+ truncationBuffer = truncationBuffer [:truncatedInLen ]
721+ copy (truncationBuffer [len (truncationBuffer )- len (truncatedSuffix ):], truncatedSuffix )
722+ }
723+ }
724+
725+ if (isTruncated ) {
726+ logrus .Warnf ("[kinesis %d] Found compressed record with %d bytes, " +
727+ "truncating to %d bytes after compression, stream=%s\n " ,
728+ outputPlugin .PluginID , originalCompressedLen , len (compressedData ), outputPlugin .stream )
729+ }
730+
731+ return compressedData , nil
732+ }
733+
640734// stringOrByteArray returns the string value if the input is a string or byte array otherwise an empty string
641735func stringOrByteArray (v interface {}) string {
642736 switch t := v .(type ) {
0 commit comments