Skip to content

Commit

Permalink
Move ExecutionContext and CancellationNotificationFuture+state into s…
Browse files Browse the repository at this point in the history
…hared_state.rs

Summary:
This now puts all the shared state objects for managing cancellation
state into one place. I'll refactor these a bit to make the interactions
clearer.

Reviewed By: JakobDegen

Differential Revision: D65362033

fbshipit-source-id: 2aa7cd12f2216a720438ac0c6b6cc02ad621c7ec
  • Loading branch information
cjhopman authored and facebook-github-bot committed Dec 20, 2024
1 parent 1010400 commit 15d8eb8
Show file tree
Hide file tree
Showing 4 changed files with 263 additions and 268 deletions.
6 changes: 3 additions & 3 deletions app/buck2_futures/src/cancellation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ use dupe::Dupe;
use futures::FutureExt;
use once_cell::sync::Lazy;

use crate::details::cancellable_future::context::ExecutionContextInner;
use crate::details::cancellable_future::CancellationNotificationData;
use crate::details::cancellable_future::CancellationNotificationFuture;
use crate::details::cancellation_context::CancellationContextInner;
use crate::details::cancellation_context::ExplicitCancellationContext;
use crate::details::cancellation_context::ExplicitCriticalSectionGuard;
use crate::details::shared_state::CancellationNotificationData;
use crate::details::shared_state::CancellationNotificationFuture;
use crate::details::shared_state::ExecutionContextInner;
use crate::details::shared_state::SharedState;

static NEVER_CANCELLED: Lazy<CancellationContext> =
Expand Down
263 changes: 1 addition & 262 deletions app/buck2_futures/src/details/cancellable_future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,15 @@
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::AtomicU8;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;

use dupe::Dupe;
use futures::future::BoxFuture;
use futures::task::AtomicWaker;
use parking_lot::Mutex;
use pin_project::pin_project;
use slab::Slab;

use crate::cancellation::CancellationContext;
use crate::cancellation::CancellationHandle;
use crate::details::shared_state::ExecutionContextOuter;
use crate::details::shared_state::SharedState;
use crate::drop_on_ready::DropOnReadyFuture;
use crate::owning_future::OwningFuture;
Expand Down Expand Up @@ -161,261 +155,6 @@ impl<T> Future for ExplicitlyCancellableFutureInner<T> {
}
}

pub(crate) mod context {
use super::*;
use crate::cancellation::CriticalSectionGuard;

struct ExecutionContextData {
cancellation_notification: CancellationNotificationData,

/// How many observers are preventing immediate cancellation.
prevent_cancellation: usize,
}

impl ExecutionContextData {
/// Does this future not currently prevent its cancellation?
fn can_exit(&self) -> bool {
self.prevent_cancellation == 0
}

fn enter_structured_cancellation(&mut self) -> CancellationNotificationData {
self.prevent_cancellation += 1;

self.cancellation_notification.dupe()
}

fn notify_cancelled(&mut self) {
let updated = self.cancellation_notification.inner.notified.fetch_update(
Ordering::SeqCst,
Ordering::SeqCst,
|old| match CancellationNotificationStatus::from(old) {
CancellationNotificationStatus::Pending => {
Some(CancellationNotificationStatus::Notified.into())
}
CancellationNotificationStatus::Notified => None,
CancellationNotificationStatus::Disabled => None,
},
);
if updated.is_ok() {
if let Some(mut wakers) = self.cancellation_notification.inner.wakers.lock().take()
{
wakers.drain().for_each(|waker| waker.wake());
}
}
}

fn exit_prevent_cancellation(&mut self) -> bool {
self.prevent_cancellation -= 1;

self.prevent_cancellation == 0
}

fn try_to_disable_cancellation(&mut self) -> bool {
let maybe_updated = self.cancellation_notification.inner.notified.fetch_update(
Ordering::SeqCst,
Ordering::SeqCst,
|old| match CancellationNotificationStatus::from(old) {
CancellationNotificationStatus::Pending => {
Some(CancellationNotificationStatus::Disabled.into())
}
CancellationNotificationStatus::Notified => None,
CancellationNotificationStatus::Disabled => None,
},
);

match maybe_updated {
Ok(_) => true,
Err(old) => {
let old = CancellationNotificationStatus::from(old);
matches!(old, CancellationNotificationStatus::Disabled)
}
}
}
}

/// Context relating to execution of the `poll` of the future. This will contain the information
/// required for the `CancellationContext` that the future holds to enter critical sections and
/// structured cancellations.
pub(crate) struct ExecutionContextOuter {
shared: Arc<Mutex<ExecutionContextData>>,
}

pub(crate) struct ExecutionContextInner {
shared: Arc<Mutex<ExecutionContextData>>,
}

impl ExecutionContextOuter {
pub(crate) fn new() -> (ExecutionContextOuter, ExecutionContextInner) {
let shared = Arc::new(Mutex::new(ExecutionContextData {
cancellation_notification: {
CancellationNotificationData {
inner: Arc::new(CancellationNotificationDataInner {
notified: Default::default(),
wakers: Mutex::new(Some(Default::default())),
}),
}
},
prevent_cancellation: 0,
}));
(
ExecutionContextOuter {
shared: shared.dupe(),
},
ExecutionContextInner { shared },
)
}

pub(crate) fn notify_cancelled(&self) -> bool {
let mut lock = self.shared.lock();
if lock.can_exit() {
true
} else {
lock.notify_cancelled();
false
}
}

pub(crate) fn can_exit(&self) -> bool {
self.shared.lock().can_exit()
}
}

impl ExecutionContextInner {
pub(crate) fn enter_structured_cancellation(&self) -> CriticalSectionGuard {
let mut shared = self.shared.lock();

let notification = shared.enter_structured_cancellation();

CriticalSectionGuard::new_explicit(self, notification)
}

pub(crate) fn try_to_disable_cancellation(&self) -> bool {
let mut shared = self.shared.lock();
if shared.try_to_disable_cancellation() {
true
} else {
// couldn't prevent cancellation, so release our hold onto the counter
shared.exit_prevent_cancellation();
false
}
}

pub(crate) fn exit_prevent_cancellation(&self) -> bool {
let mut shared = self.shared.lock();
shared.exit_prevent_cancellation()
}
}
}
use context::ExecutionContextOuter;

