-
Notifications
You must be signed in to change notification settings - Fork 72
Expand file tree
/
Copy pathprocess.py
More file actions
227 lines (186 loc) · 9.04 KB
/
process.py
File metadata and controls
227 lines (186 loc) · 9.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
import glob
import logging
import json
import os
import sys
from typing import Tuple
from datetime import datetime
import tempfile
from azure.cosmos import CosmosClient, exceptions
from PyPDF2 import PdfReader
from ai_ocr.azure.doc_intelligence import get_ocr_results
from ai_ocr.model import Config
from ai_ocr.azure.images import convert_pdf_into_image
from ai_ocr.azure.openai_ops import load_image, get_size_of_base64_images
from ai_ocr.genai_operations import get_structured_data, get_summary_with_gpt, perform_gpt_evaluation_and_enrichment
def connect_to_cosmos():
    """Open the two Cosmos DB containers this module works with.

    Connection settings come from the environment variables
    ``COSMOS_DB_ENDPOINT``, ``COSMOS_DB_KEY``, ``COSMOS_DB_DATABASE_NAME``
    and ``COSMOS_DB_CONTAINER_NAME``.

    Returns:
        A ``(docs_container, conf_container)`` pair: the client for the
        container named by ``COSMOS_DB_CONTAINER_NAME`` and the client for
        the ``configuration`` container of the same database.

    Raises:
        KeyError: If one of the environment variables is not set.
    """
    env = os.environ
    # Every setting is looked up before the client is constructed, so a
    # missing variable surfaces as a KeyError without touching the service.
    account_url = env['COSMOS_DB_ENDPOINT']
    account_key = env['COSMOS_DB_KEY']
    db_name = env['COSMOS_DB_DATABASE_NAME']
    docs_name = env['COSMOS_DB_CONTAINER_NAME']

    db = CosmosClient(account_url, account_key).get_database_client(db_name)
    return db.get_container_client(docs_name), db.get_container_client('configuration')
def initialize_document(
    file_name: str,
    file_size: int,
    num_pages: int,
    prompt: str,
    json_schema: str,
    request_timestamp: datetime,
) -> dict:
    """Build the initial tracking record for a newly received file.

    Args:
        file_name: Blob name; stored as-is and, with ``/`` replaced by
            ``__``, used as the record ``id``.
        file_size: Stored as ``properties.blob_size``.
        num_pages: Stored as ``properties.num_pages``.
        prompt: Stored as ``model_input.model_prompt``.
        json_schema: Stored as ``model_input.example_schema``.
        request_timestamp: Stored in ISO 8601 form.

    Returns:
        A new dict with every state flag set to ``False``, empty extraction
        outputs, the model input section and an empty ``errors`` list.
    """
    record_id = file_name.replace('/', '__')

    properties = {
        "blob_name": file_name,
        "blob_size": file_size,
        "request_timestamp": request_timestamp.isoformat(),
        "num_pages": num_pages,
    }

    # Every pipeline stage starts out as "not done yet".
    stage_flags = (
        "file_landed",
        "ocr_completed",
        "gpt_extraction_completed",
        "gpt_evaluation_completed",
        "gpt_summary_completed",
        "processing_completed",
    )
    state = dict.fromkeys(stage_flags, False)

    extracted_data = {
        "ocr_output": '',
        "gpt_extraction_output": {},
        "gpt_extraction_output_with_evaluation": {},
        "gpt_summary_output": '',
    }

    model_input = {
        "model_deployment": os.getenv("AZURE_OPENAI_MODEL_DEPLOYMENT_NAME"),
        "model_prompt": prompt,
        "example_schema": json_schema,
    }

    return {
        "id": record_id,
        "properties": properties,
        "state": state,
        "extracted_data": extracted_data,
        "model_input": model_input,
        "errors": [],
    }
def update_state(document: dict, container: any, state_name: str, state: bool, processing_time: float = None):
    """Record a stage flag on the document and upsert it into the container.

    Args:
        document: Tracking record; its ``state`` section is modified in place.
        container: Object exposing ``upsert_item``; receives the document.
        state_name: Key of the flag to set inside ``document['state']``.
        state: New value of the flag.
        processing_time: When not ``None``, also stored under
            ``"<state_name>_time_seconds"``.
    """
    changes = {state_name: state}
    # A duration of 0 is still a valid measurement, hence the explicit
    # comparison with None rather than a truthiness test.
    if processing_time is not None:
        changes[f"{state_name}_time_seconds"] = processing_time

    document['state'].update(changes)
    container.upsert_item(document)
