Skip to content

Commit 170390b

Browse files
authored
ensure custom EndpointResolver is still used when either EKS_POD_EXECUTION_ROLE environment variable is set OR when role_arn parameter is set. (#337)
* ensure custom EndpointResolver is still used when either EKS_POD_EXECUTION_ROLE environment variable is set OR when role_arn parameter is set. This ensures the endpoint parameter is still used in either of these cases. Signed-off-by: Alex Price <[email protected]> * add unit test Signed-off-by: Alex Price <[email protected]> --------- Signed-off-by: Alex Price <[email protected]>
1 parent 4772e6c commit 170390b

File tree

2 files changed

+64
-9
lines changed

2 files changed

+64
-9
lines changed

kinesis/kinesis.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ func newPutRecordsClient(roleARN string, awsRegion string, kinesisEndpoint strin
212212
eksConfig.Credentials = creds
213213
eksConfig.Region = aws.String(awsRegion)
214214
eksConfig.HTTPClient = httpClient
215+
eksConfig.EndpointResolver = endpoints.ResolverFunc(customResolverFn)
215216
svcConfig = eksConfig
216217

217218
svcSess, err = session.NewSession(svcConfig)
@@ -226,6 +227,7 @@ func newPutRecordsClient(roleARN string, awsRegion string, kinesisEndpoint strin
226227
stsConfig.Credentials = creds
227228
stsConfig.Region = aws.String(awsRegion)
228229
stsConfig.HTTPClient = httpClient
230+
stsConfig.EndpointResolver = endpoints.ResolverFunc(customResolverFn)
229231
svcConfig = stsConfig
230232

231233
svcSess, err = session.NewSession(svcConfig)

kinesis/kinesis_test.go

Lines changed: 62 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -271,15 +271,15 @@ func RandStringRunes(n int) string {
271271
}
272272

273273
func TestCompressionTruncation(t *testing.T) {
274-
deftlvl := logrus.GetLevel();
275-
logrus.SetLevel(0);
274+
deftlvl := logrus.GetLevel()
275+
logrus.SetLevel(0)
276276

277277
rand.Seed(0)
278278
testData := []byte(RandStringRunes(4000))
279279
testSuffix := "[truncate]"
280280
outputPlugin := OutputPlugin{
281281
PluginID: 10,
282-
stream: "MyStream",
282+
stream: "MyStream",
283283
}
284284
var compressedOutput, err = compressThenTruncate(gzipCompress, testData, 200, []byte(testSuffix), outputPlugin)
285285
assert.Nil(t, err)
@@ -290,15 +290,15 @@ func TestCompressionTruncation(t *testing.T) {
290290
}
291291

292292
func TestCompressionTruncationFailureA(t *testing.T) {
293-
deftlvl := logrus.GetLevel();
294-
logrus.SetLevel(0);
293+
deftlvl := logrus.GetLevel()
294+
logrus.SetLevel(0)
295295

296296
rand.Seed(0)
297297
testData := []byte(RandStringRunes(4000))
298298
testSuffix := "[truncate]"
299299
outputPlugin := OutputPlugin{
300300
PluginID: 10,
301-
stream: "MyStream",
301+
stream: "MyStream",
302302
}
303303
var _, err = compressThenTruncate(gzipCompress, testData, 20, []byte(testSuffix), outputPlugin)
304304
assert.Contains(t, err.Error(), "no room for suffix")
@@ -307,15 +307,15 @@ func TestCompressionTruncationFailureA(t *testing.T) {
307307
}
308308

309309
func TestCompressionTruncationFailureB(t *testing.T) {
310-
deftlvl := logrus.GetLevel();
311-
logrus.SetLevel(0);
310+
deftlvl := logrus.GetLevel()
311+
logrus.SetLevel(0)
312312

313313
rand.Seed(0)
314314
testData := []byte{}
315315
testSuffix := "[truncate]"
316316
outputPlugin := OutputPlugin{
317317
PluginID: 10,
318-
stream: "MyStream",
318+
stream: "MyStream",
319319
}
320320
var _, err = compressThenTruncate(gzipCompress, testData, 5, []byte(testSuffix), outputPlugin)
321321
assert.Contains(t, err.Error(), "compressed empty to large")
@@ -403,3 +403,56 @@ func TestGetPartitionKey(t *testing.T) {
403403
assert.Equal(t, false, hasValue, "Should not find value")
404404
assert.Len(t, value, 0, "This should be an empty string")
405405
}
406+
407+
// TestNewPutRecordsClient_CustomEndpointWithRoles tests that custom endpoint resolvers
408+
// are preserved when using role-based authentication
409+
func TestNewPutRecordsClient_CustomEndpointWithRoles(t *testing.T) {
410+
// Save and restore environment variable
411+
originalEKSRole := os.Getenv("EKS_POD_EXECUTION_ROLE")
412+
defer func() {
413+
if originalEKSRole == "" {
414+
os.Unsetenv("EKS_POD_EXECUTION_ROLE")
415+
} else {
416+
os.Setenv("EKS_POD_EXECUTION_ROLE", originalEKSRole)
417+
}
418+
}()
419+
420+
customEndpoint := "https://kinesis.custom-domain.local"
421+
422+
testCases := []struct {
423+
name string
424+
roleARN string
425+
eksRole string
426+
}{
427+
{"no_roles", "", ""},
428+
{"with_role_arn", "arn:aws:iam::123456789012:role/test-role", ""},
429+
{"with_eks_role", "", "arn:aws:iam::123456789012:role/eks-role"},
430+
{"with_both_roles", "arn:aws:iam::123456789012:role/test-role", "arn:aws:iam::123456789012:role/eks-role"},
431+
}
432+
433+
for _, tc := range testCases {
434+
t.Run(tc.name, func(t *testing.T) {
435+
// Set EKS role environment variable if specified
436+
if tc.eksRole != "" {
437+
os.Setenv("EKS_POD_EXECUTION_ROLE", tc.eksRole)
438+
} else {
439+
os.Unsetenv("EKS_POD_EXECUTION_ROLE")
440+
}
441+
442+
client, err := newPutRecordsClient(tc.roleARN, "us-west-2", customEndpoint, "", 1, time.Second*30)
443+
444+
if err != nil {
445+
// Expected in test environment without credentials
446+
t.Logf("Expected credential error: %v", err)
447+
return
448+
}
449+
450+
// Verify the custom endpoint is preserved
451+
if client != nil && client.Client != nil {
452+
actualEndpoint := client.Client.ClientInfo.Endpoint
453+
assert.Equal(t, customEndpoint, actualEndpoint,
454+
"Custom endpoint should be preserved when using role-based authentication")
455+
}
456+
})
457+
}
458+
}

0 commit comments

Comments
 (0)