Skip to content

Commit 1a1e99d

Browse files
author
Nissan Pow
committed
feat: extract parameter info in SFN from_deployment for richer flow reconstruction
The from_deployment classmethod now extracts parameter names and types from the METAFLOW_DEFAULT_PARAMETERS environment variable stored in the state machine definition, enabling the reconstructed fake flow file to include proper Parameter declarations.
1 parent 13f6194 commit 1a1e99d

1 file changed

Lines changed: 43 additions & 5 deletions

File tree

metaflow/plugins/aws/step_functions/step_functions_deployer_objects.py

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -209,17 +209,55 @@ def from_deployment(cls, identifier: str, metadata: Optional[str] = None):
209209

210210
# Extract flow metadata stored in the start state's Parameters.
211211
try:
212-
start = json.loads(workflow["definition"])["States"]["start"]
213-
parameters = start["Parameters"]["Parameters"]
214-
flow_name = parameters.get("metaflow.flow_name", "")
215-
username = parameters.get("metaflow.owner", "")
212+
definition = json.loads(workflow["definition"])
213+
start = definition["States"]["start"]
214+
batch_params = start["Parameters"]["Parameters"]
215+
flow_name = batch_params.get("metaflow.flow_name", "")
216+
username = batch_params.get("metaflow.owner", "")
216217
except (KeyError, json.JSONDecodeError):
217218
raise MetaflowException(
218219
"Could not extract flow metadata from state machine: %s" % identifier
219220
)
220221

222+
# Extract parameter info from the start state's environment variables.
223+
# METAFLOW_DEFAULT_PARAMETERS is a JSON dict of {param_name: default_value}.
224+
param_info = {}
225+
try:
226+
env_vars = (
227+
start.get("Parameters", {})
228+
.get("ContainerOverrides", {})
229+
.get("Environment", [])
230+
)
231+
env_dict = {item.get("Name"): item.get("Value") for item in env_vars}
232+
default_params_str = env_dict.get("METAFLOW_DEFAULT_PARAMETERS")
233+
if default_params_str:
234+
default_params = json.loads(default_params_str)
235+
for pname, pvalue in default_params.items():
236+
# Infer type from the default value
237+
if isinstance(pvalue, bool):
238+
ptype = "bool"
239+
elif isinstance(pvalue, int):
240+
ptype = "int"
241+
elif isinstance(pvalue, float):
242+
ptype = "float"
243+
else:
244+
ptype = "str"
245+
param_info[pname] = {
246+
"name": pname,
247+
"python_var_name": pname,
248+
"type": ptype,
249+
"description": "",
250+
"is_required": False,
251+
}
252+
# If METAFLOW_PARAMETERS env var is present, there are parameters
253+
# even if they don't have defaults. We already captured those with
254+
# defaults above; required params without defaults would not appear
255+
# in METAFLOW_DEFAULT_PARAMETERS but the flow still has them.
256+
except (KeyError, json.JSONDecodeError, TypeError):
257+
pass # best-effort extraction; proceed with empty param_info
258+
221259
fake_flow_file_contents = generate_fake_flow_file_contents(
222-
flow_name=flow_name, param_info={}, project_name=None
260+
flow_name=flow_name, param_info=param_info, project_name=None
223261
)
224262

225263
with tempfile.NamedTemporaryFile(suffix=".py", delete=False) as fake_flow_file:

0 commit comments

Comments
 (0)