def write_blob_to_temp_file(myblob):
    """Copy a blob's content into the system temp directory.

    Args:
        myblob: Object exposing ``read()`` (the content) and ``name``.

    Returns:
        A ``(temp_file_path, number_of_pages, file_size)`` tuple.
        ``number_of_pages`` is the PDF page count when the name ends in
        ``.pdf`` (case-insensitive) and ``None`` otherwise; ``file_size`` is
        the size in bytes of the file written to disk.
    """
    payload = myblob.read()
    blob_name = myblob.name

    # The name is joined under the temp directory; any intermediate
    # directories it implies are created before writing.
    local_path = os.path.join(tempfile.gettempdir(), blob_name)
    os.makedirs(os.path.dirname(local_path), exist_ok=True)
    with open(local_path, 'wb') as sink:
        sink.write(payload)

    size_on_disk = os.path.getsize(local_path)

    # Page counting only applies to PDFs; everything else reports None.
    is_pdf = blob_name.lower().endswith('.pdf')
    page_count = len(PdfReader(local_path).pages) if is_pdf else None

    return local_path, page_count, size_on_disk
def _first_file_with_suffix(folder_path, suffix):
    """Return the path of the first listed entry ending in *suffix*, or None."""
    for entry in os.listdir(folder_path):
        if entry.endswith(suffix):
            return os.path.join(folder_path, entry)
    return None


def fetch_model_prompt_and_schema(dataset_type):
    """Look up the model prompt and example schema for a dataset type.

    The ``configuration`` item is read from the configuration container.
    When it does not exist yet, it is built from the sub-folders of the
    ``example-datasets`` directory (one level above this script's
    directory) and created in the container: per folder, the first ``.txt``
    file found supplies the prompt and the first ``.json`` file found
    supplies the schema.

    Args:
        dataset_type: Key of the dataset entry inside the configuration item.

    Returns:
        A ``(model_prompt, example_schema)`` tuple for ``dataset_type``.

    Raises:
        FileNotFoundError: If the configuration item has to be built and the
            ``example-datasets`` directory does not exist.
        KeyError: If the configuration item has no entry for ``dataset_type``.
    """
    # Only the configuration container is used here.
    _, conf_container = connect_to_cosmos()

    try:
        config_item = conf_container.read_item(item='configuration', partition_key={})
    except exceptions.CosmosResourceNotFoundError:
        logging.info("Configuration item not found in Cosmos DB. Creating a new configuration item.")
        config_item = {"id": "configuration"}

        # Resolve the demo folder relative to this script's location.
        script_dir = os.path.dirname(os.path.abspath(__file__))
        demo_folder_path = os.path.abspath(os.path.join(script_dir, '../', 'example-datasets'))
        if not os.path.exists(demo_folder_path):
            logging.error(f"Demo folder not found at {demo_folder_path}")
            raise FileNotFoundError(f"Demo folder not found at {demo_folder_path}")

        for folder_name in os.listdir(demo_folder_path):
            folder_path = os.path.join(demo_folder_path, folder_name)
            if not os.path.isdir(folder_path):
                continue

            # Prompt: first .txt file, otherwise a fixed default.
            prompt_path = _first_file_with_suffix(folder_path, '.txt')
            if prompt_path is None:
                prompt_text = "Default model prompt."
            else:
                with open(prompt_path, 'r') as txt_file:
                    prompt_text = txt_file.read().strip()

            # Schema: first .json file, otherwise an empty schema.
            schema_path = _first_file_with_suffix(folder_path, '.json')
            if schema_path is None:
                schema = {}
            else:
                with open(schema_path, 'r') as json_file:
                    schema = json.load(json_file)

            config_item[folder_name] = {
                'model_prompt': prompt_text,
                'example_schema': schema,
            }

        conf_container.create_item(body=config_item)
        logging.info("Configuration item created.")

    dataset_config = config_item[dataset_type]
    return dataset_config['model_prompt'], dataset_config['example_schema']
def _page_image_sort_key(image_path):
    """Sort key ordering page images by the number that ends their file stem."""
    stem = os.path.splitext(os.path.basename(image_path))[0]
    trailing_digits = stem[len(stem.rstrip('0123456789')):]
    # The path itself breaks ties and orders names without a number.
    return (int(trailing_digits) if trailing_digits else 0, image_path)


