Conversation
```go
	}, nil
}

func (s *fsBatchStore) Append(ctx context.Context, rcpt receipt.Receipt[content.RetrieveOk, fdm.FailureModel]) error {
```
I think we could generalize this as a "CAR logger": the parameter here could be an "Archiver" or any other function that returns bytes when called...
Or preferably, it could be a generic type and you could pass a "block encoder" to the constructor that will create a cid and bytes pair for the thing you're logging.
...and then you could have a callback for onFlush to implement the invocation to egress tracker.
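For illustration, a rough sketch of the shape being suggested here; all the names (`BlockEncoder`, `OnFlush`, `Logger`, `New`) are hypothetical and not code from the PR:

```go
// Hypothetical sketch of a generic, append-only "CAR logger": encoding is
// delegated to a pluggable block encoder and a callback fires whenever a
// batch is flushed/rotated (e.g. to invoke the egress tracker).
package carlog

import (
	"context"

	"github.com/ipfs/go-cid"
)

// BlockEncoder turns the thing being logged into a (CID, bytes) pair.
type BlockEncoder[T any] func(item T) (cid.Cid, []byte, error)

// OnFlush is invoked with the CID of a finished batch.
type OnFlush func(ctx context.Context, batch cid.Cid) error

// Logger appends encoded items to the current batch and rotates it when full.
type Logger[T any] interface {
	Append(ctx context.Context, item T) error
	Close() error
}

// New would wire the encoder and the flush callback into a concrete logger.
func New[T any](encode BlockEncoder[T], onFlush OnFlush, maxBatchSize int64) (Logger[T], error) {
	// construction elided in this sketch
	return nil, nil
}
```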
I prefer keeping things as stupidly simple as possible until there is a use case that justifies the extra complexity. What we need today is a component that handles batches of space/content/retrieve receipts. I'm not sure how likely it is that we will need to do a similar handling of some other bytes, but happy to generalize this when that need arises.
I like the idea of the callback though. Do you mind if I compare it with the current approach in #246 and implement there if it looks better?
```go
	batchFileSuffix = ".car"
)

var _ EgressBatchStore = (*fsBatchStore)(nil)
```
I am not convinced by the name - to me a store is something you put stuff in that you can also get stuff back out of. This is an append only log/logger - you cannot/do not want to retrieve items from it.
actually, we will want to retrieve items from it when implementing the endpoint from which receipt batches will be fetched by the egress tracking service during consolidation, so I think it qualifies as a store. But happy to think of a different name if that argument still feels unconvincing.
Yes ok I take your point. I was thinking in terms of retrieving individual receipts, there is an asymmetry here because you don't retrieve the receipts, you retrieve the batches.
ah, gotcha. Something like ReceiptBatcher then? Sorry, I'm not super inspired today 😐
How about "Journal" - imo that name implies both the append-only nature and the fact that we're preserving a record of what happened. At a high level, I think of this roughly as a filesystem journal.
I'll do the rename once I merge the other two branches into this one if that's ok. I think that'd be easier than dealing with the merge conflicts.
frrist left a comment:
Blocking due to:
- uncertainty on the finalize method, gut says we don't want/need to call this. Might just want to call close?
- the (re)opening of the current file for each append operation.
```diff
@@ -0,0 +1,176 @@
+package egressbatchstore
```
At a high-level I'd like us to look at separating the "sequential append" concerns from the "what format am I writing" concerns. This can of course come in a follow on.
As Alan has already suggested, we can have a "generic" journal that handles the "append" and "rotate" operations. Then plug something like a "CAR Writer" into it that handles the formatting of the data. For example, the JobQueue we use attempts to follow this kind of design.
This will make testing easier, and allow the component to be reused for other things when the time arises. I'd imagine having a robust/reusable journal implementation will come in handy down the road.
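A minimal sketch of that decomposition, kept deliberately separate from the suggestion above: the journal only knows about appends and rotation, while a pluggable formatter decides that entries are CAR blocks. All names here (`Formatter`, `Journal`, `WriteEntry`) are illustrative, not code from the PR:

```go
package journal

import "io"

// Formatter encodes one logical entry into its on-disk representation
// (e.g. a CAR block of varint | CID | bytes).
type Formatter[T any] interface {
	WriteEntry(w io.Writer, entry T) (n int64, err error)
}

// Journal handles only the sequential-append and rotation concerns.
type Journal[T any] struct {
	w       io.Writer
	size    int64
	maxSize int64
	format  Formatter[T]
}

// Append writes one entry through the formatter and reports whether the
// batch should now be rotated.
func (j *Journal[T]) Append(entry T) (rotate bool, err error) {
	n, err := j.format.WriteEntry(j.w, entry)
	if err != nil {
		return false, err
	}
	j.size += n
	if j.size >= j.maxSize {
		// rotation (close the current file, notify a consumer, open a new
		// one) would live here, independent of the entry format
		return true, nil
	}
	return false, nil
}
```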
As I replied to Alan's suggestion, I'm not trying to implement a generic store. Our codebase is full of stores that are not used and interfaces with a single implementation. I like to apply the YAGNI mantra as much as possible.
Happy to work on a generic/reusable implementation when/if the need arises. I created #254 to capture this proposal.
But for now I'd rather put something together that solves the particular problem we have at hand in the minimum amount of time possible.
```go
	s.mu.Lock()
	defer s.mu.Unlock()

	rwbs, err := blockstore.OpenReadWrite(s.curBatchPath, nil)
```
Rather than opening and closing the current batch file for every append operation, let's instead maintain a reference to the current batch. Then we only open and close the file once, during start up/rollover.
I suspect this will become a bottleneck for nodes when serving lots of content.
I erred on the side of safety. The idea is not to leave the file in an inconsistent state in case of failure. If the node fails it's unclear to me if the resulting state is recoverable. Since this process is directly related to node operators getting paid, I think it makes sense to trade some performance for stability in this case.
Of course, we can always revisit the implementation in the future if this indeed becomes an issue.
Can we move all the logic together that needs locking? e.g. Obtain a lock, open rwbs, put, finalize, check size/roll. unlock.
> Can we move all the logic together that needs locking? e.g. Obtain a lock, open rwbs, put, finalize, check size/roll. unlock.
this doesn't apply anymore since the locks were moved up a layer to the EgressTrackingService
Ah right. Is the intention here then to merge #246 into this PR?
yeah, #246 targets this branch, not main. Sorry, my intention was to make things easier to review but it ended up being confusing.
I am more than happy to review this as one giant PR/branch. Might be easier that way as we work things out fwiw.
I'm merging the other branches into this one, but I'd like to get the discussions in them solved first so that they don't fall through the cracks.
```go
	s.mu.Lock()
	defer s.mu.Unlock()
```
What exactly is being guarded with this lock? The blockstore used below should already have locking in place for Put operations yea?
One needs to read the blockstore's code to learn that it does indeed have its own locking; there is no mention of concurrency safety in the docs. I don't like depending on the underlying implementation doing the right thing (and I actually think leaving users of the library to implement synchronization as they see fit is a better design, but that's another topic), so I feel more comfortable making sure things are synchronized where they should be.
In any case, the idea here was to make the append and a potential rotation atomic. We don't want a subsequent call to update the batch while we are rotating it/sending it in an invocation.
```go
	if err := rwbs.Finalize(); err != nil {
		return fmt.Errorf("finalizing batch: %w", err)
	}
```
I think we only need to call this when using a carv2 with an index: https://github.com/ipld/go-car/blob/v2.15.0/v2/internal/store/index.go#L90 - I assume that's not the case here? If so can probably drop this call.
I'm actually trying to use CARv2. Among the features of CARv2, the docs mention:
> Write CARv2 files via Read-Write blockstore API, with support for appending blocks to an existing CARv2 file, and resumption from a partially written CARv2 files.
which looks like a good fit for the egress batching store use case.
I think this also means the CAR file(s) we batch will include an index in addition to the data we are serving to the egress tracker. I understand said index is required for using the RWBS API, but it means extra data to move around. Worth documenting this in the method.
your comment got me thinking. It's true that an indexed CARv2 is required to use blockstore.ReadWrite. However, at consolidation time all blocks in the CAR batch will be iterated sequentially, so the index is not useful in that case. Therefore, we could use a CARv2 for batching, but send a CARv1 to the egress tracker.
I also thought of storing rotated batches directly as CARv1, but I think having them as CARv2 will be useful when we implement receipt consolidation handling, as it might be necessary to read from the batch those receipts that had errors.
I created #259 to evaluate this alternative once we have a better idea about how consolidation errors will be handled.
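A minimal sketch of that alternative, assuming go-car/v2's `ExtractV1File` helper; the function and path names below are illustrative, not code from the PR:

```go
// Sketch of the idea captured in #259: keep the rotated batch as an indexed
// CARv2 on disk, but extract the inner CARv1 payload when sending it to the
// egress tracker, since the index is not useful for sequential iteration.
package main

import (
	"fmt"

	carv2 "github.com/ipld/go-car/v2"
)

func prepareForTracker(batchV2Path, batchV1Path string) error {
	// ExtractV1File copies just the CARv1 data payload out of a CARv2 file,
	// dropping the index the tracker has no use for.
	if err := carv2.ExtractV1File(batchV2Path, batchV1Path); err != nil {
		return fmt.Errorf("extracting CARv1 payload: %w", err)
	}
	return nil
}

func main() {
	if err := prepareForTracker("batch.car", "batch.v1.car"); err != nil {
		fmt.Println(err)
	}
}
```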
```go
	s.mu.Lock()
	defer s.mu.Unlock()

	rwbs, err := blockstore.OpenReadWrite(s.curBatchPath, nil)
```
Skimming the car blockstore code leads me to believe we need to call Close on this when we're done with it iff using CarV2 - should we include that here?
it's not necessary, Finalize does it. According to the docs:
> Finalize finalizes this blockstore by writing the CARv2 header, along with flattened index for more efficient subsequent read. This is the equivalent to calling FinalizeReadOnly and Close. After this call, the blockstore can no longer be used.
and I confirmed the code does it indeed.
@frrist are these still a concern? PTAL at my replies and let me know what you think.
@alanshaw any concerns beyond the name of the store?
Ref: #174. I separated the logic for storing receipt batches from the logic that sends those batches to the egress tracking service. Now the store just stores the batches and notifies when they are ready. The new `EgressTrackingService` will invoke `space/egress/track` with new batches.
Yeah, I'm still concerned about the performance implications. The current implementation calls `OpenReadWrite` and `Finalize` for every `Append`.

I spent some time looking at the go-car package design, and have come away with the understanding that its API is meant to be used with a single open/finalize cycle per file. Currently this implementation does that cycle once per appended receipt.

Even with proper API usage, CarV2 builds an index for each batch that gets discarded during consolidation. For append-only sequential access, CarV1 would be more appropriate and have lower bandwidth requirements.

I'm not trying to block progress here, I want to ship this too! But I also don't want to ship something we already know has unnecessary overhead just to get it out the door. The amount node operators get paid is somewhat coupled to the performance of this design, and fixing performance issues in production is more expensive than getting it right now.

Can we please address the open/finalize-per-append pattern first? That seems like a clear bug in how the API is being used, and the fix should be straightforward.
I understood from this example in the docs that the read-write blockstore was the intended way to append to and resume a partially written file, but I'm confused now. Among the features of CARv2, the docs mention:

> Write CARv2 files via Read-Write blockstore API, with support for appending blocks to an existing CARv2 file, and resumption from a partially written CARv2 files.

What is the advantage of CARv2 in this regard? Can a CARv1 file be resumed the same way? I'm happy to use CARv1 if we are not getting any benefits from using v2.

One final consideration: as I replied to your original suggestion, the reason I implemented the open/finalize-per-append pattern was to avoid leaving the batch in an inconsistent state if the node fails mid-write.

To summarize: I'm moving to CARv1 and will keep the current batch open across appends instead of opening and finalizing it on every call.
My understanding is that CarV1 doesn't have the notion of resumption since it's just a list of CIDs and their bytes; resumption is for ensuring the index of a CarV2 is valid, and since CarV1 has no index, the operation is moot. In my experience CarV2 has only been valuable when performing random access over the data, treating it as a "store", thus why it has an index.

Candidly, a CAR (v1 or v2) file isn't really the correct "thing" for this use case. They are designed for DAGs with root CIDs. The header describes entry points to a graph. But these receipts are independent blocks with no DAG structure. We're passing nil for roots because there aren't any. But using it here is totally fine with me as we use it as the canonical medium for passing around data over a transport layer. Much excitement for Filepack wrt this point.

Looking at the commit just pushed which moved this to CarV1, here's a sketch for an alternative:

```go
// serialize the receipt data
rcptArchive := rcpt.Archive()
archiveBytes, err := io.ReadAll(rcptArchive)
if err != nil {
	return false, cid.Cid{}, err
}

// make the cid
archiveCID, err := cid.V1Builder{
	Codec:  uint64(multicodec.Car),
	MhType: uint64(multihash.SHA2_256),
}.Sum(archiveBytes)
if err != nil {
	return false, cid.Cid{}, err
}

// cid to bytes
cidBytes := archiveCID.Bytes()

// append a line in the car file, this is what `Put` is doing internally, but less complicated
if err := util.LdWrite(s.writer, cidBytes, archiveBytes); err != nil {
	return false, cid.Cid{}, err
}

// record the size of the data written
blockSize := util.LdSize(cidBytes, archiveBytes)
s.currentSize += int64(blockSize)
```

This will produce a CarV1 roughly resembling a header followed by a sequence of length-prefixed (CID, bytes) sections.

Then it comes to locking: just lock around the write.
frrist left a comment:
Much better, thanks. Left some more comments on the change to the Journal implementation. Happy for them to be addressed in a follow on.
```go
type fsJournal struct {
	basePath     string
	curBatchPath string
	rwbs         *blockstore.ReadWrite
```
I don't think we're getting much from this, could just be an *os.File.
```go
	if s.rwbs == nil {
		// Open a new read-write blockstore for the current batch
		rwbs, err := blockstore.OpenReadWrite(s.curBatchPath, nil, blockstore.WriteAsCarV1(true))
		if err != nil {
			return false, cid.Cid{}, fmt.Errorf("opening current batch for writing: %w", err)
		}

		s.rwbs = rwbs
	}
```
Preference for this to be done in:
- `NewFSJournal`: this allows initialization errors to be caught at construction, rather than on the first call to `Append`
- `rotate`: keeps the `Append` method focused on the business logic of writing blocks
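A minimal sketch of what moving the initialization into the constructor could look like; the struct mirrors the PR's `fsJournal`, while the file name `current.car` and the exact wiring are assumptions:

```go
package journal

import (
	"fmt"
	"path/filepath"

	"github.com/ipld/go-car/v2/blockstore"
)

type fsJournal struct {
	basePath     string
	curBatchPath string
	rwbs         *blockstore.ReadWrite
}

// NewFSJournal opens the read-write blockstore once at construction, so
// initialization errors surface here instead of on the first Append.
func NewFSJournal(basePath string) (*fsJournal, error) {
	j := &fsJournal{
		basePath:     basePath,
		curBatchPath: filepath.Join(basePath, "current.car"),
	}
	rwbs, err := blockstore.OpenReadWrite(j.curBatchPath, nil, blockstore.WriteAsCarV1(true))
	if err != nil {
		return nil, fmt.Errorf("opening current batch for writing: %w", err)
	}
	j.rwbs = rwbs
	return j, nil
}
```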
```go
	// rotate the batch if it exceeds the size limit
	curSize, err := s.currentBatchSize()
	if err != nil {
		return false, cid.Cid{}, fmt.Errorf("checking current batch size: %w", err)
	}
```
I'd prefer to remove the Stat call caused by this operation, one Stat per Append isn't ideal for hot path code. Instead, we can use the util.Ld* methods I mentioned in my comment, and track the size of the batch directly. NewFSJournal can perform the initial Stat operation for the case of resuming writing to an existing batch.
```go
	// Rename the file to include the CID
	if err := os.Rename(s.curBatchPath, newPath); err != nil {
		return cid.Cid{}, fmt.Errorf("renaming batch file: %w", err)
	}
```
I am not sure what the behavior is from renaming an open file, to be safe I think we'll want to close f before calling this operation. Please address this before merge
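A sketch of the requested fix: close the batch file before renaming it, so the rename never races a still-open descriptor. The helper name and parameters are illustrative, not the PR's actual code:

```go
package journal

import (
	"fmt"
	"os"
)

// rotateFile closes the current batch file before renaming it to its
// CID-based name. f, curPath and newPath are assumed to come from the journal.
func rotateFile(f *os.File, curPath, newPath string) error {
	if err := f.Close(); err != nil {
		return fmt.Errorf("closing batch file: %w", err)
	}
	if err := os.Rename(curPath, newPath); err != nil {
		return fmt.Errorf("renaming batch file: %w", err)
	}
	return nil
}
```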
@frrist wanna take a final look before merging?
LGTM
```go
	blobs.Module,          // Provides blob service and handler
	claims.Module,         // Provides claims service and handler
	publisher.Module,      // Provides publisher service and handler
	egresstracking.Module, // Provides egress tracking service
```
```diff
-	egresstracking.Module, // Provides egress tracking service
+	egresstracker.Module, // Provides egress tracking service
```
We have aggregator, publisher, replicator, principalresolver already - for more consistency?
yeah, I used egresstracking to differentiate from the egress tracker, which is the service running at the other end. But I plan to rename that to billing server, so 👍🏻
```diff
 	warmStageIndexingServiceDID = lo.Must(did.Parse("did:web:staging.indexer.warm.storacha.network"))

-	warmStageEgressTrackingServiceURL = lo.Must(url.Parse("https://staging.etracker.warm.storacha.network"))
+	warmStageEgressTrackingServiceURL = lo.Must(url.Parse("https://staging.etracker.warm.storacha.network/track"))
```
sure. It will make more sense when the service accepts other capabilities.
Changed here and storacha/etracker#4
```go
		return fmt.Errorf("reading receipt: %w", err)
	}

	// we're not expecting any meaningful response here so we just check for error
```
...except it's a TODO to get the consolidate task CID from the effects.
shoot! I totally forgot about that, great catch.
ah, wait, yes, this is done in #264, my brain is mixing things
```go
	// Calculate the CID of the current batch
	hash := sha256.New()
	n, err := io.Copy(hash, j.currBatch)
```
I think you should calculate the hash as bytes are being appended, so that when you come to rotate you don't have to read the whole file again, which will be a pause to operations.
Obviously when you start the node you'll have to re-hash anything already written but it's not such a problem since you're already in a paused state...
And if we wanna get real fancy/fast here later we can swap out sha for BLAKE3, which is typically faster.
I love the idea. Unfortunately it's not easy to get ahold of the bytes being written when using util.LdWrite. I implemented it by seeking the current batch stream, but I think that might actually end up being even slower 🤷🏻. Take a look at 3c25095
An alternative would be to use our custom implementation of util.LdWrite and car.WriteHeader that write to the hash while writing to the underlying stream. That would also allow avoiding the use of util.LdSize, I think both util.LdWrite and car.WriteHeader should return the number of bytes written, as is usual in write functions. I'll give a stab at this tomorrow.
I think ldwrite is just a varint followed by the bytes...should be easy enough to do manually.
You could alternatively tee the stream?
ah, that's a great idea!
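A minimal sketch of the "tee" idea under discussion, using `io.MultiWriter` so every byte written to the batch file also feeds the running hash. The surrounding journal type is assumed; only `util.LdWrite` and `util.LdSize` come from the code above:

```go
package journal

import (
	"crypto/sha256"
	"hash"
	"io"
	"os"

	"github.com/ipld/go-car/util"
)

// newBatchHash returns the running digest for the current batch; sha256 today,
// potentially BLAKE3 later as suggested above.
func newBatchHash() hash.Hash { return sha256.New() }

// appendBlock writes one length-delimited (CID, bytes) section to the batch
// file while simultaneously feeding the running hash, so rotation can derive
// the batch digest from h.Sum(nil) without re-reading the file.
func appendBlock(f *os.File, h hash.Hash, cidBytes, blockBytes []byte) (int64, error) {
	w := io.MultiWriter(f, h)
	if err := util.LdWrite(w, cidBytes, blockBytes); err != nil {
		return 0, err
	}
	// LdWrite doesn't report how many bytes it wrote, so derive it with LdSize
	return int64(util.LdSize(cidBytes, blockBytes)), nil
}
```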
I think all comments and suggestions have been addressed, so I'm merging this!
Small fix/follow-on to #245. Ensures that when a journal is resumed, the incremental SHA is re-computed; see `TestResumeAfterRestart` for details.

Co-authored-by: Vicente Olmedo <vicente@storacha.network>
Ref: #174
Implement an `EgressBatchStore` that allows appending `space/content/retrieve` receipts to a batch. When the batch is above the max size, it is flushed automatically.

Things that are missing and will be implemented in follow-up PRs:
- the `space/egress/track` invocation
- exposing stored batches via `http.FileSystem` and adding the corresponding `http.FileServer` endpoint

Once these are done, I'll hook it up in Alan's `space/content/retrieve` handler.