spreadsheet_chunker.py
import logging
import os
import time
from io import BytesIO

from openpyxl import load_workbook
from openpyxl.worksheet.worksheet import Worksheet
from tabulate import tabulate

from .base_chunker import BaseChunker


class SpreadsheetChunker(BaseChunker):
"""
`SpreadsheetChunker` processes and chunks spreadsheet content, such as Excel files, into manageable pieces for analysis and summarization.
    It supports chunking either by row or by sheet, lets users choose whether to include the header row in each row-based chunk, and ensures
    that the content size does not exceed a specified token limit.
The class supports the following operations:
- Converts spreadsheets into chunkable content.
- Provides options to chunk either by row or by sheet.
- Includes optional header rows in chunks.
- Summarizes large sheets if the content exceeds the maximum chunk size.
Initialization:
---------------
The `SpreadsheetChunker` is initialized with the following parameters:
- `data` (dict): A dictionary containing the JSON content to be chunked.
- `max_chunk_size` (int, optional): The maximum size of each chunk in tokens. Defaults to 2048 tokens or the value specified in the
`SPREADSHEET_NUM_TOKENS` environment variable.
- `chunking_by_row` (bool, optional): Whether to chunk by row instead of by sheet. Defaults to False or the value specified in the
`SPREADSHEET_CHUNKING_BY_ROW` environment variable.
    - `include_header_in_chunks` (bool, optional): Whether to include header rows in each row-based chunk. Defaults to False or the value
      specified in the `SPREADSHEET_CHUNKING_BY_ROW_INCLUDE_HEADER` environment variable.
Attributes:
-----------
- `max_chunk_size` (int): The maximum allowed size of each chunk in tokens, derived from the `SPREADSHEET_NUM_TOKENS`
environment variable (default is 2048 tokens).
- `chunking_by_row` (bool): Whether to chunk by row instead of by sheet, derived from the `SPREADSHEET_CHUNKING_BY_ROW`
environment variable (default is False).
    - `include_header_in_chunks` (bool): Whether to include header rows in each row-based chunk, derived from the `SPREADSHEET_CHUNKING_BY_ROW_INCLUDE_HEADER`
      environment variable (default is False).
Methods:
--------
- `get_chunks()`: Splits the spreadsheet content into manageable chunks, based on the configuration.
- `_spreadsheet_process()`: Extracts and processes data from each sheet, including summaries if necessary.
- `_get_sheet_data(sheet)`: Retrieves data and headers from the given sheet, handling empty cells.
- `_clean_markdown_table(table_str)`: Cleans up Markdown table strings by removing excessive whitespace.
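    Example:
    --------
    A minimal usage sketch (illustrative only): it assumes `data` follows the same conventions as the
    other `BaseChunker` subclasses (carrying the source file name and bytes) and that the dictionaries
    returned by `_create_chunk` expose `title` and `content` keys.
        chunker = SpreadsheetChunker(data=document_data, chunking_by_row=True)
        for chunk in chunker.get_chunks():
            print(chunk["title"], len(chunk["content"]))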
"""
def __init__(
self,
data: dict,
max_chunk_size: int | None = None,
chunking_by_row: bool | None = None,
include_header_in_chunks: bool | None = None,
):
super().__init__(data)
self.max_chunk_size = max_chunk_size or int(
os.getenv("SPREADSHEET_NUM_TOKENS", "2048")
)
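        # Explicit constructor arguments take precedence; otherwise the corresponding
        # environment variables are used (where "true", "1" and "yes" count as truthy).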
self.chunking_by_row = (
os.getenv("SPREADSHEET_CHUNKING_BY_ROW", "false").lower()
in ["true", "1", "yes"]
if chunking_by_row is None
else bool(chunking_by_row)
)
self.include_header_in_chunks = (
os.getenv("SPREADSHEET_CHUNKING_BY_ROW_INCLUDE_HEADER", "false").lower()
in ["true", "1", "yes"]
if include_header_in_chunks is None
else bool(include_header_in_chunks)
        )

    def get_chunks(self) -> list[dict]:
"""
Splits the spreadsheet content into smaller chunks. Depending on the configuration, chunks can be created by sheet or by row.
- If chunking by sheet, the method summarizes content that exceeds the maximum chunk size.
- If chunking by row, each row is processed into its own chunk, optionally including the header row.
Returns:
list[dict]: A list of dictionaries, each representing a chunk of the document.
"""
chunks = []
logging.info(
f"[SpreadsheetChunker] [get_chunks] [{self.filename}] Running `get_chunks`."
)
total_start_time = time.time()
sheets = self._spreadsheet_process()
logging.info(
f"[SpreadsheetChunker] [get_chunks] [{self.filename}] Workbook has {len(sheets)} sheets."
)
chunk_id = 0
for sheet in sheets:
# Original behaviour: Chunk per sheet
if not self.chunking_by_row:
logging.info(
f"[SpreadsheetChunker] [get_chunks] [{self.filename}] [{sheet['name']}] Starting sheet-wise chunking."
)
start_time = time.time()
chunk_id += 1
logging.debug(
f"[SpreadsheetChunker] [get_chunks] [{self.filename}] [{sheet['name']}] Started processing chunk {chunk_id} (sheet)."
)
table_content = sheet["table"]
table_content = self._clean_markdown_table(table_content)
table_tokens = self.token_estimator.estimate_tokens(table_content)
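                # If the whole sheet does not fit in the token budget, use the
                # LLM-generated summary as the chunk content instead of the raw table.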
if self.max_chunk_size > 0 and table_tokens > self.max_chunk_size:
logging.info(
f"[SpreadsheetChunker] [get_chunks] [{self.filename}] [{sheet['name']}] Table has {table_tokens} tokens. Max tokens is {self.max_chunk_size}. Using summary instead."
)
table_content = sheet["summary"]
chunk_dict = self._create_chunk(
chunk_id=chunk_id,
content=table_content,
summary=sheet["summary"],
embedding_text=(
sheet["summary"] if sheet["summary"] else table_content
),
title=sheet["name"],
)
chunks.append(chunk_dict)
elapsed_time = time.time() - start_time
logging.debug(
f"[SpreadsheetChunker] [get_chunks] [{self.filename}] [{sheet['name']}] Processed chunk {chunk_id} in {elapsed_time:.2f} seconds."
)
# Alternate behaviour: Chunk per row
else:
logging.info(
f"[SpreadsheetChunker] [get_chunks] [{self.filename}] [{sheet['name']}] Starting row-wise chunking."
)
headers = sheet.get("headers", [])
rows = sheet.get("data", [])
for row_index, row in enumerate(rows, start=1):
if not any(cell.strip() for cell in row):
continue
start_time = time.time()
chunk_id += 1
logging.debug(
f"[SpreadsheetChunker] [get_chunks] [{self.filename}] [{sheet['name']}] Processing chunk {chunk_id} for row {row_index}."
)
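                    # Render this single row as a GitHub-flavoured Markdown table,
                    # optionally prepending the sheet's header row.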
if self.include_header_in_chunks:
table_content = tabulate(
[headers, row], headers="firstrow", tablefmt="github"
)
else:
table_content = tabulate(
[row], headers=headers, tablefmt="github"
)
table_content = self._clean_markdown_table(table_content)
table_tokens = self.token_estimator.estimate_tokens(table_content)
summary = ""
                    if self.max_chunk_size > 0 and table_tokens > self.max_chunk_size:
                        # The row is not truncated; the full content is kept even when it
                        # exceeds the configured token budget.
                        logging.info(
                            f"[SpreadsheetChunker] [get_chunks] [{self.filename}] [{sheet['name']}] Row table has {table_tokens} tokens, which exceeds the {self.max_chunk_size}-token limit. Keeping full row content."
                        )
                    content = table_content
                    embedding_text = table_content
                    chunk_dict = self._create_chunk(
                        chunk_id=chunk_id,
content=content,
summary=summary,
embedding_text=embedding_text,
title=f"{sheet['name']} — Row {row_index}",
)
chunks.append(chunk_dict)
elapsed_time = time.time() - start_time
logging.debug(
f"[SpreadsheetChunker] [get_chunks] [{self.filename}] [{sheet['name']}] Processed chunk {chunk_id} in {elapsed_time:.2f} seconds."
)
total_elapsed_time = time.time() - total_start_time
logging.debug(
f"[SpreadsheetChunker] [get_chunks] [{self.filename}] Finished `get_chunks`. Created {len(chunks)} chunks in {total_elapsed_time:.2f} seconds."
)
        return chunks

    def _spreadsheet_process(self) -> list[dict]:
"""
Extracts and processes each sheet from the spreadsheet, converting the content into Markdown table format.
If chunking by sheet, a summary is generated if the sheet's content exceeds the maximum token size.
Returns:
list[dict]: A list of dictionaries, where each dictionary contains sheet metadata, headers, rows, table content, and a summary if applicable.
"""
logging.debug(
f"[SpreadsheetChunker] [_spreadsheet_process] [{self.filename}] Starting blob download."
)
blob_data = self.document_bytes
blob_stream = BytesIO(blob_data)
logging.debug(
f"[SpreadsheetChunker] [_spreadsheet_process] [{self.filename}] Starting `openpyxl` `load_workbook`."
)
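        # `data_only=True` makes openpyxl return cached formula results instead of formula strings.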
workbook = load_workbook(blob_stream, data_only=True)
sheets = []
total_start_time = time.time()
for sheet_name in workbook.sheetnames:
logging.info(
f"[SpreadsheetChunker] [_spreadsheet_process] [{self.filename}] [{sheet_name}] Started processing."
)
start_time = time.time()
sheet_dict = {}
sheet_dict["name"] = sheet_name
sheet = workbook[sheet_name]
data, headers = self._get_sheet_data(sheet)
sheet_dict["headers"] = headers
sheet_dict["data"] = data
            table_str = tabulate(data, headers=headers, tablefmt="github")
table = self._clean_markdown_table(table_str)
sheet_dict["table"] = table
# Chunk by sheets
if not self.chunking_by_row:
prompt = f"Summarize the table with data in it, by understanding the information clearly.\n table_data: {table}"
summary = self.aoai_client.get_completion(prompt, max_tokens=2048)
sheet_dict["summary"] = summary
logging.debug(
f"[SpreadsheetChunker] [_spreadsheet_process] [{self.filename}] [{sheet_dict['name']}] Generating summary."
)
# Chunk by rows
else:
sheet_dict["summary"] = ""
logging.debug(
f"[SpreadsheetChunker] [_spreadsheet_process] [{self.filename}] [{sheet_dict['name']}] Skipped summary generation (chunking by row)."
)
elapsed_time = time.time() - start_time
logging.info(
f"[SpreadsheetChunker] [_spreadsheet_process] [{self.filename}] [{sheet_dict['name']}] Processed in {elapsed_time:.2f} seconds."
)
sheets.append(sheet_dict)
total_elapsed_time = time.time() - total_start_time
logging.info(
f"[SpreadsheetChunker] [_spreadsheet_process] [{self.filename}] Total processing time: {total_elapsed_time:.2f} seconds."
)
        return sheets

    def _get_sheet_data(self, sheet: Worksheet) -> tuple:
"""
Retrieves data and headers from the given sheet. Each row's data is processed into a list format, ensuring that empty rows are excluded.
Args:
sheet (Worksheet): The `Worksheet` object to extract data from.
Returns:
tuple: A tuple containing a list of row data and a list of headers.
"""
data = []
for row in sheet.iter_rows(
min_row=2
): # start from the second row to skip headers
row_data = []
for cell in row:
cell_value = cell.value if cell.value is not None else ""
cell_text = str(cell_value)
row_data.append(cell_text)
if "".join(row_data).strip() != "":
data.append(row_data)
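        # The first worksheet row is treated as the header row.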
headers = [cell.value if cell.value is not None else "" for cell in sheet[1]]
        return data, headers

    def _clean_markdown_table(self, table_str: str) -> str:
"""
Cleans up a Markdown table string by removing excessive whitespace from each cell.
Args:
table_str (str): The Markdown table string to be cleaned.
Returns:
str: The cleaned Markdown table string with reduced whitespace.
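        Example (illustrative):
            "|  a   |  b |"  ->  "| a | b |"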
"""
cleaned_lines = []
lines = table_str.splitlines()
for line in lines:
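            # Separator rows (containing only '-', '|' and spaces) are kept verbatim.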
if set(line.strip()) <= set("-| "):
cleaned_lines.append(line)
continue
cells = line.split("|")
stripped_cells = [cell.strip() for cell in cells[1:-1]]
cleaned_lines = "| " + " | ".join(stripped_cells) + " |"
cleaned_lines.append(cleaned_lines)
return "\n".join(cleaned_lines)