forked from open-edge-platform/edge-ai-suites
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathobject.py
More file actions
172 lines (145 loc) · 4.87 KB
/
object.py
File metadata and controls
172 lines (145 loc) · 4.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
#
# Copyright (C) 2026 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
#
from fastapi import APIRouter, Depends, HTTPException, File, UploadFile, BackgroundTasks, Form
from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session
from utils.database import get_db
from utils.task_service import task_service
from utils.storage_service import storage_service
from utils.search_service import search_service
import urllib.parse
import mimetypes
import json
from utils.core_responses import resp_200
from pydantic import BaseModel, Field
from typing import Optional, Dict, Any
# Router instance that the endpoint decorators in this module register on.
router = APIRouter()
# @router.get("/files")
@router.post("/upload")
async def upload_video(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    db: Session = Depends(get_db),
):
    """Store an uploaded file and create a task for it without ingestion.

    The upload is handed to the storage service; the payload it returns is
    passed to the task service with ``should_ingest=False``.

    Returns:
        The ``resp_200`` envelope carrying the task id (as a string) and the
        status reported by the task service.
    """
    stored_payload = await storage_service.upload_and_prepare_payload(file)
    task = await task_service.handle_file_upload(
        db,
        stored_payload,
        background_tasks,
        should_ingest=False,
    )

    task_info = {
        "task_id": str(task["task_id"]),
        "status": task["status"],
    }
    return resp_200(data=task_info, message="File received, processing started.")
@router.post("/ingest")
async def ingest_existing_file(
    payload: dict,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db)
):
    """Start ingestion for a file that is already in object storage.

    Recognised ``payload`` keys:
        file_key: Required; key of the stored object.
        bucket_name: Optional; defaults to ``"content-search"``.
        meta: Optional; a mapping, or a string that is parsed as JSON.
        prompt: Optional; forwarded as ``vs_options["prompt"]``.
        chunk_duration: Optional; forwarded as
            ``vs_options["chunk_duration_s"]``.

    Returns:
        The ``resp_200`` envelope with the task id (as a string), the task
        status and the ``file_key`` that was submitted.

    Raises:
        HTTPException: 400 when ``file_key`` is missing or empty.
    """
    file_key = payload.get("file_key")
    if not file_key:
        raise HTTPException(status_code=400, detail="file_key is required")

    bucket_name = payload.get("bucket_name", "content-search")

    meta = payload.get("meta", {})
    if isinstance(meta, str):
        try:
            meta = json.loads(meta)
        except (ValueError, RecursionError):
            # Not valid JSON (JSONDecodeError is a ValueError) or nested too
            # deeply to parse: keep the raw text instead of rejecting the
            # request. Only parse failures are caught here, not every
            # BaseException as a bare ``except`` would.
            meta = {"raw_info": meta}
        # NOTE(review): a string that decodes to a non-object JSON value
        # (list, number, ...) is forwarded unchanged - confirm the task
        # service accepts that.

    minio_payload = {
        "file_key": file_key,
        "bucket_name": bucket_name,
        "meta": meta,
        "vs_options": {
            "prompt": payload.get("prompt"),
            "chunk_duration_s": payload.get("chunk_duration")
        }
    }

    result = await task_service.handle_file_ingest(db, minio_payload, background_tasks)

    return resp_200(
        data={
            "task_id": str(result["task_id"]),
            "status": result["status"],
            "file_key": file_key
        },
        message="Ingestion process started for existing file"
    )
# Request body for POST /ingest-text. The handler dumps the whole model to a
# dict and hands it to task_service.handle_text_ingest; how each field is
# interpreted is decided there.
class IngestTextRequest(BaseModel):
    # Optional; defaults to None. Presumably the raw text to ingest - TODO confirm.
    text: Optional[str] = None
    # Defaults to "content-search", the same default bucket used by /ingest.
    bucket_name: Optional[str] = "content-search"
    # Optional; defaults to None.
    file_key: Optional[str] = None
    # Free-form metadata; default_factory gives every request its own empty dict.
    meta: Dict[str, Any] = Field(default_factory=dict)
