Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions storage/digital_ocean_spaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,7 @@ def upload_content(

if isinstance(content_to_be_uploaded, str):
f = BytesIO(content_to_be_uploaded.encode())
self._client.upload_fileobj(
f, self._bucket, file_key
)
self._client.upload_fileobj(f, self._bucket, file_key)
# Explicit cleanup
f.close()
else:
Expand All @@ -119,9 +117,7 @@ def upload_file(
permission: str = "public-read",
) -> None:
logging.debug(f"Uploading {file_key}")
self._client.upload_file(
file_path, self._bucket, file_key
)
self._client.upload_file(file_path, self._bucket, file_key)

def upload_file_multipart(
self,
Expand Down
46 changes: 41 additions & 5 deletions tasks/gazette_text_extraction.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,20 @@ def try_process_gazette_file(
)

gazette["source_text"] = try_to_extract_content(gazette_file, text_extractor)
gazette["url"] = define_file_url(gazette["file_path"])
gazette_txt_path = define_gazette_txt_path(gazette)
gazette["file_raw_txt"] = define_file_url(gazette_txt_path)

# Store relative paths instead of full URLs (controlled by feature flag)
use_relative_paths = (
os.environ.get("USE_RELATIVE_FILE_PATHS", "false").lower() == "true"
)
if use_relative_paths:
gazette["url"] = gazette["file_path"] # Relative path only
gazette["file_raw_txt"] = gazette_txt_path # Relative path only
else:
# Legacy behavior: store full URLs
gazette["url"] = define_file_url(gazette["file_path"])
gazette["file_raw_txt"] = define_file_url(gazette_txt_path)

upload_raw_text(gazette_txt_path, gazette["source_text"], storage)

# Delete file ASAP to free disk space
Expand All @@ -143,9 +154,23 @@ def try_process_gazette_file(

for segment in territory_segments:
segment_txt_path = define_segment_txt_path(segment)
segment["file_raw_txt"] = define_file_url(segment_txt_path)

# Store relative path for segments (controlled by feature flag)
use_relative_paths = (
os.environ.get("USE_RELATIVE_FILE_PATHS", "false").lower() == "true"
)
if use_relative_paths:
segment["file_raw_txt"] = segment_txt_path # Relative path only
else:
# Legacy behavior: store full URL
segment["file_raw_txt"] = define_file_url(segment_txt_path)

upload_raw_text(segment_txt_path, segment["source_text"], storage)
index.index_document(segment, document_id=segment["file_checksum"])
# Create a copy before indexing to avoid issues with mock references in tests
segment_to_index = dict(segment)
index.index_document(
segment_to_index, document_id=segment["file_checksum"]
)
document_ids.append(segment["file_checksum"])

# Clear segment data from memory
Expand All @@ -154,7 +179,10 @@ def try_process_gazette_file(
# Clear segments list
del territory_segments
else:
index.index_document(gazette, document_id=gazette["file_checksum"])
# Create a copy before indexing to avoid issues with mock references in tests
# and to preserve data integrity during concurrent operations
gazette_to_index = dict(gazette)
index.index_document(gazette_to_index, document_id=gazette["file_checksum"])
document_ids.append(gazette["file_checksum"])

set_gazette_as_processed(gazette, database)
Expand Down Expand Up @@ -215,6 +243,10 @@ def define_segment_txt_path(segment: Dict):
def define_file_url(path: str):
"""
Joins the storage endpoint with the path to form the URL

DEPRECATED: This function will be removed in a future version.
With USE_RELATIVE_FILE_PATHS=true, paths are stored without endpoints.
The API will handle endpoint concatenation dynamically.
"""
file_endpoint = get_file_endpoint()
return f"{file_endpoint}/{path}"
Expand All @@ -223,6 +255,10 @@ def define_file_url(path: str):
def get_file_endpoint() -> str:
"""
Get the endpoint where the gazette files can be downloaded.

DEPRECATED: This function will be removed in a future version.
The QUERIDO_DIARIO_FILES_ENDPOINT should be used in the API layer,
not in data processing.
"""
return os.environ["QUERIDO_DIARIO_FILES_ENDPOINT"]

Expand Down
27 changes: 18 additions & 9 deletions tasks/list_gazettes_to_be_processed.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ def get_gazettes_extracted_since_yesterday(
"""
logging.info("Listing gazettes extracted since yesterday (paginated)")

# Read page size dynamically to allow test mocking
page_size = int(os.environ.get("GAZETTE_QUERY_PAGE_SIZE", DEFAULT_PAGE_SIZE))

offset = 0
while True:
command = f"""
Expand All @@ -59,7 +62,7 @@ def get_gazettes_extracted_since_yesterday(
scraped_at > current_timestamp - interval '1 day'
AND gazettes.file_path NOT LIKE '%.zip'
ORDER BY gazettes.id
LIMIT {QUERY_PAGE_SIZE} OFFSET {offset}
LIMIT {page_size} OFFSET {offset}
;
"""

Expand All @@ -75,10 +78,10 @@ def get_gazettes_extracted_since_yesterday(
for gazette in page_results:
yield format_gazette_data(gazette)

offset += QUERY_PAGE_SIZE
offset += page_size

# If we got fewer results than page size, we're done
if len(page_results) < QUERY_PAGE_SIZE:
if len(page_results) < page_size:
break


Expand All @@ -91,6 +94,9 @@ def get_all_gazettes_extracted(
"""
logging.info("Listing all gazettes extracted (paginated)")

# Read page size dynamically to allow test mocking
page_size = int(os.environ.get("GAZETTE_QUERY_PAGE_SIZE", DEFAULT_PAGE_SIZE))

offset = 0
while True:
command = f"""
Expand All @@ -116,7 +122,7 @@ def get_all_gazettes_extracted(
WHERE
gazettes.file_path NOT LIKE '%.zip'
ORDER BY gazettes.id
LIMIT {QUERY_PAGE_SIZE} OFFSET {offset}
LIMIT {page_size} OFFSET {offset}
;
"""

Expand All @@ -132,10 +138,10 @@ def get_all_gazettes_extracted(
for gazette in page_results:
yield format_gazette_data(gazette)

offset += QUERY_PAGE_SIZE
offset += page_size

# If we got fewer results than page size, we're done
if len(page_results) < QUERY_PAGE_SIZE:
if len(page_results) < page_size:
break


Expand All @@ -148,6 +154,9 @@ def get_unprocessed_gazettes(
"""
logging.info("Listing unprocessed gazettes (paginated)")

# Read page size dynamically to allow test mocking
page_size = int(os.environ.get("GAZETTE_QUERY_PAGE_SIZE", DEFAULT_PAGE_SIZE))

offset = 0
while True:
command = f"""
Expand All @@ -174,7 +183,7 @@ def get_unprocessed_gazettes(
processed is False
AND gazettes.file_path NOT LIKE '%.zip'
ORDER BY gazettes.id
LIMIT {QUERY_PAGE_SIZE} OFFSET {offset}
LIMIT {page_size} OFFSET {offset}
;
"""

Expand All @@ -190,10 +199,10 @@ def get_unprocessed_gazettes(
for gazette in page_results:
yield format_gazette_data(gazette)

offset += QUERY_PAGE_SIZE
offset += page_size

# If we got fewer results than page size, we're done
if len(page_results) < QUERY_PAGE_SIZE:
if len(page_results) < page_size:
break


Expand Down
5 changes: 4 additions & 1 deletion tests/text_extraction_task_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,11 @@ def test_storage_call_to_get_file(self):
)

self.storage_mock.get_file.assert_called_once()
# Check the file_path directly instead of accessing self.data[0]
# which may have been cleared during processing
self.assertEqual(
self.storage_mock.get_file.call_args.args[0], self.data[0]["file_path"]
self.storage_mock.get_file.call_args.args[0],
"sc_gaspar/2020-10-18/972aca2e-1174-11eb-b2d5-a86daaca905e.pdf",
)
self.assertIsInstance(
self.storage_mock.get_file.call_args.args[1], tempfile._TemporaryFileWrapper
Expand Down
16 changes: 14 additions & 2 deletions tests/text_extraction_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,31 @@ def test_text_extractor_wrapper_creation(self):
def test_request_is_sent_to_apache_tika_server(
self, magic_mock, open_mock, request_get_mock
):
# Configure mock to return status_code 200 and close method
mock_response = MagicMock(status_code=200, text="")
mock_response.close = MagicMock()
request_get_mock.return_value = mock_response
filepath = "tests/data/fake_gazette.pdf"
expected_headers = {"Content-Type": "application/pdf", "Accept": "text/plain"}
self.extractor.extract_text(filepath)
open_mock.assert_called_with(filepath, "rb")
magic_mock.assert_called_with(filepath, mime=True)
request_get_mock.assert_called_with(
f"{self.url}/tika", data=open_mock(), headers=expected_headers
f"{self.url}/tika",
data=open_mock(),
headers=expected_headers,
stream=False,
timeout=(30, 300),
)

@patch("requests.put", return_value=MagicMock(text="Fake gazette content"))
@patch("requests.put")
@patch("builtins.open", new_callable=mock_open, read_data="")
@patch("magic.from_file", return_value="application/pdf")
def test_request_reponse_return(self, magic_mock, open_mock, request_get_mock):
# Configure mock to return status_code 200, text content, and close method
mock_response = MagicMock(status_code=200, text="Fake gazette content")
mock_response.close = MagicMock()
request_get_mock.return_value = mock_response
text = self.extractor.extract_text("tests/data/fake_gazette.pdf")
self.assertEqual("Fake gazette content", text)

Expand Down
Loading