Skip to content

Commit d91c35d

Browse files
committed
added working progress tracking for file upload
1 parent 93ffcf1 commit d91c35d

File tree

3 files changed

+125
-52
lines changed

3 files changed

+125
-52
lines changed

backend/api/server_fastapi_router.py

Lines changed: 13 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -113,6 +113,7 @@ async def status(self, job_id: str):
113113
dict: Contains:
114114
- job_id (str): The job identifier
115115
- status (str): 'processing', 'completed', or 'failed'
116+
- progress_percent (int): Percentage completion (0-100)
116117
- message (str, optional): If still processing or not found
117118
- result (dict, optional): Full job result if completed
118119
@@ -123,8 +124,20 @@ async def status(self, job_id: str):
123124
return {
124125
"job_id": job_id,
125126
"status": "processing",
127+
"progress_percent": 0,
126128
"message": "Job is still processing or not found"
127129
}
130+
131+
# Ensure progress_percent is always included
132+
if "progress_percent" not in job_data:
133+
# If job is completed, it's 100%, otherwise estimate based on status
134+
if job_data.get("status") == "completed":
135+
job_data["progress_percent"] = 100
136+
elif job_data.get("status") == "failed":
137+
job_data["progress_percent"] = 0
138+
else:
139+
job_data["progress_percent"] = 0
140+
128141
return job_data
129142

130143
async def upload(self, files: list[UploadFile] = File(default=[]), namespace: str = Form("")):

backend/services/processing_service.py

Lines changed: 16 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -81,7 +81,8 @@ def process_video_background(
8181
upserted_chunk_ids = []
8282

8383
try:
84-
# Stage 1: Upload original video to R2 bucket
84+
# Stage 1: Upload original video to R2 bucket (0-15%)
85+
self.job_store.update_job(job_id, {"progress_percent": 5})
8586
success, hashed_identifier = self.r2_connector.upload_video(
8687
video_data=video_bytes,
8788
filename=filename,
@@ -91,14 +92,17 @@ def process_video_background(
9192
upload_error_details = hashed_identifier
9293
hashed_identifier = None
9394
raise Exception(f"Failed to upload video to R2 storage: {upload_error_details}")
95+
self.job_store.update_job(job_id, {"progress_percent": 15})
9496

95-
# Stage 2: Process video through preprocessing pipeline
97+
# Stage 2: Process video through preprocessing pipeline (15-40%)
98+
self.job_store.update_job(job_id, {"progress_percent": 20})
9699
processed_chunks = self.preprocessor.process_video_from_bytes(
97100
video_bytes=video_bytes,
98101
video_id=job_id,
99102
filename=filename,
100103
hashed_identifier=hashed_identifier
101104
)
105+
self.job_store.update_job(job_id, {"progress_percent": 40})
102106

103107
# Calculate summary statistics
104108
total_frames = sum(chunk['metadata']['frame_count'] for chunk in processed_chunks)
@@ -114,11 +118,12 @@ def process_video_background(
114118
f"{total_frames} frames, {total_memory:.2f} MB, avg_complexity={avg_complexity:.3f}"
115119
)
116120

117-
# Stage 3-4: Embed frames and store in Pinecone
121+
# Stage 3-4: Embed frames and store in Pinecone (40-100%)
118122
logger.info(f"[{self.__class__.__name__}][Job {job_id}] Embedding and upserting {len(processed_chunks)} chunks")
119123

120124
chunk_details = []
121-
for chunk in processed_chunks:
125+
total_chunks = len(processed_chunks)
126+
for idx, chunk in enumerate(processed_chunks):
122127
embedding = self.video_embedder._generate_clip_embedding(
123128
chunk["frames"],
124129
num_frames=8
@@ -158,6 +163,11 @@ def process_video_background(
158163
"metadata": chunk['metadata'],
159164
"memory_mb": chunk['memory_mb'],
160165
})
166+
167+
# Update progress: 40% base + 60% distributed across chunks
168+
if total_chunks > 0:
169+
chunk_progress = 40 + int((idx + 1) / total_chunks * 60)
170+
self.job_store.update_job(job_id, {"progress_percent": chunk_progress})
161171

162172
result = {
163173
"job_id": job_id,
@@ -173,7 +183,8 @@ def process_video_background(
173183

174184
logger.info(f"[{self.__class__.__name__}][Job {job_id}] Finished processing {filename}")
175185

176-
# Stage 5: Store result
186+
# Stage 5: Store result (100%)
187+
result["progress_percent"] = 100
177188
self.job_store.set_job_completed(job_id, result)
178189

179190
# Update parent batch if exists

frontend/streamlit/pages/search_demo.py

Lines changed: 96 additions & 47 deletions
Original file line number | Diff line number | Diff line change
@@ -250,6 +250,44 @@ def upload_dialog():
250250
# Single video
251251
job_id = data.get("job_id")
252252
st.success(f"Video uploaded! Job ID: {job_id}")
253+
254+
# Show progress for single video
255+
progress_bar = st.progress(0)
256+
status_text = st.empty()
257+
258+
max_polls = 300 # Max 10 minutes
259+
poll_count = 0
260+
261+
while poll_count < max_polls:
262+
try:
263+
resp = requests.get(SERVER_STATUS_URL, params={"job_id": job_id}, timeout=30)
264+
if resp.status_code == 200:
265+
status_data = resp.json()
266+
status = status_data.get("status")
267+
progress = status_data.get("progress_percent", 0)
268+
269+
progress_bar.progress(progress / 100.0)
270+
status_text.text(f"Status: {status} | Progress: {progress}%")
271+
272+
if status in ["completed", "failed"]:
273+
if status == "completed":
274+
st.success("Video processed successfully!")
275+
else:
276+
st.error(f"Processing failed: {status_data.get('error', 'Unknown error')}")
277+
break
278+
else:
279+
st.error(f"Status check failed with status {resp.status_code}")
280+
break
281+
except requests.RequestException as e:
282+
st.error(f"Error checking status: {e}")
283+
break
284+
285+
time.sleep(2)
286+
poll_count += 1
287+
288+
if poll_count >= max_polls:
289+
st.warning("Status check timed out. The video may still be processing.")
290+
253291
reset_repository_state()
254292
time.sleep(2)
255293
st.rerun()
@@ -262,56 +300,67 @@ def upload_dialog():
262300
progress_bar = st.progress(0)
263301
status_text = st.empty()
264302

265-
while True:
266-
status_data = poll_job_status(batch_job_id, max_wait=5)
267-
268-
if "error" in status_data:
269-
st.error(f"Error checking status: {status_data['error']}")
270-
break
271-
272-
status = status_data.get("status")
273-
progress = status_data.get("progress_percent", 0)
274-
completed = status_data.get("completed_count", 0)
275-
failed = status_data.get("failed_count", 0)
276-
processing = status_data.get("processing_count", 0)
277-
278-
progress_bar.progress(progress / 100.0)
279-
status_text.text(
280-
f"Status: {status} | "
281-
f"Completed: {completed} | "
282-
f"Failed: {failed} | "
283-
f"Processing: {processing}"
284-
)
285-
286-
if status in ["completed", "partial", "failed"]:
287-
if status == "completed":
288-
st.success(f"All {completed} videos processed successfully!")
289-
metrics = status_data.get("metrics", {})
290-
st.write(f"Total chunks: {metrics.get('total_chunks', 0)}")
291-
st.write(f"Total frames: {metrics.get('total_frames', 0)}")
292-
elif status == "partial":
293-
st.warning(
294-
f"Batch completed with {completed} successes and {failed} failures"
303+
max_polls = 300 # Max 10 minutes
304+
poll_count = 0
305+
306+
while poll_count < max_polls:
307+
try:
308+
resp = requests.get(SERVER_STATUS_URL, params={"job_id": batch_job_id}, timeout=30)
309+
if resp.status_code == 200:
310+
status_data = resp.json()
311+
status = status_data.get("status")
312+
progress = status_data.get("progress_percent", 0)
313+
completed = status_data.get("completed_count", 0)
314+
failed = status_data.get("failed_count", 0)
315+
processing = status_data.get("processing_count", 0)
316+
317+
progress_bar.progress(progress / 100.0)
318+
status_text.text(
319+
f"Status: {status} | "
320+
f"Completed: {completed} | "
321+
f"Failed: {failed} | "
322+
f"Processing: {processing} | "
323+
f"Progress: {progress}%"
295324
)
296-
failed_jobs = status_data.get("failed_jobs", [])
297-
if failed_jobs:
298-
with st.expander("Failed Videos"):
299-
for job in failed_jobs:
300-
st.write(f"- {job.get('filename')}: {job.get('error')}")
301-
else:
302-
st.error(f"All {failed} videos failed to process")
303-
failed_jobs = status_data.get("failed_jobs", [])
304-
if failed_jobs:
305-
with st.expander("Failed Videos"):
306-
for job in failed_jobs:
307-
st.write(f"- {job.get('filename')}: {job.get('error')}")
308-
309-
reset_repository_state()
310-
time.sleep(2)
311-
st.rerun()
312-
break
313325

326+
if status in ["completed", "partial", "failed"]:
327+
if status == "completed":
328+
st.success(f"All {completed} videos processed successfully!")
329+
metrics = status_data.get("metrics", {})
330+
st.write(f"Total chunks: {metrics.get('total_chunks', 0)}")
331+
st.write(f"Total frames: {metrics.get('total_frames', 0)}")
332+
elif status == "partial":
333+
st.warning(
334+
f"Batch completed with {completed} successes and {failed} failures"
335+
)
336+
failed_jobs = status_data.get("failed_jobs", [])
337+
if failed_jobs:
338+
with st.expander("Failed Videos"):
339+
for job in failed_jobs:
340+
st.write(f"- {job.get('filename')}: {job.get('error')}")
341+
else:
342+
st.error(f"All {failed} videos failed to process")
343+
failed_jobs = status_data.get("failed_jobs", [])
344+
if failed_jobs:
345+
with st.expander("Failed Videos"):
346+
for job in failed_jobs:
347+
st.write(f"- {job.get('filename')}: {job.get('error')}")
348+
349+
reset_repository_state()
350+
time.sleep(2)
351+
st.rerun()
352+
break
353+
else:
354+
st.error(f"Status check failed with status {resp.status_code}")
355+
break
356+
except requests.RequestException as e:
357+
st.error(f"Error checking status: {e}")
358+
break
314359
time.sleep(2)
360+
poll_count += 1
361+
362+
if poll_count >= max_polls:
363+
st.warning("Status check timed out. The batch may still be processing.")
315364
else:
316365
st.error(f"Upload failed with status {resp.status_code}. Message: {resp.text}")
317366
except requests.RequestException as e:

0 commit comments

Comments
 (0)