@router.post("/ingest-text")
async def ingest_raw_text(
    request: IngestTextRequest,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
):
    """Create a text-ingestion task from the request body.

    The validated request model is dumped to a plain dict and passed to the
    task service together with the DB session and background task queue.

    Returns:
        The ``resp_200`` envelope with the task id (as a string) and the
        task status.
    """
    task = await task_service.handle_text_ingest(db, request.model_dump(), background_tasks)

    task_info = {
        "task_id": str(task["task_id"]),
        "status": task["status"],
    }
    return resp_200(data=task_info, message="Text ingestion task created successfully")
@router.post("/upload-ingest")
async def upload_file_with_ingest(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    meta: str = Form(None),
    prompt: str = Form(None),
    chunk_duration: int = Form(None),
    db: Session = Depends(get_db)
):
    """Upload a file and start ingestion for it in one request.

    Args:
        background_tasks: Background task queue passed through to the task
            service.
        file: The uploaded file (multipart form field).
        meta: Optional form field; parsed as JSON when possible, otherwise
            kept verbatim under ``{"raw_info": ...}``.
        prompt: Optional form field forwarded as ``vs_options["prompt"]``.
        chunk_duration: Optional form field forwarded as
            ``vs_options["chunk_duration_s"]``.
        db: Database session dependency.

    Returns:
        The ``resp_200`` envelope with the task id (as a string), the task
        status and the ``file_key`` from the storage payload.
    """
    minio_payload = await storage_service.upload_and_prepare_payload(file)

    if meta:
        try:
            minio_payload["meta"] = json.loads(meta)
        except (ValueError, RecursionError):
            # Not valid JSON (JSONDecodeError is a ValueError) or nested too
            # deeply to parse: keep the raw text instead of failing the
            # upload. Only parse failures are caught here, not every
            # BaseException as a bare ``except`` would.
            minio_payload["meta"] = {"raw_info": meta}
    else:
        # Missing or empty form field: no metadata.
        minio_payload["meta"] = {}

    minio_payload["vs_options"] = {
        "prompt": prompt,
        "chunk_duration_s": chunk_duration
    }

    result = await task_service.handle_file_upload(
        db,
        minio_payload,
        background_tasks,
        should_ingest=True
    )

    return resp_200(
        data={
            "task_id": str(result["task_id"]),
            "status": result["status"],
            "file_key": minio_payload["file_key"]
        },
        message="Upload and Ingest started"
    )
@router.post("/search")
async def file_search(payload: dict, db: Session = Depends(get_db)):
    """Run a synchronous search and return its result.

    The request body is forwarded unchanged to the task service; whatever it
    returns is wrapped in the ``resp_200`` envelope.
    """
    search_result = await task_service.handle_sync_search(db, payload)
    return resp_200(data=search_result, message="Search completed")
@router.get("/download")
async def download_file(file_key: str):
    """Stream a stored object back to the caller as an attachment.

    Example: ``GET /download?file_key=runs/run_xxx/raw/video/default/test.mp4``

    The download name is the last ``/``-separated segment of ``file_key``.
    The media type is guessed from that name and falls back to
    ``application/octet-stream`` when it cannot be determined.
    """
    file_stream = await storage_service.get_file_stream(file_key)

    filename = file_key.rsplit("/", 1)[-1]
    content_type = mimetypes.guess_type(filename)[0] or "application/octet-stream"

    # Percent-encode so non-ASCII names survive in the header; the same value
    # is used for both the plain ``filename`` and the ``filename*`` parameter.
    encoded_filename = urllib.parse.quote(filename)
    response_headers = {
        "Content-Disposition": f"attachment; filename=\"{encoded_filename}\"; filename*=UTF-8''{encoded_filename}",
        # Expose the header so cross-origin browser code can read it.
        "Access-Control-Expose-Headers": "Content-Disposition",
    }

    return StreamingResponse(
        file_stream,
        media_type=content_type,
        headers=response_headers,
    )