Skip to content

Commit 6c78fc5

Browse files
authored
Merge pull request #2550 from halseth/autpilot-agent-quit-pending-conn
autopilot: don't block Stop on pending connection
2 parents 077e188 + 49a85b2 commit 6c78fc5

File tree

2 files changed

+187
-1
lines changed

2 files changed

+187
-1
lines changed

autopilot/agent.go

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -667,7 +667,40 @@ func (a *Agent) executeDirective(directive AttachmentDirective) {
667667
return
668668
}
669669

670-
alreadyConnected, err := a.cfg.ConnectToPeer(pub, directive.Addrs)
670+
connected := make(chan bool)
671+
errChan := make(chan error)
672+
673+
// To ensure a call to ConnectToPeer doesn't block the agent from
674+
// shutting down, we'll launch it in a non-waitgrouped goroutine, that
675+
// will signal when a result is returned.
676+
// TODO(halseth): use DialContext to cancel on transport level.
677+
go func() {
678+
alreadyConnected, err := a.cfg.ConnectToPeer(
679+
pub, directive.Addrs,
680+
)
681+
if err != nil {
682+
select {
683+
case errChan <- err:
684+
case <-a.quit:
685+
}
686+
return
687+
}
688+
689+
select {
690+
case connected <- alreadyConnected:
691+
case <-a.quit:
692+
return
693+
}
694+
}()
695+
696+
var alreadyConnected bool
697+
select {
698+
case alreadyConnected = <-connected:
699+
case err = <-errChan:
700+
case <-a.quit:
701+
return
702+
}
703+
671704
if err != nil {
672705
log.Warnf("Unable to connect to %x: %v",
673706
pub.SerializeCompressed(), err)

autopilot/agent_test.go

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1546,3 +1546,156 @@ func TestAgentSkipPendingConns(t *testing.T) {
15461546
t.Fatalf("agent should have attempted connection")
15471547
}
15481548
}
1549+
1550+
// TestAgentQuitWhenPendingConns tests that we are able to stop the autopilot
1551+
// agent even though there are pending connections to nodes.
1552+
func TestAgentQuitWhenPendingConns(t *testing.T) {
1553+
t.Parallel()
1554+
1555+
// First, we'll create all the dependencies that we'll need in order to
1556+
// create the autopilot agent.
1557+
self, err := randKey()
1558+
if err != nil {
1559+
t.Fatalf("unable to generate key: %v", err)
1560+
}
1561+
1562+
quit := make(chan struct{})
1563+
defer close(quit)
1564+
1565+
heuristic := &mockHeuristic{
1566+
nodeScoresArgs: make(chan directiveArg),
1567+
nodeScoresResps: make(chan map[NodeID]*NodeScore),
1568+
quit: quit,
1569+
}
1570+
constraints := &mockConstraints{
1571+
moreChansResps: make(chan moreChansResp),
1572+
quit: quit,
1573+
}
1574+
1575+
chanController := &mockChanController{
1576+
openChanSignals: make(chan openChanIntent),
1577+
}
1578+
memGraph, _, _ := newMemChanGraph()
1579+
1580+
// The wallet will start with 6 BTC available.
1581+
const walletBalance = btcutil.SatoshiPerBitcoin * 6
1582+
1583+
connect := make(chan chan error)
1584+
1585+
// With the dependencies we created, we can now create the initial
1586+
// agent itself.
1587+
testCfg := Config{
1588+
Self: self,
1589+
Heuristic: heuristic,
1590+
ChanController: chanController,
1591+
WalletBalance: func() (btcutil.Amount, error) {
1592+
return walletBalance, nil
1593+
},
1594+
ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) {
1595+
errChan := make(chan error)
1596+
1597+
select {
1598+
case connect <- errChan:
1599+
case <-quit:
1600+
return false, errors.New("quit")
1601+
}
1602+
1603+
select {
1604+
case err := <-errChan:
1605+
return false, err
1606+
case <-quit:
1607+
return false, errors.New("quit")
1608+
}
1609+
},
1610+
DisconnectPeer: func(*btcec.PublicKey) error {
1611+
return nil
1612+
},
1613+
Graph: memGraph,
1614+
Constraints: constraints,
1615+
}
1616+
initialChans := []Channel{}
1617+
agent, err := New(testCfg, initialChans)
1618+
if err != nil {
1619+
t.Fatalf("unable to create agent: %v", err)
1620+
}
1621+
1622+
// To ensure the heuristic doesn't block on quitting the agent, we'll
1623+
// use the agent's quit chan to signal when it should also stop.
1624+
heuristic.quit = agent.quit
1625+
1626+
// With the autopilot agent and all its dependencies we'll start the
1627+
// primary controller goroutine.
1628+
if err := agent.Start(); err != nil {
1629+
t.Fatalf("unable to start agent: %v", err)
1630+
}
1631+
defer agent.Stop()
1632+
1633+
// We'll only return a single directive for a pre-chosen node.
1634+
nodeKey, err := memGraph.addRandNode()
1635+
if err != nil {
1636+
t.Fatalf("unable to generate key: %v", err)
1637+
}
1638+
nodeID := NewNodeID(nodeKey)
1639+
nodeDirective := &NodeScore{
1640+
NodeID: nodeID,
1641+
Score: 0.5,
1642+
}
1643+
1644+
// We'll send an initial "yes" response to advance the agent past its
1645+
// initial check. This will cause it to try to get directives from the
1646+
// graph.
1647+
select {
1648+
case constraints.moreChansResps <- moreChansResp{
1649+
numMore: 1,
1650+
amt: walletBalance,
1651+
}:
1652+
case <-time.After(time.Second * 10):
1653+
t.Fatalf("heuristic wasn't queried in time")
1654+
}
1655+
1656+
// Check the args.
1657+
select {
1658+
case req := <-heuristic.nodeScoresArgs:
1659+
if len(req.nodes) != 1 {
1660+
t.Fatalf("expected %v nodes, instead "+
1661+
"had %v", 1, len(req.nodes))
1662+
}
1663+
if _, ok := req.nodes[nodeID]; !ok {
1664+
t.Fatalf("node not included in arguments")
1665+
}
1666+
case <-time.After(time.Second * 10):
1667+
t.Fatalf("select wasn't queried in time")
1668+
}
1669+
1670+
// Respond with a scored directive.
1671+
select {
1672+
case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{
1673+
NewNodeID(nodeKey): nodeDirective,
1674+
}:
1675+
case <-time.After(time.Second * 10):
1676+
t.Fatalf("heuristic wasn't queried in time")
1677+
}
1678+
1679+
// The agent should attempt connection to the node.
1680+
select {
1681+
case <-connect:
1682+
case <-time.After(time.Second * 10):
1683+
t.Fatalf("agent did not attempt connection")
1684+
}
1685+
1686+
// Make sure that we are able to stop the agent, even though there is a
1687+
// pending connection.
1688+
stopped := make(chan error)
1689+
go func() {
1690+
stopped <- agent.Stop()
1691+
}()
1692+
1693+
select {
1694+
case err := <-stopped:
1695+
if err != nil {
1696+
t.Fatalf("error stopping agent: %v", err)
1697+
}
1698+
case <-time.After(2 * time.Second):
1699+
t.Fatalf("unable to stop agent")
1700+
}
1701+
}

0 commit comments

Comments
 (0)