Skip to content

refactor: Revisit Scheduler's public API #4870

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 151 additions & 53 deletions script/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ use crate::syscalls::{
};

use crate::types::{
DataLocation, DataPieceId, FIRST_FD_SLOT, FIRST_VM_ID, Fd, FdArgs, FullSuspendedState, Message,
ReadState, RunMode, SgData, SyscallGenerator, VmArgs, VmContext, VmId, VmState, WriteState,
DataLocation, DataPieceId, FIRST_FD_SLOT, FIRST_VM_ID, Fd, FdArgs, FullSuspendedState,
IterationResult, Message, ReadState, RunMode, SgData, SyscallGenerator, TerminatedResult,
VmArgs, VmContext, VmId, VmState, WriteState,
};
use ckb_traits::{CellDataProvider, ExtensionProvider, HeaderProvider};
use ckb_types::core::Cycle;
Expand Down Expand Up @@ -48,12 +49,12 @@ where
M: DefaultMachineRunner,
{
/// Immutable context data for current running transaction & script.
pub sg_data: SgData<DL>,
sg_data: SgData<DL>,

/// Syscall generator
pub syscall_generator: SyscallGenerator<DL, V, M::Inner>,
syscall_generator: SyscallGenerator<DL, V, M::Inner>,
/// Syscall generator context
pub syscall_context: V,
syscall_context: V,

/// Total cycles. When a scheduler executes, there are 3 variables
/// that might all contain charged cycles: +total_cycles+,
Expand Down Expand Up @@ -83,29 +84,29 @@ where
///
/// One can consider that +total_cycles+ contains the total cycles
/// consumed in current scheduler, when the scheduler is not busy executing.
pub total_cycles: Arc<AtomicU64>,
total_cycles: Arc<AtomicU64>,
/// Iteration cycles, see +total_cycles+ on its usage
pub iteration_cycles: Cycle,
iteration_cycles: Cycle,
/// Next vm id used by spawn.
pub next_vm_id: VmId,
next_vm_id: VmId,
/// Next fd used by pipe.
pub next_fd_slot: u64,
next_fd_slot: u64,
/// Used to store VM state.
pub states: BTreeMap<VmId, VmState>,
states: BTreeMap<VmId, VmState>,
/// Used to confirm the owner of fd.
pub fds: BTreeMap<Fd, VmId>,
fds: BTreeMap<Fd, VmId>,
/// Verify the VM's inherited fd list.
pub inherited_fd: BTreeMap<VmId, Vec<Fd>>,
inherited_fd: BTreeMap<VmId, Vec<Fd>>,
/// Instantiated vms.
pub instantiated: BTreeMap<VmId, (VmContext<DL>, M)>,
instantiated: BTreeMap<VmId, (VmContext<DL>, M)>,
/// Suspended vms.
pub suspended: BTreeMap<VmId, Snapshot2<DataPieceId>>,
suspended: BTreeMap<VmId, Snapshot2<DataPieceId>>,
/// Terminated vms.
pub terminated_vms: BTreeMap<VmId, i8>,
terminated_vms: BTreeMap<VmId, i8>,

/// MessageBox is expected to be empty before returning from `run`
/// function, there is no need to persist messages.
pub message_box: Arc<Mutex<Vec<Message>>>,
message_box: Arc<Mutex<Vec<Message>>>,
}

impl<DL, V, M> Scheduler<DL, V, M>
Expand Down Expand Up @@ -143,8 +144,40 @@ where
self.total_cycles.load(Ordering::Acquire)
}

/// Fetch the state of the specified VM.
///
/// Returns a clone of the state recorded for `vm_id`, or `None` when the
/// scheduler has no state entry for that ID.
pub fn state(&self, vm_id: &VmId) -> Option<VmState> {
self.states.get(vm_id).cloned()
}

/// Borrow the `SgData` held by this scheduler, i.e. the immutable context
/// data for the currently running transaction & script.
pub fn sg_data(&self) -> &SgData<DL> {
&self.sg_data
}

/// This function provides a peek into one of the current created
/// VM. Depending on the actual state, the VM might either be instantiated
/// or suspended. As a result, 2 callback functions must be provided to handle
/// both cases. The function only provides a *peek*, meaning the caller must
/// not make any changes to an instantiated VMs. the VM is passed as a mutable
/// reference only because memory load functions in CKB-VM require mutable
/// references. It does not mean the caller can modify the VM in any sense.
/// Even a slight tampering of the VM can result in non-determinism.
pub fn peek<F, G, W>(&mut self, vm_id: &VmId, mut f: F, mut g: G) -> Result<W, Error>
where
F: FnMut(&mut M) -> Result<W, Error>,
G: FnMut(&Snapshot2<DataPieceId>, &SgData<DL>) -> Result<W, Error>,
{
if let Some((_, machine)) = self.instantiated.get_mut(vm_id) {
return f(machine);
}
if let Some(snapshot) = self.suspended.get(vm_id) {
return g(snapshot, &self.sg_data);
}
Err(Error::Unexpected(format!("VM {} does not exist!", vm_id)))
}

