Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 1.10.1
* Feature - Make maximum aggregation size configurable.

## 1.10.0
* Feature - Add support for building this plugin on Windows. *Note that this is only support in this plugin repo for Windows compilation.*

Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ After this step, run `make windows-release`. Then use with Fluent Bit on Windows
* `compression`: Specify an algorithm for compression of each record. Supported compression algorithms are `zlib` and `gzip`. By default this feature is disabled and records are not compressed.
* `replace_dots`: Replace dot characters in key names with the value of this option. For example, if you add `replace_dots _` in your config then all occurrences of `.` will be replaced with an underscore. By default, dots will not be replaced.
* `http_request_timeout`: Specify a timeout (in seconds) for the underlying AWS SDK Go HTTP call when sending records to Kinesis. By default, a timeout of `0` is used, indicating no timeout. Note that even with no timeout, the default behavior of the AWS SDK Go library may still lead to an eventual timeout.
* `aggregation_maximum_record_size`: defines the maximum size of an aggregated record before it is flushed. Default is 1MB.
* `skip_aggregation_record_size`: defines the maximum size of a single record that may be included in an aggregation. Default is 20KB.

### Permissions

Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.10.0
1.10.1
65 changes: 45 additions & 20 deletions aggregate/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
"crypto/md5"
"fmt"

"github.com/aws/amazon-kinesis-streams-for-fluent-bit/util"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/encoding/protowire"
"google.golang.org/protobuf/proto"

"github.com/canva/amazon-kinesis-streams-for-fluent-bit/compress"
"github.com/canva/amazon-kinesis-streams-for-fluent-bit/util"
)

