Skip to content

Commit 0ba06a0

Browse files
committed
Introduced the concept of the deadline stopwatch and deadline based locking strategies.
DevP2P uses now this strategy to access data from the transaction queue, with a deadline of 200ms for retrieving transactions (supplier) and 1000ms for planning transaction retrieval. DMDcoin#236
1 parent 975221a commit 0ba06a0

File tree

11 files changed

+133
-17
lines changed

11 files changed

+133
-17
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/concensus/miner/src/pool/queue.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use std::{
2424
atomic::{self, AtomicUsize},
2525
Arc,
2626
},
27+
time::Duration,
2728
};
2829

2930
use self::scoring::ScoringEvent;
@@ -680,12 +681,23 @@ impl TransactionQueue {
680681
///
681682
/// Given transaction hash looks up that transaction in the pool
682683
/// and returns a shared pointer to it or `None` if it's not present, or a readlock could not get acquired.
683-
pub fn find_if_readable(&self, hash: &H256) -> Option<Arc<pool::VerifiedTransaction>> {
684+
pub fn find_if_readable(
685+
&self,
686+
hash: &H256,
687+
max_lock_duration: &Duration,
688+
) -> Option<Arc<pool::VerifiedTransaction>> {
689+
let splitted_duration = max_lock_duration.div_f32(3.0);
684690
self.cached_enforced_pending
685-
.try_read()?
691+
.try_read_for(splitted_duration.clone())?
686692
.find(hash)
687-
.or(self.cached_non_enforced_pending.try_read()?.find(hash))
688-
.or(self.pool.try_read()?.find(hash))
693+
.or(self
694+
.cached_non_enforced_pending
695+
.try_read_for(splitted_duration.clone())?
696+
.find(hash))
697+
.or(self
698+
.pool
699+
.try_read_for(splitted_duration.clone())?
700+
.find(hash))
689701
}
690702

691703
/// Remove a set of transactions from the pool.

crates/ethcore/src/client/client.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2426,8 +2426,14 @@ impl BlockChainClient for Client {
24262426
self.importer.miner.transaction(&hash)
24272427
}
24282428

2429-
fn transaction_if_readable(&self, hash: &H256) -> Option<Arc<VerifiedTransaction>> {
2430-
self.importer.miner.transaction_if_readable(&hash)
2429+
fn transaction_if_readable(
2430+
&self,
2431+
hash: &H256,
2432+
max_lock_duration: &Duration,
2433+
) -> Option<Arc<VerifiedTransaction>> {
2434+
self.importer
2435+
.miner
2436+
.transaction_if_readable(&hash, max_lock_duration)
24312437
}
24322438

24332439
fn uncle(&self, id: UncleId) -> Option<encoded::Header> {

crates/ethcore/src/client/test_client.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use std::{
2323
Arc,
2424
atomic::{AtomicBool, AtomicUsize, Ordering as AtomicOrder},
2525
},
26+
time::Duration,
2627
};
2728

2829
use crate::{
@@ -1160,8 +1161,12 @@ impl BlockChainClient for TestBlockChainClient {
11601161
self.miner.transaction(tx_hash)
11611162
}
11621163

1163-
fn transaction_if_readable(&self, hash: &H256) -> Option<Arc<VerifiedTransaction>> {
1164-
self.miner.transaction_if_readable(hash)
1164+
fn transaction_if_readable(
1165+
&self,
1166+
hash: &H256,
1167+
max_lock_duration: &Duration,
1168+
) -> Option<Arc<VerifiedTransaction>> {
1169+
self.miner.transaction_if_readable(hash, max_lock_duration)
11651170
}
11661171

11671172
/// Returns the devp2p network endpoint IP and Port information that is used to communicate with other peers.

crates/ethcore/src/client/traits.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use std::{
2020
collections::{BTreeMap, BTreeSet},
2121
net::SocketAddr,
2222
sync::Arc,
23+
time::Duration,
2324
};
2425

2526
use crate::{
@@ -425,7 +426,11 @@ pub trait BlockChainClient:
425426

426427
/// see queued_transactions(&self).
427428
/// Get pool transaction with a given hash, but returns NONE fast, if if cannot acquire a readlock fast.
428-
fn transaction_if_readable(&self, hash: &H256) -> Option<Arc<VerifiedTransaction>>;
429+
fn transaction_if_readable(
430+
&self,
431+
hash: &H256,
432+
max_lock_duration: &Duration,
433+
) -> Option<Arc<VerifiedTransaction>>;
429434

430435
/// Sorted list of transaction gas prices from at least last sample_size blocks.
431436
fn gas_price_corpus(&self, sample_size: usize) -> ::stats::Corpus<U256> {

crates/ethcore/src/miner/miner.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1468,8 +1468,13 @@ impl miner::MinerService for Miner {
14681468
return result;
14691469
}
14701470

1471-
fn transaction_if_readable(&self, hash: &H256) -> Option<Arc<VerifiedTransaction>> {
1472-
self.transaction_queue.find_if_readable(hash)
1471+
fn transaction_if_readable(
1472+
&self,
1473+
hash: &H256,
1474+
max_lock_duration: &Duration,
1475+
) -> Option<Arc<VerifiedTransaction>> {
1476+
self.transaction_queue
1477+
.find_if_readable(hash, max_lock_duration)
14731478
}
14741479

14751480
fn remove_transaction(&self, hash: &H256) -> Option<Arc<VerifiedTransaction>> {

crates/ethcore/src/miner/mod.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ pub use ethcore_miner::{
3535
use std::{
3636
collections::{BTreeMap, BTreeSet},
3737
sync::Arc,
38+
time::Duration,
3839
};
3940

4041
use crate::types::{
@@ -201,7 +202,11 @@ pub trait MinerService: Send + Sync {
201202

202203
/// Query transaction from the pool given it's hash, without blocking.
203204
/// Might return "None" in cases when the lock could not get acquired.
204-
fn transaction_if_readable(&self, hash: &H256) -> Option<Arc<VerifiedTransaction>>;
205+
fn transaction_if_readable(
206+
&self,
207+
hash: &H256,
208+
max_lock_duration: &Duration,
209+
) -> Option<Arc<VerifiedTransaction>>;
205210

206211
/// Returns next valid nonce for given address.
207212
///

crates/ethcore/sync/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ rand_xorshift = "0.2.0"
3838
rlp = { version = "0.4.6" }
3939
trace-time = "0.1"
4040
triehash-ethereum = {version = "0.2", path = "../../util/triehash-ethereum" }
41+
time-utils = { path = "../../util/time-utils" }
4142
stats = { path = "../../util/stats" }
4243
crossbeam-channel = "0.5.2"
4344

crates/ethcore/sync/src/chain/handler.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,11 @@ use ethereum_types::{H256, H512, U256};
3232
use hash::keccak;
3333
use network::{PeerId, client_version::ClientVersion};
3434
use rlp::Rlp;
35-
use std::{cmp, mem, time::Instant};
35+
use std::{
36+
cmp, mem,
37+
time::{Duration, Instant},
38+
};
39+
use time_utils::DeadlineStopwatch;
3640

3741
use super::{
3842
request_id::strip_request_id,
@@ -858,6 +862,11 @@ impl SyncHandler {
858862
peer_id: PeerId,
859863
tx_rlp: &Rlp,
860864
) -> Result<(), DownloaderImportError> {
865+
// those P2P operations must not take forever, a better , configurable but balanced timeout managment would be nice to have.
866+
let max_duration = Duration::from_millis(500);
867+
868+
let deadline = DeadlineStopwatch::new(max_duration);
869+
861870
for item in tx_rlp {
862871
let hash = item
863872
.as_val::<H256>()
@@ -868,7 +877,17 @@ impl SyncHandler {
868877

869878
// if we cant read the pool here, we are asuming we dont know the transaction yet.
870879
// in the worst case we are refetching a transaction that we already have.
871-
if io.chain().transaction_if_readable(&hash).is_none() {
880+
881+
if deadline.is_expired() {
882+
// we did run out of time to finish this opation, but thats Ok.
883+
return Ok(());
884+
}
885+
886+
if io
887+
.chain()
888+
.transaction_if_readable(&hash, &deadline.time_left())
889+
.is_none()
890+
{
872891
sync.peers
873892
.get_mut(&peer_id)
874893
.map(|peer| peer.unfetched_pooled_transactions.insert(hash));

crates/ethcore/sync/src/chain/supplier.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use bytes::Bytes;
1818

1919
#[cfg(not(test))]
2020
use devp2p::PAYLOAD_SOFT_LIMIT;
21+
use time_utils::DeadlineStopwatch;
2122
#[cfg(test)]
2223
pub const PAYLOAD_SOFT_LIMIT: usize = 100_000;
2324

@@ -27,7 +28,7 @@ use ethereum_types::{H256, H512};
2728
use network::{self, PeerId};
2829
use parking_lot::RwLock;
2930
use rlp::{Rlp, RlpStream};
30-
use std::cmp;
31+
use std::{cmp, time::Duration};
3132

3233
use crate::sync_io::SyncIo;
3334

@@ -316,14 +317,23 @@ impl SyncSupplier {
316317
rlp.begin_unbounded_list();
317318
let mut not_found = 0;
318319
let mut parse_errors = 0;
320+
321+
let deadline = DeadlineStopwatch::new(Duration::from_millis(200));
319322
for v in r {
320323
if let Ok(hash) = v.as_val::<H256>() {
321324
// io.chain().transaction(hash)
322325

326+
if !deadline.should_continue() {
327+
break;
328+
}
329+
323330
// we do not lock here, if we cannot access the memory at this point in time,
324331
// we will just skip this transaction, otherwise the other peer might wait to long, resulting in a timeout.
325332
// also this solved a potential deadlock situation:
326-
if let Some(tx) = io.chain().transaction_if_readable(&hash) {
333+
if let Some(tx) = io
334+
.chain()
335+
.transaction_if_readable(&hash, &deadline.time_left())
336+
{
327337
tx.signed().rlp_append(&mut rlp);
328338
added += 1;
329339
if rlp.len() > PAYLOAD_SOFT_LIMIT {

0 commit comments

Comments
 (0)