async def run_ocr_and_gpt(file_to_ocr: str, prompt: str, json_schema: str, document: dict, container: any, config: Config = Config()) -> Tuple[any, str, str, dict]:
    """Run OCR, GPT extraction and GPT evaluation for one file.

    After each stage the matching state flag and its duration are written
    to ``document`` and upserted through ``update_state``.

    Args:
        file_to_ocr: Path handed to ``get_ocr_results`` and
            ``convert_pdf_into_image``.
        prompt: Prompt forwarded to ``get_structured_data``.
        json_schema: Schema forwarded to ``get_structured_data`` and
            ``perform_gpt_evaluation_and_enrichment``.
        document: Tracking record updated after each stage.
        container: Container the tracking record is upserted into.
        config: Supplies ``max_images`` and ``gpt_vision_limit_mb``.

    Returns:
        A 4-tuple ``(ocr_content, extracted_json, enriched_json,
        processing_times)`` where the two middle items are JSON strings and
        ``processing_times`` maps ``ocr_processing_time``,
        ``gpt_extraction_time`` and ``gpt_evaluation_time`` to seconds.

    Raises:
        json.JSONDecodeError: If the extraction response is not valid JSON.
    """
    processing_times = {}

    # Get OCR results
    ocr_start_time = datetime.now()
    ocr_result = get_ocr_results(file_to_ocr)
    ocr_processing_time = (datetime.now() - ocr_start_time).total_seconds()
    processing_times['ocr_processing_time'] = ocr_processing_time

    # Update state after OCR processing
    update_state(document, container, 'ocr_completed', True, ocr_processing_time)

    # Ensure the /tmp/ directory exists
    # NOTE(review): assumes convert_pdf_into_image writes its pages as
    # /tmp/page*.png -- verify against that helper.
    imgs_path = "/tmp/"
    os.makedirs(imgs_path, exist_ok=True)
    image_pattern = os.path.join(imgs_path, "page*.png")

    try:
        # Extract images from the PDF
        convert_pdf_into_image(file_to_ocr)

        # glob returns matches in arbitrary order; sort by page number so the
        # max_images cut keeps the first pages and they are sent in order.
        image_files = sorted(glob.glob(image_pattern), key=_page_image_sort_key)

        # Limit images by config
        imgs = [load_image(img) for img in image_files[:config.max_images]]

        # Drop trailing images until the total size fits the configured limit
        # (converted from MB to bytes). Stop once no image is left.
        max_size = config.gpt_vision_limit_mb * 1024 * 1024
        while imgs and get_size_of_base64_images(imgs) > max_size:
            imgs.pop()

        # Get structured data
        gpt_extraction_start_time = datetime.now()
        structured = await get_structured_data(ocr_result.content, prompt, json_schema, imgs)
        gpt_extraction_time = (datetime.now() - gpt_extraction_start_time).total_seconds()
        processing_times['gpt_extraction_time'] = gpt_extraction_time

        # Update state after GPT extraction
        update_state(document, container, 'gpt_extraction_completed', True, gpt_extraction_time)

        # Parse structured data
        extracted_data = json.loads(structured.content.strip())

        # Perform GPT evaluation and enrichment
        evaluation_start_time = datetime.now()
        enriched_data = await perform_gpt_evaluation_and_enrichment(imgs, extracted_data, json_schema)
        evaluation_time = (datetime.now() - evaluation_start_time).total_seconds()
        processing_times['gpt_evaluation_time'] = evaluation_time

        # Update state after GPT evaluation
        update_state(document, container, 'gpt_evaluation_completed', True, evaluation_time)
    finally:
        # Always delete the generated page images, including when a step
        # above raised, so they cannot be picked up by a later run.
        for img_path in glob.glob(image_pattern):
            try:
                os.remove(img_path)
                logging.info(f"Deleted image: {img_path}")
            except OSError as e:
                # Best effort: a file that cannot be removed must not mask
                # the result or the original error.
                logging.warning(f"Error deleting image {img_path}: {e}")

    return ocr_result.content, json.dumps(extracted_data), json.dumps(enriched_data), processing_times
async def process_gpt_summary(ocr_response, document, container):
    """Generate the GPT summary for a document and record it.

    On success the classification and summary text are stored in
    ``document['extracted_data']`` and the ``gpt_summary_completed`` flag is
    set and upserted together with them. On failure the error is logged and
    appended to ``document['errors']``, the flag is upserted as ``False``
    and the process exits with status 1.

    Args:
        ocr_response: Passed to ``get_summary_with_gpt``; its
            ``categorization`` attribute, when present, becomes the
            classification (``'N/A'`` otherwise).
        document: Tracking record, modified in place.
        container: Container the tracking record is upserted into.

    Raises:
        SystemExit: With code 1 when any step fails.
    """
    try:
        classification = 'N/A'
        try:
            classification = ocr_response.categorization
        except AttributeError:
            logging.warning("Cannot find 'categorization' in output schema! Logging it as N/A...")

        summary_start_time = datetime.now()
        gpt_summary = await get_summary_with_gpt(ocr_response)
        summary_processing_time = (datetime.now() - summary_start_time).total_seconds()

        # Store the results before the upsert so the persisted record never
        # says "completed" while the summary fields are still empty.
        document['extracted_data']['classification'] = classification
        document['extracted_data']['gpt_summary_output'] = gpt_summary.content
        update_state(document, container, 'gpt_summary_completed', True, summary_processing_time)
    except Exception as e:
        logging.exception("GPT summary processing failed")
        document['errors'].append(f"NL processing error: {str(e)}")
        update_state(document, container, 'gpt_summary_completed', False)
        # sys.exit raises SystemExit, so nothing placed after it can run; the
        # former trailing bare `raise` was unreachable and has been dropped.
        # NOTE(review): exiting the interpreter from a helper is drastic --
        # confirm whether re-raising the error was the actual intent.
        sys.exit(1)  # Exit with error