feat: automatic job grouping#95

Open
Stellatsuu wants to merge 21 commits into main from automatic-job-grouping

Conversation

@Stellatsuu
Contributor

@Stellatsuu Stellatsuu commented Feb 3, 2026

cc @aldbr @arrabito @natthan-pigoux

Closes: #66
Related to: #61

Changes:

  • added input_data: list[str | File] to TransformationSubmissionModel and ProductionSubmissionModel
  • added optional inputs-file parameter to Transformation and Production CLIs:
    dirac-cwl transformation/production submit file.cwl --inputs-file file.yaml
  • renamed parameter-path to input_files: list[str] in Job CLI:
    dirac-cwl job submit file.cwl --input-files file1.yaml file2.yaml ...
  • added a group_size field to the dirac:ExecutionHooks hint in Transformation workflows, such as:
hints:
  - class: dirac:ExecutionHooks
    group_size: (int)
  • group_size determines how many jobs are created and how many input files each job contains in submit_transformation_router. By default it equals 1, which means one job is created for each input in the inputs file. Once the list of jobs is built, it is sent to the job_router and processed.
  • added simple tests and workflows (e.g. counting and listing the input files contained in each created job)
  • quick fix for JobWrapper-related tests: task.cwl was created during post_process but never cleaned up after the tests ran.
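The grouping rule described above can be sketched roughly as follows (the helper name is hypothetical; submit_transformation_router is the router function named in this PR, and its real logic may differ):

```python
# Sketch of the chunking performed when submitting a transformation:
# with the default group_size of 1, one job is created per input file.
def group_inputs(input_files: list[str], group_size: int = 1) -> list[list[str]]:
    """Split the input file list into job-sized chunks."""
    if group_size < 1:
        raise ValueError("group_size must be >= 1")
    return [
        input_files[i : i + group_size]
        for i in range(0, len(input_files), group_size)
    ]
```

For example, three files with group_size=2 yield two jobs: one with two files and one with the remainder.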

TODO after this PR:

class TransformationSubmissionModel(BaseModel):
    """Transformation definition sent to the router."""

    # Allow arbitrary types to be passed to the model
    model_config = ConfigDict(arbitrary_types_allowed=True)

    task: CommandLineTool | Workflow | ExpressionTool
    input_data: list[str | File] | None = None
Contributor

As we are going to integrate input sandbox within transformations (#92), it would be interesting to see if we could reuse the JobInputModel (renamed as InputModel?)

Contributor Author

@Stellatsuu Stellatsuu Feb 10, 2026

Regarding @arrabito comments:

  • I agree that we don't need an input sandbox for now, so inputs can't be local files.
  • I don't remember how we plan to add sandbox support in the transformation system. For simplicity, I would keep just LFN paths for now.
  • As said before, in my opinion there is no need to support/create sandboxes for now.

Do I still make this change in this PR? Or would it be better to do it in a (future) sandbox PR? Maybe I misunderstood what you meant here.

Contributor

Let's make this change in a future sandbox PR I would say

Contributor

Thinking a little bit further, we may also want to allow local file paths, but only to be used for Local execution (without adding them to SB).

So if the submission is local we allow only local paths, while if the submission is to DIRAC we allow only LFN paths.

In this way, we could also execute transformations locally.

Eventually later on, we will also allow local file paths for DIRAC submission (adding them to ISB).

@aldbr what do you think?
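The path rule proposed here could look roughly like this (the helper name and the "/lfn/" prefix are assumptions for illustration, not the project's actual convention for detecting LFNs):

```python
# Sketch: local submissions accept only local paths; DIRAC submissions
# accept only LFN paths (local paths for DIRAC would come later, via ISB).
def validate_input_paths(paths: list[str], local_submission: bool) -> None:
    for path in paths:
        is_lfn = path.startswith("/lfn/")  # assumed LFN marker
        if local_submission and is_lfn:
            raise ValueError(f"local execution expects local paths, got LFN: {path}")
        if not local_submission and not is_lfn:
            raise ValueError(f"DIRAC submission expects LFN paths, got: {path}")
```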

@Stellatsuu
Contributor Author

@aldbr Regarding this part of the code:

# Temporary comment
# if transformation_execution_hooks.configuration and transformation_execution_hooks.group_size:
#     # Get the metadata class
#     transformation_metadata = transformation_execution_hooks.to_runtime(transformation)
#
#     # Build the input cwl for the jobs to submit
#     logger.info("Getting the input data for the transformation...")
#     input_data_dict = {}
#     min_length = None
#     for input_name, group_size in transformation_execution_hooks.group_size.items():
#         # Get input query
#         logger.info("\t- Getting input query for %s...", input_name)
#         input_query = transformation_metadata.get_input_query(input_name)
#         if not input_query:
#             raise RuntimeError("Input query not found.")
#
#         # Wait for the input to be available
#         logger.info("\t- Waiting for input data for %s...", input_name)
#         logger.debug("\t\t- Query: %s", input_query)
#         logger.debug("\t\t- Group Size: %s", group_size)
#         while not (inputs := _get_inputs(input_query, group_size)):
#             logger.debug("\t\t- Result: %s", inputs)
#             time.sleep(5)
#         logger.info("\t- Input data for %s available.", input_name)
#         if not min_length or len(inputs) < min_length:
#             min_length = len(inputs)
#
#         # Update the input data in the metadata
#         # Only keep the first min_length inputs
#         input_data_dict[input_name] = inputs[:min_length]
#
#     # Get the JobModelParameter for each input
#     job_model_params = _generate_job_model_parameter(input_data_dict)
#     logger.info("Input data for the transformation retrieved!")
Are we planning on keeping it? Just so I uncomment it and make the changes related to the group_size type change.

@Stellatsuu
Contributor Author

Waiting on #66 (comment) and #95 (comment) approval about what we're doing, and then, PR should be ready to be fully reviewed (and potentially merged 🙏).

@aldbr
Contributor

aldbr commented Feb 10, 2026

@aldbr Regarding this part of the code:

# Temporary comment
# if transformation_execution_hooks.configuration and transformation_execution_hooks.group_size:
#     # Get the metadata class
#     transformation_metadata = transformation_execution_hooks.to_runtime(transformation)
#
#     # Build the input cwl for the jobs to submit
#     logger.info("Getting the input data for the transformation...")
#     input_data_dict = {}
#     min_length = None
#     for input_name, group_size in transformation_execution_hooks.group_size.items():
#         # Get input query
#         logger.info("\t- Getting input query for %s...", input_name)
#         input_query = transformation_metadata.get_input_query(input_name)
#         if not input_query:
#             raise RuntimeError("Input query not found.")
#
#         # Wait for the input to be available
#         logger.info("\t- Waiting for input data for %s...", input_name)
#         logger.debug("\t\t- Query: %s", input_query)
#         logger.debug("\t\t- Group Size: %s", group_size)
#         while not (inputs := _get_inputs(input_query, group_size)):
#             logger.debug("\t\t- Result: %s", inputs)
#             time.sleep(5)
#         logger.info("\t- Input data for %s available.", input_name)
#         if not min_length or len(inputs) < min_length:
#             min_length = len(inputs)
#
#         # Update the input data in the metadata
#         # Only keep the first min_length inputs
#         input_data_dict[input_name] = inputs[:min_length]
#
#     # Get the JobModelParameter for each input
#     job_model_params = _generate_job_model_parameter(input_data_dict)
#     logger.info("Input data for the transformation retrieved!")

Are we planning on keeping it? Just so I un-comment it and make the changes related to the group_size type change.

Yes we want to keep it. A transformation should either get inputs from the CLI, or from a DataCatalog/Bookkeeping service.

@Stellatsuu
Contributor Author

Stellatsuu commented Feb 10, 2026

I’m also not sure whether the job_grouping workflow is worth keeping. It only counts the files in each job and lists them, based on the inputs_file content. I'm not sure what else to test beyond that.

Also, the test_run_transformation_with_inputs_file test only checks whether the transformation succeeds; it doesn't check that the number of groups and jobs created is correct. Maybe I should change that? If so, I think it would be better to create a dedicated test for this case, so we keep a general test for whether a transformation with an inputs_file succeeds, plus a dedicated test for the automatic grouping check.

Let me know if you have any ideas.
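For the dedicated grouping test, the expected number of jobs is just a ceiling division of inputs by group_size; a minimal sketch (hypothetical helper and test, not the existing test code):

```python
import math

# Expected job count: with N input files and a given group_size,
# ceil(N / group_size) jobs should be created.
def expected_job_count(n_inputs: int, group_size: int) -> int:
    return math.ceil(n_inputs / group_size)

def test_job_count_for_grouping():
    assert expected_job_count(10, 3) == 4
    assert expected_job_count(10, 1) == 10
    assert expected_job_count(10, 10) == 1
```

A real test would compare this value against the number of jobs the router actually submitted.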

@Stellatsuu
Contributor Author

@aldbr Regarding this part of the code:

# Temporary comment
# if transformation_execution_hooks.configuration and transformation_execution_hooks.group_size:
#     # Get the metadata class
#     transformation_metadata = transformation_execution_hooks.to_runtime(transformation)
#
#     # Build the input cwl for the jobs to submit
#     logger.info("Getting the input data for the transformation...")
#     input_data_dict = {}
#     min_length = None
#     for input_name, group_size in transformation_execution_hooks.group_size.items():
#         # Get input query
#         logger.info("\t- Getting input query for %s...", input_name)
#         input_query = transformation_metadata.get_input_query(input_name)
#         if not input_query:
#             raise RuntimeError("Input query not found.")
#
#         # Wait for the input to be available
#         logger.info("\t- Waiting for input data for %s...", input_name)
#         logger.debug("\t\t- Query: %s", input_query)
#         logger.debug("\t\t- Group Size: %s", group_size)
#         while not (inputs := _get_inputs(input_query, group_size)):
#             logger.debug("\t\t- Result: %s", inputs)
#             time.sleep(5)
#         logger.info("\t- Input data for %s available.", input_name)
#         if not min_length or len(inputs) < min_length:
#             min_length = len(inputs)
#
#         # Update the input data in the metadata
#         # Only keep the first min_length inputs
#         input_data_dict[input_name] = inputs[:min_length]
#
#     # Get the JobModelParameter for each input
#     job_model_params = _generate_job_model_parameter(input_data_dict)
#     logger.info("Input data for the transformation retrieved!")

Are we planning on keeping it? Just so I un-comment it and make the changes related to the group_size type change.

Yes we want to keep it. A transformation should either get inputs from the CLI, or from a DataCatalog/Bookkeeping service.

Since group_size is now an int, this code is kind of broken now, no? I need an input_name to retrieve the input query (input_query = transformation_metadata.get_input_query(input_name)). Where would this value live now? In a Transformation hint? As a list of input_names?

@arrabito
Contributor

Since group_size is now an int, this code is kinda broken now, no? I need an input_name to retrieve the input_query: input_query = transformation_metadata.get_input_query(input_name), where would this value be now? In a Transformation hint? As a list of input_names?

As far as I see, I'm not sure that any input_name is needed anymore.

In the current QueryBasedPlugin, input_name is just used to build the LFN path, see:

https://github.com/DIRACGrid/dirac-cwl/blob/main/src/dirac_cwl/execution_hooks/plugins/core.py#L37C31-L37C41

Probably we could just change get_input_query to not take any argument and just build LFN path as:

/query_root/vo/campaign/site/data_type

instead of:

/query_root/vo/campaign/site/data_type/input_name

Then, I guess that the group_size in yaml file should be specified as:

- class: dirac:ExecutionHooks
  hook_plugin: QueryBasedPlugin
  group_size: 5

instead of:

- class: dirac:ExecutionHooks
  hook_plugin: QueryBasedPlugin
  group_size:
    input-data: 5

@aldbr do you agree?

(Maybe some other changes are needed that I haven't thought of.)
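The proposed no-argument get_input_query could look roughly like this (the class and field names are illustrative; the real plugin lives in src/dirac_cwl/execution_hooks/plugins/core.py):

```python
# Sketch of the revised QueryBasedPlugin: the LFN path is built from the
# plugin configuration alone, without a trailing input_name segment.
class QueryBasedPluginSketch:
    def __init__(self, query_root: str, vo: str, campaign: str, site: str, data_type: str):
        self.query_root = query_root
        self.vo = vo
        self.campaign = campaign
        self.site = site
        self.data_type = data_type

    def get_input_query(self) -> str:
        # /query_root/vo/campaign/site/data_type  (no /input_name)
        return "/".join([self.query_root, self.vo, self.campaign, self.site, self.data_type])
```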

@aldbr
Contributor

aldbr commented Feb 10, 2026

Since group_size is now an int, this code is kinda broken now, no? I need an input_name to retrieve the input_query: input_query = transformation_metadata.get_input_query(input_name), where would this value be now? In a Transformation hint? As a list of input_names?

As far as I see, I'm not sure that any input_name is needed anymore.

In the current QueryBasedPlugin, input_name is just used to build the LFN path, see:

https://github.com/DIRACGrid/dirac-cwl/blob/main/src/dirac_cwl/execution_hooks/plugins/core.py#L37C31-L37C41

Probably we could just change get_input_query to not take any argument and just build LFN path as:

/query_root/vo/campaign/site/data_type

instead of:

/query_root/vo/campaign/site/data_type/input_name

Then, I guess that the group_size in yaml file should be specified as:

- class: dirac:ExecutionHooks
  hook_plugin: QueryBasedPlugin
  group_size: 5

instead of:

- class: dirac:ExecutionHooks
  hook_plugin: QueryBasedPlugin
  group_size:
    input-data: 5

@aldbr do you agree?

(Maybe some other changes are needed that I haven't thought of.)

Yes I agree. In any case, this is going to be revised at some point with the hints proposed in #69

@Stellatsuu Stellatsuu self-assigned this Feb 17, 2026
@Stellatsuu
Contributor Author

Current PR status:

@Stellatsuu
Contributor Author

I don't know how to fix the current lint error. I had to rebase and reword all my "wrong" commits, and it's still not working because of old commits that were already pushed.

@Stellatsuu
Contributor Author

Stellatsuu commented Mar 3, 2026

Also, the PyPI check is failing on something I didn't touch directly (fairly sure about that), so I don't really know what to do about that either. Let me know if you have any ideas:

scripts/generate_schemas.py:70: error: Dict entry 0 has incompatible type "str": "type[JobInputModel]"; expected "str": "type[BaseModel]"  [dict-item]
Found 1 error in 1 file (checked 54 source files)
models.update(
    {
        "JobInputModel": JobInputModel,  # <--- error happens here
        "JobSubmissionModel": JobSubmissionModel,
        "TransformationSubmissionModel": TransformationSubmissionModel,
        "ProductionSubmissionModel": ProductionSubmissionModel,
    }
)
class JobInputModel(BaseModel): # <-- it's a BaseModel ?
    """Input data and sandbox files for a job execution."""

    # Allow arbitrary types to be passed to the model
    model_config = ConfigDict(arbitrary_types_allowed=True)

    sandbox: list[str] | None
    cwl: dict[str, Any]

    @field_serializer("cwl")
    def serialize_cwl(self, value):
        """Serialize CWL object to dictionary.

        :param value: CWL object to serialize.
        :return: Serialized CWL dictionary.
        """
        return save(value)
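One way this kind of dict-item error is usually addressed is an explicit value-type annotation, sketched below with stand-in classes (`Base` plays the role of pydantic's BaseModel; this is not the project's actual code). If the error persists even with the annotation, it typically means mypy resolved JobInputModel through a different definition than the BaseModel subclass, e.g. via a conditionally redefined import:

```python
class Base:
    """Stand-in for pydantic.BaseModel."""

class JobInputModel(Base):
    """Stand-in for the real JobInputModel, a Base subclass."""

# Annotating the dict's value type as type[Base] lets every entry,
# including subclasses, type-check against the same expected type.
models: dict[str, type[Base]] = {}
models.update({"JobInputModel": JobInputModel})
```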

@Stellatsuu Stellatsuu closed this Mar 3, 2026
@Stellatsuu Stellatsuu force-pushed the automatic-job-grouping branch from f9337a8 to 52e6144 on March 3, 2026 15:26
@Stellatsuu Stellatsuu reopened this Mar 3, 2026
@Stellatsuu Stellatsuu marked this pull request as ready for review March 3, 2026 15:29
@Stellatsuu Stellatsuu force-pushed the automatic-job-grouping branch from 99df614 to 3a1314c on March 6, 2026 11:54
# Conflicts:
#	src/dirac_cwl/job/job_wrapper.py
#	test/test_integration.py
#	test/test_job_wrapper.py
@Stellatsuu Stellatsuu deployed to github-pages March 16, 2026 13:06 — with GitHub Actions Active
Contributor

@aldbr aldbr left a comment

Since this PR was opened, there have been some changes in the design of the hints, so we should be pragmatic and focus mostly on the user interfaces.

The input_data dictionary maps CWL input parameter names to lists of files. This avoids hardcoding any key name (like input-data) and makes the mapping between hint data and workflow inputs explicit.

CLI convenience: --inputs-file

Users can provide a standard CWL inputs YAML file via the CLI:

dirac-cwl transformation submit workflow.cwl --inputs-file data.yaml

Where data.yaml is:

simulation-files:
  - /lfn/path/file1.root
  - /lfn/path/file2.root
  - /lfn/path/file3.root

The CLI reads this file and populates dirac:ExecutionHooks.input_data in the hint before submission. This means:

  • The router always reads input data from one place (the hint)
  • The --inputs-file flag is syntactic sugar for populating the hint:
hints:
  - class: dirac:ExecutionHooks
    group_size: 5
    input_data:
      simulation-files:
        - /lfn/path/file1.root
        - /lfn/path/file2.root
        - /lfn/path/file3.root
  • If the hint already contains input_data AND --inputs-file is provided, emit a warning (--inputs-file should override input_data).
  • The file format is a standard CWL inputs YAML, so there is nothing new to learn. For simplicity, it should contain only one parameter, I think (unless CTAO has a use case where they would need multiple static lists of inputs?).

Dynamic queries (what we already have with configuration)

For transformations that discover inputs at runtime (e.g., from upstream transformation outputs), the hint uses a plugin-based configuration instead:

hints:
  - class: dirac:ExecutionHooks
    group_size: 5
    input_query:
      plugin: QueryBasedPlugin
      config:
        query_root: "."
        campaign: "pi"
        data_type: "100"

A transformation receives dynamic inputs from one upstream transformation only.

Mutual exclusivity

input_data (static) and input_query (dynamic) cannot coexist on the same transformation. The system should raise a clear error:

Cannot specify both static input data and dynamic input query.
Use input_data (or --inputs-file) for standalone transformations with known files.
Use input_query for transformations that discover inputs from upstream outputs.
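The check itself is small; a sketch with assumed field names:

```python
# Sketch of the mutual-exclusivity check between static and dynamic inputs.
def check_input_sources(input_data, input_query) -> None:
    if input_data and input_query:
        raise ValueError(
            "Cannot specify both static input data and dynamic input query. "
            "Use input_data (or --inputs-file) for known files; "
            "use input_query to discover inputs from upstream outputs."
        )
```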

Which input gets split?

A transformation receives input data through one file-array input only, whether static (input_data with a single key) or dynamic (input_query from one upstream transformation). group_size applies to that input.

If input_data contains multiple keys, raise an error. Multiple dynamic input sources per transformation are not supported; each transformation queries one upstream transformation only.

Non-file inputs (scalars, non-array types) defined in the workflow's regular CWL inputs are passed unchanged to every job.
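Combining the two rules above, static job building could be sketched as follows (the helper name is hypothetical; lists stand in for file arrays and everything else is treated as a scalar passed unchanged to every job):

```python
# Sketch: split the single file-array input by group_size, error on
# multiple array keys, and copy scalar inputs into every job.
def build_job_inputs(static_inputs: dict, group_size: int) -> list[dict]:
    array_keys = [k for k, v in static_inputs.items() if isinstance(v, list)]
    if len(array_keys) != 1:
        raise ValueError("exactly one file-array input is supported")
    key = array_keys[0]
    files = static_inputs[key]
    scalars = {k: v for k, v in static_inputs.items() if k != key}
    return [
        {**scalars, key: files[i : i + group_size]}
        for i in range(0, len(files), group_size)
    ]
```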

Productions

Static inputs and the first transformation convention

When a production is submitted with --inputs-file, the static input data is passed to the first transformation by convention. This follows the natural pipeline structure:

t1 (receives static inputs) -> t2 (queries t1 outputs) -> t3 (queries t2 outputs)

Downstream transformations typically get their inputs via dynamic queries against the DataCatalog/Bookkeeping service.

In that way we start simple. If it's enough to cover CTAO and LHCb use cases, then it's fine. If we need something more complex (multiple input queries/data), then we can reevaluate based on the use cases.

Contributor

Looks like this shouldn't have been changed like that (we increasingly need to update the lock file from GitHub 😅).

Contributor Author

@Stellatsuu Stellatsuu Mar 26, 2026

Maybe we can add #22 to the current sprint? I don't think it'll take a lot of time (it might be a weight 1).

Comment on lines -42 to -46
if os.getenv("DIRAC_PROTO_LOCAL") == "1":
    from dirac_cwl.mocks.sandbox import create_sandbox, download_sandbox  # type: ignore[no-redef]
    from dirac_cwl.mocks.status import JobReportMock
else:
    from diracx.api.jobs import create_sandbox, download_sandbox  # type: ignore[no-redef]
Contributor

Why did you move these lines?

Contributor Author

I had issues during tests: #95 (comment)
The problem was that the imports weren't working correctly. IIRC, the value of os.getenv wasn't set yet when the code was called in the tests.

So the JobWrapper would import diracx.api.jobs instead of dirac_cwl.mocks... because os.getenv("DIRAC_PROTO_LOCAL") was != "1" (None or "0", I don't remember).
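The timing issue described here is the classic import-time vs call-time distinction; a small illustration (the function name is made up, and in the real module the same pattern wraps the imports themselves inside a function body rather than at module top level):

```python
import os

# Resolving the DIRAC_PROTO_LOCAL switch inside a function means the
# environment is read when the code runs, not when the module is imported,
# so tests that set the variable after import still take effect.
def sandbox_backend_name() -> str:
    return "mock" if os.getenv("DIRAC_PROTO_LOCAL") == "1" else "diracx"
```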

Comment on lines +77 to +80
@property
def job_path(self):
    """Return the job path."""
    return self._job_path
Contributor

Where and why do you need this getter?

Contributor Author

This getter is used in this fixture: conftest.py

@pytest.fixture
def job_wrapper():
    """Create a JobWrapper instance and cleanup test files."""
    job_wrapper = JobWrapper(job_id=0)
    yield job_wrapper

    task_file = job_wrapper.job_path / "task.cwl"
    task_file.unlink(missing_ok=True)

It’s used to clean up files created during tests. As mentioned in one of our discussions, running JobWrapper-related tests generates a task.cwl file in the project directory, which isn’t automatically removed. This fixture handles that cleanup and can also provide a job_wrapper instance for tests that need one.
I had to create the getter since job_path is now a private attribute.

raise ValueError(f"Invalid DIRAC hints:\n{exc}") from exc

# Inputs from Transformation inputs_file
if transformation.input_data:
Contributor

I guess input data should come either from input_data or from the query parameters but not both (I would make if/else instead of if/if).
What do you think?

Contributor Author

So, more like an if/elif?
And can we keep this condition just in case (transformation_execution_hooks.configuration and transformation_execution_hooks.group_size), or is it not needed anymore?
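The control flow being discussed could be sketched as follows (attribute names taken from the thread; the arguments are simplified stand-ins for the real model objects):

```python
# Sketch of the if/elif: static input_data takes precedence, otherwise
# fall back to the query-based path guarded by the existing condition.
def resolve_input_source(input_data, configuration, group_size):
    if input_data:  # static inputs from --inputs-file / the hint
        return "static"
    elif configuration and group_size:  # dynamic query-based path
        return "query"
    return None
```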

@Stellatsuu
Contributor Author

Since this PR was opened, there have been some changes in the design of the hints, so we should be pragmatic and focus mostly on the user interfaces.

The input_data dictionary maps CWL input parameter names to lists of files. This avoids hardcoding any key name (like input-data) and makes the mapping between hint data and workflow inputs explicit.

CLI convenience: --inputs-file

Users can provide a standard CWL inputs YAML file via the CLI:

dirac-cwl transformation submit workflow.cwl --inputs-file data.yaml

Where data.yaml is:

simulation-files:
  - /lfn/path/file1.root
  - /lfn/path/file2.root
  - /lfn/path/file3.root

The CLI reads this file and populates dirac:ExecutionHooks.input_data in the hint before submission. This means:

  • The router always reads input data from one place (the hint)
  • The --inputs-file flag is syntactic sugar for populating the hint:
hints:
  - class: dirac:ExecutionHooks
    group_size: 5
    input_data:
      simulation-files:
        - /lfn/path/file1.root
        - /lfn/path/file2.root
        - /lfn/path/file3.root
  • If the hint already contains input_data AND --inputs-file is provided, emit a warning (--inputs-file should override input_data).
  • The file format is a standard CWL inputs YAML, so there is nothing new to learn. For simplicity, it should contain only one parameter, I think (unless CTAO has a use case where they would need multiple static lists of inputs?).

If I understand correctly, input_data is now only defined in the hint (at the CLI level) and shouldn't be part of the SubmissionModels?

@DIRACGrid DIRACGrid deleted a comment from aldbr Mar 24, 2026
@Stellatsuu
Contributor Author

Stellatsuu commented Mar 25, 2026

@aldbr I'm not sure I understand how input_query works.

Currently, we have this:

class ExecutionHooksHint(BaseModel, Hint):
    configuration: Dict[str, Any] = Field(
        default_factory=dict, description="Additional parameters for metadata plugins"
    )

You said:

Dynamic queries (what we already have with configuration)

So, input_query is managed by configuration..?

But then, you said this:

hints:
  - class: dirac:ExecutionHooks
    group_size: 5
    input_query:
      plugin: QueryBasedPlugin
      config:
        query_root: "."
        campaign: "pi"
        data_type: "100"

What is the "relation" between input_query and configuration?

I had to add this to be able to check that input_query and input_data aren't both declared in the CLI before submission:

class TransformationExecutionHooksHint(ExecutionHooksHint):
    """Extended data manager for transformations."""

    group_size: Optional[int] = Field(default=None, description="Input grouping configuration for transformation jobs")
    input_data: Optional[Dict[str, List[str]]] = Field(default=None, description="Input data for transformation jobs")
    input_query: Optional[Dict] = Field(default=None, description="Input query for transformation jobs")

Because I'm calling:

task = load_document(pack(task_path))
execution_hooks = TransformationExecutionHooksHint.from_cwl(task)

# input_query and input_data are mutually exclusive
if execution_hooks.input_query and (execution_hooks.input_data or inputs_file):
    console.print("Error.....")
    raise typer.Exit(code=1)

But if input_query is part of configuration, for example, this wouldn't work..?


Development

Successfully merging this pull request may close these issues.

Support data-processing transformations specified through a static list of input files
