-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
Copy pathsimple_pipes_script_asset.py
70 lines (51 loc) · 2.42 KB
/
simple_pipes_script_asset.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import shutil
from pathlib import Path
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.decorators.asset_decorator import asset
from dagster._core.definitions.definitions_class import Definitions
from dagster._core.execution.context.asset_execution_context import AssetExecutionContext
from dagster._core.pipes.subprocess import PipesSubprocessClient
from dagster.components import Component, ComponentLoadContext, Model
from dagster.components.component_scaffolding import scaffold_component
from dagster.components.scaffold.scaffold import Scaffolder, ScaffoldRequest, scaffold_with
from pydantic import BaseModel
# Same schema used for file generation and defs generation
class SimplePipesScriptScaffoldParams(BaseModel):
asset_key: str
filename: str
# Same schema used for file generation and defs generation
class SimplePipesScriptComponentModel(Model):
asset_key: str
filename: str
class SimplePipesScriptScaffolder(Scaffolder):
@classmethod
def get_scaffold_params(cls):
return SimplePipesScriptScaffoldParams
def scaffold(self, request: ScaffoldRequest, params: SimplePipesScriptScaffoldParams) -> None:
scaffold_component(request, params.model_dump())
Path(request.target_path, params.filename).write_text(
_SCRIPT_TEMPLATE.format(asset_key=params.asset_key)
)
_SCRIPT_TEMPLATE = """
from dagster_pipes import open_dagster_pipes
context = open_dagster_pipes()
context.log.info("Materializing asset {asset_key} from pipes")
context.report_asset_materialization(asset_key="{asset_key}")
"""
@scaffold_with(SimplePipesScriptScaffolder)
class SimplePipesScriptComponent(Component):
"""A simple asset that runs a Python script with the Pipes subprocess client.
Because it is a pipes asset, no value is returned.
"""
@classmethod
def get_model_cls(cls):
return SimplePipesScriptComponentModel
def __init__(self, asset_key: AssetKey, script_path: Path):
self._asset_key = asset_key
self._script_path = script_path
def build_defs(self, context: ComponentLoadContext) -> Definitions:
@asset(key=self._asset_key)
def _asset(context: AssetExecutionContext, pipes_client: PipesSubprocessClient):
cmd = [shutil.which("python"), self._script_path]
return pipes_client.run(command=cmd, context=context).get_results()
return Definitions(assets=[_asset])