2
2
3
3
from dagster_snowflake import SnowflakeResource , fetch_last_updated_timestamps
4
4
5
- from dagster import (
6
- AssetSelection ,
7
- AssetSpec ,
8
- Definitions ,
9
- EnvVar ,
10
- MetadataValue ,
11
- ObserveResult ,
12
- ScheduleDefinition ,
13
- build_last_update_freshness_checks ,
14
- define_asset_job ,
15
- multi_observable_source_asset ,
16
- )
5
+ import dagster as dg
17
6
18
7
TABLE_SCHEMA = "PUBLIC"
19
8
table_names = ["charges" , "customers" ]
20
- asset_specs = [AssetSpec (table_name ) for table_name in table_names ]
9
+ asset_specs = [dg . AssetSpec (table_name ) for table_name in table_names ]
21
10
22
11
23
- @multi_observable_source_asset (specs = asset_specs )
12
+ @dg . multi_observable_source_asset (specs = asset_specs )
24
13
def source_tables (snowflake : SnowflakeResource ):
25
14
with snowflake .get_connection () as conn :
26
15
freshness_results = fetch_last_updated_timestamps (
@@ -29,42 +18,42 @@ def source_tables(snowflake: SnowflakeResource):
29
18
schema = TABLE_SCHEMA ,
30
19
)
31
20
for table_name , last_updated in freshness_results .items ():
32
- yield ObserveResult (
21
+ yield dg . ObserveResult (
33
22
asset_key = table_name ,
34
23
metadata = {
35
- "dagster/last_updated_timestamp" : MetadataValue .timestamp (
24
+ "dagster/last_updated_timestamp" : dg . MetadataValue .timestamp (
36
25
last_updated
37
26
)
38
27
},
39
28
)
40
29
41
30
42
- source_tables_observation_schedule = ScheduleDefinition (
43
- job = define_asset_job (
31
+ source_tables_observation_schedule = dg . ScheduleDefinition (
32
+ job = dg . define_asset_job (
44
33
"source_tables_observation_job" ,
45
- selection = AssetSelection .assets (source_tables ),
34
+ selection = dg . AssetSelection .assets (source_tables ),
46
35
),
47
36
# Runs every minute. Usually, a much less frequent cadence is necessary,
48
37
# but a short cadence makes it easier to play around with this example.
49
38
cron_schedule = "* * * * *" ,
50
39
)
51
40
52
41
53
- source_table_freshness_checks = build_last_update_freshness_checks (
42
+ source_table_freshness_checks = dg . build_last_update_freshness_checks (
54
43
assets = [source_tables ],
55
44
lower_bound_delta = timedelta (hours = 2 ),
56
45
)
57
46
58
47
59
- defs = Definitions (
48
+ defs = dg . Definitions (
60
49
assets = [source_tables ],
61
50
asset_checks = source_table_freshness_checks ,
62
51
schedules = [source_tables_observation_schedule ],
63
52
resources = {
64
53
"snowflake" : SnowflakeResource (
65
- user = EnvVar ("SNOWFLAKE_USER" ),
66
- account = EnvVar ("SNOWFLAKE_ACCOUNT" ),
67
- password = EnvVar ("SNOWFLAKE_PASSWORD" ),
54
+ user = dg . EnvVar ("SNOWFLAKE_USER" ),
55
+ account = dg . EnvVar ("SNOWFLAKE_ACCOUNT" ),
56
+ password = dg . EnvVar ("SNOWFLAKE_PASSWORD" ),
68
57
)
69
58
},
70
59
)
0 commit comments