Skip to content

Commit 6f09643

Browse files
authored
Merge pull request #188 from fala-aws/compression-truncation
add compression with iterative truncation
2 parents d2c86bc + fd3cdf5 commit 6f09643

File tree

2 files changed

+161
-3
lines changed

2 files changed

+161
-3
lines changed

kinesis/kinesis.go

Lines changed: 97 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ import (
2020
"bytes"
2121
"compress/gzip"
2222
"compress/zlib"
23+
"errors"
2324
"fmt"
25+
"math"
2426
"os"
2527
"strings"
2628
"sync/atomic"
@@ -44,6 +46,8 @@ import (
4446

4547
const (
4648
truncatedSuffix = "[Truncated...]"
49+
truncationReductionPercent = 90
50+
truncationCompressionMaxAttempts = 10
4751
)
4852

4953
const (
@@ -463,11 +467,14 @@ func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface
463467
data = append(data, []byte("\n")...)
464468
}
465469

470+
// max truncation size
471+
maxDataSize := maximumRecordSize-partitionKeyLen
472+
466473
switch outputPlugin.compression {
467474
case CompressionZlib:
468-
data, err = zlibCompress(data)
475+
data, err = compressThenTruncate(zlibCompress, data, maxDataSize, []byte(truncatedSuffix), *outputPlugin)
469476
case CompressionGzip:
470-
data, err = gzipCompress(data)
477+
data, err = compressThenTruncate(gzipCompress, data, maxDataSize, []byte(truncatedSuffix), *outputPlugin)
471478
default:
472479
}
473480
if err != nil {
@@ -476,7 +483,7 @@ func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface
476483

477484
if len(data)+partitionKeyLen > maximumRecordSize {
478485
logrus.Warnf("[kinesis %d] Found record with %d bytes, truncating to 1MB, stream=%s\n", outputPlugin.PluginID, len(data)+partitionKeyLen, outputPlugin.stream)
479-
data = data[:maximumRecordSize-partitionKeyLen-len(truncatedSuffix)]
486+
data = data[:maxDataSize-len(truncatedSuffix)]
480487
data = append(data, []byte(truncatedSuffix)...)
481488
}
482489

@@ -597,6 +604,9 @@ func (outputPlugin *OutputPlugin) getPartitionKey(record map[interface{}]interfa
597604
return "", false
598605
}
599606

607+
// CompressorFunc is a function that compresses a byte slice
608+
type CompressorFunc func([]byte) ([]byte, error)
609+
600610
func zlibCompress(data []byte) ([]byte, error) {
601611
var b bytes.Buffer
602612

@@ -637,6 +647,90 @@ func gzipCompress(data []byte) ([]byte, error) {
637647
return b.Bytes(), nil
638648
}
639649

650+
// Compress Then Truncate
651+
// compresses data with CompressorFunction and iteratively truncates data
652+
// adding the truncation suffix if the CompressorFunction output exceeds maxOutLen.
653+
// The output is compressed and possibly truncated data whose length guaranteed to
654+
// be less than or equal to maxOutLen.
655+
func compressThenTruncate(compressorFunc CompressorFunc, data []byte, maxOutLen int, truncatedSuffix []byte, outputPlugin OutputPlugin) ([]byte, error) {
656+
var compressedData []byte
657+
var truncationBuffer []byte
658+
var originalCompressedLen int
659+
var compressedLen int
660+
var err error
661+
662+
/* Iterative approach to truncation */
663+
isTruncated := false
664+
compressedLen = math.MaxInt64
665+
truncatedInLen := len(data)
666+
truncationBuffer = data
667+
truncationCompressionAttempts := 0
668+
for (compressedLen > maxOutLen) {
669+
compressedData, err = compressorFunc(truncationBuffer)
670+
if err != nil {
671+
return nil, err
672+
}
673+
compressedLen = len(compressedData)
674+
675+
/* Truncation needed */
676+
if (compressedLen > maxOutLen) {
677+
truncationCompressionAttempts++
678+
logrus.Debugf("[kinesis %d] iterative truncation round stream=%s\n",
679+
outputPlugin.PluginID, outputPlugin.stream)
680+
681+
/* Base case: input compressed empty string, output still too large */
682+
if (truncatedInLen == 0) {
683+
logrus.Errorf("[kinesis %d] truncation failed, compressed empty input too " +
684+
"large stream=%s\n", outputPlugin.PluginID, outputPlugin.stream)
685+
return nil, errors.New("compressed empty to large");
686+
}
687+
688+
/* Base case: too many attempts - just to be extra safe */
689+
if (truncationCompressionAttempts > truncationCompressionMaxAttempts) {
690+
logrus.Errorf("[kinesis %d] truncation failed, too many compression attempts " +
691+
"stream=%s\n", outputPlugin.PluginID, outputPlugin.stream)
692+
return nil, errors.New("too many compression attempts");
693+
}
694+
695+
/* Calculate corrected input size */
696+
truncatedInLenPrev := truncatedInLen;
697+
truncatedInLen = (maxOutLen * truncatedInLen) / compressedLen;
698+
truncatedInLen = (truncatedInLen * truncationReductionPercent) / 100;
699+
700+
/* Ensure working down */
701+
if (truncatedInLen >= truncatedInLenPrev) {
702+
truncatedInLen = truncatedInLenPrev - 1;
703+
}
704+
705+
/* Allocate truncation buffer */
706+
if (!isTruncated) {
707+
isTruncated = true;
708+
originalCompressedLen = compressedLen
709+
truncationBuffer = make([]byte, truncatedInLen)
710+
copy(truncationBuffer, data[:truncatedInLen])
711+
}
712+
713+
/* Slap on truncation suffix */
714+
if (truncatedInLen < len(truncatedSuffix)) {
715+
/* No room for the truncation suffix. Terminal error */
716+
logrus.Errorf("[kinesis %d] truncation failed, no room for suffix " +
717+
"stream=%s\n", outputPlugin.PluginID, outputPlugin.stream)
718+
return nil, errors.New("no room for suffix");
719+
}
720+
truncationBuffer = truncationBuffer[:truncatedInLen]
721+
copy(truncationBuffer[len(truncationBuffer)-len(truncatedSuffix):], truncatedSuffix)
722+
}
723+
}
724+
725+
if (isTruncated) {
726+
logrus.Warnf("[kinesis %d] Found compressed record with %d bytes, " +
727+
"truncating to %d bytes after compression, stream=%s\n",
728+
outputPlugin.PluginID, originalCompressedLen, len(compressedData), outputPlugin.stream)
729+
}
730+
731+
return compressedData, nil
732+
}
733+
640734
// stringOrByteArray returns the string value if the input is a string or byte array otherwise an empty string
641735
func stringOrByteArray(v interface{}) string {
642736
switch t := v.(type) {

kinesis/kinesis_test.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package kinesis
22

33
import (
44
"encoding/json"
5+
"math/rand"
56
"os"
67
"testing"
78
"time"
@@ -211,6 +212,69 @@ func TestCompressionEmpty(t *testing.T) {
211212
}
212213
}
213214

215+
var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
216+
217+
func RandStringRunes(n int) string {
218+
b := make([]rune, n)
219+
for i := range b {
220+
b[i] = letterRunes[rand.Intn(len(letterRunes))]
221+
}
222+
return string(b)
223+
}
224+
225+
func TestCompressionTruncation(t *testing.T) {
226+
deftlvl := logrus.GetLevel();
227+
logrus.SetLevel(0);
228+
229+
rand.Seed(0)
230+
testData := []byte(RandStringRunes(4000))
231+
testSuffix := "[truncate]"
232+
outputPlugin := OutputPlugin{
233+
PluginID: 10,
234+
stream: "MyStream",
235+
}
236+
var compressedOutput, err = compressThenTruncate(gzipCompress, testData, 200, []byte(testSuffix), outputPlugin)
237+
assert.Nil(t, err)
238+
assert.GreaterOrEqual(t, len(compressedOutput), 150)
239+
assert.LessOrEqual(t, len(compressedOutput), 200)
240+
241+
logrus.SetLevel(deftlvl)
242+
}
243+
244+
func TestCompressionTruncationFailureA(t *testing.T) {
245+
deftlvl := logrus.GetLevel();
246+
logrus.SetLevel(0);
247+
248+
rand.Seed(0)
249+
testData := []byte(RandStringRunes(4000))
250+
testSuffix := "[truncate]"
251+
outputPlugin := OutputPlugin{
252+
PluginID: 10,
253+
stream: "MyStream",
254+
}
255+
var _, err = compressThenTruncate(gzipCompress, testData, 20, []byte(testSuffix), outputPlugin)
256+
assert.Contains(t, err.Error(), "no room for suffix")
257+
258+
logrus.SetLevel(deftlvl)
259+
}
260+
261+
func TestCompressionTruncationFailureB(t *testing.T) {
262+
deftlvl := logrus.GetLevel();
263+
logrus.SetLevel(0);
264+
265+
rand.Seed(0)
266+
testData := []byte{}
267+
testSuffix := "[truncate]"
268+
outputPlugin := OutputPlugin{
269+
PluginID: 10,
270+
stream: "MyStream",
271+
}
272+
var _, err = compressThenTruncate(gzipCompress, testData, 5, []byte(testSuffix), outputPlugin)
273+
assert.Contains(t, err.Error(), "compressed empty to large")
274+
275+
logrus.SetLevel(deftlvl)
276+
}
277+
214278
func TestDotReplace(t *testing.T) {
215279
records := make([]*kinesis.PutRecordsRequestEntry, 0, 500)
216280
record := map[interface{}]interface{}{

0 commit comments

Comments
 (0)