88from src .content_extractor .s3_sequential import S3QuoteExtractor
99from src .content_extractor .base import BaseExtractorConfig
1010from src .content_extractor .highlighter import highlight_occurrence
11+ import fsspec
1112from src .models .graph_models import (
1213 GraphInput , GraphOutput , Node , NodeData , Edge , EdgeData , Occurrence , Entity
1314)
1415
16+
1517logger = logging .getLogger (__name__ )
1618
1719def slugify (text : str ) -> str :
@@ -124,11 +126,12 @@ def build_node_structure(entities: List[Entity], entity_results: Dict[str, Any])
124126async def generate_graph (input_data : Union [str , Dict [str , Any ]], output_path : Optional [str ] = None ):
125127 """Main orchestration function. Can take a file path (str) or a dictionary."""
126128 if isinstance (input_data , str ):
127- if not os .path .exists (input_data ):
129+ try :
130+ with fsspec .open (input_data , "r" ) as f :
131+ graph_data = json .load (f )
132+ except FileNotFoundError :
128133 logger .error (f"Input file { input_data } not found." )
129- return
130- with open (input_data , "r" ) as f :
131- graph_data = json .load (f )
134+ raise
132135 else :
133136 graph_data = input_data
134137
@@ -149,10 +152,9 @@ async def generate_graph(input_data: Union[str, Dict[str, Any]], output_path: Op
149152 cy_json = cy_graph .model_dump (exclude_none = True )
150153
151154 if output_path :
152- os .makedirs (os .path .dirname (output_path ), exist_ok = True )
153- with open (output_path , "w" ) as f :
155+ with fsspec .open (output_path , "w" , auto_mkdir = True ) as f :
154156 json .dump (cy_json , f , indent = 4 )
155- logger .info (f"Graph saved to { output_path } " )
157+ logger .info (f"Graph saved to { summarize_path ( output_path ) } " )
156158
157159 return cy_json
158160
@@ -168,5 +170,32 @@ def load_graph_viewmodel(file_path: str) -> Dict[str, Any]:
168170 """Loads the graph viewmodel JSON for the frontend."""
169171 return load_json_file (file_path )
170172
173+
174+ def summarize_path (path : str ) -> str :
175+ """Extracts a concise representation (project/run) of a path for logging."""
176+ match = re .search (r'([^/]+)/(run-\d+-\d+)' , path )
177+ if match :
178+ return f"{ match .group (1 )} /{ match .group (2 )} "
179+ return path .split ('/' )[- 1 ]
180+
181+ def generate_output_path (input_path : str ) -> str :
182+ """Generates the output path for the graph JSON file."""
183+
184+ #TODO: make input from user be relative without the bucketname applied
185+ match = re .search (r'(?P<base>s3://govuk-ai-accelerator-data-integration/)?(?P<domain_name>[^/]+)/(?P<run>run-\d+-\d+)' , input_path )
186+
187+ if match :
188+ base = match .group ('base' )
189+ domain_name = match .group ('domain_name' )
190+ run_id = match .group ('run' )
191+
192+ if base :
193+ return f"{ base } graph_tools/{ domain_name } /{ run_id } /graphNode.json"
194+ else :
195+ raise ValueError (f"Input path '{ input_path } ' does not contain a recognizable S3 path structure." )
196+
197+ summary = summarize_path (input_path )
198+ raise ValueError (f"Input path '{ summary } ' does not contain a recognizable project/run structure." )
199+
171200if __name__ == "__main__" :
172201 asyncio .run (generate_graph ("graph.json" , "outputs/graphNode.json" ))
0 commit comments