Skip to content

Commit 7be997d

Browse files
authored
Merge pull request #263 from cgoldshtein/master - Implements chunked file processing
## Description

Implements chunked file processing to resolve system freezing when loading very large files (>2GB) into DataStore. Fixes #259.

## Problem
- XLoader would freeze or hang when processing files larger than 2GB.
- The entire file was loaded into memory, causing system unresponsiveness.
- There was no progress feedback during large-file processing.
- Memory was exhausted on very large datasets.

## Solution
- **Chunked processing**: split large files into configurable chunks (default: 1GB).
- **Progress logging**: log each chunk as it is processed.
- **Memory efficiency**: memory usage stays constant regardless of file size.
- **Configurable**: new `ckanext.xloader.copy_chunk_size` setting.

## Changes
- Add a `split_copy_by_size()` function for chunked file processing.
- Extract a `copy_file()` helper for PostgreSQL COPY operations.
- Add the configuration option `ckanext.xloader.copy_chunk_size` (default: 1GB).
- Update tests to use a smaller chunk size for testing.
- Maintain existing behaviour for smaller files.

## Configuration
```ini
# Optional: set chunk size in bytes (default: 1073741824 = 1GB)
ckanext.xloader.copy_chunk_size = 104857600  # 100MB chunks
```
2 parents 2582464 + 29a9980 commit 7be997d

File tree

4 files changed

+287
-45
lines changed

4 files changed

+287
-45
lines changed

ckanext/xloader/config_declaration.yaml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,3 +206,13 @@ groups:
206206
like database deadlocks or network timeouts. Set to 0 to disable retries.
207207
type: int
208208
required: false
209+
- key: ckanext.xloader.copy_chunk_size
210+
default: 1073741824
211+
example: 536870912
212+
description: |
213+
Maximum size in bytes for each chunk when processing files.
214+
Files are split into chunks to prevent memory exhaustion and
215+
system freezing. Default is 1GB (1073741824 bytes). Smaller values
216+
use less memory but create more chunks.
217+
type: int
218+
required: false

ckanext/xloader/loader.py

Lines changed: 107 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,109 @@ def _clear_datastore_resource(resource_id):
143143
conn.execute(sa.text('TRUNCATE TABLE "{}" RESTART IDENTITY'.format(resource_id)))
144144

145145

146+
def copy_file(csv_filepath, engine, logger, resource_id, headers, delimiter):
    """Stream a CSV file into the DataStore table via COPY FROM STDIN.

    Loading options considered for getting data into postgres:
      1. \\copy - a psql meta-command, not accessible via psycopg2.
      2. COPY from a server-side file - requires the db user to have
         superuser privileges, which is dangerous and unavailable on
         managed hosts such as AWS.
      3. pgloader - per its docs, while COPY is restricted to reading from
         standard input or a file on the server's file system, psql's \\copy
         streams a client-local file over the network using the same
         protocol pgloader uses.
      4. COPY FROM STDIN - not quite as fast as COPY from a file, but
         avoids the superuser issue.  <-- picked
    """
    # Table and column names cannot be bind parameters (params only
    # substitute single-quoted filter values), so the statement is
    # assembled by string formatting.
    column_names = ', '.join('"{}"'.format(h) for h in headers)
    copy_sql = (
        "COPY \"{resource_id}\" ({column_names}) "
        "FROM STDIN "
        "WITH (DELIMITER '{delimiter}', FORMAT csv, HEADER 1, "
        " ENCODING '{encoding}');"
    ).format(
        resource_id=resource_id,
        column_names=column_names,
        delimiter=delimiter,
        encoding='UTF8',
    )

    with engine.begin() as conn:
        cur = conn.connection.cursor()
        # cur.execute('SET DATESTYLE TO "SQL , MDY"')
        try:
            with open(csv_filepath, 'rb') as f:
                try:
                    cur.copy_expert(copy_sql, f)
                except psycopg2.DataError as e:
                    # e is a str but may contain foreign chars, e.g.
                    # 'extra data: "paul,pa\xc3\xbcl"\n'; logging and
                    # exceptions need a normal (7 bit) str.
                    error_str = str(e)
                    logger.warning('%s: %s', resource_id, error_str)
                    raise LoaderError('Error during the load into PostgreSQL:'
                                      ' {}'.format(error_str))
        finally:
            cur.close()
