Skip to content

Commit e7ae0b1

Browse files
author
Chava Goldshtein
committed
feat: implement chunked file processing to prevent freezing on large files
- Add split_copy_by_size() function to process files in configurable chunks - Extract copy_file() helper for PostgreSQL COPY operations - Add ckanext.xloader.copy_chunk_size config (default: 1GB) - Process large files (>2GB) without memory exhaustion or system freezing - Maintain progress logging for each chunk processed - Prevent system unresponsiveness during large file uploads Fixes ckan#259
1 parent 62f4aca commit e7ae0b1

File tree

2 files changed

+99
-45
lines changed

2 files changed

+99
-45
lines changed

ckanext/xloader/loader.py

Lines changed: 98 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,101 @@ def _clear_datastore_resource(resource_id):
123123
conn.execute(sa.text('TRUNCATE TABLE "{}" RESTART IDENTITY'.format(resource_id)))
124124

125125

126+
def copy_file(csv_filepath, engine, logger, resource_id, headers, delimiter):
127+
# Options for loading into postgres:
128+
# 1. \copy - can't use as that is a psql meta-command and not accessible
129+
# via psycopg2
130+
# 2. COPY - requires the db user to have superuser privileges. This is
131+
# dangerous. It is also not available on AWS, for example.
132+
# 3. pgloader method? - as described in its docs:
133+
#
134+
# Note that while the COPY command is restricted to read either from
135+
# its standard input or from a local file on the server's file system,
136+
# the command line tool psql implements a \copy command that knows
137+
# how to stream a file local to the client over the network and into
138+
# the PostgreSQL server, using the same protocol as pgloader uses.
139+
# 4. COPY FROM STDIN - not quite as fast as COPY from a file, but avoids
140+
# the superuser issue. <-- picked
141+
142+
with engine.begin() as conn:
143+
cur = conn.connection.cursor()
144+
#cur.execute('SET DATESTYLE TO "SQL , MDY"')
145+
try:
146+
with open(csv_filepath, 'rb') as f:
147+
# can't use :param for table name because params are only
148+
# for filter values that are single quoted.
149+
try:
150+
cur.copy_expert(
151+
"COPY \"{resource_id}\" ({column_names}) "
152+
"FROM STDIN "
153+
"WITH (DELIMITER '{delimiter}', FORMAT csv, HEADER 1, "
154+
" ENCODING '{encoding}');"
155+
.format(
156+
resource_id=resource_id,
157+
column_names=', '.join(['"{}"'.format(h)
158+
for h in headers]),
159+
delimiter=delimiter,
160+
encoding='UTF8',
161+
),
162+
f)
163+
except psycopg2.DataError as e:
164+
# e is a str but with foreign chars e.g.
165+
# 'extra data: "paul,pa\xc3\xbcl"\n'
166+
# but logging and exceptions need a normal (7 bit) str
167+
error_str = str(e)
168+
logger.warning('{id}: {error_str}'.format(id=resource_id, error_str=error_str))
169+
jobs.write_log('{id}: {error_str}'.format(id=resource_id, error_str=error_str))
170+
raise LoaderError('Error during the load into PostgreSQL:'
171+
' {}'.format(error_str))
172+
finally:
173+
cur.close()
174+
175+
176+
def split_copy_by_size(input_file, engine, logger, resource_id, headers, delimiter = ',', max_size=1024**3): # 1 Gigabyte
177+
"""
178+
Reads a CSV file, splits it into chunks of maximum size, and writes each chunk
179+
to PostgreSQL COPY command to load the data into a table.
180+
181+
Args:
182+
input_file (str): Path to the input CSV file.
183+
max_size (int, optional): Maximum size (in bytes) of each output file. Defaults to 1 Gigabyte.
184+
tablename (str, optional): Name of the target table in PostgreSQL for the COPY command. Defaults to None.
185+
columns (list, optional): List of column names for the COPY command, matching the CSV header. Defaults to None.
186+
connection (str, optional): Connection string for the PostgreSQL database. Defaults to an empty string.
187+
delimiter (str, optional): Delimiter character used in the CSV file. Defaults to ','.
188+
"""
189+
190+
with open(input_file, 'r', encoding='utf-8') as infile:
191+
current_file = None
192+
output_filename = f'/tmp/output_{resource_id}.csv'
193+
header = False
194+
for row in infile:
195+
if current_file is None or current_file.tell() >= max_size:
196+
# Close previous file if necessary
197+
if current_file:
198+
logger.info('Before copying file: {}'.format(output_filename))
199+
copy_file(output_filename, engine, logger, resource_id, headers, delimiter)
200+
logger.info('Copied file: {}'.format(output_filename))
201+
current_file.close()
202+
header = True
203+
204+
current_file = open(output_filename, 'w')
205+
if header:
206+
current_file.write(delimiter.join(headers) + '\n')
207+
current_file.write(row)
208+
209+
210+
# Close the last file if open
211+
if current_file:
212+
current_file.close()
213+
214+
# Copy the last file
215+
copy_file(output_filename, engine, logger, resource_id, headers, delimiter)
216+
os.remove(output_filename)
217+
if infile:
218+
infile.close()
219+
220+
126221
def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None):
127222
'''Loads a CSV into DataStore. Does not create the indexes.'''
128223

