actor: add new package for structured concurrency based on the Actor model #9820

Open · wants to merge 6 commits into master

Conversation


@Roasbeef Roasbeef commented May 17, 2025

In this PR, we add a new package that implements structured concurrency patterns using the actor model. The README should give a good overview of how the API works, and the major abstractions at play. I'll focus this PR body on the motivation behind introducing such a model, and some of my goals.

Motivation

While working on the new RBF-based channel close, I ran into a bug when I went to test the restart scenario.

Typically, the RPC server will contact the switch to do a coop close. The switch holds the link, which has a callback passed in to trigger a coop close via the peer. At the end of this series of calls, we create a new chan closer that uses the underlying channel, along with the new RBF state machine, to drive the coop close process:

lnd/rpcserver.go

Lines 2811 to 2831 in 3707b1f

	// If the link is not known by the switch, we cannot gracefully close
	// the channel.
	channelID := lnwire.NewChanIDFromOutPoint(*chanPoint)
	if _, err := r.server.htlcSwitch.GetLink(channelID); err != nil {
		chanInSwitch = false

		// The channel isn't in the switch, but if there's an
		// active chan closer for the channel, and it's of the
		// RBF variant, then we can actually bypass the switch.
		// Otherwise, we'll return an error.
		if !chanHasRbfCloser {
			rpcsLog.Debugf("Trying to non-force close "+
				"offline channel with chan_point=%v",
				chanPoint)

			return fmt.Errorf("unable to gracefully close "+
				"channel while peer is offline (try "+
				"force closing it instead): %v", err)
		}
	}

Unlike the existing chan closer, after a restart the rbf chan closer is free to trigger additional fee updates. However, since we don't load inactive channels back into the switch, the RPC server couldn't rely on the switch to send the messages it needed.

To get around this, I had to add 3 new methods to do the pointer chasing of: rpcServer -> server -> peer -> active close map -> rbf closer:

lnd/server.go

Lines 5521 to 5548 in 3707b1f

// AttemptRBFCloseUpdate attempts to trigger a new RBF iteration for a co-op
// close update. This route is to be used only if the target channel in question
// is no longer active in the link. This can happen when we restart while we
// already have done a single RBF co-op close iteration.
func (s *server) AttemptRBFCloseUpdate(ctx context.Context,
	chanPoint wire.OutPoint, feeRate chainfee.SatPerKWeight,
	deliveryScript lnwire.DeliveryAddress) (*peer.CoopCloseUpdates, error) {

	// If the channel is present in the switch, then the request should flow
	// through the switch instead.
	chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
	if _, err := s.htlcSwitch.GetLink(chanID); err == nil {
		return nil, fmt.Errorf("ChannelPoint(%v) is active in link, "+
			"invalid request", chanPoint)
	}

	// At this point, we know that the channel isn't present in the link, so
	// we'll check to see if we have an entry in the active chan closer map.
	updates, err := s.attemptCoopRbfFeeBump(
		ctx, chanPoint, feeRate, deliveryScript,
	)
	if err != nil {
		return nil, fmt.Errorf("unable to attempt coop rbf fee bump "+
			"ChannelPoint(%v)", chanPoint)
	}

	return updates, nil
}

lnd/server.go

Lines 5482 to 5519 in 3707b1f

// attemptCoopRbfFeeBump attempts to look up the active chan closer for a
// channel given the outpoint. If found, we'll attempt to do a fee bump,
// returning channels used for updates. If the channel isn't currently active
// (p2p connection established), then this function will return an error.
func (s *server) attemptCoopRbfFeeBump(ctx context.Context,
	chanPoint wire.OutPoint, feeRate chainfee.SatPerKWeight,
	deliveryScript lnwire.DeliveryAddress) (*peer.CoopCloseUpdates, error) {

	// First, we'll attempt to look up the channel based on its
	// ChannelPoint.
	channel, err := s.chanStateDB.FetchChannel(chanPoint)
	if err != nil {
		return nil, fmt.Errorf("unable to fetch channel: %w", err)
	}

	// From the channel, we can now get the pubkey of the peer, then use
	// that to eventually get the chan closer.
	peerPub := channel.IdentityPub.SerializeCompressed()

	// Now that we have the peer pub, we can look up the peer itself.
	s.mu.RLock()
	targetPeer, ok := s.peersByPub[string(peerPub)]
	s.mu.RUnlock()
	if !ok {
		return nil, fmt.Errorf("peer for ChannelPoint(%v) is "+
			"not online", chanPoint)
	}

	closeUpdates, err := targetPeer.TriggerCoopCloseRbfBump(
		ctx, chanPoint, feeRate, deliveryScript,
	)
	if err != nil {
		return nil, fmt.Errorf("unable to trigger coop rbf fee bump: "+
			"%w", err)
	}

	return closeUpdates, nil
}

lnd/peer/brontide.go

Lines 5355 to 5392 in 3707b1f

// TriggerCoopCloseRbfBump given a chan ID, and the params needed to trigger a
// new RBF co-op close update, a bump is attempted. A channel used for updates,
// along with one used to communicate any errors, is returned. If no chan
// closer is found, then false is returned for the second argument.
func (p *Brontide) TriggerCoopCloseRbfBump(ctx context.Context,
	chanPoint wire.OutPoint, feeRate chainfee.SatPerKWeight,
	deliveryScript lnwire.DeliveryAddress) (*CoopCloseUpdates, error) {

	// If RBF coop close isn't permitted, then we'll return an error.
	if !p.rbfCoopCloseAllowed() {
		return nil, fmt.Errorf("rbf coop close not enabled for " +
			"channel")
	}

	closeUpdates := &CoopCloseUpdates{
		UpdateChan: make(chan interface{}, 1),
		ErrChan:    make(chan error, 1),
	}

	// We'll re-use the existing switch struct here, even though we're
	// bypassing the switch entirely.
	closeReq := htlcswitch.ChanClose{
		CloseType:      contractcourt.CloseRegular,
		ChanPoint:      &chanPoint,
		TargetFeePerKw: feeRate,
		DeliveryScript: deliveryScript,
		Updates:        closeUpdates.UpdateChan,
		Err:            closeUpdates.ErrChan,
		Ctx:            ctx,
	}

	err := p.startRbfChanCloser(newRPCShutdownInit(&closeReq), chanPoint)
	if err != nil {
		return nil, err
	}

	return closeUpdates, nil
}


Across the codebase, this is a common pattern wherein we'll go from the rpcServer to the server, which is effectively a "god struct" that contains pointers to all the other sub-systems we may need to access. In the version before this PR, as the life cycle of some sub-systems shifted, we had to unravel a few abstractions to be able to send a message to the sub-system at play. This requires quite a bit of knowledge on the part of the RPC server: it needs to know which sub-systems manage others, their life cycles, how they're created, the methods to call, and so on.

The Actor Solution

As we'll see in a follow up PR, the new actor package lets the peer expose a new actor that can handle the RPC-specific logic for the close bump process. So instead of doing this pointer chasing through the god struct, which adds a degree of tight coupling, the rpcserver just needs to create the ServiceKey that the actor advertises at the package level. It can then use this to send a message directly to the new rbf close actor.

This achieves a greater degree of loose coupling, and the abstractions lend themselves well to experimentation and to composing various actors.

Broadly across the codebase, we already implement message passing between event loops managed by goroutines, where the state lives in local variables. This package codifies that pattern and creates a more standardized way for these event loops to interact with each other. It also provides more flexibility, as the sub-system boundaries are based on messages instead of methods.
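
For reference, here's a minimal, self-contained sketch of the hand-rolled version of that pattern (the names here are illustrative, not from the new package): a goroutine owns its state as a local variable and serves requests over a channel.

package main

import "fmt"

// closeReq is an illustrative request message. The reply travels back over a
// per-request channel, which is the ad-hoc pattern the actor package codifies.
type closeReq struct {
	chanID string
	reply  chan string
}

// runCloser is the event loop: it owns pendingCloses as a local variable and
// only ever touches it from this single goroutine, so no locks are needed.
func runCloser(reqs <-chan closeReq, quit <-chan struct{}) {
	pendingCloses := make(map[string]bool)

	for {
		select {
		case req := <-reqs:
			pendingCloses[req.chanID] = true
			req.reply <- fmt.Sprintf("close pending for %s "+
				"(%d tracked)", req.chanID, len(pendingCloses))

		case <-quit:
			return
		}
	}
}

func main() {
	reqs := make(chan closeReq)
	quit := make(chan struct{})
	go runCloser(reqs, quit)

	reply := make(chan string, 1)
	reqs <- closeReq{chanID: "chan-1", reply: reply}
	fmt.Println(<-reply)

	close(quit)
}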

Future[T]

IMO the Future[T] abstraction added in this PR is also very nice: it wraps the typical send/recv timeout behavior we have elsewhere across the codebase. Instead of directly returning the channel (which allows for anti-patterns such as blocking forever with no context), we can return a Future[T] in its place.
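
To make that concrete, here's a minimal, self-contained approximation of the idea (the exact types and method names in the package may differ): the callee completes a Promise, and the caller awaits the Future with a context, so it can never block forever without a deadline.

package main

import (
	"context"
	"fmt"
	"time"
)

// Future is a value that may become available later.
type Future[T any] struct {
	resultChan chan T
}

// Promise is the write side: whoever holds it can complete the Future.
type Promise[T any] struct {
	future *Future[T]
}

// NewPromise creates a linked Promise/Future pair.
func NewPromise[T any]() (*Promise[T], *Future[T]) {
	f := &Future[T]{resultChan: make(chan T, 1)}
	return &Promise[T]{future: f}, f
}

// Complete fulfils the Future with a value.
func (p *Promise[T]) Complete(value T) {
	p.future.resultChan <- value
}

// Await blocks until the value is ready or the context is done, wrapping the
// usual select-with-timeout in one place.
func (f *Future[T]) Await(ctx context.Context) (T, error) {
	var zero T
	select {
	case v := <-f.resultChan:
		return v, nil
	case <-ctx.Done():
		return zero, ctx.Err()
	}
}

func main() {
	promise, future := NewPromise[string]()

	// The callee completes the promise once its work is done.
	go func() {
		time.Sleep(10 * time.Millisecond)
		promise.Complete("closing tx broadcast")
	}()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	result, err := future.Await(ctx)
	fmt.Println(result, err)
}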

Roasbeef added 5 commits May 16, 2025 17:24
In this commit, we add two new fundamental data structures: Future[T]
and Promise[T].

A future is a response that might be ready at some point in the future.
This is already a common pattern in Go; we just make a type-safe wrapper
around the typical operations: block with a timeout, add a callback for
execution, pipeline the response to a new future.

A promise is an intent to complete a future. Typically the caller
receives the future, and the callee is able to complete the future using
a promise.
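
As a rough sketch of the pipelining piece (the thenApply name and shape below are illustrative, not necessarily the package's actual method): resolving one future kicks off a transform that completes a derived future.

package main

import "fmt"

// future is a toy stand-in for Future[T]: a buffered channel of one result.
type future[T any] struct {
	result chan T
}

func newFuture[T any]() *future[T] {
	return &future[T]{result: make(chan T, 1)}
}

// thenApply pipelines one future into another: once the first resolves, the
// transform runs and its output completes the derived future.
func thenApply[A, B any](f *future[A], transform func(A) B) *future[B] {
	derived := newFuture[B]()
	go func() {
		derived.result <- transform(<-f.result)
	}()
	return derived
}

func main() {
	feeRate := newFuture[int]()

	// Derive a future for a formatted status message from the fee-rate
	// future before the fee rate is even known.
	status := thenApply(feeRate, func(rate int) string {
		return fmt.Sprintf("bumped to %d sat/kw", rate)
	})

	// The promise holder eventually completes the original future.
	feeRate.result <- 2500

	fmt.Println(<-status.result)
}
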
In this commit, we add the actual Actor implementation. We define a
series of types and interfaces that, in concert, describe our actor. An
actor has some ID, a reference (used to send messages to it), and also a
set of defined messages that it'll accept.

An actor can be implemented using a simple function if it's stateless.
Otherwise, a struct can implement the Receive method, and handle its
internal message passing and state that way.
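
A minimal sketch of what that could look like (the Message/Receiver shapes below are assumptions for illustration, not the package's exact interfaces): a struct keeps its state private and only mutates it inside Receive, which the actor's event loop would invoke one message at a time, while a plain function can serve as a stateless actor via an adapter.

package main

import "fmt"

// Message and Receiver approximate the shapes described above; the real
// package's interfaces and signatures may differ.
type Message interface{}

type Receiver interface {
	Receive(msg Message)
}

// counterActor keeps its state private and only mutates it inside Receive.
type counterActor struct {
	count int
}

type incrementMsg struct{ by int }
type reportMsg struct{ reply chan int }

func (c *counterActor) Receive(msg Message) {
	switch m := msg.(type) {
	case incrementMsg:
		c.count += m.by
	case reportMsg:
		m.reply <- c.count
	}
}

// receiveFunc is the stateless case: a plain function adapted to the
// Receiver interface.
type receiveFunc func(Message)

func (f receiveFunc) Receive(msg Message) { f(msg) }

func main() {
	var counter Receiver = &counterActor{}
	counter.Receive(incrementMsg{by: 2})

	reply := make(chan int, 1)
	counter.Receive(reportMsg{reply: reply})
	fmt.Println("count:", <-reply)

	var logger Receiver = receiveFunc(func(m Message) {
		fmt.Printf("got message: %v\n", m)
	})
	logger.Receive("hello")
}
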
In this commit, we add the actor system (along with the receptionist)
and the router.

An actor can be registered with the system, which allows other callers
to locate it and send messages to it via the receptionist. Custom routers
can be created for cases where multiple actors rely on the same service
key and the same request/response types. This can be used to implement
something similar to a worker pool.
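
To sketch the idea (the serviceKey/receptionist names and shapes below are illustrative only, not the package's real API): actors register under a well-known key, callers locate them through the receptionist without knowing who created them, and a trivial router can fan requests out across several registrations to get worker-pool behavior.

package main

import "fmt"

// serviceKey is an illustrative stand-in for a ServiceKey: a well-known name
// under which an actor registers itself.
type serviceKey string

// ref is a toy actor reference: messages are sent to the actor over it.
type ref chan string

// receptionist maps service keys to registered actor references so that
// callers can locate actors without holding pointers to the owning
// sub-system.
type receptionist struct {
	registry map[serviceKey][]ref
}

func newReceptionist() *receptionist {
	return &receptionist{registry: make(map[serviceKey][]ref)}
}

func (r *receptionist) register(key serviceKey, actor ref) {
	r.registry[key] = append(r.registry[key], actor)
}

func (r *receptionist) find(key serviceKey) []ref {
	return r.registry[key]
}

func main() {
	system := newReceptionist()

	// Two workers register under the same key, worker-pool style.
	const key = serviceKey("rbf-close")
	workers := []ref{make(ref, 1), make(ref, 1)}
	for _, w := range workers {
		system.register(key, w)
	}

	// A caller locates the actors by key alone and round-robins across
	// them, with no knowledge of their life cycles or creation.
	refs := system.find(key)
	for i, msg := range []string{"bump fee", "bump fee again"} {
		refs[i%len(refs)] <- msg
	}

	for _, w := range workers {
		fmt.Println(<-w)
	}
}
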
In this commit, we add a series of examples that show how the package
can be used in the wild. They can be run as normal Example tests.
@Roasbeef Roasbeef added the brainstorming (long term ideas/discussion/requests for feedback), code health (related to code commenting, refactoring, and other non-behaviour improvements), and architecture (related to system design) labels May 17, 2025
coderabbitai bot commented May 17, 2025

Review skipped: auto reviews are limited to specific labels (llm-review).
Pull reviewers stats

Stats of the last 30 days for lnd:

User                 Total reviews   Time to review   Total comments
guggero 🥇                      24   23h 49m                      47
ziggie1984 🥈                   13   12h 35m                      35
bhandras 🥉                     11   4h 34m                       12
yyforyongyu                     10   1d 3h 57m                    16
Roasbeef                         7   9h 39m                        4
ellemouton                       5   1d 6h 18m                     5
bitromortac                      5   1h 41m                        6
morehouse                        3   1d 1h 19m                     3
ffranr                           2   18m                           0
mohamedawnallah                  2   6d 14h 50m                   11
NishantBansal2003                2   5d 15h 32m                    0
sputn1ck                         1   23h 39m                       2
GeorgeTsagk                      1   3d 36m                        0
saubyk                           1   20h 37m                       0
MPins                            1   8d 14h 1m                     3

In this commit, we add a README which serves as a general introduction
to the package, as well as its motivation. It serves as a manual for
developers who may wish to interact with the package.

Roasbeef commented May 20, 2025

Used this recently during the PB Hackathon. It worked pretty well. Decomposing the pipeline into dedicated actors allowed for incremental implementation during the time crunch (a 24 hr hackathon, but we built our solution in ~6 hours of actual coding). Only one instance of each actor was present in the final solution, but we could easily scale out any of them (in particular the ChatActor and OptimizerActor) to increase parallelism. If we were to modify the FuzzExecutorActor to run using something like managed docker containers instead, the end messages stay the same, so no updates are needed elsewhere.

Here's a diagram describing the final architecture:

flowchart LR
    CLI(User/CLI) --> Engine(FuzzEngine)
    Engine -- "1\. Starts" --> Controller(FuzzControllerActor)
    
    subgraph Phase1["Phase 1: Initial Generation"]
        Controller -- "2\. Send Program" --> Writer(FuzzWriterActor)
        Writer -- "3\. Request" --> ChatActor(ChatActor)
        ChatActor -- "4\. Generate" --> Writer
        Writer -- "5\. Provide Test" --> Controller
    end
    
    subgraph Phase2["Phase 2: Iterative Loop"]
        Controller -- "6\. Execute" --> Executor(FuzzExecutorActor)
        Executor -- "7\. Return Results" --> Controller
        Controller -- "8\. Analyze" --> Optimizer(FuzzOptimizerActor)
        Optimizer -- "9\. Request" --> ChatActor
        ChatActor -- "10\. Improve" --> Optimizer
        Optimizer -- "11\. Return" --> Controller
        Controller -- "12\. Repeat" --> Executor
    end
    
    Controller -- "13\. Finalize" --> Report(Final Report)

One thing missing (especially important for the distributed case) is any sort of active queue management. Right now we'll just block forever, or until the context gets cancelled. We could update this to hand the message off to a RetryActor that'll try to deliver the message until restart. Of course, we can also just rely on the callers to handle that themselves.
