Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 13 additions & 18 deletions pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,10 +330,7 @@ func TestRedisProduceComplex(t *testing.T) {
if err != nil {
t.Fatalf("mergeMaps() unexpected error: %v", err)
}
// Only when there are invalid entries got will have duplicates
if tc.withInvalidEntries {
got = removeDuplicates(got)
}
// mergeValues now deduplicates when withInvalidEntries is true

var combinedEntries []string
for i := 0; i < tc.numProducers; i++ {
Expand Down Expand Up @@ -382,30 +379,28 @@ func TestRedisProduceComplex(t *testing.T) {
}
}

func removeDuplicates(list []string) []string {
capture := map[string]bool{}
var ret []string
for _, elem := range list {
if _, found := capture[elem]; !found {
ret = append(ret, elem)
capture[elem] = true
}
}
sort.Strings(ret)
return ret
}

// mergeValues merges maps from the slice and returns their values.
// Returns and error if there exists duplicate key.
// If withInvalidEntries is true, values are deduplicated (same message may be produced multiple times).
// Returns an error if there exists a duplicate key and withInvalidEntries is false.
func mergeValues(messages []map[string]string, withInvalidEntries bool) ([]string, error) {
res := make(map[string]any)
var ret []string
var seenValues map[string]struct{}
if withInvalidEntries {
seenValues = make(map[string]struct{})
}
for _, m := range messages {
for k, v := range m {
if _, found := res[k]; found && !withInvalidEntries {
return nil, fmt.Errorf("duplicate key: %v", k)
}
res[k] = v
if withInvalidEntries {
if _, seen := seenValues[v]; seen {
continue
}
seenValues[v] = struct{}{}
}
ret = append(ret, v)
}
}
Expand Down