Skip to content

Commit 27ca287

Browse files
committed
Increase link payload/ack queue sizes and make them configurable. Fixes #3706
1 parent 7455deb commit 27ca287

File tree

11 files changed

+126
-23
lines changed

11 files changed

+126
-23
lines changed

CHANGELOG.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1074,6 +1074,17 @@ New configuration tunables under `edge.oidc`:
10741074
10751075
* github.com/openziti/go-term-markdown: v1.0.1 (new)
10761076
* github.com/openziti/ziti/v2: [v1.6.8 -> v2.0.0](https://github.com/openziti/ziti/compare/v1.6.8...v2.0.0)
1077+
* [Issue #3706](https://github.com/openziti/ziti/issues/3706) - Increase link payload/ack queue sizes and make them configurable
1078+
* [Issue #3778](https://github.com/openziti/ziti/issues/3778) - SetRouterDataModel can deadlock in the router
1079+
* [Issue #3777](https://github.com/openziti/ziti/issues/3777) - With the new circuit reserve, we can have circuits with no path in the controller circuit set, which can cause panics
1080+
* [Issue #3770](https://github.com/openziti/ziti/issues/3770) - Update Token Requests Should Close Channel Connections If Invalid
1081+
* [Issue #3757](https://github.com/openziti/ziti/issues/3757) - Mesh peer signing cert from header is overwritten by TLS underlay cert
1082+
* [Issue #3756](https://github.com/openziti/ziti/issues/3756) - TLS handshake rate limiter timeout check reads from wrong config scope
1083+
* [Issue #3755](https://github.com/openziti/ziti/issues/3755) - commandHandler config read from wrong scope
1084+
* [Issue #3754](https://github.com/openziti/ziti/issues/3754) - dialFailed drops applyFailed parameter, preventing duplicate link retry jitter
1085+
* [Issue #3753](https://github.com/openziti/ziti/issues/3753) - SPIFFE trust domain prefix check has swapped HasPrefix arguments
1086+
* [Issue #3746](https://github.com/openziti/ziti/issues/3746) - The controller connect events control channel handler leaks a goroutine
1087+
* [Issue #3747](https://github.com/openziti/ziti/issues/3747) - Update controller peer error marshalling for app code changes
10771088
* [Issue #3721](https://github.com/openziti/ziti/issues/3721) - Add CreateCircuitV3 to controller
10781089
* [Issue #3719](https://github.com/openziti/ziti/issues/3719) - Allow binding specific inspects to pass through to xgress listener implementations
10791090
* [Issue #3696](https://github.com/openziti/ziti/issues/3696) - oidc provider is non-deterministic for wildcard certs

router/env/config.go

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -151,9 +151,11 @@ type Config struct {
151151
Listeners []*CtrlListenerConfig
152152
}
153153
Link struct {
154-
Listeners []map[interface{}]interface{}
155-
Dialers []map[interface{}]interface{}
156-
Heartbeats channel.HeartbeatOptions
154+
Listeners []map[interface{}]interface{}
155+
Dialers []map[interface{}]interface{}
156+
Heartbeats channel.HeartbeatOptions
157+
PayloadSenderQueueSize int
158+
AckSenderQueueSize int
157159
}
158160
Dialers map[string]xgress.OptionsData
159161
Listeners []ListenerBinding
@@ -205,6 +207,9 @@ const (
205207

206208
DefaultLinkHeartbeatSendInterval = 10 * time.Second
207209
DefaultLinkUnresponsiveTimeout = time.Minute
210+
211+
DefaultLinkPayloadSenderQueueSize = 128
212+
DefaultLinkAckSenderQueueSize = 64
208213
)
209214

210215
// CreateBackup will attempt to use the current path value to create a backup of
@@ -574,6 +579,8 @@ func LoadConfigWithOptions(path string, loadIdentity bool) (*Config, error) {
574579
cfg.Link.Heartbeats = *channel.DefaultHeartbeatOptions()
575580
cfg.Link.Heartbeats.SendInterval = DefaultLinkHeartbeatSendInterval
576581
cfg.Link.Heartbeats.CloseUnresponsiveTimeout = DefaultLinkUnresponsiveTimeout
582+
cfg.Link.PayloadSenderQueueSize = DefaultLinkPayloadSenderQueueSize
583+
cfg.Link.AckSenderQueueSize = DefaultLinkAckSenderQueueSize
577584

578585
if value, found := cfgmap["link"]; found {
579586
if submap, ok := value.(map[interface{}]interface{}); ok {
@@ -629,6 +636,28 @@ func LoadConfigWithOptions(path string, loadIdentity bool) (*Config, error) {
629636
cfg.Link.Heartbeats = *options
630637
}
631638
}
639+
640+
if value, found := submap["payloadSenderQueueSize"]; found {
641+
if intVal, ok := value.(int); ok {
642+
if intVal < 1 {
643+
return nil, fmt.Errorf("[link/payloadSenderQueueSize] must be at least 1, got %d", intVal)
644+
}
645+
cfg.Link.PayloadSenderQueueSize = intVal
646+
} else {
647+
return nil, fmt.Errorf("[link/payloadSenderQueueSize] must be an integer, got %T", value)
648+
}
649+
}
650+
651+
if value, found := submap["ackSenderQueueSize"]; found {
652+
if intVal, ok := value.(int); ok {
653+
if intVal < 1 {
654+
return nil, fmt.Errorf("[link/ackSenderQueueSize] must be at least 1, got %d", intVal)
655+
}
656+
cfg.Link.AckSenderQueueSize = intVal
657+
} else {
658+
return nil, fmt.Errorf("[link/ackSenderQueueSize] must be an integer, got %T", value)
659+
}
660+
}
632661
}
633662
}
634663

router/router.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,14 @@ func (self *Router) GetMetricsRegistry() metrics.UsageRegistry {
170170
return self.metricsRegistry
171171
}
172172

173+
func (self *Router) GetLinkPayloadSenderQueueSize() int {
174+
return self.config.Link.PayloadSenderQueueSize
175+
}
176+
177+
func (self *Router) GetLinkAckSenderQueueSize() int {
178+
return self.config.Link.AckSenderQueueSize
179+
}
180+
173181
func (self *Router) GetXgressRegistry() *env.Registry {
174182
return self.xgRegistry
175183
}

router/xlink_transport/channel.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@ const (
1717
ChannelTypeDefault string = "link.default"
1818
)
1919

20-
func NewBaseLinkChannel(underlay channel.Underlay) *BaseLinkChannel {
20+
func NewBaseLinkChannel(underlay channel.Underlay, payloadSenderQueueSize, ackSenderQueueSize int) *BaseLinkChannel {
2121
senderContext := channel.NewSenderContext()
2222

23-
defaultMsgChan := make(chan channel.Sendable, 64)
24-
controlMsgChan := make(chan channel.Sendable, 4)
23+
defaultMsgChan := make(chan channel.Sendable, payloadSenderQueueSize)
24+
controlMsgChan := make(chan channel.Sendable, ackSenderQueueSize)
2525
retryMsgChan := make(chan channel.Sendable, 4)
2626

2727
result := &BaseLinkChannel{
@@ -127,13 +127,15 @@ type DialLinkChannelConfig struct {
127127
Underlay channel.Underlay
128128
MaxDefaultChannels int
129129
MaxAckChannel int
130+
PayloadSenderQueueSize int
131+
AckSenderQueueSize int
130132
StartupDelay time.Duration
131133
UnderlayChangeCallback func(ch *DialLinkChannel)
132134
}
133135

134136
func NewDialLinkChannel(config DialLinkChannelConfig) UnderlayHandlerLinkChannel {
135137
result := &DialLinkChannel{
136-
BaseLinkChannel: *NewBaseLinkChannel(config.Underlay),
138+
BaseLinkChannel: *NewBaseLinkChannel(config.Underlay, config.PayloadSenderQueueSize, config.AckSenderQueueSize),
137139
dialer: config.Dialer,
138140
changeCallback: config.UnderlayChangeCallback,
139141
syncRequired: map[string]struct{}{},
@@ -266,9 +268,9 @@ func (self *DialLinkChannel) CreateGroupedUnderlay(groupId string, groupSecret [
266268
})
267269
}
268270

269-
func NewListenerLinkChannel(underlay channel.Underlay) UnderlayHandlerLinkChannel {
271+
func NewListenerLinkChannel(underlay channel.Underlay, payloadSenderQueueSize, ackSenderQueueSize int) UnderlayHandlerLinkChannel {
270272
result := &ListenerLinkChannel{
271-
BaseLinkChannel: *NewBaseLinkChannel(underlay),
273+
BaseLinkChannel: *NewBaseLinkChannel(underlay, payloadSenderQueueSize, ackSenderQueueSize),
272274
}
273275

274276
result.constraints.AddConstraint(ChannelTypeDefault, 1, 1)

router/xlink_transport/dialer.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -283,11 +283,13 @@ func (self *dialer) dialMulti(linkId *identity.TokenId, address transport.Addres
283283

284284
if isGrouped, _ := channel.Headers(underlay.Headers()).GetBoolHeader(channel.IsGroupedHeader); isGrouped {
285285
dialLinkChangeConfig := DialLinkChannelConfig{
286-
Dialer: linkDialer,
287-
Underlay: underlay,
288-
MaxDefaultChannels: int(self.config.maxDefaultConnections),
289-
MaxAckChannel: int(self.config.maxAckConnections),
290-
StartupDelay: self.config.startupDelay,
286+
Dialer: linkDialer,
287+
Underlay: underlay,
288+
MaxDefaultChannels: int(self.config.maxDefaultConnections),
289+
MaxAckChannel: int(self.config.maxAckConnections),
290+
PayloadSenderQueueSize: self.env.GetLinkPayloadSenderQueueSize(),
291+
AckSenderQueueSize: self.env.GetLinkAckSenderQueueSize(),
292+
StartupDelay: self.config.startupDelay,
291293
UnderlayChangeCallback: func(ch *DialLinkChannel) {
292294
self.notifyOfLinkChange(ch, bindHandler.link)
293295
},

router/xlink_transport/factory.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ type LinkEnv interface {
5959
GetRateLimiterPool() goroutines.Pool
6060
GetCloseNotify() <-chan struct{}
6161
GetRouterId() *identity.TokenId
62+
GetLinkPayloadSenderQueueSize() int
63+
GetLinkAckSenderQueueSize() int
6264
}
6365

6466
func NewFactory(accepter xlink.Acceptor,

router/xlink_transport/listener.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func (self *listener) GetLocalBinding() string {
9191
}
9292

9393
func (self *listener) handleGroupedUnderlay(underlay channel.Underlay, closeCallback func()) (channel.MultiChannel, error) {
94-
linkChannel := NewListenerLinkChannel(underlay)
94+
linkChannel := NewListenerLinkChannel(underlay, self.env.GetLinkPayloadSenderQueueSize(), self.env.GetLinkAckSenderQueueSize())
9595
multiConfig := channel.MultiChannelConfig{
9696
LogicalName: "link/" + underlay.Id(),
9797
Options: self.config.options,

tests/link_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,14 @@ func (self *testLinkEnv) GetXLinkRegistry() xlink.Registry {
163163
return self.linkRegistry
164164
}
165165

166+
func (self *testLinkEnv) GetLinkPayloadSenderQueueSize() int {
167+
return env.DefaultLinkPayloadSenderQueueSize
168+
}
169+
170+
func (self *testLinkEnv) GetLinkAckSenderQueueSize() int {
171+
return env.DefaultLinkAckSenderQueueSize
172+
}
173+
166174
func newTestLinkEnv() *testLinkEnv {
167175
e := setupEnv()
168176
linkRegistry := link.NewLinkRegistry(e)

zititest/models/circuit-perf-diff/configs/loop-client-ert.yml.tmpl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,25 +32,25 @@ metrics:
3232
clientId: {{ .Component.Id }}
3333

3434
workloads:
35-
{{ if .Component.BoolVariable "testSdkXgHost" }}
35+
{{ if and (.Component.BoolVariable "testSdkXgHost") (.Component.BoolVariable "testThroughput") }}
3636
- name: throughput-xg
3737
connector: throughput-xg
3838
{{ $.Model.MustVariable "throughputWorkload" }}
3939
{{end}}
4040

41-
{{ if .Component.BoolVariable "testErtHost" }}
41+
{{ if and (.Component.BoolVariable "testErtHost") (.Component.BoolVariable "testThroughput") }}
4242
- name: throughput-ert
4343
connector: throughput-ert
4444
{{ $.Model.MustVariable "throughputWorkload" }}
4545
{{end}}
4646

47-
{{ if .Component.BoolVariable "testSdkXgHost" }}
47+
{{ if and (.Component.BoolVariable "testSdkXgHost") (.Component.BoolVariable "testLatency") }}
4848
- name: latency-xg
4949
connector: latency-xg
5050
{{ $.Model.MustVariable "latencyWorkload" }}
5151
{{end}}
5252

53-
{{ if .Component.BoolVariable "testErtHost" }}
53+
{{ if and (.Component.BoolVariable "testErtHost") (.Component.BoolVariable "testLatency") }}
5454
- name: latency-ert
5555
connector: latency-ert
5656
{{ $.Model.MustVariable "latencyWorkload" }}

zititest/models/circuit-perf-diff/configs/loop-client-xg.yml.tmpl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,28 +20,28 @@ metrics:
2020
clientId: {{ .Component.Id }}
2121

2222
workloads:
23-
{{ if .Component.BoolVariable "testSdkXgHost" }}
23+
{{ if and (.Component.BoolVariable "testSdkXgHost") (.Component.BoolVariable "testThroughput") }}
2424
- name: throughput-xg
2525
service_name: throughput-xg
2626
connector: default
2727
{{ $.Model.MustVariable "throughputWorkload" }}
2828
{{end}}
2929

30-
{{ if .Component.BoolVariable "testErtHost" }}
30+
{{ if and (.Component.BoolVariable "testErtHost") (.Component.BoolVariable "testThroughput") }}
3131
- name: throughput-ert
3232
service_name: throughput-ert
3333
connector: default
3434
{{ $.Model.MustVariable "throughputWorkload" }}
3535
{{end}}
3636

37-
{{ if .Component.BoolVariable "testSdkXgHost" }}
37+
{{ if and (.Component.BoolVariable "testSdkXgHost") (.Component.BoolVariable "testLatency") }}
3838
- name: latency-xg
3939
service_name: latency-xg
4040
connector: default
4141
{{ $.Model.MustVariable "latencyWorkload" }}
4242
{{end}}
4343

44-
{{ if .Component.BoolVariable "testErtHost" }}
44+
{{ if and (.Component.BoolVariable "testErtHost") (.Component.BoolVariable "testLatency") }}
4545
- name: latency-ert
4646
service_name: latency-ert
4747
connector: default

0 commit comments

Comments
 (0)