/// Add cycles to total cycles.
pub fn consume_cycles(&mut self, cycles: Cycle) -> Result<(), Error> {
fn consume_cycles(&mut self, cycles: Cycle) -> Result<(), Error> {
match self
.total_cycles
.fetch_update(Ordering::AcqRel, Ordering::Acquire, |total_cycles| {
Expand Down Expand Up @@ -240,48 +273,49 @@ where
/// * Cycle limit reached, the returned error would be ckb_vm::Error::CyclesExceeded,
/// * Pause trigger, the returned error would be ckb_vm::Error::Pause,
/// * Other terminating errors
pub fn run(&mut self, mode: RunMode) -> Result<(i8, Cycle), Error> {
if self.states.is_empty() {
// Booting phase, we will need to initialize the first VM.
let program_id = self.sg_data.sg_info.program_data_piece_id.clone();
assert_eq!(
self.boot_vm(
&DataLocation {
data_piece_id: program_id,
offset: 0,
length: u64::MAX,
},
VmArgs::Vector(vec![]),
)?,
ROOT_VM_ID
);
}
assert!(self.states.contains_key(&ROOT_VM_ID));
pub fn run(&mut self, mode: RunMode) -> Result<TerminatedResult, Error> {
self.boot_root_vm_if_needed()?;

let (pause, mut limit_cycles) = match mode {
RunMode::LimitCycles(limit_cycles) => (Pause::new(), limit_cycles),
RunMode::Pause(pause) => (pause, u64::MAX),
};

while self.states[&ROOT_VM_ID] != VmState::Terminated {
assert_eq!(self.iteration_cycles, 0);
let iterate_return = self.iterate(pause.clone(), limit_cycles);
self.consume_cycles(self.iteration_cycles)?;
limit_cycles = limit_cycles
.checked_sub(self.iteration_cycles)
.ok_or(Error::CyclesExceeded)?;
// Clear iteration cycles intentionally after each run
self.iteration_cycles = 0;
iterate_return?;
while !self.terminated() {
limit_cycles = self.iterate_outer(&pause, limit_cycles)?.1;
}

// At this point, root VM cannot be suspended
let root_vm = &self.instantiated[&ROOT_VM_ID];
Ok((root_vm.1.machine().exit_code(), self.consumed_cycles()))
self.terminated_result()
}

/// Public API that runs a single VM, processes all messages, then returns the
/// executed VM ID (so the caller can fetch further data later). This can be
/// used when finer-grained control over a single VM is required.
pub fn iterate(&mut self) -> Result<IterationResult, Error> {
self.boot_root_vm_if_needed()?;

if self.terminated() {
return Ok(IterationResult {
executed_vm: ROOT_VM_ID,
terminated_status: Some(self.terminated_result()?),
});
}

let (id, _) = self.iterate_outer(&Pause::new(), u64::MAX)?;
Copy link
Collaborator

@eval-exec eval-exec Apr 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If self.terminated() is false, should we return the remaining_cycles from self.iterate_outer(&Pause::new(), u64::MAX)?; to exit_status?
Probably not, because IterationResult::exit_status.1 is intended to store the consumed_cycles, not the remaining_cycles?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IterationResult::exit_status.1 should always contain consumed cycles up to the execution point.

let terminated_status = if self.terminated() {
Some(self.terminated_result()?)
} else {
None
};

Ok(IterationResult {
executed_vm: id,
terminated_status,
})
}

/// Returns the machine that needs to be executed in the current iterate.
pub fn iterate_prepare_machine(&mut self) -> Result<(u64, &mut M), Error> {
fn iterate_prepare_machine(&mut self) -> Result<(u64, &mut M), Error> {
// Process all pending VM reads & writes.
self.process_io()?;
// Find a runnable VM that has the largest ID.
Expand All @@ -300,7 +334,7 @@ where
}

/// Process machine execution results in the current iterate.
pub fn iterate_process_results(
fn iterate_process_results(
&mut self,
vm_id_to_run: u64,
result: Result<i8, Error>,
Expand Down Expand Up @@ -360,10 +394,31 @@ where
}
}

// This internal function is a wrapper over +iterate_inner+. It is split
// into a separate function so that cycle accounting is executed no matter
// what result +iterate_inner+ returns.
//
// On success it returns the VM ID produced by +iterate_inner+ together
// with the cycles remaining out of +limit_cycles+ after this iteration.
#[inline]
fn iterate_outer(
&mut self,
pause: &Pause,
limit_cycles: Cycle,
) -> Result<(VmId, Cycle), Error> {
// Each iteration must start with a cleared per-iteration counter.
assert_eq!(self.iteration_cycles, 0);
// Hold on to the result instead of using `?` right away, so the cycle
// bookkeeping below also runs when the iteration itself failed.
let iterate_return = self.iterate_inner(pause.clone(), limit_cycles);
self.consume_cycles(self.iteration_cycles)?;
let remaining_cycles = limit_cycles
.checked_sub(self.iteration_cycles)
.ok_or(Error::CyclesExceeded)?;
// NOTE(review): if either `?` above returns early, +iteration_cycles+
// is not reset by this function.
// Clear iteration cycles intentionally after each run
self.iteration_cycles = 0;
// Only now propagate a failure coming from the iteration itself.
let id = iterate_return?;
Ok((id, remaining_cycles))
}

// This is an internal function that does the actual VM execution loop.
// Both the pause signal and limit_cycles are provided here so as to
// simplify branches.
fn iterate(&mut self, pause: Pause, limit_cycles: Cycle) -> Result<(), Error> {
fn iterate_inner(&mut self, pause: Pause, limit_cycles: Cycle) -> Result<VmId, Error> {
// Execute the VM for real, consumed cycles in the virtual machine is
// moved over to +iteration_cycles+, then we reset virtual machine's own
// cycle count to zero.
Expand All @@ -380,7 +435,8 @@ where
.iteration_cycles
.checked_add(cycles)
.ok_or(Error::CyclesExceeded)?;
self.iterate_process_results(id, result)
self.iterate_process_results(id, result)?;
Ok(id)
}

fn process_message_box(&mut self) -> Result<(), Error> {
Expand Down Expand Up @@ -774,6 +830,27 @@ where
Ok(())
}

/// Whether the current scheduler has terminated.
///
/// The scheduler counts as terminated once the state recorded for the
/// root VM is `VmState::Terminated`. When no state is recorded for the
/// root VM, this returns `false`.
pub fn terminated(&self) -> bool {
    self.states.get(&ROOT_VM_ID) == Some(&VmState::Terminated)
}

/// Build the final result of a terminated scheduler.
///
/// Panics if the scheduler is not terminated. The root VM is fetched
/// through `ensure_get_instantiated` so its exit code can be read, and
/// the result is paired with the cycles consumed so far.
fn terminated_result(&mut self) -> Result<TerminatedResult, Error> {
    assert!(self.terminated());

    let (_, root_machine) = self.ensure_get_instantiated(&ROOT_VM_ID)?;
    let exit_code = root_machine.machine().exit_code();
    let consumed_cycles = self.consumed_cycles();

    Ok(TerminatedResult {
        exit_code,
        consumed_cycles,
    })
}

// Ensure VMs are instantiated
fn ensure_vms_instantiated(&mut self, ids: &[VmId]) -> Result<(), Error> {
if ids.len() > MAX_INSTANTIATED_VMS {
Expand Down Expand Up @@ -815,7 +892,7 @@ where
Ok(())
}

// Ensure corresponding VM is instantiated and return a mutable reference to it
/// Ensure corresponding VM is instantiated and return a mutable reference to it
fn ensure_get_instantiated(&mut self, id: &VmId) -> Result<&mut (VmContext<DL>, M), Error> {
self.ensure_vms_instantiated(&[*id])?;
self.instantiated
Expand Down Expand Up @@ -868,8 +945,29 @@ where
Ok(())
}

/// Boot the root VM when the scheduler has no VM states yet.
///
/// In the booting phase (no recorded states) the program referenced by
/// `sg_data.sg_info.program_data_piece_id` is booted with an empty
/// argument vector, and the resulting VM ID must be `ROOT_VM_ID`. In
/// every case a state for the root VM must exist once this returns `Ok`.
fn boot_root_vm_if_needed(&mut self) -> Result<(), Error> {
    if self.states.is_empty() {
        // Booting phase, we will need to initialize the first VM.
        let root_program = DataLocation {
            data_piece_id: self.sg_data.sg_info.program_data_piece_id.clone(),
            offset: 0,
            length: u64::MAX,
        };
        let booted_id = self.boot_vm(&root_program, VmArgs::Vector(vec![]))?;
        assert_eq!(booted_id, ROOT_VM_ID);
    }
    assert!(self.states.contains_key(&ROOT_VM_ID));

    Ok(())
}

/// Boot a vm by given program and args.
pub fn boot_vm(&mut self, location: &DataLocation, args: VmArgs) -> Result<VmId, Error> {
fn boot_vm(&mut self, location: &DataLocation, args: VmArgs) -> Result<VmId, Error> {
let id = self.next_vm_id;
self.next_vm_id += 1;
let (context, mut machine) = self.create_dummy_vm(&id)?;
Expand Down
19 changes: 19 additions & 0 deletions script/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1137,3 +1137,22 @@ pub enum RunMode {
/// Continues running until a Pause signal is received.
Pause(Pause),
}

/// Result reported once the scheduler has terminated.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct TerminatedResult {
/// Exit code of the root VM.
pub exit_code: i8,
/// Total cycles consumed by all VMs in the current scheduler,
/// up to this execution point.
pub consumed_cycles: Cycle,
}

/// Result of a single scheduler iteration.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct IterationResult {
/// ID of the VM that was executed in this iteration.
pub executed_vm: VmId,
/// Termination status: `Some` with the final result when the scheduler
/// has terminated, `None` otherwise.
pub terminated_status: Option<TerminatedResult>,
}
Loading
Loading