-
-
Notifications
You must be signed in to change notification settings - Fork 3.2k
Expand file tree
/
Copy pathprovide.go
More file actions
285 lines (248 loc) · 10.8 KB
/
provide.go
File metadata and controls
285 lines (248 loc) · 10.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
package config
import (
"fmt"
"strings"
"time"
"github.com/libp2p/go-libp2p-kad-dht/amino"
)
// Defaults and lower bounds for the Provide configuration section.
const (
	// DefaultProvideEnabled is presumably the default for Provide.Enabled;
	// it is not referenced in the visible part of this file (TODO confirm).
	DefaultProvideEnabled = true

	// DefaultProvideStrategy is the strategy string ValidateProvideConfig
	// falls back to when Provide.Strategy has no configured value.
	DefaultProvideStrategy = "all"

	// DefaultProvideBloomFPRate is the target false positive rate for the
	// bloom filter used by +unique and +entities reprovide cycles and
	// fast-provide-dag walks. Expressed as 1/N (one false positive per N
	// lookups). At ~1 in 4.75M (~0.00002%) each CID costs ~4 bytes before
	// ipfs/bbloom's power-of-two rounding.
	//
	// Kubo owns this default independently of boxo/dag/walker; the two
	// values may diverge over time without coordination.
	DefaultProvideBloomFPRate = 4_750_000

	// MinProvideBloomFPRate is the smallest accepted Provide.BloomFPRate;
	// ValidateProvideConfig rejects explicitly configured values below it.
	// Below 1 in 1M the bloom filter becomes lossy enough to drop a
	// meaningful fraction of CIDs from each reprovide cycle (e.g. at
	// rate=10_000 a 100M-CID repo skips ~10K CIDs per cycle).
	MinProvideBloomFPRate = 1_000_000

	// DHT provider defaults

	// DefaultProvideDHTInterval is the default for Provide.DHT.Interval.
	DefaultProvideDHTInterval = 22 * time.Hour // https://github.com/ipfs/kubo/pull/9326

	// DefaultProvideDHTMaxWorkers is the default for Provide.DHT.MaxWorkers.
	DefaultProvideDHTMaxWorkers = 16 // Unified default for both sweep and legacy providers

	// DefaultProvideDHTSweepEnabled is the default for Provide.DHT.SweepEnabled.
	DefaultProvideDHTSweepEnabled = true

	// DefaultProvideDHTResumeEnabled is presumably the default for
	// Provide.DHT.ResumeEnabled (documented on that field as true).
	DefaultProvideDHTResumeEnabled = true

	// DefaultProvideDHTDedicatedPeriodicWorkers is the default for
	// Provide.DHT.DedicatedPeriodicWorkers.
	DefaultProvideDHTDedicatedPeriodicWorkers = 2

	// DefaultProvideDHTDedicatedBurstWorkers is the default for
	// Provide.DHT.DedicatedBurstWorkers.
	DefaultProvideDHTDedicatedBurstWorkers = 1

	// DefaultProvideDHTMaxProvideConnsPerWorker is the default for
	// Provide.DHT.MaxProvideConnsPerWorker.
	DefaultProvideDHTMaxProvideConnsPerWorker = 20

	// DefaultProvideDHTKeystoreBatchSize is the default for
	// Provide.DHT.KeystoreBatchSize: 1<<14 = 16384 keys per batch.
	DefaultProvideDHTKeystoreBatchSize = 1 << 14 // ~544 KiB per batch (1 multihash = 34 bytes)

	// DefaultProvideDHTOfflineDelay is the default for Provide.DHT.OfflineDelay.
	DefaultProvideDHTOfflineDelay = 2 * time.Hour

	// DefaultFastProvideTimeout is the maximum time allowed for fast-provide operations.
	// Prevents hanging on network issues when providing root CID.
	// 10 seconds is sufficient for DHT operations with sweep provider or accelerated client.
	DefaultFastProvideTimeout = 10 * time.Second
)
// ProvideStrategy is a bit set selecting which content is announced to the
// routing system. Every constant below occupies its own bit, so strategies
// can be combined with bitwise OR, which is how ParseProvideStrategy builds
// a value from a "+"-separated strategy string.
type ProvideStrategy int

