Skip to content

Commit 01e8210

Browse files
Ademola AdefioyeAdemola Adefioye
authored andcommitted
reading and writing graph to s3
This change includes functionality to be able to read from s3 and to write from output directly to s3.
1 parent 9715647 commit 01e8210

2 files changed

Lines changed: 47 additions & 11 deletions

File tree

app.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import logging
33
from flask import Flask, request, jsonify, render_template
44
from dotenv import load_dotenv
5-
from src.generate_graph import generate_graph, load_graph_viewmodel
5+
from src.generate_graph import generate_graph, load_graph_viewmodel, generate_output_path, summarize_path
66

77
load_dotenv()
88

@@ -47,10 +47,14 @@ async def extract_quotes():
4747
Endpoint that runs the Cytoscape graph generation logic based on graph.json.
4848
"""
4949
try:
50-
logger.info('Starting graph generation process...')
51-
graph_data = await generate_graph("graph.json")
50+
input_path = request.args.get('input_path')
51+
if not input_path:
52+
return jsonify({"error": "Missing 'input_path' query parameter"}), 400
53+
54+
output_path = generate_output_path(input_path)
55+
logger.info(f'Starting graph generation process for {summarize_path(input_path)}...')
56+
graph_data = await generate_graph(input_path, output_path)
5257
logger.info('Graph generation completed successfully.')
53-
5458
return jsonify(graph_data), 200
5559

5660
except Exception as e:

src/generate_graph.py

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@
88
from src.content_extractor.s3_sequential import S3QuoteExtractor
99
from src.content_extractor.base import BaseExtractorConfig
1010
from src.content_extractor.highlighter import highlight_occurrence
11+
import fsspec
1112
from src.models.graph_models import (
1213
GraphInput, GraphOutput, Node, NodeData, Edge, EdgeData, Occurrence, Entity
1314
)
1415

16+
1517
logger = logging.getLogger(__name__)
1618

1719
def slugify(text: str) -> str:
@@ -124,10 +126,11 @@ def build_node_structure(entities: List[Entity], entity_results: Dict[str, Any])
124126
async def generate_graph(input_data: Union[str, Dict[str, Any]], output_path: Optional[str] = None):
125127
"""Main orchestration function. Can take a file path (str) or a dictionary."""
126128
if isinstance(input_data, str):
127-
if not os.path.exists(input_data):
129+
of = fsspec.open(input_data, "r")
130+
if not of.fs.exists(of.path):
128131
logger.error(f"Input file {input_data} not found.")
129132
return
130-
with open(input_data, "r") as f:
133+
with of as f:
131134
graph_data = json.load(f)
132135
else:
133136
graph_data = input_data
@@ -149,24 +152,53 @@ async def generate_graph(input_data: Union[str, Dict[str, Any]], output_path: Op
149152
cy_json = cy_graph.model_dump(exclude_none=True)
150153

151154
if output_path:
152-
os.makedirs(os.path.dirname(output_path), exist_ok=True)
153-
with open(output_path, "w") as f:
155+
with fsspec.open(output_path, "w", auto_mkdir=True) as f:
154156
json.dump(cy_json, f, indent=4)
155-
logger.info(f"Graph saved to {output_path}")
157+
logger.info(f"Graph saved to {summarize_path(output_path)}")
156158

157159
return cy_json
158160

159161
def load_json_file(file_path: str) -> Dict[str, Any]:
160162
"""Utility function to load JSON data from a file."""
161-
if not os.path.exists(file_path):
163+
of = fsspec.open(file_path, "r")
164+
if not of.fs.exists(of.path):
162165
logger.error(f"File {file_path} not found.")
163166
return {}
164-
with open(file_path, "r") as f:
167+
with of as f:
165168
return json.load(f)
166169

167170
def load_graph_viewmodel(file_path: str) -> Dict[str, Any]:
168171
"""Loads the graph viewmodel JSON for the frontend."""
169172
return load_json_file(file_path)
170173

174+
175+
def summarize_path(path: str) -> str:
176+
"""Extracts a concise representation (project/run) of a path for logging."""
177+
match = re.search(r'([^/]+)/(run-\d+-\d+)', path)
178+
if match:
179+
return f"{match.group(1)}/{match.group(2)}"
180+
return path.split('/')[-1]
181+
182+
def generate_output_path(input_path: str) -> str:
183+
"""Generates the output path for the graph JSON file."""
184+
output_dir = os.getenv("OUTPUT_DIRECTORY", "outputs")
185+
186+
187+
match = re.search(r'(?P<base>s3://[^/]+/)?(?P<project>[^/]+)/(?P<run>run-\d+-\d+)', input_path)
188+
189+
if match:
190+
base = match.group('base')
191+
project = match.group('project')
192+
run_id = match.group('run')
193+
194+
if base:
195+
# Dynamically use the same S3 bucket but route to graph_tools prefix
196+
return f"{base}graph_tools/{project}/{run_id}/graphNode.json"
197+
198+
return f"{output_dir}/{project}/{run_id}/graphNode.json"
199+
200+
summary = summarize_path(input_path)
201+
raise ValueError(f"Input path '{summary}' does not contain a recognizable project/run structure.")
202+
171203
if __name__ == "__main__":
172204
asyncio.run(generate_graph("graph.json", "outputs/graphNode.json"))

0 commit comments

Comments
 (0)