Skip to content

Commit 4bd8faf

Browse files
authored
goat: verify sync v1.1 firehose (#988)
2 parents 0d12453 + 2bc4955 commit 4bd8faf

File tree

6 files changed

+260
-12
lines changed

6 files changed

+260
-12
lines changed

atproto/label/label.go

+25
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,31 @@ type Label struct {
2525
Sig data.Bytes `json:"sig,omitempty" cborgen:"sig,omitempty"`
2626
}
2727

28+
// converts to map[string]any for printing as JSON
29+
func (l *Label) Data() map[string]any {
30+
d := map[string]any{
31+
"cid": l.CID,
32+
"cts": l.CreatedAt,
33+
"src": l.SourceDID,
34+
"uri": l.URI,
35+
"val": l.Val,
36+
"ver": l.Version,
37+
}
38+
if l.CID != nil {
39+
d["cid"] = l.CID
40+
}
41+
if l.ExpiresAt != nil {
42+
d["exp"] = l.ExpiresAt
43+
}
44+
if l.Negated != nil {
45+
d["neg"] = l.Negated
46+
}
47+
if l.Sig != nil {
48+
d["sig"] = data.Bytes(l.Sig)
49+
}
50+
return d
51+
}
52+
2853
// does basic checks on syntax and structure
2954
func (l *Label) VerifySyntax() error {
3055
if l.Version != ATPROTO_LABEL_VERSION {

atproto/repo/commit.go

+18
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66

77
"github.com/bluesky-social/indigo/atproto/crypto"
8+
"github.com/bluesky-social/indigo/atproto/data"
89
"github.com/bluesky-social/indigo/atproto/syntax"
910

1011
"github.com/ipfs/go-cid"
@@ -39,6 +40,23 @@ func (c *Commit) VerifyStructure() error {
3940
return nil
4041
}
4142

43+
// returns a representation of the commit object as atproto data (eg, for JSON serialization)
44+
func (c *Commit) AsData() map[string]any {
45+
d := map[string]any{
46+
"did": c.DID,
47+
"version": c.Version,
48+
"prev": (*data.CIDLink)(c.Prev),
49+
"data": data.CIDLink(c.Data),
50+
}
51+
if c.Sig != nil {
52+
d["sig"] = data.Bytes(c.Sig)
53+
}
54+
if c.Rev != "" {
55+
d["rev"] = c.Rev
56+
}
57+
return d
58+
}
59+
4260
// Encodes the commit object as DAG-CBOR, without the signature field. Used for signing or validating signatures.
4361
func (c *Commit) UnsignedBytes() ([]byte, error) {
4462
buf := new(bytes.Buffer)

atproto/repo/sync.go

-1
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,6 @@ func VerifyCommitMessage(ctx context.Context, msg *comatproto.SyncSubscribeRepos
119119
logger.Info("prevData was null; skipping tree root check")
120120
}
121121

122-
logger.Info("success")
123122
return repo, nil
124123
}
125124

cmd/goat/firehose.go

+185-11
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@ import (
1111
"net/url"
1212
"os"
1313
"strings"
14+
"time"
1415

1516
comatproto "github.com/bluesky-social/indigo/api/atproto"
1617
"github.com/bluesky-social/indigo/atproto/data"
18+
"github.com/bluesky-social/indigo/atproto/identity"
1719
"github.com/bluesky-social/indigo/atproto/repo"
1820
"github.com/bluesky-social/indigo/atproto/syntax"
1921
"github.com/bluesky-social/indigo/events"
@@ -48,6 +50,23 @@ var cmdFirehose = &cli.Command{
4850
Name: "account-events",
4951
Usage: "only print account and identity events",
5052
},
53+
&cli.BoolFlag{
54+
Name: "quiet",
55+
Aliases: []string{"q"},
56+
Usage: "don't actually print events to stdout (eg, errors only)",
57+
},
58+
&cli.BoolFlag{
59+
Name: "verify-basic",
60+
Usage: "parse events and do basic syntax and structure checks",
61+
},
62+
&cli.BoolFlag{
63+
Name: "verify-sig",
64+
Usage: "verify account signatures on commits",
65+
},
66+
&cli.BoolFlag{
67+
Name: "verify-mst",
68+
Usage: "run inductive verification of ops and MST structure",
69+
},
5170
&cli.BoolFlag{
5271
Name: "ops",
5372
Aliases: []string{"records"},
@@ -58,24 +77,41 @@ var cmdFirehose = &cli.Command{
5877
}
5978

6079
type GoatFirehoseConsumer struct {
61-
// for pretty-printing events to stdout
62-
EventLogger *slog.Logger
6380
OpsMode bool
6481
AccountsOnly bool
82+
Quiet bool
83+
VerifyBasic bool
84+
VerifySig bool
85+
VerifyMST bool
6586
// filter to specified collections
6687
CollectionFilter []string
88+
// for signature verification
89+
Dir identity.Directory
6790
}
6891

6992
func runFirehose(cctx *cli.Context) error {
7093
ctx := context.Background()
7194

72-
slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stderr, nil)))
95+
slog.SetDefault(configLogger(cctx, os.Stderr))
96+
97+
// main thing is skipping handle verification
98+
bdir := identity.BaseDirectory{
99+
SkipHandleVerification: true,
100+
TryAuthoritativeDNS: false,
101+
SkipDNSDomainSuffixes: []string{".bsky.social"},
102+
UserAgent: "goat/" + versioninfo.Short(),
103+
}
104+
cdir := identity.NewCacheDirectory(&bdir, 1_000_000, time.Hour*24, time.Minute*2, time.Minute*5)
73105

74106
gfc := GoatFirehoseConsumer{
75-
EventLogger: slog.New(slog.NewJSONHandler(os.Stdout, nil)),
76107
OpsMode: cctx.Bool("ops"),
77108
AccountsOnly: cctx.Bool("account-events"),
78109
CollectionFilter: cctx.StringSlice("collection"),
110+
Quiet: cctx.Bool("quiet"),
111+
VerifyBasic: cctx.Bool("verify-basic"),
112+
VerifySig: cctx.Bool("verify-sig"),
113+
VerifyMST: cctx.Bool("verify-mst"),
114+
Dir: &cdir,
79115
}
80116

81117
var relayHost string
@@ -104,7 +140,6 @@ func runFirehose(cctx *cli.Context) error {
104140
u.RawQuery = fmt.Sprintf("cursor=%d", cctx.Int("cursor"))
105141
}
106142
urlString := u.String()
107-
slog.Debug("GET", "url", urlString)
108143
con, _, err := dialer.Dial(urlString, http.Header{
109144
"User-Agent": []string{fmt.Sprintf("goat/%s", versioninfo.Short())},
110145
})
@@ -114,7 +149,7 @@ func runFirehose(cctx *cli.Context) error {
114149

115150
rsc := &events.RepoStreamCallbacks{
116151
RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
117-
slog.Debug("commit event", "did", evt.Repo, "seq", evt.Seq)
152+
//slog.Debug("commit event", "did", evt.Repo, "seq", evt.Seq)
118153
if !gfc.AccountsOnly && !gfc.OpsMode {
119154
return gfc.handleCommitEvent(ctx, evt)
120155
} else if !gfc.AccountsOnly && gfc.OpsMode {
@@ -123,26 +158,44 @@ func runFirehose(cctx *cli.Context) error {
123158
return nil
124159
},
125160
RepoSync: func(evt *comatproto.SyncSubscribeRepos_Sync) error {
126-
slog.Debug("sync event", "did", evt.Did, "seq", evt.Seq)
161+
//slog.Debug("sync event", "did", evt.Did, "seq", evt.Seq)
127162
if !gfc.AccountsOnly && !gfc.OpsMode {
128163
return gfc.handleSyncEvent(ctx, evt)
129164
}
130165
return nil
131166
},
132167
RepoIdentity: func(evt *comatproto.SyncSubscribeRepos_Identity) error {
133-
slog.Debug("identity event", "did", evt.Did, "seq", evt.Seq)
168+
//slog.Debug("identity event", "did", evt.Did, "seq", evt.Seq)
134169
if !gfc.OpsMode {
135170
return gfc.handleIdentityEvent(ctx, evt)
136171
}
137172
return nil
138173
},
139174
RepoAccount: func(evt *comatproto.SyncSubscribeRepos_Account) error {
140-
slog.Debug("account event", "did", evt.Did, "seq", evt.Seq)
175+
//slog.Debug("account event", "did", evt.Did, "seq", evt.Seq)
141176
if !gfc.OpsMode {
142177
return gfc.handleAccountEvent(ctx, evt)
143178
}
144179
return nil
145180
},
181+
RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error {
182+
if gfc.VerifyBasic {
183+
slog.Info("deprecated event type", "eventType", "handle", "did", evt.Did, "seq", evt.Seq)
184+
}
185+
return nil
186+
},
187+
RepoMigrate: func(evt *comatproto.SyncSubscribeRepos_Migrate) error {
188+
if gfc.VerifyBasic {
189+
slog.Info("deprecated event type", "eventType", "migrate", "did", evt.Did, "seq", evt.Seq)
190+
}
191+
return nil
192+
},
193+
RepoTombstone: func(evt *comatproto.SyncSubscribeRepos_Tombstone) error {
194+
if gfc.VerifyBasic {
195+
slog.Info("deprecated event type", "eventType", "handle", "did", evt.Did, "seq", evt.Seq)
196+
}
197+
return nil
198+
},
146199
}
147200

148201
scheduler := parallel.NewScheduler(
@@ -156,6 +209,21 @@ func runFirehose(cctx *cli.Context) error {
156209
}
157210

158211
func (gfc *GoatFirehoseConsumer) handleIdentityEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Identity) error {
212+
if gfc.VerifySig {
213+
did, err := syntax.ParseDID(evt.Did)
214+
if err != nil {
215+
return err
216+
}
217+
gfc.Dir.Purge(ctx, did.AtIdentifier())
218+
}
219+
if gfc.VerifyBasic {
220+
if _, err := syntax.ParseDID(evt.Did); err != nil {
221+
slog.Warn("invalid DID", "eventType", "identity", "did", evt.Did, "seq", evt.Seq)
222+
}
223+
}
224+
if gfc.Quiet {
225+
return nil
226+
}
159227
out := make(map[string]interface{})
160228
out["type"] = "identity"
161229
out["payload"] = evt
@@ -168,6 +236,14 @@ func (gfc *GoatFirehoseConsumer) handleIdentityEvent(ctx context.Context, evt *c
168236
}
169237

170238
func (gfc *GoatFirehoseConsumer) handleAccountEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Account) error {
239+
if gfc.VerifyBasic {
240+
if _, err := syntax.ParseDID(evt.Did); err != nil {
241+
slog.Warn("invalid DID", "eventType", "account", "did", evt.Did, "seq", evt.Seq)
242+
}
243+
}
244+
if gfc.Quiet {
245+
return nil
246+
}
171247
out := make(map[string]interface{})
172248
out["type"] = "account"
173249
out["payload"] = evt
@@ -180,8 +256,25 @@ func (gfc *GoatFirehoseConsumer) handleAccountEvent(ctx context.Context, evt *co
180256
}
181257

182258
func (gfc *GoatFirehoseConsumer) handleSyncEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Sync) error {
259+
commit, err := repo.LoadCARCommit(ctx, bytes.NewReader(evt.Blocks))
260+
if err != nil {
261+
return err
262+
}
263+
if gfc.VerifyBasic {
264+
if err := commit.VerifyStructure(); err != nil {
265+
slog.Warn("bad commit object", "eventType", "sync", "did", evt.Did, "seq", evt.Seq, "err", err)
266+
}
267+
if _, err := syntax.ParseDID(evt.Did); err != nil {
268+
slog.Warn("invalid DID", "eventType", "account", "did", evt.Did, "seq", evt.Seq)
269+
}
270+
}
271+
if gfc.Quiet {
272+
return nil
273+
}
274+
evt.Blocks = nil
183275
out := make(map[string]interface{})
184276
out["type"] = "sync"
277+
out["commit"] = commit.AsData()
185278
out["payload"] = evt
186279
b, err := json.Marshal(out)
187280
if err != nil {
@@ -194,6 +287,83 @@ func (gfc *GoatFirehoseConsumer) handleSyncEvent(ctx context.Context, evt *comat
194287
// this is the simple version, when not in "records" mode: print the event as JSON, but don't include blocks
195288
func (gfc *GoatFirehoseConsumer) handleCommitEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error {
196289

290+
if gfc.VerifyBasic || gfc.VerifySig || gfc.VerifyMST {
291+
292+
logger := slog.With("eventType", "commit", "did", evt.Repo, "seq", evt.Seq, "rev", evt.Rev)
293+
294+
did, err := syntax.ParseDID(evt.Repo)
295+
if err != nil {
296+
return err
297+
}
298+
299+
commit, err := repo.LoadCARCommit(ctx, bytes.NewReader(evt.Blocks))
300+
if err != nil {
301+
return err
302+
}
303+
304+
if gfc.VerifySig {
305+
ident, err := gfc.Dir.LookupDID(ctx, did)
306+
if err != nil {
307+
return err
308+
}
309+
pubkey, err := ident.PublicKey()
310+
if err != nil {
311+
return err
312+
}
313+
logger = logger.With("pds", ident.PDSEndpoint())
314+
if err := commit.VerifySignature(pubkey); err != nil {
315+
logger.Warn("commit signature validation failed", "err", err)
316+
}
317+
}
318+
319+
if len(evt.Blocks) == 0 {
320+
logger.Warn("commit message missing blocks")
321+
}
322+
323+
if gfc.VerifyBasic {
324+
// the commit itself
325+
if err := commit.VerifyStructure(); err != nil {
326+
logger.Warn("bad commit object", "err", err)
327+
}
328+
// the event fields
329+
rev, err := syntax.ParseTID(evt.Rev)
330+
if err != nil {
331+
logger.Warn("bad TID syntax in commit rev", "err", err)
332+
}
333+
if rev.String() != commit.Rev {
334+
logger.Warn("event rev != commit rev", "commitRev", commit.Rev)
335+
}
336+
if did.String() != commit.DID {
337+
logger.Warn("event DID != commit DID", "commitDID", commit.DID)
338+
}
339+
_, err = syntax.ParseDatetime(evt.Time)
340+
if err != nil {
341+
logger.Warn("bad datetime syntax in commit time", "time", evt.Time, "err", err)
342+
}
343+
if evt.TooBig {
344+
logger.Warn("deprecated tooBig commit flag set")
345+
}
346+
if evt.Rebase {
347+
logger.Warn("deprecated rebase commit flag set")
348+
}
349+
}
350+
351+
if gfc.VerifyMST {
352+
if evt.PrevData == nil {
353+
logger.Warn("prevData is nil, skipping MST check")
354+
} else {
355+
// TODO: break out this function in to smaller chunks
356+
if _, err := repo.VerifyCommitMessage(ctx, evt); err != nil {
357+
logger.Warn("failed to invert commit MST", "err", err)
358+
}
359+
}
360+
}
361+
}
362+
363+
if gfc.Quiet {
364+
return nil
365+
}
366+
197367
// apply collections filter
198368
if len(gfc.CollectionFilter) > 0 {
199369
keep := false
@@ -302,14 +472,18 @@ func (gfc *GoatFirehoseConsumer) handleCommitEventOps(ctx context.Context, evt *
302472
if err != nil {
303473
return err
304474
}
305-
fmt.Println(string(b))
475+
if !gfc.Quiet {
476+
fmt.Println(string(b))
477+
}
306478
case "delete":
307479
out["action"] = "delete"
308480
b, err := json.Marshal(out)
309481
if err != nil {
310482
return err
311483
}
312-
fmt.Println(string(b))
484+
if !gfc.Quiet {
485+
fmt.Println(string(b))
486+
}
313487
default:
314488
logger.Error("unexpected record op kind")
315489
}

0 commit comments

Comments
 (0)