Skip to content

Commit ae7887a

Browse files
authored
[receiver/azureeventhubreceiver] use $Default as default consumer group (#43051)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description The old SDK sets `$Default` as the consumer group when no consumer groups are set. This is a fix to the new SDK implementation to do the same. This would cause issues when users attempted to use the new SDK and didn't have the consumer group set. <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes #43049 <!--Describe what testing was performed and which tests were added.--> #### Testing Added tests for the function for getting the consumer group <!--Please delete paragraphs that you did not use before submitting.-->
1 parent 91ed054 commit ae7887a

File tree

3 files changed

+67
-2
lines changed

3 files changed

+67
-2
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: azureeventhubreceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Use `$Default` as the default consumer group with the new azeventhubs SDK
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [43049]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

receiver/azureeventhubreceiver/eventhubhandler_azeventhub.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,20 @@ type azPartitionClient interface {
2323
ReceiveEvents(ctx context.Context, maxBatchSize int, options *azeventhubs.ReceiveEventsOptions) ([]*azeventhubs.ReceivedEventData, error)
2424
}
2525

26+
func getConsumerGroup(config *Config) string {
27+
if config.ConsumerGroup == "" {
28+
return "$Default"
29+
}
30+
return config.ConsumerGroup
31+
}
32+
2633
func newAzeventhubWrapper(h *eventhubHandler) (*hubWrapperAzeventhubImpl, error) {
34+
consumerGroup := getConsumerGroup(h.config)
35+
2736
hub, newHubErr := azeventhubs.NewConsumerClientFromConnectionString(
2837
h.config.Connection,
2938
"",
30-
h.config.ConsumerGroup,
39+
consumerGroup,
3140
&azeventhubs.ConsumerClientOptions{},
3241
)
3342

@@ -124,7 +133,6 @@ func (h *hubWrapperAzeventhubImpl) Receive(ctx context.Context, partitionID stri
124133
}
125134
}
126135
}
127-
128136
pc, err := h.hub.NewPartitionClient(partitionID, &azeventhubs.PartitionClientOptions{
129137
StartPosition: startPos,
130138
})

receiver/azureeventhubreceiver/eventhubhandler_azeventhub_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,3 +225,33 @@ func TestPartitionListener_SetErr(t *testing.T) {
225225
p.setErr(errors.New("test"))
226226
assert.Equal(t, "test", p.err.Error())
227227
}
228+
229+
func TestGetConsumerGroup(t *testing.T) {
230+
testCases := []struct {
231+
name string
232+
consumerGroup string
233+
expectedGroup string
234+
}{
235+
{
236+
name: "empty consumer group defaults to $Default",
237+
consumerGroup: "",
238+
expectedGroup: "$Default",
239+
},
240+
{
241+
name: "custom consumer group is preserved",
242+
consumerGroup: "custom-group",
243+
expectedGroup: "custom-group",
244+
},
245+
}
246+
247+
for _, test := range testCases {
248+
t.Run(test.name, func(t *testing.T) {
249+
config := &Config{
250+
ConsumerGroup: test.consumerGroup,
251+
}
252+
253+
result := getConsumerGroup(config)
254+
assert.Equal(t, test.expectedGroup, result)
255+
})
256+
}
257+
}

0 commit comments

Comments
 (0)