From 3a6ebdfe499b4bfa9da95dc49d478f6606196658 Mon Sep 17 00:00:00 2001 From: Petra Jaros Date: Tue, 10 Feb 2026 11:41:55 -0500 Subject: [PATCH 1/3] refactor: Prefer `any` over `interface{}` --- pkg/bus/bus.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go index cd8c5909..bfe30f60 100644 --- a/pkg/bus/bus.go +++ b/pkg/bus/bus.go @@ -5,12 +5,12 @@ import ( ) type Subscriber interface { - Subscribe(topic string, fn interface{}) error - Unsubscribe(topic string, handler interface{}) error + Subscribe(topic string, fn any) error + Unsubscribe(topic string, handler any) error } type Publisher interface { - Publish(topic string, args ...interface{}) + Publish(topic string, args ...any) } type Bus interface { @@ -26,20 +26,20 @@ type EventBus struct { bus eventbus.Bus } -func (e *EventBus) Publish(topic string, args ...interface{}) { +func (e *EventBus) Publish(topic string, args ...any) { e.bus.Publish(topic, args...) } -func (e *EventBus) Subscribe(topic string, handler interface{}) error { +func (e *EventBus) Subscribe(topic string, handler any) error { return e.bus.Subscribe(topic, handler) } -func (e *EventBus) Unsubscribe(topic string, handler interface{}) error { +func (e *EventBus) Unsubscribe(topic string, handler any) error { return e.bus.Unsubscribe(topic, handler) } type NoopBus struct{} -func (b *NoopBus) Publish(topic string, args ...interface{}) {} -func (b *NoopBus) Subscribe(topic string, handler interface{}) error { return nil } -func (b *NoopBus) Unsubscribe(topic string, handler interface{}) error { return nil } +func (b *NoopBus) Publish(topic string, args ...any) {} +func (b *NoopBus) Subscribe(topic string, handler any) error { return nil } +func (b *NoopBus) Unsubscribe(topic string, handler any) error { return nil } From 24887424dc1bf608eba80ba94ee752248c2c8d0e Mon Sep 17 00:00:00 2001 From: Petra Jaros Date: Tue, 10 Feb 2026 11:42:29 -0500 Subject: [PATCH 2/3] refactor: Rm redundant variable These aren't needed in current versions of Go. The scoping rules used to be different. --- cmd/internal/upload/ui/todo.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/cmd/internal/upload/ui/todo.go b/cmd/internal/upload/ui/todo.go index 47f9d9ec..edbf6e3b 100644 --- a/cmd/internal/upload/ui/todo.go +++ b/cmd/internal/upload/ui/todo.go @@ -172,8 +172,6 @@ func (o *UploadObserver) init(ctx context.Context) error { func (o *UploadObserver) Subscribe() error { for uid, state := range o.states { - state := state - // subscribe to client upload events (guppy -> piri(s)) if err := o.sub.Subscribe(events.TopicClientPut(uid), func(evt events.PutProgress) { o.statesMu.Lock() From 2cbcb48e80c684a357b2bbe5c554fb992c390f35 Mon Sep 17 00:00:00 2001 From: Petra Jaros Date: Tue, 10 Feb 2026 12:59:44 -0500 Subject: [PATCH 3/3] feat: Add `--json` output option for `upload` --- cmd/internal/upload/jsonout/jsonout.go | 230 +++++++++++++++ cmd/internal/upload/jsonout/jsonout_test.go | 304 ++++++++++++++++++++ cmd/upload/root.go | 225 ++++++++++----- test/doupload | 29 +- 4 files changed, 705 insertions(+), 83 deletions(-) create mode 100644 cmd/internal/upload/jsonout/jsonout.go create mode 100644 cmd/internal/upload/jsonout/jsonout_test.go diff --git a/cmd/internal/upload/jsonout/jsonout.go b/cmd/internal/upload/jsonout/jsonout.go new file mode 100644 index 00000000..20fb096b --- /dev/null +++ b/cmd/internal/upload/jsonout/jsonout.go @@ -0,0 +1,230 @@ +package jsonout + +import ( + "encoding/json" + "fmt" + "io" + "sync" + "time" + + "github.com/storacha/go-libstoracha/digestutil" + + "github.com/storacha/guppy/pkg/bus" + "github.com/storacha/guppy/pkg/bus/events" + "github.com/storacha/guppy/pkg/preparation/types/id" + uploadsmodel "github.com/storacha/guppy/pkg/preparation/uploads/model" +) + +// JSONEmitter writes JSON events as newline-delimited JSON (NDJSON) to a writer. +// It is safe for concurrent use. +type JSONEmitter struct { + mu sync.Mutex + enc *json.Encoder +} + +// NewJSONEmitter creates a new JSONEmitter that writes to w. +func NewJSONEmitter(w io.Writer) *JSONEmitter { + return &JSONEmitter{enc: json.NewEncoder(w)} +} + +// Emit writes v as a single JSON line. +func (e *JSONEmitter) Emit(v any) { + e.mu.Lock() + defer e.mu.Unlock() + _ = e.enc.Encode(v) +} + +// EmitUploadStart emits a lifecycle event when an upload begins. +func (e *JSONEmitter) EmitUploadStart(u *uploadsmodel.Upload) { + e.Emit(UploadStartEvent{ + Type: "upload_start", + UploadID: u.ID().String(), + SourceID: u.SourceID().String(), + }) +} + +// EmitUploadComplete emits a lifecycle event when an upload finishes successfully. +func (e *JSONEmitter) EmitUploadComplete(uploadID id.UploadID, rootCID string) { + e.Emit(UploadCompleteEvent{ + Type: "upload_complete", + UploadID: uploadID.String(), + RootCID: rootCID, + }) +} + +// EmitUploadError emits a lifecycle event when an upload fails. +func (e *JSONEmitter) EmitUploadError(uploadID id.UploadID, err error, attempt int) { + e.Emit(UploadErrorEvent{ + Type: "upload_error", + UploadID: uploadID.String(), + Error: err.Error(), + Attempt: attempt, + }) +} + +// Subscribe registers bus event handlers that emit NDJSON for each event. +func Subscribe(emitter *JSONEmitter, sub bus.Subscriber, uploads []*uploadsmodel.Upload) error { + for _, u := range uploads { + uid := u.ID() + uidStr := uid.String() + srcIDStr := u.SourceID().String() + + if err := sub.Subscribe(events.TopicFsEntry(u.SourceID()), func(evt events.FSScanView) { + emitter.Emit(FSScanEvent{ + Type: "fs_scan", + SourceID: srcIDStr, + UploadID: uidStr, + FSEntryID: evt.FSEntryID.String(), + Path: evt.Path, + IsDir: evt.IsDir, + Size: evt.Size, + }) + }); err != nil { + return fmt.Errorf("subscribing to fs entry events: %w", err) + } + + if err := sub.Subscribe(events.TopicDagScan(uid), func(evt events.DAGScanView) { + var cidStr *string + if evt.CID.Defined() { + s := evt.CID.String() + cidStr = &s + } + emitter.Emit(DAGScanEvent{ + Type: "dag_scan", + UploadID: uidStr, + FSEntryID: evt.FSEntryID.String(), + Created: evt.Created.Format(time.RFC3339Nano), + Updated: evt.Updated.Format(time.RFC3339Nano), + CID: cidStr, + }) + }); err != nil { + return fmt.Errorf("subscribing to dag scan events: %w", err) + } + + if err := sub.Subscribe(events.TopicShard(uid), func(evt events.ShardView) { + se := ShardEvent{ + Type: "shard", + ShardID: evt.ID.String(), + UploadID: evt.UploadID.String(), + Size: evt.Size, + State: string(evt.State), + } + if len(evt.Digest) > 0 { + s := digestutil.Format(evt.Digest) + se.Digest = &s + } + if evt.PieceCID.Defined() { + s := evt.PieceCID.String() + se.PieceCID = &s + } + if evt.Location != nil { + s := evt.Location.Link().String() + se.Location = &s + } + if evt.PDPAccept != nil { + s := evt.PDPAccept.Link().String() + se.PDPAccept = &s + } + emitter.Emit(se) + }); err != nil { + return fmt.Errorf("subscribing to shard events: %w", err) + } + + if err := sub.Subscribe(events.TopicClientPut(uid), func(evt events.PutProgress) { + emitter.Emit(PutProgressEvent{ + Type: "put_progress", + UploadID: uidStr, + BlobID: evt.BlobID.String(), + Uploaded: evt.Uploaded, + Total: evt.Total, + }) + }); err != nil { + return fmt.Errorf("subscribing to put progress events: %w", err) + } + + if err := sub.Subscribe(events.TopicWorker(uid), func(evt events.UploadWorkerEvent) { + we := WorkerEvent{ + Type: "worker", + UploadID: uidStr, + Name: evt.Name, + Status: string(evt.Status), + } + if evt.Error != nil { + s := evt.Error.Error() + we.Error = &s + } + emitter.Emit(we) + }); err != nil { + return fmt.Errorf("subscribing to worker events: %w", err) + } + } + return nil +} + +// Event types for NDJSON output. Each has a "type" discriminator field. + +type FSScanEvent struct { + Type string `json:"type"` + SourceID string `json:"source_id"` + UploadID string `json:"upload_id"` + FSEntryID string `json:"fs_entry_id"` + Path string `json:"path"` + IsDir bool `json:"is_dir"` + Size uint64 `json:"size"` +} + +type DAGScanEvent struct { + Type string `json:"type"` + UploadID string `json:"upload_id"` + FSEntryID string `json:"fs_entry_id"` + Created string `json:"created"` + Updated string `json:"updated"` + CID *string `json:"cid"` +} + +type ShardEvent struct { + Type string `json:"type"` + ShardID string `json:"shard_id"` + UploadID string `json:"upload_id"` + Size uint64 `json:"size"` + Digest *string `json:"digest,omitempty"` + PieceCID *string `json:"piece_cid,omitempty"` + State string `json:"state"` + Location *string `json:"location,omitempty"` + PDPAccept *string `json:"pdp_accept,omitempty"` +} + +type PutProgressEvent struct { + Type string `json:"type"` + UploadID string `json:"upload_id"` + BlobID string `json:"blob_id"` + Uploaded int64 `json:"uploaded"` + Total uint64 `json:"total"` +} + +type WorkerEvent struct { + Type string `json:"type"` + UploadID string `json:"upload_id"` + Name string `json:"name"` + Status string `json:"status"` + Error *string `json:"error,omitempty"` +} + +type UploadStartEvent struct { + Type string `json:"type"` + UploadID string `json:"upload_id"` + SourceID string `json:"source_id"` +} + +type UploadCompleteEvent struct { + Type string `json:"type"` + UploadID string `json:"upload_id"` + RootCID string `json:"root_cid"` +} + +type UploadErrorEvent struct { + Type string `json:"type"` + UploadID string `json:"upload_id"` + Error string `json:"error"` + Attempt int `json:"attempt"` +} diff --git a/cmd/internal/upload/jsonout/jsonout_test.go b/cmd/internal/upload/jsonout/jsonout_test.go new file mode 100644 index 00000000..23818ab7 --- /dev/null +++ b/cmd/internal/upload/jsonout/jsonout_test.go @@ -0,0 +1,304 @@ +package jsonout + +import ( + "bytes" + "encoding/json" + "errors" + "sync" + "testing" + "time" + + "github.com/ipfs/go-cid" + "github.com/multiformats/go-multihash" + "github.com/storacha/go-ucanto/did" + "github.com/stretchr/testify/require" + + "github.com/storacha/guppy/pkg/bus" + "github.com/storacha/guppy/pkg/bus/events" + blobsmodel "github.com/storacha/guppy/pkg/preparation/blobs/model" + "github.com/storacha/guppy/pkg/preparation/types/id" + uploadsmodel "github.com/storacha/guppy/pkg/preparation/uploads/model" +) + +func TestJSONEmitter_Emit(t *testing.T) { + var buf bytes.Buffer + emitter := NewJSONEmitter(&buf) + + emitter.Emit(map[string]string{"type": "test", "value": "hello"}) + + var got map[string]string + err := json.Unmarshal(buf.Bytes(), &got) + require.NoError(t, err) + require.Equal(t, "test", got["type"]) + require.Equal(t, "hello", got["value"]) +} + +func TestJSONEmitter_ThreadSafety(t *testing.T) { + var buf bytes.Buffer + emitter := NewJSONEmitter(&buf) + + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func(n int) { + defer wg.Done() + emitter.Emit(map[string]int{"n": n}) + }(i) + } + wg.Wait() + + // Each line should be valid JSON + lines := bytes.Split(bytes.TrimSpace(buf.Bytes()), []byte("\n")) + require.Len(t, lines, 100) + for _, line := range lines { + var got map[string]int + err := json.Unmarshal(line, &got) + require.NoError(t, err) + } +} + +func testUpload(t *testing.T) *uploadsmodel.Upload { + t.Helper() + spaceDID, err := did.Parse("did:web:test.storacha.network") + require.NoError(t, err) + u, err := uploadsmodel.NewUpload(spaceDID, id.New()) + require.NoError(t, err) + return u +} + +func TestSubscribe_FSScan(t *testing.T) { + var buf bytes.Buffer + emitter := NewJSONEmitter(&buf) + eb := bus.New() + u := testUpload(t) + + err := Subscribe(emitter, eb, []*uploadsmodel.Upload{u}) + require.NoError(t, err) + + fsEntryID := id.New() + eb.Publish(events.TopicFsEntry(u.SourceID()), events.FSScanView{ + Path: "/data/file.bin", + IsDir: false, + Size: 1024, + FSEntryID: fsEntryID, + }) + + // Give the synchronous handler time to write + time.Sleep(10 * time.Millisecond) + + var got FSScanEvent + err = json.Unmarshal(buf.Bytes(), &got) + require.NoError(t, err) + require.Equal(t, "fs_scan", got.Type) + require.Equal(t, u.SourceID().String(), got.SourceID) + require.Equal(t, u.ID().String(), got.UploadID) + require.Equal(t, fsEntryID.String(), got.FSEntryID) + require.Equal(t, "/data/file.bin", got.Path) + require.False(t, got.IsDir) + require.Equal(t, uint64(1024), got.Size) +} + +func TestSubscribe_DAGScan_WithCID(t *testing.T) { + var buf bytes.Buffer + emitter := NewJSONEmitter(&buf) + eb := bus.New() + u := testUpload(t) + + err := Subscribe(emitter, eb, []*uploadsmodel.Upload{u}) + require.NoError(t, err) + + // Create a test CID + mh, err := multihash.Sum([]byte("test"), multihash.SHA2_256, -1) + require.NoError(t, err) + testCID := cid.NewCidV1(cid.Raw, mh) + + now := time.Now().UTC() + eb.Publish(events.TopicDagScan(u.ID()), events.DAGScanView{ + FSEntryID: id.New(), + Created: now, + Updated: now, + CID: testCID, + }) + + time.Sleep(10 * time.Millisecond) + + var got DAGScanEvent + err = json.Unmarshal(buf.Bytes(), &got) + require.NoError(t, err) + require.Equal(t, "dag_scan", got.Type) + require.NotNil(t, got.CID) + require.Equal(t, testCID.String(), *got.CID) +} + +func TestSubscribe_DAGScan_UndefinedCID(t *testing.T) { + var buf bytes.Buffer + emitter := NewJSONEmitter(&buf) + eb := bus.New() + u := testUpload(t) + + err := Subscribe(emitter, eb, []*uploadsmodel.Upload{u}) + require.NoError(t, err) + + now := time.Now().UTC() + eb.Publish(events.TopicDagScan(u.ID()), events.DAGScanView{ + FSEntryID: id.New(), + Created: now, + Updated: now, + CID: cid.Undef, + }) + + time.Sleep(10 * time.Millisecond) + + var got DAGScanEvent + err = json.Unmarshal(buf.Bytes(), &got) + require.NoError(t, err) + require.Equal(t, "dag_scan", got.Type) + require.Nil(t, got.CID) +} + +func TestSubscribe_Shard(t *testing.T) { + var buf bytes.Buffer + emitter := NewJSONEmitter(&buf) + eb := bus.New() + u := testUpload(t) + + err := Subscribe(emitter, eb, []*uploadsmodel.Upload{u}) + require.NoError(t, err) + + mh, err := multihash.Sum([]byte("shard-data"), multihash.SHA2_256, -1) + require.NoError(t, err) + + shardID := id.New() + eb.Publish(events.TopicShard(u.ID()), events.ShardView{ + ID: shardID, + UploadID: u.ID(), + Size: 4194304, + Digest: mh, + State: blobsmodel.BlobStateClosed, + // Location and PDPAccept left nil + }) + + time.Sleep(10 * time.Millisecond) + + var got ShardEvent + err = json.Unmarshal(buf.Bytes(), &got) + require.NoError(t, err) + require.Equal(t, "shard", got.Type) + require.Equal(t, shardID.String(), got.ShardID) + require.Equal(t, u.ID().String(), got.UploadID) + require.Equal(t, uint64(4194304), got.Size) + require.NotNil(t, got.Digest) + require.Equal(t, "closed", got.State) + require.Nil(t, got.Location) + require.Nil(t, got.PDPAccept) +} + +func TestSubscribe_PutProgress(t *testing.T) { + var buf bytes.Buffer + emitter := NewJSONEmitter(&buf) + eb := bus.New() + u := testUpload(t) + + err := Subscribe(emitter, eb, []*uploadsmodel.Upload{u}) + require.NoError(t, err) + + blobID := id.New() + eb.Publish(events.TopicClientPut(u.ID()), events.PutProgress{ + BlobID: blobID, + Uploaded: 2048, + Total: 4096, + }) + + time.Sleep(10 * time.Millisecond) + + var got PutProgressEvent + err = json.Unmarshal(buf.Bytes(), &got) + require.NoError(t, err) + require.Equal(t, "put_progress", got.Type) + require.Equal(t, u.ID().String(), got.UploadID) + require.Equal(t, blobID.String(), got.BlobID) + require.Equal(t, int64(2048), got.Uploaded) + require.Equal(t, uint64(4096), got.Total) +} + +func TestSubscribe_Worker(t *testing.T) { + var buf bytes.Buffer + emitter := NewJSONEmitter(&buf) + eb := bus.New() + u := testUpload(t) + + err := Subscribe(emitter, eb, []*uploadsmodel.Upload{u}) + require.NoError(t, err) + + eb.Publish(events.TopicWorker(u.ID()), events.UploadWorkerEvent{ + Name: "Scan-FS", + Status: events.Running, + }) + + time.Sleep(10 * time.Millisecond) + + var got WorkerEvent + err = json.Unmarshal(buf.Bytes(), &got) + require.NoError(t, err) + require.Equal(t, "worker", got.Type) + require.Equal(t, u.ID().String(), got.UploadID) + require.Equal(t, "Scan-FS", got.Name) + require.Equal(t, "Running", got.Status) + require.Nil(t, got.Error) +} + +func TestSubscribe_Worker_WithError(t *testing.T) { + var buf bytes.Buffer + emitter := NewJSONEmitter(&buf) + eb := bus.New() + u := testUpload(t) + + err := Subscribe(emitter, eb, []*uploadsmodel.Upload{u}) + require.NoError(t, err) + + eb.Publish(events.TopicWorker(u.ID()), events.UploadWorkerEvent{ + Name: "Upload-Shard", + Status: events.Failed, + Error: errors.New("connection refused"), + }) + + time.Sleep(10 * time.Millisecond) + + var got WorkerEvent + err = json.Unmarshal(buf.Bytes(), &got) + require.NoError(t, err) + require.Equal(t, "worker", got.Type) + require.Equal(t, "Failed", got.Status) + require.NotNil(t, got.Error) + require.Equal(t, "connection refused", *got.Error) +} + +func TestEmitLifecycleEvents(t *testing.T) { + var buf bytes.Buffer + emitter := NewJSONEmitter(&buf) + u := testUpload(t) + + emitter.EmitUploadStart(u) + emitter.EmitUploadComplete(u.ID(), "bafytest123") + emitter.EmitUploadError(u.ID(), errors.New("something broke"), 3) + + lines := bytes.Split(bytes.TrimSpace(buf.Bytes()), []byte("\n")) + require.Len(t, lines, 3) + + var start UploadStartEvent + require.NoError(t, json.Unmarshal(lines[0], &start)) + require.Equal(t, "upload_start", start.Type) + require.Equal(t, u.ID().String(), start.UploadID) + require.Equal(t, u.SourceID().String(), start.SourceID) + + var complete UploadCompleteEvent + require.NoError(t, json.Unmarshal(lines[1], &complete)) + require.Equal(t, "upload_complete", complete.Type) + require.Equal(t, "bafytest123", complete.RootCID) + + var errEvt UploadErrorEvent + require.NoError(t, json.Unmarshal(lines[2], &errEvt)) + require.Equal(t, "upload_error", errEvt.Type) + require.Equal(t, "something broke", errEvt.Error) + require.Equal(t, 3, errEvt.Attempt) +} diff --git a/cmd/upload/root.go b/cmd/upload/root.go index 77cba11e..b6397383 100644 --- a/cmd/upload/root.go +++ b/cmd/upload/root.go @@ -11,6 +11,7 @@ import ( "github.com/mitchellh/go-wordwrap" "github.com/spf13/cobra" + "github.com/storacha/guppy/cmd/internal/upload/jsonout" "github.com/storacha/guppy/cmd/internal/upload/ui" "github.com/storacha/guppy/cmd/upload/source" "github.com/storacha/guppy/internal/cmdutil" @@ -22,18 +23,154 @@ import ( uploadsmodel "github.com/storacha/guppy/pkg/preparation/uploads/model" ) +func runUploadsJSON(cmd *cobra.Command, api preparation.API, uploads []*uploadsmodel.Upload, eb bus.Bus) error { + emitter := jsonout.NewJSONEmitter(cmd.OutOrStdout()) + if err := jsonout.Subscribe(emitter, eb, uploads); err != nil { + return fmt.Errorf("subscribing to events: %w", err) + } + + var failCount int + for _, u := range uploads { + emitter.EmitUploadStart(u) + attempt := 0 + var uploadCID cid.Cid + var lastErr error + + for { + attempt++ + var err error + uploadCID, err = api.ExecuteUpload(cmd.Context(), u) + if err == nil { + lastErr = nil + break + } + + var re types.RetriableError + if errors.As(err, &re) { + lastErr = err + if rootFlags.retry { + emitter.EmitUploadError(u.ID(), err, attempt) + continue + } + break + } + + lastErr = err + break + } + + if lastErr != nil { + emitter.EmitUploadError(u.ID(), lastErr, attempt) + failCount++ + continue + } + + emitter.EmitUploadComplete(u.ID(), uploadCID.String()) + } + + if failCount > 0 { + return cmdutil.NewHandledCliError(fmt.Errorf("%d upload(s) failed", failCount)) + } + return nil +} + +func runUploadsLog(cmd *cobra.Command, api preparation.API, uploads []*uploadsmodel.Upload) error { + type uploadResult struct { + upload *uploadsmodel.Upload + cid cid.Cid + attempts int + } + + type uploadFailure struct { + upload *uploadsmodel.Upload + err error + attempts int + } + + var completedUploads []uploadResult + var failedUploads []uploadFailure + for _, u := range uploads { + start := time.Now() + log.Infow("Starting upload", "upload", u.ID()) + attempt := 0 + var uploadCID cid.Cid + var lastErr error + + for { + attempt++ + var err error + uploadCID, err = api.ExecuteUpload(cmd.Context(), u) + if err == nil { + lastErr = nil + break + } + + var re types.RetriableError + if errors.As(err, &re) { + lastErr = err + if rootFlags.retry { + log.Warnw("Retriable upload error encountered, retrying", "upload", u.ID(), "attempt", attempt, + "err", err) + continue + } + + log.Errorw("Retriable upload error encountered (retry disabled)", "upload", u.ID(), "attempt", + attempt, "err", err) + break + } + + lastErr = err + log.Errorw("Upload failed with non-retriable error", "upload", u.ID(), "attempt", attempt, "err", err) + break + } + + if lastErr != nil { + failedUploads = append(failedUploads, uploadFailure{ + upload: u, + err: lastErr, + attempts: attempt, + }) + log.Errorw("Upload failed", "upload", u.ID(), "duration", time.Since(start), "attempts", attempt, "err", + lastErr) + continue + } + + completedUploads = append(completedUploads, uploadResult{ + upload: u, + cid: uploadCID, + attempts: attempt, + }) + log.Infow("Completed upload", "upload", u.ID(), "cid", uploadCID.String(), "duration", time.Since(start), "attempts", attempt) + } + + for _, u := range completedUploads { + cmd.Printf("Upload completed successfully: %s\n", u.cid.String()) + } + + if len(failedUploads) > 0 { + cmd.Println("Uploads failed:") + for _, u := range failedUploads { + cmd.Printf("- %s: %v\n", u.upload.ID(), u.err) + } + return cmdutil.NewHandledCliError(fmt.Errorf("%d upload(s) failed", len(failedUploads))) + } + return nil +} + var log = logging.Logger("cmd/upload") var rootFlags struct { all bool retry bool parallelism uint64 + json bool } func init() { Cmd.Flags().BoolVar(&rootFlags.all, "all", false, "Upload all sources (even if arguments are provided)") Cmd.Flags().BoolVar(&rootFlags.retry, "retry", false, "Auto-retry failed uploads") Cmd.Flags().Uint64Var(&rootFlags.parallelism, "parallelism", 6, "Number of parallel shard uploads to perform concurrently") + Cmd.Flags().BoolVar(&rootFlags.json, "json", false, "Output events as newline-delimited JSON") Cmd.AddCommand(source.Cmd) } @@ -62,6 +199,10 @@ var Cmd = &cobra.Command{ return fmt.Errorf("getting 'ui' flag: %w", err) } + if useUI && rootFlags.json { + return fmt.Errorf("--ui and --json flags are mutually exclusive") + } + requestedSources := args[1:] // The command line was valid. Past here, errors do not mean the user needs @@ -143,87 +284,13 @@ var Cmd = &cobra.Command{ if useUI { return ui.RunUploadUI(ctx, repo, api, uploadsToRun, rootFlags.retry, eb) } - // UI disabled, log at info level - logging.SetAllLoggers(logging.LevelInfo) - - type uploadResult struct { - upload *uploadsmodel.Upload - cid cid.Cid - attempts int - } - - type uploadFailure struct { - upload *uploadsmodel.Upload - err error - attempts int - } - - var completedUploads []uploadResult - var failedUploads []uploadFailure - for _, u := range uploadsToRun { - start := time.Now() - log.Infow("Starting upload", "upload", u.ID()) - attempt := 0 - var uploadCID cid.Cid - var lastErr error - - for { - attempt++ - uploadCID, err = api.ExecuteUpload(ctx, u) - if err == nil { - lastErr = nil - break - } - var re types.RetriableError - if errors.As(err, &re) { - lastErr = err - if rootFlags.retry { - log.Warnw("Retriable upload error encountered, retrying", "upload", u.ID(), "attempt", attempt, - "err", err) - continue - } - - log.Errorw("Retriable upload error encountered (retry disabled)", "upload", u.ID(), "attempt", - attempt, "err", err) - break - } - - lastErr = err - log.Errorw("Upload failed with non-retriable error", "upload", u.ID(), "attempt", attempt, "err", err) - break - } - - if lastErr != nil { - failedUploads = append(failedUploads, uploadFailure{ - upload: u, - err: lastErr, - attempts: attempt, - }) - log.Errorw("Upload failed", "upload", u.ID(), "duration", time.Since(start), "attempts", attempt, "err", - lastErr) - continue - } - - completedUploads = append(completedUploads, uploadResult{ - upload: u, - cid: uploadCID, - attempts: attempt, - }) - log.Infow("Completed upload", "upload", u.ID(), "cid", uploadCID.String(), "duration", time.Since(start), "attempts", attempt) - } - - for _, u := range completedUploads { - cmd.Printf("Upload completed successfully: %s\n", u.cid.String()) + if rootFlags.json { + return runUploadsJSON(cmd, api, uploadsToRun, eb) } - if len(failedUploads) > 0 { - cmd.Println("Uploads failed:") - for _, u := range failedUploads { - cmd.Printf("- %s: %v\n", u.upload.ID(), u.err) - } - return cmdutil.NewHandledCliError(fmt.Errorf("%d upload(s) failed", len(failedUploads))) - } - return nil + // Default: log at info level + logging.SetAllLoggers(logging.LevelInfo) + return runUploadsLog(cmd, api, uploadsToRun) }, } diff --git a/test/doupload b/test/doupload index a1cf0dd6..5f2169e1 100755 --- a/test/doupload +++ b/test/doupload @@ -23,6 +23,15 @@ set -e set -o pipefail +spinner_frames=( + "▱▱▱" + "▰▱▱" + "▰▰▱" + "▰▰▰" + "▱▰▰" + "▱▱▰" +) + # Parse arguments database_url="" while [[ $# -gt 0 ]]; do @@ -163,8 +172,8 @@ trap 'last_command=$ZSH_DEBUG_CMD' DEBUG handle_error() { local exit_code=$1 local failed_command=$2 - echo - echo "❌ Command failed with exit code $exit_code: $failed_command" + echo >&2 + echo "❌ Command failed with exit code $exit_code: $failed_command" >&2 } # Handle errors with some printed output @@ -194,9 +203,10 @@ main () { guppy space info "$space" echo - echo "📤 Uploading data from $dataDir to space $space" + echo "📤 Uploading data from $dataDir to space $space (output -> $sandbox/upload-events.jsonl)" guppy upload source add "$space" "$dataDir" - rootCID=$(guppy upload "$space" | tee /dev/fd/3 | grep 'Upload completed successfully:' | awk '{print $4}') + guppy upload --json "$space" | show_progress "$sandbox/upload-events.jsonl" + rootCID=$(jq -r 'select(.type == "upload_complete") | .root_cid' "$sandbox/upload-events.jsonl") echo echo "📥 Retrieving data from space $space with root CID $rootCID to $outDir1" @@ -237,6 +247,17 @@ main () { '{$account, $space, $rootCID, $dataDir, $subdir}' > "$sandbox/test-params.json" } +show_progress() { + output_file="$1" + current_frame=1 + while IFS= read -r line; do + printf '%s\n' "$line" >> "$output_file" + printf '\r\033[K%.'"${COLUMNS:-80}"'s' "${spinner_frames[current_frame]} $line" + current_frame=$(( (current_frame % ${#spinner_frames[@]}) + 1 )) + done + printf '\r\033[K' +} + log_in() { local account="$1" # Start login in background