@@ -289,51 +384,9 @@ def strip_white_space_iter():
289384

290385
logger.info('Copying to database...')
291386

292-
# Options for loading into postgres:
293-
# 1. \copy - can't use as that is a psql meta-command and not accessible
294-
# via psycopg2
295-
# 2. COPY - requires the db user to have superuser privileges. This is
296-
# dangerous. It is also not available on AWS, for example.
297-
# 3. pgloader method? - as described in its docs:
298-
# Note that while the COPY command is restricted to read either from
299-
# its standard input or from a local file on the server's file system,
300-
# the command line tool psql implements a \copy command that knows
301-
# how to stream a file local to the client over the network and into
302-
# the PostgreSQL server, using the same protocol as pgloader uses.
303-
# 4. COPY FROM STDIN - not quite as fast as COPY from a file, but avoids
304-
# the superuser issue. <-- picked
305-
306-
with engine.begin() as conn:
307-
cur = conn.connection.cursor()
308-
try:
309-
with open(csv_filepath, 'rb') as f:
310-
# can't use :param for table name because params are only
311-
# for filter values that are single quoted.
312-
try:
313-
cur.copy_expert(
314-
"COPY \"{resource_id}\" ({column_names}) "
315-
"FROM STDIN "
316-
"WITH (DELIMITER '{delimiter}', FORMAT csv, HEADER 1, "
317-
" ENCODING '{encoding}');"
318-
.format(
319-
resource_id=resource_id,
320-
column_names=', '.join(['"{}"'.format(h)
321-
for h in headers]),
322-
delimiter=delimiter,
323-
encoding='UTF8',
324-
),
325-
f)
326-
except psycopg2.DataError as e:
327-
# e is a str but with foreign chars e.g.
328-
# 'extra data: "paul,pa\xc3\xbcl"\n'
329-
# but logging and exceptions need a normal (7 bit) str
330-
error_str = str(e)
331-
logger.warning(error_str)
332-
raise LoaderError('Error during the load into PostgreSQL:'
333-
' {}'.format(error_str))
334-
335-
finally:
336-
cur.close()
387+
# Copy file to datastore db, split to chunks.
388+
max_size = config.get('ckanext.xloader.copy_chunk_size', 1024**3)
389+
split_copy_by_size(csv_filepath, engine, logger, resource_id, headers, delimiter, int(max_size))
337390
finally:
338391
os.remove(csv_filepath) # i.e. the tempfile
339392

ckanext/xloader/tests/test_jobs.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ def data(create_with_upload, apikey):
8484

8585
@pytest.mark.usefixtures("clean_db", "with_plugins")
8686
@pytest.mark.ckan_config("ckanext.xloader.job_timeout", 2)
87+
@pytest.mark.ckan_config("ckanext.xloader.copy_chunk_size", 5120)
8788
@pytest.mark.ckan_config("ckan.jobs.timeout", 2)
8889
class TestXLoaderJobs(helpers.FunctionalRQTestBase):
8990

0 commit comments

Comments
 (0)