Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions build/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ const (
DefaultMaxLogFileSize = 20
)

// SwitchRPC is a build-time variable that indicates whether a remote
// router is in control of the payment lifecycle. This is true when the
// 'switchrpc' build tag is active.
var SwitchRPC bool

// LogConfig holds logging configuration options.
//
//nolint:ll
Expand Down
8 changes: 8 additions & 0 deletions build/router_local.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
//go:build !switchrpc
// +build !switchrpc

package build

// init records that this binary was built WITHOUT the 'switchrpc' build
// tag (see the !switchrpc constraint above), i.e. the payment lifecycle
// is managed locally. The assignment is redundant with the bool zero
// value but kept explicit to mirror router_remote.go.
func init() {
	SwitchRPC = false
}
8 changes: 8 additions & 0 deletions build/router_remote.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
//go:build switchrpc
// +build switchrpc

package build

// init records that this binary was built WITH the 'switchrpc' build tag
// (see the build constraint above), signalling that a remote router is in
// control of the payment lifecycle.
func init() {
	SwitchRPC = true
}
11 changes: 11 additions & 0 deletions docs/release-notes/release-notes-0.20.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,17 @@ a certain amount of msats.
[allow](https://github.com/lightningnetwork/lnd/pull/10087)
`conf_target=1`. Previously they required `conf_target >= 2`.

* To support scenarios where an external entity, such as a remote router,
manages the payment lifecycle via the Switch RPC server, the node must
preserve the history of HTLC attempts across restarts. This [behavior](https://github.com/lightningnetwork/lnd/pull/10178) is now
conditional on how the lnd binary is built. When compiled with the `switchrpc`
build tag, the local `routing.ChannelRouter`'s automatic cleanup of the
dispatcher's (Switch) attempt store on startup is disabled. This shifts the
responsibility of state cleanup to the external controller, which is expected
to use an RPC interface (e.g., switchrpc) to manage the lifecycle of attempts.
Tying this behavior to a build tag, rather than a runtime flag, makes the
binary's purpose explicit and prevents potential misconfigurations.

## RPC Additions
* When querying [`ForwardingEvents`](https://github.com/lightningnetwork/lnd/pull/9813)
logs, the response now include the incoming and outgoing htlc indices of the payment
Expand Down
85 changes: 83 additions & 2 deletions htlcswitch/payment_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"sync"

Expand All @@ -26,6 +27,15 @@ var (
// ErrPaymentIDAlreadyExists is returned if we try to write a pending
// payment whose paymentID already exists.
ErrPaymentIDAlreadyExists = errors.New("paymentID already exists")

// remoteRouterMarkerBucket is a bucket that contains the marker
// key used to indicate that the htlcswitch's payment attempt
// history is being managed by an external router.
remoteRouterMarkerBucket = []byte("remote-router-marker-bucket")

// remoteRouterMarkerKey is the key that will be present in the
// marker bucket if the switch is remote-managed.
remoteRouterMarkerKey = []byte("remote-router-marker-key")
)

// PaymentResult wraps a decoded result received from the network after a
Expand Down Expand Up @@ -85,6 +95,9 @@ func deserializeNetworkResult(r io.Reader) (*networkResult, error) {
type networkResultStore struct {
backend kvdb.Backend

// isRemote indicates that the store is being used in remote mode.
isRemote bool

// results is a map from paymentIDs to channels where subscribers to
// payment results will be notified.
results map[uint64][]chan *networkResult
Expand All @@ -96,12 +109,68 @@ type networkResultStore struct {
attemptIDMtx *multimutex.Mutex[uint64]
}

func newNetworkResultStore(db kvdb.Backend) *networkResultStore {
func newNetworkResultStore(db kvdb.Backend,
isRemote bool) (*networkResultStore, error) {

// Check for a state mismatch. If this is a local-managed router
// but the database has been marked for remote management, we must
// exit to prevent data loss.
if !isRemote {
var isMarkedRemote bool
err := db.View(func(tx kvdb.RTx) error {
bucket := tx.ReadBucket(remoteRouterMarkerBucket)
if bucket != nil &&
bucket.Get(remoteRouterMarkerKey) != nil {

isMarkedRemote = true
}

return nil
}, func() {
isMarkedRemote = false
})
if err != nil {
return nil, fmt.Errorf("unable to check for remote "+
"router marker: %w", err)
}

if isMarkedRemote {
return nil, fmt.Errorf("the htlc attempt database is " +
"marked for remote management by a switchrpc " +
"build, but this binary is in local " +
"management mode. Halting to prevent data " +
"loss. To use this database, restart with an " +
"lnd build that includes the `switchrpc` " +
"build tag")
}
}

// If we are in remote-managed mode, write the marker. This is
// placed after the check above to ensure we don't write and then
// immediately fail in a misconfigured dev environment.
if isRemote {
err := db.Update(func(tx kvdb.RwTx) error {
bucket, err := tx.CreateTopLevelBucket(
remoteRouterMarkerBucket,
)
if err != nil {
return err
}

return bucket.Put(remoteRouterMarkerKey, []byte{0x01})
}, func() {})
if err != nil {
return nil, fmt.Errorf("unable to write remote "+
"router marker: %w", err)
}
}

return &networkResultStore{
backend: db,
isRemote: isRemote,
results: make(map[uint64][]chan *networkResult),
attemptIDMtx: multimutex.NewMutex[uint64](),
}
}, nil
}

// storeResult stores the networkResult for the given attemptID, and notifies
Expand Down Expand Up @@ -294,6 +363,18 @@ func (store *networkResultStore) cleanStore(keep map[uint64]struct{}) error {
"result store", len(toClean))
}

// If this was a total cleanup and we are in remote mode,
// delete the marker.
if len(keep) == 0 && store.isRemote {
err := tx.DeleteTopLevelBucket(remoteRouterMarkerBucket)

if err != nil && !errors.Is(
err, kvdb.ErrBucketNotFound) {

return err
}
}

return nil
}, func() {})
}
3 changes: 2 additions & 1 deletion htlcswitch/payment_result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ func TestNetworkResultStore(t *testing.T) {

db := channeldb.OpenForTesting(t, t.TempDir())

store := newNetworkResultStore(db)
store, err := newNetworkResultStore(db, false)
require.NoError(t, err, "unable create result store")

var results []*networkResult
for i := 0; i < numResults; i++ {
Expand Down
13 changes: 12 additions & 1 deletion htlcswitch/switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,10 @@ type Config struct {

// IsAlias returns whether or not a given SCID is an alias.
IsAlias func(scid lnwire.ShortChannelID) bool

// RemoteRouter is a boolean that indicates whether the payment
// lifecycle is managed by a remote router.
RemoteRouter bool
}

// Switch is the central messaging bus for all incoming/outgoing HTLCs.
Expand Down Expand Up @@ -372,6 +376,13 @@ func New(cfg Config, currentHeight uint32) (*Switch, error) {
return nil, err
}

networkResultStore, err := newNetworkResultStore(
cfg.DB, cfg.RemoteRouter,
)
if err != nil {
return nil, err
}

s := &Switch{
bestHeight: currentHeight,
cfg: &cfg,
Expand All @@ -381,7 +392,7 @@ func New(cfg Config, currentHeight uint32) (*Switch, error) {
interfaceIndex: make(map[[33]byte]map[lnwire.ChannelID]ChannelLink),
pendingLinkIndex: make(map[lnwire.ChannelID]ChannelLink),
linkStopIndex: make(map[lnwire.ChannelID]chan struct{}),
networkResults: newNetworkResultStore(cfg.DB),
networkResults: networkResultStore,
htlcPlex: make(chan *plexPacket),
chanCloseRequests: make(chan *ChanClose),
resolutionMsgs: make(chan *resolutionMsg),
Expand Down
21 changes: 18 additions & 3 deletions routing/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,11 @@ type Config struct {
// TrafficShaper is an optional traffic shaper that can be used to
// control the outgoing channel of a payment.
TrafficShaper fn.Option[htlcswitch.AuxTrafficShaper]

// DispatcherManagedExternally indicates that the dispatcher's (the
// switch's) payment store is being managed by an external entity, and
// should not be cleaned on startup.
DispatcherManagedExternally bool
}

// EdgeLocator is a struct used to identify a specific edge.
Expand Down Expand Up @@ -1441,9 +1446,19 @@ func (r *ChannelRouter) resumePayments() error {
}
}

log.Debugf("Cleaning network result store.")
if err := r.cfg.Payer.CleanStore(toKeep); err != nil {
return err
// When the payment life-cycle is managed by an external entity, we must
// not clean the attempt store on startup. The external controller
// relies on HTLC attempt information persisted by the dispatcher to
// resume its payment lifecycle and will need to coordinate all cleanup
// operations itself.
if !r.cfg.DispatcherManagedExternally {
log.Debugf("Cleaning network result store.")
if err := r.cfg.Payer.CleanStore(toKeep); err != nil {
return err
}
} else {
log.Infof("Dispatcher attempt store cleanup disabled. " +
"Attempt information must be cleaned remotely")
}

// launchPayment is a helper closure that handles resuming the payment.
Expand Down
31 changes: 17 additions & 14 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/lightningnetwork/lnd/aliasmgr"
"github.com/lightningnetwork/lnd/autopilot"
"github.com/lightningnetwork/lnd/brontide"
"github.com/lightningnetwork/lnd/build"
"github.com/lightningnetwork/lnd/chainio"
"github.com/lightningnetwork/lnd/chainreg"
"github.com/lightningnetwork/lnd/chanacceptor"
Expand Down Expand Up @@ -813,6 +814,7 @@ func newServer(ctx context.Context, cfg *Config, listenAddrs []net.Addr,
MaxFeeExposure: thresholdMSats,
SignAliasUpdate: s.signAliasUpdate,
IsAlias: aliasmgr.IsAlias,
RemoteRouter: build.SwitchRPC,
}, uint32(currentHeight))
if err != nil {
return nil, err
Expand Down Expand Up @@ -1028,20 +1030,21 @@ func newServer(ctx context.Context, cfg *Config, listenAddrs []net.Addr,
}

s.chanRouter, err = routing.New(routing.Config{
SelfNode: nodePubKey,
RoutingGraph: dbs.GraphDB,
Chain: cc.ChainIO,
Payer: s.htlcSwitch,
Control: s.controlTower,
MissionControl: s.defaultMC,
SessionSource: paymentSessionSource,
GetLink: s.htlcSwitch.GetLinkByShortID,
NextPaymentID: sequencer.NextID,
PathFindingConfig: pathFindingConfig,
Clock: clock.NewDefaultClock(),
ApplyChannelUpdate: s.graphBuilder.ApplyChannelUpdate,
ClosedSCIDs: s.fetchClosedChannelSCIDs(),
TrafficShaper: implCfg.TrafficShaper,
SelfNode: nodePubKey,
RoutingGraph: dbs.GraphDB,
Chain: cc.ChainIO,
Payer: s.htlcSwitch,
Control: s.controlTower,
MissionControl: s.defaultMC,
SessionSource: paymentSessionSource,
GetLink: s.htlcSwitch.GetLinkByShortID,
NextPaymentID: sequencer.NextID,
PathFindingConfig: pathFindingConfig,
Clock: clock.NewDefaultClock(),
ApplyChannelUpdate: s.graphBuilder.ApplyChannelUpdate,
ClosedSCIDs: s.fetchClosedChannelSCIDs(),
TrafficShaper: implCfg.TrafficShaper,
DispatcherManagedExternally: build.SwitchRPC,
})
if err != nil {
return nil, fmt.Errorf("can't create router: %w", err)
Expand Down
Loading