@@ -60,6 +60,20 @@ pub struct ChainMonitor&lt;ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: De
 {
     /// The monitors
     pub monitors: RwLock&lt;HashMap&lt;OutPoint, ChannelMonitor&lt;ChannelSigner&gt;&gt;&gt;,
+    /// Beyond the synchronization of `monitors` itself, we cannot handle user events until after
+    /// any chain updates have been stored on disk. This mutex is used to provide mutual exclusion
+    /// between event-processing and block-/transaction-connection.
+    /// This avoids the following race: we handle, e.g., an on-chain claim, generating a claim
+    /// monitor event, which causes the relevant ChannelManager to generate a PaymentSent event
+    /// and drop the pending payment entry. If we then reload before the monitor is persisted,
+    /// the ChannelManager re-adds the same payment entry, and replaying the same block produces
+    /// a duplicate PaymentSent event.
+    ///
+    /// XXX Describe what this means
+    /// XXX Figure out if it's possible to have update ids here. I think not, but requiring users
+    /// to track whether they have the latest non-id state stored complicates the
+    /// channel_monitor_updated API a ton :/
+    event_mutex: Mutex&lt;HashSet&lt;OutPoint&gt;&gt;,
     chain_source: Option&lt;C&gt;,
     broadcaster: T,
     logger: L,
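To make the `event_mutex` gating concrete, here is a minimal sketch of the pattern, assuming simplified stand-in types (`OutPoint`, `Gate`, and the boolean persist result below are placeholders for illustration, not the actual rust-lightning definitions):

```rust
use std::collections::HashSet;
use std::sync::Mutex;

// Stand-in for bitcoin::OutPoint; only used as a hash-set key here.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct OutPoint { txid: [u8; 32], index: u16 }

struct Gate {
    // Channels whose chain-sync persistence has not yet completed.
    pending_syncs: Mutex<HashSet<OutPoint>>,
}

impl Gate {
    // Block connection: chain data is applied and persisted while the lock is
    // held, so event release cannot interleave with an in-flight sync.
    fn block_connected(&self, funding_outpoint: OutPoint, persist_in_flight: bool) {
        let mut pending = self.pending_syncs.lock().unwrap();
        // ... apply the block's transactions to the monitor, then persist ...
        if persist_in_flight {
            // The TemporaryFailure case: remember that this channel's sync is
            // incomplete so its events stay held.
            pending.insert(funding_outpoint);
        }
    }

    // Called once the user's asynchronous persistence finishes.
    fn sync_completed(&self, funding_outpoint: OutPoint) {
        self.pending_syncs.lock().unwrap().remove(&funding_outpoint);
    }
}
```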
@@ -89,26 +103,44 @@ where C::Target: chain::Filter,
         FN: Fn(&amp;ChannelMonitor&lt;ChannelSigner&gt;, &amp;TransactionData) -&gt; Vec&lt;TransactionOutputs&gt;
     {
         let mut dependent_txdata = Vec::new();
-        let monitors = self.monitors.read().unwrap();
-        for monitor in monitors.values() {
-            let mut txn_outputs = process(monitor, txdata);
+        {
+            let monitors = self.monitors.write().unwrap();
+            for (funding_outpoint, monitor) in monitors.iter() {
+                let mut txn_outputs;
+                {
+                    let mut ev_lock = self.event_mutex.lock().unwrap();
+                    txn_outputs = process(monitor, txdata);
+                    log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
+                    match self.persister.update_persisted_channel(*funding_outpoint, &amp;None, monitor) {
+                        Ok(()) =&gt;
+                            log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
+                        Err(ChannelMonitorUpdateErr::PermanentFailure) =&gt; {
+                            self.user_provided_events.lock().unwrap().push(MonitorEvent::UpdateFailed(*funding_outpoint));
+                        },
+                        Err(ChannelMonitorUpdateErr::TemporaryFailure) =&gt; {
+                            log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
+                            ev_lock.insert(*funding_outpoint);
+                        },
+                    }
+                }

-            // Register any new outputs with the chain source for filtering, storing any dependent
-            // transactions from within the block that previously had not been included in txdata.
-            if let Some(ref chain_source) = self.chain_source {
-                let block_hash = header.block_hash();
-                for (txid, mut outputs) in txn_outputs.drain(..) {
-                    for (idx, output) in outputs.drain(..) {
-                        // Register any new outputs with the chain source for filtering and recurse
-                        // if it indicates that there are dependent transactions within the block
-                        // that had not been previously included in txdata.
-                        let output = WatchedOutput {
-                            block_hash: Some(block_hash),
-                            outpoint: OutPoint { txid, index: idx as u16 },
-                            script_pubkey: output.script_pubkey,
-                        };
-                        if let Some(tx) = chain_source.register_output(output) {
-                            dependent_txdata.push(tx);
+                // Register any new outputs with the chain source for filtering, storing any dependent
+                // transactions from within the block that previously had not been included in txdata.
+                if let Some(ref chain_source) = self.chain_source {
+                    let block_hash = header.block_hash();
+                    for (txid, mut outputs) in txn_outputs.drain(..) {
+                        for (idx, output) in outputs.drain(..) {
+                            // Register any new outputs with the chain source for filtering and recurse
+                            // if it indicates that there are dependent transactions within the block
+                            // that had not been previously included in txdata.
+                            let output = WatchedOutput {
+                                block_hash: Some(block_hash),
+                                outpoint: OutPoint { txid, index: idx as u16 },
+                                script_pubkey: output.script_pubkey,
+                            };
+                            if let Some(tx) = chain_source.register_output(output) {
+                                dependent_txdata.push(tx);
+                            }
                         }
                     }
                 }
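The `&None` passed to `update_persisted_channel` here signals a full monitor resync after chain data, rather than an incremental update. One way a `Persist` implementation might handle this asynchronously is sketched below; everything in it (the queue, the stub types, the method shape) is an illustrative assumption rather than the actual LDK trait:

```rust
use std::sync::mpsc::Sender;

// Stub types standing in for the LDK ones; for illustration only.
#[derive(Clone, Copy)]
struct OutPoint { txid: [u8; 32], index: u16 }
struct ChannelMonitorUpdate; // stub
#[derive(Debug)]
enum ChannelMonitorUpdateErr { TemporaryFailure, PermanentFailure }

struct AsyncPersister {
    // Drained by a background thread that writes monitor snapshots to disk.
    queue: Sender<(OutPoint, Vec<u8>)>,
}

impl AsyncPersister {
    fn update_persisted_channel(&self, funding_txo: OutPoint,
        update: &Option<ChannelMonitorUpdate>, serialized_monitor: Vec<u8>,
    ) -> Result<(), ChannelMonitorUpdateErr> {
        // `&None` means "persist the whole monitor after a chain sync";
        // `&Some(update)` would allow persisting just the incremental update.
        let _ = update;
        if self.queue.send((funding_txo, serialized_monitor)).is_err() {
            return Err(ChannelMonitorUpdateErr::PermanentFailure);
        }
        // The write finishes later on the background thread, so report
        // TemporaryFailure; the ChainMonitor will hold this channel's events
        // until completion is signaled back to it.
        Err(ChannelMonitorUpdateErr::TemporaryFailure)
    }
}
```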
@@ -134,6 +166,7 @@ where C::Target: chain::Filter,
     pub fn new(chain_source: Option&lt;C&gt;, broadcaster: T, logger: L, feeest: F, persister: P) -&gt; Self {
         Self {
             monitors: RwLock::new(HashMap::new()),
+            event_mutex: Mutex::new(HashSet::new()),
             chain_source,
             broadcaster,
             logger,
@@ -189,10 +222,14 @@ where C::Target: chain::Filter,
     /// 3) update(s) are applied to each remote copy of a ChannelMonitor,
     /// 4) once all remote copies are updated, you call this function with the update_id that
     ///    completed, and once it is the latest the Channel will be re-enabled.
-    pub fn channel_monitor_updated(&amp;self, funding_txo: OutPoint, highest_applied_update_id: u64) {
-        self.user_provided_events.lock().unwrap().push(MonitorEvent::UpdateCompleted(MonitorUpdated {
-            funding_txo, monitor_update_id: highest_applied_update_id
-        }));
+    pub fn channel_monitor_updated(&amp;self, funding_txo: OutPoint, highest_applied_update_id: Option&lt;u64&gt;) {
+        if let Some(monitor_update_id) = highest_applied_update_id {
+            self.user_provided_events.lock().unwrap().push(MonitorEvent::UpdateCompleted(MonitorUpdated {
+                funding_txo, monitor_update_id
+            }));
+        } else {
+            self.event_mutex.lock().unwrap().remove(&amp;funding_txo);
+        }
     }

     #[cfg(any(test, feature = "fuzztarget", feature = "_test_utils"))]
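The `Option` lets one entry point serve both completion flows. Hypothetical call sites (the variable names are illustrative, not from the diff) might look like:

```rust
// 1) An asynchronous *update* persist finished: report the highest applied
//    update_id so the Channel can be re-enabled once it is the latest.
chain_monitor.channel_monitor_updated(funding_txo, Some(highest_applied_update_id));

// 2) An asynchronous *chain-sync* persist finished: a full resync has no
//    update_id, so pass None to clear the channel from the pending-sync set
//    and let its monitor events flow again.
chain_monitor.channel_monitor_updated(funding_txo, None);
```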
@@ -346,12 +383,17 @@ where C::Target: chain::Filter,
         }
         // Even if updating the monitor returns an error, the monitor's state will
         // still be changed. So, persist the updated monitor despite the error.
-        let persist_res = self.persister.update_persisted_channel(funding_txo, &amp;update, monitor);
+        let persist_res = self.persister.update_persisted_channel(funding_txo, &amp;Some(update), monitor);
         if let Err(ref e) = persist_res {
             log_error!(self.logger, "Failed to persist channel monitor update: {:?}", e);
         }
         if update_res.is_err() {
             Err(ChannelMonitorUpdateErr::PermanentFailure)
+        } else if self.user_provided_events.lock().unwrap().contains(&amp;MonitorEvent::UpdateFailed(funding_txo)) {
+            // If we have a pending UpdateFailed event which hasn't yet been received by
+            // the ChannelManager, ensure we still fail channel updates for the failed
+            // channel.
+            Err(ChannelMonitorUpdateErr::PermanentFailure)
         } else {
             persist_res
         }
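The new `else if` arm makes a permanent failure sticky until the ChannelManager has consumed the `UpdateFailed` event; otherwise a later, successful persist could mask the earlier failure. A condensed view of the decision logic, reusing a stub error type along the lines of the sketches above (this is not the actual method):

```rust
#[derive(Debug)]
enum ChannelMonitorUpdateErr { TemporaryFailure, PermanentFailure }

fn update_result(
    update_failed: bool,               // applying the update in memory failed
    pending_update_failed_event: bool, // an UpdateFailed event is still queued
    persist_res: Result<(), ChannelMonitorUpdateErr>,
) -> Result<(), ChannelMonitorUpdateErr> {
    if update_failed || pending_update_failed_event {
        // Keep failing updates for a channel that already failed permanently.
        Err(ChannelMonitorUpdateErr::PermanentFailure)
    } else {
        persist_res
    }
}
```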
@@ -360,6 +402,12 @@ where C::Target: chain::Filter,
     }

     fn release_pending_monitor_events(&amp;self) -&gt; Vec&lt;MonitorEvent&gt; {
+        let ev_lock = self.event_mutex.lock().unwrap();
+        if !ev_lock.is_empty() {
+            log_error!(self.logger, "A Channel Monitor sync is still in progress, refusing to provide monitor events!");
+            return self.user_provided_events.lock().unwrap().split_off(0);
+        }
+
         let mut pending_monitor_events = self.user_provided_events.lock().unwrap().split_off(0);
         for monitor in self.monitors.read().unwrap().values() {
             pending_monitor_events.append(&amp;mut monitor.get_and_clear_pending_monitor_events());
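Note the asymmetry in the early return: user-provided events (UpdateCompleted/UpdateFailed) are always released, while monitor-generated events are withheld until no chain sync is pending. A compressed sketch of that two-queue behavior, with events modeled as plain strings and a stand-in channel key (again an illustration, not the real types):

```rust
use std::collections::HashSet;
use std::sync::Mutex;

struct Events {
    pending_syncs: Mutex<HashSet<u32>>, // keyed by a stand-in channel id
    user_provided: Mutex<Vec<String>>,  // UpdateCompleted/UpdateFailed analogues
    from_monitors: Mutex<Vec<String>>,  // monitor-generated event analogues
}

impl Events {
    fn release_pending_monitor_events(&self) -> Vec<String> {
        let ev_lock = self.pending_syncs.lock().unwrap();
        if !ev_lock.is_empty() {
            // A chain sync is still persisting: hand back only the
            // user-provided events and keep holding the monitor-generated ones.
            return self.user_provided.lock().unwrap().split_off(0);
        }
        let mut events = self.user_provided.lock().unwrap().split_off(0);
        events.append(&mut self.from_monitors.lock().unwrap().split_off(0));
        events
    }
}
```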