Skip to content

Commit 2895ec9

Browse files
[azeventhubs,azservicebus] Fixing bug where connection string tokens weren't renewing (Azure#20469)
When I made the fix for the SAS connection string path I removed the expiryTime, which caused the background renewer to assume the tokens were non-renewable. I've re-added in the old expiry-time behavior if the expiration time is not zero, and added some tests to sure it works properly.
1 parent a36c563 commit 2895ec9

File tree

9 files changed

+78
-10
lines changed

9 files changed

+78
-10
lines changed

sdk/messaging/azeventhubs/internal/eh/stress/templates/deploy-job.yaml renamed to sdk/messaging/azeventhubs/internal/eh/stress/templates/stress-test-job.yaml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ metadata:
66
# This'll make it so the resources aren't deleted on test exit.
77
# Skip.RemoveTestResources: "true"
88
chaos: "{{ default false .Stress.chaos }}"
9+
testInstance: "{{.Stress.Scenario}}-{{ .Release.Name }}-{{ .Release.Revision }}"
910
spec:
1011
containers:
1112
- name: main
@@ -56,7 +57,8 @@ spec:
5657
namespaces:
5758
- "{{ .Release.Namespace }}"
5859
labelSelectors:
59-
scenario: {{ .Stress.Scenario }}
60+
testInstance: "{{.Stress.Scenario}}-{{ .Release.Name }}-{{ .Release.Revision }}"
61+
chaos: "true"
6062
mode: all
6163
action: loss
6264
duration: 10s
@@ -69,7 +71,8 @@ spec:
6971
namespaces:
7072
- {{ .Release.Namespace }}
7173
labelSelectors:
72-
scenario: {{ .Stress.Scenario }}
74+
testInstance: "{{.Stress.Scenario}}-{{ .Release.Name }}-{{ .Release.Revision }}"
75+
chaos: "true"
7376
mode: all
7477
externalTargets:
7578
- {{ .Stress.BaseName }}.servicebus.windows.net

sdk/messaging/azeventhubs/internal/sas/sas_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,10 @@ func TestNewSigner(t *testing.T) {
3232
keyName, key := "foo", "superSecret"
3333
signer := NewSigner(keyName, key)
3434
before := time.Now().UTC().Add(-2 * time.Second)
35-
sigStr, expiry, err := signer.SignWithDuration("http://microsoft.com", 1*time.Hour)
35+
36+
// the URL is lowercased and escaped when used as the audience in our signature.
37+
sigStr, expiry, err := signer.SignWithDuration("http://MiCrosoft.com", 1*time.Hour)
38+
3639
require.NoError(t, err)
3740
nixExpiry, err := strconv.ParseInt(expiry, 10, 64)
3841
require.NoError(t, err)

sdk/messaging/azeventhubs/internal/sbauth/token_provider.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,17 @@ func (tpa *TokenProvider) getSASToken(uri string) (*auth.Token, time.Time, error
117117
return nil, time.Time{}, err
118118
}
119119

120+
// we can ignore the error here since we did the string-izing of the time
121+
// in the first place.
122+
var expiryTime time.Time
123+
124+
if authToken.Expiry != "0" {
125+
// TODO: I'd like to just use the actual Expiry time we generated
126+
// Filed here https://github.com/Azure/azure-sdk-for-go/issues/20468
127+
expiryTime = time.Now().Add(time.Minute * 15)
128+
}
129+
120130
return authToken,
121-
time.Time{},
131+
expiryTime,
122132
nil
123133
}

sdk/messaging/azeventhubs/producer_client_test.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,11 @@ func TestProducerClient_SAS(t *testing.T) {
6969
require.NotEmpty(t, events)
7070

7171
logs := getLogsFn()
72-
require.Contains(t, logs, "[azeh.Auth] Token does not have an expiration date, no background renewal needed.")
72+
require.Contains(t, logs, backgroundRenewalDisabledMsg)
7373
}
7474

