Skip to content

Commit c39f6bf

Browse files
authored
Melhora tratamento de erros para tipos de arquivo não suportados (#109)
## 🎯 Objetivo

Reduzir a verbosidade dos logs para arquivos com tipos não suportados (especialmente ZIP) e melhorar a eficiência evitando tentativas de processamento desnecessárias.

## 📋 Problema

Arquivos ZIP e outros tipos não suportados geravam logs extremamente verbosos:

- WARNING com o caminho do arquivo
- ERROR com a mensagem
- Stack trace completo com ~15 linhas

Exemplo:

```
WARNING:root:Could not process gazette: 5107602/2009-02-27/e9642a24f98c20f0ce2ef5637d65c6abb0bdf854.zip. Cause: Unsupported file type: application/zip
ERROR:root:Unsupported file type: application/zip
Traceback (most recent call last):
[Stack trace completo]
```

## ✅ Solução Implementada

### 1. Nova Exceção Customizada

- `UnsupportedFileTypeError` criada em `data_extraction/text_extraction.py`
- Exportada no módulo para uso consistente no projeto
- Permite tratamento específico de erros de tipos não suportados

### 2. Detecção Antecipada de ZIP

- Novo método `is_zip()` na classe `ApacheTikaTextExtractor`
- Verifica tipo de arquivo ANTES de tentar processar
- Log específico: "Skipping unsupported ZIP file: {file_path}"
- Evita processamento desnecessário

### 3. Tratamento de Erros Simplificado

- `UnsupportedFileTypeError` capturado separadamente
- Apenas WARNING logado (sem stack trace)
- Outros erros mantêm log completo para debugging

## 📊 Resultado

**Antes:**

```
WARNING:root:Could not process gazette: [...].zip. Cause: Unsupported file type: application/zip
ERROR:root:Unsupported file type: application/zip
Traceback (most recent call last):
[~15 linhas de stack trace]
```

**Depois:**

```
WARNING:root:Skipping unsupported ZIP file: [...].zip
WARNING:root:Could not process gazette: [...].zip. Cause: Unsupported file type: application/zip
```

## 📝 Arquivos Modificados

- `data_extraction/text_extraction.py` - Nova exceção e método `is_zip()`
- `data_extraction/__init__.py` - Export da exceção
- `tasks/gazette_text_extraction.py` - Detecção antecipada e tratamento de erros

## 🧪 Testes

- Sintaxe Python validada com `py_compile`
- Imports verificados
- Lógica de fluxo mantida inalterada
2 parents cc88b36 + 7762d21 commit c39f6bf

6 files changed

Lines changed: 131 additions & 84 deletions

File tree

data_extraction/__init__.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
from .interfaces import TextExtractorInterface
2-
from .text_extraction import ApacheTikaTextExtractor, create_apache_tika_text_extraction
2+
from .text_extraction import (
3+
ApacheTikaTextExtractor,
4+
UnsupportedFileTypeError,
5+
create_apache_tika_text_extraction,
6+
)
37

48
__all__ = [
59
"ApacheTikaTextExtractor",
10+
"UnsupportedFileTypeError",
611
"create_apache_tika_text_extraction",
712
"TextExtractorInterface",
813
]

data_extraction/text_extraction.py

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,12 @@
88
from .interfaces import TextExtractorInterface
99

1010

11+
class UnsupportedFileTypeError(Exception):
    """Signal that a file's type cannot be handled by text extraction."""
15+
16+
1117
class ApacheTikaTextExtractor(TextExtractorInterface):
1218
def __init__(self, url: str):
1319
self._url = url
@@ -28,7 +34,7 @@ def _try_extract_text(self, filepath: str) -> str:
2834
"""
2935
if self.is_txt(filepath):
3036
return self._return_file_content(filepath)
31-
37+
3238
try:
3339
with open(filepath, "rb") as file:
3440
headers = {
@@ -37,19 +43,19 @@ def _try_extract_text(self, filepath: str) -> str:
3743
}
3844
# Use streaming to prevent loading entire file in memory
3945
response = requests.put(
40-
f"{self._url}/tika",
41-
data=file,
46+
f"{self._url}/tika",
47+
data=file,
4248
headers=headers,
43-
stream=False # Tika requires full upload, but we stream the read
49+
stream=False, # Tika requires full upload, but we stream the read
4450
)
4551
response.encoding = "UTF-8"
4652
text = response.text
47-
53+
4854
# Explicit cleanup to free memory immediately
4955
response.close()
5056
del response
5157
gc.collect()
52-
58+
5359
return text
5460
except Exception as e:
5561
# Ensure cleanup even on error
@@ -70,12 +76,13 @@ def check_file_exists(self, filepath: str):
7076
raise Exception(f"File does not exists: {filepath}")
7177

7278
def check_file_type_supported(self, filepath: str) -> None:
    """
    Verify that the file is one of the types accepted for text extraction.

    A file is accepted when any of is_doc, is_pdf or is_txt returns True
    for it; the checks run in that order and stop at the first match.

    Args:
        filepath: path of the file to inspect.

    Raises:
        UnsupportedFileTypeError: if none of the checks match. The message
            carries the type reported by get_file_type.
    """
    if self.is_doc(filepath) or self.is_pdf(filepath) or self.is_txt(filepath):
        return

    # Resolve the type only on the failure path: it is needed solely for
    # the error message, so supported files skip the extra lookup.
    file_type = self.get_file_type(filepath)
    raise UnsupportedFileTypeError(f"Unsupported file type: {file_type}")
7986

8087
def is_pdf(self, filepath):
8188
"""
@@ -115,6 +122,13 @@ def is_file_type(self, filepath, file_types):
115122
"""
116123
return self.get_file_type(filepath) in file_types
117124

125+
def is_zip(self, filepath):
    """
    Tell whether the file is a zip archive: True when its type
    is "application/zip", False for anything else
    """
    zip_types = ["application/zip"]
    return self.is_file_type(filepath, file_types=zip_types)
131+
118132

119133
def get_apache_tika_server_url():
    """Return the value of the APACHE_TIKA_SERVER environment variable.

    Raises KeyError when the variable is not set.
    """
    server_url = os.environ["APACHE_TIKA_SERVER"]
    return server_url

tasks/gazette_text_extraction.py

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from pathlib import Path
1212
from typing import Any, Dict, Iterable, List, Union
1313

14-
from data_extraction import TextExtractorInterface
14+
from data_extraction import TextExtractorInterface, UnsupportedFileTypeError
1515
from database import DatabaseInterface
1616
from index import IndexInterface
1717
from segmentation import get_segmenter
@@ -38,19 +38,23 @@ def extract_text_from_gazettes(
3838

3939
ids = []
4040
processed_count = 0
41-
41+
4242
for gazette in gazettes:
4343
try:
4444
document_ids = try_process_gazette_file(
4545
gazette, territories, database, storage, index, text_extractor
4646
)
4747
ids.extend(document_ids)
4848
processed_count += 1
49-
49+
5050
# Log progress periodically
5151
if processed_count % 10 == 0:
5252
logging.info(f"Processed {processed_count} gazettes")
53-
53+
54+
except UnsupportedFileTypeError as e:
55+
logging.warning(
56+
f"Could not process gazette: {gazette['file_path']}. Cause: {e}"
57+
)
5458
except Exception as e:
5559
logging.warning(
5660
f"Could not process gazette: {gazette['file_path']}. Cause: {e}"
@@ -59,7 +63,7 @@ def extract_text_from_gazettes(
5963
finally:
6064
# Clear gazette data from memory after processing
6165
gazette.clear()
62-
66+
6367
# Force GC every 10 documents to prevent memory accumulation
6468
if processed_count % 10 == 0:
6569
gc.collect()
@@ -82,23 +86,28 @@ def try_process_gazette_file(
8286
"""
8387
logging.debug(f"Processing gazette {gazette['file_path']}")
8488
gazette_file = None
85-
89+
8690
try:
8791
gazette_file = download_gazette_file(gazette, storage)
88-
92+
93+
# Check if file is ZIP - not supported, skip processing
94+
if text_extractor.is_zip(gazette_file):
95+
logging.warning(f"Skipping unsupported ZIP file: {gazette['file_path']}")
96+
raise UnsupportedFileTypeError("application/zip")
97+
8998
# Check file size to prevent OOM on very large files
9099
file_size = os.path.getsize(gazette_file)
91100
if file_size > MAX_FILE_SIZE_BYTES:
92101
raise Exception(
93102
f"File too large ({file_size / 1024 / 1024:.2f}MB > {MAX_FILE_SIZE_MB}MB): {gazette['file_path']}"
94103
)
95-
104+
96105
gazette["source_text"] = try_to_extract_content(gazette_file, text_extractor)
97106
gazette["url"] = define_file_url(gazette["file_path"])
98107
gazette_txt_path = define_gazette_txt_path(gazette)
99108
gazette["file_raw_txt"] = define_file_url(gazette_txt_path)
100109
upload_raw_text(gazette_txt_path, gazette["source_text"], storage)
101-
110+
102111
# Delete file ASAP to free disk space
103112
delete_gazette_files(gazette_file)
104113
gazette_file = None
@@ -114,18 +123,18 @@ def try_process_gazette_file(
114123
upload_raw_text(segment_txt_path, segment["source_text"], storage)
115124
index.index_document(segment, document_id=segment["file_checksum"])
116125
document_ids.append(segment["file_checksum"])
117-
126+
118127
# Clear segment data from memory
119128
segment.clear()
120-
129+
121130
# Clear segments list
122131
del territory_segments
123132
else:
124133
index.index_document(gazette, document_id=gazette["file_checksum"])
125134
document_ids.append(gazette["file_checksum"])
126135

127136
set_gazette_as_processed(gazette, database)
128-
137+
129138
# Clear gazette source_text from memory (large string)
130139
if "source_text" in gazette:
131140
del gazette["source_text"]
@@ -138,7 +147,7 @@ def try_process_gazette_file(
138147
os.remove(gazette_file)
139148
except Exception as e:
140149
logging.warning(f"Failed to cleanup temp file {gazette_file}: {e}")
141-
150+
142151
# Force garbage collection after each document
143152
gc.collect()
144153

tasks/list_gazettes_to_be_processed.py

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ def get_gazettes_extracted_since_yesterday(
3232
Uses pagination to prevent loading all data into memory at once (OOM prevention)
3333
"""
3434
logging.info("Listing gazettes extracted since yesterday (paginated)")
35-
35+
3636
offset = 0
3737
while True:
3838
command = f"""
@@ -61,19 +61,21 @@ def get_gazettes_extracted_since_yesterday(
6161
LIMIT {QUERY_PAGE_SIZE} OFFSET {offset}
6262
;
6363
"""
64-
64+
6565
page_results = list(database.select(command))
66-
66+
6767
if not page_results:
6868
break
69-
70-
logging.debug(f"Processing page with {len(page_results)} gazettes (offset={offset})")
71-
69+
70+
logging.debug(
71+
f"Processing page with {len(page_results)} gazettes (offset={offset})"
72+
)
73+
7274
for gazette in page_results:
7375
yield format_gazette_data(gazette)
74-
76+
7577
offset += QUERY_PAGE_SIZE
76-
78+
7779
# If we got fewer results than page size, we're done
7880
if len(page_results) < QUERY_PAGE_SIZE:
7981
break
@@ -87,7 +89,7 @@ def get_all_gazettes_extracted(
8789
Uses pagination to prevent loading all data into memory at once (OOM prevention)
8890
"""
8991
logging.info("Listing all gazettes extracted (paginated)")
90-
92+
9193
offset = 0
9294
while True:
9395
command = f"""
@@ -114,19 +116,21 @@ def get_all_gazettes_extracted(
114116
LIMIT {QUERY_PAGE_SIZE} OFFSET {offset}
115117
;
116118
"""
117-
119+
118120
page_results = list(database.select(command))
119-
121+
120122
if not page_results:
121123
break
122-
123-
logging.debug(f"Processing page with {len(page_results)} gazettes (offset={offset})")
124-
124+
125+
logging.debug(
126+
f"Processing page with {len(page_results)} gazettes (offset={offset})"
127+
)
128+
125129
for gazette in page_results:
126130
yield format_gazette_data(gazette)
127-
131+
128132
offset += QUERY_PAGE_SIZE
129-
133+
130134
# If we got fewer results than page size, we're done
131135
if len(page_results) < QUERY_PAGE_SIZE:
132136
break
@@ -140,7 +144,7 @@ def get_unprocessed_gazettes(
140144
Uses pagination to prevent loading all data into memory at once (OOM prevention)
141145
"""
142146
logging.info("Listing unprocessed gazettes (paginated)")
143-
147+
144148
offset = 0
145149
while True:
146150
command = f"""
@@ -169,19 +173,21 @@ def get_unprocessed_gazettes(
169173
LIMIT {QUERY_PAGE_SIZE} OFFSET {offset}
170174
;
171175
"""
172-
176+
173177
page_results = list(database.select(command))
174-
178+
175179
if not page_results:
176180
break
177-
178-
logging.debug(f"Processing page with {len(page_results)} unprocessed gazettes (offset={offset})")
179-
181+
182+
logging.debug(
183+
f"Processing page with {len(page_results)} unprocessed gazettes (offset={offset})"
184+
)
185+
180186
for gazette in page_results:
181187
yield format_gazette_data(gazette)
182-
188+
183189
offset += QUERY_PAGE_SIZE
184-
190+
185191
# If we got fewer results than page size, we're done
186192
if len(page_results) < QUERY_PAGE_SIZE:
187193
break

tests/list_gazettes_pagination_tests.py

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import os
1212
from datetime import date, datetime
1313
from unittest import TestCase
14-
from unittest.mock import MagicMock, call, patch
14+
from unittest.mock import MagicMock, patch
1515

1616
from tasks.list_gazettes_to_be_processed import (
1717
get_all_gazettes_extracted,
@@ -30,7 +30,7 @@ class GazettesListingPaginationTests(TestCase):
3030
def setUp(self):
3131
"""Setup comum para todos os testes"""
3232
self.database_mock = MagicMock()
33-
33+
3434
# Mock data - simula resultados do banco
3535
self.sample_gazette_row = (
3636
1, # id
@@ -110,8 +110,12 @@ def test_get_unprocessed_gazettes_queries_contain_limit_and_offset(self):
110110
self.assertIn("OFFSET 0", sql_command, "SQL deve conter 'OFFSET 0'")
111111

112112
# CRÍTICO: Verifica que NÃO usa placeholders de parâmetros
113-
self.assertNotIn("%(limit)s", sql_command, "SQL não deve usar placeholder %(limit)s")
114-
self.assertNotIn("%(offset)s", sql_command, "SQL não deve usar placeholder %(offset)s")
113+
self.assertNotIn(
114+
"%(limit)s", sql_command, "SQL não deve usar placeholder %(limit)s"
115+
)
116+
self.assertNotIn(
117+
"%(offset)s", sql_command, "SQL não deve usar placeholder %(offset)s"
118+
)
115119

116120
@patch.dict("os.environ", {"GAZETTE_QUERY_PAGE_SIZE": "3"})
117121
def test_get_unprocessed_gazettes_stops_when_no_more_results(self):
@@ -167,7 +171,7 @@ def test_get_unprocessed_gazettes_offset_increments_correctly(self):
167171

168172
# Verifica os OFFSETs nas chamadas
169173
calls = self.database_mock.select.call_args_list
170-
174+
171175
sql_1 = calls[0][0][0]
172176
sql_2 = calls[1][0][0]
173177
sql_3 = calls[2][0][0]
@@ -288,20 +292,20 @@ class GazettesListingRegressionTests(TestCase):
288292
def test_select_method_signature_compatibility(self):
289293
"""
290294
REGRESSÃO: Garante que select() é sempre chamado com a assinatura correta
291-
295+
292296
Este teste falha se tentarmos passar parâmetros extras para select(),
293297
prevenindo a regressão do bug original:
294298
TypeError: PostgreSQL.select() takes 2 positional arguments but 3 were given
295299
"""
296300
database_mock = MagicMock()
297-
301+
298302
# Configura o mock para aceitar APENAS 1 argumento
299303
def strict_select(command):
300304
"""Mock que rejeita chamadas com mais de 1 argumento"""
301305
if not isinstance(command, str):
302306
raise TypeError("select() expects a string command")
303307
return []
304-
308+
305309
database_mock.select.side_effect = strict_select
306310

307311
# Se o código tentar passar parâmetros extras, este teste falhará
@@ -325,6 +329,7 @@ def test_sql_injection_safety_numeric_values(self):
325329

326330
# Verifica que LIMIT e OFFSET são números inteiros no SQL
327331
import re
332+
328333
limit_match = re.search(r"LIMIT\s+(\d+)", sql_command)
329334
offset_match = re.search(r"OFFSET\s+(\d+)", sql_command)
330335

0 commit comments

Comments
 (0)