Skip to content

Commit cbee701

Browse files
committed
goat: add firehose verify args
1 parent 6220dc6 commit cbee701

File tree

3 files changed

+211
-11
lines changed

3 files changed

+211
-11
lines changed

cmd/goat/firehose.go

+179-11
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@ import (
1111
"net/url"
1212
"os"
1313
"strings"
14+
"time"
1415

1516
comatproto "github.com/bluesky-social/indigo/api/atproto"
1617
"github.com/bluesky-social/indigo/atproto/data"
18+
"github.com/bluesky-social/indigo/atproto/identity"
1719
"github.com/bluesky-social/indigo/atproto/repo"
1820
"github.com/bluesky-social/indigo/atproto/syntax"
1921
"github.com/bluesky-social/indigo/events"
@@ -48,6 +50,23 @@ var cmdFirehose = &cli.Command{
4850
Name: "account-events",
4951
Usage: "only print account and identity events",
5052
},
53+
&cli.BoolFlag{
54+
Name: "quiet",
55+
Aliases: []string{"q"},
56+
Usage: "don't actually print events to stdout (eg, errors only)",
57+
},
58+
&cli.BoolFlag{
59+
Name: "verify-basic",
60+
Usage: "parse events and do basic syntax and structure checks",
61+
},
62+
&cli.BoolFlag{
63+
Name: "verify-sig",
64+
Usage: "verify account signatures on commits",
65+
},
66+
&cli.BoolFlag{
67+
Name: "verify-mst",
68+
Usage: "run inductive verification of ops and MST structure",
69+
},
5170
&cli.BoolFlag{
5271
Name: "ops",
5372
Aliases: []string{"records"},
@@ -58,24 +77,41 @@ var cmdFirehose = &cli.Command{
5877
}
5978

6079
type GoatFirehoseConsumer struct {
61-
// for pretty-printing events to stdout
62-
EventLogger *slog.Logger
6380
OpsMode bool
6481
AccountsOnly bool
82+
Quiet bool
83+
VerifyBasic bool
84+
VerifySig bool
85+
VerifyMST bool
6586
// filter to specified collections
6687
CollectionFilter []string
88+
// for signature verification
89+
Dir identity.Directory
6790
}
6891

6992
func runFirehose(cctx *cli.Context) error {
7093
ctx := context.Background()
7194

72-
slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stderr, nil)))
95+
slog.SetDefault(configLogger(cctx, os.Stderr))
96+
97+
// main thing is skipping handle verification
98+
bdir := identity.BaseDirectory{
99+
SkipHandleVerification: true,
100+
TryAuthoritativeDNS: false,
101+
SkipDNSDomainSuffixes: []string{".bsky.social"},
102+
UserAgent: "goat/" + versioninfo.Short(),
103+
}
104+
cdir := identity.NewCacheDirectory(&bdir, 1_000_000, time.Hour*24, time.Minute*2, time.Minute*5)
73105

74106
gfc := GoatFirehoseConsumer{
75-
EventLogger: slog.New(slog.NewJSONHandler(os.Stdout, nil)),
76107
OpsMode: cctx.Bool("ops"),
77108
AccountsOnly: cctx.Bool("account-events"),
78109
CollectionFilter: cctx.StringSlice("collection"),
110+
Quiet: cctx.Bool("quiet"),
111+
VerifyBasic: cctx.Bool("verify-basic"),
112+
VerifySig: cctx.Bool("verify-sig"),
113+
VerifyMST: cctx.Bool("verify-mst"),
114+
Dir: &cdir,
79115
}
80116

81117
var relayHost string
@@ -104,7 +140,6 @@ func runFirehose(cctx *cli.Context) error {
104140
u.RawQuery = fmt.Sprintf("cursor=%d", cctx.Int("cursor"))
105141
}
106142
urlString := u.String()
107-
slog.Debug("GET", "url", urlString)
108143
con, _, err := dialer.Dial(urlString, http.Header{
109144
"User-Agent": []string{fmt.Sprintf("goat/%s", versioninfo.Short())},
110145
})
@@ -114,7 +149,7 @@ func runFirehose(cctx *cli.Context) error {
114149

115150
rsc := &events.RepoStreamCallbacks{
116151
RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
117-
slog.Debug("commit event", "did", evt.Repo, "seq", evt.Seq)
152+
//slog.Debug("commit event", "did", evt.Repo, "seq", evt.Seq)
118153
if !gfc.AccountsOnly && !gfc.OpsMode {
119154
return gfc.handleCommitEvent(ctx, evt)
120155
} else if !gfc.AccountsOnly && gfc.OpsMode {
@@ -123,26 +158,44 @@ func runFirehose(cctx *cli.Context) error {
123158
return nil
124159
},
125160
RepoSync: func(evt *comatproto.SyncSubscribeRepos_Sync) error {
126-
slog.Debug("sync event", "did", evt.Did, "seq", evt.Seq)
161+
//slog.Debug("sync event", "did", evt.Did, "seq", evt.Seq)
127162
if !gfc.AccountsOnly && !gfc.OpsMode {
128163
return gfc.handleSyncEvent(ctx, evt)
129164
}
130165
return nil
131166
},
132167
RepoIdentity: func(evt *comatproto.SyncSubscribeRepos_Identity) error {
133-
slog.Debug("identity event", "did", evt.Did, "seq", evt.Seq)
168+
//slog.Debug("identity event", "did", evt.Did, "seq", evt.Seq)
134169
if !gfc.OpsMode {
135170
return gfc.handleIdentityEvent(ctx, evt)
136171
}
137172
return nil
138173
},
139174
RepoAccount: func(evt *comatproto.SyncSubscribeRepos_Account) error {
140-
slog.Debug("account event", "did", evt.Did, "seq", evt.Seq)
175+
//slog.Debug("account event", "did", evt.Did, "seq", evt.Seq)
141176
if !gfc.OpsMode {
142177
return gfc.handleAccountEvent(ctx, evt)
143178
}
144179
return nil
145180
},
181+
RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error {
182+
if gfc.VerifyBasic {
183+
slog.Info("deprecated event type", "eventType", "handle", "did", evt.Did, "seq", evt.Seq)
184+
}
185+
return nil
186+
},
187+
RepoMigrate: func(evt *comatproto.SyncSubscribeRepos_Migrate) error {
188+
if gfc.VerifyBasic {
189+
slog.Info("deprecated event type", "eventType", "migrate", "did", evt.Did, "seq", evt.Seq)
190+
}
191+
return nil
192+
},
193+
RepoTombstone: func(evt *comatproto.SyncSubscribeRepos_Tombstone) error {
194+
if gfc.VerifyBasic {
195+
slog.Info("deprecated event type", "eventType", "handle", "did", evt.Did, "seq", evt.Seq)
196+
}
197+
return nil
198+
},
146199
}
147200

148201
scheduler := parallel.NewScheduler(
@@ -156,6 +209,21 @@ func runFirehose(cctx *cli.Context) error {
156209
}
157210

158211
func (gfc *GoatFirehoseConsumer) handleIdentityEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Identity) error {
212+
if gfc.VerifySig {
213+
did, err := syntax.ParseDID(evt.Did)
214+
if err != nil {
215+
return err
216+
}
217+
gfc.Dir.Purge(ctx, did.AtIdentifier())
218+
}
219+
if gfc.VerifyBasic {
220+
if _, err := syntax.ParseDID(evt.Did); err != nil {
221+
slog.Warn("invalid DID", "eventType", "identity", "did", evt.Did, "seq", evt.Seq)
222+
}
223+
}
224+
if gfc.Quiet {
225+
return nil
226+
}
159227
out := make(map[string]interface{})
160228
out["type"] = "identity"
161229
out["payload"] = evt
@@ -168,6 +236,14 @@ func (gfc *GoatFirehoseConsumer) handleIdentityEvent(ctx context.Context, evt *c
168236
}
169237

170238
func (gfc *GoatFirehoseConsumer) handleAccountEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Account) error {
239+
if gfc.VerifyBasic {
240+
if _, err := syntax.ParseDID(evt.Did); err != nil {
241+
slog.Warn("invalid DID", "eventType", "account", "did", evt.Did, "seq", evt.Seq)
242+
}
243+
}
244+
if gfc.Quiet {
245+
return nil
246+
}
171247
out := make(map[string]interface{})
172248
out["type"] = "account"
173249
out["payload"] = evt
@@ -184,6 +260,17 @@ func (gfc *GoatFirehoseConsumer) handleSyncEvent(ctx context.Context, evt *comat
184260
if err != nil {
185261
return err
186262
}
263+
if gfc.VerifyBasic {
264+
if err := commit.VerifyStructure(); err != nil {
265+
slog.Warn("bad commit object", "eventType", "sync", "did", evt.Did, "seq", evt.Seq, "err", err)
266+
}
267+
if _, err := syntax.ParseDID(evt.Did); err != nil {
268+
slog.Warn("invalid DID", "eventType", "account", "did", evt.Did, "seq", evt.Seq)
269+
}
270+
}
271+
if gfc.Quiet {
272+
return nil
273+
}
187274
evt.Blocks = nil
188275
out := make(map[string]interface{})
189276
out["type"] = "sync"
@@ -200,6 +287,83 @@ func (gfc *GoatFirehoseConsumer) handleSyncEvent(ctx context.Context, evt *comat
200287
// this is the simple version, when not in "records" mode: print the event as JSON, but don't include blocks
201288
func (gfc *GoatFirehoseConsumer) handleCommitEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error {
202289

290+
if gfc.VerifyBasic || gfc.VerifySig || gfc.VerifyMST {
291+
292+
logger := slog.With("eventType", "commit", "did", evt.Repo, "seq", evt.Seq, "rev", evt.Rev)
293+
294+
did, err := syntax.ParseDID(evt.Repo)
295+
if err != nil {
296+
return err
297+
}
298+
299+
commit, err := repo.LoadCommitFromCAR(ctx, bytes.NewReader(evt.Blocks))
300+
if err != nil {
301+
return err
302+
}
303+
304+
if gfc.VerifySig {
305+
ident, err := gfc.Dir.LookupDID(ctx, did)
306+
if err != nil {
307+
return err
308+
}
309+
pubkey, err := ident.PublicKey()
310+
if err != nil {
311+
return err
312+
}
313+
logger = logger.With("pds", ident.PDSEndpoint())
314+
if err := commit.VerifySignature(pubkey); err != nil {
315+
logger.Warn("commit signature validation failed", "err", err)
316+
}
317+
}
318+
319+
if len(evt.Blocks) == 0 {
320+
logger.Warn("commit message missing blocks")
321+
}
322+
323+
if gfc.VerifyBasic {
324+
// the commit itself
325+
if err := commit.VerifyStructure(); err != nil {
326+
logger.Warn("bad commit object", "err", err)
327+
}
328+
// the event fields
329+
rev, err := syntax.ParseTID(evt.Rev)
330+
if err != nil {
331+
logger.Warn("bad TID syntax in commit rev", "err", err)
332+
}
333+
if rev.String() != commit.Rev {
334+
logger.Warn("event rev != commit rev", "commitRev", commit.Rev)
335+
}
336+
if did.String() != commit.DID {
337+
logger.Warn("event DID != commit DID", "commitDID", commit.DID)
338+
}
339+
_, err = syntax.ParseDatetime(evt.Time)
340+
if err != nil {
341+
logger.Warn("bad datetime syntax in commit time", "time", evt.Time, "err", err)
342+
}
343+
if evt.TooBig {
344+
logger.Warn("deprecated tooBig commit flag set")
345+
}
346+
if evt.Rebase {
347+
logger.Warn("deprecated rebase commit flag set")
348+
}
349+
}
350+
351+
if gfc.VerifyMST {
352+
if evt.PrevData == nil {
353+
logger.Warn("prevData is nil, skipping MST check")
354+
} else {
355+
// TODO: break out this function in to smaller chunks
356+
if _, err := repo.VerifyCommitMessage(ctx, evt); err != nil {
357+
logger.Warn("failed to invert commit MST", "err", err)
358+
}
359+
}
360+
}
361+
}
362+
363+
if gfc.Quiet {
364+
return nil
365+
}
366+
203367
// apply collections filter
204368
if len(gfc.CollectionFilter) > 0 {
205369
keep := false
@@ -308,14 +472,18 @@ func (gfc *GoatFirehoseConsumer) handleCommitEventOps(ctx context.Context, evt *
308472
if err != nil {
309473
return err
310474
}
311-
fmt.Println(string(b))
475+
if !gfc.Quiet {
476+
fmt.Println(string(b))
477+
}
312478
case "delete":
313479
out["action"] = "delete"
314480
b, err := json.Marshal(out)
315481
if err != nil {
316482
return err
317483
}
318-
fmt.Println(string(b))
484+
if !gfc.Quiet {
485+
fmt.Println(string(b))
486+
}
319487
default:
320488
logger.Error("unexpected record op kind")
321489
}

cmd/goat/main.go

+7
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,13 @@ func run(args []string) error {
2323
Name: "goat",
2424
Usage: "Go AT protocol CLI tool",
2525
Version: versioninfo.Short(),
26+
Flags: []cli.Flag{
27+
&cli.StringFlag{
28+
Name: "log-level",
29+
Usage: "log verbosity level (eg: warn, info, debug)",
30+
EnvVars: []string{"GOAT_LOG_LEVEL", "GO_LOG_LEVEL", "LOG_LEVEL"},
31+
},
32+
},
2633
}
2734
app.Commands = []*cli.Command{
2835
cmdRecordGet,

cmd/goat/util.go

+25
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,14 @@ package main
33
import (
44
"context"
55
"io"
6+
"log/slog"
67
"os"
8+
"strings"
79

810
"github.com/bluesky-social/indigo/atproto/identity"
911
"github.com/bluesky-social/indigo/atproto/syntax"
12+
13+
"github.com/urfave/cli/v2"
1014
)
1115

1216
func resolveIdent(ctx context.Context, arg string) (*identity.Identity, error) {
@@ -42,3 +46,24 @@ func getFileOrStdout(path string) (io.WriteCloser, error) {
4246
}
4347
return file, nil
4448
}
49+
50+
func configLogger(cctx *cli.Context, writer io.Writer) *slog.Logger {
51+
var level slog.Level
52+
switch strings.ToLower(cctx.String("log-level")) {
53+
case "error":
54+
level = slog.LevelError
55+
case "warn":
56+
level = slog.LevelWarn
57+
case "info":
58+
level = slog.LevelInfo
59+
case "debug":
60+
level = slog.LevelDebug
61+
default:
62+
level = slog.LevelInfo
63+
}
64+
logger := slog.New(slog.NewJSONHandler(writer, &slog.HandlerOptions{
65+
Level: level,
66+
}))
67+
slog.SetDefault(logger)
68+
return logger
69+
}

0 commit comments

Comments
 (0)