Skip to content

Commit d8c91ad

Browse files
committed
treat sync1.1 errors as warnings
1 parent 6aa979f commit d8c91ad

File tree

3 files changed

+150
-38
lines changed

3 files changed

+150
-38
lines changed

cmd/relay/bgs/bgs.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@ type BGSConfig struct {
101101

102102
// AdminToken checked against "Authorization: Bearer {}" header
103103
AdminToken string
104+
105+
Sync11ErrorsAreWarnings bool
104106
}
105107

106108
func DefaultBGSConfig() *BGSConfig {
@@ -112,7 +114,7 @@ func DefaultBGSConfig() *BGSConfig {
112114
}
113115
}
114116

115-
func NewBGS(db *gorm.DB, validator *Validator, evtman *events.EventManager, didd identity.Directory, config *BGSConfig) (*BGS, error) {
117+
func NewBGS(db *gorm.DB, evtman *events.EventManager, didd identity.Directory, config *BGSConfig) (*BGS, error) {
116118

117119
if config == nil {
118120
config = DefaultBGSConfig()
@@ -132,6 +134,8 @@ func NewBGS(db *gorm.DB, validator *Validator, evtman *events.EventManager, didd
132134

133135
uc, _ := lru.New[string, *Account](1_000_000)
134136

137+
validator := NewValidator(didd, config.InductionTraceLog, config.Sync11ErrorsAreWarnings)
138+
135139
bgs := &BGS{
136140
db: db,
137141

cmd/relay/bgs/validator.go

+137-33
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import (
2020

2121
const defaultMaxRevFuture = time.Hour
2222

23-
func NewValidator(directory identity.Directory, inductionTraceLog *slog.Logger) *Validator {
23+
func NewValidator(directory identity.Directory, inductionTraceLog *slog.Logger, sync11ErrorsAreWarnings bool) *Validator {
2424
maxRevFuture := defaultMaxRevFuture // TODO: configurable
2525
ErrRevTooFarFuture := fmt.Errorf("new rev is > %s in the future", maxRevFuture)
2626

@@ -30,9 +30,10 @@ func NewValidator(directory identity.Directory, inductionTraceLog *slog.Logger)
3030
inductionTraceLog: inductionTraceLog,
3131
directory: directory,
3232

33-
maxRevFuture: maxRevFuture,
34-
ErrRevTooFarFuture: ErrRevTooFarFuture,
35-
AllowSignatureNotFound: true, // TODO: configurable
33+
maxRevFuture: maxRevFuture,
34+
ErrRevTooFarFuture: ErrRevTooFarFuture,
35+
AllowSignatureNotFound: true, // TODO: configurable
36+
Sync11ErrorsAreWarnings: sync11ErrorsAreWarnings,
3637
}
3738
}
3839

@@ -56,6 +57,8 @@ type Validator struct {
5657
// AllowSignatureNotFound lets messages whose public key cannot be found pass through, counted by a warning counter
5758
// TODO: refine this for what kind of 'not found' we accept.
5859
AllowSignatureNotFound bool
60+
61+
Sync11ErrorsAreWarnings bool
5962
}
6063

6164
type NextCommitHandler interface {
@@ -129,36 +132,56 @@ func (val *Validator) VerifyCommitMessage(ctx context.Context, host *models.PDS,
129132
hostname := host.Host
130133
hasWarning := false
131134
commitVerifyStarts.Inc()
132-
logger := slog.Default().With("did", msg.Repo, "rev", msg.Rev, "seq", msg.Seq, "time", msg.Time)
135+
logger := slog.Default().With("host", hostname, "did", msg.Repo, "rev", msg.Rev, "seq", msg.Seq, "time", msg.Time)
133136

134137
did, err := syntax.ParseDID(msg.Repo)
135138
if err != nil {
136139
commitVerifyErrors.WithLabelValues(hostname, "did").Inc()
137-
return nil, err
140+
if !val.Sync11ErrorsAreWarnings {
141+
return nil, err
142+
} else {
143+
logger.Warn("invalid did", "err", err)
144+
}
138145
}
139146
rev, err := syntax.ParseTID(msg.Rev)
140147
if err != nil {
141148
commitVerifyErrors.WithLabelValues(hostname, "tid").Inc()
142-
return nil, err
149+
if !val.Sync11ErrorsAreWarnings {
150+
return nil, err
151+
} else {
152+
logger.Warn("invalid rev", "err", err)
153+
}
143154
}
144155
if prevRoot != nil {
145156
prevRev := prevRoot.GetRev()
146157
curTime := rev.Time()
147158
prevTime := prevRev.Time()
148159
if curTime.Before(prevTime) {
149160
commitVerifyErrors.WithLabelValues(hostname, "revb").Inc()
150-
dt := prevTime.Sub(curTime)
151-
return nil, &revOutOfOrderError{dt}
161+
if !val.Sync11ErrorsAreWarnings {
162+
dt := prevTime.Sub(curTime)
163+
return nil, &revOutOfOrderError{dt}
164+
} else {
165+
logger.Warn("new rev before old rev", "prev rev", prevRev, "rev", rev)
166+
}
152167
}
153168
}
154169
if rev.Time().After(time.Now().Add(val.maxRevFuture)) {
155170
commitVerifyErrors.WithLabelValues(hostname, "revf").Inc()
156-
return nil, val.ErrRevTooFarFuture
171+
if !val.Sync11ErrorsAreWarnings {
172+
return nil, val.ErrRevTooFarFuture
173+
} else {
174+
logger.Warn("far future rev", "now", time.Now(), "rev", rev.Time(), "err", err)
175+
}
157176
}
158177
_, err = syntax.ParseDatetime(msg.Time)
159178
if err != nil {
160179
commitVerifyErrors.WithLabelValues(hostname, "time").Inc()
161-
return nil, err
180+
if !val.Sync11ErrorsAreWarnings {
181+
return nil, err
182+
} else {
183+
logger.Warn("invalid time", "err", err)
184+
}
162185
}
163186

164187
if msg.TooBig {
@@ -177,16 +200,28 @@ func (val *Validator) VerifyCommitMessage(ctx context.Context, host *models.PDS,
177200
commit, repoFragment, err := atrepo.LoadFromCAR(ctx, bytes.NewReader([]byte(msg.Blocks)))
178201
if err != nil {
179202
commitVerifyErrors.WithLabelValues(hostname, "car").Inc()
180-
return nil, err
203+
if !val.Sync11ErrorsAreWarnings {
204+
return nil, err
205+
} else {
206+
logger.Warn("invalid car", "err", err)
207+
}
181208
}
182209

183210
if commit.Rev != rev.String() {
184211
commitVerifyErrors.WithLabelValues(hostname, "rev").Inc()
185-
return nil, fmt.Errorf("rev did not match commit")
212+
if !val.Sync11ErrorsAreWarnings {
213+
return nil, fmt.Errorf("rev did not match commit")
214+
} else {
215+
logger.Warn("message rev != commit.rev")
216+
}
186217
}
187218
if commit.DID != did.String() {
188219
commitVerifyErrors.WithLabelValues(hostname, "did2").Inc()
189-
return nil, fmt.Errorf("rev did not match commit")
220+
if !val.Sync11ErrorsAreWarnings {
221+
return nil, fmt.Errorf("rev did not match commit")
222+
} else {
223+
logger.Warn("message did != commit.did")
224+
}
190225
}
191226

192227
err = val.VerifyCommitSignature(ctx, commit, hostname, &hasWarning)
@@ -202,21 +237,37 @@ func (val *Validator) VerifyCommitMessage(ctx context.Context, host *models.PDS,
202237
nsid, rkey, err := syntax.ParseRepoPath(op.Path)
203238
if err != nil {
204239
commitVerifyErrors.WithLabelValues(hostname, "opp").Inc()
205-
return nil, fmt.Errorf("invalid repo path in ops list: %w", err)
240+
if !val.Sync11ErrorsAreWarnings {
241+
return nil, fmt.Errorf("invalid repo path in ops list: %w", err)
242+
} else {
243+
logger.Warn("invalid repo path", "err", err)
244+
}
206245
}
207-
val, err := repoFragment.GetRecordCID(ctx, nsid, rkey)
246+
rcid, err := repoFragment.GetRecordCID(ctx, nsid, rkey)
208247
if err != nil {
209248
commitVerifyErrors.WithLabelValues(hostname, "rcid").Inc()
210-
return nil, err
249+
if !val.Sync11ErrorsAreWarnings {
250+
return nil, err
251+
} else {
252+
logger.Warn("invalid record cid", "err", err)
253+
}
211254
}
212-
if *c != *val {
255+
if *c != *rcid {
213256
commitVerifyErrors.WithLabelValues(hostname, "opc").Inc()
214-
return nil, fmt.Errorf("record op doesn't match MST tree value")
257+
if !val.Sync11ErrorsAreWarnings {
258+
return nil, fmt.Errorf("record op doesn't match MST tree value")
259+
} else {
260+
logger.Warn("record op doesn't match MST tree value")
261+
}
215262
}
216263
_, _, err = repoFragment.GetRecordBytes(ctx, nsid, rkey)
217264
if err != nil {
218265
commitVerifyErrors.WithLabelValues(hostname, "rec").Inc()
219-
return nil, err
266+
if !val.Sync11ErrorsAreWarnings {
267+
return nil, err
268+
} else {
269+
logger.Warn("could not get record bytes", "err", err)
270+
}
220271
}
221272
}
222273
}
@@ -257,30 +308,50 @@ func (val *Validator) VerifyCommitMessage(ctx context.Context, host *models.PDS,
257308
ops, err := ParseCommitOps(msg.Ops)
258309
if err != nil {
259310
commitVerifyErrors.WithLabelValues(hostname, "pop").Inc()
260-
return nil, err
311+
if !val.Sync11ErrorsAreWarnings {
312+
return nil, err
313+
} else {
314+
logger.Warn("invalid commit ops", "err", err)
315+
}
261316
}
262317
ops, err = atrepo.NormalizeOps(ops)
263318
if err != nil {
264319
commitVerifyErrors.WithLabelValues(hostname, "nop").Inc()
265-
return nil, err
320+
if !val.Sync11ErrorsAreWarnings {
321+
return nil, err
322+
} else {
323+
logger.Warn("could not normalize ops", "err", err)
324+
}
266325
}
267326

268327
invTree := repoFragment.MST.Copy()
269328
for _, op := range ops {
270329
if err := atrepo.InvertOp(&invTree, &op); err != nil {
271330
commitVerifyErrors.WithLabelValues(hostname, "inv").Inc()
272-
return nil, err
331+
if !val.Sync11ErrorsAreWarnings {
332+
return nil, err
333+
} else {
334+
logger.Warn("could not invert op", "err", err)
335+
}
273336
}
274337
}
275338
computed, err := invTree.RootCID()
276339
if err != nil {
277340
commitVerifyErrors.WithLabelValues(hostname, "it").Inc()
278-
return nil, err
341+
if !val.Sync11ErrorsAreWarnings {
342+
return nil, err
343+
} else {
344+
logger.Warn("inverted tree could not get root cid", "err", err)
345+
}
279346
}
280347
if *computed != *c {
281348
// this is self-inconsistent malformed data
282349
commitVerifyErrors.WithLabelValues(hostname, "pd").Inc()
283-
return nil, fmt.Errorf("inverted tree root didn't match prevData")
350+
if !val.Sync11ErrorsAreWarnings {
351+
return nil, fmt.Errorf("inverted tree root didn't match prevData")
352+
} else {
353+
logger.Warn("inverted tree root didn't match prevData")
354+
}
284355
}
285356
//logger.Debug("prevData matched", "prevData", c.String(), "computed", computed.String())
286357

@@ -305,46 +376,79 @@ func (val *Validator) VerifyCommitMessage(ctx context.Context, host *models.PDS,
305376
func (val *Validator) HandleSync(ctx context.Context, host *models.PDS, msg *atproto.SyncSubscribeRepos_Sync) (newRoot *cid.Cid, err error) {
306377
hostname := host.Host
307378
hasWarning := false
379+
logger := slog.Default().With("host", hostname, "did", msg.Did, "rev", msg.Rev, "seq", msg.Seq, "time", msg.Time)
308380

309381
did, err := syntax.ParseDID(msg.Did)
310382
if err != nil {
311383
syncVerifyErrors.WithLabelValues(hostname, "did").Inc()
312-
return nil, err
384+
if !val.Sync11ErrorsAreWarnings {
385+
return nil, err
386+
} else {
387+
logger.Warn("invalid did", "err", err)
388+
}
313389
}
314390
rev, err := syntax.ParseTID(msg.Rev)
315391
if err != nil {
316392
syncVerifyErrors.WithLabelValues(hostname, "tid").Inc()
317-
return nil, err
393+
if !val.Sync11ErrorsAreWarnings {
394+
return nil, err
395+
} else {
396+
logger.Warn("invalid rev", "err", err)
397+
}
318398
}
319399
if rev.Time().After(time.Now().Add(val.maxRevFuture)) {
320400
syncVerifyErrors.WithLabelValues(hostname, "revf").Inc()
321-
return nil, val.ErrRevTooFarFuture
401+
if !val.Sync11ErrorsAreWarnings {
402+
return nil, val.ErrRevTooFarFuture
403+
} else {
404+
logger.Warn("invalid rev too far future", "now", time.Now(), "rev", rev.Time())
405+
}
322406
}
323407
_, err = syntax.ParseDatetime(msg.Time)
324408
if err != nil {
325409
syncVerifyErrors.WithLabelValues(hostname, "time").Inc()
326-
return nil, err
410+
if !val.Sync11ErrorsAreWarnings {
411+
return nil, err
412+
} else {
413+
logger.Warn("invalid time", "err", err)
414+
}
327415
}
328416

329417
commit, err := atrepo.LoadCARCommit(ctx, bytes.NewReader([]byte(msg.Blocks)))
330418
if err != nil {
331419
commitVerifyErrors.WithLabelValues(hostname, "car").Inc()
332-
return nil, err
420+
if !val.Sync11ErrorsAreWarnings {
421+
return nil, err
422+
} else {
423+
logger.Warn("invalid car", "err", err)
424+
}
333425
}
334426

335427
if commit.Rev != rev.String() {
336428
commitVerifyErrors.WithLabelValues(hostname, "rev").Inc()
337-
return nil, fmt.Errorf("rev did not match commit")
429+
if !val.Sync11ErrorsAreWarnings {
430+
return nil, fmt.Errorf("rev did not match commit")
431+
} else {
432+
logger.Warn("message rev != commit.rev")
433+
}
338434
}
339435
if commit.DID != did.String() {
340436
commitVerifyErrors.WithLabelValues(hostname, "did2").Inc()
341-
return nil, fmt.Errorf("rev did not match commit")
437+
if !val.Sync11ErrorsAreWarnings {
438+
return nil, fmt.Errorf("did did not match commit")
439+
} else {
440+
logger.Warn("message did != commit.did")
441+
}
342442
}
343443

344444
err = val.VerifyCommitSignature(ctx, commit, hostname, &hasWarning)
345445
if err != nil {
346446
// signature error metrics are counted inside VerifyCommitSignature()
347-
return nil, err
447+
if !val.Sync11ErrorsAreWarnings {
448+
return nil, err
449+
} else {
450+
logger.Warn("invalid sig", "err", err)
451+
}
348452
}
349453

350454
return &commit.Data, nil

cmd/relay/main.go

+8-4
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,12 @@ func run(args []string) error {
162162
Value: false,
163163
Usage: "make outbound firehose sequence number approximately unix microseconds",
164164
},
165+
&cli.BoolFlag{
166+
Name: "sync11-errors-are-warnings",
167+
EnvVars: []string{"RELAY_SYNC11_JUST_WARN"},
168+
Value: true,
169+
Usage: "don't fail traffic on sync1.1 errors",
170+
},
165171
}
166172

167173
app.Action = runRelay
@@ -289,9 +295,6 @@ func runRelay(cctx *cli.Context) error {
289295
}
290296
cacheDir := identity.NewCacheDirectory(&baseDir, cctx.Int("did-cache-size"), time.Hour*24, time.Minute*2, time.Minute*5)
291297

292-
// TODO: rename repoman
293-
repoman := libbgs.NewValidator(&cacheDir, inductionTraceLog)
294-
295298
var persister events.EventPersistence
296299

297300
dpd := cctx.String("disk-persister-dir")
@@ -342,6 +345,7 @@ func runRelay(cctx *cli.Context) error {
342345
bgsConfig.DefaultRepoLimit = cctx.Int64("default-repo-limit")
343346
bgsConfig.ApplyPDSClientSettings = makePdsClientSetup(ratelimitBypass)
344347
bgsConfig.InductionTraceLog = inductionTraceLog
348+
bgsConfig.Sync11ErrorsAreWarnings = cctx.Bool("sync11-errors-are-warnings")
345349
nextCrawlers := cctx.StringSlice("next-crawler")
346350
if len(nextCrawlers) != 0 {
347351
nextCrawlerUrls := make([]*url.URL, len(nextCrawlers))
@@ -363,7 +367,7 @@ func runRelay(cctx *cli.Context) error {
363367
bgsConfig.AdminToken = base64.URLEncoding.EncodeToString(rblob[:])
364368
logger.Info("generated random admin key", "header", "Authorization: Bearer "+bgsConfig.AdminToken)
365369
}
366-
bgs, err := libbgs.NewBGS(db, repoman, evtman, &cacheDir, bgsConfig)
370+
bgs, err := libbgs.NewBGS(db, evtman, &cacheDir, bgsConfig)
367371
if err != nil {
368372
return err
369373
}

0 commit comments

Comments
 (0)