Skip to content

Recover bids for unresolved auction on restart #3109

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged

Conversation

Tristan-Wilson
Copy link
Member

The Bid Validator sends validated bids to the Autonomous Auctioneer by means of Redis streams, using our pubsub package.

This commit changes the Auctioneer to only mark bid messages as fully consumed after the auction they were part of is resolved, or if the auction resolution window has closed and can no longer be resolved. This means if the Auctioneer is restarted mid-round before auction resolution, then it will fetch any bids from earlier in the round from redis.

The Bid Validator sends validated bids to the Autonomous Auctioneer by
means of Redis streams, using our pubsub package.

This commit changes the Auctioneer to only mark bid messages as fully
consumed after the auction they were part of is resolved, or if the
auction resolution window has closed and can no longer be resolved. This
means if the Auctioneer is restarted mid-round before auction
resolution, then it will fetch any bids from earlier in the round from
redis.
// Once resolveAuction returns, we acknowledge all bids to remove them from redis.
// We remove them unconditionally, since resolveAuction retries until the round ends,
// and there is no way to use them after the round ends.
defer a.acknowledgeAllBids(ctx, upcomingRound)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it fine if this function exits early because of an error and we still ack all bids?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it is fine because resolveAuction is only called once per auction and not retried.

if uint64(bid.Round) <= round {
if err := a.consumer.SetResult(ctx, msgID, nil); err != nil {
log.Error("Error acknowledging bid after auction resolution", "msgID", msgID, "error", err)
continue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we continue because of an error here, is it ok if we have an unacked bid that is not cleared in the map?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed it in the latest version.

@Tristan-Wilson
Copy link
Member Author

Tristan-Wilson commented Apr 10, 2025

I am reworking this because I had misunderstood how our pubsub abstraction worked.

Problem with my first attempt

(For tests the heartbeat is every 3ms, idle timeout is 30ms.)
msg.Ack() as soon as they are consumed -> this turns off the heartbeat
Messages are put into list to be SetResult-ed at end of round

The problem with this is that the messages will be continuously replayed because they will have timed out (no heartbeat), and the bid cache uses whatever message it got last. Even if there wasn't the following issue with reading the pending messages manually, the auction may end in the middle of replaying the messages and this isn't good.

The behavior of the consumer picking 1 out of 50 of the oldest messages randomly will break chronological ordering of bids. Bids need to be re-inserted into the bid cache in order since newer bids from the same account overwrite older bids.

Other problems

The production default autoclaim time is too short for the rebooted consumer case we're trying to handle -> if it's set to 5 minutes then messages will never be able to be reclaimed in a given 1 minute round

@ganeshvanahalli
Copy link
Contributor

I am reworking this because I had misunderstood how our pubsub abstraction worked.

Problem with my first attempt

(For tests the heartbeat is every 3ms, idle timeout is 30ms.) msg.Ack() as soon as they are consumed -> this turns off the heartbeat Messages are put into list to be SetResult-ed at end of round

The problem with this is that the messages will be continuously replayed because they will have timed out (no heartbeat), and the bid cache uses whatever message it got last. Even if there wasn't the following issue with reading the pending messages manually, the auction may end in the middle of replaying the messages and this isn't good.

The behavior of the consumer picking 1 out of 50 of the oldest messages randomly will break chronological ordering of bids. Bids need to be re-inserted into the bid cache in order since newer bids from the same account overwrite older bids.

For this, on the consumer side (i.e auctioneer), why dont we just Ack all-but-last bid from an address? Because anyway newer bids should replace the older ones so theres not point in not ack-ing old bids. This way even if the auctioneer (consumer) restarts- it will try to reclaim bids from the PEL that are already from unique addresses!

One way to do this is bidcache will store ack functions for last bids from a controller-address along with the validated bid and upon receiving another bid from the same address you can then call that ack function from the cache before calling bidcache.add(). This way acknowledgeAllBids function will just be iterating over bidcache and calling the ack functions after successful auction resolution or at round end.

validateBidTemporal is a great idea that and also probably highlights a possible bug where old bids could've been processed in the next round- though very very unlikely!

@Tristan-Wilson Tristan-Wilson marked this pull request as ready for review April 16, 2025 20:24
@Tristan-Wilson
Copy link
Member Author

For this, on the consumer side (i.e auctioneer), why dont we just Ack all-but-last bid from an address? Because anyway newer bids should replace the older ones so theres not point in not ack-ing old bids. This way even if the auctioneer (consumer) restarts- it will try to reclaim bids from the PEL that are already from unique addresses!

One way to do this is bidcache will store ack functions for last bids from a controller-address along with the validated bid and upon receiving another bid from the same address you can then call that ack function from the cache before calling bidcache.add(). This way acknowledgeAllBids function will just be iterating over bidcache and calling the ack functions after successful auction resolution or at round end.

It's a good idea but so far I'm unconvinced that it's worth the extra complexity, especially since we're limiting bids per account to 5.

Copy link
Contributor

@ganeshvanahalli ganeshvanahalli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great test! Requesting minor changes required for production

@@ -263,16 +275,48 @@ func (a *AuctioneerServer) Start(ctx_in context.Context) {
// There's nothing in the queue.
return time.Millisecond * 250
}

if err := validateBidTemporal(&a.roundTimingInfo, (uint64)(req.Value.Round)); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would have been nicer to have the body of this CallIteratively invocation be its own function, which we can more easily unit test. It seems there are several assertions we would want to ensure in this logic, for instance, that we do not send duplicate bids to the receiver because it cares about orderings. We can also check that we reconsumed the same bid, etc. This business logic could be tested by passing in a mock redis interface as well

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved it to its own function consumeNextBid and added unit tests.

@@ -426,6 +475,33 @@ func (a *AuctioneerServer) resolveAuction(ctx context.Context) error {
return nil
}

func (a *AuctioneerServer) acknowledgeAllBids(ctx context.Context, round uint64) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method could also be unit tested to verify that we delete from the unacked bids map for bids that are within the expected round

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now tested in TestConsumeNextBid_MultipleValidBids

// Check time-related constraints for bid.
// It's useful to split out to be able to re-check just these contraints after
// time has elapsed.
func validateBidTemporal(roundTimingInfo *RoundTimingInfo, bidRound uint64) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

validateBidTimeConstraints feels more apt here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks that is a better name.

@@ -203,7 +215,7 @@ func (c *Consumer[Request, Response]) Consume(ctx context.Context) (*Message[Req
}
}
return
case <-time.After(c.cfg.IdletimeToAutoclaim / 10):
case <-time.After(c.cfg.IdletimeToAutoclaim / 2):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why cut from 10 to 2? Also, what is the rationale behind the divisor at all?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With 10 then there would be 10 heartbeats for each autoclaim period which seems like overkill. I changed it to 2 instead of 1 because with 1 if there is any delay of the heartbeat goroutine then it could miss a heartbeat and cause the message to be reclaimed.

@Tristan-Wilson Tristan-Wilson force-pushed the timeboost-auctioneer-mid-round-restart-bid-recovery branch from a7a692f to a616359 Compare April 25, 2025 01:56
Copy link
Contributor

@ganeshvanahalli ganeshvanahalli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Contributor

@rauljordan rauljordan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nice tests! This PR looks great :)

@Tristan-Wilson Tristan-Wilson merged commit b51c4a3 into master May 1, 2025
17 checks passed
@Tristan-Wilson Tristan-Wilson deleted the timeboost-auctioneer-mid-round-restart-bid-recovery branch May 1, 2025 00:53
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants