2020
2121
2222class DataCatalogInterface (ABC ):
23- """Abstract interface for data catalog operations."""
23+ """Abstract interface for data catalog operations.
24+
25+ This interface defines the contract for data discovery and output registration
26+ in various data management systems. Implementations can range from simple
27+ filesystem-based catalogs to complex distributed data management systems.
28+
29+ The interface is designed to be storage-agnostic and allows different
30+ implementations to handle data in ways appropriate to their underlying
31+ storage and metadata systems.
32+ """
2433
2534 @abstractmethod
2635 def get_input_query (
2736 self , input_name : str , ** kwargs : Any
2837 ) -> Union [Path , List [Path ], None ]:
29- """Generate input data query.
38+ """Resolve input data locations for processing.
39+
40+ This method provides a mechanism to discover input data based on
41+ logical names and additional query parameters. The implementation
42+ determines how to translate logical input names into concrete data
43+ locations.
3044
3145 Parameters
3246 ----------
3347 input_name : str
34- Name of the input parameter.
48+ Logical name or identifier for the input data. This serves as
49+ a documentation label and lookup key for the data catalog.
3550 **kwargs : Any
36- Additional query parameters.
51+ Implementation-specific query parameters that may influence
52+ data discovery (e.g., version, campaign, site, data_type).
3753
3854 Returns
3955 -------
4056 Union[Path, List[Path], None]
41- Path(s) to input data or None if not found.
57+ Resolved data location(s). Returns:
58+ - Path: Single data location
59+ - List[Path]: Multiple data locations for the same logical input
60+ - None: No data found matching the query criteria
4261 """
4362 pass
4463
4564 @abstractmethod
4665 def get_output_query (self , output_name : str , ** kwargs : Any ) -> Optional [Path ]:
47- """Generate output data path.
66+ """Determine where output data should be stored.
67+
68+ This method generates appropriate storage locations for output data
69+ based on the output name and catalog configuration. The returned
70+ location serves as a staging area or final destination for outputs.
4871
4972 Parameters
5073 ----------
5174 output_name : str
52- Name of the output parameter.
75+ Logical name or identifier for the output data. This serves as
76+ a documentation label and determines output organization.
77+ **kwargs : Any
78+ Implementation-specific parameters that may influence output
79+ placement (e.g., campaign, site, data classification).
5380
5481 Returns
5582 -------
5683 Optional[Path]
57- Path where output should be stored or None.
84+ Designated output location where data should be stored.
85+ Returns None if no suitable output location can be determined.
5886 """
59- ...
87+ pass
6088
61- def store_output (self , output_name : str , src_path : str , ** kwargs : Any ) -> None :
62- """Store output in the data catalog.
89+ @abstractmethod
90+ def store_output (self , output_name : str , ** kwargs : Any ) -> None :
91+ """Register or store output data in the catalog.
92+
93+ This method handles the catalog-specific operations needed to make
94+ output data available through the data management system. The actual
95+ storage mechanism is implementation-dependent and may involve file
96+ operations, database registrations, or API calls to external systems.
6397
6498 Parameters
6599 ----------
66100 output_name : str
67- Name of the output parameter.
68- src_path : str | Path
69- Source path of the output file.
101+ Logical name or identifier for the output data. This serves as
102+ a documentation label for organizing and retrieving the data.
103+ **kwargs : Any
104+ Implementation-specific parameters that provide the necessary
105+ information for storing the output (e.g., source paths, metadata,
106+ checksums, file sizes, destination parameters).
70107 """
71- output_path = self .get_output_query (output_name , ** kwargs )
72- if not output_path :
73- raise RuntimeError (f"No output path defined for { output_name } " )
74-
75- output_path .mkdir (exist_ok = True , parents = True )
76- dest = output_path / Path (src_path ).name
77- Path (src_path ).rename (dest )
78- logger .info (f"Output { output_name } stored in { dest } " )
108+ pass
79109
80110
81- class DummyDataCatalogInterface (DataCatalogInterface ):
82- """Default implementation that returns None for all queries .
111+ class DefaultDataCatalogInterface (DataCatalogInterface ):
112+ """Default filesystem-based data catalog using Logical File Names (LFNs) .
83113
84- This is used as the default data catalog when no specific implementation
85- is provided by a plugin.
114+ This provides a simple, filesystem-based implementation suitable for
115+ examples and testing. Uses a structured LFN path format:
116+ /vo/campaign/site/data_type/files
86117 """
87118
119+ def __init__ (
120+ self ,
121+ vo : Optional [str ] = None ,
122+ campaign : Optional [str ] = None ,
123+ site : Optional [str ] = None ,
124+ data_type : Optional [str ] = None ,
125+ base_path : str = "/" ,
126+ ):
127+ self .vo = vo
128+ self .campaign = campaign
129+ self .site = site
130+ self .data_type = data_type
131+ self .base_path = Path (base_path )
132+
88133 def get_input_query (
89134 self , input_name : str , ** kwargs : Any
90135 ) -> Union [Path , List [Path ], None ]:
91- """Return None - no input data available."""
92- return None
136+ """Generate LFN-based input query path.
137+
138+ Accepts and ignores extra kwargs for interface compatibility.
139+ """
140+ # Build LFN: /base_path/vo/campaign/site/data_type/input_name
141+ path_parts = []
142+
143+ if self .vo :
144+ path_parts .append (self .vo )
145+
146+ if self .campaign :
147+ path_parts .append (self .campaign )
148+ if self .site :
149+ path_parts .append (self .site )
150+ if self .data_type :
151+ path_parts .append (self .data_type )
152+
153+ if len (path_parts ) > 0 : # More than just VO
154+ return self .base_path / Path (* path_parts ) / Path (input_name )
155+
156+ return self .base_path / Path (input_name )
93157
94158 def get_output_query (self , output_name : str , ** kwargs : Any ) -> Optional [Path ]:
95- """Return None - no output path available."""
96- return None
159+ """Generate LFN-based output path.
160+
161+ Accepts and ignores extra kwargs for interface compatibility.
162+ """
163+ # Output path: /grid/data/vo/outputs/campaign/site
164+ output_base = self .base_path
165+ if self .vo :
166+ output_base = output_base / self .vo
167+ output_base = output_base / "outputs"
168+
169+ if self .campaign :
170+ output_base = output_base / self .campaign
171+ if self .site :
172+ output_base = output_base / self .site
173+
174+ return output_base
175+
176+ def store_output (self , output_name : str , ** kwargs : Any ) -> None :
177+ """Store output file in the filesystem-based catalog.
178+
179+ This implementation handles filesystem operations to move output files
180+ to their designated LFN-based storage locations.
181+
182+ Parameters
183+ ----------
184+ output_name : str
185+ Logical name for the output data.
186+ **kwargs : Any
187+ Expected parameters:
188+ - src_path (str | Path): Source path of the output file to store
189+ Additional parameters are passed to get_output_query.
190+
191+ Raises
192+ ------
193+ RuntimeError
194+ If no output path can be determined or if src_path is not provided.
195+ """
196+ # Extract the filesystem-specific parameter
197+ src_path = kwargs .get ("src_path" )
198+ if not src_path :
199+ raise RuntimeError (
200+ f"src_path parameter required for filesystem storage of { output_name } "
201+ )
202+
203+ # Get the output directory
204+ output_path = self .get_output_query (output_name , ** kwargs )
205+ if not output_path :
206+ raise RuntimeError (f"No output path defined for { output_name } " )
207+
208+ # Ensure output directory exists
209+ output_path .mkdir (exist_ok = True , parents = True )
210+
211+ # Move file to destination
212+ src_path_obj = Path (src_path )
213+ dest = output_path / src_path_obj .name
214+ src_path_obj .rename (dest )
215+ logger .info (f"Output { output_name } stored in { dest } " )
97216
98217
99218class ExecutionHooksBasePlugin (BaseModel ):
@@ -119,12 +238,12 @@ class ExecutionHooksBasePlugin(BaseModel):
119238 description : ClassVar [str ] = "Base metadata model"
120239
121240 # Private attribute for data catalog interface - not part of Pydantic model validation
122- _data_catalog : DataCatalogInterface = PrivateAttr (
123- default_factory = DummyDataCatalogInterface
241+ _data_catalog : Optional [ DataCatalogInterface ] = PrivateAttr (
242+ default_factory = lambda : DefaultDataCatalogInterface ()
124243 )
125244
126245 @property
127- def data_catalog (self ) -> DataCatalogInterface :
246+ def data_catalog (self ) -> Optional [ DataCatalogInterface ] :
128247 """Get the data catalog interface."""
129248 return self ._data_catalog
130249
@@ -176,15 +295,30 @@ def get_input_query(
176295 self , input_name : str , ** kwargs : Any
177296 ) -> Union [Path , List [Path ], None ]:
178297 """Delegate to data catalog interface."""
298+ if self .data_catalog is None :
299+ return None
179300 return self .data_catalog .get_input_query (input_name , ** kwargs )
180301
181302 def get_output_query (self , output_name : str , ** kwargs : Any ) -> Optional [Path ]:
182303 """Delegate to data catalog interface."""
304+ if self .data_catalog is None :
305+ return None
183306 return self .data_catalog .get_output_query (output_name , ** kwargs )
184307
185308 def store_output (self , output_name : str , src_path : str , ** kwargs : Any ) -> None :
186- """Delegate to data catalog interface."""
187- self .data_catalog .store_output (output_name , src_path , ** kwargs )
309+ """Delegate to data catalog interface.
310+
311+ This method provides backward compatibility by forwarding the src_path
312+ parameter through kwargs to the data catalog implementation.
313+ """
314+ if self .data_catalog is None :
315+ logger .warning (
316+ f"No data catalog available, cannot store output { output_name } "
317+ )
318+ return
319+ # Forward src_path through kwargs to maintain interface compatibility
320+ kwargs ["src_path" ] = src_path
321+ self .data_catalog .store_output (output_name , ** kwargs )
188322
189323 @classmethod
190324 def get_schema_info (cls ) -> Dict [str , Any ]:
@@ -259,7 +393,7 @@ class ExecutionHooksHint(BaseModel, Hint):
259393 )
260394
261395 hook_plugin : str = Field (
262- default = "UserPlugin " ,
396+ default = "QueryBasedPlugin " ,
263397 description = "Registry key for the metadata implementation class" ,
264398 )
265399
0 commit comments