|
| 1 | +import os |
1 | 2 | from contextlib import contextmanager |
2 | 3 | from dataclasses import dataclass |
3 | 4 | from queue import Queue |
4 | | -from typing import TYPE_CHECKING, Any, Iterator, Mapping, Optional, Sequence, Union, cast |
| 5 | +from typing import TYPE_CHECKING, Any, Dict, Iterator, Mapping, Optional, Sequence, Union, cast |
5 | 6 |
|
6 | 7 | from dagster_pipes import ( |
7 | 8 | DAGSTER_PIPES_CONTEXT_ENV_VAR, |
@@ -275,6 +276,40 @@ class PipesSession: |
275 | 276 | message_reader_params: PipesParams |
276 | 277 | context: OpExecutionContext |
277 | 278 |
|
| 279 | + @property |
| 280 | + def dagster_tags(self) -> Dict[str, str]: |
| 281 | + """Important Dagster key-value pairs typically attached to external resources launched from Pipes.""" |
| 282 | + tags = { |
| 283 | + "dagster/run-id": self.context.run_id, |
| 284 | + "dagster/job": self.context.job_name, |
| 285 | + } |
| 286 | + |
| 287 | + if self.context.dagster_run.external_job_origin: |
| 288 | + tags["dagster/code-location"] = ( |
| 289 | + self.context.dagster_run.external_job_origin.repository_origin.code_location_origin.location_name |
| 290 | + ) |
| 291 | + |
| 292 | + if user := self.context.get_tag("dagster/user"): |
| 293 | + tags["dagster/user"] = user |
| 294 | + |
| 295 | + if self.context.has_partition_key: |
| 296 | + tags["dagster/partition-key"] = self.context.partition_key |
| 297 | + |
| 298 | + # now using the walrus operator for os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME") |
| 299 | + |
| 300 | + for dagster_cloud_env_var in [ |
| 301 | + "DAGSTER_CLOUD_DEPLOYMENT_NAME", |
| 302 | + "DAGSTER_CLOUD_GIT_REPO", |
| 303 | + "DAGSTER_CLOUD_GIT_BRANCH", |
| 304 | + ]: |
| 305 | + if value := os.getenv(dagster_cloud_env_var): |
| 306 | + name_clean = ( |
| 307 | + dagster_cloud_env_var.lower().replace("dagster_cloud_", "").replace("_", "-") |
| 308 | + ) |
| 309 | + tags[f"dagster/{name_clean}"] = value |
| 310 | + |
| 311 | + return tags |
| 312 | + |
278 | 313 | @public |
279 | 314 | def get_bootstrap_env_vars(self) -> Mapping[str, str]: |
280 | 315 | """Encode context injector and message reader params as environment variables. |
|
0 commit comments