feat: EgressTrackingService #246

Merged
volmedo merged 7 commits into vic/feat/egress-batch-store from vic/feat/egress-tracking-service on Oct 1, 2025

Conversation

@volmedo
Contributor

@volmedo volmedo commented Sep 19, 2025

Ref: #174

I separated the logic for storing receipt batches from the logic for sending those batches to the egress tracking service. Now the store just stores the batches and notifies when they are ready. The new EgressTrackingService invokes space/egress/track with new batches.
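To illustrate the split, here is a minimal dependency-free sketch (hypothetical names, not the actual code in this PR): the store only collects receipts and notifies when a batch is ready, and the consumer side decides what to do with it (in the real service, that is where the space/egress/track invocation would happen).

```go
package main

import (
	"fmt"
	"sync"
)

// BatchStore collects receipts and invokes onReady when a batch
// reaches max entries. Hypothetical sketch, not the PR's implementation.
type BatchStore struct {
	mu      sync.Mutex
	max     int
	pending []string
	onReady func(batch []string)
}

func NewBatchStore(max int, onReady func([]string)) *BatchStore {
	return &BatchStore{max: max, onReady: onReady}
}

// Append stores a receipt; when the batch is full it is handed to the
// onReady callback outside the lock.
func (s *BatchStore) Append(receipt string) {
	s.mu.Lock()
	s.pending = append(s.pending, receipt)
	var ready []string
	if len(s.pending) >= s.max {
		ready = s.pending
		s.pending = nil
	}
	s.mu.Unlock()
	if ready != nil {
		s.onReady(ready)
	}
}

func main() {
	var sent [][]string
	// The "service" side: in the real PR this would invoke
	// space/egress/track with the ready batch.
	store := NewBatchStore(2, func(batch []string) {
		sent = append(sent, batch)
	})
	store.Append("r1")
	store.Append("r2")
	store.Append("r3")
	fmt.Println(len(sent), len(sent[0])) // prints "1 2"
}
```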

@volmedo volmedo self-assigned this Sep 19, 2025
@volmedo volmedo force-pushed the vic/feat/egress-tracking-service branch from dc58aa6 to 943fbaf Compare September 19, 2025 15:12
Comment thread pkg/service/egresstracking/service.go Outdated
@volmedo volmedo requested review from alanshaw and frrist September 22, 2025 15:05
@volmedo volmedo marked this pull request as ready for review September 22, 2025 15:06
@volmedo volmedo force-pushed the vic/feat/egress-tracking-service branch from 15a46e8 to 93d4761 Compare September 23, 2025 13:35
@volmedo volmedo changed the base branch from main to vic/feat/egress-batch-store September 23, 2025 13:36
Member

@frrist frrist left a comment

After seeing this used in practice, here's an idea for the interface/design of the batch store, which I suggest we rename to "Journal":

type Journal interface {
    // Append adds an entry to the journal
    Append(ctx context.Context, entry []byte) error
    // List all completed (rotated) segments
    ListSegments(ctx context.Context) ([]SegmentInfo, error)
    // Get a completed segment by its CID
    GetSegment(ctx context.Context, cid cid.Cid) (io.ReadCloser, error)
    // Delete a segment by its CID
    DeleteSegment(ctx context.Context, cid cid.Cid) error
}

type SegmentInfo struct {
    CID         cid.Cid
    Size        int64
    EntryCount  int
    CreatedAt   time.Time
    FinishedAt  time.Time
}

// elsewhere....
journal := NewFSJournal(path, Options{
    MaxSize: 100*MB,
    OnRotation: func(ctx context.Context, segment SegmentInfo) error {
        // Queue for egress service
        return jobQueue.Enqueue(ctx, segment)
    },
})

I'm suggesting this with the goal of getting to a more generalized implementation.

Currently, the Append operation forces callers to deal with rollover/notification events, which they may not always want to do.

OnRotation defines the callback that fires when rotation happens (like Alan's suggestion elsewhere). For the case at hand this means pushing the segment to the queue, but for other use cases it could be something else.

Once the job is processed from the queue and the etracker becomes aware of it, it can fetch the segment with GetSegment as we do now. Later, we can call (and implement) DeleteSegment for GC. ListSegments becomes our window for debugging/monitoring.
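As a rough illustration of that consumer flow (hypothetical, stdlib-only stand-ins: string IDs in place of cid.Cid, an in-memory journal instead of an FS one), the etracker-side processing would fetch a segment, consume it, then delete it for GC:

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// Minimal stand-ins for the proposed Journal pieces. String IDs are
// used instead of cid.Cid so the sketch stays dependency-free.
type SegmentInfo struct{ ID string }

type Journal interface {
	GetSegment(id string) (io.ReadCloser, error)
	DeleteSegment(id string) error
}

type memJournal struct{ segments map[string]string }

func (j *memJournal) GetSegment(id string) (io.ReadCloser, error) {
	return io.NopCloser(strings.NewReader(j.segments[id])), nil
}

func (j *memJournal) DeleteSegment(id string) error {
	delete(j.segments, id)
	return nil
}

// processSegment models the etracker side: fetch the segment that the
// rotation callback enqueued, consume its contents, then delete it.
func processSegment(j Journal, seg SegmentInfo) (string, error) {
	r, err := j.GetSegment(seg.ID)
	if err != nil {
		return "", err
	}
	defer r.Close()
	data, err := io.ReadAll(r)
	if err != nil {
		return "", err
	}
	if err := j.DeleteSegment(seg.ID); err != nil {
		return "", err
	}
	return string(data), nil
}

func main() {
	j := &memJournal{segments: map[string]string{"seg-1": "entries..."}}
	out, _ := processSegment(j, SegmentInfo{ID: "seg-1"})
	fmt.Println(out) // prints "entries..."
}
```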

Comment on lines +44 to +47
// non-configurable defaults
maxRetries := uint(10)
maxWorkers := uint(runtime.NumCPU())
maxTimeout := 5 * time.Second
Member

When it comes time to test, specifically around failure cases, we might want these to be configurable. See ReplicatorConfig for an example of how this has been done elsewhere.

Contributor Author

That is true when you are testing the job queue implementation; I just mocked the queue out in the service's tests.

I actually had a first implementation replicating ReplicatorConfig (sorry) and then ditched it when I saw the values were hardcoded anyway. At least it is more explicit this way. It will be easy to expose these values in config when needed.

Comment thread pkg/service/egresstracking/service.go
Comment thread pkg/service/egresstracking/server.go Outdated
Comment thread pkg/service/egresstracking/fx.go
@volmedo
Contributor Author

volmedo commented Sep 24, 2025

Thanks for the input @frrist. Again, I'm advocating against early generalization. I think getting this working as soon as possible is more valuable at this stage of the project, and we can always iterate later if needed; doing it then carries no extra risk or effort compared to doing it now.

@frrist frrist added this to the v0.0.15 milestone Sep 30, 2025
@volmedo
Contributor Author

volmedo commented Oct 1, 2025

@frrist I want to merge this into #245 and I'd appreciate it if you could take one final look and approve if you feel it's ready, or suggest more changes.

@frrist
Member

frrist commented Oct 1, 2025

Let's get this merged into #245 and I'll leave any comments there.

Ref: #174 

Depends on: #245 and #246 

Attach a ReceiptLogger to the `space/content/retrieve` handler to
collect retrieval receipts and add them to the EgressTrackingService to
be stored, batched and sent in `space/egress/track` invocations.
@volmedo volmedo merged commit f49919b into vic/feat/egress-batch-store Oct 1, 2025
6 checks passed
@volmedo volmedo deleted the vic/feat/egress-tracking-service branch October 1, 2025 14:51
volmedo added a commit that referenced this pull request Oct 3, 2025
Ref: #174 

Implement an `EgressBatchStore` that allows appending
`space/content/retrieve` receipts to a batch. When the batch is above
the max, it is flushed automatically.

Things that are missing and will be implemented in follow up PRs:
- sending the batch to the egress tracking service in a
`space/egress/track` invocation
- exposing an `http.FileSystem` and adding the corresponding
`http.FileServer` endpoint

Once these are done, I'll hook it up in Alan's `space/content/retrieve`
handler.

---------

Co-authored-by: ash <alan@storacha.network>