diff --git a/README.md b/README.md index 7504578cd..3008b4979 100644 --- a/README.md +++ b/README.md @@ -895,6 +895,7 @@ More advanced fields: - `meta_data`: Can be used to set custom metadata inside the kafka message - `ssl_cert_file`: Can be used to set custom certificate file for authentication with kafka. - `ssl_key_file`: Can be used to set custom key file for authentication with kafka. +- `batch_bytes`: The maximum size, in bytes, of each message batch sent to the kafka cluster. If set to 0, the kafka-go default (1MB) is used. ###### JSON / Conf File @@ -916,7 +917,8 @@ More advanced fields: "compressed": true, "meta_data": { "key": "value" - } + }, + "batch_bytes": 1048576 } } } @@ -934,6 +936,7 @@ TYK_PMP_PUMPS_KAFKA_META_CLIENTID=tyk-pump TYK_PMP_PUMPS_KAFKA_META_TIMEOUT=60 TYK_PMP_PUMPS_KAFKA_META_COMPRESSED=true TYK_PMP_PUMPS_KAFKA_META_METADATA_KEY=value +TYK_PMP_PUMPS_KAFKA_META_BATCHBYTES=1048576 ``` ## Influx2 Config diff --git a/pumps/kafka.go b/pumps/kafka.go index 551900595..994fe9e97 100644 --- a/pumps/kafka.go +++ b/pumps/kafka.go @@ -68,6 +68,9 @@ type KafkaConf struct { // SASL algorithm. It's the algorithm specified for scram mechanism. It could be sha-512 or sha-256. // Defaults to "sha-256". Algorithm string `json:"sasl_algorithm" mapstructure:"sasl_algorithm"` + // BatchBytes controls the maximum size of a request in bytes before it's sent to a partition. + // If the value is 0, the writer will use the default value from kafka-go library (1MB).
+ BatchBytes int `json:"batch_bytes" mapstructure:"batch_bytes"` } func (k *KafkaPump) New() Pump { @@ -162,7 +165,7 @@ func (k *KafkaPump) Init(config interface{}) error { timeout = time.Duration(v) * time.Second // i.e: when timeout is 1 } - //Kafka writer connection config + // Kafka writer connection config dialer := &kafka.Dialer{ Timeout: timeout, ClientID: k.kafkaConf.ClientId, @@ -170,6 +173,7 @@ func (k *KafkaPump) Init(config interface{}) error { SASLMechanism: mechanism, } + // Kafka writer config k.writerConfig.Brokers = k.kafkaConf.Broker k.writerConfig.Topic = k.kafkaConf.Topic k.writerConfig.Balancer = &kafka.LeastBytes{} @@ -179,6 +183,11 @@ func (k *KafkaPump) Init(config interface{}) error { if k.kafkaConf.Compressed { k.writerConfig.CompressionCodec = snappy.NewCompressionCodec() } + if k.kafkaConf.BatchBytes < 0 { + k.log.Errorf("The config batch_bytes cannot be negative, but was set to %d", k.kafkaConf.BatchBytes) + } else { + k.writerConfig.BatchBytes = k.kafkaConf.BatchBytes + } k.log.Info(k.GetName() + " Initialized") diff --git a/pumps/kafka_test.go b/pumps/kafka_test.go new file mode 100644 index 000000000..e6938afc3 --- /dev/null +++ b/pumps/kafka_test.go @@ -0,0 +1,227 @@ +package pumps + +import ( + "os" + "testing" + + "github.com/mitchellh/mapstructure" + "github.com/segmentio/kafka-go" + "github.com/stretchr/testify/assert" +) + +func TestKafkaPump_New(t *testing.T) { + pump := (&KafkaPump{}).New() + assert.IsType(t, &KafkaPump{}, pump) +} + +func TestKafkaPump_GetName(t *testing.T) { + pump := &KafkaPump{} + assert.Equal(t, "Kafka Pump", pump.GetName()) +} + +func TestKafkaPump_Init_BatchBytesConfiguration(t *testing.T) { + //nolint:govet + tests := []struct { + name string + description string + config map[string]interface{} + expectedBytes int + }{ + { + name: "Custom BatchBytes Value", + config: map[string]interface{}{ + "broker": []string{"localhost:9092"}, + "topic": "test-topic", + "batch_bytes": 2048576, // 2MB + }, + 
expectedBytes: 2048576, + description: "Should set custom BatchBytes value", + }, + { + name: "Zero BatchBytes Value", + config: map[string]interface{}{ + "broker": []string{"localhost:9092"}, + "topic": "test-topic", + "batch_bytes": 0, + }, + expectedBytes: 0, + description: "Should allow zero BatchBytes (uses kafka-go default)", + }, + { + name: "No BatchBytes Configuration", + config: map[string]interface{}{ + "broker": []string{"localhost:9092"}, + "topic": "test-topic", + }, + expectedBytes: 0, + description: "Should default to zero when BatchBytes not specified", + }, + { + name: "Large BatchBytes Value", + config: map[string]interface{}{ + "broker": []string{"localhost:9092"}, + "topic": "test-topic", + "batch_bytes": 10485760, // 10MB + }, + expectedBytes: 10485760, + description: "Should handle large BatchBytes values", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + pump := &KafkaPump{} + err := pump.Init(tt.config) + + assert.NoError(t, err, tt.description) + assert.Equal(t, tt.expectedBytes, pump.writerConfig.BatchBytes, tt.description) + }) + } +} + +func TestKafkaPump_Init_BatchBytesWithOtherConfigs(t *testing.T) { + config := map[string]interface{}{ + "broker": []string{"localhost:9092"}, + "topic": "test-topic", + "batch_bytes": 512000, // 500KB + "client_id": "test-client", + "timeout": "30s", + "compressed": true, + "use_ssl": false, + "ssl_insecure_skip_verify": false, + "meta_data": map[string]string{ + "environment": "test", + }, + } + + pump := &KafkaPump{} + err := pump.Init(config) + + assert.NoError(t, err) + assert.Equal(t, 512000, pump.writerConfig.BatchBytes) + assert.Equal(t, []string{"localhost:9092"}, pump.writerConfig.Brokers) + assert.Equal(t, "test-topic", pump.writerConfig.Topic) + assert.NotNil(t, pump.writerConfig.CompressionCodec) + assert.IsType(t, &kafka.LeastBytes{}, pump.writerConfig.Balancer) +} + +func TestKafkaPump_BatchBytesEnvironmentVariable(t *testing.T) { + // Test that BatchBytes 
set in the config map is applied to the writer config (no environment-variable override is exercised in this test) + // This follows the same pattern as other configuration fields + config := map[string]interface{}{ + "broker": []string{"localhost:9092"}, + "topic": "test-topic", + "batch_bytes": 1024000, // 1MB + } + + pump := &KafkaPump{} + err := pump.Init(config) + + assert.NoError(t, err) + assert.Equal(t, 1024000, pump.writerConfig.BatchBytes) +} + +func TestKafkaPump_WriterConfigIntegrity(t *testing.T) { + // Test that BatchBytes configuration doesn't interfere with other writer config fields + config := map[string]interface{}{ + "broker": []string{"localhost:9092", "localhost:9093"}, + "topic": "analytics-topic", + "batch_bytes": 2097152, // 2MB + "client_id": "tyk-pump-test", + "timeout": 10.0, + "compressed": true, + } + + pump := &KafkaPump{} + err := pump.Init(config) + + assert.NoError(t, err) + + // Verify BatchBytes is set correctly + assert.Equal(t, 2097152, pump.writerConfig.BatchBytes) + + // Verify other configurations are not affected + assert.Equal(t, []string{"localhost:9092", "localhost:9093"}, pump.writerConfig.Brokers) + assert.Equal(t, "analytics-topic", pump.writerConfig.Topic) + assert.NotNil(t, pump.writerConfig.CompressionCodec) + assert.NotNil(t, pump.writerConfig.Dialer) + assert.IsType(t, &kafka.LeastBytes{}, pump.writerConfig.Balancer) +} + +func TestKafkaPump_BatchBytesEnvironmentVariableOverride(t *testing.T) { + // Test that BatchBytes can be overridden via environment variables + // This follows the same pattern as other configuration fields + config := map[string]interface{}{ + "broker": []string{"localhost:9092"}, + "topic": "test-topic", + "batch_bytes": 1024000, // 1MB + } + + os.Setenv("TYK_PMP_PUMPS_KAFKA_META_BATCHBYTES", "2048000") // 2MB + defer os.Unsetenv("TYK_PMP_PUMPS_KAFKA_META_BATCHBYTES") + + pump := &KafkaPump{} + err := pump.Init(config) + + assert.NoError(t, err) + assert.Equal(t, 2048000, pump.writerConfig.BatchBytes) +} + +func 
TestKafkaPump_BatchBytesEnvironmentVariableInvalid(t *testing.T) { + // Test that BatchBytes environment variable is ignored if it's not a valid integer + // This follows the same pattern as other configuration fields + config := map[string]interface{}{ + "broker": []string{"localhost:9092"}, + "topic": "test-topic", + "batch_bytes": 1024000, // 1MB + } + + os.Setenv("TYK_PMP_PUMPS_KAFKA_META_BATCHBYTES", "not-an-integer") + defer os.Unsetenv("TYK_PMP_PUMPS_KAFKA_META_BATCHBYTES") + + pump := &KafkaPump{} + err := pump.Init(config) + + assert.NoError(t, err) + assert.Equal(t, 1024000, pump.writerConfig.BatchBytes) +} + +func TestKafkaPump_BatchBytesConfigAndEnvironmentVariableBothInvalid(t *testing.T) { + // Test that mapstructure.Decode fails when batch_bytes is a non-integer string + config := map[string]interface{}{ + "broker": []string{"localhost:9092"}, + "topic": "test-topic", + "batch_bytes": "invalid-config-value", // Non-integer config value + } + + os.Setenv("TYK_PMP_PUMPS_KAFKA_META_BATCHBYTES", "invalid-env-value") + defer os.Unsetenv("TYK_PMP_PUMPS_KAFKA_META_BATCHBYTES") + + // Test that mapstructure.Decode fails with invalid batch_bytes config + // We test this directly without calling Init() to avoid log.Fatal() + kafkaConf := &KafkaConf{} + err := mapstructure.Decode(config, kafkaConf) + + // We expect mapstructure.Decode to fail when trying to convert string to int + assert.Error(t, err, "Expected mapstructure.Decode to fail with invalid batch_bytes config") + assert.Contains(t, err.Error(), "batch_bytes", "Error should mention batch_bytes field") + assert.Contains(t, err.Error(), "expected type 'int'", "Error should mention type conversion issue") +} + +func TestKafkaPump_Init_NegativeBatchBytes(t *testing.T) { + // Test that negative batch_bytes values are handled properly + // The pump should log an error and use the default value (0) instead of the negative value + config := map[string]interface{}{ + "broker": []string{"localhost:9092"}, + 
"topic": "test-topic", + "batch_bytes": -1024, // Negative value + } + + pump := &KafkaPump{} + err := pump.Init(config) + + // Init should succeed despite negative batch_bytes + assert.NoError(t, err, "Init should succeed with negative batch_bytes") + // The negative value should NOT be set; the default (0) should be used instead + assert.Equal(t, 0, pump.writerConfig.BatchBytes, "Should use default value (0) when batch_bytes is negative") +}