5
5
6
6
import yaml
7
7
8
- from dagster import (
9
- AssetExecutionContext ,
10
- AssetKey ,
11
- AssetMaterialization ,
12
- AssetsDefinition ,
13
- AssetSpec ,
14
- ConfigurableResource ,
15
- _check as check ,
16
- multi_asset ,
17
- )
8
+ import dagster as dg
9
+ import dagster ._check as check
18
10
from dagster ._annotations import public
19
11
20
12
@@ -33,9 +25,11 @@ def load(self):
33
25
return yaml .safe_load (Path (self .replication_configuration_yaml ).read_text ())
34
26
35
27
36
- class ReplicationResource (ConfigurableResource ):
28
+ class ReplicationResource (dg . ConfigurableResource ):
37
29
@public
38
- def run (self , context : AssetExecutionContext ) -> Iterator [AssetMaterialization ]:
30
+ def run (
31
+ self , context : dg .AssetExecutionContext
32
+ ) -> Iterator [dg .AssetMaterialization ]:
39
33
metadata_by_key = context .assets_def .metadata_by_key
40
34
first_asset_metadata = next (iter (metadata_by_key .values ()))
41
35
@@ -52,16 +46,16 @@ def run(self, context: AssetExecutionContext) -> Iterator[AssetMaterialization]:
52
46
results = replicate (Path (project .replication_configuration_yaml ))
53
47
for table in results :
54
48
if table .get ("status" ) == "SUCCESS" :
55
- yield AssetMaterialization (
49
+ yield dg . AssetMaterialization (
56
50
asset_key = translator .get_asset_key (table ), metadata = table
57
51
)
58
52
59
53
60
54
@dataclass
61
55
class ReplicationTranslator :
62
56
@public
63
- def get_asset_key (self , table_definition : Mapping [str , str ]) -> AssetKey :
64
- return AssetKey (str (table_definition .get ("name" )))
57
+ def get_asset_key (self , table_definition : Mapping [str , str ]) -> dg . AssetKey :
58
+ return dg . AssetKey (str (table_definition .get ("name" )))
65
59
66
60
67
61
def custom_replication_assets (
@@ -70,19 +64,19 @@ def custom_replication_assets(
70
64
name : Optional [str ] = None ,
71
65
group_name : Optional [str ] = None ,
72
66
translator : Optional [ReplicationTranslator ] = None ,
73
- ) -> Callable [[Callable [..., Any ]], AssetsDefinition ]:
67
+ ) -> Callable [[Callable [..., Any ]], dg . AssetsDefinition ]:
74
68
project = replication_project .load ()
75
69
76
70
translator = (
77
71
check .opt_inst_param (translator , "translator" , ReplicationTranslator )
78
72
or ReplicationTranslator ()
79
73
)
80
74
81
- return multi_asset (
75
+ return dg . multi_asset (
82
76
name = name ,
83
77
group_name = group_name ,
84
78
specs = [
85
- AssetSpec (
79
+ dg . AssetSpec (
86
80
key = translator .get_asset_key (table ),
87
81
metadata = {
88
82
"replication_project" : project ,
0 commit comments