Input/Output Sandbox/Data Management#39
aldbr left a comment
Thanks @Loxeris!
As you can see in my comments, I am not fully understanding the concepts.
Could you add more documentation please? (I know there is none at the moment... 😅)
And could you explain why the approach is so different from what we have in the JDL please?
I'm not sure I understand the approach here. I was under the impression that we had agreed to reuse DIRAC components as much as possible; however, I see re-implementations of them under the new DataManagement package. Besides, I thought we agreed on @arrabito's proposal from #37 (comment), with one caveat: instead of a data catalog, it is a DataManager instance.
I kept
We discussed with @Loxeris about removing it. First of all, the name is misleading: it does not get a query but a list of directories where input files are located, so if we want to keep it I propose to rename it.

As a reminder, this method is used to mimic the remote execution of data-processing transformations, i.e. transformations with input files. It builds a list of input directories where the input files of a transformation are expected to be. The submission command then creates the different jobs to process these inputs as soon as they appear in those input data directories, and it submits the jobs locally. Note also that in the current implementation of the

Now, it's good to mimic the transformation execution locally, but I'm not sure we need to keep such complexity, i.e. dynamic job creation as soon as input files appear. As a reminder, the DIRAC TS supports 2 ways to specify transformation inputs:

So I propose that if we want to keep transformation local execution, we could just support either transformations without input files or data-processing transformations with a static list of input files (assumed to be already present on the local host). We could then implement 2 submission client classes for transformations, as is done for job submission, i.e. one for local submission and one for DIRAC submission (I'm going to open a separate issue about the implementation of the DIRAC submission client for transformations). For the local submission, the input files could be specified in a dedicated field that would be retrieved from the input metadata YAML file passed to the CLI:
Any opinion?
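The two-submission-client idea proposed above could be sketched roughly like this. All class names and the static-input-list constructor argument are assumptions for illustration, not the repo's actual API:

```python
# Sketch: a shared transformation-submission interface with a local
# implementation (static list of pre-existing input files) and a DIRAC
# implementation left as a stub. Names are hypothetical.
from abc import ABC, abstractmethod
from pathlib import Path


class TransformationSubmissionClient(ABC):
    @abstractmethod
    def submit(self, transformation: dict) -> None: ...


class LocalTransformationSubmission(TransformationSubmissionClient):
    """Runs jobs locally against a static list of input files."""

    def __init__(self, input_files: list[Path]):
        self.input_files = input_files

    def submit(self, transformation: dict) -> None:
        # The inputs are supposed to be already present on the local host
        missing = [p for p in self.input_files if not p.exists()]
        if missing:
            raise FileNotFoundError(f"Inputs not present locally: {missing}")
        # ... create one job per input group and execute it locally ...


class DiracTransformationSubmission(TransformationSubmissionClient):
    """Placeholder: real submission would go through DIRAC/diracx services."""

    def submit(self, transformation: dict) -> None:
        raise NotImplementedError
```

This mirrors the job-submission layout described in the comment: one client per execution target, sharing a single interface.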
About:
I'm not sure I understand why it's not convenient; do you have an example?
It's mostly used when we launch a production with multiple transformations.
So how do we deal with productions? Does that mean it's not so useful for you?
Ok great!
For me it's not practical because you have to implement a specific plugin for each different workflow.
I see, it's true that for production submission it makes sense to have such a feature. However, I don't really like that the get-input-files logic is implemented in a hook plugin because, as I said before, in diracx it will be handled by a diracx task. Maybe we should think of an alternative that keeps this feature but outside the hook plugin logic.
You mean dealing with productions for local execution?
I wrote DIRAC to mean diracx.
This method indeed should be removed. This kind of functionality is implemented by DIRAC's DataManager, which we mock here and which is a class member of the
I responded in #61 (comment).
        """Auto-derive hook plugin identifier from class name."""
        return cls.__name__

    def download_lfns(
This implementation of download_lfns violates the Single Responsibility Principle (SRP) for the execution hook plugin.
The hook's responsibility is solely job preparation metadata, executed client-side. Data management functionality, like downloading LFNs, belongs within the DataManager or FileCatalog architecture.
Implementing local downloads here creates unnecessary overhead, as data access within the job environment on the CE node is handled differently.
Please remove this logic from the hook plugin.
This method simply calls the DataManager method for each input. I guess the method itself is not really needed and the code could simply be moved to pre_process.
I don't think that having this code anywhere in the execution hook is a good idea, as it will download files locally to the submitter PC and not to the computing element where the job will run. In the pre-process you may e.g. create a list of LFNs by querying the FC (via data manager) using hook's parameters for query constraints, but I don't think downloading the files should happen here. @aldbr @arrabito please comment
The execution hook and the pre-process code aren't supposed to run on the submitter PC outside of local testing, as far as I understand. They are meant to be used by the JobWrapper running on the computing element.
But we also have hooks for transformations, which could e.g. configure how many inputs per job the transformation should have. In that case, they would run at the launch site, not on the CE, wouldn't they?
I'm not 100% sure this should be considered an Execution Hook. For me, the hints related to transformations shouldn't need plugins and different pre/post-processing.
Maybe the value of these hints can also be used during the pre-process and post-process of specific plugins? We might have to discuss this at the next meeting.
I don't think that having this code anywhere in the execution hook is a good idea, as it will download files locally to the submitter PC and not to the computing element where the job will run. In the pre-process you may e.g. create a list of LFNs by querying the FC (via data manager) using hook's parameters for query constraints, but I don't think downloading the files should happen here. @aldbr @arrabito please comment
I think there is some misunderstanding indeed. I think we all agree that data management functionality should be delegated to a Data Manager, which is a class member of the ExecutionHooksBasePlugin. But the actual download of input files still happens in the pre-processing step, doesn't it? Just the implementation of the download should not be part of the pre_process method; it is still in this method that we would call the Data Manager's getFile to download files. Is that correct?
Now, for the execution on the worker node, the Data Manager would be the DIRAC Data Manager, while for local execution it would be a dummy DataManager that deals with local files.
Is that correct?
Then, about creating a list of LFNs by querying the FC (via the data manager) using the hook's parameters for query constraints: this is specific to transformations. The implementation is again part of the Data Manager, but it happens neither on the submitter PC nor on the worker node; it happens in a TS Agent on the DIRAC server.
So for me it's not part of the pre-processing.
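The division of responsibilities being agreed on here could be sketched as follows. All names are assumed for illustration (the real classes live in DIRAC/diracx): pre-processing keeps orchestration only, and the transfer itself sits behind a DataManager interface, with a dummy implementation for local execution:

```python
# Sketch: pre_process delegates the download to a DataManager; locally,
# a dummy DataManager resolves LFNs under a local "filecatalog" root.
from abc import ABC, abstractmethod
from pathlib import Path


class DataManagerBase(ABC):
    @abstractmethod
    def get_file(self, lfn: str, dest_dir: Path) -> Path: ...


class DummyDataManager(DataManagerBase):
    """Local execution: LFNs are plain paths under a local catalog root."""

    def __init__(self, catalog_root: Path):
        self.catalog_root = catalog_root

    def get_file(self, lfn: str, dest_dir: Path) -> Path:
        src = self.catalog_root / lfn.lstrip("/")
        dest = dest_dir / src.name
        dest.write_bytes(src.read_bytes())  # "download" is a local copy
        return dest


def pre_process(data_manager: DataManagerBase, lfns: list[str], job_dir: Path) -> list[Path]:
    """Run in the JobWrapper before the pre-execution commands."""
    return [data_manager.get_file(lfn, job_dir) for lfn in lfns]
```

On the worker node, the same `pre_process` would receive a DIRAC-backed DataManager instead; only the injected implementation changes.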
But the actual download of input files still happens in the pre-processing step, doesn't it? Just the implementation of the download should not be part of the pre_process method; it is still in this method that we would call the Data Manager's getFile to download files. Is that correct?
Yes indeed, this download_lfns method is meant to be executed within JobWrapper.pre_process(), before we execute the pre-execution commands.
I have a few comments though:
- I think this method is generic and should go within the JobWrapper (do we want to let communities override that? I don't think so).
- Just wondering: do you implement your own method for the sake of simplicity? Because in the DIRAC JobWrapper we are using the DownloadInputData module (just in case you were not aware). At some point, we want to reuse that mechanism (LHCb does not download the inputs but streams them using another input data policy).
Now, for the execution on the worker node, the Data Manager would be the DIRAC Data Manager, while for local execution the Data Manager would be a Dummy DataManager which deals with local files.
Is that correct?
From what I understand yes.
Then about creating a list of LFNs by querying the FC (via data manager) using hook's parameters for query constraints. This is specific to transformations. The implementation it's again part of the Data Manager but it doesn't happen neither on the PC submitter, neither on the worker node, but in a TS Agent on DIRAC server.
So for me it's not part of the pre-processing.
Yes indeed
I think this method is generic, and should go within the JobWrapper
True, I didn't think about that
Just wondering: do you implement your own method for the sake of simplicity? Because in the DIRAC JobWrapper we are using the DownloadInputData module (just in case you were not aware)
I wasn't aware, I still don't quite understand the link between InputDataResolution and DownloadInputData if I'm completely honest
Yes, this is not easy to understand.
In vanilla DIRAC we support 2 ways of importing input data in the JobWrapper:
- DownloadInputData: the JobWrapper downloads data from LFNs
- InputDataProtocol: the JobWrapper streams the inputs (in LHCb, we create a PoolXMLCatalog containing, for each LFN, a list of PFNs that can be used, even in parallel, to get the data of interest)
Note: if you look carefully, you will see that these 2 classes have the same structure (same public methods and signatures).
The JobWrapper either gets the method from the job arguments (JDL), or from the InputDataResolution module, which also looks into the job arguments or the DIRAC configuration.
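The "same structure" point can be illustrated with a small sketch. Method names and return shapes here are illustrative only, not DIRAC's actual signatures: because both policies expose an identical public interface, the JobWrapper can pick one by name from the job arguments or configuration:

```python
# Sketch: two input-data policies with the same interface, selected by name.
class DownloadInputData:
    def execute(self, lfns: list[str]) -> dict:
        # Download each LFN into the local job directory
        return {"OK": True,
                "Successful": {lfn: f"./{lfn.rsplit('/', 1)[-1]}" for lfn in lfns}}


class InputDataProtocol:
    def execute(self, lfns: list[str]) -> dict:
        # Resolve each LFN to remote PFNs to be streamed; nothing is downloaded
        return {"OK": True,
                "Successful": {lfn: [f"root://se.example{lfn}"] for lfn in lfns}}


POLICIES = {"DownloadInputData": DownloadInputData,
            "InputDataProtocol": InputDataProtocol}


def resolve_inputs(policy_name: str, lfns: list[str]) -> dict:
    # The policy name would come from the JDL, the InputDataResolution
    # module, or the DIRAC configuration, as described above.
    return POLICIES[policy_name]().execute(lfns)
```

Swapping a download policy for a streaming one then requires no change in the caller, which is the point of keeping the two classes structurally identical.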
    Parameters
    ----------
    output_name : str
I do not understand the parameters. It seems that multiple files can result in a single LFN?
This is the output name in the CWL file, referring to one or multiple files.
I think the LFN should be the folder(?) where the data is stored in the FileCatalog.
I think it was left before just to reduce the size of the refactoring, but since you're now refactoring this part, we should update the signature (I believe it should simply be removed here, as it looks like it is superseded by src_path).
output_name is the name of the output in CWL, and src_path is the file or list of files associated with this output.
I have used output_name in the output_paths hint to map an output to an LFN directory, and in the sandbox_output hint to know whether the files should be stored in the output sandbox.
    "description": "Registry key for the metadata implementation class",
    "title": "Hook Plugin",
    "type": "string"
},
This is a generated file IIRC; it should not be modified but regenerated in the CI.
Oh okay, I regenerated it using pixi run schemas but I didn't know it was done in the CI.
I think we should remove it from the repo and have it generated automatically only.
I get that it will be regenerated by the CI automatically, but why should it be removed from the repo?
I think it's technically used in some workflows (i.e. test/workflows/test_meta/test_meta.cwl), even though I don't think it's mandatory.
The schemas should eventually be published on a static website and be part of the deployment CI/CD. If needed locally, they should be generated as a post-install action, but they should not be stored in version control, as they are a derived product and only one source of truth should be provided. Leaving them committed separately may lead to a divergence between the schema and the API implementation.
This seems reasonable to me, but maybe it should have its own issue and PR.
Some tests are based on them though.
What if:
- one modifies the ExecutionHooksHint model
- pushes to the repo
- CI detects that the new JSON schema is not in sync with this one and forces the user to run the pixi commands and push again, until the new JSON schema matches this one
That would allow keeping a good dev experience, because if you don't keep this local copy:
- one modifies the ExecutionHooksHint
- pushes to the repo
- CI tests fail because the pydantic model changed
- one has to understand what's wrong, then generate the new JSON schema and run the tests while changing the reference to point to the local JSON schema, which will not be uploaded to GitHub
- one pushes again; tests are still failing because the deployed JSON schema is the current one, not the new one
Any opinion?
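The CI sync check proposed above could be sketched like this. The schema contents, file path, and the stand-in for `pixi run schemas` are assumptions; in the repo, the current schema would come from the pydantic model's `model_json_schema()`:

```python
# Sketch: regenerate the JSON schema and fail the CI step if it differs
# from the committed copy.
import json
from pathlib import Path


def current_schema() -> dict:
    # Stand-in for `pixi run schemas`: in the repo this would be the
    # pydantic model's model_json_schema() output.
    return {
        "title": "ExecutionHooksHint",
        "type": "object",
        "properties": {"hook_plugin": {"title": "Hook Plugin", "type": "string"}},
    }


def schema_in_sync(committed_path: Path) -> bool:
    """True when the committed schema matches the freshly generated one."""
    try:
        committed = json.loads(committed_path.read_text())
    except FileNotFoundError:
        return False  # schema missing from the repo counts as out of sync
    return committed == current_schema()
```

A CI job would call `schema_in_sync` and exit non-zero on a mismatch, telling the developer to rerun the generation command and push again.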
src/dirac_cwl_proto/job/__init__.py
    return files_path

def get_lfns(input_data: dict[str, Any]) -> dict[str, Path | list[Path]]:
If we keep get_input_query, and in general stage data during pre-processing, what's the purpose of this method? Isn't it a duplication of get_input_query?
This method parses the inputs of the CWL file to differentiate LFNs from local files, on the client side, before submission.
This is necessary to have only LFNs in the lfns_input of the JobInputModel.
Meanwhile, get_input_query builds the paths/LFNs of the inputs on the FileCatalog needed for a transformation.
Sorry, I don't remember: why do we need to separate lfns_input exactly? (At some point we may need them separated, to schedule the jobs where the input data is; I am just wondering if we should separate them on the client side.)
It's not implemented yet, but what I understood is that the conversion to JDL may need the list of LFNs (InputData) for scheduling purposes.
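The client-side split described in this thread might look like the sketch below. The `lfn:` prefix convention and all names are assumptions; the real get_lfns works on CWL File objects:

```python
# Sketch: walk the CWL input values and separate LFNs (marked here with a
# hypothetical "lfn:" prefix) from local files, before submission.
from pathlib import Path
from typing import Any

LFN_PREFIX = "lfn:"


def split_lfns(input_data: dict[str, Any]) -> tuple[dict[str, list[str]], dict[str, list[Path]]]:
    """Return (lfn_inputs, local_inputs) keyed by CWL input name."""
    lfns: dict[str, list[str]] = {}
    local: dict[str, list[Path]] = {}
    for name, value in input_data.items():
        values = value if isinstance(value, list) else [value]
        for v in values:
            if not isinstance(v, (str, Path)):
                continue  # non-file inputs (numbers, booleans, ...): nothing to stage
            s = str(v)
            if s.startswith(LFN_PREFIX):
                lfns.setdefault(name, []).append(s[len(LFN_PREFIX):])
            else:
                local.setdefault(name, []).append(Path(s))
    return lfns, local
```

The LFN half would populate lfns_input in the JobInputModel (and later the JDL's InputData), while the local half goes into the input sandbox.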
logger = logging.getLogger(__name__)

class MockSandboxStoreClient(SandboxStoreClient):
Have you tried to "mock" the diracx sandbox store client rather than the DIRAC one? I think it would be preferable, since it exists and the new JobWrapper is meant to be integrated into diracx eventually.
I didn't try; it does seem preferable indeed.
        break
    if res and not res["OK"]:
        raise RuntimeError(
            f"Could not save file {src} with LFN {str(lfn)} : {res['Message']}"
Do we want to specify store_output() in the QueryBasedPlugin?
I haven't checked carefully (so please let me know if I'm wrong), but it seems to be a copy-paste of what is defined in ExecutionHooksHint.
If it's a duplicate, I would remove it from here.
It's nearly a copy-paste, with the exception that CWL outputs not present in output_paths are stored on the FileCatalog, with the path returned by get_output_query() if any.
If (or when) get_output_query() is deleted, it would be the same as ExecutionHooksHint and would be removed.
        new_paths[input_name] = [paths[lfn] for lfn in paths]
    return new_paths

def update_inputs(self, inputs: Any, updates: dict[str, Path | list[Path]]):
As with my previous comment, this should probably go in the JobWrapper itself.
sandbox_path = Path("sandboxstore") / f"{sandbox}.tar.gz"
with tarfile.open(sandbox_path, "r:gz") as tar:
    tar.extractall(job_path, filter="data")
self.execution_hooks_plugin._sandbox_store_client.downloadSandbox(
Please try to use diracx-api methods (or at least the structure) if you can
Out of curiosity, don't you check your environment variable to use either the fake sandbox store methods or the real ones (from diracx)?
I will try to use the diracx methods.
I didn't need to check the environment variable, because the version is chosen during the sandbox store instantiation, before this code runs.
Ah yes, I see. Now I think the upload of the outputs to the sandbox should also happen in the JobWrapper, because it's very generic.
So I think it makes sense to move the sandbox_store_client there. What do you think?
I guess it's pretty generic. I don't know whether the communities have a need to change the content of the sandbox; if not, then moving it to the JobWrapper would be great.
Switching to the diracx implementation should remove the need for a sandbox_store_client instance entirely, as the methods are just part of a module and not a class, I think (I'll have to find a way to mock them, though).
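Mocking module-level functions, as opposed to a client class, is straightforward with `unittest.mock.patch`. Here is a sketch against a stand-in module; the real diracx module name and function signatures are assumptions:

```python
# Sketch: patch a free function on a module object, the same way one would
# patch a method on a class instance.
from unittest.mock import patch
import types

# Hypothetical stand-in for a diracx-style module exposing free functions.
sandbox_api = types.SimpleNamespace(
    download_sandbox=lambda pfn, dest: f"downloaded {pfn} to {dest}",
)


def job_wrapper_step() -> str:
    """Code under test: calls the module-level function directly."""
    return sandbox_api.download_sandbox("SB:pfn", "/tmp/job")


# Patching the module attribute swaps the free function for a test double;
# the original is restored when the context manager exits.
with patch.object(sandbox_api, "download_sandbox", return_value="mocked"):
    result = job_wrapper_step()
```

In real tests, one would patch the attribute on the imported diracx module (or on the module under test that imported it) instead of a SimpleNamespace.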
file.path = file.path.split("/")[-1]
if not self.execution_hooks_plugin:
    raise RuntimeError("Could not download input data")
self.execution_hooks_plugin.download_lfns(arguments, job_path)
As I said, I think for now the content of execution_hooks_plugin.download_lfns should go here; if I'm not mistaken, it's something that all communities share, and I don't see why we would override it (at least for now).
for input_name, group_size in transformation_execution_hooks.group_size.items():
    # Get input query
    logger.info(f"\t- Getting input query for {input_name}...")
    assert isinstance(transformation_metadata, QueryBasedPlugin)
Not sure I understand why you need that, can you explain?
Because it's not something we want to keep in the code, unless I'm misunderstanding something?
Ah yes, I think I get it: it's because you moved get_input_query to QueryBasedPlugin, right?
I would prefer to have get_input_query in ExecutionHooksHintPlugin rather than in QueryBasedPlugin to avoid that (we are experimenting with LHCb workflows on our side, and this would prevent us from executing a transformation).
Okay, but I don't think there will be any difference between ExecutionHooksBasePlugin and QueryBasedPlugin if I move the query methods back.
Please correct me if I'm wrong, but we would not have this isinstance line here if we move it back to ExecutionHooksBasePlugin, would we?
You're right it wouldn't be there.
I'm just questioning the purpose of QueryBasedPlugin if there's no difference with the base class
Ah yes, I see.
Well, I guess ExecutionHooksHintPlugin is expected to be abstract, unusable per se (though it is not abstract in practice).
And QueryBasedPlugin is just one concrete, simple example with no pre/post execution commands.
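The abstract-base-plus-one-concrete-example relationship discussed here could be sketched as below. Class names are taken from the thread; the method bodies and the query layout are assumptions:

```python
# Sketch: an abstract hook base class and one minimal concrete plugin.
from abc import ABC, abstractmethod


class ExecutionHooksBasePlugin(ABC):
    """Abstract hook interface; not instantiable on its own."""

    @classmethod
    def hook_plugin(cls) -> str:
        # Auto-derive the plugin identifier from the class name
        return cls.__name__

    @abstractmethod
    def get_input_query(self, input_name: str) -> list[str]:
        """Return the input directories/LFN paths for a transformation input."""


class QueryBasedPlugin(ExecutionHooksBasePlugin):
    """Concrete example with no pre/post execution commands."""

    def get_input_query(self, input_name: str) -> list[str]:
        return [f"/vo/data/{input_name}"]  # assumed catalog layout
```

With the base class actually abstract, an `isinstance` assertion at the call site becomes unnecessary: any registered plugin is guaranteed to provide get_input_query.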
    ],
)
def test_run_job_with_input_data(
    cli_runner, cleanup, pi_test_files, cwl_file, inputs, destination_source_input_data
Not 100% sure I understand the difference with test_job_run_success.
The only difference is the preparation of the filecatalog directory.
Why don't we need to prepare the filecatalog directory in the other tests?
It's also done in the transformation tests, IIRC.
The usual test_run_job_success workflows use local files as inputs, so they are uploaded in a sandbox instead. There are no inputs that are expected to be on the FileCatalog for these workflows.
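The catalog preparation mentioned here might be sketched as a small helper that materializes the expected LFNs under a local directory standing in for the FileCatalog; the directory layout and file names are assumptions:

```python
# Sketch: create each expected LFN as an empty file under a mock catalog
# root, so LFN-based inputs resolve during a local test run.
import tempfile
from pathlib import Path


def prepare_filecatalog(root: Path, lfns: list[str]) -> list[Path]:
    """Create each LFN as an empty file under the mock catalog root."""
    created = []
    for lfn in lfns:
        target = root / lfn.lstrip("/")
        target.parent.mkdir(parents=True, exist_ok=True)
        target.touch()
        created.append(target)
    return created


with tempfile.TemporaryDirectory() as tmp:
    catalog = Path(tmp) / "filecatalog"
    files = prepare_filecatalog(catalog, ["/vo/inputs/pi_1.txt"])
```

In the test suite this would naturally be a fixture, run before the job under test and cleaned up afterwards.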
Oh I see, thanks for the clarification! So maybe we could add with_input_sandbox in the name of the other test then?
See #25