-
Notifications
You must be signed in to change notification settings - Fork 17
Expand file tree
/
Copy pathoptions.go
More file actions
211 lines (193 loc) · 7.57 KB
/
Copy pathoptions.go
File metadata and controls
211 lines (193 loc) · 7.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
// Copyright 2026 Synnax Labs, Inc.
//
// Use of this software is governed by the Business Source License included in the file
// licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with the Business Source
// License, use of this software will be governed by the Apache License, Version 2.0,
// included in the file licenses/APL.txt.
// All included pebble code is copyrighted by the cockroachdb team, and is licensed under
// the BSD 3-Clause License. See the repository file license/BSD-3-Clause.txt for more
// information.
package aspen
import (
"time"
"github.com/cockroachdb/pebble/v2/vfs"
"github.com/synnaxlabs/alamos"
"github.com/synnaxlabs/aspen/internal/cluster"
"github.com/synnaxlabs/aspen/internal/kv"
grpct "github.com/synnaxlabs/aspen/transport/grpc"
fgrpc "github.com/synnaxlabs/freighter/grpc"
"github.com/synnaxlabs/x/address"
xkv "github.com/synnaxlabs/x/kv"
"github.com/synnaxlabs/x/override"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// Option is a function that configures an Aspen instance.
type Option func(*options)
type options struct {
alamos.Instrumentation
// fs sets the filesystem to be used for storing data. This option is ignored
// if a custom kv.ServiceConfig.Engine is set.
fs vfs.FS
// transport is the transport package for the messages that aspen exchanges.
// this setting overrides all other transport settings in sub-configs.
transport struct {
Transport
// external is a boolean flag indicating whether the caller provided an
// external transport they control themselves.
external bool
// ownedPool is the grpc client pool that defaultOptions created when
// the caller did not pass a custom transport. It is non-nil only when
// aspen is responsible for closing it. External transports are
// expected to come with their own pool lifecycle management.
ownedPool *fgrpc.Pool
}
// dirname is the directory where aspen will store its data.
// this option is ignored if a custom kv.ServiceConfig.Engine is set.
dirname string
// addr sets the address for the host node.
addr address.Address
// kv gives the configuration for KV options.
kv kv.Config
// peerAddresses sets the addresses for the peers of the host node.
peerAddresses []address.Address
// cluster gives the configuration for gossiping cluster state.
cluster cluster.Config
// externalKV is a boolean flag indicating whether the caller provided an external
// key-value engine. If so, aspen will not close the engine when it shuts down.
externalKV bool
// bootstrap is a boolean used to indicate whether to bootstrap a new cluster.
bootstrap bool
}
func (o *options) Report() alamos.Report {
// The key-value store and cluster state services will attach their own reports to
// the instrumentation, so we only need to report values that they won't.
return alamos.Report{
"dirname": o.dirname,
"addr": o.addr,
"peers": o.peerAddresses,
"bootstrap": o.bootstrap,
}
}
// Bootstrap tells aspen to bootstrap a new cluster. This option automatically assigns
// the host node and NodeID of 1.
func Bootstrap() Option { return func(o *options) { o.bootstrap = true } }
// WithEngine sets the underlying KV engine that aspen uses to store its data. When
// using this option, the caller should transfer all responsibility for executing queries
// on the engine to aspen.
func WithEngine(engine xkv.DB) Option {
return func(o *options) {
o.externalKV = true
o.kv.Engine = engine
}
}
// WithTransport sets a custom network transport.
func WithTransport(transport Transport) Option {
return func(o *options) {
o.transport.external = true
o.transport.Transport = transport
}
}
// WithInstrumentation sets the instrumentation for aspen.
func WithInstrumentation(i alamos.Instrumentation) Option {
return func(o *options) {
o.Instrumentation = i
}
}
// InMemory sets aspen to use a memory-backed KV engine. This option is ignored if a
// custom KV engine is set (using WithEngine).
func InMemory() Option {
return func(o *options) { o.dirname = ""; o.fs = vfs.NewMem() }
}
// PropagationConfig is a set of configurable values that tune how quickly state converges
// across the cluster. Lower intervals typically bring faster convergence, but also use
// considerably more network traffic.
type PropagationConfig struct {
// PledgeRetryInterval is the interval at which aspen will retry sending a pledge to
// a peer. Pledges are sent at a scaled interval (see PledgeRetryScale).
PledgeRetryInterval time.Duration
// PledgeRetryScale is the factory at which the interval increases after failed
// pledges. For example, a PledgeRetryInterval of 2 seconds and a PledgeRetryScale
// of 2 will result in pledge intervals of 2, 4, 8, 16, 32, and so on until the
// pledge is accepted.
PledgeRetryScale float64
// PledgeRequestTimeout is the maximum amount of time aspen will wait for a pledge
// request to be accepted before moving on to the next peer.
PledgeRequestTimeout time.Duration
// ClusterGossipInterval is the interval at which aspen will propagate cluster state
// to other nodes. Aspen will send messages regardless of whether the state has
// changed, so setting this interval to a low value may result in very high network
// traffic.
ClusterGossipInterval time.Duration
// KVGossipInterval sets the interval at which aspen will propagate key-Value
// operations to other nodes. It's important to note that KV will not gossip if
// there are no operations to propagate.
KVGossipInterval time.Duration
}
// WithPropagationConfig sets the parameters defining how quickly cluster state converges.
// See PropagationConfig for more details.
func WithPropagationConfig(config PropagationConfig) Option {
return func(o *options) {
o.cluster.Pledge.RetryInterval = config.PledgeRetryInterval
o.cluster.Pledge.RetryScale = config.PledgeRetryScale
o.cluster.Pledge.RequestTimeout = config.PledgeRequestTimeout
o.cluster.Gossip.Interval = config.ClusterGossipInterval
o.kv.GossipInterval = config.KVGossipInterval
}
}
var FastPropagationConfig = PropagationConfig{
PledgeRetryInterval: 10 * time.Millisecond,
PledgeRetryScale: 1,
ClusterGossipInterval: 10 * time.Millisecond,
KVGossipInterval: 10 * time.Millisecond,
}
func newOptions(
dirname string,
addr address.Address,
peers []address.Address,
opts ...Option,
) *options {
o := &options{
dirname: dirname,
addr: addr,
peerAddresses: peers,
}
for _, opt := range opts {
opt(o)
}
mergeDefaultOptions(o)
return o
}
func mergeDefaultOptions(o *options) {
def := defaultOptions()
o.dirname = override.String(def.dirname, o.dirname)
o.kv = def.kv.Override(o.kv)
o.cluster = def.cluster.Override(o.cluster)
// Only allocate the default grpc client pool if the caller did not
// provide their own transport. Otherwise the eagerly-created pool would
// leak: nothing would reference it and nothing would close it.
if o.transport.Transport == nil {
pool := fgrpc.OpenPool("", grpc.WithTransportCredentials(insecure.NewCredentials()))
o.transport.ownedPool = pool
o.transport.Transport = grpct.New(pool)
}
o.Instrumentation = override.Zero(def.Instrumentation, o.Instrumentation)
o.cluster.Instrumentation = o.Child("cluster")
o.kv.Instrumentation = o.Child("kv")
o.cluster.HostAddress = o.addr
o.cluster.Pledge.Peers = o.peerAddresses
// If we're bootstrapping these options are ignored.
if o.bootstrap {
o.peerAddresses = []address.Address{}
o.cluster.Pledge.Peers = []address.Address{}
}
}
func defaultOptions() *options {
return &options{
dirname: "aspen",
cluster: cluster.DefaultConfig,
kv: kv.DefaultConfig,
}
}