feat: add deployer API for airflow, config params for sfn/batch/k8s #3106

Open
npow wants to merge 5 commits into master from npow/core-deployer-changes-clean

Conversation


@npow npow commented Apr 13, 2026

Summary

  • Add complete Airflow deployer implementation: airflow_client.py, airflow_deployer.py, airflow_deployer_objects.py
  • Implement SFN from_deployment (previously NotImplementedError)
  • Add SFN deployer enhancements: trigger with parameters, status polling, production token handling
  • Add config params for local emulator compatibility (SFN_CLIENT_PARAMS, BATCH_CLIENT_PARAMS, etc.)
  • Register Airflow deployer in plugin __init__.py

This is a foundational PR that enables the Airflow and SFN deployer integration tests in the UX test suite. Several feature PRs depend on this landing first.
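The client-params knobs in the summary follow a common pattern for local-emulator testing. A hypothetical sketch of how such a value could be parsed and splatted into a client constructor (the env var name and JSON shape here are assumptions for illustration, not the PR's actual metaflow_config.py code):

```python
import json
import os

def client_params_from_env(var):
    # Parse a JSON dict of extra client kwargs from the environment, e.g. an
    # endpoint_url that redirects API calls to a local emulator.
    raw = os.environ.get(var)
    return json.loads(raw) if raw else {}

# Hypothetical usage; the real config var names live in metaflow_config.py.
os.environ["METAFLOW_SFN_CLIENT_PARAMS"] = (
    '{"endpoint_url": "http://localhost:8083"}'
)
params = client_params_from_env("METAFLOW_SFN_CLIENT_PARAMS")
# ...later the kwargs would be splatted into the client constructor,
# e.g. boto3.client("stepfunctions", **params)
```

With this shape, pointing tests at sfn-local is a matter of setting one environment variable, with no code changes.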

Test plan

  • Airflow deployer tests in test/ux/core/ should no longer skip
  • SFN deployer tests should exercise full trigger/status/from_deployment paths
  • Existing unit and integration tests should continue to pass

🤖 Generated with Claude Code

Nissan Pow and others added 5 commits April 13, 2026 16:05
…local emulator compatibility

- Add AirflowDeployer + REST client (airflow_client.py, airflow_deployer.py, airflow_deployer_objects.py)
- Register AirflowDeployer in DEPLOYER_IMPL_PROVIDERS_DESC
- Add BATCH_CLIENT_PARAMS, SFN_CLIENT_PARAMS, SFN_DYNAMO_DB_CLIENT_PARAMS config vars
- Add KUBERNETES_JOB_TTL_SECONDS_AFTER_FINISHED config var
- Add AIRFLOW_REST_API_URL/USERNAME/PASSWORD, AIRFLOW_KUBERNETES_DAGS_PATH/NAMESPACE config vars
- Add --deployer-attribute-file and trigger command to airflow CLI
- Inject METAFLOW_FLOW_CONFIG_VALUE into Airflow pod env
- Fix sfn-local: ResultSelector for Parallel states, omit ProcessorConfig for Map states
- Fix sfn-local: use $$.Execution.Name for METAFLOW_RUN_ID in join steps
- Fix local_storage concurrent manifest writes (atomic rename)
- Add KUBERNETES_JOB_TTL to prevent pod accumulation
- Guard conda_decorator against uninitialized state
- Fix DATATOOLS_CLIENT_PARAMS to use setdefault (not unconditional override)
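The "atomic rename" fix for concurrent manifest writes listed above can be sketched as follows; the function name and manifest shape are illustrative, not the PR's local_storage.py code:

```python
import json
import os
import tempfile

def write_manifest_atomically(path, manifest):
    # Write to a temp file in the destination directory, then rename over
    # the target: readers see either the old or the new manifest, never a
    # partially written one, even with concurrent writers.
    dirname = os.path.dirname(path) or "."
    fd, tmp = tempfile.mkstemp(dir=dirname, suffix=".manifest.tmp")
    try:
        with os.fdopen(fd, "w") as fp:
            json.dump(manifest, fp)
        os.replace(tmp, path)  # atomic on POSIX and Windows
    except Exception:
        try:
            os.unlink(tmp)
        except OSError:
            pass
        raise
```

The temp file must live in the same directory as the target, since os.replace is only guaranteed atomic within a filesystem.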
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Remove xfail markers for sfn-batch (ResultSelector/sfn-local compat)
and airflow-kubernetes (AirflowDeployer) since the deployer
implementation is now in this PR.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@npow force-pushed the npow/core-deployer-changes-clean branch from 78e00a5 to 446e38e on April 13, 2026 at 23:11

greptile-apps Bot commented Apr 13, 2026

Greptile Summary

This PR adds a complete Airflow deployer (airflow_client.py, airflow_deployer.py, airflow_deployer_objects.py), implements StepFunctionsDeployedFlow.from_deployment (previously NotImplementedError), and adds boto3 client_params config knobs for Batch, SFN, and DynamoDB to enable local-emulator testing. The sfn-local compatibility fixes in step_functions.py (JSONPath corrections, conditional ProcessorConfig, ResultSelector on Parallel states) are well-reasoned.

  • P0: test_run_succeeds and test_all_steps_have_tasks in test_localbatch.py were accidentally de-indented out of TestMetaflowE2E; they are now uncollectable module-level functions.
  • P1: Both AirflowDeployedFlow.from_deployment and StepFunctionsDeployedFlow.from_deployment create temp files with delete=False but never clean them up, leaking files on every call.

Confidence Score: 3/5

Not safe to merge as-is: a P0 test regression and a P1 file-leak affect both new and existing test paths.

The P0 de-indentation bug in test_localbatch.py silently disables two E2E tests that were just un-xfailed, defeating the stated goal of the PR. The temp-file leak in both from_deployment implementations is a P1 correctness issue that will accumulate unbounded files in CI and production environments. These two findings together prevent a score above 3.

test/unit/localbatch/test_localbatch.py (de-indented methods), metaflow/plugins/airflow/airflow_deployer_objects.py and metaflow/plugins/aws/step_functions/step_functions_deployer_objects.py (temp-file leaks in from_deployment)

Important Files Changed

  • test/unit/localbatch/test_localbatch.py: Removed xfail markers but accidentally de-indented two test methods out of their class, making them uncollectable module-level functions with a dangling self parameter.
  • metaflow/plugins/airflow/airflow_deployer_objects.py: New AirflowDeployedFlow/AirflowTriggeredRun objects; from_deployment leaks its temporary flow file due to delete=False without a cleanup path.
  • metaflow/plugins/aws/step_functions/step_functions_deployer_objects.py: Implements from_deployment, get_triggered_run, resume, and status; same temp-file leak as the Airflow counterpart, and status creates two boto3 clients unnecessarily.
  • metaflow/plugins/airflow/airflow_deployer.py: New AirflowDeployer implementation; correctly cleans up its own temp DAG file via try/finally; delegates kubectl cp and REST-API wait to helpers.
  • metaflow/plugins/airflow/airflow_client.py: New thin Airflow 2.x REST API wrapper; minor issue with the parameter name body shadowed in the except block, otherwise straightforward.
  • metaflow/plugins/aws/step_functions/step_functions.py: sfn-local compatibility fixes: JSONPath dot-bracket syntax corrections, conditional ProcessorConfig, ResultSelector on Parallel states, and per-emulator retry-strategy trimming.
  • metaflow/metaflow_config.py: Adds BATCH_CLIENT_PARAMS, SFN_CLIENT_PARAMS, SFN_DYNAMO_DB_CLIENT_PARAMS, and Airflow REST API config; Airflow username/password default to "admin"/"admin".
  • metaflow/plugins/datastores/local_storage.py: Race condition fix: catches FileExistsError on concurrent mkdir; safe and correct.
  • metaflow/plugins/pypi/conda_decorator.py: Adds a disabled = False class default and a _metaflow_home is None guard to prevent AttributeError; safe defensive fix.
  • metaflow/plugins/airflow/airflow_cli.py: Adds --deployer-attribute-file to create and a new trigger CLI command; straightforward and consistent with the SFN deployer pattern.

Comments Outside Diff (1)

  1. metaflow/plugins/airflow/airflow_client.py, lines 258-284

    P2 Parameter name body shadowed in except block

    The method parameter body (the request payload) is overwritten with the error response bytes inside the except block, which is confusing and makes the original value unavailable for debugging. Rename the exception variable to error_body.
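The suggested rename can be sketched in isolation; the exception class and HTTP layer below are stand-ins for illustration, not the PR's actual airflow_client.py code:

```python
import json

class HTTPError(Exception):
    """Stand-in for an HTTP error that carries a response payload."""
    def __init__(self, payload):
        self.payload = payload
    def read(self):
        return self.payload

def trigger(body, send):
    # `body` is the request payload; keep it bound for the whole method so
    # it is still available for error reporting.
    try:
        return send(json.dumps(body).encode())
    except HTTPError as e:
        error_body = e.read()  # distinct name: no shadowing of `body`
        raise RuntimeError(
            "trigger failed for payload %r: %s"
            % (body, error_body.decode("utf-8", "replace"))
        )
```

With the distinct name, both the original payload and the server's error response can appear in the raised message.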

Reviews (1): Last reviewed commit: "test: remove BATCH_CLIENT_PARAMS xfail, ..."

Comment on lines +362 to 367
def test_run_succeeds(self, simple_batch_run):
    assert simple_batch_run.successful

@_NEEDS_CORE_BATCH_PARAMS
def test_all_steps_have_tasks(self, simple_batch_run):
    step_names = {s.id for s in simple_batch_run.steps()}
    assert {"start", "end"} <= step_names

P0 Test methods de-indented out of class

test_run_succeeds and test_all_steps_have_tasks are now module-level functions instead of class methods. Pytest won't pass them simple_batch_run (a class-scoped fixture) and they carry a self parameter with no caller to supply it, so both tests will be silently skipped or error on collection.

Suggested change (re-indent the methods back into the class and drop the duplicated def line):

    def test_run_succeeds(self, simple_batch_run):
        assert simple_batch_run.successful

    @_NEEDS_CORE_BATCH_PARAMS
    def test_all_steps_have_tasks(self, simple_batch_run):
        step_names = {s.id for s in simple_batch_run.steps()}
        assert {"start", "end"} <= step_names

Comment on lines +239 to +248
with tempfile.NamedTemporaryFile(suffix=".py", delete=False) as fake_flow_file:
    with open(fake_flow_file.name, "w") as fp:
        fp.write(fake_flow_file_contents)

    d = Deployer(fake_flow_file.name).airflow(name=identifier)
    d.name = identifier
    d.flow_name = flow_name
    d.metadata = metadata if metadata is not None else get_metadata()

    return cls(deployer=d)

P1 Temp file leaked on every from_deployment call

delete=False is used but the file is never removed. Every call to from_deployment (or get_triggered_run, which calls it) permanently leaks a .py file in the system temp directory. The same pattern appears in StepFunctionsDeployedFlow.from_deployment (line 225 of that file).

with tempfile.NamedTemporaryFile(suffix=".py", delete=False) as fake_flow_file:
    tmp_path = fake_flow_file.name
try:
    with open(tmp_path, "w") as fp:
        fp.write(fake_flow_file_contents)
    d = Deployer(tmp_path).airflow(name=identifier)
    d.name = identifier
    d.flow_name = flow_name
    d.metadata = metadata if metadata is not None else get_metadata()
finally:
    try:
        os.unlink(tmp_path)
    except OSError:
        pass
return cls(deployer=d)

Comment on lines +225 to +241
with tempfile.NamedTemporaryFile(suffix=".py", delete=False) as fake_flow_file:
    with open(fake_flow_file.name, "w") as fp:
        fp.write(fake_flow_file_contents)

    d = Deployer(
        fake_flow_file.name,
        env={"METAFLOW_USER": username},
    ).step_functions(name=identifier)

    d.name = identifier
    d.flow_name = flow_name
    if metadata is None:
        d.metadata = get_metadata()
    else:
        d.metadata = metadata

    return cls(deployer=d)

P1 Temp file leaked on every from_deployment call

Same leak as in AirflowDeployedFlow.from_deployment: the temp file is created with delete=False and never removed. Wrap the usage in a try/finally that calls os.unlink (as shown in the Airflow file comment) or use tempfile.TemporaryDirectory as a scoping mechanism.
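The TemporaryDirectory alternative mentioned here can be sketched in isolation (function name and callback are illustrative, not the PR's code):

```python
import os
import tempfile

def with_fake_flow_file(contents, use):
    # The directory, and the flow file inside it, is removed when the
    # with-block exits, so nothing leaks even if `use` raises.
    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, "fake_flow.py")
        with open(path, "w") as fp:
            fp.write(contents)
        return use(path)
```

This trades the explicit try/finally unlink for directory-scoped cleanup, at the cost of requiring that all use of the path happens inside the scope.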

Comment on lines 13 to +41
A class representing a triggered AWS Step Functions state machine execution.
"""

@property
def status(self) -> Optional[str]:
"""
Get the status of the triggered execution.

Returns
-------
str, optional
One of RUNNING, SUCCEEDED, FAILED, TIMED_OUT, ABORTED, or None.
"""
try:
from metaflow.plugins.aws.step_functions.step_functions import (
StepFunctions,
)
from metaflow.plugins.aws.step_functions.step_functions_client import (
StepFunctionsClient,
)

_, run_id = self.pathspec.split("/")
execution_name = run_id[4:] # strip "sfn-"
state_machine_name = self.deployer.name

state_machine = StepFunctionsClient().get(state_machine_name)
if state_machine is None:
return None
sm_arn = state_machine["stateMachineArn"]

P2 Two StepFunctionsClient() instances created per status call

StepFunctionsClient() is instantiated twice in the status property — once to get the state machine ARN and once to describe_execution. Each instantiation creates a new boto3 client. Reusing a single instance would be more efficient:

sfn = StepFunctionsClient()
state_machine = sfn.get(state_machine_name)
if state_machine is None:
    return None
sm_arn = state_machine["stateMachineArn"]
execution_arn = (
    sm_arn.replace(":stateMachine:", ":execution:") + ":" + execution_name
)
result = sfn.describe_execution(execution_arn)

Comment on lines +498 to +499
# Airflow REST API endpoint, e.g. http://localhost:8090/api/v1
AIRFLOW_REST_API_URL = from_conf("AIRFLOW_REST_API_URL")

P2 Default Airflow credentials are "admin"/"admin"

AIRFLOW_REST_API_USERNAME and AIRFLOW_REST_API_PASSWORD default to "admin" and "admin". While these are meant to be overridden for real deployments, the defaults could cause accidental authentication against a remote Airflow instance using well-known credentials. Consider defaulting to None and raising a clear error at call-time when they are not set.
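The fail-loud default suggested above could look like this; the env var names mirror the PR's config names, but the code is a sketch, not the actual metaflow_config.py:

```python
import os

def airflow_basic_auth():
    # Default to None instead of "admin"/"admin"; refuse to call the API
    # with unset credentials rather than fall back to well-known ones.
    user = os.environ.get("METAFLOW_AIRFLOW_REST_API_USERNAME")
    password = os.environ.get("METAFLOW_AIRFLOW_REST_API_PASSWORD")
    if user is None or password is None:
        raise RuntimeError(
            "METAFLOW_AIRFLOW_REST_API_USERNAME and "
            "METAFLOW_AIRFLOW_REST_API_PASSWORD must be set to use the "
            "Airflow REST API"
        )
    return (user, password)
```

Raising at call time keeps purely-local flows working while making it impossible to silently authenticate against a remote Airflow with default credentials.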