const (
	// ProvideStrategyAll provides every block. It is selected by the "all"
	// and "flat" tokens and by the empty strategy string, and cannot be
	// combined with any other flag.
	ProvideStrategyAll ProvideStrategy = 1 << iota
	// ProvideStrategyPinned is selected by the "pinned" token; content is
	// provided when it is flagged as pinned (see ShouldProvideForStrategy).
	ProvideStrategyPinned
	// ProvideStrategyRoots is selected by the "roots" token; content is
	// provided when it is flagged as a pinned root.
	ProvideStrategyRoots
	// ProvideStrategyMFS is selected by the "mfs" token; content is
	// provided when it is flagged as being in MFS.
	ProvideStrategyMFS
	// ProvideStrategyUnique enables bloom filter cross-DAG deduplication.
	ProvideStrategyUnique
	// ProvideStrategyEntities enables entity-aware traversal (implies Unique).
	ProvideStrategyEntities
)
// Provide configures both immediate CID announcements (provide operations) for new content
// and periodic re-announcements of existing CIDs (reprovide operations).
// This section combines the functionality previously split between Provider and Reprovider.
//
// Values are checked by ValidateProvideConfig.
type Provide struct {
	// Enabled controls whether both provide and reprovide systems are enabled.
	// When disabled, the node will not announce any content to the routing system.
	Enabled Flag `json:",omitempty"`

	// Strategy determines which CIDs are announced to the routing system.
	// The value is a "+"-separated list of tokens understood by
	// ParseProvideStrategy (for example "pinned+mfs").
	// Default: DefaultProvideStrategy
	Strategy *OptionalString `json:",omitempty"`

	// BloomFPRate sets the target false positive rate of the bloom filter
	// used by Provide.Strategy modifiers +unique and +entities (and the
	// matching fast-provide-dag walk). Expressed as 1/N (one false
	// positive per N lookups), so higher N means lower FP rate but more
	// memory per CID. Only takes effect when Provide.Strategy includes
	// +unique or +entities.
	//
	// An explicitly configured value must be at least MinProvideBloomFPRate.
	//
	// Default: DefaultProvideBloomFPRate
	BloomFPRate *OptionalInteger `json:",omitempty"`

	// DHT configures DHT-specific provide and reprovide settings.
	DHT ProvideDHT
}
// ProvideDHT configures DHT provider settings for both immediate announcements
// and periodic reprovides.
//
// The per-field constraints noted below are the ones enforced by
// ValidateProvideConfig on explicitly configured values.
type ProvideDHT struct {
	// Interval sets the time between rounds of reproviding local content
	// to the routing system. Set to "0" to disable content reproviding.
	// Must be non-negative and no greater than amino.DefaultProvideValidity.
	// Default: DefaultProvideDHTInterval
	Interval *OptionalDuration `json:",omitempty"`

	// MaxWorkers sets the maximum number of concurrent workers for provide operations.
	// When SweepEnabled is false: controls NEW CID announcements only.
	// When SweepEnabled is true: controls total worker pool for all operations.
	// Must be positive.
	// Default: DefaultProvideDHTMaxWorkers
	MaxWorkers *OptionalInteger `json:",omitempty"`

	// SweepEnabled activates the sweeping reprovider system which spreads
	// reprovide operations over time.
	// Default: DefaultProvideDHTSweepEnabled
	SweepEnabled Flag `json:",omitempty"`

	// DedicatedPeriodicWorkers sets workers dedicated to periodic reprovides (sweep mode only).
	// Must be non-negative.
	// Default: DefaultProvideDHTDedicatedPeriodicWorkers
	DedicatedPeriodicWorkers *OptionalInteger `json:",omitempty"`

	// DedicatedBurstWorkers sets workers dedicated to burst provides (sweep mode only).
	// Must be non-negative.
	// Default: DefaultProvideDHTDedicatedBurstWorkers
	DedicatedBurstWorkers *OptionalInteger `json:",omitempty"`

	// MaxProvideConnsPerWorker sets concurrent connections per worker for sending provider records (sweep mode only).
	// Must be positive.
	// Default: DefaultProvideDHTMaxProvideConnsPerWorker
	MaxProvideConnsPerWorker *OptionalInteger `json:",omitempty"`

	// KeystoreBatchSize sets the batch size for keystore operations during reprovide refresh (sweep mode only).
	// Must be positive.
	// Default: DefaultProvideDHTKeystoreBatchSize
	KeystoreBatchSize *OptionalInteger `json:",omitempty"`

	// OfflineDelay sets the delay after which the provider switches from Disconnected to Offline state (sweep mode only).
	// Must be non-negative.
	// Default: DefaultProvideDHTOfflineDelay
	OfflineDelay *OptionalDuration `json:",omitempty"`

	// ResumeEnabled controls whether the provider resumes from its previous state on restart.
	// When enabled, the provider persists its reprovide cycle state and provide queue to the datastore,
	// and restores them on restart. When disabled, the provider starts fresh on each restart.
	// Default: true (see DefaultProvideDHTResumeEnabled)
	ResumeEnabled Flag `json:",omitempty"`
}
// ParseProvideStrategy converts a "+"-separated strategy string such as
// "pinned+mfs+unique" into its ProvideStrategy bit set.
//
// Recognised tokens are "all" (alias "flat"), "pinned", "roots", "mfs",
// "unique" and "entities"; "entities" also sets the Unique bit. An entirely
// empty string is treated as "all".
//
// An error (with a zero strategy) is returned when:
//   - the string contains an empty token, e.g. "pinned+";
//   - a token is not recognised;
//   - "all" is combined with any other strategy;
//   - unique/entities is used without pinned or mfs, or together with roots.
func ParseProvideStrategy(s string) (ProvideStrategy, error) {
	var flags ProvideStrategy

	for token := range strings.SplitSeq(s, "+") {
		if token == "" {
			// Only a wholly empty input (the default config) stands for
			// "all"; a blank piece produced by splitting is a user error.
			if s != "" {
				return 0, fmt.Errorf("invalid provide strategy: empty token in %q", s)
			}
			flags |= ProvideStrategyAll
			continue
		}

		var bit ProvideStrategy
		switch token {
		case "all", "flat":
			bit = ProvideStrategyAll
		case "pinned":
			bit = ProvideStrategyPinned
		case "roots":
			bit = ProvideStrategyRoots
		case "mfs":
			bit = ProvideStrategyMFS
		case "unique":
			bit = ProvideStrategyUnique
		case "entities":
			// Entity-aware traversal always deduplicates as well.
			bit = ProvideStrategyEntities | ProvideStrategyUnique
		default:
			return 0, fmt.Errorf("unknown provide strategy token: %q in %q", token, s)
		}
		flags |= bit
	}

	// "all" already covers every block, so mixing it with a selective
	// strategy is rejected.
	hasAll := flags&ProvideStrategyAll != 0
	if hasAll && flags != ProvideStrategyAll {
		return 0, fmt.Errorf("\"all\" strategy cannot be combined with other strategies in %q", s)
	}

	// Without a dedup modifier there is nothing further to check.
	if flags&(ProvideStrategyUnique|ProvideStrategyEntities) == 0 {
		return flags, nil
	}

	// +unique/+entities need a base strategy that walks DAGs (pinned
	// and/or mfs) and cannot be mixed with roots.
	if flags&(ProvideStrategyPinned|ProvideStrategyMFS) == 0 {
		return 0, fmt.Errorf("+unique/+entities must combine with pinned and/or mfs in %q", s)
	}
	if flags&ProvideStrategyRoots != 0 {
		return 0, fmt.Errorf("+unique/+entities is incompatible with roots in %q", s)
	}
	return flags, nil
}
// MustParseProvideStrategy wraps ParseProvideStrategy and panics with the
// parse error instead of returning it.
// Intended for strategy strings that were already validated at startup.
func MustParseProvideStrategy(s string) ProvideStrategy {
	parsed, err := ParseProvideStrategy(s)
	if err == nil {
		return parsed
	}
	panic(err)
}
// ValidateProvideConfig checks the Provide configuration and returns an error
// describing the first invalid setting it encounters, or nil when every
// checked setting is acceptable.
//
// Provide.Strategy is always parsed (falling back to DefaultProvideStrategy).
// Every other option is inspected only when its IsDefault() reports false.
func ValidateProvideConfig(cfg *Provide) error {
	// Provide.Strategy must be a parseable strategy string.
	if _, err := ParseProvideStrategy(cfg.Strategy.WithDefault(DefaultProvideStrategy)); err != nil {
		return fmt.Errorf("Provide.Strategy: %w", err)
	}

	// Provide.BloomFPRate may not drop below the supported minimum.
	if fp := cfg.BloomFPRate; !fp.IsDefault() {
		if n := fp.WithDefault(DefaultProvideBloomFPRate); n < MinProvideBloomFPRate {
			return fmt.Errorf("Provide.BloomFPRate must be >= %d (1 in 1M), got %d", MinProvideBloomFPRate, n)
		}
	}

	dht := &cfg.DHT

	// Provide.DHT.Interval: bounded above by the DHT provider record
	// validity, and never negative.
	if iv := dht.Interval; !iv.IsDefault() {
		every := iv.WithDefault(DefaultProvideDHTInterval)
		switch {
		case every > amino.DefaultProvideValidity:
			return fmt.Errorf("Provide.DHT.Interval (%v) must be less than or equal to DHT provider record validity (%v)", every, amino.DefaultProvideValidity)
		case every < 0:
			return fmt.Errorf("Provide.DHT.Interval must be non-negative, got %v", every)
		}
	}

	// Provide.DHT.MaxWorkers: strictly positive.
	if mw := dht.MaxWorkers; !mw.IsDefault() {
		if n := mw.WithDefault(DefaultProvideDHTMaxWorkers); n <= 0 {
			return fmt.Errorf("Provide.DHT.MaxWorkers must be positive, got %d", n)
		}
	}

	// Provide.DHT.DedicatedPeriodicWorkers: zero is allowed.
	if pw := dht.DedicatedPeriodicWorkers; !pw.IsDefault() {
		if n := pw.WithDefault(DefaultProvideDHTDedicatedPeriodicWorkers); n < 0 {
			return fmt.Errorf("Provide.DHT.DedicatedPeriodicWorkers must be non-negative, got %d", n)
		}
	}

	// Provide.DHT.DedicatedBurstWorkers: zero is allowed.
	if bw := dht.DedicatedBurstWorkers; !bw.IsDefault() {
		if n := bw.WithDefault(DefaultProvideDHTDedicatedBurstWorkers); n < 0 {
			return fmt.Errorf("Provide.DHT.DedicatedBurstWorkers must be non-negative, got %d", n)
		}
	}

	// Provide.DHT.MaxProvideConnsPerWorker: strictly positive.
	if pc := dht.MaxProvideConnsPerWorker; !pc.IsDefault() {
		if n := pc.WithDefault(DefaultProvideDHTMaxProvideConnsPerWorker); n <= 0 {
			return fmt.Errorf("Provide.DHT.MaxProvideConnsPerWorker must be positive, got %d", n)
		}
	}

	// Provide.DHT.KeystoreBatchSize: strictly positive.
	if kb := dht.KeystoreBatchSize; !kb.IsDefault() {
		if n := kb.WithDefault(DefaultProvideDHTKeystoreBatchSize); n <= 0 {
			return fmt.Errorf("Provide.DHT.KeystoreBatchSize must be positive, got %d", n)
		}
	}

	// Provide.DHT.OfflineDelay: never negative.
	if od := dht.OfflineDelay; !od.IsDefault() {
		if d := od.WithDefault(DefaultProvideDHTOfflineDelay); d < 0 {
			return fmt.Errorf("Provide.DHT.OfflineDelay must be non-negative, got %v", d)
		}
	}

	return nil
}
// ShouldProvideForStrategy reports whether a piece of content should be
// provided under the given strategy.
//
// With the All bit set the answer is always true. Otherwise each selective
// bit is matched against its corresponding flag: Pinned with isPinned, Roots
// with isPinnedRoot, and MFS with isMFS; any single match is enough.
func ShouldProvideForStrategy(strategy ProvideStrategy, isPinned bool, isPinnedRoot bool, isMFS bool) bool {
	switch {
	case strategy&ProvideStrategyAll != 0:
		// "all" ignores the content characteristics entirely.
		return true
	case isPinned && strategy&ProvideStrategyPinned != 0:
		return true
	case isPinnedRoot && strategy&ProvideStrategyRoots != 0:
		return true
	case isMFS && strategy&ProvideStrategyMFS != 0:
		return true
	default:
		return false
	}
}