8 changes: 6 additions & 2 deletions README.md
@@ -1358,7 +1358,9 @@

`region` - The AWS region where your Kinesis stream is located, e.g. eu-west-2

`batch_size` - Optional. The maximum size of the records in a batch is 5MiB. If your records are larger in size, setting this batch size parameter can help you avoid failed deliveries caused by too large a batch. Default size if unset is 100.
`batch_size` - The maximum size of the records in a batch is 5MiB. If your records are larger in size, setting this batch size parameter can help you avoid failed deliveries caused by too large a batch. Default size if unset is 100 (optional)

`kms_key_id` - The AWS KMS key ID used to encrypt the records in the Kinesis stream. If not provided, the pump does not enable or change server-side encryption on the stream (optional)

Check warning on line 1363 in README.md

probelabs / Visor: security

documentation Issue

The documentation for the new `kms_key_id` feature does not mention the required AWS IAM permissions (`kinesis:DescribeStream` and `kinesis:StartStreamEncryption`). This omission can lead to deployment failures and may cause users to grant overly broad permissions to resolve the issue, violating the principle of least privilege.
Raw output
Update the `README.md` to explicitly state the required IAM permissions. This ensures users can configure their infrastructure securely and correctly from the start.
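
As a rough illustration of what such a least-privilege statement could look like, the sketch below builds an IAM policy document containing only the two actions named in this annotation and prints it as JSON. The account ID, the stream ARN, and the `encryptionPolicy` helper are hypothetical; permissions for writing records and for using the KMS key itself are outside what the annotation states and are not shown.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// statement and policy mirror the JSON layout of an IAM policy document.
type statement struct {
	Effect   string   `json:"Effect"`
	Action   []string `json:"Action"`
	Resource string   `json:"Resource"`
}

type policy struct {
	Version   string      `json:"Version"`
	Statement []statement `json:"Statement"`
}

// encryptionPolicy returns a policy limited to the two actions named in the
// annotation, scoped to a single stream ARN (hypothetical helper).
func encryptionPolicy(streamARN string) policy {
	return policy{
		Version: "2012-10-17",
		Statement: []statement{{
			Effect:   "Allow",
			Action:   []string{"kinesis:DescribeStream", "kinesis:StartStreamEncryption"},
			Resource: streamARN,
		}},
	}
}

func main() {
	// Placeholder account ID and stream name, for illustration only.
	p := encryptionPolicy("arn:aws:kinesis:eu-west-2:123456789012:stream/my-stream")
	out, err := json.MarshalIndent(p, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```

Scoping `Resource` to one stream ARN rather than `*` is the part that addresses the least-privilege concern raised above.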

###### JSON / Conf File

@@ -1368,7 +1370,8 @@
"meta": {
"stream_name": "my-stream",
"region": "eu-west-2",
"batch_size": 100
"batch_size": 100,
"kms_key_id": "your-kms-key-id"
}
},
```
@@ -1381,6 +1384,7 @@
TYK_PMP_PUMPS_KINESIS_META_STREAMNAME=my-stream
TYK_PMP_PUMPS_KINESIS_META_REGION=eu-west-2
TYK_PMP_PUMPS_KINESIS_META_BATCHSIZE=100
TYK_PMP_PUMPS_KINESIS_META_KMSKEYID=your-kms-key-id
```

# Base Pump Configurations
52 changes: 52 additions & 0 deletions pumps/kinesis.go
@@ -4,8 +4,10 @@
"context"
"crypto/rand"
"encoding/json"
"errors"
"fmt"
"math/big"
"time"

"github.com/TykTechnologies/tyk-pump/analytics"
"github.com/mitchellh/mapstructure"
@@ -41,6 +43,9 @@
// Each record in the request can be as large as 1 MiB, up to a limit of 5 MiB for the entire request, including partition keys.
// Each shard can support writes up to 1,000 records per second, up to a maximum data write total of 1 MiB per second.
BatchSize int `mapstructure:"batch_size"`
// The KMS Key ID used for server-side encryption of the Kinesis stream.
// Defaults to an empty string if not provided.
KMSKeyID string `mapstructure:"kms_key_id" default:""`
}

var (
@@ -85,6 +90,53 @@

// Create Kinesis client
p.client = kinesis.NewFromConfig(cfg)

// Check if KMSKeyID is provided and enable server-side encryption
if p.kinesisConf.KMSKeyID != "" {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

// First, check if encryption is already enabled
describeOutput, err := p.client.DescribeStream(ctx, &kinesis.DescribeStreamInput{
StreamName: aws.String(p.kinesisConf.StreamName),
})

switch {
case err != nil:
p.log.Fatalf("Failed to describe Kinesis stream: %v", err)
case describeOutput.StreamDescription.EncryptionType == types.EncryptionTypeKms:
currentKeyID := aws.ToString(describeOutput.StreamDescription.KeyId)
if currentKeyID == p.kinesisConf.KMSKeyID {
p.log.Info("Server-side encryption is already enabled with the specified KMS Key ID")
} else {
p.log.Fatal("Server-side encryption is already enabled with a different KMS Key ID")
}
default:
// Encryption not enabled, proceed to enable it
_, err := p.client.StartStreamEncryption(ctx, &kinesis.StartStreamEncryptionInput{
StreamName: aws.String(p.kinesisConf.StreamName),
EncryptionType: types.EncryptionTypeKms,
KeyId: aws.String(p.kinesisConf.KMSKeyID),
})

if err != nil {

Check warning on line 122 in pumps/kinesis.go

probelabs / Visor: quality

logic Issue

Potentially misleading log message on `ResourceInUseException`. When a `ResourceInUseException` is caught, the code logs `"Server-side encryption is already enabled for the Kinesis stream."`. According to AWS documentation, this exception indicates the stream is currently being updated (i.e., in an `UPDATING` state) and cannot be modified. It does not guarantee that encryption is already enabled. This log message could be misleading during troubleshooting.
Raw output
Change the log message to more accurately reflect the situation, for example: `Stream is currently being updated; could not enable server-side encryption at this time. Will retry on next startup.`
var resourceInUseErr *types.ResourceInUseException
if errors.As(err, &resourceInUseErr) {
p.log.Info("Server-side encryption is already enabled for the Kinesis stream.")
} else {
p.log.Fatalf("Failed to enable server-side encryption for Kinesis stream: %v", err)
}
} else {

Check warning on line 129 in pumps/kinesis.go

probelabs / Visor: performance

performance Issue

The new logic to enable server-side encryption introduces blocking network calls (`DescribeStream` and `StartStreamEncryption`) within the `Init` function. This will increase the application's startup time whenever a `KMSKeyID` is configured, as the pump must wait for responses from the AWS API before it can start processing data. In environments with slow network connectivity to AWS, this could lead to noticeable delays in service availability during deployments or restarts.
Raw output
While the current fail-safe approach is robust, consider adding a log entry at the beginning of the check to indicate that the pump is verifying Kinesis encryption. This would make it clear to operators why startup might be delayed. For example: `p.log.Info("Verifying Kinesis stream encryption...")`. No major architectural change is recommended as the current approach is safe, but awareness of the startup delay is important.
kmsKeyID := p.kinesisConf.KMSKeyID
loggableKeyID := "***" // Default to a fully masked key
if len(kmsKeyID) >= 8 {
loggableKeyID = fmt.Sprintf("%s***%s", kmsKeyID[:4], kmsKeyID[len(kmsKeyID)-4:])
}
p.log.Info("Server-side encryption enabled for Kinesis stream with KMS Key ID: ", loggableKeyID)
}
}
}

p.log.Info(p.GetName() + " Initialized")

return nil
196 changes: 196 additions & 0 deletions pumps/kinesis_test.go
@@ -0,0 +1,196 @@
package pumps

Check failure on line 1 in pumps/kinesis_test.go

probelabs / Visor: quality

logic Issue

Critical functionality is completely untested. The core logic for checking and enabling server-side encryption, which involves multiple AWS API calls (`DescribeStream`, `StartStreamEncryption`) and complex conditional branching, has no unit test coverage. The new test file only validates configuration parsing and does not exercise any of the new AWS interaction logic within the `Init` function.
Raw output
To address this, the `KinesisPump` should be refactored to allow for dependency injection of the Kinesis client. This would enable the use of mocks to write unit tests that cover the following scenarios:
1. Stream is already encrypted with the correct KMS key.
2. Stream is encrypted with a different KMS key (expecting a fatal error).
3. Stream is not encrypted, and the call to `StartStreamEncryption` is successful.
4. The call to `DescribeStream` fails.
5. The call to `StartStreamEncryption` fails with a `ResourceInUseException`.
6. The call to `StartStreamEncryption` fails with a generic error.

import (
"testing"

"github.com/mitchellh/mapstructure"
"github.com/stretchr/testify/assert"
)

func TestKinesisPump_New(t *testing.T) {
pump := &KinesisPump{}
newPump := pump.New()
assert.IsType(t, &KinesisPump{}, newPump)
}

func TestKinesisPump_GetName(t *testing.T) {
pump := &KinesisPump{}
assert.Equal(t, "Kinesis Pump", pump.GetName())
}

func TestKinesisConf_KMSKeyID_Configuration(t *testing.T) {
tests := []struct {
name string
kmsKeyID string
expected string
}{
{
name: "KMSKeyID provided",
kmsKeyID: "arn:aws:kms:us-east-1:123456789012:key/12345678-1234-1234-1234-123456789012",
expected: "arn:aws:kms:us-east-1:123456789012:key/12345678-1234-1234-1234-123456789012",
},
{
name: "KMSKeyID empty",
kmsKeyID: "",
expected: "",
},
{
name: "KMSKeyID alias format",
kmsKeyID: "alias/my-key",
expected: "alias/my-key",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
conf := &KinesisConf{
KMSKeyID: tt.kmsKeyID,
}
assert.Equal(t, tt.expected, conf.KMSKeyID)
})
}
}

func TestKinesisPump_KMSKeyID_DefaultValue(t *testing.T) {
conf := &KinesisConf{}
assert.Equal(t, "", conf.KMSKeyID)
}

func TestSplitIntoBatches(t *testing.T) {
records := make([]interface{}, 25)
batches := splitIntoBatches(records, 10)
assert.Len(t, batches, 3)
assert.Len(t, batches[0], 10)
assert.Len(t, batches[1], 10)
assert.Len(t, batches[2], 5)
}

func TestKinesisPump_KMSKeyID_LogMasking(t *testing.T) {
kmsKeyID := "arn:aws:kms:us-east-1:123456789012:key/12345678-1234-1234-1234-123456789012"
if len(kmsKeyID) >= 8 {
masked := kmsKeyID[:4] + "***" + kmsKeyID[len(kmsKeyID)-4:]
assert.Equal(t, "arn:***9012", masked)
}
}

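// Config decoding tests for the encryption scenarios.
// These only verify that KinesisConf is populated; the DescribeStream and
// StartStreamEncryption calls in Init are not exercised here.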
func TestKinesisPump_EncryptionConfig_SameKey(t *testing.T) {
kmsKeyID := "arn:aws:kms:us-east-1:123456789012:key/test-key-id"

config := map[string]interface{}{
"stream_name": "test-stream",
"region": "us-east-1",
"kms_key_id": kmsKeyID,
}

kinesisConf := &KinesisConf{}
err := mapstructure.Decode(config, kinesisConf)
assert.NoError(t, err)
assert.Equal(t, kmsKeyID, kinesisConf.KMSKeyID)
assert.Equal(t, "test-stream", kinesisConf.StreamName)
assert.Equal(t, "us-east-1", kinesisConf.Region)
}

func TestKinesisPump_EncryptionConfig_DifferentKey(t *testing.T) {
currentKeyID := "arn:aws:kms:us-east-1:123456789012:key/current-key"
newKeyID := "arn:aws:kms:us-east-1:123456789012:key/new-key"

config := map[string]interface{}{
"stream_name": "test-stream",
"region": "us-east-1",
"kms_key_id": newKeyID,
}

kinesisConf := &KinesisConf{}
err := mapstructure.Decode(config, kinesisConf)
assert.NoError(t, err)
assert.Equal(t, newKeyID, kinesisConf.KMSKeyID)
assert.Equal(t, "test-stream", kinesisConf.StreamName)
assert.Equal(t, "us-east-1", kinesisConf.Region)

// Verify the keys are different (simulating the scenario)
assert.NotEqual(t, currentKeyID, newKeyID)
}

func TestKinesisPump_EncryptionConfig_NotEncrypted(t *testing.T) {
kmsKeyID := "arn:aws:kms:us-east-1:123456789012:key/test-key-id"

config := map[string]interface{}{
"stream_name": "test-stream",
"region": "us-east-1",
"kms_key_id": kmsKeyID,
}

kinesisConf := &KinesisConf{}
err := mapstructure.Decode(config, kinesisConf)
assert.NoError(t, err)
assert.Equal(t, kmsKeyID, kinesisConf.KMSKeyID)
assert.Equal(t, "test-stream", kinesisConf.StreamName)
assert.Equal(t, "us-east-1", kinesisConf.Region)
}

func TestKinesisPump_EncryptionConfig_NoKMSKeyID(t *testing.T) {
config := map[string]interface{}{
"stream_name": "test-stream",
"region": "us-east-1",
// No kms_key_id provided - should skip encryption
}

kinesisConf := &KinesisConf{}
err := mapstructure.Decode(config, kinesisConf)
assert.NoError(t, err)
assert.Equal(t, "", kinesisConf.KMSKeyID)
assert.Equal(t, "test-stream", kinesisConf.StreamName)
assert.Equal(t, "us-east-1", kinesisConf.Region)
}

func TestKinesisPump_BatchSize_Configuration(t *testing.T) {
//nolint:govet
tests := []struct {
name string
batchSize interface{}
expectedValue int
}{
{
name: "Default batch size (not provided)",
batchSize: nil,
expectedValue: 0, // Will be set to 100 in Init()
},
{
name: "Custom batch size",
batchSize: 250,
expectedValue: 250,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
config := map[string]interface{}{
"stream_name": "test-stream",
"region": "us-east-1",
}

if tt.batchSize != nil {
config["batch_size"] = tt.batchSize
}

kinesisConf := &KinesisConf{}
err := mapstructure.Decode(config, kinesisConf)
assert.NoError(t, err)
assert.Equal(t, tt.expectedValue, kinesisConf.BatchSize)
})
}
}

func TestKinesisPump_StreamName_Required(t *testing.T) {
config := map[string]interface{}{
"region": "us-east-1",
// Missing stream_name
}

kinesisConf := &KinesisConf{}
err := mapstructure.Decode(config, kinesisConf)
assert.NoError(t, err)
assert.Equal(t, "", kinesisConf.StreamName) // Should be empty when not provided
assert.Equal(t, "us-east-1", kinesisConf.Region)
}