Skip to content

Commit 6e505bb

Browse files
committed
Increase link payload/ack queue sizes and make them configurable. Fixes #3706
1 parent 44d450c commit 6e505bb

File tree

12 files changed

+109
-45
lines changed

12 files changed

+109
-45
lines changed

router/env/config.go

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -151,9 +151,11 @@ type Config struct {
151151
Listeners []*CtrlListenerConfig
152152
}
153153
Link struct {
154-
Listeners []map[interface{}]interface{}
155-
Dialers []map[interface{}]interface{}
156-
Heartbeats channel.HeartbeatOptions
154+
Listeners []map[interface{}]interface{}
155+
Dialers []map[interface{}]interface{}
156+
Heartbeats channel.HeartbeatOptions
157+
PayloadSenderQueueSize int
158+
AckSenderQueueSize int
157159
}
158160
Dialers map[string]xgress.OptionsData
159161
Listeners []ListenerBinding
@@ -205,6 +207,9 @@ const (
205207

206208
DefaultLinkHeartbeatSendInterval = 10 * time.Second
207209
DefaultLinkUnresponsiveTimeout = time.Minute
210+
211+
DefaultLinkPayloadSenderQueueSize = 128
212+
DefaultLinkAckSenderQueueSize = 64
208213
)
209214

210215
// CreateBackup will attempt to use the current path value to create a backup of
@@ -574,6 +579,8 @@ func LoadConfigWithOptions(path string, loadIdentity bool) (*Config, error) {
574579
cfg.Link.Heartbeats = *channel.DefaultHeartbeatOptions()
575580
cfg.Link.Heartbeats.SendInterval = DefaultLinkHeartbeatSendInterval
576581
cfg.Link.Heartbeats.CloseUnresponsiveTimeout = DefaultLinkUnresponsiveTimeout
582+
cfg.Link.PayloadSenderQueueSize = DefaultLinkPayloadSenderQueueSize
583+
cfg.Link.AckSenderQueueSize = DefaultLinkAckSenderQueueSize
577584

578585
if value, found := cfgmap["link"]; found {
579586
if submap, ok := value.(map[interface{}]interface{}); ok {
@@ -629,6 +636,28 @@ func LoadConfigWithOptions(path string, loadIdentity bool) (*Config, error) {
629636
cfg.Link.Heartbeats = *options
630637
}
631638
}
639+
640+
if value, found := submap["payloadSenderQueueSize"]; found {
641+
if intVal, ok := value.(int); ok {
642+
if intVal < 1 {
643+
return nil, fmt.Errorf("[link/payloadSenderQueueSize] must be at least 1, got %d", intVal)
644+
}
645+
cfg.Link.PayloadSenderQueueSize = intVal
646+
} else {
647+
return nil, fmt.Errorf("[link/payloadSenderQueueSize] must be an integer, got %T", value)
648+
}
649+
}
650+
651+
if value, found := submap["ackSenderQueueSize"]; found {
652+
if intVal, ok := value.(int); ok {
653+
if intVal < 1 {
654+
return nil, fmt.Errorf("[link/ackSenderQueueSize] must be at least 1, got %d", intVal)
655+
}
656+
cfg.Link.AckSenderQueueSize = intVal
657+
} else {
658+
return nil, fmt.Errorf("[link/ackSenderQueueSize] must be an integer, got %T", value)
659+
}
660+
}
632661
}
633662
}
634663

