Description
Describe the current behavior
Currently, Prefect supports many user interfaces that we attempt to keep compatible with both synchronous and asynchronous execution. A popular example of this is Block.load. The way this is achieved is through an internal utility that "magically" (and, more critically, quietly) attempts to decide on the user's behalf whether the user will be able to await the coroutine or not.
This works well in certain situations, such as methods / utilities called within Prefect tasks and flows, because we explicitly track whether that task or flow is synchronous or asynchronous.
However, this can turn out very poorly in situations where something about the runtime environment changes between local development and production. A few examples to make the point:
- AttributeError: 'coroutine' object has no attribute 'get' #14712
- sync_compatible decorator returning coroutines unexpectedly when running a flow #14625
- "RuntimeError: is bound to a different event loop" when using prefect client from sync flow in kubernetes #13181
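The failure mode behind reports like these can be reproduced with a toy decorator. This is a minimal sketch, not Prefect's actual implementation: `naive_sync_compatible`, `load_settings`, and `sync_helper` are hypothetical names, and the "is an event loop running?" check is an assumed stand-in for the real heuristic.

```python
import asyncio
import functools


def naive_sync_compatible(async_fn):
    """Toy stand-in for implicit sync/async dispatch (hypothetical)."""

    @functools.wraps(async_fn)
    def wrapper(*args, **kwargs):
        coro = async_fn(*args, **kwargs)
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running: assume a sync caller and block until done.
            return asyncio.run(coro)
        # A loop is running: assume the caller intends to await the result.
        return coro

    return wrapper


@naive_sync_compatible
async def load_settings():
    return {"region": "us-east-1"}


def sync_helper():
    # Written and tested as plain synchronous code.
    return load_settings()


async def main():
    # The same helper, now reached from inside a running event loop.
    result = sync_helper()
    is_coroutine = asyncio.iscoroutine(result)
    if is_coroutine:
        result.close()  # avoid a "never awaited" warning in this demo
    return is_coroutine


local_result = sync_helper()                # a dict, as the author expected
in_loop_is_coroutine = asyncio.run(main())  # a coroutine leaked to sync code
```

The helper itself never changed; only the environment it was called from did, which is why calling `.get(...)` on its return value can work locally and fail in production.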
Prefect 3.0 exposes a new keyword argument on these special methods / functions for explicitly setting the behavior a user wants: to continue with the block loading example, users can now specify Block.load(**kwargs, _sync=True) for enforcing synchronous execution and await Block.load(**kwargs, _sync=False) for enforcing asynchronous execution. This is useful as an escape hatch, but under the hood it still engages with complex event loop and threading logic that risks performance degradation and failure modes that are more difficult to inspect.
Clearly there is room for improvement here along a few dimensions:
- warning users who are relying on implicit magic so they can more easily self-debug / harden their code
- improving Prefect performance by not managing its own event loops / threads for running coroutines
- exposing _sync=True/False behavior in a more first class way for discoverability
Describe the proposed behavior
To achieve the goals outlined above, I propose first expanding the sync_compatible interface in two ways:
- allowing for sync_compatible(sync_version=sync_method) that explicitly provides an alternative synchronous implementation to dispatch to; note this does mean expanding the codebase into both synchronous and asynchronous implementations. The decorator and our current "magic" will allow us to take a strangler fig approach and incrementally add these duplicate implementations as we develop
- exposing a mirror utility async_compatible(async_version=async_method) (the need for this will become clear below when I discuss naming conventions)
For any decorated function / method that has a dual implementation, Prefect can begin issuing a warning whenever the user relies on behavior for which that function / method is dispatching to another implementation. For example:
await Block.load(**kwargs)
## warning is issued that directs the user to use `Block.aload(**kwargs)` explicitly
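A rough sketch of how such a dispatching decorator with a warning might look follows. Only the name `async_compatible` and its `async_version` argument come from the proposal; the body, the running-loop check, the warning category, and the module-level `load` / `aload` functions are assumptions made for illustration.

```python
import asyncio
import functools
import warnings


def async_compatible(async_version):
    """Hypothetical sketch: wrap a sync function and dispatch to
    `async_version` (with a warning) when an event loop is running."""

    def decorator(sync_fn):
        @functools.wraps(sync_fn)
        def wrapper(*args, **kwargs):
            try:
                asyncio.get_running_loop()
            except RuntimeError:
                # No running loop: use the synchronous implementation.
                return sync_fn(*args, **kwargs)
            # Running loop: the caller is relying on implicit dispatch.
            warnings.warn(
                f"Awaiting `{sync_fn.__name__}` relies on implicit dispatch; "
                f"call `{async_version.__name__}` explicitly instead.",
                DeprecationWarning,
                stacklevel=2,
            )
            return async_version(*args, **kwargs)

        return wrapper

    return decorator


async def aload(name):
    await asyncio.sleep(0)
    return {"name": name, "via": "aload"}


@async_compatible(async_version=aload)
def load(name):
    return {"name": name, "via": "load"}


# Sync caller: no warning, synchronous implementation.
sync_value = load("my-block")


async def legacy_caller():
    # Async caller still using the old spelling: works, but warns.
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        value = await load("my-block")
    return value, [str(w.message) for w in caught]


async_value, messages = asyncio.run(legacy_caller())
```

Existing `await load(...)` call sites keep working during the migration, while the warning text tells the user which explicit method to switch to.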
To make sure this part is not glossed over: this will ultimately result in Prefect maintaining two implementations for a large class of user interfaces (primarily those that interact with the Prefect client / API).
Naming Convention
To achieve this, we will rely on the following naming conventions:
- classes for which the core implementation will change between sync / async will be synchronous by default, and the async versions will prefix their class name with Async; e.g., PrefectClient vs. AsyncPrefectClient
- whenever the class remains the same, synchronous methods will be the default and asynchronous methods will be implemented with an "a" prefix; e.g., Block.load vs. Block.aload
- for top-level functions, the same will be true: implementations will be synchronous first, and async implementations will have an "a" prefix; e.g., run_deployment vs. arun_deployment
There is one edge case with this, which is the .wait methods on Prefect futures; for these we will make the corresponding class awaitable for async waits.
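One way the awaitable-future edge case could work is sketched below. `ToyFuture` is a hypothetical stand-in, not a real Prefect class, and running the blocking wait in a worker thread is an assumed strategy chosen for brevity.

```python
import asyncio
import threading


class ToyFuture:
    """Hypothetical future exposing a sync `.wait()` and async `await`."""

    def __init__(self):
        self._done = threading.Event()
        self._result = None

    def set_result(self, value):
        self._result = value
        self._done.set()

    def wait(self, timeout=None):
        # Synchronous wait: blocks the calling thread.
        self._done.wait(timeout)

    def result(self):
        return self._result

    def __await__(self):
        async def _wait():
            # Async wait: run the blocking wait in a worker thread so the
            # event loop stays free to run other tasks.
            await asyncio.to_thread(self._done.wait)
            return self

        return _wait().__await__()


def sync_usage():
    fut = ToyFuture()
    threading.Timer(0.01, fut.set_result, args=(42,)).start()
    fut.wait()
    return fut.result()


async def async_usage():
    fut = ToyFuture()
    threading.Timer(0.01, fut.set_result, args=(42,)).start()
    await fut
    return fut.result()


sync_result = sync_usage()
async_result = asyncio.run(async_usage())
```

With this shape there is no need for an `await`-prefixed method name: `fut.wait()` stays synchronous and `await fut` is the async spelling.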
Remaining uses of @sync_compatible
prefect
prefect.artifacts
- Artifact.create
- Artifact.get
- Artifact.get_or_create
- create_link_artifact
- create_markdown_artifact
- create_table_artifact
- update_progress_artifact
- create_image_artifact
prefect.filesystems
- LocalFileSystem.get_directory
- LocalFileSystem.put_directory
- LocalFileSystem.read_path
- LocalFileSystem.write_path
- RemoteFileSystem.get_directory
- RemoteFileSystem.put_directory
- RemoteFileSystem.read_path
- RemoteFileSystem.write_path
- SMB.get_directory
- SMB.put_directory
- SMB.read_path
- SMB.write_path
prefect.flow_runs
- pause_flow_run
- suspend_flow_run
- resume_flow_run
prefect.flows
- Flow.to_deployment
- Flow.from_source
- Flow.deploy
- Flow.visualize
prefect.results
- ResultStore.update_for_flow
- ResultStore.update_for_task
- ResultStore._exists
- ResultStore._read
- ResultStore._persist_result_record
- ResultStore.store_parameters
- ResultStore.read_parameters
prefect.states
- _get_state_result
- get_state_exception
- raise_state_exception
prefect.task_worker
- TaskWorker.start
- TaskWorker.stop
- serve
prefect.tasks
- Task.serve
prefect.blocks.core
- Block.load_from_ref
- Block.register_type_and_schema
- Block.save
- Block.delete
prefect.blocks.notifications
- AbstractAppriseNotificationBlock.notify
- AppriseNotificationBlock.notify
- PagerDutyWebHook.notify
- CustomWebhookNotificationBlock.notify
prefect.blocks.redis
- RedisStorageContainer.read_path
- RedisStorageContainer.write_path
prefect.concurrency.v1._asyncio
- acquire_concurrency_slots
- release_concurrency_slots
prefect.deployments.flow_runs
- run_deployment
prefect.deployments.runner
- RunnerDeployment.apply
- RunnerDeployment.from_storage
- deploy
prefect.input.actions
- create_flow_run_input_from_model
- create_flow_run_input
- filter_flow_run_input
- read_flow_run_input
- delete_flow_run_input
prefect.input.run_input
- BaseRunInput.save
- BaseRunInput.load
- BaseRunInput.respond
- BaseRunInput.send_to
- AutomaticRunInput.load
- AutomaticRunInput.next
- GetAutomaticInputHandler.next
- send_input
prefect.runner.runner
- Runner.add_deployment
- Runner.add_flow
- Runner._add_storage
- Runner.stop
prefect.runner.submit
- submit_to_runner
- wait_for_submitted_runs
prefect.testing.fixtures
- process_events (x2)
Additional context
Feel free to comment and discuss.