var (
Expand All @@ -19,31 +21,47 @@ var (
)

const (
maximumRecordSize = 1024 * 1024 // 1 MB
defaultMaxAggRecordSize = 20 * 1024 // 20K
initialAggRecordSize = 0
fieldNumberSize = 1 // All field numbers are below 16, meaning they will only take up 1 byte
defaultMaximumRecordSize = 1024 * 1024 // 1 MB
defaultMaxAggRecordSize = 20 * 1024 // 20K
initialAggRecordSize = 0
fieldNumberSize = 1 // All field numbers are below 16, meaning they will only take up 1 byte
)

// Aggregator kinesis aggregator
type Aggregator struct {
partitionKeys map[string]uint64
records []*Record
aggSize int // Size of both records, and partitionKeys in bytes
maxAggRecordSize int
stringGen *util.RandomStringGenerator
partitionKeys map[string]uint64
records []*Record
aggSize int // Size of both records, and partitionKeys in bytes
maximumRecordSize int
maxAggRecordSize int
stringGen *util.RandomStringGenerator
}

// Config is for aggregation customizations.
// Nil pointer fields mean "use the built-in default".
type Config struct {
	// MaximumRecordSize caps the size in bytes of an aggregated record
	// before it is flushed. Defaults to 1 MB when nil.
	MaximumRecordSize *int
	// MaxAggRecordSize is the largest single record size in bytes that
	// is still included in an aggregation. Defaults to 20 KB when nil.
	MaxAggRecordSize *int
}

// NewAggregator create a new aggregator
func NewAggregator(stringGen *util.RandomStringGenerator) *Aggregator {

return &Aggregator{
partitionKeys: make(map[string]uint64, 0),
records: make([]*Record, 0),
maxAggRecordSize: defaultMaxAggRecordSize,
aggSize: initialAggRecordSize,
stringGen: stringGen,
func NewAggregator(stringGen *util.RandomStringGenerator, cfg *Config) *Aggregator {
	// Start from defaults; cfg only overrides the fields it sets.
	a := &Aggregator{
		partitionKeys:     make(map[string]uint64),
		records:           make([]*Record, 0),
		maximumRecordSize: defaultMaximumRecordSize,
		maxAggRecordSize:  defaultMaxAggRecordSize,
		aggSize:           initialAggRecordSize,
		stringGen:         stringGen,
	}

	// Guard against a nil config so callers may pass nil to get defaults
	// (previously this would panic on the field dereference below).
	if cfg == nil {
		return a
	}

	if cfg.MaximumRecordSize != nil {
		a.maximumRecordSize = *cfg.MaximumRecordSize
	}
	if cfg.MaxAggRecordSize != nil {
		a.maxAggRecordSize = *cfg.MaxAggRecordSize
	}

	return a
}

// AddRecord to the aggregate buffer.
Expand Down Expand Up @@ -91,7 +109,7 @@ func (a *Aggregator) AddRecord(partitionKey string, hasPartitionKey bool, data [
pkeyFieldSize := protowire.SizeVarint(pKeyIdx) + fieldNumberSize
// Total size is byte size of data + pkey field + field number of parent proto

if a.getSize()+protowire.SizeBytes(dataFieldSize+pkeyFieldSize)+fieldNumberSize+pKeyAddedSize >= maximumRecordSize {
if a.getSize()+protowire.SizeBytes(dataFieldSize+pkeyFieldSize)+fieldNumberSize+pKeyAddedSize >= a.maximumRecordSize {
// Aggregate records, and return if error
entry, err = a.AggregateRecords()
if err != nil {
Expand Down Expand Up @@ -149,11 +167,18 @@ func (a *Aggregator) AggregateRecords() (entry *kinesis.PutRecordsRequestEntry,

logrus.Debugf("[kinesis ] Aggregated (%d) records of size (%d) with total size (%d), partition key (%s)\n", len(a.records), a.getSize(), len(kclData), pkeys[0])

compressedData, err := compress.Compress(kclData)
if err != nil {
logrus.Warnf("Failed to compress KCL data: %v", err)
// This should not result in dropping records/increasing retries.
// Compressor will return original data if it fails to compress them.
}

// Clear buffer if aggregation didn't fail
a.clearBuffer()

return &kinesis.PutRecordsRequestEntry{
Data: kclData,
Data: compressedData,
PartitionKey: aws.String(pkeys[0]),
}, nil
}
Expand Down
7 changes: 4 additions & 3 deletions aggregate/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ package aggregate
import (
"testing"

"github.com/aws/amazon-kinesis-streams-for-fluent-bit/util"
"github.com/stretchr/testify/assert"

"github.com/canva/amazon-kinesis-streams-for-fluent-bit/util"
)

const concurrencyRetryLimit = 4

func TestAddRecordCalculatesCorrectSize(t *testing.T) {
generator := util.NewRandomStringGenerator(18)
aggregator := NewAggregator(generator)
aggregator := NewAggregator(generator, &Config{})

_, err := aggregator.AddRecord("", false, []byte("test value"))
assert.Equal(t, nil, err, "Expected aggregator not to return error")
Expand All @@ -24,7 +25,7 @@ func TestAddRecordCalculatesCorrectSize(t *testing.T) {

func TestAddRecordDoesNotAddNewRandomPartitionKey(t *testing.T) {
generator := util.NewRandomStringGenerator(18)
aggregator := NewAggregator(generator)
aggregator := NewAggregator(generator, &Config{})

_, err := aggregator.AddRecord("", false, []byte("test value"))
assert.Equal(t, nil, err, "Expected aggregator not to return error")
Expand Down
59 changes: 59 additions & 0 deletions compress/compress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Package compress is responsible for compressing data.
package compress

import (
"github.com/canva/amazon-kinesis-streams-for-fluent-bit/compress/gzip"
"github.com/canva/amazon-kinesis-streams-for-fluent-bit/compress/noop"
"github.com/canva/amazon-kinesis-streams-for-fluent-bit/compress/zstd"
)

// Format is the format of compression that happens.
type Format string

// Supported compression formats. FormatNoop passes data through unchanged.
const (
	FormatNoop = Format("noop")
	FormatGZip = Format("gzip")
	FormatZSTD = Format("zstd")
)

// Config holds configurations need for creating a new Compress instance.
type Config struct {
Format Format
Level int
}

// Compression will compress any given array of bytes.
type Compression interface {
	// Compress returns the (possibly transformed) payload and an error.
	// Callers may fall back to the original data when an error is returned.
	Compress([]byte) ([]byte, error)
}

// New creates an instance of Compression.
// The implementation is selected by conf.Format; any unrecognized format
// (including the zero value) falls back to the no-op compressor.
func New(conf *Config) (Compression, error) {
	switch conf.Format {
	case FormatGZip:
		return gzip.New(conf.Level)
	case FormatZSTD:
		return zstd.New(conf.Level)
	default:
		return noop.New(), nil
	}
}

// defaultCompression holds global compression instance.
var defaultCompression Compression

// Init will initialise global compression instance.
func Init(conf *Config) error {
	compression, newErr := New(conf)
	// Assign unconditionally (even when newErr != nil), matching the
	// original single-statement assignment semantics.
	defaultCompression = compression
	return newErr
}

// Compress will compress any given array of bytes using the globally
// configured compressor. If Init was never called (or failed, leaving the
// global nil), the data is returned unmodified instead of panicking on a
// nil interface dereference — keeping the package best-effort.
func Compress(data []byte) ([]byte, error) {
	if defaultCompression == nil {
		return data, nil
	}
	return defaultCompression.Compress(data)
}
Loading