194+
def split_copy_by_size(input_file, engine, logger, resource_id, headers,
                       delimiter=',', max_size=1024 ** 3, encoding='utf-8'):
    """
    Read a CSV file, split it into chunks of at most ``max_size`` bytes,
    and load each chunk into the DataStore table via ``copy_file``
    (PostgreSQL COPY).

    Chunks are split on row boundaries, so a chunk may exceed ``max_size``
    by up to one row. Every chunk after the first gets a copy of the header
    row, because ``copy_file`` issues COPY with ``HEADER 1`` and therefore
    skips the first line of each chunk.

    Args:
        input_file (str): Path to the input CSV file.
        engine: SQLAlchemy engine connected to the DataStore database.
        logger: Logger used for progress and debug messages.
        resource_id (str): Name of the target DataStore table.
        headers (list): Column names matching the CSV header.
        delimiter (str, optional): CSV delimiter character. Defaults to ','.
        max_size (int, optional): Maximum size in bytes of each chunk.
            Defaults to 1 Gigabyte.
        encoding (str, optional): Text encoding of the input file.
            Defaults to 'utf-8'.
    """
    chunk_count = 0
    file_size = os.path.getsize(input_file)
    logger.info('Starting chunked processing for file size: %s bytes '
                'with chunk size: %s bytes', file_size, max_size)

    output_filename = f'/tmp/output_{resource_id}.csv'
    current_file = None
    try:
        with open(input_file, 'r', encoding=encoding) as infile:
            needs_header = False
            for row in infile:
                if current_file is None or current_file.tell() >= max_size:
                    if current_file:
                        # Close (and thereby flush) the chunk BEFORE copying:
                        # copy_file reopens the file by name, so any data
                        # still in the write buffer would otherwise be lost.
                        current_file.close()
                        chunk_count += 1
                        logger.debug('Before copying chunk %s: %s',
                                     chunk_count, output_filename)
                        copy_file(output_filename, engine, logger,
                                  resource_id, headers, delimiter)
                        logger.debug('Copied chunk %s: %s',
                                     chunk_count, output_filename)
                        needs_header = True
                    # Write chunks with the same encoding the input is read
                    # with, so multi-byte characters survive the round trip
                    # regardless of the locale default.
                    current_file = open(output_filename, 'w', encoding=encoding)
                    if needs_header:
                        current_file.write(delimiter.join(headers) + '\n')
                current_file.write(row)

        if current_file is None:
            # Empty input file: nothing was written, so there is no chunk
            # file to copy or remove.
            logger.info('Input file is empty; no chunks to process')
            return

        # Close and copy the last (possibly only) chunk.
        current_file.close()
        chunk_count += 1
        logger.debug('Before copying final chunk %s: %s',
                     chunk_count, output_filename)
        copy_file(output_filename, engine, logger, resource_id, headers,
                  delimiter)
        logger.debug('Copied final chunk %s: %s', chunk_count, output_filename)
    finally:
        # Always release the handle and remove the temporary chunk file,
        # even if COPY raised part-way through.
        if current_file is not None and not current_file.closed:
            current_file.close()
        if os.path.exists(output_filename):
            os.remove(output_filename)

    logger.info('Completed chunked processing: %s chunks processed '
                'for file size %s bytes', chunk_count, file_size)
247+
248+
146249
def _read_metadata(table_filepath, mimetype, logger):
147250
# Determine the header row
148251
logger.info('Determining column names and types')
@@ -342,51 +445,10 @@ def strip_white_space_iter():
342445

343446
logger.info('Copying to database...')
344447

