5 changes: 4 additions & 1 deletion README.md
@@ -895,6 +895,7 @@ More advanced fields:
- `meta_data`: Can be used to set custom metadata inside the Kafka message.
- `ssl_cert_file`: Can be used to set a custom certificate file for authentication with Kafka.
- `ssl_key_file`: Can be used to set a custom key file for authentication with Kafka.
- `batch_bytes`: The maximum size, in bytes, of a batch sent to the Kafka cluster.

###### JSON / Conf File

@@ -916,7 +917,8 @@
"compressed": true,
"meta_data": {
"key": "value"
}
},
"batch_bytes": 1048576
}
}
}
@@ -934,6 +936,7 @@ TYK_PMP_PUMPS_KAFKA_META_CLIENTID=tyk-pump
TYK_PMP_PUMPS_KAFKA_META_TIMEOUT=60
TYK_PMP_PUMPS_KAFKA_META_COMPRESSED=true
TYK_PMP_PUMPS_KAFKA_META_METADATA_KEY=value
TYK_PMP_PUMPS_KAFKA_META_BATCHBYTES=1048576
```

## Influx2 Config
11 changes: 10 additions & 1 deletion pumps/kafka.go
@@ -68,6 +68,9 @@
// SASL algorithm. It's the algorithm specified for scram mechanism. It could be sha-512 or sha-256.
// Defaults to "sha-256".
Algorithm string `json:"sasl_algorithm" mapstructure:"sasl_algorithm"`
// BatchBytes controls the maximum size of a request in bytes before it's sent to a partition.
// If the value is 0, the writer will use the default value from kafka-go library (1MB).
BatchBytes int `json:"batch_bytes" mapstructure:"batch_bytes"`
}

func (k *KafkaPump) New() Pump {
@@ -162,14 +165,15 @@
timeout = time.Duration(v) * time.Second // i.e: when timeout is 1
}

//Kafka writer connection config
// Kafka writer connection config
dialer := &kafka.Dialer{
Timeout: timeout,
ClientID: k.kafkaConf.ClientId,
TLS: tlsConfig,
SASLMechanism: mechanism,
}

// Kafka writer config
k.writerConfig.Brokers = k.kafkaConf.Broker
k.writerConfig.Topic = k.kafkaConf.Topic
k.writerConfig.Balancer = &kafka.LeastBytes{}
@@ -178,7 +182,12 @@
k.writerConfig.ReadTimeout = timeout
if k.kafkaConf.Compressed {
k.writerConfig.CompressionCodec = snappy.NewCompressionCodec()
}
if k.kafkaConf.BatchBytes < 0 {
k.log.Errorf("The config batch_bytes cannot be negative, but was set to %d", k.kafkaConf.BatchBytes)

Check warning on line 187 in pumps/kafka.go — probelabs / Visor: security (logic issue)

The application handles an invalid negative `batch_bytes` configuration by logging an error and then proceeding with the default value (0). This can mask configuration issues, as the error may be missed in logs, leaving the service running with unintended performance characteristics. A better practice is to fail fast: return an error from `Init` when `k.kafkaConf.BatchBytes` is negative, so the application cannot start in a misconfigured state.
} else {
k.writerConfig.BatchBytes = k.kafkaConf.BatchBytes

Check warning on line 189 in pumps/kafka.go — probelabs / Visor: security (also raised by the performance, quality, and style checks; the quality and style checks additionally repeat the fail-fast finding from line 187)

The `batch_bytes` configuration parameter lacks an upper limit. A malicious or misconfigured user could provide an extremely large value, causing the application to allocate excessive memory for the message batch. This could lead to an Out-of-Memory (OOM) error, resulting in a Denial of Service (DoS). Enforce a reasonable maximum: either cap the configured value at that maximum and log a warning, or return an error during initialization to prevent starting with a potentially dangerous configuration.
}

k.log.Info(k.GetName() + " Initialized")

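The fail-fast and upper-bound validation the Visor checks recommend could be sketched as follows; `maxBatchBytes` and `validateBatchBytes` are hypothetical names chosen for illustration, not part of the PR:

```go
package main

import (
	"fmt"
	"log"
)

// maxBatchBytes is an assumed cap (64 MB); the PR does not define an
// upper limit — Visor only recommends that one exist.
const maxBatchBytes = 64 * 1024 * 1024

// validateBatchBytes rejects negative values outright (fail fast) and
// caps oversized values with a warning, per the review suggestions.
func validateBatchBytes(batchBytes int) (int, error) {
	if batchBytes < 0 {
		return 0, fmt.Errorf("batch_bytes cannot be negative, got %d", batchBytes)
	}
	if batchBytes > maxBatchBytes {
		log.Printf("batch_bytes %d exceeds maximum %d, capping", batchBytes, maxBatchBytes)
		return maxBatchBytes, nil
	}
	return batchBytes, nil
}

func main() {
	if v, err := validateBatchBytes(1048576); err == nil {
		fmt.Println(v) // 1048576
	}
	if _, err := validateBatchBytes(-1024); err != nil {
		fmt.Println("rejected:", err)
	}
}
```

Returning the error from `Init` instead of logging it would stop the pump from starting in a misconfigured state, which is the fail-fast behavior the annotations argue for.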
227 changes: 227 additions & 0 deletions pumps/kafka_test.go
@@ -0,0 +1,227 @@
package pumps

import (
"os"
"testing"

"github.com/mitchellh/mapstructure"
"github.com/segmentio/kafka-go"
"github.com/stretchr/testify/assert"
)

func TestKafkaPump_New(t *testing.T) {
pump := (&KafkaPump{}).New()
assert.IsType(t, &KafkaPump{}, pump)
}

func TestKafkaPump_GetName(t *testing.T) {
pump := &KafkaPump{}
assert.Equal(t, "Kafka Pump", pump.GetName())
}

func TestKafkaPump_Init_BatchBytesConfiguration(t *testing.T) {
//nolint:govet
tests := []struct {
name string
description string
config map[string]interface{}
expectedBytes int
}{
{
name: "Custom BatchBytes Value",
config: map[string]interface{}{
"broker": []string{"localhost:9092"},
"topic": "test-topic",
"batch_bytes": 2048576, // 2MB
},
expectedBytes: 2048576,
description: "Should set custom BatchBytes value",
},
{
name: "Zero BatchBytes Value",
config: map[string]interface{}{
"broker": []string{"localhost:9092"},
"topic": "test-topic",
"batch_bytes": 0,
},
expectedBytes: 0,
description: "Should allow zero BatchBytes (uses kafka-go default)",
},
{
name: "No BatchBytes Configuration",
config: map[string]interface{}{
"broker": []string{"localhost:9092"},
"topic": "test-topic",
},
expectedBytes: 0,
description: "Should default to zero when BatchBytes not specified",
},
{
name: "Large BatchBytes Value",
config: map[string]interface{}{
"broker": []string{"localhost:9092"},
"topic": "test-topic",
"batch_bytes": 10485760, // 10MB
},
expectedBytes: 10485760,
description: "Should handle large BatchBytes values",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pump := &KafkaPump{}
err := pump.Init(tt.config)

assert.NoError(t, err, tt.description)
assert.Equal(t, tt.expectedBytes, pump.writerConfig.BatchBytes, tt.description)
})
}
}

func TestKafkaPump_Init_BatchBytesWithOtherConfigs(t *testing.T) {
config := map[string]interface{}{
"broker": []string{"localhost:9092"},
"topic": "test-topic",
"batch_bytes": 512000, // 500KB
"client_id": "test-client",
"timeout": "30s",
"compressed": true,
"use_ssl": false,
"ssl_insecure_skip_verify": false,
"meta_data": map[string]string{
"environment": "test",
},
}

pump := &KafkaPump{}
err := pump.Init(config)

assert.NoError(t, err)
assert.Equal(t, 512000, pump.writerConfig.BatchBytes)
assert.Equal(t, []string{"localhost:9092"}, pump.writerConfig.Brokers)
assert.Equal(t, "test-topic", pump.writerConfig.Topic)
assert.NotNil(t, pump.writerConfig.CompressionCodec)
assert.IsType(t, &kafka.LeastBytes{}, pump.writerConfig.Balancer)
}

func TestKafkaPump_BatchBytesEnvironmentVariable(t *testing.T) {
	// Test that a BatchBytes value set in the plain config is applied to the
	// writer config; the actual environment-variable override is covered in
	// TestKafkaPump_BatchBytesEnvironmentVariableOverride below
config := map[string]interface{}{
"broker": []string{"localhost:9092"},
"topic": "test-topic",
"batch_bytes": 1024000, // 1MB
}

pump := &KafkaPump{}
err := pump.Init(config)

assert.NoError(t, err)
assert.Equal(t, 1024000, pump.writerConfig.BatchBytes)
}

func TestKafkaPump_WriterConfigIntegrity(t *testing.T) {
// Test that BatchBytes configuration doesn't interfere with other writer config fields
config := map[string]interface{}{
"broker": []string{"localhost:9092", "localhost:9093"},
"topic": "analytics-topic",
"batch_bytes": 2097152, // 2MB
"client_id": "tyk-pump-test",
"timeout": 10.0,
"compressed": true,
}

pump := &KafkaPump{}
err := pump.Init(config)

assert.NoError(t, err)

// Verify BatchBytes is set correctly
assert.Equal(t, 2097152, pump.writerConfig.BatchBytes)

// Verify other configurations are not affected
assert.Equal(t, []string{"localhost:9092", "localhost:9093"}, pump.writerConfig.Brokers)
assert.Equal(t, "analytics-topic", pump.writerConfig.Topic)
assert.NotNil(t, pump.writerConfig.CompressionCodec)
assert.NotNil(t, pump.writerConfig.Dialer)
assert.IsType(t, &kafka.LeastBytes{}, pump.writerConfig.Balancer)
}

func TestKafkaPump_BatchBytesEnvironmentVariableOverride(t *testing.T) {
// Test that BatchBytes can be overridden via environment variables
// This follows the same pattern as other configuration fields
config := map[string]interface{}{
"broker": []string{"localhost:9092"},
"topic": "test-topic",
"batch_bytes": 1024000, // 1MB
}

os.Setenv("TYK_PMP_PUMPS_KAFKA_META_BATCHBYTES", "2048000") // 2MB
defer os.Unsetenv("TYK_PMP_PUMPS_KAFKA_META_BATCHBYTES")

pump := &KafkaPump{}
err := pump.Init(config)

assert.NoError(t, err)
assert.Equal(t, 2048000, pump.writerConfig.BatchBytes)
}

func TestKafkaPump_BatchBytesEnvironmentVariableInvalid(t *testing.T) {
// Test that BatchBytes environment variable is ignored if it's not a valid integer
// This follows the same pattern as other configuration fields
config := map[string]interface{}{
"broker": []string{"localhost:9092"},
"topic": "test-topic",
"batch_bytes": 1024000, // 1MB
}

os.Setenv("TYK_PMP_PUMPS_KAFKA_META_BATCHBYTES", "not-an-integer")
defer os.Unsetenv("TYK_PMP_PUMPS_KAFKA_META_BATCHBYTES")

pump := &KafkaPump{}
err := pump.Init(config)

assert.NoError(t, err)
assert.Equal(t, 1024000, pump.writerConfig.BatchBytes)
}

func TestKafkaPump_BatchBytesConfigAndEnvironmentVariableBothInvalid(t *testing.T) {
// Test that mapstructure.Decode fails when batch_bytes is a non-integer string
config := map[string]interface{}{
"broker": []string{"localhost:9092"},
"topic": "test-topic",
"batch_bytes": "invalid-config-value", // Non-integer config value
}

os.Setenv("TYK_PMP_PUMPS_KAFKA_META_BATCHBYTES", "invalid-env-value")
defer os.Unsetenv("TYK_PMP_PUMPS_KAFKA_META_BATCHBYTES")

// Test that mapstructure.Decode fails with invalid batch_bytes config
// We test this directly without calling Init() to avoid log.Fatal()
kafkaConf := &KafkaConf{}
err := mapstructure.Decode(config, kafkaConf)

// We expect mapstructure.Decode to fail when trying to convert string to int
assert.Error(t, err, "Expected mapstructure.Decode to fail with invalid batch_bytes config")
assert.Contains(t, err.Error(), "batch_bytes", "Error should mention batch_bytes field")
assert.Contains(t, err.Error(), "expected type 'int'", "Error should mention type conversion issue")
}

func TestKafkaPump_Init_NegativeBatchBytes(t *testing.T) {
// Test that negative batch_bytes values are handled properly
// The pump should log an error and use the default value (0) instead of the negative value
config := map[string]interface{}{
"broker": []string{"localhost:9092"},
"topic": "test-topic",
"batch_bytes": -1024, // Negative value
}

pump := &KafkaPump{}
err := pump.Init(config)

// Init should succeed despite negative batch_bytes
assert.NoError(t, err, "Init should succeed with negative batch_bytes")
// The negative value should NOT be set, instead it should use default (0)
assert.Equal(t, 0, pump.writerConfig.BatchBytes, "Should use default value (0) when batch_bytes is negative")
}