Skip to content

Commit 4e9382b

Browse files
authored
Add script to detect changed assets in Dagster Cloud
This script detects changed assets in a Dagster Cloud branch deployment by querying its GraphQL API. It outputs the results grouped by code location, with options for different output formats.
1 parent cd1588d commit 4e9382b

File tree

1 file changed

+291
-0
lines changed

1 file changed

+291
-0
lines changed

scripts/get_changed_assets.py

Lines changed: 291 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,291 @@
1+
#!/usr/bin/env python3
2+
"""Script to detect changed assets in a Dagster Cloud branch deployment.
3+
4+
Queries a branch deployment's GraphQL API for assets with changedReasons of NEW
5+
or CODE_VERSION, and outputs them grouped by code location — ready for
6+
`dagster-cloud job launch` invocations.
7+
8+
Example usage:
9+
# Output dagster-cloud job launch commands (default)
10+
python get_changed_assets.py \
11+
--deployment my-branch-deployment
12+
13+
# Filter to specific change types
14+
python get_changed_assets.py \
15+
--deployment my-branch-deployment \
16+
--change-types NEW,CODE_VERSION
17+
18+
# Output as JSON
19+
python get_changed_assets.py \
20+
--deployment my-branch-deployment \
21+
--output-format json
22+
23+
# Output as tab-separated lines
24+
python get_changed_assets.py \
25+
--deployment my-branch-deployment \
26+
--output-format lines
27+
28+
# Verbose mode (diagnostics to stderr, commands to stdout)
29+
python get_changed_assets.py \
30+
--deployment my-branch-deployment \
31+
--verbose
32+
33+
# Pipe directly to bash
34+
python get_changed_assets.py --deployment my-branch-deployment | bash
35+
36+
# Environment variables:
37+
# DAGSTER_CLOUD_ORGANIZATION - Dagster Cloud organization name
38+
# DAGSTER_CLOUD_API_TOKEN - API token for authentication
39+
"""
40+
41+
import argparse
42+
import json
43+
import os
44+
import sys
45+
from collections import defaultdict
46+
from typing import Any, Optional
47+
48+
import requests
49+
50+
CHANGED_ASSETS_QUERY = """
51+
query GetChangedAssets {
52+
assetNodes {
53+
assetKey { path }
54+
changedReasons
55+
repository {
56+
name
57+
location { name }
58+
}
59+
}
60+
}
61+
"""
62+
63+
VALID_CHANGE_TYPES = {
64+
"NEW",
65+
"CODE_VERSION",
66+
"DEPENDENCIES",
67+
"PARTITIONS_DEFINITION",
68+
"TAGS",
69+
"METADATA",
70+
"REMOVED",
71+
}
72+
73+
74+
class DagsterAPIClient:
75+
"""Client for interacting with Dagster Cloud's GraphQL API."""
76+
77+
def __init__(self, organization: str, deployment: str, api_token: str):
78+
self.graphql_url = (
79+
f"https://{organization}.dagster.cloud/{deployment}/graphql"
80+
)
81+
self.api_token = api_token
82+
83+
def execute_query(
84+
self, query: str, variables: Optional[dict[str, Any]] = None
85+
) -> dict[str, Any]:
86+
headers = {
87+
"Content-Type": "application/json",
88+
"Dagster-Cloud-Api-Token": self.api_token,
89+
}
90+
91+
response = requests.post(
92+
self.graphql_url,
93+
json={"query": query, "variables": variables or {}},
94+
headers=headers,
95+
)
96+
response.raise_for_status()
97+
98+
data = response.json()
99+
if "errors" in data:
100+
raise Exception(f"GraphQL errors: {data['errors']}")
101+
102+
return data
103+
104+
105+
def get_changed_assets(
106+
client: DagsterAPIClient,
107+
change_types: set[str],
108+
verbose: bool = False,
109+
) -> dict[tuple[str, str], list[str]]:
110+
"""Query the branch deployment for assets with the specified change reasons.
111+
112+
Returns a dict mapping (location_name, repository_name) to sorted lists of
113+
asset key strings (path components joined with '/').
114+
"""
115+
result = client.execute_query(CHANGED_ASSETS_QUERY)
116+
asset_nodes = result["data"]["assetNodes"]
117+
118+
if verbose:
119+
print(f"Fetched {len(asset_nodes)} total asset nodes", file=sys.stderr)
120+
121+
grouped: dict[tuple[str, str], list[str]] = defaultdict(list)
122+
matched_count = 0
123+
124+
for node in asset_nodes:
125+
reasons = set(node.get("changedReasons") or [])
126+
if not reasons & change_types:
127+
continue
128+
129+
matched_count += 1
130+
asset_key = "/".join(node["assetKey"]["path"])
131+
location_name = node["repository"]["location"]["name"]
132+
repository_name = node["repository"]["name"]
133+
grouped[(location_name, repository_name)].append(asset_key)
134+
135+
if verbose:
136+
print(
137+
f" Changed: {asset_key} ({', '.join(sorted(reasons))})"
138+
f" in {location_name}/{repository_name}",
139+
file=sys.stderr,
140+
)
141+
142+
# Sort asset keys within each group for deterministic output
143+
for key in grouped:
144+
grouped[key].sort()
145+
146+
if verbose:
147+
print(
148+
f"Found {matched_count} changed assets across"
149+
f" {len(grouped)} code location(s)",
150+
file=sys.stderr,
151+
)
152+
153+
return dict(grouped)
154+
155+
156+
def format_commands(
157+
grouped: dict[tuple[str, str], list[str]],
158+
job: str,
159+
) -> str:
160+
lines = []
161+
for (location, repository), asset_keys in sorted(grouped.items()):
162+
asset_args = " ".join(f"--asset-key {key}" for key in asset_keys)
163+
lines.append(
164+
f"dagster-cloud job launch"
165+
f" --location {location}"
166+
f" --repository {repository}"
167+
f" --job {job}"
168+
f" {asset_args}"
169+
)
170+
return "\n".join(lines)
171+
172+
173+
def format_json(grouped: dict[tuple[str, str], list[str]]) -> str:
174+
# Key by "location_name/repository_name" for readability
175+
out = {}
176+
for (location, repository), asset_keys in sorted(grouped.items()):
177+
key = f"{location}/{repository}" if repository != "__repository__" else location
178+
out[key] = asset_keys
179+
return json.dumps(out, indent=2)
180+
181+
182+
def format_lines(grouped: dict[tuple[str, str], list[str]]) -> str:
183+
lines = []
184+
for (location, repository), asset_keys in sorted(grouped.items()):
185+
for asset_key in asset_keys:
186+
lines.append(f"{location}\t{repository}\t{asset_key}")
187+
return "\n".join(lines)
188+
189+
190+
def main():
191+
parser = argparse.ArgumentParser(
192+
description="Detect changed assets in a Dagster Cloud branch deployment",
193+
formatter_class=argparse.RawDescriptionHelpFormatter,
194+
epilog=__doc__,
195+
)
196+
parser.add_argument(
197+
"--deployment",
198+
required=True,
199+
help="Branch deployment name (the slug used in the URL path)",
200+
)
201+
parser.add_argument(
202+
"--organization",
203+
default=os.environ.get("DAGSTER_CLOUD_ORGANIZATION"),
204+
help="Dagster Cloud organization name (default: $DAGSTER_CLOUD_ORGANIZATION)",
205+
)
206+
parser.add_argument(
207+
"--api-token",
208+
default=os.environ.get("DAGSTER_CLOUD_API_TOKEN"),
209+
help="Dagster Cloud API token (default: $DAGSTER_CLOUD_API_TOKEN)",
210+
)
211+
parser.add_argument(
212+
"--change-types",
213+
default="NEW,CODE_VERSION",
214+
help="Comma-separated change types to filter on (default: NEW,CODE_VERSION)",
215+
)
216+
parser.add_argument(
217+
"--output-format",
218+
choices=["commands", "json", "lines"],
219+
default="commands",
220+
help="Output format (default: commands)",
221+
)
222+
parser.add_argument(
223+
"--job",
224+
default="__ASSET_JOB",
225+
help="Job name for launch commands (default: __ASSET_JOB)",
226+
)
227+
parser.add_argument(
228+
"--verbose",
229+
action="store_true",
230+
help="Print diagnostics to stderr",
231+
)
232+
233+
args = parser.parse_args()
234+
235+
if not args.organization:
236+
print(
237+
"Error: --organization or DAGSTER_CLOUD_ORGANIZATION env var is required",
238+
file=sys.stderr,
239+
)
240+
sys.exit(1)
241+
242+
if not args.api_token:
243+
print(
244+
"Error: --api-token or DAGSTER_CLOUD_API_TOKEN env var is required",
245+
file=sys.stderr,
246+
)
247+
sys.exit(1)
248+
249+
# Parse and validate change types
250+
change_types = {ct.strip().upper() for ct in args.change_types.split(",")}
251+
invalid = change_types - VALID_CHANGE_TYPES
252+
if invalid:
253+
print(
254+
f"Error: invalid change type(s): {', '.join(sorted(invalid))}. "
255+
f"Valid types: {', '.join(sorted(VALID_CHANGE_TYPES))}",
256+
file=sys.stderr,
257+
)
258+
sys.exit(1)
259+
260+
if args.verbose:
261+
print(
262+
f"Querying {args.organization}/{args.deployment}"
263+
f" for change types: {', '.join(sorted(change_types))}",
264+
file=sys.stderr,
265+
)
266+
267+
client = DagsterAPIClient(args.organization, args.deployment, args.api_token)
268+
269+
try:
270+
grouped = get_changed_assets(client, change_types, verbose=args.verbose)
271+
except Exception as e:
272+
print(f"Error querying branch deployment: {e}", file=sys.stderr)
273+
sys.exit(1)
274+
275+
if not grouped:
276+
if args.verbose:
277+
print("No changed assets found", file=sys.stderr)
278+
sys.exit(0)
279+
280+
if args.output_format == "json":
281+
output = format_json(grouped)
282+
elif args.output_format == "lines":
283+
output = format_lines(grouped)
284+
else:
285+
output = format_commands(grouped, args.job)
286+
287+
print(output)
288+
289+
290+
if __name__ == "__main__":
291+
main()

0 commit comments

Comments
 (0)