router/handler_xgress/data_plane.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,13 @@ import (
2929
type dataPlaneAdapter struct {
3030
acker xgress.AckSender
3131
forwarder *forwarder.Forwarder
32-
retransmitter *xgress.Retransmitter
3332
payloadIngester *xgress.PayloadIngester
3433
metrics xgress.Metrics
3534
}
3635

3736
type DataPlaneAdapterConfig struct {
3837
Acker xgress.AckSender
3938
Forwarder *forwarder.Forwarder
40-
Retransmitter *xgress.Retransmitter
4139
PayloadIngester *xgress.PayloadIngester
4240
Metrics xgress.Metrics
4341
}
@@ -46,7 +44,6 @@ func NewXgressDataPlaneAdapter(cfg DataPlaneAdapterConfig) xgress.DataPlaneAdapt
4644
return &dataPlaneAdapter{
4745
acker: cfg.Acker,
4846
forwarder: cfg.Forwarder,
49-
retransmitter: cfg.Retransmitter,
5047
payloadIngester: cfg.PayloadIngester,
5148
metrics: cfg.Metrics,
5249
}
@@ -82,10 +79,6 @@ func (adapter *dataPlaneAdapter) ForwardAcknowledgement(ack *xgress.Acknowledgem
8279
adapter.acker.SendAck(ack, address)
8380
}
8481

85-
func (adapter *dataPlaneAdapter) GetRetransmitter() *xgress.Retransmitter {
86-
return adapter.retransmitter
87-
}
88-
8982
func (adapter *dataPlaneAdapter) GetPayloadIngester() *xgress.PayloadIngester {
9083
return adapter.payloadIngester
9184
}

router/router.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,14 @@ func (self *Router) GetMetricsRegistry() metrics.UsageRegistry {
170170
return self.metricsRegistry
171171
}
172172

173+
func (self *Router) GetLinkPayloadSenderQueueSize() int {
174+
return self.config.Link.PayloadSenderQueueSize
175+
}
176+
177+
func (self *Router) GetLinkAckSenderQueueSize() int {
178+
return self.config.Link.AckSenderQueueSize
179+
}
180+
173181
func (self *Router) GetXgressRegistry() *env.Registry {
174182
return self.xgRegistry
175183
}
@@ -353,12 +361,10 @@ func (self *Router) GetAlerter() env.Alerter {
353361
func (self *Router) createDataPlaneAdapter() xgress.DataPlaneAdapter {
354362
payloadIngester := xgress.NewPayloadIngesterWithConfig(64, self.shutdownC)
355363
ackSender := xgress_router.NewAcker(self.forwarder, self.metricsRegistry, self.shutdownC)
356-
retransmitter := xgress.NewRetransmitter(self.forwarder, self.metricsRegistry, self.GetCloseNotify())
357364

358365
return handler_xgress.NewXgressDataPlaneAdapter(handler_xgress.DataPlaneAdapterConfig{
359366
Acker: ackSender,
360367
Forwarder: self.forwarder,
361-
Retransmitter: retransmitter,
362368
PayloadIngester: payloadIngester,
363369
Metrics: xgress.NewMetrics(self.metricsRegistry),
364370
})

router/xgress_test/mock_adapter.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -189,11 +189,6 @@ func (self *MockDataPlaneAdapter) ForwardAcknowledgement(ack *xgress.Acknowledge
189189
}
190190
}
191191

192-
// GetRetransmitter returns the retransmitter from the environment
193-
func (self *MockDataPlaneAdapter) GetRetransmitter() *xgress.Retransmitter {
194-
return self.env.GetRetransmitter()
195-
}
196-
197192
// GetPayloadIngester returns the payload ingester from the environment
198193
func (self *MockDataPlaneAdapter) GetPayloadIngester() *xgress.PayloadIngester {
199194
return self.env.GetPayloadIngester()

router/xgress_test/mock_env.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package xgress_test
33
import (
44
"time"
55

6-
"github.com/openziti/metrics"
76
"github.com/openziti/sdk-golang/xgress"
87
)
98

@@ -14,6 +13,8 @@ func (m mockFaulter) ReportForwardingFault(circuitId string, ctrlId string) {
1413

1514
type noopMetrics struct{}
1615

16+
func (n noopMetrics) MarkRetransmission() {}
17+
func (n noopMetrics) MarkRetransmissionFailure() {}
1718
func (n noopMetrics) MarkAckReceived() {}
1819
func (n noopMetrics) MarkPayloadDropped() {}
1920
func (n noopMetrics) MarkDuplicateAck() {}
@@ -28,25 +29,18 @@ func (n noopMetrics) SendPayloadBuffered(int64) {}
2829
func (n noopMetrics) SendPayloadDelivered(int64) {}
2930

3031
type MockEnv struct {
31-
retransmitter *xgress.Retransmitter
3232
payloadIngester *xgress.PayloadIngester
3333
metrics xgress.Metrics
3434
}
3535

3636
func NewMockEnv() *MockEnv {
3737
closeNotify := make(chan struct{})
38-
metricsRegistry := metrics.NewRegistry("test", nil)
3938
return &MockEnv{
40-
retransmitter: xgress.NewRetransmitter(mockFaulter{}, metricsRegistry, closeNotify),
4139
payloadIngester: xgress.NewPayloadIngester(closeNotify),
4240
metrics: noopMetrics{},
4341
}
4442
}
4543

46-
func (self *MockEnv) GetRetransmitter() *xgress.Retransmitter {
47-
return self.retransmitter
48-
}
49-
5044
func (self *MockEnv) GetPayloadIngester() *xgress.PayloadIngester {
5145
return self.payloadIngester
5246
}

router/xlink_transport/channel.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@ const (
1717
ChannelTypeDefault string = "link.default"
1818
)
1919

20-
func NewBaseLinkChannel(underlay channel.Underlay) *BaseLinkChannel {
20+
func NewBaseLinkChannel(underlay channel.Underlay, payloadSenderQueueSize, ackSenderQueueSize int) *BaseLinkChannel {
2121
senderContext := channel.NewSenderContext()
2222

23-
defaultMsgChan := make(chan channel.Sendable, 64)
24-
controlMsgChan := make(chan channel.Sendable, 4)
23+
defaultMsgChan := make(chan channel.Sendable, payloadSenderQueueSize)
24+
controlMsgChan := make(chan channel.Sendable, ackSenderQueueSize)
2525
retryMsgChan := make(chan channel.Sendable, 4)
2626

2727
result := &BaseLinkChannel{
@@ -127,13 +127,15 @@ type DialLinkChannelConfig struct {
127127
Underlay channel.Underlay
128128
MaxDefaultChannels int
129129
MaxAckChannel int
130+
PayloadSenderQueueSize int
131+
AckSenderQueueSize int
130132
StartupDelay time.Duration
131133
UnderlayChangeCallback func(ch *DialLinkChannel)
132134
}
133135

134136
func NewDialLinkChannel(config DialLinkChannelConfig) UnderlayHandlerLinkChannel {
135137
result := &DialLinkChannel{
136-
BaseLinkChannel: *NewBaseLinkChannel(config.Underlay),
138+
BaseLinkChannel: *NewBaseLinkChannel(config.Underlay, config.PayloadSenderQueueSize, config.AckSenderQueueSize),
137139
dialer: config.Dialer,
138140
changeCallback: config.UnderlayChangeCallback,
139141
syncRequired: map[string]struct{}{},
@@ -266,9 +268,9 @@ func (self *DialLinkChannel) CreateGroupedUnderlay(groupId string, groupSecret [
266268
})
267269
}
268270

269-
func NewListenerLinkChannel(underlay channel.Underlay) UnderlayHandlerLinkChannel {
271+
func NewListenerLinkChannel(underlay channel.Underlay, payloadSenderQueueSize, ackSenderQueueSize int) UnderlayHandlerLinkChannel {
270272
result := &ListenerLinkChannel{
271-
BaseLinkChannel: *NewBaseLinkChannel(underlay),
273+
BaseLinkChannel: *NewBaseLinkChannel(underlay, payloadSenderQueueSize, ackSenderQueueSize),
272274
}
273275

274276
result.constraints.AddConstraint(ChannelTypeDefault, 1, 1)

router/xlink_transport/dialer.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -283,11 +283,13 @@ func (self *dialer) dialMulti(linkId *identity.TokenId, address transport.Addres
283283

284284
if isGrouped, _ := channel.Headers(underlay.Headers()).GetBoolHeader(channel.IsGroupedHeader); isGrouped {
285285
dialLinkChangeConfig := DialLinkChannelConfig{
286-
Dialer: linkDialer,
287-
Underlay: underlay,
288-
MaxDefaultChannels: int(self.config.maxDefaultConnections),
289-
MaxAckChannel: int(self.config.maxAckConnections),
290-
StartupDelay: self.config.startupDelay,
286+
Dialer: linkDialer,
287+
Underlay: underlay,
288+
MaxDefaultChannels: int(self.config.maxDefaultConnections),
289+
MaxAckChannel: int(self.config.maxAckConnections),
290+
PayloadSenderQueueSize: self.env.GetLinkPayloadSenderQueueSize(),
291+
AckSenderQueueSize: self.env.GetLinkAckSenderQueueSize(),
292+
StartupDelay: self.config.startupDelay,
291293
UnderlayChangeCallback: func(ch *DialLinkChannel) {
292294
self.notifyOfLinkChange(ch, bindHandler.link)
293295
},

router/xlink_transport/factory.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ type LinkEnv interface {
5959
GetRateLimiterPool() goroutines.Pool
6060
GetCloseNotify() <-chan struct{}
6161
GetRouterId() *identity.TokenId
62+
GetLinkPayloadSenderQueueSize() int
63+
GetLinkAckSenderQueueSize() int
6264
}
6365

6466
func NewFactory(accepter xlink.Acceptor,

router/xlink_transport/listener.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func (self *listener) GetLocalBinding() string {
9191
}
9292

9393
func (self *listener) handleGroupedUnderlay(underlay channel.Underlay, closeCallback func()) (channel.MultiChannel, error) {
94-
linkChannel := NewListenerLinkChannel(underlay)
94+
linkChannel := NewListenerLinkChannel(underlay, self.env.GetLinkPayloadSenderQueueSize(), self.env.GetLinkAckSenderQueueSize())
9595
multiConfig := channel.MultiChannelConfig{
9696
LogicalName: "link/" + underlay.Id(),
9797
Options: self.config.options,

zititest/models/circuit-perf-diff/configs/loop-client-ert.yml.tmpl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,25 +32,25 @@ metrics:
3232
clientId: {{ .Component.Id }}
3333

3434
workloads:
35-
{{ if .Component.BoolVariable "testSdkXgHost" }}
35+
{{ if and (.Component.BoolVariable "testSdkXgHost") (.Component.BoolVariable "testThroughput") }}
3636
- name: throughput-xg
3737
connector: throughput-xg
3838
{{ $.Model.MustVariable "throughputWorkload" }}
3939
{{end}}
4040

41-
{{ if .Component.BoolVariable "testErtHost" }}
41+
{{ if and (.Component.BoolVariable "testErtHost") (.Component.BoolVariable "testThroughput") }}
4242
- name: throughput-ert
4343
connector: throughput-ert
4444
{{ $.Model.MustVariable "throughputWorkload" }}
4545
{{end}}
4646

47-
{{ if .Component.BoolVariable "testSdkXgHost" }}
47+
{{ if and (.Component.BoolVariable "testSdkXgHost") (.Component.BoolVariable "testLatency") }}
4848
- name: latency-xg
4949
connector: latency-xg
5050
{{ $.Model.MustVariable "latencyWorkload" }}
5151
{{end}}
5252

53-
{{ if .Component.BoolVariable "testErtHost" }}
53+
{{ if and (.Component.BoolVariable "testErtHost") (.Component.BoolVariable "testLatency") }}
5454
- name: latency-ert
5555
connector: latency-ert
5656
{{ $.Model.MustVariable "latencyWorkload" }}

0 commit comments

Comments
 (0)