Skip to content

Commit 301b29e

Browse files
committed
Revert "Add gzip compression (#162)" back
This reverts commit 9b5e6e6.
1 parent 7dbd241 commit 301b29e

File tree

4 files changed

+51
-13
lines changed

4 files changed

+51
-13
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ If you think you’ve found a potential security issue, please do not post it in
2323
* `experimental_concurrency`: Specify a limit of concurrent go routines for flushing records to kinesis. By default `experimental_concurrency` is set to 0 and records are flushed in Fluent Bit's single thread. This means that requests to Kinesis will block the execution of Fluent Bit. If this value is set to `4` for example then calls to Flush records from fluentbit will spawn concurrent go routines until the limit of `4` concurrent go routines are running. Once the `experimental_concurrency` limit is reached calls to Flush will return a retry code. The upper limit of the `experimental_concurrency` option is `10`. WARNING: Enabling `experimental_concurrency` can lead to data loss if the retry count is reached. Enabling concurrency will increase resource usage (memory and CPU).
2424
* `experimental_concurrency_retries`: Specify a limit to the number of retries concurrent goroutines will attempt. By default `4` retries will be attempted before records are dropped.
2525
* `aggregation`: Setting `aggregation` to `true` will enable KPL aggregation of records sent to Kinesis. This feature isn't compatible with the `partition_key` feature. See the KPL aggregation section below for more details.
26-
* `compression`: Setting `compression` to `zlib` will enable zlib compression of each record. By default this feature is disabled and records are not compressed.
26+
* `compression`: Specify an algorithm for compression of each record. Supported compression algorithms are `zlib` and `gzip`. By default this feature is disabled and records are not compressed.
2727
* `replace_dots`: Replace dot characters in key names with the value of this option. For example, if you add `replace_dots _` in your config then all occurrences of `.` will be replaced with an underscore. By default, dots will not be replaced.
2828

2929
### Permissions

fluent-bit-kinesis.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,10 +152,12 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin,
152152
var comp kinesis.CompressionType
153153
if strings.ToLower(compression) == string(kinesis.CompressionZlib) {
154154
comp = kinesis.CompressionZlib
155+
} else if strings.ToLower(compression) == string(kinesis.CompressionGzip) {
156+
comp = kinesis.CompressionGzip
155157
} else if strings.ToLower(compression) == string(kinesis.CompressionNone) || compression == "" {
156158
comp = kinesis.CompressionNone
157159
} else {
158-
return nil, fmt.Errorf("[kinesis %d] Invalid 'compression' value (%s) specified, must be 'zlib', 'none', or undefined", pluginID, compression)
160+
return nil, fmt.Errorf("[kinesis %d] Invalid 'compression' value (%s) specified, must be 'zlib', 'gzip', 'none', or undefined", pluginID, compression)
159161
}
160162

161163
return kinesis.NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, kinesisEndpoint, stsEndpoint, timeKey, timeKeyFmt, logKey, replaceDots, concurrencyInt, concurrencyRetriesInt, isAggregate, appendNL, comp, pluginID)

kinesis/kinesis.go

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package kinesis
1818

1919
import (
2020
"bytes"
21+
"compress/gzip"
2122
"compress/zlib"
2223
"fmt"
2324
"os"
@@ -72,6 +73,8 @@ const (
7273
CompressionNone CompressionType = "none"
7374
// CompressionZlib enables zlib compression
7475
CompressionZlib = "zlib"
76+
// CompressionGzip enables gzip compression
77+
CompressionGzip = "gzip"
7578
)
7679

7780
// OutputPlugin sends log records to kinesis
@@ -460,11 +463,15 @@ func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface
460463
data = append(data, []byte("\n")...)
461464
}
462465

463-
if outputPlugin.compression == CompressionZlib {
466+
switch outputPlugin.compression {
467+
case CompressionZlib:
464468
data, err = zlibCompress(data)
465-
if err != nil {
466-
return nil, err
467-
}
469+
case CompressionGzip:
470+
data, err = gzipCompress(data)
471+
default:
472+
}
473+
if err != nil {
474+
return nil, err
468475
}
469476

470477
if len(data)+partitionKeyLen > maximumRecordSize {
@@ -610,6 +617,26 @@ func zlibCompress(data []byte) ([]byte, error) {
610617
return b.Bytes(), nil
611618
}
612619

620+
func gzipCompress(data []byte) ([]byte, error) {
621+
var b bytes.Buffer
622+
623+
if data == nil {
624+
return nil, fmt.Errorf("No data to compress. 'nil' value passed as data")
625+
}
626+
627+
zw := gzip.NewWriter(&b)
628+
_, err := zw.Write(data)
629+
if err != nil {
630+
return data, err
631+
}
632+
err = zw.Close()
633+
if err != nil {
634+
return data, err
635+
}
636+
637+
return b.Bytes(), nil
638+
}
639+
613640
// stringOrByteArray returns the string value if the input is a string or byte array otherwise an empty string
614641
func stringOrByteArray(v interface{}) string {
615642
switch t := v.(type) {

kinesis/kinesis_test.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -187,19 +187,28 @@ func TestAddRecordWithConcurrency(t *testing.T) {
187187
assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected FlushConcurrent return code to be FLB_OK")
188188
}
189189

190-
func TestZlibCompression(t *testing.T) {
190+
var compressors = map[string]func([]byte) ([]byte, error){
191+
"zlib": zlibCompress,
192+
"gzip": gzipCompress,
193+
}
194+
195+
func TestCompression(t *testing.T) {
191196

192197
testData := []byte("Test Data: This is test data for compression. This data is needs to have with some repetitive values, so compression is effective.")
193198

194-
compressedBuf, err := zlibCompress(testData)
195-
assert.Equal(t, err, nil, "Expected successful compression of data")
196-
assert.Lessf(t, len(compressedBuf), len(testData), "Compressed data buffer should contain fewer bytes")
199+
for z, f := range compressors {
200+
compressedBuf, err := f(testData)
201+
assert.Equalf(t, err, nil, "Expected successful %s compression of data", z)
202+
assert.Lessf(t, len(compressedBuf), len(testData), "%s compressed data buffer should contain fewer bytes", z)
203+
}
197204
}
198205

199-
func TestZlibCompressionEmpty(t *testing.T) {
206+
func TestCompressionEmpty(t *testing.T) {
200207

201-
_, err := zlibCompress(nil)
202-
assert.NotEqual(t, err, nil, "'nil' data should return an error")
208+
for z, f := range compressors {
209+
_, err := f(nil)
210+
assert.NotEqualf(t, err, nil, "%s compressing 'nil' data should return an error", z)
211+
}
203212
}
204213

205214
func TestDotReplace(t *testing.T) {

0 commit comments

Comments
 (0)