Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
267 changes: 131 additions & 136 deletions cumulus/client/consensus/aura/src/collators/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,9 @@ pub struct Params<BI, CIDP, Client, RClient, Proposer, CS> {
}

/// Run bare Aura consensus as a relay-chain-driven collator.
pub fn run<Block, P, BI, CIDP, Client, RClient, Proposer, CS>(
pub async fn run<Block, P, BI, CIDP, Client, RClient, Proposer, CS>(
params: Params<BI, CIDP, Client, RClient, Proposer, CS>,
) -> impl Future<Output = ()> + Send + 'static
where
) where
Block: BlockT + Send,
Client: ProvideRuntimeApi<Block>
+ BlockOf
Expand All @@ -112,161 +111,157 @@ where
P::Public: AppPublic + Member + Codec,
P::Signature: TryFrom<Vec<u8>> + Member + Codec,
{
async move {
let mut collation_requests = match params.collation_request_receiver {
Some(receiver) => receiver,
None =>
cumulus_client_collator::relay_chain_driven::init(
params.collator_key,
params.para_id,
params.overseer_handle,
)
.await,
};

let mut collator = {
let params = collator_util::Params {
create_inherent_data_providers: params.create_inherent_data_providers,
block_import: params.block_import,
relay_client: params.relay_client.clone(),
keystore: params.keystore.clone(),
para_id: params.para_id,
proposer: params.proposer,
collator_service: params.collator_service,
};
let mut collation_requests = match params.collation_request_receiver {
Some(receiver) => receiver,
None =>
cumulus_client_collator::relay_chain_driven::init(
params.collator_key,
params.para_id,
params.overseer_handle,
)
.await,
};

collator_util::Collator::<Block, P, _, _, _, _, _>::new(params)
let mut collator = {
let params = collator_util::Params {
create_inherent_data_providers: params.create_inherent_data_providers,
block_import: params.block_import,
relay_client: params.relay_client.clone(),
keystore: params.keystore.clone(),
para_id: params.para_id,
proposer: params.proposer,
collator_service: params.collator_service,
};

let mut last_processed_slot = 0;
let mut last_relay_chain_block = Default::default();
collator_util::Collator::<Block, P, _, _, _, _, _>::new(params)
};

while let Some(request) = collation_requests.next().await {
macro_rules! reject_with_error {
($err:expr) => {{
request.complete(None);
tracing::error!(target: crate::LOG_TARGET, err = ?{ $err });
continue;
}};
}
let mut last_processed_slot = 0;
let mut last_relay_chain_block = Default::default();

macro_rules! try_request {
($x:expr) => {{
match $x {
Ok(x) => x,
Err(e) => reject_with_error!(e),
}
}};
}
while let Some(request) = collation_requests.next().await {
macro_rules! reject_with_error {
($err:expr) => {{
request.complete(None);
tracing::error!(target: crate::LOG_TARGET, err = ?{ $err });
continue;
}};
}

let validation_data = request.persisted_validation_data();
macro_rules! try_request {
($x:expr) => {{
match $x {
Ok(x) => x,
Err(e) => reject_with_error!(e),
}
}};
}

let parent_header =
try_request!(Block::Header::decode(&mut &validation_data.parent_head.0[..]));
let validation_data = request.persisted_validation_data();

let parent_hash = parent_header.hash();
let parent_header =
try_request!(Block::Header::decode(&mut &validation_data.parent_head.0[..]));

if !collator.collator_service().check_block_status(parent_hash, &parent_header) {
continue
}
let parent_hash = parent_header.hash();

let Ok(Some(code)) =
params.para_client.state_at(parent_hash).map_err(drop).and_then(|s| {
s.storage(&sp_core::storage::well_known_keys::CODE).map_err(drop)
})
else {
continue;
};
if !collator.collator_service().check_block_status(parent_hash, &parent_header) {
continue
}

super::check_validation_code_or_log(
&ValidationCode::from(code).hash(),
params.para_id,
&params.relay_client,
*request.relay_parent(),
)
.await;
let Ok(Some(code)) = params
.para_client
.state_at(parent_hash)
.map_err(drop)
.and_then(|s| s.storage(&sp_core::storage::well_known_keys::CODE).map_err(drop))
else {
continue;
};

let relay_parent_header =
match params.relay_client.header(RBlockId::hash(*request.relay_parent())).await {
Err(e) => reject_with_error!(e),
Ok(None) => continue, // sanity: would be inconsistent to get `None` here
Ok(Some(h)) => h,
};
super::check_validation_code_or_log(
&ValidationCode::from(code).hash(),
params.para_id,
&params.relay_client,
*request.relay_parent(),
)
.await;

let slot_duration = match params.para_client.runtime_api().slot_duration(parent_hash) {
Ok(d) => d,
let relay_parent_header =
match params.relay_client.header(RBlockId::hash(*request.relay_parent())).await {
Err(e) => reject_with_error!(e),
Ok(None) => continue, // sanity: would be inconsistent to get `None` here
Ok(Some(h)) => h,
};

let claim = match collator_util::claim_slot::<_, _, P>(
&*params.para_client,
parent_hash,
&relay_parent_header,
slot_duration,
params.relay_chain_slot_duration,
&params.keystore,
)
.await
{
Ok(None) => continue,
Ok(Some(c)) => c,
Err(e) => reject_with_error!(e),
};
let slot_duration = match params.para_client.runtime_api().slot_duration(parent_hash) {
Ok(d) => d,
Err(e) => reject_with_error!(e),
};

// With async backing this function will be called every relay chain block.
//
// Most parachains currently run with 12 seconds slots and thus, they would try to
// produce multiple blocks per slot which very likely would fail on chain. Thus, we have
// this "hack" to only produce one block per slot per relay chain fork.
//
// With https://github.com/paritytech/polkadot-sdk/issues/3168 this implementation will be
// obsolete and also the underlying issue will be fixed.
if last_processed_slot >= *claim.slot() &&
last_relay_chain_block < *relay_parent_header.number()
{
continue
}
let claim = match collator_util::claim_slot::<_, _, P>(
&*params.para_client,
parent_hash,
&relay_parent_header,
slot_duration,
params.relay_chain_slot_duration,
&params.keystore,
)
.await
{
Ok(None) => continue,
Ok(Some(c)) => c,
Err(e) => reject_with_error!(e),
};

let (parachain_inherent_data, other_inherent_data) = try_request!(
collator
.create_inherent_data(
*request.relay_parent(),
&validation_data,
parent_hash,
claim.timestamp(),
)
.await
);
// With async backing this function will be called every relay chain block.
//
// Most parachains currently run with 12 seconds slots and thus, they would try to
// produce multiple blocks per slot which very likely would fail on chain. Thus, we have
// this "hack" to only produce one block per slot per relay chain fork.
//
// With https://github.com/paritytech/polkadot-sdk/issues/3168 this implementation will be
// obsolete and also the underlying issue will be fixed.
if last_processed_slot >= *claim.slot() &&
last_relay_chain_block < *relay_parent_header.number()
{
continue
}

let allowed_pov_size = (validation_data.max_pov_size / 2) as usize;
let (parachain_inherent_data, other_inherent_data) = try_request!(
collator
.create_inherent_data(
*request.relay_parent(),
&validation_data,
parent_hash,
claim.timestamp(),
)
.await
);

let maybe_collation = try_request!(
collator
.collate(
&parent_header,
&claim,
None,
(parachain_inherent_data, other_inherent_data),
params.authoring_duration,
allowed_pov_size,
)
.await
);
let allowed_pov_size = (validation_data.max_pov_size / 2) as usize;

if let Some((collation, block_data)) = maybe_collation {
let Some(block_hash) = block_data.blocks().first().map(|b| b.hash()) else {
continue
};
let result_sender =
Some(collator.collator_service().announce_with_barrier(block_hash));
request.complete(Some(CollationResult { collation, result_sender }));
} else {
request.complete(None);
tracing::debug!(target: crate::LOG_TARGET, "No block proposal");
}
let maybe_collation = try_request!(
collator
.collate(
&parent_header,
&claim,
None,
(parachain_inherent_data, other_inherent_data),
params.authoring_duration,
allowed_pov_size,
)
.await
);

last_processed_slot = *claim.slot();
last_relay_chain_block = *relay_parent_header.number();
if let Some((collation, block_data)) = maybe_collation {
let Some(block_hash) = block_data.blocks().first().map(|b| b.hash()) else { continue };
let result_sender = Some(collator.collator_service().announce_with_barrier(block_hash));
request.complete(Some(CollationResult { collation, result_sender }));
} else {
request.complete(None);
tracing::debug!(target: crate::LOG_TARGET, "No block proposal");
}

last_processed_slot = *claim.slot();
last_relay_chain_block = *relay_parent_header.number();
}
}
Loading
Loading