345-
# Options for loading into postgres:
346-
# 1. \copy - can't use as that is a psql meta-command and not accessible
347-
# via psycopg2
348-
# 2. COPY - requires the db user to have superuser privileges. This is
349-
# dangerous. It is also not available on AWS, for example.
350-
# 3. pgloader method? - as described in its docs:
351-
# Note that while the COPY command is restricted to read either from
352-
# its standard input or from a local file on the server's file system,
353-
# the command line tool psql implements a \copy command that knows
354-
# how to stream a file local to the client over the network and into
355-
# the PostgreSQL server, using the same protocol as pgloader uses.
356-
# 4. COPY FROM STDIN - not quite as fast as COPY from a file, but avoids
357-
# the superuser issue. <-- picked
358-
359-
with engine.begin() as conn:
360-
cur = conn.connection.cursor()
361-
try:
362-
with open(csv_filepath, 'rb') as f:
363-
# can't use :param for table name because params are only
364-
# for filter values that are single quoted.
365-
try:
366-
cur.copy_expert(
367-
"COPY \"{resource_id}\" ({column_names}) "
368-
"FROM STDIN "
369-
"WITH (DELIMITER '{delimiter}', FORMAT csv, HEADER 1, "
370-
" ENCODING '{encoding}');"
371-
.format(
372-
resource_id=resource_id,
373-
column_names=', '.join(['"{}"'.format(h)
374-
for h in headers]),
375-
delimiter=delimiter,
376-
encoding='UTF8',
377-
),
378-
f)
379-
except psycopg2.DataError as e:
380-
# e is a str but with foreign chars e.g.
381-
# 'extra data: "paul,pa\xc3\xbcl"\n'
382-
# but logging and exceptions need a normal (7 bit) str
383-
error_str = str(e)
384-
logger.warning(error_str)
385-
raise LoaderError('Error during the load into PostgreSQL:'
386-
' {}'.format(error_str))
387-
388-
finally:
389-
cur.close()
448+
# Copy file to datastore db, split to chunks.
449+
max_size = config.get('ckanext.xloader.copy_chunk_size', 1024**3)
450+
logger.debug('Using chunk size: %s bytes for resource %s', max_size, resource_id)
451+
split_copy_by_size(csv_filepath, engine, logger, resource_id, headers, delimiter, int(max_size))
390452

391453
logger.info('...copying done')
392454

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
# -*- coding: utf-8 -*-
2+
import os
3+
import pytest
4+
import tempfile
5+
import logging
6+
from typing import Callable, List, Tuple, Any
7+
from unittest.mock import patch, MagicMock
8+
import csv
9+
import sqlalchemy.orm as orm
10+
11+
from ckan.tests import factories
12+
from ckanext.xloader import loader
13+
from ckanext.xloader.loader import get_write_engine
14+
from ckanext.xloader.tests.test_loader import TestLoadBase, get_sample_filepath
15+
16+
logger = logging.getLogger(__name__)
17+
18+
19+
@pytest.fixture()
def Session():
    """Yield a scoped SQLAlchemy session bound to the DataStore write engine."""
    write_engine = get_write_engine()
    session_factory = orm.sessionmaker(bind=write_engine)
    scoped = orm.scoped_session(session_factory)
    yield scoped
    scoped.close()
25+
26+
27+
@pytest.mark.usefixtures("full_reset", "with_plugins")
@pytest.mark.ckan_config("ckan.plugins", "datastore xloader")
class TestChunkedLoading(TestLoadBase):
    """End-to-end tests for chunked COPY loading of CSV files."""

    def _create_mock_split_copy(self, chunk_size: int) -> Callable:
        """Return a wrapper around split_copy_by_size that forces *chunk_size*."""
        real_split = loader.split_copy_by_size

        def _forced_chunk_split(input_file: Any, engine: Any, logger: Any,
                                resource_id: str, headers: List[str],
                                delimiter: str = ',',
                                max_size: int = 1024 ** 3) -> Any:
            return real_split(input_file, engine, logger, resource_id,
                              headers, delimiter, chunk_size)

        return _forced_chunk_split

    def _create_mock_copy_file(self, copy_calls_list: List[Tuple]) -> Callable:
        """Return a wrapper around copy_file that records each call's args."""
        real_copy = loader.copy_file

        def _recording_copy(*args: Any, **kwargs: Any) -> Any:
            copy_calls_list.append(args)
            return real_copy(*args, **kwargs)

        return _recording_copy

    def _generate_large_csv(self, filepath: str, num_rows: int = 100000,
                            row_size_kb: int = 1) -> Tuple[str, List[str], int]:
        """Write *num_rows* rows of roughly *row_size_kb* KB each to *filepath*."""
        headers = ['id', 'name', 'description', 'data']

        # Pad the last column so each row is about row_size_kb kilobytes,
        # accounting for the other columns' contribution.
        filler = 'x' * max(1, (row_size_kb * 1024) - 50)

        with open(filepath, 'w', newline='', encoding='utf-8') as out:
            writer = csv.writer(out)
            writer.writerow(headers)
            for idx in range(1, num_rows + 1):
                writer.writerow([
                    idx,
                    f'Name_{idx}',
                    f'Description for row {idx}',
                    filler,
                ])

        return filepath, headers, num_rows

    def test_chunked_processing_large_file(self, Session: Any) -> None:
        """Large files are split into chunks and all rows survive the load."""

        # Create a temporary large CSV file (~15MB to trigger chunking).
        with tempfile.NamedTemporaryFile(mode='w', suffix='.csv',
                                         delete=False) as temp_file:
            temp_filepath = temp_file.name

        try:
            # Generate ~15MB of data (15000 rows * ~1KB each).
            csv_filepath, expected_headers, expected_rows = \
                self._generate_large_csv(temp_filepath, num_rows=15000,
                                         row_size_kb=1)

            # The file must be big enough to trigger chunking.
            file_size = os.path.getsize(csv_filepath)
            assert file_size > 10 * 1024 * 1024, f"File size {file_size} should be > 10MB"

            resource = factories.Resource()
            resource_id = resource['id']

            # Force a 10MB chunk size and record every copy_file call.
            copy_calls = []
            forced_split = self._create_mock_split_copy(10 * 1024 * 1024)
            recording_copy = self._create_mock_copy_file(copy_calls)

            with patch('ckanext.xloader.loader.split_copy_by_size',
                       side_effect=forced_split):
                with patch('ckanext.xloader.loader.copy_file',
                           side_effect=recording_copy):
                    fields = loader.load_csv(
                        csv_filepath,
                        resource_id=resource_id,
                        mimetype="text/csv",
                        logger=logger,
                    )

            # Chunking must have happened (more than one COPY).
            assert len(copy_calls) > 1, "Expected multiple chunks but file was not chunked"

            # Data integrity: every row made it into the DataStore.
            records = self._get_records(Session, resource_id)
            assert len(records) == expected_rows, f"Expected {expected_rows} records, got {len(records)}"

            # Column structure is preserved.
            column_names = self._get_column_names(Session, resource_id)
            assert column_names == ['_id', '_full_text'] + expected_headers

            # Sort by the 'id' column (index 1; index 0 is _id) so ordering
            # is deterministic before spot-checking first and last rows.
            # _get_records excludes _full_text, so the record layout is:
            # 0: _id, 1: id, 2: name, 3: description, 4: data
            ordered = sorted(records, key=lambda rec: int(rec[1]))

            assert ordered[0][1] == '1'
            assert ordered[0][2] == 'Name_1'
            assert ordered[-1][1] == str(expected_rows)
            assert ordered[-1][2] == f'Name_{expected_rows}'

        finally:
            # Clean up the temporary file.
            if os.path.exists(temp_filepath):
                os.unlink(temp_filepath)

    def test_small_file_no_chunking(self, Session: Any) -> None:
        """Small files produce exactly one COPY when the chunk size exceeds them."""

        # Use an existing small sample file.
        csv_filepath = get_sample_filepath("simple.csv")
        resource = factories.Resource()
        resource_id = resource['id']

        # A 10MB chunk size is far larger than the sample, so no chunking.
        copy_calls = []
        forced_split = self._create_mock_split_copy(10 * 1024 * 1024)
        recording_copy = self._create_mock_copy_file(copy_calls)

        with patch('ckanext.xloader.loader.split_copy_by_size',
                   side_effect=forced_split):
            with patch('ckanext.xloader.loader.copy_file',
                       side_effect=recording_copy):
                fields = loader.load_csv(
                    csv_filepath,
                    resource_id=resource_id,
                    mimetype="text/csv",
                    logger=logger,
                )

        # A single COPY call means the file was not chunked.
        assert len(copy_calls) == 1, f"Small file should not be chunked, got {len(copy_calls)} copy calls"

        # The data still loads correctly.
        records = self._get_records(Session, resource_id)
        assert len(records) == 6  # Known number of records in simple.csv

ckanext/xloader/tests/test_jobs.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ def data(create_with_upload, apikey):
8383

8484
@pytest.mark.usefixtures("clean_db", "with_plugins")
8585
@pytest.mark.ckan_config("ckanext.xloader.job_timeout", 2)
86+
@pytest.mark.ckan_config("ckanext.xloader.copy_chunk_size", 5120)
8687
@pytest.mark.ckan_config("ckan.jobs.timeout", 2)
8788
class TestXLoaderJobs(helpers.FunctionalRQTestBase):
8889

0 commit comments

Comments (0)