Skip to content

Commit 3fd779c

Browse files
author
Chava Goldshtein
committed
fix: change chunked processing logs to DEBUG level and add tests
- Change chunk processing logs from INFO to DEBUG level per PR feedback
- Add test_chunks.py with comprehensive chunked processing tests
- Tests verify chunking behavior and data integrity for large files
- Ensure small files are not unnecessarily chunked

Addresses PR review feedback on logging levels.
1 parent 47bf883 commit 3fd779c

File tree

2 files changed

+180
-2
lines changed

2 files changed

+180
-2
lines changed

ckanext/xloader/loader.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,10 @@ def split_copy_by_size(input_file, engine, logger, resource_id, headers, delimi
205205
connection (str, optional): Connection string for the PostgreSQL database. Defaults to an empty string.
206206
delimiter (str, optional): Delimiter character used in the CSV file. Defaults to ','.
207207
"""
208+
209+
chunk_count = 0
210+
file_size = os.path.getsize(input_file)
211+
logger.info('Starting chunked processing for file size: {} bytes with chunk size: {} bytes'.format(file_size, max_size))
208212

209213
with open(input_file, 'r', encoding='utf-8') as infile:
210214
current_file = None
@@ -214,9 +218,10 @@ def split_copy_by_size(input_file, engine, logger, resource_id, headers, delimi
214218
if current_file is None or current_file.tell() >= max_size:
215219
# Close previous file if necessary
216220
if current_file:
217-
logger.info('Before copying file: {}'.format(output_filename))
221+
chunk_count += 1
222+
logger.debug('Before copying chunk {}: {}'.format(chunk_count, output_filename))
218223
copy_file(output_filename, engine, logger, resource_id, headers, delimiter)
219-
logger.info('Copied file: {}'.format(output_filename))
224+
logger.debug('Copied chunk {}: {}'.format(chunk_count, output_filename))
220225
current_file.close()
221226
header = True
222227

@@ -231,8 +236,13 @@ def split_copy_by_size(input_file, engine, logger, resource_id, headers, delimi
231236
current_file.close()
232237

233238
# Copy the last file
239+
chunk_count += 1
240+
logger.debug('Before copying final chunk {}: {}'.format(chunk_count, output_filename))
234241
copy_file(output_filename, engine, logger, resource_id, headers, delimiter)
242+
logger.debug('Copied final chunk {}: {}'.format(chunk_count, output_filename))
235243
os.remove(output_filename)
244+
245+
logger.info('Completed chunked processing: {} chunks processed for file size {} bytes'.format(chunk_count, file_size))
236246
if infile:
237247
infile.close()
238248

@@ -438,6 +448,7 @@ def strip_white_space_iter():
438448

439449
# Copy file to datastore db, split to chunks.
440450
max_size = config.get('ckanext.xloader.copy_chunk_size', 1024**3)
451+
logger.info('Using chunk size: {} bytes for resource {}'.format(int(max_size), resource_id))
441452
split_copy_by_size(csv_filepath, engine, logger, resource_id, headers, delimiter, int(max_size))
442453

443454
logger.info('...copying done')
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
# -*- coding: utf-8 -*-
2+
import os
3+
import pytest
4+
import tempfile
5+
import logging
6+
from typing import Callable, List, Tuple, Any
7+
from unittest.mock import patch, MagicMock
8+
import csv
9+
import sqlalchemy.orm as orm
10+
11+
from ckan.tests import factories
12+
from ckanext.xloader import loader
13+
from ckanext.xloader.loader import get_write_engine
14+
from ckanext.xloader.tests.test_loader import TestLoadBase, get_sample_filepath
15+
16+
logger = logging.getLogger(__name__)
17+
18+
19+
@pytest.fixture()
def Session():
    """Yield a scoped SQLAlchemy session bound to the datastore write engine.

    The session is closed again once the dependent test finishes.
    """
    db_engine = get_write_engine()
    scoped = orm.scoped_session(orm.sessionmaker(bind=db_engine))
    yield scoped
    scoped.close()
25+
26+
27+
@pytest.mark.usefixtures("full_reset", "with_plugins")
@pytest.mark.ckan_config("ckan.plugins", "datastore xloader")
class TestChunkedLoading(TestLoadBase):
    """Exercise chunked CSV loading (split_copy_by_size) through load_csv."""

    def _create_mock_split_copy(self, chunk_size: int) -> Callable:
        """Wrap the real split_copy_by_size, forcing max_size to chunk_size."""
        real_split = loader.split_copy_by_size

        def _forced_split(input_file: Any, engine: Any, logger: Any,
                          resource_id: str, headers: List[str],
                          delimiter: str = ',',
                          max_size: int = 1024**3) -> Any:
            # Ignore the caller-supplied max_size and substitute chunk_size.
            return real_split(input_file, engine, logger, resource_id,
                              headers, delimiter, chunk_size)

        return _forced_split

    def _create_mock_copy_file(self, copy_calls_list: List[Tuple]) -> Callable:
        """Wrap the real copy_file, recording each call's positional args."""
        real_copy = loader.copy_file

        def _recording_copy(*args: Any, **kwargs: Any) -> Any:
            copy_calls_list.append(args)
            return real_copy(*args, **kwargs)

        return _recording_copy

    def _generate_large_csv(self, filepath: str, num_rows: int = 100000,
                            row_size_kb: int = 1) -> Tuple[str, List[str], int]:
        """Write num_rows CSV rows of roughly row_size_kb KB each to filepath.

        Returns (filepath, headers, num_rows) for use in assertions.
        """
        headers = ['id', 'name', 'description', 'data']
        # Pad each row up to ~row_size_kb KB; the -50 roughly accounts
        # for the non-padding columns.
        filler = 'x' * max(1, (row_size_kb * 1024) - 50)

        with open(filepath, 'w', newline='', encoding='utf-8') as handle:
            out = csv.writer(handle)
            out.writerow(headers)
            for row_num in range(1, num_rows + 1):
                out.writerow([
                    row_num,
                    f'Name_{row_num}',
                    f'Description for row {row_num}',
                    filler,
                ])

        return filepath, headers, num_rows

    def test_chunked_processing_large_file(self, Session: Any) -> None:
        """A large input is split into multiple chunks without losing rows."""

        # Throwaway file big enough (~15MB) to force more than one chunk.
        with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as temp_file:
            temp_filepath = temp_file.name

        try:
            # ~15MB: 15000 rows of ~1KB each.
            csv_filepath, expected_headers, expected_rows = self._generate_large_csv(
                temp_filepath, num_rows=15000, row_size_kb=1
            )

            # Sanity-check the generated file really exceeds the chunk size.
            file_size = os.path.getsize(csv_filepath)
            assert file_size > 10 * 1024 * 1024, f"File size {file_size} should be > 10MB"

            resource = factories.Resource()
            resource_id = resource['id']

            # Force a 10MB chunk size and record every copy_file invocation.
            copy_calls = []
            mock_split_copy = self._create_mock_split_copy(10 * 1024 * 1024)
            mock_copy_file = self._create_mock_copy_file(copy_calls)

            with patch('ckanext.xloader.loader.split_copy_by_size', side_effect=mock_split_copy), \
                    patch('ckanext.xloader.loader.copy_file', side_effect=mock_copy_file):
                loader.load_csv(
                    csv_filepath,
                    resource_id=resource_id,
                    mimetype="text/csv",
                    logger=logger,
                )

            # More than one copy call proves the file was actually chunked.
            assert len(copy_calls) > 1, f"Expected multiple chunks, got {len(copy_calls)} copy calls"

            # Every row must have survived the chunked load.
            records = self._get_records(Session, resource_id)
            assert len(records) == expected_rows, f"Expected {expected_rows} records, got {len(records)}"

            # Column structure: datastore prepends _id and _full_text.
            column_names = self._get_column_names(Session, resource_id)
            assert column_names == ['_id', '_full_text'] + expected_headers

            # Spot-check first and last rows for data integrity.
            # _get_records excludes _full_text, so tuple layout is:
            # 0: _id, 1: id, 2: name, 3: description, 4: data.
            first_record = records[0]
            last_record = records[-1]

            assert first_record[1] == '1'
            assert first_record[2] == 'Name_1'
            assert last_record[1] == str(expected_rows)
            assert last_record[2] == f'Name_{expected_rows}'

        finally:
            # Always remove the temporary CSV, even on assertion failure.
            if os.path.exists(temp_filepath):
                os.unlink(temp_filepath)

    def test_small_file_no_chunking(self, Session: Any) -> None:
        """A file smaller than the chunk size yields exactly one copy call."""

        # Existing small sample fixture file.
        csv_filepath = get_sample_filepath("simple.csv")
        resource = factories.Resource()
        resource_id = resource['id']

        # A 10MB chunk size dwarfs the sample file, so no splitting occurs.
        copy_calls = []
        mock_split_copy = self._create_mock_split_copy(10 * 1024 * 1024)  # 10MB
        mock_copy_file = self._create_mock_copy_file(copy_calls)

        with patch('ckanext.xloader.loader.split_copy_by_size', side_effect=mock_split_copy), \
                patch('ckanext.xloader.loader.copy_file', side_effect=mock_copy_file):
            loader.load_csv(
                csv_filepath,
                resource_id=resource_id,
                mimetype="text/csv",
                logger=logger,
            )

        assert len(copy_calls) == 1, f"Small file should not be chunked, got {len(copy_calls)} copy calls"

        # Data still loads correctly without chunking.
        records = self._get_records(Session, resource_id)
        assert len(records) == 6  # Known number of records in simple.csv
167+

0 commit comments

Comments
 (0)