-
Notifications
You must be signed in to change notification settings - Fork 221
feat: support key: +{task}[{xcom_key}] use
#593
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
key: {task}[{xcom_key}] usekey: +{task}[{xcom_key}] use
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #593 +/- ##
==========================================
- Coverage 93.77% 93.65% -0.13%
==========================================
Files 13 13
Lines 1125 1135 +10
==========================================
+ Hits 1055 1063 +8
- Misses 70 72 +2 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
pankajastro
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please add an example DAG here and document this feature?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR adds support for accessing XCom values from decorated tasks with multiple_outputs=True using the syntax +{task}[{xcom_key}]. This extends the existing +{task} functionality to allow fetching specific keys from tasks that return multiple outputs.
- Refactored XCom value replacement logic into a dedicated helper method
- Added regex pattern matching to support the new
+{task}[{xcom_key}]syntax - Updated existing functionality to use the new consolidated approach
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| dev/dags/sample.py | Added sample functions to demonstrate the new XCom key syntax |
| dagfactory/dagbuilder.py | Implemented regex-based parsing for XCom references and consolidated replacement logic |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
| return decorator(**decorator_kwargs)(**callable_kwargs) | ||
|
|
||
| @staticmethod | ||
| def _replace_kwargs_values_as_xcom(kwargs: dict(str, Any), key: str, value: Any, tasks_dict: dict(str, Any)): |
Copilot
AI
Oct 3, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The method signature uses dict(str, Any) which is incorrect syntax. Should be Dict[str, Any] and requires importing Dict from typing.
| @staticmethod | ||
| def _replace_kwargs_values_as_xcom(kwargs: dict(str, Any), key: str, value: Any, tasks_dict: dict(str, Any)): | ||
| # Match with multiple_outputs=True case with {key}: +{task}["{xcom_key}"] | ||
| _PATTERN = re.compile(r"^\+(?P<task>\w*)(\[['\"](?P<xcom_key>.*)['\"]\])?$") |
Copilot
AI
Oct 3, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The regex pattern allows empty task names with \w*. This should be \w+ to require at least one word character for the task name, preventing matches on invalid references like +[] or +['key'].
| _PATTERN = re.compile(r"^\+(?P<task>\w*)(\[['\"](?P<xcom_key>.*)['\"]\])?$") | |
| _PATTERN = re.compile(r"^\+(?P<task>\w+)(\[['\"](?P<xcom_key>.*)['\"]\])?$") |
| @staticmethod | ||
| def _replace_kwargs_values_as_xcom(kwargs: dict(str, Any), key: str, value: Any, tasks_dict: dict(str, Any)): | ||
| # Match with multiple_outputs=True case with {key}: +{task}["{xcom_key}"] | ||
| _PATTERN = re.compile(r"^\+(?P<task>\w*)(\[['\"](?P<xcom_key>.*)['\"]\])?$") |
Copilot
AI
Oct 3, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The regex pattern (?P<xcom_key>.*) allows any characters including newlines and control characters in the xcom_key. Consider using [^'\"]* or similar to restrict allowed characters and prevent potential injection issues.
| _PATTERN = re.compile(r"^\+(?P<task>\w*)(\[['\"](?P<xcom_key>.*)['\"]\])?$") | |
| _PATTERN = re.compile(r"^\+(?P<task>\w*)(\[['\"](?P<xcom_key>[^'\"]*)['\"]\])?$") |
|
I also add a simple feature docs. You can reorder it if you want. |
Airflow supports use of {decorated_task}[{xcom_key}] for xcoms that are not return_value link
Here I support for fetching upstream task's xcom from decorated task with
multiple_outputs=TrueThe following example has already been tested in Airflow 3.1.0 (and should also works in Airflow 3)