Sequential Tasks using Task Class Inheritance #4028
-
Context

I'm using the functional API on Prefect v0.14.15. I have two Task subclasses that are inheriting …

Issue

I think my issue is that the flow isn't being set up correctly, and I say that because the error is …
I think the …

Expected Behavior

My goal here is for the output from …

Code Snippet to Recreate Error:

```python
from prefect import Flow, Task, Parameter
from dask import dataframe as dd
import pandas as pd
import numpy as np


class DataReader(Task):
    def _read_dask(self):
        df = pd.DataFrame(np.random.randn(100, 4), columns=list('ABCD'))
        return dd.from_pandas(df, npartitions=3)

    def _read_pandas(self):
        raise NotImplementedError

    def run(self, data_type):
        if data_type == 'dask':
            return self._read_dask()
        if data_type == 'pandas':
            return self._read_pandas()


class ProcessData(Task):
    def __init__(self):
        pass

    def run(self, data_frame):
        return data_frame.apply(lambda x: (x + 1).all())


with Flow("subclass task sequential flow") as flow:
    data_type = Parameter('data_type')
    reader = DataReader()
    flow.add_task(reader)
    data = reader.run(data_type=data_type)
    data_processing = ProcessData()
    data_processing.run(data_frame=data)

flow.visualize()
flow.run(data_type='dask')
```

Let me know if I can provide any more info on the issue / expected behavior.
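A plain-Python sketch of what most likely goes wrong in the snippet above when `.run()` is called while the flow is being built. It does not use Prefect: `Placeholder` is a hypothetical stand-in for the `Parameter` object that exists at build time, and `read`/`process` are hypothetical functions that copy the branching of `DataReader.run` and the body of `ProcessData.run`. The assumption is that the placeholder never compares equal to the string `'dask'`, so neither branch matches and the call falls through to `None`.

```python
# Hypothetical sketch, not Prefect code: Placeholder stands in for a Parameter
# at flow-build time. It is an object, not the string supplied later at run time.
class Placeholder:
    def __init__(self, name):
        self.name = name


def read(data_type):
    # Same branching as DataReader.run in the question.
    if data_type == "dask":
        return "dask frame"
    if data_type == "pandas":
        return "pandas frame"
    # Neither branch matched: Python returns None implicitly.


def process(data_frame):
    # Same body as ProcessData.run in the question.
    return data_frame.apply(lambda x: (x + 1).all())


data_type = Placeholder("data_type")
data = read(data_type)  # executed immediately, with the placeholder object
print(data)             # None

try:
    process(data)
except AttributeError as err:
    print(err)          # 'NoneType' object has no attribute 'apply'
```

Because `.run()` executes right away, nothing is deferred until `flow.run(data_type='dask')` supplies the real value.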
-
Hi! Your primary problem is that you're calling `task.run()` directly, which is only meant for testing tasks. You build the flow by calling task objects, e.g. `my_task(arg, arg)`, and then `task.run()` is called by the executor at runtime. Here's an annotated and working version of your code:

```python
from prefect import Flow, Task, Parameter
import pandas as pd
import numpy as np


class DataReader(Task):
    def _read_dask(self):
        df = pd.DataFrame(np.random.randn(100, 4), columns=list("ABCD"))
        # return dd.from_pandas(df, npartitions=3)
        # I'm just doing this because I don't want to install dask dataframe
        return df

    def _read_pandas(self):
        raise NotImplementedError

    def run(self, data_type):
        if data_type == "dask":
            return self._read_dask()
        if data_type == "pandas":
            return self._read_pandas()


class ProcessData(Task):
    def __init__(self):
        # You were not calling the Task superclass __init__,
        # which will cause problems
        super().__init__()

    def run(self, data_frame):
        print(data_frame)
        return data_frame.apply(lambda x: (x + 1).all())


# This was correct: you need to initialize an instance of the task class before
# you can use it. Often this is done outside of the flow to make it clear that
# it's not quite part of the flow, but it's stylistic and you can put it inside
# the `with` block if you want.
reader = DataReader()
data_processing = ProcessData()

with Flow("subclass task sequential flow") as flow:
    data_type = Parameter("data_type")
    # You were calling .run() directly, but that should be saved for runtime.
    # When building your flow, you just call the task with your args.
    data = reader(data_type=data_type)
    data_processing(data_frame=data)

flow.run(data_type="dask")
```

Also note this is covered in https://docs.prefect.io/core/getting_started/first-steps.html#task-classes