enum CancellationNotificationStatus {
/// no notifications yet. maps to '0'
Pending,
/// notified, maps to '1'
Notified,
/// disabled notifications, maps to '2'
Disabled,
}

impl From<u8> for CancellationNotificationStatus {
fn from(value: u8) -> Self {
match value {
0 => CancellationNotificationStatus::Pending,
1 => CancellationNotificationStatus::Notified,
2 => CancellationNotificationStatus::Disabled,
_ => panic!("invalid status"),
}
}
}

impl From<CancellationNotificationStatus> for u8 {
fn from(value: CancellationNotificationStatus) -> Self {
match value {
CancellationNotificationStatus::Pending => 0,
CancellationNotificationStatus::Notified => 1,
CancellationNotificationStatus::Disabled => 2,
}
}
}

#[derive(Clone, Dupe)]
pub(crate) struct CancellationNotificationData {
inner: Arc<CancellationNotificationDataInner>,
}

struct CancellationNotificationDataInner {
/// notification status per enum 'CancellationNotificationStatus'
notified: AtomicU8,
wakers: Mutex<Option<Slab<Arc<AtomicWaker>>>>,
}

pub(crate) struct CancellationNotificationFuture {
data: CancellationNotificationData,
// index into the waker for this future held by the Slab in 'CancellationNotificationData'
id: Option<usize>,
// duplicate of the waker held for us to update the waker on poll without acquiring lock
waker: Arc<AtomicWaker>,
}

impl CancellationNotificationFuture {
pub(crate) fn new(data: CancellationNotificationData) -> Self {
let waker = Arc::new(AtomicWaker::new());
let id = data
.inner
.wakers
.lock()
.as_mut()
.map(|wakers| wakers.insert(waker.dupe()));
CancellationNotificationFuture { data, id, waker }
}

fn remove_waker(&mut self, id: Option<usize>) {
if let Some(id) = id {
self.data
.inner
.wakers
.lock()
.as_mut()
.map(|wakers| wakers.remove(id));
}
}
}

impl Clone for CancellationNotificationFuture {
fn clone(&self) -> Self {
CancellationNotificationFuture::new(self.data.dupe())
}
}

impl Dupe for CancellationNotificationFuture {}

impl Drop for CancellationNotificationFuture {
fn drop(&mut self) {
self.remove_waker(self.id);
}
}

impl Future for CancellationNotificationFuture {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match CancellationNotificationStatus::from(self.data.inner.notified.load(Ordering::SeqCst))
{
CancellationNotificationStatus::Notified => {
// take the id so that we don't need to lock the wakers when this future is dropped
// after completion
let id = self.id.take();
self.remove_waker(id);
Poll::Ready(())
}
_ => {
self.waker.register(cx.waker());
Poll::Pending
}
}
}
}

#[cfg(test)]
mod tests {
use std::future::Future;
Expand Down
6 changes: 3 additions & 3 deletions app/buck2_futures/src/details/cancellation_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ use crate::cancellation::CancellationObserver;
use crate::cancellation::CancellationObserverInner;
use crate::cancellation::CriticalSectionGuard;
use crate::cancellation::DisableCancellationGuard;
use crate::details::cancellable_future::context::ExecutionContextInner;
use crate::details::cancellable_future::CancellationNotificationData;
use crate::details::cancellable_future::CancellationNotificationFuture;
use crate::details::shared_state::CancellationNotificationData;
use crate::details::shared_state::CancellationNotificationFuture;
use crate::details::shared_state::ExecutionContextInner;

pub struct ExplicitCriticalSectionGuard<'a> {
pub(crate) context: Option<&'a ExecutionContextInner>,
Expand Down
Loading

0 comments on commit 15d8eb8

Please sign in to comment.