Retrievable transaction results #17545
-
Once a transaction is committed with persisted results, a file is written to disk, named after the idempotency key if one was supplied. I'd like to know whether this file could include a result attributed to the transaction, so that it can be retrieved in subsequent flow runs if need be.

Here is a minimal example that shares some of the traits of my real use case that prompted me to look for this:

    import time

    from prefect import task, flow


    @task
    def task1():
        time.sleep(2)
        return "task1"


    @task
    def task2():
        time.sleep(3)
        return "task2"


    @task
    def task3(arg1):
        time.sleep(1)
        return "task3"


    @task
    def task4(arg1, arg2):
        time.sleep(4)
        return "task4"


    @flow(persist_result=True)
    def pipeline():
        task1_future = task1.submit()
        task2_future = task2.submit()
        task3_future = task3.submit(task1_future)
        task4_future = task4.submit(task2_future, task3_future)
        result = task4_future.result()
        return result


    if __name__ == "__main__":
        pipeline()

In this workflow, I could enable caching for both tasks. But intermediate results may be objects that are tedious to serialise. Plus it would lead to needless disk use, as I'm only ever interested in the final result.

What I'd want is to be able to set a result for the transaction which could outlive the flow run:

    from prefect.transactions import transaction

    with transaction(key="my-key") as txn:
        if txn.is_committed():
            result = txn.get_result()  # or txn.get("result") or something else along those lines
        else:
            task3_future = task3.submit(task1_future)
            task4_future = task4.submit(task2_future, task3_future)
            result = task4_future.result()
            txn.set("result", result)  # Or Prefect could commit the result automatically, e.g. if it's the return statement of the context manager block?

Actually I thought of a workaround, but it feels like cheating. It enables caching with a constant cache key:

    @task(cache_key_fn=lambda context, parameters: "my-key")
    def task4(arg1, cache_key):
        time.sleep(4)
        return "task4"

    ...

    with transaction(key="my-key") as txn:
        if txn.is_committed():
            result = task4(task2_future, None)  # Dummy input for task3
        else:
            task3_future = task3.submit(task1_future)
            task4_future = task4.submit(task2_future, task3_future)
            result = task4_future.result()
Replies: 2 comments 8 replies
-
Hi @TheoMathurin - I think this should work:

    if txn.is_committed():
        result = txn.read().result
-
Oh wow, thanks @cicdw! I'll use this. How is this result determined? Is it the return statement of the transaction block?
Ah good question - so you would need to explicitly write/stage something to the key set on the transaction, using the stage method.

Ultimately, all of this transactions machinery is what backs Prefect's task result and cache storage mechanism, so it's possible you could get what you need using tasks directly.
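
As a rough illustration of the stage / commit / read cycle described above, here is a toy model that uses only the standard library. It is not Prefect's implementation: ToyTransaction, toy_transaction and run_once are hypothetical names, and the JSON record format is an assumption made for the sketch. What it tries to show is that the stored result is whatever was explicitly staged, not the value returned from the with block, and that it is written under the key only when the block exits without an exception.

```python
import json
import tempfile
from contextlib import contextmanager
from pathlib import Path


class ToyTransaction:
    """Toy stand-in for a keyed transaction (hypothetical, not Prefect's class)."""

    def __init__(self, key: str, store_dir: Path):
        self.key = key
        self._path = store_dir / key  # record file named after the key
        self._staged = None
        self._has_staged = False

    def is_committed(self) -> bool:
        # A record on disk means an earlier run already committed this key.
        return self._path.exists()

    def stage(self, value) -> None:
        # Nothing is written yet; the value is only held until commit.
        self._staged = value
        self._has_staged = True

    def read(self):
        # Return the value that a previous commit stored under this key.
        return json.loads(self._path.read_text())["result"]

    def commit(self) -> None:
        # Write the staged value once; an existing record is left untouched.
        if self._has_staged and not self.is_committed():
            self._path.write_text(json.dumps({"result": self._staged}))


@contextmanager
def toy_transaction(key: str, store_dir: Path):
    txn = ToyTransaction(key, store_dir)
    yield txn
    # Reached only if the with block raised no exception.
    txn.commit()


def run_once(store_dir: Path, calls: list) -> str:
    """One simulated flow run: compute only if the key is not committed yet."""
    with toy_transaction("my-key", store_dir) as txn:
        if txn.is_committed():
            return txn.read()
        calls.append("task4")  # stands in for the expensive work
        result = "task4"
        txn.stage(result)
    return result


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        calls = []
        first = run_once(Path(tmp), calls)
        second = run_once(Path(tmp), calls)
        print(first, second, calls)  # task4 task4 ['task4']
```

The second simulated run finds the record and skips the work. In Prefect itself, the corresponding calls mentioned in this thread are txn.stage(...), txn.is_committed() and txn.read().result.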