
Commit 99cd201

Merge pull request #4870 from xxuejie/revisit-scheduler-public-apis
refactor: Revisit Scheduler's public API
2 parents f2cc79f + ded45d8 · commit 99cd201

File tree: 3 files changed, +235 −68 lines changed


script/src/scheduler.rs

Lines changed: 190 additions & 58 deletions
```diff
@@ -5,8 +5,9 @@ use crate::syscalls::{
 };
 
 use crate::types::{
-    DataLocation, DataPieceId, FIRST_FD_SLOT, FIRST_VM_ID, Fd, FdArgs, FullSuspendedState, Message,
-    ReadState, RunMode, SgData, SyscallGenerator, VmArgs, VmContext, VmId, VmState, WriteState,
+    DataLocation, DataPieceId, FIRST_FD_SLOT, FIRST_VM_ID, Fd, FdArgs, FullSuspendedState,
+    IterationResult, Message, ReadState, RunMode, SgData, SyscallGenerator, TerminatedResult,
+    VmArgs, VmContext, VmId, VmState, WriteState,
 };
 use ckb_traits::{CellDataProvider, ExtensionProvider, HeaderProvider};
 use ckb_types::core::Cycle;
@@ -48,12 +49,12 @@ where
     M: DefaultMachineRunner,
 {
     /// Immutable context data for current running transaction & script.
-    pub sg_data: SgData<DL>,
+    sg_data: SgData<DL>,
 
     /// Syscall generator
-    pub syscall_generator: SyscallGenerator<DL, V, M::Inner>,
+    syscall_generator: SyscallGenerator<DL, V, M::Inner>,
     /// Syscall generator context
-    pub syscall_context: V,
+    syscall_context: V,
 
     /// Total cycles. When a scheduler executes, there are 3 variables
     /// that might all contain charged cycles: +total_cycles+,
```

```diff
@@ -83,29 +84,37 @@ where
     ///
     /// One can consider that +total_cycles+ contains the total cycles
     /// consumed in current scheduler, when the scheduler is not busy executing.
-    pub total_cycles: Arc<AtomicU64>,
+    ///
+    /// NOTE: the above workflow describes the optimal case: `iteration_cycles`
+    /// will always be zero after each iteration. However, our initial implementation
+    /// for the Meepo hardfork contains a bug: cycles charged by suspending / resuming
+    /// VMs when processing IOs will not be reflected in the `current cycles` syscalls
+    /// of the subsequently running VMs. To preserve this behavior, consumed cycles in
+    /// `iteration_cycles` cannot be moved at iteration boundaries. Later hardfork
+    /// versions might fix this, but for the Meepo hardfork we have to preserve this
+    /// behavior.
+    total_cycles: Arc<AtomicU64>,
     /// Iteration cycles, see +total_cycles+ on its usage
-    pub iteration_cycles: Cycle,
+    iteration_cycles: Cycle,
     /// Next vm id used by spawn.
-    pub next_vm_id: VmId,
+    next_vm_id: VmId,
     /// Next fd used by pipe.
-    pub next_fd_slot: u64,
+    next_fd_slot: u64,
     /// Used to store VM state.
-    pub states: BTreeMap<VmId, VmState>,
+    states: BTreeMap<VmId, VmState>,
     /// Used to confirm the owner of fd.
-    pub fds: BTreeMap<Fd, VmId>,
+    fds: BTreeMap<Fd, VmId>,
     /// Verify the VM's inherited fd list.
-    pub inherited_fd: BTreeMap<VmId, Vec<Fd>>,
+    inherited_fd: BTreeMap<VmId, Vec<Fd>>,
     /// Instantiated vms.
-    pub instantiated: BTreeMap<VmId, (VmContext<DL>, M)>,
+    instantiated: BTreeMap<VmId, (VmContext<DL>, M)>,
     /// Suspended vms.
-    pub suspended: BTreeMap<VmId, Snapshot2<DataPieceId>>,
+    suspended: BTreeMap<VmId, Snapshot2<DataPieceId>>,
     /// Terminated vms.
-    pub terminated_vms: BTreeMap<VmId, i8>,
+    terminated_vms: BTreeMap<VmId, i8>,
 
     /// MessageBox is expected to be empty before returning from `run`
     /// function, there is no need to persist messages.
-    pub message_box: Arc<Mutex<Vec<Message>>>,
+    message_box: Arc<Mutex<Vec<Message>>>,
 }
 
 impl<DL, V, M> Scheduler<DL, V, M>
```

```diff
@@ -143,8 +152,40 @@ where
         self.total_cycles.load(Ordering::Acquire)
     }
 
+    /// Fetch the state of the specified VM
+    pub fn state(&self, vm_id: &VmId) -> Option<VmState> {
+        self.states.get(vm_id).cloned()
+    }
+
+    /// Access the SgData data structure
+    pub fn sg_data(&self) -> &SgData<DL> {
+        &self.sg_data
+    }
+
+    /// This function provides a peek into one of the currently created
+    /// VMs. Depending on its actual state, the VM might be either instantiated
+    /// or suspended, so 2 callback functions must be provided to handle both
+    /// cases. The function only provides a *peek*, meaning the caller must not
+    /// make any changes to an instantiated VM. The VM is passed as a mutable
+    /// reference only because memory load functions in CKB-VM require mutable
+    /// references; it does not mean the caller can modify the VM in any sense.
+    /// Even a slight tampering of the VM can result in non-determinism.
+    pub fn peek<F, G, W>(&mut self, vm_id: &VmId, mut f: F, mut g: G) -> Result<W, Error>
+    where
+        F: FnMut(&mut M) -> Result<W, Error>,
+        G: FnMut(&Snapshot2<DataPieceId>, &SgData<DL>) -> Result<W, Error>,
+    {
+        if let Some((_, machine)) = self.instantiated.get_mut(vm_id) {
+            return f(machine);
+        }
+        if let Some(snapshot) = self.suspended.get(vm_id) {
+            return g(snapshot, &self.sg_data);
+        }
+        Err(Error::Unexpected(format!("VM {} does not exist!", vm_id)))
+    }
+
     /// Add cycles to total cycles.
-    pub fn consume_cycles(&mut self, cycles: Cycle) -> Result<(), Error> {
+    fn consume_cycles(&mut self, cycles: Cycle) -> Result<(), Error> {
         match self
             .total_cycles
             .fetch_update(Ordering::AcqRel, Ordering::Acquire, |total_cycles| {
```

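With all of `Scheduler`'s fields now private, read-only inspection goes through the new accessors, most notably `peek`. The snippet below is a usage sketch, not part of this commit: the `scheduler` and `vm_id` bindings are assumed to exist in the caller's scope, and `cycles()` is the standard `SupportMachine` accessor from ckb-vm.

```rust
// Hypothetical usage sketch: read the cycle counter of a VM through `peek`.
// For a suspended VM we simply report `None` rather than poking into the
// snapshot's internals.
let vm_cycles: Option<u64> = scheduler.peek(
    &vm_id,
    // Instantiated case: the &mut is only demanded by CKB-VM's memory API;
    // the machine must not actually be modified.
    |machine| Ok(Some(machine.machine().cycles())),
    // Suspended case: we get the Snapshot2 plus the read-only SgData.
    |_snapshot, _sg_data| Ok(None),
)?;
```
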
```diff
@@ -167,7 +208,7 @@ where
             syscall_generator,
             syscall_context,
             total_cycles: Arc::new(AtomicU64::new(full.total_cycles)),
-            iteration_cycles: 0,
+            iteration_cycles: full.iteration_cycles,
             next_vm_id: full.next_vm_id,
             next_fd_slot: full.next_fd_slot,
             states: full
```

```diff
@@ -217,6 +258,7 @@ where
             // consensus. We are not charging cycles for suspending
             // a VM in the process of suspending the whole scheduler.
             total_cycles: self.total_cycles.load(Ordering::Acquire),
+            iteration_cycles: self.iteration_cycles,
             next_vm_id: self.next_vm_id,
             next_fd_slot: self.next_fd_slot,
             vms,
```

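These two hunks persist `iteration_cycles` across suspend/resume, so the mid-iteration cycle debt described in the NOTE above survives a round trip. A rough sketch of the resulting bookkeeping, assuming a `FullSuspendedState` value `full` is reachable from the caller with the fields shown in this diff:

```rust
// Hypothetical sketch: after this commit a suspended state carries both
// counters, and the resumed scheduler will fold the in-flight portion into
// the total once the next iteration settles.
let settled: Cycle = full.total_cycles; // cycles from finished iterations
let in_flight: Cycle = full.iteration_cycles; // cycles of the interrupted iteration
let expected_consumed = settled
    .checked_add(in_flight)
    .expect("cycle overflow (sketch only)");
```
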
```diff
@@ -240,50 +282,51 @@ where
     /// * Cycle limit reached, the returned error would be ckb_vm::Error::CyclesExceeded,
     /// * Pause trigger, the returned error would be ckb_vm::Error::Pause,
     /// * Other terminating errors
-    pub fn run(&mut self, mode: RunMode) -> Result<(i8, Cycle), Error> {
-        if self.states.is_empty() {
-            // Booting phase, we will need to initialize the first VM.
-            let program_id = self.sg_data.sg_info.program_data_piece_id.clone();
-            assert_eq!(
-                self.boot_vm(
-                    &DataLocation {
-                        data_piece_id: program_id,
-                        offset: 0,
-                        length: u64::MAX,
-                    },
-                    VmArgs::Vector(vec![]),
-                )?,
-                ROOT_VM_ID
-            );
-        }
-        assert!(self.states.contains_key(&ROOT_VM_ID));
+    pub fn run(&mut self, mode: RunMode) -> Result<TerminatedResult, Error> {
+        self.boot_root_vm_if_needed()?;
 
         let (pause, mut limit_cycles) = match mode {
             RunMode::LimitCycles(limit_cycles) => (Pause::new(), limit_cycles),
             RunMode::Pause(pause) => (pause, u64::MAX),
         };
 
-        while self.states[&ROOT_VM_ID] != VmState::Terminated {
-            assert_eq!(self.iteration_cycles, 0);
-            let iterate_return = self.iterate(pause.clone(), limit_cycles);
-            self.consume_cycles(self.iteration_cycles)?;
-            limit_cycles = limit_cycles
-                .checked_sub(self.iteration_cycles)
-                .ok_or(Error::CyclesExceeded)?;
-            // Clear iteration cycles intentionally after each run
-            self.iteration_cycles = 0;
-            iterate_return?;
+        while !self.terminated() {
+            limit_cycles = self.iterate_outer(&pause, limit_cycles)?.1;
         }
+        assert_eq!(self.iteration_cycles, 0);
 
-        // At this point, root VM cannot be suspended
-        let root_vm = &self.instantiated[&ROOT_VM_ID];
-        Ok((root_vm.1.machine().exit_code(), self.consumed_cycles()))
+        self.terminated_result()
+    }
+
+    /// Public API that runs a single VM, processes all messages, then returns
+    /// the executed VM ID (so the caller can fetch data afterwards). This can
+    /// be used when finer-grained control over a single VM is required.
+    pub fn iterate(&mut self) -> Result<IterationResult, Error> {
+        self.boot_root_vm_if_needed()?;
+
+        if self.terminated() {
+            return Ok(IterationResult {
+                executed_vm: ROOT_VM_ID,
+                terminated_status: Some(self.terminated_result()?),
+            });
+        }
+
+        let (id, _) = self.iterate_outer(&Pause::new(), u64::MAX)?;
+        let terminated_status = if self.terminated() {
+            assert_eq!(self.iteration_cycles, 0);
+            Some(self.terminated_result()?)
+        } else {
+            None
+        };
+
+        Ok(IterationResult {
+            executed_vm: id,
+            terminated_status,
+        })
     }
 
     /// Returns the machine that needs to be executed in the current iterate.
-    pub fn iterate_prepare_machine(&mut self) -> Result<(u64, &mut M), Error> {
-        // Process all pending VM reads & writes.
-        self.process_io()?;
+    fn iterate_prepare_machine(&mut self) -> Result<(u64, &mut M), Error> {
         // Find a runnable VM that has the largest ID.
         let vm_id_to_run = self
             .states
```

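The batch entry point keeps its shape, but now returns a structured `TerminatedResult` instead of a bare `(i8, Cycle)` tuple. A minimal usage sketch, assuming a fully constructed `scheduler` and a `max_cycles` budget (both placeholders; construction is elided):

```rust
use ckb_vm::machine::Pause;

// Run to completion under a cycle budget.
let result: TerminatedResult = scheduler.run(RunMode::LimitCycles(max_cycles))?;
println!(
    "exit code: {}, consumed cycles: {}",
    result.exit_code, result.consumed_cycles
);

// Alternatively, run until an external pause signal fires; another thread
// would hold a clone of `pause` and interrupt it.
let pause = Pause::new();
let result = scheduler.run(RunMode::Pause(pause.clone()))?;
```
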
```diff
@@ -300,7 +343,7 @@ where
     }
 
     /// Process machine execution results in the current iterate.
-    pub fn iterate_process_results(
+    fn iterate_process_results(
         &mut self,
         vm_id_to_run: u64,
         result: Result<i8, Error>,
```

```diff
@@ -309,7 +352,7 @@ where
         self.process_message_box()?;
         assert!(self.message_box.lock().expect("lock").is_empty());
         // If the VM terminates, update VMs in join state, also closes its fds
-        match result {
+        let result = match result {
             Ok(code) => {
                 self.terminated_vms.insert(vm_id_to_run, code);
                 // When root VM terminates, the execution stops immediately, we will purge
```

```diff
@@ -357,13 +400,59 @@ where
             }
             Err(Error::Yield) => Ok(()),
             Err(e) => Err(e),
-        }
+        };
+        result
+    }
+
+    // This internal function is actually a wrapper over +iterate_inner+;
+    // it is split into a separate function so that cycle calculation is
+    // performed no matter what result +iterate_inner+ returns.
+    #[inline]
+    fn iterate_outer(
+        &mut self,
+        pause: &Pause,
+        limit_cycles: Cycle,
+    ) -> Result<(VmId, Cycle), Error> {
+        let iterate_return = self.iterate_inner(pause.clone(), limit_cycles);
+        self.consume_cycles(self.iteration_cycles)?;
+        let remaining_cycles = limit_cycles
+            .checked_sub(self.iteration_cycles)
+            .ok_or(Error::CyclesExceeded)?;
+        // Clear iteration cycles intentionally after each run
+        self.iteration_cycles = 0;
+        // Process all pending VM reads & writes. Ideally, this invocation
+        // would be put at the end of the `iterate_inner` function. However, 2
+        // things prevent this:
+        //
+        // * In the earlier implementation of the Meepo hardfork version,
+        //   `self.process_io` was put at the very start of the
+        //   +iterate_prepare_machine+ method, meaning we used to process IO
+        //   syscalls at the very start of a new iteration.
+        // * The earlier implementation contains a bug: cycles consumed by
+        //   suspending / resuming VMs are not updated in the subsequent VM's
+        //   `current cycles` syscalls.
+        //
+        // To make the ckb-script package suitable for outside usage, we want
+        // IOs processed at the end of each iteration, not at the start of the
+        // next iteration. We also need to replicate the exact runtime behavior
+        // of the Meepo hardfork. This means the only viable change is:
+        //
+        // * Move the `self.process_io` call to the very end of the
+        //   `iterate_outer` method, which is exactly its current location.
+        // * For now, live with the fact that `iteration_cycles` will not
+        //   always be zero at iteration boundaries, and also preserve its
+        //   value in `FullSuspendedState`.
+        //
+        // One expected change is that +process_io+ is now called once more
+        // after the whole scheduler terminates, and is no longer called at the
+        // very beginning when no VM is executing. But since no VMs can be in
+        // IO states at these 2 time slots, we should be fine here.
+        self.process_io()?;
+        let id = iterate_return?;
+        Ok((id, remaining_cycles))
     }
 
     // This is an internal function that does the actual VM execution loop.
     // Here both the pause signal and limit_cycles are provided so as to
     // simplify branches.
-    fn iterate(&mut self, pause: Pause, limit_cycles: Cycle) -> Result<(), Error> {
+    fn iterate_inner(&mut self, pause: Pause, limit_cycles: Cycle) -> Result<VmId, Error> {
         // Execute the VM for real; consumed cycles in the virtual machine are
         // moved over to +iteration_cycles+, then we reset the virtual machine's
         // own cycle count to zero.
```

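`iterate_outer` is the per-iteration machinery that both `run` and the new public `iterate` drive. For callers that want this granularity, here is a hedged sketch of a driver loop over `iterate` (the `scheduler` binding and the logging are illustrative only):

```rust
// Drive the scheduler one iteration at a time; between iterations the
// caller can inspect state via `state()`, `sg_data()`, or `peek()`.
loop {
    let IterationResult {
        executed_vm,
        terminated_status,
    } = scheduler.iterate()?;

    // Hypothetical inspection point: trace which VM just ran.
    println!("executed VM: {}", executed_vm);

    if let Some(TerminatedResult {
        exit_code,
        consumed_cycles,
    }) = terminated_status
    {
        println!("exit {exit_code} after {consumed_cycles} cycles");
        break;
    }
}
```
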
```diff
@@ -380,7 +469,8 @@ where
             .iteration_cycles
             .checked_add(cycles)
             .ok_or(Error::CyclesExceeded)?;
-        self.iterate_process_results(id, result)
+        self.iterate_process_results(id, result)?;
+        Ok(id)
     }
 
     fn process_message_box(&mut self) -> Result<(), Error> {
```

```diff
@@ -774,6 +864,27 @@ where
         Ok(())
     }
 
+    /// Whether the current scheduler has terminated
+    pub fn terminated(&self) -> bool {
+        self.states
+            .get(&ROOT_VM_ID)
+            .map(|state| *state == VmState::Terminated)
+            .unwrap_or(false)
+    }
+
+    fn terminated_result(&mut self) -> Result<TerminatedResult, Error> {
+        assert!(self.terminated());
+
+        let exit_code = {
+            let root_vm = &self.ensure_get_instantiated(&ROOT_VM_ID)?.1;
+            root_vm.machine().exit_code()
+        };
+        Ok(TerminatedResult {
+            exit_code,
+            consumed_cycles: self.consumed_cycles(),
+        })
+    }
+
     // Ensure VMs are instantiated
     fn ensure_vms_instantiated(&mut self, ids: &[VmId]) -> Result<(), Error> {
         if ids.len() > MAX_INSTANTIATED_VMS {
```

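The `terminated` predicate pairs naturally with the new `state` accessor for read-only checks from outside the scheduler. A small sketch, assuming `ROOT_VM_ID` is reachable from the caller and `VmState` derives the usual comparison traits:

```rust
// Once the scheduler reports termination, the root VM's recorded state
// must be Terminated; `state()` hands back a clone, so this stays read-only.
if scheduler.terminated() {
    assert_eq!(scheduler.state(&ROOT_VM_ID), Some(VmState::Terminated));
}
```
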
```diff
@@ -815,7 +926,7 @@ where
         Ok(())
     }
 
-    // Ensure corresponding VM is instantiated and return a mutable reference to it
+    /// Ensure corresponding VM is instantiated and return a mutable reference to it
     fn ensure_get_instantiated(&mut self, id: &VmId) -> Result<&mut (VmContext<DL>, M), Error> {
         self.ensure_vms_instantiated(&[*id])?;
         self.instantiated
```

```diff
@@ -868,8 +979,29 @@ where
         Ok(())
     }
 
+    fn boot_root_vm_if_needed(&mut self) -> Result<(), Error> {
+        if self.states.is_empty() {
+            // Booting phase, we will need to initialize the first VM.
+            let program_id = self.sg_data.sg_info.program_data_piece_id.clone();
+            assert_eq!(
+                self.boot_vm(
+                    &DataLocation {
+                        data_piece_id: program_id,
+                        offset: 0,
+                        length: u64::MAX,
+                    },
+                    VmArgs::Vector(vec![]),
+                )?,
+                ROOT_VM_ID
+            );
+        }
+        assert!(self.states.contains_key(&ROOT_VM_ID));
+
+        Ok(())
+    }
+
     /// Boot a vm by given program and args.
-    pub fn boot_vm(&mut self, location: &DataLocation, args: VmArgs) -> Result<VmId, Error> {
+    fn boot_vm(&mut self, location: &DataLocation, args: VmArgs) -> Result<VmId, Error> {
         let id = self.next_vm_id;
         self.next_vm_id += 1;
         let (context, mut machine) = self.create_dummy_vm(&id)?;
```
