diff --git a/education-ai-suite/smart-classroom/content_search/README.md b/education-ai-suite/smart-classroom/content_search/README.md index 24ab109090..2d4f0c828a 100644 --- a/education-ai-suite/smart-classroom/content_search/README.md +++ b/education-ai-suite/smart-classroom/content_search/README.md @@ -25,21 +25,18 @@ python .\start_services.py ## API Endpoints -| Endpoint | Method | Pattern | Description | Status | -| :--- | :---: | :---: | :--- | :---: | -| `/api/v1/system/health` | **GET** | SYNC | Backend app health check | DONE | -| `/api/v1/task/query/{task_id}` | **GET** | SYNC | Query status of a specific task | DONE | -| `/api/v1/task/list` | **GET** | SYNC | Query tasks by conditions (e.g., `?status=PROCESSING`) | DONE | -| `/api/v1/object/upload` | **POST** | ASYNC | Upload a file to MinIO | DONE | -| `/api/v1/object/ingest` | **POST** | ASYNC | Ingest a specific file from MinIO | DONE | -| `/api/v1/object/ingest-text` | **POST** | ASYNC | Emedding a raw text | DONE | -| `/api/v1/object/upload-ingest` | **POST** | ASYNC | Upload to MinIO and trigger ingestion | DONE | -| `/api/v1/object/search` | **POST** | ASYNC | Search for files based on description | DONE | -| `/api/v1/object/download` | **POST** | STREAM | Download file from MinIO | DONE | - -## API reference -[Content Search API reference](./docs/dev_guide/Content_search_API.md) - +| Endpoint | Method | Pattern | Description | +| :--- | :---: | :---: | :--- | +| `/api/v1/task/query/{task_id}` | **GET** | SYNC | **Task Status Inspection**: Retrieves real-time metadata for a specific job, including current lifecycle state (e.g. PROCESSING, COMPLETED, FAILED), and error logs if applicable. | +| `/api/v1/task/list` | **GET** | SYNC | **Batch Task Retrieval**: Queries task records. Supports filtering via query parameters (e.g., `?status=PROCESSING`) for monitoring system load and pipeline efficiency. | +| `/api/v1/object/ingest-text` | **POST** | ASYNC | **Text-Specific Ingestion**: Primarily processes raw text strings passed in the request body for semantic indexing. It also supports fetching content from existing text-based objects in MinIO. | +| `/api/v1/object/upload-ingest` | **POST** | ASYNC | **Atomic Upload & Ingestion**: A unified workflow that first saves the file to MinIO and then immediately initiates the ingestion pipeline. Features full content indexing and AI-driven Video Summarization for supported video formats. | +| `/api/v1/object/search` | **POST** | SYNC | **Semantic Content Retrieval**: Executes a similarity search across vector collections using either natural language queries or base64-encoded images. Returns ranked results with associated metadata and MinIO object references. | +| `/api/v1/object/download` | **POST** | STREAM | **Original File Download**: Securely fetches the raw source file directly from MinIO storage. Utilizes stream-bridging to pipe binary data to the client. | + +For detailed descriptions and examples of each endpoint, please refer to the: [Content Search API reference](./docs/dev_guide/Content_search_API.md) + +## Components API reference [Ingest and Retrieve](./docs/dev_guide/file_ingest_and_retrieve/API_GUIDE.md) [Video Preprocess](./docs/dev_guide/video_preprocess/API_GUIDE.md) diff --git a/education-ai-suite/smart-classroom/content_search/api/v1/endpoints/object.py b/education-ai-suite/smart-classroom/content_search/api/v1/endpoints/object.py index 867c71e94a..e0b967b42a 100644 --- a/education-ai-suite/smart-classroom/content_search/api/v1/endpoints/object.py +++ b/education-ai-suite/smart-classroom/content_search/api/v1/endpoints/object.py @@ -76,24 +76,21 @@ async def ingest_existing_file( ) class IngestTextRequest(BaseModel): - text: str - bucket_name: Optional[str] = None - file_path: Optional[str] = None + text: Optional[str] = None + bucket_name: Optional[str] = "content-search" + file_key: Optional[str] = None meta: Dict[str, Any] = Field(default_factory=dict) + @router.post("/ingest-text") async def ingest_raw_text( request: IngestTextRequest, background_tasks: BackgroundTasks, db: Session = Depends(get_db) ): - payload = request.model_dump() - - if "tags" not in payload["meta"] or payload["meta"]["tags"] is None: - payload["meta"]["tags"] = ["default"] result = await task_service.handle_text_ingest( db, - payload, + request.model_dump(), background_tasks ) @@ -102,7 +99,7 @@ async def ingest_raw_text( "task_id": str(result["task_id"]), "status": result["status"] }, - message="Text ingestion started" + message="Text ingestion task created successfully" ) @router.post("/upload-ingest") @@ -145,44 +142,11 @@ async def upload_file_with_ingest( message="Upload and Ingest started" ) -# @router.post("/search") -# async def file_search(payload: dict): -# query = payload.get("query") -# limit = payload.get("max_num_results", 3) -# if not query: -# raise HTTPException(status_code=400, detail="Query cannot be empty") - -# search_data = await search_service.semantic_search(query, limit) -# return resp_200(data=search_data, message="Resource found") - @router.post("/search") -async def file_search(payload: dict): - query = payload.get("query") - image_base64 = payload.get("image_base64") - filters = payload.get("filter") - limit = payload.get("max_num_results", 10) - - if not query and not image_base64: - raise HTTPException(status_code=400, detail="Either 'query' or 'image_base64' must be provided") - - if query and image_base64: - raise HTTPException(status_code=400, detail="Provide only one of 'query' or 'image_base64'") - - search_payload = { - "max_num_results": limit - } - - if query: - search_payload["query"] = query - else: - search_payload["image_base64"] = image_base64 - - if filters: - search_payload["filter"] = filters - - search_data = await search_service.semantic_search(search_payload) +async def file_search(payload: dict, db: Session = Depends(get_db)): + result = await task_service.handle_sync_search(db, payload) - return resp_200(data=search_data, message="Search completed") + return resp_200(data=result, message="Search completed") @router.get("/download") async def download_file(file_key: str): diff --git a/education-ai-suite/smart-classroom/content_search/docs/dev_guide/Content_search_API.md b/education-ai-suite/smart-classroom/content_search/docs/dev_guide/Content_search_API.md index 032928ba69..c29aa21ff1 100644 --- a/education-ai-suite/smart-classroom/content_search/docs/dev_guide/Content_search_API.md +++ b/education-ai-suite/smart-classroom/content_search/docs/dev_guide/Content_search_API.md @@ -22,7 +22,7 @@ Content-Type: application/json { "code": 20000, - "data": { "task_id": "0892f506-4087-4d7e-b890-21303145b4ee" }, + "data": { "task_id": "0892f506-4087-4d7e-b890-21303145b4ee", "status": "PROCESSING" }, "message": "Operation Successful", "timestamp": 167890123 } @@ -77,8 +77,8 @@ stateDiagram-v2 ``` ## API Endpoints - -### Get Task List +### Task endpoints +#### Get Task List * URL: /api/v1/task/list @@ -142,7 +142,7 @@ Response (200 OK) "timestamp": 1774330753 } ``` -### Task Status Polling +#### Task Status Polling Used to track the progress and retrieve the final result of a submitted task. * URL: /api/v1/task/query/{task_id} @@ -153,7 +153,7 @@ Used to track the progress and retrieve the final result of a submitted task. Request: ``` -curl --location 'http://127.0.0.1:9011/api/v1/task/query/56cc417c-9524-41a9-a500-9f0c44a05eac' +curl --location 'http://127.0.0.1:9011/api/v1/task/query/6b9a6a55-d327-42fe-b05e-e0f3098fe797' ``` Response (200 OK): @@ -161,31 +161,42 @@ Response (200 OK): { "code": 20000, "data": { - "task_id": "e557b305-e37c-4074-a04a-ebd067efbd5d", + "task_id": "6b9a6a55-d327-42fe-b05e-e0f3098fe797", "status": "COMPLETED", "progress": 100, "result": { - "message": "File from MinIO successfully processed. db returns {'visual': {'insert_count': 1}}", - "video_summary": { - "type": "done", - "job_id": "bc6513aa-e118-4945-84a8-02922595044e", - "run_id": "5e405f58-03cf-4e44-9e10-85741283587a", - "asset_id": "classroom_8.mp4", - "total_chunks": 1, - "succeeded_chunks": 1, - "failed_chunks": 0, - "ingest_ok_chunks": 1, - "ingest_failed_chunks": 0, - "elapsed_seconds": 36.89442276954651 + "message": "Upload only, no ingest requested", + "file_info": { + "source": "minio", + "file_key": "runs/9e96f16a-9689-4c25-a515-04a1040b193f/raw/text/default/phy_class.txt", + "bucket": "content-search", + "filename": "phy_class.txt", + "run_id": "9e96f16a-9689-4c25-a515-04a1040b193f" } } }, "message": "Query successful", - "timestamp": 1774879431 + "timestamp": 1774931711 } ``` +### File Process +#### File Support Matrix -### File Upload +The system supports the following file formats for all ingestion and upload-ingest operations. + +| Category | Supported Extensions | Processing Logic | +| :--- | :--- | :--- | +| **Video** | `.mp4` | Frame extraction, AI-driven summarization, and semantic indexing. | +| **Document** | `.txt`, `.pdf`, `.docx`, `.doc`, `.pptx`, `.ppt`, `.xlsx`, `.xls` | Full-text extraction, semantic chunking, and vector embedding. | +| **Web/Markup** | `.html`, `.htm`, `.xml`, `.md`, `.rst` | Structured text parsing and content indexing. | +| **Image** | `.jpg`, `.png`, `.jpeg` | Visual feature embedding and similarity search indexing. | + +> **Technical Note**: +> - **Video**: Default chunking is set to 30 seconds unless the `chunk_duration` parameter is provided. +> - **Text**: Automatic semantic segmentation is applied to ensure high-quality retrieval results. +> - **Max File Size**: Please refer to the `CS_MAX_CONTENT_LENGTH` environment variable (Default: 100MB). + +#### File Upload Used to upload a video file and initiate an asynchronous background task. * URL: /api/v1/object/upload @@ -205,14 +216,14 @@ Response (200 OK): "code": 20000, "data": { "task_id": "c68211de-2187-4f52-b47d-f3a51a52b9ca", - "status": "QUEUED" + "status": "PROCESSING" }, "message": "File received, processing started.", "timestamp": 1773909147 } ``` -### File ingestion +#### File ingestion * URL: /api/v1/object/ingest * Method: POST * Pattern: ASYNC @@ -248,8 +259,49 @@ Response: "timestamp": 1774878031 } ``` +#### Text file ingestion +Primarily processes raw text strings passed in the request body for semantic indexing. It also supports fetching content from existing text-based objects in MinIO. + +* URL: /api/v1/object/ingest-text +* Method: POST +* Pattern: ASYNC +* Parameters: + +| Field | Type | Required | Default | Description | +| :--- | :--- | :--- | :--- | :--- | +| `text` | `string` | **Yes** | — | **Raw text content** to be segmented, embedded, and stored in the vector database. | +| `bucket_name` | `string` | No | — | MinIO bucket name (used to logically group the data or build the identifier). | +| `file_path` | `string` | No | — | Logical path or filename (used as a unique identifier for the text source). | +| `meta` | `object` | No | `{}` | Extra metadata to store alongside the text (e.g., `course`, `author`, `tags`). | + +Request: +``` +# example for raw text content +curl --location 'http://127.0.0.1:9011/api/v1/object/ingest-text' \ +--header 'Content-Type: application/json' \ +--data '{ + "text": "Newton'\''s Second Law of Motion states that the force acting on an object is equal to the mass of that object multiplied by its acceleration (F = ma). This relationship describes how the velocity of an object changes when it is subjected to an external force.", + "meta": { + "source": "topic-search" + } +}' +``` +Response: +```json +{ + "code": 20000, + "data": { + "task_id": "df3caeb3-3287-4e41-a1f0-098c90d08e03", + "status": "PROCESSING" + }, + "message": "Text ingestion task created successfully", + "timestamp": 1775006765 +} +``` + +#### File upload and ingestion +A unified workflow that first saves the file to MinIO and then immediately initiates the ingestion pipeline. Features full content indexing and AI-driven Video Summarization for supported video formats. -### File upload ana ingestion * URL: /api/v1/object/upload-ingest * Method: POST * Content-Type: multipart/form-data @@ -263,6 +315,7 @@ Response: | chunk_duration | integer | No | Segment duration in seconds (passed as a Form field). | | meta | string | No | JSON string of metadata (e.g., '{"course": "CS101"}'). | +* Example: Request: ``` curl --location 'http://127.0.0.1:9011/api/v1/object/upload-ingest' \ @@ -283,11 +336,13 @@ Response (200 OK): } ``` -### Retrieve and Search +#### Retrieve and Search +Executes a similarity search across vector collections using either natural language queries or base64-encoded images. Returns ranked results with associated metadata and MinIO object references. + * URL: /api/v1/object/search * Method: POST * Content-Type: multipart/form-data -* Pattern: ASYNC +* Pattern: SYNC * Parameters: | Field | Type | Required | Description | @@ -297,8 +352,9 @@ Response (200 OK): | max_num_results | integer | No | Maximum number of results to return. Defaults to 10. | | filter | object | No | Metadata filters (e.g., {"run_id": "...", "tags": ["class"]}). | +* Example: Request: -```json +``` curl --location 'http://127.0.0.1:9011/api/v1/object/search' \ --header 'Content-Type: application/json' \ --data '{ @@ -346,7 +402,7 @@ Response (200 OK): "timestamp": 1774877744 } ``` -### Resource Download (Video/Image/Document) +#### Resource Download (Video/Image/Document) Download existing resources in Minio. * URL: /api/v1/object/download/{resource_id} diff --git a/education-ai-suite/smart-classroom/content_search/utils/storage_service.py b/education-ai-suite/smart-classroom/content_search/utils/storage_service.py index c8d24322fd..3670a754b1 100644 --- a/education-ai-suite/smart-classroom/content_search/utils/storage_service.py +++ b/education-ai-suite/smart-classroom/content_search/utils/storage_service.py @@ -6,6 +6,7 @@ import uuid import logging from fastapi import UploadFile +from typing import Optional logger = logging.getLogger(__name__) @@ -24,10 +25,10 @@ def _try_initialize(self): self._error_msg = None except (ImportError, ModuleNotFoundError) as e: self._error_msg = f"Component missing: {str(e)}" - logger.error(f"❌ MinIO component load failed: {self._error_msg}") + logger.error(f"MinIO component load failed: {self._error_msg}") except Exception as e: self._error_msg = f"Initialization failed: {str(e)}" - logger.error(f"❌ MinIO connection failed: {self._error_msg}") + logger.error(f"MinIO connection failed: {self._error_msg}") @property def is_available(self) -> bool: @@ -63,7 +64,17 @@ async def get_file_stream(self, file_key: str): response = self._store.client.get_object(self._store.bucket, file_key) return response except Exception as e: - logger.error(f"❌ Failed to get file {file_key}: {str(e)}") + logger.error(f"Failed to get file {file_key}: {str(e)}") + raise e + + async def get_file_content(self, file_key: str, bucket_name: Optional[str] = None) -> bytes: + if not self.is_available: + raise RuntimeError(f"Storage Service is unavailable: {self._error_msg}") + target_bucket = bucket_name or self._store.bucket + try: + return self._store.get_bytes(file_key) + except Exception as e: + logger.error(f"Failed to read content for {file_key}: {str(e)}") raise e storage_service = StorageService() diff --git a/education-ai-suite/smart-classroom/content_search/utils/task_service.py b/education-ai-suite/smart-classroom/content_search/utils/task_service.py index c176781178..15e853e1fd 100644 --- a/education-ai-suite/smart-classroom/content_search/utils/task_service.py +++ b/education-ai-suite/smart-classroom/content_search/utils/task_service.py @@ -11,7 +11,8 @@ from utils.crud_task import task_crud from utils.schemas_task import TaskStatus from utils.search_service import search_service -from utils.video_service import video_service +from utils.video_service import video_service +from utils.storage_service import storage_service from utils.core_models import AITask class TaskService: @@ -35,6 +36,10 @@ async def handle_file_upload( else: task.status = "COMPLETED" task.result = {"message": "Upload only, no ingest requested"} + task.result = { + "message": "Upload only, no ingest requested", + "file_info": minio_payload + } db.commit() return {"task_id": str(task.id), "status": task.status} @@ -65,16 +70,51 @@ async def handle_file_ingest( raise e @staticmethod - async def handle_text_ingest(db: Session, payload: dict, background_tasks: BackgroundTasks): + async def handle_text_ingest(db: Session, request_data: dict, background_tasks: BackgroundTasks): + payload = request_data.copy() + + meta = payload.get("meta", {}) + if "tags" not in meta or not meta["tags"]: + meta["tags"] = ["default"] + payload["meta"] = meta + task = task_crud.create_task( db, task_type="text_ingest", payload=payload, status=TaskStatus.PROCESSING ) + background_tasks.add_task(TaskService.execute_worker_logic, str(task.id)) + return {"task_id": str(task.id), "status": task.status} + @staticmethod + async def handle_sync_search(db: Session, payload: dict): + task = task_crud.create_task( + db, + task_type="file_search", + payload=payload, + status=TaskStatus.PROCESSING + ) + db.commit() + + try: + search_data = await search_service.semantic_search(payload) + task.status = TaskStatus.COMPLETED + task.result = search_data + db.commit() + return { + "task_id": str(task.id), + "status": task.status, + "results": search_data.get("results", []) + } + except Exception as e: + task.status = TaskStatus.FAILED + task.result = {"error": str(e)} + db.commit() + return {"task_id": str(task.id), "status": task.status, "error": str(e)} + @staticmethod def execute_worker_logic(task_id: str): print(f"[BACKGROUND] Starting Ingest for Task {task_id}", flush=True) @@ -82,21 +122,28 @@ def execute_worker_logic(task_id: str): task = db.query(AITask).filter(AITask.id == task_id).first() if not task: return try: - file_key = task.payload.get('file_key') or task.payload.get('video_key') or task.payload.get('file_path') - bucket_name = task.payload.get('bucket_name', "content-search") - + file_key = (task.payload.get('file_key') or + task.payload.get('file_path') or + task.payload.get('video_key') or "") + bucket_name = task.payload.get('bucket_name') is_video = any(file_key.lower().endswith(ext) for ext in ['.mp4', '.avi', '.mov', '.mkv']) if task.task_type == "text_ingest": - # 1. raw text Ingest + text_content = task.payload.get("text") + + if not text_content and file_key: + print(f"[WORKER] Fetching text from MinIO: {file_key}", flush=True) + file_data = asyncio.run(storage_service.get_file_content(file_key, bucket_name)) + text_content = file_data.decode("utf-8") + ai_result = asyncio.run(search_service.ingest_text( - text=task.payload.get("text"), - file_path=task.payload.get("file_path"), - bucket_name=task.payload.get("bucket_name"), + text=text_content, + file_path=file_key, + bucket_name=bucket_name, meta=task.payload.get("meta") )) + else: - # 2. file/video Ingest ai_result = asyncio.run(search_service.trigger_ingest( file_path=file_key, bucket_name=bucket_name @@ -112,7 +159,7 @@ def execute_worker_logic(task_id: str): raw_meta = json.loads(raw_meta) except: raw_meta = {} - + user_tags = raw_meta.get("tags", []) if not user_tags: user_tags = ["default_video"] diff --git a/education-ai-suite/smart-classroom/content_search/utils/video_service.py b/education-ai-suite/smart-classroom/content_search/utils/video_service.py index 280c6b176e..5317af7be5 100644 --- a/education-ai-suite/smart-classroom/content_search/utils/video_service.py +++ b/education-ai-suite/smart-classroom/content_search/utils/video_service.py @@ -28,7 +28,7 @@ async def trigger_summarization( payload = { "minio_video_key": file_key, "reuse_existing": True, - "tags": tags if (tags and len(tags) > 0) else ["video"] + "tags": tags } if prompt is not None: