diff --git a/docs/source/changes.rst b/docs/source/changes.rst index aa7d3372..15a06be9 100644 --- a/docs/source/changes.rst +++ b/docs/source/changes.rst @@ -16,6 +16,14 @@ Release 0.4.0 Release date: XXXX-XX-XX +Features +~~~~~~~~ + +* New "steps" background task type for cancellable progress-reporting tasks. + This has a richer interface than the existing progress background task type. + It can be used via the new |submit_steps| convenience function, which + returns a |StepsFuture| object. (#484) + Documentation ~~~~~~~~~~~~~ @@ -472,9 +480,11 @@ and progress-reporting tasks for Traits UI applications based on Qt. .. |run| replace:: :meth:`~.BaseTask.run` .. |send| replace:: :meth:`~.BaseTask.send` .. |shutdown| replace:: :meth:`~.TraitsExecutor.shutdown` +.. |StepsFuture| replace:: :class:`~.StepsFuture` .. |submit_call| replace:: :func:`~.submit_call` .. |submit_iteration| replace:: :func:`~.submit_iteration` .. |submit_progress| replace:: :func:`~.submit_progress` +.. |submit_steps| replace:: :func:`~.submit_steps` .. |task| replace:: :meth:`~.ITaskSpecification.task` .. |TaskCancelled| replace:: :exc:`~.TaskCancelled` .. |TestAssistant| replace:: :exc:`~.TestAssistant` diff --git a/traits_futures/api.py b/traits_futures/api.py index 9d00855a..753a5542 100644 --- a/traits_futures/api.py +++ b/traits_futures/api.py @@ -28,6 +28,8 @@ - :func:`~.submit_call` - :func:`~.submit_iteration` - :func:`~.submit_progress` +- :func:`~.submit_steps` +- :class:`~.IStepsReporter` Types of futures ---------------- @@ -35,6 +37,7 @@ - :class:`~.CallFuture` - :class:`~.IterationFuture` - :class:`~.ProgressFuture` +- :class:`~.StepsFuture` - :exc:`~.TaskCancelled` Future states @@ -86,6 +89,11 @@ submit_iteration, ) from traits_futures.background_progress import ProgressFuture, submit_progress +from traits_futures.background_steps import ( + IStepsReporter, + StepsFuture, + submit_steps, +) from traits_futures.base_future import BaseFuture, BaseTask, TaskCancelled from traits_futures.ets_event_loop import ETSEventLoop from traits_futures.executor_states import ( @@ -115,6 +123,8 @@ "CallFuture", "IterationFuture", "ProgressFuture", + "StepsFuture", + "IStepsReporter", "TaskCancelled", # Future states "FutureState", @@ -135,6 +145,7 @@ "submit_call", "submit_iteration", "submit_progress", + "submit_steps", # Support for creating new task types "BaseFuture", "BaseTask", diff --git a/traits_futures/background_steps.py b/traits_futures/background_steps.py new file mode 100644 index 00000000..48294044 --- /dev/null +++ b/traits_futures/background_steps.py @@ -0,0 +1,490 @@ +# (C) Copyright 2018-2021 Enthought, Inc., Austin, TX +# All rights reserved. +# +# This software is provided without warranty under the terms of the BSD +# license included in LICENSE.txt and may be redistributed only under +# the conditions described in the aforementioned license. The license +# is also available online at http://www.enthought.com/licenses/BSD.txt +# +# Thanks for using Enthought open source! + +""" +Support for an interruptible progress-reporting callable. + +This module defines a task specification and a corresponding future for tasks +that execute in the background and report progress information to the +foreground. The points at which progress is reported also represent points at +which the task can be interrupted. +""" + +import abc +import collections + +from traits.api import ( + Callable, + Dict, + HasStrictTraits, + Instance, + Int, + observe, + Property, + Str, + Tuple, +) + +from traits_futures.base_future import BaseFuture, BaseTask, TaskCancelled +from traits_futures.i_task_specification import ITaskSpecification + + +class IStepsReporter(abc.ABC): + """ + Interface for progress reporters. + + This is the interface that's implemented by the StepsReporter + object that's passed to the background tasks. + """ + + @abc.abstractmethod + def step(self, message, *, size=1): + """ + Start a processing step. + + Parameters + ---------- + message : str + A description of this step. + size : int, optional + The size of this step, in whatever units make sense for the + application at hand. Defaults to 1. + + Raises + ------ + TaskCancelled + If the user has called ``cancel()`` before this. + """ + + @abc.abstractmethod + def stop(self, message): + """ + Report that processing is complete. + + Also updates the total progress, if any tasks are pending. + + Parameters + ---------- + message : str + Message to display on completion. For a progress dialog that + disappears on completion, this message will never be seen by + the user, but for other views the message may be visible. + + Raises + ------ + TaskCancelled + If the user has called ``cancel()`` before this. + """ + + +class StepsState( + collections.namedtuple( + "StepsState", ["total", "complete", "pending", "message"] + ) +): + """ + Tuple subclass encapsulating progress state of the task. + + Objects of this type capture the progress state of an in-progress task. + + Attributes + ---------- + total : int + Total number of units of work for the task. + complete : int + Total units of work completed. + pending : int + Size of the step currently in progress, or 0 if there's no + in-progress step. + message : str + Description of the current step, or a more general message + if processing has completed or has yet to start. + """ + + @classmethod + def initial(cls, total, message): + """ + Initial state, given total work and initial message. + + Parameters + ---------- + total : int + Total units of work. + message : str + Message to use for the initial state. + + Returns + ------- + StepsState + """ + return cls(total=total, complete=0, pending=0, message=message) + + def set_message(self, message): + """ + Return a copy of this state with an updated message. + + Parameters + ---------- + message : str + Message to use for the new state. + + Returns + ------- + StepsState + """ + return self._replace(message=message) + + def set_step(self, size): + """ + Return a copy of this state updated for the next processing step. + + Parameters + ---------- + size : int + Number of units of work represented by the next step. + + Returns + ------- + StepsState + """ + return self._replace( + complete=self.complete + self.pending, + pending=size, + ) + + +#: Message type for the message sent on each state update. The argument is an +#: instance of StepsState. +UPDATE = "update" + + +@IStepsReporter.register +class StepsReporter: + """ + Object used by the background task to report progress information. + + A :class:`StepsReporter` instance is passed to the background task, and its + ``step`` and ``stop`` methods can be used by that background task to report + progress. + + Parameters + ---------- + send + Callable provided by the Traits Futures machinery, and used to send + messages to the linked future. + cancelled + Callable provided by the Traits Futures machinery, and used to check + for cancellation requests. + state : StepsState + Initial state of the reporter. + """ + + def __init__(self, send, cancelled, state): + self._send = send + self._cancelled = cancelled + self._state = state + + def step(self, message, *, size=1): + """ + Start a processing step. + + Parameters + ---------- + message : str + A description of this step. + size : int, optional + The size of this step (in whatever units make sense for the + application at hand). Defaults to 1. + + Raises + ------ + TaskCancelled + If the user has called ``cancel()`` before this. + """ + self._check_cancel() + self._state = self._state.set_step(size).set_message(message) + self._send(UPDATE, self._state) + + def stop(self, message): + """ + Report that processing is complete. + + Also updates the total progress, if any tasks are pending. + + Parameters + ---------- + message : str + Message to display on completion. For a progress dialog that + disappears on completion, this message will never be seen by + the user, but for other views the message may be visible. + + Raises + ------ + TaskCancelled + If the user has called ``cancel()`` before this. + """ + self._check_cancel() + self._state = self._state.set_step(0).set_message(message) + self._send(UPDATE, self._state) + + # Private methods and properties ########################################## + + def _check_cancel(self): + """Check if the task has been cancelled. + + Raises + ------ + TaskCancelled + If the task has been cancelled. + """ + if self._cancelled(): + raise TaskCancelled("Cancellation requested via the future") + + +class StepsTask(BaseTask): + """ + Wrapper around the actual callable to be run. + + This wrapper handles capturing exceptions and sending the final status of + the task on completion. + + Parameters + ---------- + initial_state : StepsState + Initial state of the progress. + callable + User-supplied function to be called. + args : tuple + Positional arguments to be passed to ``callable``. + kwargs : dict + Named arguments to be passed to ``callable``, not including the + ``reporter`` argument. + """ + + def __init__(self, initial_state, callable, args, kwargs): + self.callable = callable + self.args = args + self.kwargs = kwargs + self.initial_state = initial_state + + def run(self): + """ + Run the body of the steps task. + + Returns + ------- + object + May return any object. That object will be delivered to the + future's ``result`` attribute. + """ + reporter = StepsReporter( + send=self.send, + cancelled=self.cancelled, + state=self.initial_state, + ) + try: + result = self.callable( + *self.args, + **self.kwargs, + reporter=reporter, + ) + except TaskCancelled: + return None + else: + return result + + +class StepsFuture(BaseFuture): + """ + Object representing the front-end handle to a background steps task. + """ + + #: Most recently received message from the background task. + message = Property(Str()) + + #: Total work, in whatever units make sense for the application. + total = Property(Int()) + + #: Units of work completed so far. + complete = Property(Int()) + + # Private traits ########################################################## + + #: The progress state of the background task. + _progress_state = Instance(StepsState, allow_none=False) + + # Private methods ######################################################### + + def _process_update(self, progress_state): + """ + Process an UPDATE message from the background task. + """ + self._progress_state = progress_state + + def _get_message(self): + """Traits property getter for the 'message' trait.""" + return self._progress_state.message + + def _get_total(self): + """Traits property getter for the 'total' trait.""" + return self._progress_state.total + + def _get_complete(self): + """Traits property getter for the 'complete' property.""" + return self._progress_state.complete + + @observe("_progress_state") + def _update_state_traits(self, event): + if event.old is None: + return + + old_state, new_state = event.old, event.new + + if old_state.message != new_state.message: + self.trait_property_changed( + "message", old_state.message, new_state.message + ) + if old_state.total != new_state.total: + self.trait_property_changed( + "total", old_state.total, new_state.total + ) + if old_state.complete != new_state.complete: + self.trait_property_changed( + "complete", old_state.complete, new_state.complete + ) + + +@ITaskSpecification.register +class BackgroundSteps(HasStrictTraits): + """ + Object representing the background task to be executed. + """ + + #: Total units of work for the task, if known. None if not known. + total = Int() + + #: Initial message. + message = Str() + + #: The callable for the task. + callable = Callable() + + #: Positional arguments to be passed to the callable. + args = Tuple() + + #: Named arguments to be passed to the callable, excluding the "reporter" + #: named argument. The "reporter" argument will be supplied through the + #: execution machinery. + kwargs = Dict(Str()) + + # --- ITaskSpecification implementation ----------------------------------- + + def future(self, cancel): + """ + Return a Future for the background task. + + Parameters + ---------- + cancel + Zero-argument callable, returning no useful result. The returned + future's ``cancel`` method should call this to request cancellation + of the associated background task. + + Returns + ------- + future : ProgressFuture + Future object that can be used to monitor the status of the + background task. + """ + return StepsFuture( + _cancel=cancel, + _progress_state=self._initial_state, + ) + + def task(self): + """ + Return a background callable for this task specification. + + Returns + ------- + task : StepsTask + Callable accepting arguments ``send`` and ``cancelled``. The + callable can use ``send`` to send messages and ``cancelled`` to + check whether cancellation has been requested. + """ + return StepsTask( + initial_state=self._initial_state, + callable=self.callable, + args=self.args, + kwargs=self.kwargs, + ) + + # Private traits and methods ############################################## + + #: Initial progress state to be passed to both the task and the future. + _initial_state = Property( + Instance(StepsState), observe=["total", "message"] + ) + + def _get__initial_state(self): + """Traits property getter for the _initial_state trait.""" + return StepsState.initial(total=self.total, message=self.message) + + +def submit_steps(executor, total, message, callable, *args, **kwargs): + """ + Convenience function to submit a BackgroundSteps task to an executor. + + Note: the 'executor', 'total', 'message', and 'callable' parameters should + always be passed by position instead of by name. Future versions of the + library may enforce this restriction. + + Parameters + ---------- + executor : TraitsExecutor + Executor to submit the task to. + total : int + Total units of work for this task, in whatever units are appropriate + for the task in hand. + message : str + Description of the task. This will be used until the first step + message is received from the background task. + callable : collections.abc.Callable + Callable to execute in the background. This should accept a + "reporter" keyword argument, in addition to any other positional + and named arguments it needs. When the callable is invoked, an + instance of "StepsReporter" will be supplied via that "reporter" + argument. + *args + Positional arguments to pass to the callable. + **kwargs + Named arguments to pass to the callable, excluding the "reporter" + argument. That argument will be passed separately. + + Returns + ------- + future : StepsFuture + Object representing the state of the background task. + """ + if "reporter" in kwargs: + raise TypeError( + "The 'reporter' parameter will be passed automatically; it " + "should not be included in the named parameters." + ) + + return executor.submit( + BackgroundSteps( + total=total, + message=message, + callable=callable, + args=args, + kwargs=kwargs, + ) + ) diff --git a/traits_futures/tests/background_steps_tests.py b/traits_futures/tests/background_steps_tests.py new file mode 100644 index 00000000..10f6e004 --- /dev/null +++ b/traits_futures/tests/background_steps_tests.py @@ -0,0 +1,359 @@ +# (C) Copyright 2018-2021 Enthought, Inc., Austin, TX +# All rights reserved. +# +# This software is provided without warranty under the terms of the BSD +# license included in LICENSE.txt and may be redistributed only under +# the conditions described in the aforementioned license. The license +# is also available online at http://www.enthought.com/licenses/BSD.txt +# +# Thanks for using Enthought open source! + + +from traits.api import ( + HasStrictTraits, + Instance, + Int, + List, + observe, + Str, + Tuple, + Union, +) + +from traits_futures.api import ( + CANCELLED, + COMPLETED, + FAILED, + IStepsReporter, + StepsFuture, + submit_steps, +) + +#: Maximum timeout for blocking calls, in seconds. A successful test should +#: never hit this timeout - it's there to prevent a failing test from hanging +#: forever and blocking the rest of the test suite. +SAFETY_TIMEOUT = 5.0 + + +#: Trait type for the progress state: total, complete, message. +ProgressState = Tuple(Int(), Int(), Str()) + + +class StepsListener(HasStrictTraits): + """ + Listener recording all state changes for a StepsFuture. + """ + + #: The future we're listening to. + future = Instance(StepsFuture) + + #: The progress state. + state = Union(None, ProgressState) + + #: All recorded states, including the initial state. + states = List(ProgressState) + + @observe("future.[total,complete,message]") + def _record_state(self, event): + future = event.new if event.name == "future" else event.object + self.state = future.total, future.complete, future.message + + @observe("state") + def _append_new_state(self, event): + """Record the new state whenever it changes.""" + self.states.append(event.new) + + +class BackgroundStepsTests: + def test_reporter_implements_i_steps_reporter(self): + def check_steps_reporter_interface(reporter): + return isinstance(reporter, IStepsReporter) + + future = submit_steps( + self.executor, 0, "Checking", check_steps_reporter_interface + ) + self.assertTaskEventuallyCompletes(future, True) + + def test_reporter_is_passed_by_name(self): + def reporter_by_name(*, reporter): + return 46 + + future = submit_steps( + self.executor, 0, "Doing nothing", reporter_by_name + ) + self.assertTaskEventuallyCompletes(future, 46) + + def test_result(self): + def return_a_value(reporter): + return 45 + + future = submit_steps( + self.executor, 0, "Returning a value", return_a_value + ) + self.assertTaskEventuallyCompletes(future, 45) + + def test_error(self): + def raise_an_error(reporter): + 1 / 0 + + future = submit_steps(self.executor, 0, "Raising", raise_an_error) + self.assertTaskEventuallyFails(future, ZeroDivisionError) + + def test_state_changes_no_reports(self): + def return_a_value(reporter): + return 45 + + future = submit_steps( + self.executor, 0, "Doing nothing", return_a_value + ) + listener = StepsListener(future=future) + self.assertTaskEventuallyCompletes(future, 45) + + self.assertEqual( + listener.states, + [ + (0, 0, "Doing nothing"), + ], + ) + + def test_simple_messages(self): + def send_messages(reporter): + reporter.step("Uploading file 1") + reporter.step("Uploading file 2") + reporter.stop("Finished") + + future = submit_steps( + self.executor, 2, "Uploading files", send_messages + ) + listener = StepsListener(future=future) + self.assertTaskEventuallyCompletes(future, None) + self.assertEqual( + listener.states, + [ + (2, 0, "Uploading files"), + (2, 0, "Uploading file 1"), + (2, 1, "Uploading file 2"), + (2, 2, "Finished"), + ], + ) + + def test_irregular_step_sizes(self): + def send_messages(reporter): + reporter.step("Uploading file 1", size=2) + reporter.step("Uploading file 2", size=5) + reporter.step("Uploading file 3", size=3) + reporter.stop("Uploads complete") + + future = submit_steps( + self.executor, 10, "Uploading files...", send_messages + ) + listener = StepsListener(future=future) + self.assertTaskEventuallyCompletes(future, None) + self.assertEqual( + listener.states, + [ + (10, 0, "Uploading files..."), + (10, 0, "Uploading file 1"), + (10, 2, "Uploading file 2"), + (10, 7, "Uploading file 3"), + (10, 10, "Uploads complete"), + ], + ) + + def test_no_stop(self): + # If the user doesn't call stop, we don't get a final update, but + # nothing else should go wrong. + def send_messages(reporter): + reporter.step("Uploading file 1") + reporter.step("Uploading file 2") + + future = submit_steps( + self.executor, 2, "Uploading files", send_messages + ) + listener = StepsListener(future=future) + self.assertTaskEventuallyCompletes(future, None) + self.assertEqual( + listener.states, + [ + (2, 0, "Uploading files"), + (2, 0, "Uploading file 1"), + (2, 1, "Uploading file 2"), + ], + ) + + def test_cancellation_on_step(self): + barrier = self._context.event() + detector = self._context.event() + + def send_messages(barrier, reporter, detector): + reporter.step("Uploading file 1") + # Test will cancel at this point. + barrier.wait(timeout=SAFETY_TIMEOUT) + reporter.step("Uploading file 2") + # Should never get here. + detector.set() + reporter.stop("All files uploaded") + + future = submit_steps( + self.executor, + 2, + "Uploading files", + send_messages, + barrier=barrier, + detector=detector, + ) + listener = StepsListener(future=future) + + # Run until we get the first progress message, then cancel and allow + # the background job to proceed. + self.run_until( + listener, + "state", + lambda listener: listener.state[2] == "Uploading file 1", + ) + future.cancel() + barrier.set() + + self.assertTaskEventuallyCancelled(future) + self.assertFalse(detector.is_set()) + + self.assertEqual( + listener.states, + [ + (2, 0, "Uploading files"), + (2, 0, "Uploading file 1"), + ], + ) + + def test_cancellation_on_stop(self): + barrier = self._context.event() + detector = self._context.event() + + def send_messages(barrier, reporter, detector): + reporter.step("Uploading file 1") + reporter.step("Uploading file 2") + # Test will cancel at this point. + barrier.wait(timeout=SAFETY_TIMEOUT) + reporter.stop("All files uploaded"), + # Should never get here. + detector.set() + + future = submit_steps( + self.executor, + 2, + "Uploading files", + send_messages, + barrier=barrier, + detector=detector, + ) + listener = StepsListener(future=future) + + # Run until we get the second progress message, then cancel and allow + # the background job to proceed. + self.run_until( + listener, + "state", + lambda listener: listener.state[2] == "Uploading file 2", + ) + future.cancel() + barrier.set() + + self.assertTaskEventuallyCancelled(future) + self.assertFalse(detector.is_set()) + + self.assertEqual( + listener.states, + [ + (2, 0, "Uploading files"), + (2, 0, "Uploading file 1"), + (2, 1, "Uploading file 2"), + ], + ) + + def test_initial_total(self): + # Exercise the case where we set the total and message up front. + def send_messages(reporter): + reporter.step("Uploading file 1") + reporter.step("Uploading file 2") + reporter.stop("All uploaded") + + future = submit_steps( + self.executor, 2, "Uploading files", send_messages + ) + listener = StepsListener(future=future) + self.assertTaskEventuallyCompletes(future, None) + self.assertEqual( + listener.states, + [ + (2, 0, "Uploading files"), + (2, 0, "Uploading file 1"), + (2, 1, "Uploading file 2"), + (2, 2, "All uploaded"), + ], + ) + + def test_reporter_in_kwargs(self): + def some_callable(reporter): + pass + + with self.assertRaises(TypeError): + submit_steps( + self.executor, + 2, + "Uploading files", + some_callable, + reporter=None, + ) + + # Helper functions + + def halt_executor(self): + """ + Wait for the executor to stop. + """ + executor = self.executor + executor.stop() + self.run_until(executor, "stopped", lambda executor: executor.stopped) + del self.executor + + def assertTaskEventuallyCompletes(self, future, result): + """ + Wait for a task to finish, and check its return value. + + Parameters + ---------- + future : BaseFuture + The future to wait for. + result : object + Value that that task is expected to return. + """ + self.run_until(future, "done", lambda future: future.done) + self.assertEqual(future.state, COMPLETED) + self.assertEqual(future.result, result) + + def assertTaskEventuallyFails(self, future, exception_type): + """ + Wait for a task to finish, and verify that it fails. + + Parameters + ---------- + future : BaseFuture + The future to wait for. + exception_type : type + Type of the exception that the task is expected to raise. + """ + self.run_until(future, "done", lambda future: future.done) + self.assertEqual(future.state, FAILED) + self.assertIn(exception_type.__name__, future.exception[0]) + + def assertTaskEventuallyCancelled(self, future): + """ + Wait for a task to finish, and check it reached CANCELLED state. + + Parameters + ---------- + future : BaseFuture + The future to wait for. + """ + self.run_until(future, "done", lambda future: future.done) + self.assertEqual(future.state, CANCELLED) diff --git a/traits_futures/tests/test_api.py b/traits_futures/tests/test_api.py index 3474bff7..91794ac8 100644 --- a/traits_futures/tests/test_api.py +++ b/traits_futures/tests/test_api.py @@ -29,17 +29,20 @@ def test_imports(self): IEventLoop, IFuture, IParallelContext, + IStepsReporter, ITaskSpecification, IterationFuture, MultiprocessingContext, MultithreadingContext, ProgressFuture, RUNNING, + StepsFuture, STOPPED, STOPPING, submit_call, submit_iteration, submit_progress, + submit_steps, TaskCancelled, TraitsExecutor, WAITING, diff --git a/traits_futures/tests/test_background_steps.py b/traits_futures/tests/test_background_steps.py new file mode 100644 index 00000000..be8ac148 --- /dev/null +++ b/traits_futures/tests/test_background_steps.py @@ -0,0 +1,47 @@ +# (C) Copyright 2018-2021 Enthought, Inc., Austin, TX +# All rights reserved. +# +# This software is provided without warranty under the terms of the BSD +# license included in LICENSE.txt and may be redistributed only under +# the conditions described in the aforementioned license. The license +# is also available online at http://www.enthought.com/licenses/BSD.txt +# +# Thanks for using Enthought open source! + +""" +Tests for the background steps task. +""" + + +import unittest + +from traits_futures.api import ( + MultithreadingContext, + StepsFuture, + TraitsExecutor, +) +from traits_futures.testing.test_assistant import TestAssistant +from traits_futures.tests.background_steps_tests import BackgroundStepsTests +from traits_futures.tests.common_future_tests import CommonFutureTests + + +class TestBackgroundSteps( + TestAssistant, BackgroundStepsTests, unittest.TestCase +): + def setUp(self): + TestAssistant.setUp(self) + self._context = MultithreadingContext() + self.executor = TraitsExecutor( + context=self._context, + event_loop=self._event_loop, + ) + + def tearDown(self): + self.halt_executor() + self._context.close() + TestAssistant.tearDown(self) + + +class TestStepsFuture(CommonFutureTests, unittest.TestCase): + def setUp(self): + self.future_class = StepsFuture