11import os
22import logging
3- from fastapi import UploadFile , HTTPException
3+ from fastapi import UploadFile , HTTPException , Form
44import modal
55
6- # Constants
7- PINECONE_CHUNKS_INDEX = "chunks-index"
6+ # Pinecone index names per environment
7+ PINECONE_INDEX_MAP = {
8+ "dev" : "chunks-index" ,
9+ "prod" : "prod-chunks"
10+ }
811
912# Configure logging
1013logging .basicConfig (
2730 )
2831 )
2932
30- # Environment: "dev" (default) or "prod" (set via ENV variable)
31- env = os .environ .get ("ENV " , "dev" )
33+ # Environment: "dev" (default) or "prod" (set via ENVIRONMENT variable)
34+ env = os .environ .get ("ENVIRONMENT " , "dev" )
3235
3336# Create Modal app
3437app = modal .App (
@@ -82,14 +85,18 @@ def startup(self):
8285 raise ValueError ("R2_SECRET_ACCESS_KEY not found in environment variables" )
8386
8487 ENVIRONMENT = os .getenv ("ENVIRONMENT" , "dev" )
85- if ENVIRONMENT not in ["dev" , "test" , " prod" ]:
86- raise ValueError (f"Invalid ENVIRONMENT value: { ENVIRONMENT } . Must be one of: dev, test, prod" )
88+ if ENVIRONMENT not in ["dev" , "prod" ]:
89+ raise ValueError (f"Invalid ENVIRONMENT value: { ENVIRONMENT } . Must be one of: dev, prod" )
8790 logger .info (f"Running in environment: { ENVIRONMENT } " )
8891
92+ # Select Pinecone index based on environment
93+ pinecone_index = PINECONE_INDEX_MAP [ENVIRONMENT ]
94+ logger .info (f"Using Pinecone index: { pinecone_index } " )
95+
8996 # Instantiate classes
9097 self .preprocessor = Preprocessor (min_chunk_duration = 1.0 , max_chunk_duration = 10.0 , scene_threshold = 13.0 )
9198 self .video_embedder = VideoEmbedder ()
92- self .pinecone_connector = PineconeConnector (api_key = PINECONE_API_KEY , index_name = PINECONE_CHUNKS_INDEX )
99+ self .pinecone_connector = PineconeConnector (api_key = PINECONE_API_KEY , index_name = pinecone_index )
93100 self .job_store = JobStoreConnector (dict_name = "clipabit-jobs" )
94101
95102 self .r2_connector = R2Connector (
@@ -101,7 +108,7 @@ def startup(self):
101108
102109 self .searcher = Searcher (
103110 api_key = PINECONE_API_KEY ,
104- index_name = PINECONE_CHUNKS_INDEX ,
111+ index_name = pinecone_index ,
105112 r2_connector = self .r2_connector
106113 )
107114
@@ -110,16 +117,16 @@ def startup(self):
110117 print (f"[Container] Started at { self .start_time .isoformat ()} " )
111118
112119 @modal .method ()
113- async def process_video (self , video_bytes : bytes , filename : str , job_id : str ):
114- logger .info (f"[Job { job_id } ] Processing started: { filename } ({ len (video_bytes )} bytes)" )
120+ async def process_video (self , video_bytes : bytes , filename : str , job_id : str , namespace : str = "" ):
121+ logger .info (f"[Job { job_id } ] Processing started: { filename } ({ len (video_bytes )} bytes) | namespace=' { namespace } ' " )
115122
116123 try :
117124 # Upload original video to R2 bucket
118125 # TODO: run this upload in parallel with processing and provide the URL once it completes
119126 success , hashed_identifier = self .r2_connector .upload_video (
120127 video_data = video_bytes ,
121128 filename = filename ,
122- # user_id="user1" # Specify user ID once we have user management
129+ namespace = namespace
123130 )
124131 if not success :
125132 raise Exception (f"Failed to upload video to R2 storage: { hashed_identifier } " )
@@ -175,7 +182,7 @@ async def process_video(self, video_bytes: bytes, filename: str, job_id: str):
175182 self .pinecone_connector .upsert_chunk (
176183 chunk_id = chunk ['chunk_id' ],
177184 chunk_embedding = embedding .numpy (),
178- namespace = "" ,
185+ namespace = namespace ,
179186 metadata = chunk ['metadata' ]
180187 )
181188
@@ -244,12 +251,13 @@ async def status(self, job_id: str):
244251 return job_data
245252
246253 @modal .fastapi_endpoint (method = "POST" )
247- async def upload (self , file : UploadFile = None ):
254+ async def upload (self , file : UploadFile = None , namespace : str = Form ( "" ) ):
248255 """
249256 Handle video file upload and start background processing.
250257
251258 Args:
252259 file (UploadFile): The uploaded video file.
260+ namespace (str, optional): Namespace for Pinecone and R2 storage (default: "")
253261
254262 Returns:
255263 dict: Contains job_id, filename, content_type, size_bytes, status, and message.
@@ -264,11 +272,12 @@ async def upload(self, file: UploadFile = None):
264272 "filename" : file .filename ,
265273 "status" : "processing" ,
266274 "size_bytes" : file_size ,
267- "content_type" : file .content_type
275+ "content_type" : file .content_type ,
276+ "namespace" : namespace
268277 })
269278
270279 # Spawn background processing (non-blocking - returns immediately)
271- self .process_video .spawn (contents , file .filename , job_id )
280+ self .process_video .spawn (contents , file .filename , job_id , namespace )
272281
273282 return {
274283 "job_id" : job_id ,
@@ -281,12 +290,13 @@ async def upload(self, file: UploadFile = None):
281290
282291
283292 @modal .fastapi_endpoint (method = "GET" )
284- async def search (self , query : str ):
293+ async def search (self , query : str , namespace : str = "" ):
285294 """
286295 Search endpoint - accepts a text query and returns semantic search results.
287296
288297 Args:
289298 - query (str): The search query string (required)
299+ - namespace (str, optional): Namespace for Pinecone search (default: "")
290300 - top_k: number of results returned; currently fixed at 10 in the code below, not a caller-supplied parameter
291301
292302 Returns: dict with 'query', 'results', and 'timing'.
@@ -300,16 +310,17 @@ async def search(self, query: str):
300310 raise HTTPException (status_code = 400 , detail = "Missing 'query' parameter" )
301311
302312 top_k = 10
303- logger .info (f"[Search] Query: '{ query } ' | top_k={ top_k } " )
313+ logger .info (f"[Search] Query: '{ query } ' | namespace=' { namespace } ' | top_k={ top_k } " )
304314
305315 # Execute semantic search
306316 results = self .searcher .search (
307317 query = query ,
308- top_k = top_k
318+ top_k = top_k ,
319+ namespace = namespace
309320 )
310-
321+
311322 t_done = time .perf_counter ()
312-
323+
313324 # Log chunk-level results only
314325 logger .info (f"[Search] Found { len (results )} chunk-level results in { t_done - t_start :.3f} s" )
315326
0 commit comments