75+
const backgroundRenewalDisabledMsg = "[azeh.Auth] Token does not have an expiration date, no background renewal needed."
76+
7577
func TestClientsUnauthorizedCreds(t *testing.T) {
7678
testParams := test.GetConnectionParamsForTest(t)
7779

@@ -169,6 +171,7 @@ func TestClientsUnauthorizedCreds(t *testing.T) {
169171
}
170172

171173
func TestProducerClient_GetHubAndPartitionProperties(t *testing.T) {
174+
getLogsFn := test.CaptureLogsForTest()
172175
testParams := test.GetConnectionParamsForTest(t)
173176

174177
producer, err := azeventhubs.NewProducerClientFromConnectionString(testParams.ConnectionString, testParams.EventHubName, nil)
@@ -198,6 +201,21 @@ func TestProducerClient_GetHubAndPartitionProperties(t *testing.T) {
198201
}
199202

200203
wg.Wait()
204+
logs := getLogsFn()
205+
checkForTokenRefresh(t, logs, testParams.EventHubName)
206+
}
207+
208+
// checkForTokenRefresh just makes sure that background token refresh has been started
209+
// and that we haven't somehow fallen into the trap of marking all tokens are expired.
210+
func checkForTokenRefresh(t *testing.T, logs []string, eventHubName string) {
211+
require.NotContains(t, logs, backgroundRenewalDisabledMsg)
212+
213+
for _, log := range logs {
214+
if strings.HasPrefix(log, fmt.Sprintf("[azeh.Auth] (%s/$management) next refresh in ", eventHubName)) {
215+
return
216+
}
217+
}
218+
require.Failf(t, "No token negotiation log lines", "logs:%s", strings.Join(logs, "\n"))
201219
}
202220

203221
func TestProducerClient_GetEventHubsProperties(t *testing.T) {

sdk/messaging/azservicebus/client_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,9 +172,11 @@ func TestNewClientUsingSharedAccessSignature(t *testing.T) {
172172
require.EqualValues(t, "hello world", string(messages[0].Body))
173173

174174
logs := getLogsFn()
175-
require.Contains(t, logs, "[azsb.Auth] Token does not have an expiration date, no background renewal needed.")
175+
require.Contains(t, logs, backgroundRenewalDisabledMsg)
176176
}
177177

178+
const backgroundRenewalDisabledMsg = "[azsb.Auth] Token does not have an expiration date, no background renewal needed."
179+
178180
const fastNotFoundDuration = 10 * time.Second
179181

180182
func TestNewClientNewSenderNotFound(t *testing.T) {

sdk/messaging/azservicebus/internal/sas/sas_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@ func TestNewSigner(t *testing.T) {
3232
keyName, key := "foo", "superSecret"
3333
signer := NewSigner(keyName, key)
3434
before := time.Now().UTC().Add(-2 * time.Second)
35-
sigStr, expiry, err := signer.SignWithDuration("http://microsoft.com", 1*time.Hour)
35+
36+
// the URL is lowercased and escaped when used as the audience in our signature.
37+
sigStr, expiry, err := signer.SignWithDuration("http://MiCrosoft.com", 1*time.Hour)
3638
require.NoError(t, err)
3739
nixExpiry, err := strconv.ParseInt(expiry, 10, 64)
3840
require.NoError(t, err)

sdk/messaging/azservicebus/internal/sbauth/token_provider.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,17 @@ func (tpa *TokenProvider) getSASToken(uri string) (*auth.Token, time.Time, error
117117
return nil, time.Time{}, err
118118
}
119119

120+
// we can ignore the error here since we did the string-izing of the time
121+
// in the first place.
122+
var expiryTime time.Time
123+
124+
if authToken.Expiry != "0" {
125+
// TODO: I'd like to just use the actual Expiry time we generated
126+
// Filed here https://github.com/Azure/azure-sdk-for-go/issues/20468
127+
expiryTime = time.Now().Add(time.Minute * 15)
128+
}
129+
120130
return authToken,
121-
time.Time{},
131+
expiryTime,
122132
nil
123133
}

sdk/messaging/azservicebus/internal/stress/templates/deploy-job.yaml renamed to sdk/messaging/azservicebus/internal/stress/templates/stress-test-job.yaml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
metadata:
44
labels:
55
chaos: "{{ default false .Stress.chaos }}"
6+
testInstance: "{{.Stress.Scenario}}-{{ .Release.Name }}-{{ .Release.Revision }}"
67
spec:
78
# uncomment to deploy to the southeastasia region.
89
# nodeSelector:
@@ -46,7 +47,8 @@ spec:
4647
namespaces:
4748
- "{{ .Release.Namespace }}"
4849
labelSelectors:
49-
scenario: {{ .Stress.Scenario }}
50+
testInstance: "{{.Stress.Scenario}}-{{ .Release.Name }}-{{ .Release.Revision }}"
51+
chaos: "true"
5052
mode: all
5153
action: loss
5254
duration: 10s
@@ -59,7 +61,8 @@ spec:
5961
namespaces:
6062
- {{ .Release.Namespace }}
6163
labelSelectors:
62-
scenario: {{ .Stress.Scenario }}
64+
testInstance: "{{.Stress.Scenario}}-{{ .Release.Name }}-{{ .Release.Revision }}"
65+
chaos: "true"
6366
mode: all
6467
externalTargets:
6568
- {{ .Stress.BaseName }}.servicebus.windows.net

sdk/messaging/azservicebus/receiver_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ func TestReceiverCancel(t *testing.T) {
3737
}
3838

3939
func TestReceiverSendFiveReceiveFive(t *testing.T) {
40+
getLogsFn := test.CaptureLogsForTest()
41+
4042
serviceBusClient, cleanup, queueName := setupLiveTest(t, nil)
4143
defer cleanup()
4244

@@ -63,6 +65,21 @@ func TestReceiverSendFiveReceiveFive(t *testing.T) {
6365

6466
require.NoError(t, receiver.CompleteMessage(context.Background(), messages[i], nil))
6567
}
68+
69+
logs := getLogsFn()
70+
checkForTokenRefresh(t, logs, queueName)
71+
}
72+
73+
// checkForTokenRefresh just makes sure that background token refresh has been started
74+
// and that we haven't somehow fallen into the trap of marking all tokens are expired.
75+
func checkForTokenRefresh(t *testing.T, logs []string, queueName string) {
76+
require.NotContains(t, logs, backgroundRenewalDisabledMsg)
77+
for _, log := range logs {
78+
if strings.HasPrefix(log, fmt.Sprintf("[azsb.Auth] (%s) next refresh in ", queueName)) {
79+
return
80+
}
81+
}
82+
require.Fail(t, "No token negotiation log lines")
6683
}
6784

6885
func TestReceiverSendFiveReceiveFive_Subscription(t *testing.T) {

0 commit comments

Comments
 (0)