
Commit 004fc65

Multi-filetype Support via Markitdown (#269)
Co-authored-by: Josh Bradley <[email protected]>
1 parent 5d2ab18 commit 004fc65

14 files changed: +1421 −719 lines changed

Diff for: backend/graphrag_app/api/data.py

+77 −58

@@ -2,6 +2,7 @@
 # Licensed under the MIT License.
 
 import asyncio
+import hashlib
 import os
 import re
 import traceback
@@ -14,20 +15,25 @@
     Depends,
     HTTPException,
     UploadFile,
+    status,
 )
+from markitdown import MarkItDown, StreamInfo
 
 from graphrag_app.logger.load_logger import load_pipeline_logger
 from graphrag_app.typing.models import (
     BaseResponse,
     StorageNameList,
 )
 from graphrag_app.utils.common import (
+    check_cache,
+    create_cache,
     delete_cosmos_container_item_if_exist,
     delete_storage_container_if_exist,
     get_blob_container_client,
     get_cosmos_container_store_client,
     sanitize_name,
     subscription_key_check,
+    update_cache,
 )
 
 data_route = APIRouter(
@@ -42,7 +48,7 @@
     "",
     summary="Get list of data containers.",
     response_model=StorageNameList,
-    responses={200: {"model": StorageNameList}},
+    responses={status.HTTP_200_OK: {"model": StorageNameList}},
 )
 async def get_all_data_containers():
     """
@@ -67,56 +73,66 @@ async def get_all_data_containers():
     return StorageNameList(storage_name=items)
 
 
-async def upload_file_async(
+async def upload_file(
     upload_file: UploadFile, container_client: ContainerClient, overwrite: bool = True
-) -> None:
+):
     """
-    Asynchronously upload a file to the specified blob container.
-    Silently ignore errors that occur when overwrite=False.
+    Convert and upload a file to a specified blob container.
+
+    Returns a list of objects where each object will have one of the following types:
+        * Tuple[str, str] - a tuple of (filename, file_hash) for successful uploads
+        * Tuple[str, None] - a tuple of (filename, None) for failed uploads or
+        * None for skipped files
     """
-    blob_client = container_client.get_blob_client(upload_file.filename)
+    filename = upload_file.filename
+    extension = os.path.splitext(filename)[1]
+    converted_filename = filename + ".txt"
+    converted_blob_client = container_client.get_blob_client(converted_filename)
+
     with upload_file.file as file_stream:
         try:
-            await blob_client.upload_blob(file_stream, overwrite=overwrite)
+            file_hash = hashlib.sha256(file_stream.read()).hexdigest()
+            if not await check_cache(file_hash, container_client):
+                # extract text from file using MarkItDown
+                md = MarkItDown()
+                stream_info = StreamInfo(
+                    extension=extension,
+                )
+                file_stream._file.seek(0)
+                file_stream = file_stream._file
+                result = md.convert_stream(
+                    stream=file_stream,
+                    stream_info=stream_info,
+                )
+
+                # remove illegal unicode characters and upload to blob storage
+                cleaned_result = _clean_output(result.text_content)
+                await converted_blob_client.upload_blob(
+                    cleaned_result, overwrite=overwrite
+                )
+
+                # return tuple of (filename, file_hash) to indicate success
+                return (filename, file_hash)
         except Exception:
-            pass
-
+            # if any exception occurs, return a tuple of (filename, None) to indicate conversion/upload failure
+            return (upload_file.filename, None)
 
-class Cleaner:
-    def __init__(self, file):
-        self.file = file
-        self.name = file.name
-        self.changes = 0
 
-    def clean(self, val, replacement=""):
-        # fmt: off
-        _illegal_xml_chars_RE = re.compile(
+def _clean_output(val: str, replacement: str = ""):
+    """Removes unicode characters that are invalid XML characters (not valid for graphml files at least)."""
+    # fmt: off
+    _illegal_xml_chars_RE = re.compile(
         "[\x00-\x08\x0b\x0c\x0e-\x1F\uD800-\uDFFF\uFFFE\uFFFF]"
     )
-        # fmt: on
-        self.changes += len(_illegal_xml_chars_RE.findall(val))
-        return _illegal_xml_chars_RE.sub(replacement, val)
-
-    def read(self, n):
-        return self.clean(self.file.read(n).decode()).encode(
-            encoding="utf-8", errors="strict"
-        )
-
-    def name(self):
-        return self.file.name
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, *args):
-        self.file.close()
+    # fmt: on
+    return _illegal_xml_chars_RE.sub(replacement, val)
 
 
 @data_route.post(
     "",
     summary="Upload data to a data storage container",
     response_model=BaseResponse,
-    responses={200: {"model": BaseResponse}},
+    responses={status.HTTP_201_CREATED: {"model": BaseResponse}},
 )
 async def upload_files(
     files: List[UploadFile],
@@ -125,36 +141,33 @@ async def upload_files(
     overwrite: bool = True,
 ):
     """
-    Create a Azure Storage container and upload files to it.
-
-    Args:
-        files (List[UploadFile]): A list of files to be uploaded.
-        storage_name (str): The name of the Azure Blob Storage container to which files will be uploaded.
-        overwrite (bool): Whether to overwrite existing files with the same name. Defaults to True. If False, files that already exist will be skipped.
-
-    Returns:
-        BaseResponse: An instance of the BaseResponse model with a status message indicating the result of the upload.
-
-    Raises:
-        HTTPException: If the container name is invalid or if any error occurs during the upload process.
+    Create a Azure Storage container (if needed) and upload files. Multiple file types are supported, including pdf, powerpoint, word, excel, html, csv, json, xml, etc.
+    The complete set of supported file types can be found in the MarkItDown (https://github.com/microsoft/markitdown) library.
     """
     try:
-        # clean files - remove illegal XML characters
-        files = [UploadFile(Cleaner(f.file), filename=f.filename) for f in files]
-
-        # upload files in batches of 1000 to avoid exceeding Azure Storage API limits
+        # create the initial cache if it doesn't exist
         blob_container_client = await get_blob_container_client(
            sanitized_container_name
        )
-        batch_size = 1000
+        await create_cache(blob_container_client)
+
+        # process file uploads in batches to avoid exceeding Azure Storage API limits
+        processing_errors = []
+        batch_size = 100
         num_batches = ceil(len(files) / batch_size)
         for i in range(num_batches):
             batch_files = files[i * batch_size : (i + 1) * batch_size]
             tasks = [
-                upload_file_async(file, blob_container_client, overwrite)
+                upload_file(file, blob_container_client, overwrite)
                 for file in batch_files
             ]
-            await asyncio.gather(*tasks)
+            upload_results = await asyncio.gather(*tasks)
+            successful_uploads = [r for r in upload_results if r and r[1] is not None]
+            # update the file cache with successful uploads
+            await update_cache(successful_uploads, blob_container_client)
+            # collect failed uploads
+            failed_uploads = [r[0] for r in upload_results if r and r[1] is None]
+            processing_errors.extend(failed_uploads)
 
         # update container-store entry in cosmosDB once upload process is successful
         cosmos_container_store_client = get_cosmos_container_store_client()
@@ -163,17 +176,23 @@ async def upload_files(
             "human_readable_name": container_name,
             "type": "data",
         })
-        return BaseResponse(status="File upload successful.")
+
+        if len(processing_errors) > 0:
+            raise HTTPException(
+                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
+                detail=f"Error uploading files: {processing_errors}.",
+            )
+        return BaseResponse(status="Success.")
     except Exception as e:
         logger = load_pipeline_logger()
         logger.error(
             message="Error uploading files.",
             cause=e,
             stack=traceback.format_exc(),
-            details={"files": [f.filename for f in files]},
+            details={"files": processing_errors},
         )
         raise HTTPException(
-            status_code=500,
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
             detail=f"Error uploading files to container '{container_name}'.",
         )
 
@@ -182,7 +201,7 @@ async def upload_files(
     "/{container_name}",
     summary="Delete a data storage container",
     response_model=BaseResponse,
-    responses={200: {"model": BaseResponse}},
+    responses={status.HTTP_200_OK: {"model": BaseResponse}},
 )
 async def delete_files(
     container_name: str, sanitized_container_name: str = Depends(sanitize_name)
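
Note on the new conversion path: upload_file() now hashes the raw upload, skips files whose hash is already recorded in the container's cache, converts everything else to text with MarkItDown, and strips characters that would be illegal in the GraphML produced later. The following is a minimal standalone sketch of that flow, assuming a local sample.pdf; the blob and cache plumbing (check_cache, create_cache, update_cache from graphrag_app.utils.common) is intentionally left out, and the file name is hypothetical.

    # Sketch only - not the application code. Mirrors the hash -> convert -> clean steps
    # of upload_file() for a single local file.
    import hashlib
    import io
    import re

    from markitdown import MarkItDown, StreamInfo

    # same character class used by _clean_output() above
    _illegal_xml_chars_RE = re.compile(
        "[\x00-\x08\x0b\x0c\x0e-\x1F\uD800-\uDFFF\uFFFE\uFFFF]"
    )

    def convert_to_text(raw: bytes, extension: str) -> tuple[str, str]:
        """Return (sha256_hash, cleaned_text) for a document's raw bytes."""
        file_hash = hashlib.sha256(raw).hexdigest()  # key the upload cache would store
        result = MarkItDown().convert_stream(
            stream=io.BytesIO(raw),
            stream_info=StreamInfo(extension=extension),  # hint the converter with the file type
        )
        cleaned = _illegal_xml_chars_RE.sub("", result.text_content)
        return file_hash, cleaned

    with open("sample.pdf", "rb") as f:  # hypothetical input file
        file_hash, text = convert_to_text(f.read(), ".pdf")
    print(file_hash, text[:80])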

Diff for: backend/graphrag_app/api/graph.py

+2 −0

@@ -8,6 +8,7 @@
     APIRouter,
     Depends,
     HTTPException,
+    status,
 )
 from fastapi.responses import StreamingResponse
 
@@ -31,6 +32,7 @@
     "/graphml/{container_name}",
     summary="Retrieve a GraphML file of the knowledge graph",
     response_description="GraphML file successfully downloaded",
+    status_code=status.HTTP_200_OK,
 )
 async def get_graphml_file(
     container_name, sanitized_container_name: str = Depends(sanitize_name)
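
The recurring pattern across these route files is replacing hard-coded integers with fastapi's named status constants, both on the route decorator and inside HTTPException. A small illustrative route (names and path are made up, not from this repo) showing the same pattern:

    # Illustrative only - demonstrates the fastapi.status pattern used in this commit.
    from fastapi import APIRouter, HTTPException, status

    router = APIRouter()

    @router.get(
        "/example/{name}",
        summary="Demo of named status codes",
        status_code=status.HTTP_200_OK,  # documents the success code in the OpenAPI schema
    )
    async def get_example(name: str):
        if name != "known":
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,  # readable alternative to a bare 404
                detail=f"'{name}' does not exist.",
            )
        return {"name": name}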

Diff for: backend/graphrag_app/api/index.py

+12 −8

@@ -12,6 +12,7 @@
     Depends,
     HTTPException,
     UploadFile,
+    status,
 )
 from kubernetes import (
     client as kubernetes_client,
@@ -49,7 +50,7 @@
     "",
     summary="Build an index",
     response_model=BaseResponse,
-    responses={200: {"model": BaseResponse}},
+    responses={status.HTTP_202_ACCEPTED: {"model": BaseResponse}},
 )
 async def schedule_index_job(
     storage_container_name: str,
@@ -71,7 +72,7 @@ async def schedule_index_job(
         sanitized_storage_container_name
     ).exists():
         raise HTTPException(
-            status_code=500,
+            status_code=status.HTTP_412_PRECONDITION_FAILED,
             detail=f"Storage container '{storage_container_name}' does not exist",
         )
 
@@ -101,7 +102,7 @@ async def schedule_index_job(
         PipelineJobState(existing_job.status) == PipelineJobState.RUNNING
     ):
         raise HTTPException(
-            status_code=202,  # request has been accepted for processing but is not complete.
+            status_code=status.HTTP_425_TOO_EARLY,  # request has been accepted for processing but is not complete.
             detail=f"Index '{index_container_name}' already exists and has not finished building.",
         )
     # if indexing job is in a failed state, delete the associated K8s job and pod to allow for a new job to be scheduled
@@ -142,7 +143,7 @@
     "",
     summary="Get all index names",
     response_model=IndexNameList,
-    responses={200: {"model": IndexNameList}},
+    responses={status.HTTP_200_OK: {"model": IndexNameList}},
 )
 async def get_all_index_names(
     container_store_client=Depends(get_cosmos_container_store_client),
@@ -218,7 +219,7 @@ def _delete_k8s_job(job_name: str, namespace: str) -> None:
     "/{container_name}",
     summary="Delete a specified index",
     response_model=BaseResponse,
-    responses={200: {"model": BaseResponse}},
+    responses={status.HTTP_200_OK: {"model": BaseResponse}},
 )
 async def delete_index(
     container_name: str,
@@ -257,7 +258,8 @@ async def delete_index(
             details={"container": container_name},
         )
         raise HTTPException(
-            status_code=500, detail=f"Error deleting '{container_name}'."
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            detail=f"Error deleting '{container_name}'.",
         )
 
     return BaseResponse(status="Success")
@@ -267,6 +269,7 @@ async def delete_index(
     "/status/{container_name}",
     summary="Track the status of an indexing job",
     response_model=IndexStatusResponse,
+    status_code=status.HTTP_200_OK,
 )
 async def get_index_status(
     container_name: str, sanitized_container_name: str = Depends(sanitize_name)
@@ -275,7 +278,7 @@ async def get_index_status(
     if pipelinejob.item_exist(sanitized_container_name):
         pipeline_job = pipelinejob.load_item(sanitized_container_name)
         return IndexStatusResponse(
-            status_code=200,
+            status_code=status.HTTP_200_OK,
             index_name=pipeline_job.human_readable_index_name,
             storage_name=pipeline_job.human_readable_storage_name,
             status=pipeline_job.status.value,
@@ -284,5 +287,6 @@ async def get_index_status(
         )
     else:
         raise HTTPException(
-            status_code=404, detail=f"'{container_name}' does not exist."
+            status_code=status.HTTP_404_NOT_FOUND,
+            detail=f"'{container_name}' does not exist.",
         )
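
schedule_index_job now distinguishes its failure modes: a missing storage container is reported as a client precondition failure (412) rather than a generic 500, and an index that is still building answers 425 Too Early instead of 202. A hedged sketch of that guard logic, with container_exists and job_is_running as placeholder callables standing in for the blob and CosmosDB checks in the real route:

    # Sketch of the new status-code semantics; the two callables are hypothetical stand-ins.
    from fastapi import HTTPException, status

    def validate_index_request(
        storage_container_name: str,
        index_container_name: str,
        container_exists,   # e.g. lambda name: blob client .exists() check
        job_is_running,     # e.g. lookup of an existing pipeline job in RUNNING state
    ) -> None:
        if not container_exists(storage_container_name):
            raise HTTPException(
                status_code=status.HTTP_412_PRECONDITION_FAILED,
                detail=f"Storage container '{storage_container_name}' does not exist",
            )
        if job_is_running(index_container_name):
            raise HTTPException(
                status_code=status.HTTP_425_TOO_EARLY,
                detail=f"Index '{index_container_name}' already exists and has not finished building.",
            )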

Diff for: backend/graphrag_app/api/prompt_tuning.py

+2 −0

@@ -11,6 +11,7 @@
     APIRouter,
     Depends,
     HTTPException,
+    status,
 )
 from graphrag.config.create_graphrag_config import create_graphrag_config
 
@@ -27,6 +28,7 @@
     "/prompts",
     summary="Generate custom graphrag prompts based on user-provided data.",
     description="Generating custom prompts from user-provided data may take several minutes to run based on the amount of data used.",
+    status_code=status.HTTP_200_OK,
 )
 async def generate_prompts(
     container_name: str,

Diff for: backend/graphrag_app/api/query.py

+5 −4

@@ -10,6 +10,7 @@
     APIRouter,
     Depends,
     HTTPException,
+    status,
 )
 from graphrag.api.query import global_search, local_search
 from graphrag.config.create_graphrag_config import create_graphrag_config
@@ -42,7 +43,7 @@
     summary="Perform a global search across the knowledge graph index",
     description="The global query method generates answers by searching over all AI-generated community reports in a map-reduce fashion. This is a resource-intensive method, but often gives good responses for questions that require an understanding of the dataset as a whole.",
     response_model=GraphResponse,
-    responses={200: {"model": GraphResponse}},
+    responses={status.HTTP_200_OK: {"model": GraphResponse}},
 )
 async def global_query(request: GraphRequest):
     # this is a slightly modified version of the graphrag.query.cli.run_global_search method
@@ -51,7 +52,7 @@ async def global_query(request: GraphRequest):
 
     if not _is_index_complete(sanitized_index_name):
         raise HTTPException(
-            status_code=500,
+            status_code=status.HTTP_425_TOO_EARLY,
             detail=f"{index_name} not ready for querying.",
         )
 
@@ -122,15 +123,15 @@ async def global_query(request: GraphRequest):
     summary="Perform a local search across the knowledge graph index.",
     description="The local query method generates answers by combining relevant data from the AI-extracted knowledge-graph with text chunks of the raw documents. This method is suitable for questions that require an understanding of specific entities mentioned in the documents (e.g. What are the healing properties of chamomile?).",
     response_model=GraphResponse,
-    responses={200: {"model": GraphResponse}},
+    responses={status.HTTP_200_OK: {"model": GraphResponse}},
 )
 async def local_query(request: GraphRequest):
     index_name = request.index_name
     sanitized_index_name = sanitize_name(index_name)
 
     if not _is_index_complete(sanitized_index_name):
         raise HTTPException(
-            status_code=500,
+            status_code=status.HTTP_425_TOO_EARLY,
             detail=f"{index_name} not ready for querying.",
         )
 