-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmessage_source.go
More file actions
103 lines (89 loc) · 3.31 KB
/
message_source.go
File metadata and controls
103 lines (89 loc) · 3.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
/*
Copyright NetFoundry Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package channel
import "io"
// MessageSourceF is a function that returns the next message to send on an underlay.
// It blocks until a message is available or the notifier signals closure.
type MessageSourceF func(notifier *CloseNotifier) (Sendable, error)
// MessageSourceProvider returns the message source for a given underlay type.
type MessageSourceProvider interface {
GetMessageSource(underlayType string) MessageSourceF
}
// MakeSingleQueueMessageSource returns a MessageSourceF that reads from one queue.
func MakeSingleQueueMessageSource(closeNotify <-chan struct{}, q1 <-chan Sendable) MessageSourceF {
return func(notifier *CloseNotifier) (Sendable, error) {
select {
case msg := <-q1:
return msg, nil
case <-closeNotify:
return nil, io.EOF
case <-notifier.GetCloseNotify():
return nil, io.EOF
}
}
}
// MakeTwoQueueMessageSource returns a MessageSourceF that reads from two queues with equal priority.
func MakeTwoQueueMessageSource(closeNotify <-chan struct{}, q1, q2 <-chan Sendable) MessageSourceF {
return func(notifier *CloseNotifier) (Sendable, error) {
select {
case msg := <-q1:
return msg, nil
case msg := <-q2:
return msg, nil
case <-closeNotify:
return nil, io.EOF
case <-notifier.GetCloseNotify():
return nil, io.EOF
}
}
}
// MakeThreeQueueMessageSource returns a MessageSourceF that reads from three queues with equal priority.
func MakeThreeQueueMessageSource(closeNotify <-chan struct{}, q1, q2, q3 <-chan Sendable) MessageSourceF {
return func(notifier *CloseNotifier) (Sendable, error) {
select {
case msg := <-q1:
return msg, nil
case msg := <-q2:
return msg, nil
case msg := <-q3:
return msg, nil
case <-closeNotify:
return nil, io.EOF
case <-notifier.GetCloseNotify():
return nil, io.EOF
}
}
}
// SimpleMessageSourceProvider maps underlay types to message sources, with a default fallback.
type SimpleMessageSourceProvider struct {
defaultSource MessageSourceF
sources map[string]MessageSourceF
}
// NewSimpleMessageSourceProvider creates a provider with the given default message source.
func NewSimpleMessageSourceProvider(defaultSource MessageSourceF) *SimpleMessageSourceProvider {
return &SimpleMessageSourceProvider{
defaultSource: defaultSource,
sources: map[string]MessageSourceF{},
}
}
// AddSource registers a message source for a specific underlay type.
func (p *SimpleMessageSourceProvider) AddSource(underlayType string, source MessageSourceF) {
p.sources[underlayType] = source
}
// GetMessageSource returns the message source for the given underlay type, falling back to the default.
func (p *SimpleMessageSourceProvider) GetMessageSource(underlayType string) MessageSourceF {
if source, ok := p.sources[underlayType]; ok {
return source
}
return p.defaultSource
}