
Commit 976e839

DS-2171: transport.sql:
- create new helper functions s3_retry_backoff_ms and is_retryable_s3_sqlstate
- add more SQL states to the retry mechanism
- add retry logic for exports as well

Signed-off-by: mchrza <maximilian.chrzan@here.com>
1 parent: ff5d51c

1 file changed: 173 additions & 53 deletions

xyz-jobs/xyz-job-steps/src/main/resources/jobs/transport.sql
@@ -144,6 +144,62 @@ BEGIN
 END;
 $BODY$;
 
+-- ####################################################################################################################
+-- Retry helpers (shared between execute_import_from_s3 / execute_export_to_s3) ---
+
+/**
+ * is_retryable_s3_sqlstate(state TEXT)
+ *
+ * Returns TRUE if the given SQLSTATE is considered a transient / retryable error
+ * for aws_s3 import & export operations.
+ *
+ * Covered classes:
+ * - 40001 serialization_failure
+ * - 40P01 deadlock_detected
+ * - 55P03 lock_not_available
+ * - 23505 unique_violation (import only in practice)
+ * - 23P01 exclusion_violation (import only in practice)
+ * - 53300 too_many_connections
+ * - 08xxx connection_exception family
+ * - 57P01 admin_shutdown (currently commented out below, TBD)
+ * - 22P02 invalid_text_representation (import only in practice)
+ * - 22P04 bad_copy_file_format (import only in practice)
+ */
+CREATE OR REPLACE FUNCTION is_retryable_s3_sqlstate(state TEXT)
+    RETURNS BOOLEAN
+    LANGUAGE 'sql'
+    IMMUTABLE
+    PARALLEL SAFE
+AS $BODY$
+    SELECT state IN (
+        '40001', '40P01', '55P03',
+        '23505', '23P01',
+        '53300',
+        '08000', '08001', '08003', '08006', '08004',
+        -- '57P01', TBD: clarify
+        '22P02', '22P04'
+    );
+$BODY$;
+
+/**
+ * s3_retry_backoff_ms(attempts INT, base_delay_ms INT, max_ms INT)
+ *
+ * Computes the exponential backoff delay in milliseconds:
+ *   delay = min(base_delay_ms * 2^attempts, max_ms)
+ */
+CREATE OR REPLACE FUNCTION s3_retry_backoff_ms(
+    attempts INT,
+    base_delay_ms INT DEFAULT 10000,
+    max_ms INT DEFAULT 60000
+)
+    RETURNS INT
+    LANGUAGE 'sql'
+    IMMUTABLE
+    PARALLEL SAFE
+AS $BODY$
+    SELECT LEAST(base_delay_ms * (2 ^ attempts), max_ms)::INT;
+$BODY$;
+
 -- ####################################################################################################################
 -- Import related helper functions ---
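
For illustration, the two new helpers can be exercised on their own, e.g. from psql (ad-hoc queries for this review, not part of the commit):

    -- A retryable vs. a non-retryable SQLSTATE
    SELECT is_retryable_s3_sqlstate('40P01');  -- TRUE  (deadlock_detected)
    SELECT is_retryable_s3_sqlstate('42601');  -- FALSE (syntax_error is never retried)

    -- Backoff schedule with the defaults (base_delay_ms = 10000, max_ms = 60000)
    SELECT attempt, s3_retry_backoff_ms(attempt) AS delay_ms
      FROM generate_series(0, 4) AS attempt;
    -- 0 -> 10000, 1 -> 20000, 2 -> 40000, 3 -> 60000 (capped), 4 -> 60000 (capped)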

@@ -410,12 +466,13 @@ $BODY$
 LANGUAGE plpgsql VOLATILE;
 
 /**
- * Function: perform_import_from_s3
+ * Function: execute_import_from_s3
  * (used for tasked import with retries)
  *
  * Purpose:
  * Imports data from an S3 bucket into a target table using the AWS S3 extension.
- * Supports automatic retries with exponential backoff for transient errors.
+ * Supports automatic retries with exponential backoff for transient errors
+ * (see is_retryable_s3_sqlstate()).
  *
  * Arguments:
 * - schema (TEXT): The schema name (not directly used in the function).
@@ -424,20 +481,13 @@ LANGUAGE plpgsql VOLATILE;
 * - s3_bucket (TEXT): The S3 bucket name.
 * - s3_key (TEXT): The S3 object key.
 * - s3_region (TEXT): The AWS region of the S3 bucket.
- * - max_attempts (INT, default 2): Maximum number of retry attempts.
- * - attempts (INT, default 0): Current attempt count.
+ * - max_attempts (INT, default 3): Maximum number of retry attempts.
+ * - attempts (INT, default 0): Current attempt count (internal, for recursion).
 *
 * Returns:
 * - TEXT: Import statistics (e.g., '20 rows imported').
- *
- * Behavior:
- * - Fetches plugin configuration for the given format.
- * - Executes the import using aws_s3.table_import_from_s3.
- * - On retryable errors (lock, duplicate key, invalid input, extra data), waits and retries.
- * - Uses exponential backoff for delay between retries, capped at 10 seconds.
- * - Raises an exception if max_attempts is exceeded.
 */
-CREATE OR REPLACE FUNCTION perform_import_from_s3(
+CREATE OR REPLACE FUNCTION execute_import_from_s3(
     schema TEXT,
     target_tbl REGCLASS,
     format TEXT,
@@ -452,12 +502,7 @@ AS $BODY$
 DECLARE
     config RECORD;
     import_statistics TEXT;
-    base_delay_ms INT := 10000;
-    delay_ms INT;
 BEGIN
-    -- Calculate exponential backoff delay: base * 2^attempts, capped at 10 seconds
-    delay_ms := LEAST(base_delay_ms * (2 ^ attempts), 60000);
-
     SELECT * FROM s3_plugin_config(format) into config;
 
     EXECUTE format(
@@ -475,27 +520,110 @@ BEGIN
 
     RETURN import_statistics;
 
-EXCEPTION
-    WHEN SQLSTATE '55P03' OR SQLSTATE '23505' OR SQLSTATE '22P02' OR SQLSTATE '22P04' THEN
-        IF attempts < max_attempts THEN
-            --1 s, 2 s, 4 s, 8 s, 10 s .. max
-            PERFORM pg_sleep(delay_ms / 1000.0);
+EXCEPTION WHEN OTHERS THEN
+    IF NOT is_retryable_s3_sqlstate(SQLSTATE) THEN
+        RAISE;
+    END IF;
 
-            RETURN perform_import_from_s3(
-                schema,
-                target_tbl,
-                format,
-                s3_bucket,
-                s3_key,
-                s3_region,
-                max_attempts,
-                attempts + 1
-            );
-        ELSE
-            RAISE EXCEPTION 'Import of ''%'' failed after ''%'' attempts. SQLSTATE: %, Message: %',
-                s3_key, max_attempts, SQLSTATE, SQLERRM
-                USING ERRCODE = SQLSTATE;
-        END IF;
+    IF attempts >= max_attempts THEN
+        RAISE EXCEPTION 'Import of ''%'' failed after ''%'' attempts. SQLSTATE: %, Message: %',
+            s3_key, max_attempts, SQLSTATE, SQLERRM
+            USING ERRCODE = SQLSTATE;
+    END IF;
+
+    -- Exponential backoff: 10 s, 20 s, 40 s .. capped at 60 s
+    PERFORM pg_sleep(s3_retry_backoff_ms(attempts) / 1000.0);
+
+    RETURN execute_import_from_s3(
+        schema,
+        target_tbl,
+        format,
+        s3_bucket,
+        s3_key,
+        s3_region,
+        max_attempts,
+        attempts + 1
+    );
+END;
+$BODY$;
+
+/**
+ * Function: execute_export_to_s3
+ * (used for tasked export with retries)
+ *
+ * Purpose:
+ * Exports data from RDS to an S3 bucket using the AWS S3 extension.
+ * Supports automatic retries with exponential backoff for transient errors
+ * (see is_retryable_s3_sqlstate()).
+ *
+ * Arguments:
+ * - s3_bucket (TEXT): The target S3 bucket.
+ * - s3_path (TEXT): The target S3 object key/path.
+ * - s3_region (TEXT): The AWS region of the S3 bucket.
+ * - content_query (TEXT): SQL query producing the content to export.
+ * - max_attempts (INT, default 3): Maximum number of retry attempts.
+ * - attempts (INT, default 0): Current attempt count (internal, for recursion).
+ *
+ * Returns:
+ * - TABLE(rows_uploaded BIGINT, files_uploaded BIGINT, bytes_uploaded BIGINT):
+ *   Export statistics returned by aws_s3.query_export_to_s3.
+ */
+CREATE OR REPLACE FUNCTION execute_export_to_s3(
+    s3_bucket TEXT, s3_path TEXT, s3_region TEXT,
+    content_query TEXT,
+    max_attempts INT DEFAULT 3,
+    attempts INT DEFAULT 0
+)
+    RETURNS TABLE(rows_uploaded BIGINT, files_uploaded BIGINT, bytes_uploaded BIGINT)
+    LANGUAGE 'plpgsql'
+    VOLATILE
+AS $BODY$
+DECLARE
+    config RECORD;
+    export_statistics RECORD;
+BEGIN
+    SELECT * FROM s3_plugin_config('GEOJSON') INTO config;
+
+    EXECUTE format(
+        'SELECT * from aws_s3.query_export_to_s3( '
+        ||' ''%1$s'', '
+        ||' aws_commons.create_s3_uri(%2$L,%3$L,%4$L),'
+        ||' %5$L )',
+        format('select jsondata || jsonb_build_object(''''geometry'''', ST_AsGeoJSON(geo, 8)::jsonb) from (%1$s) X',
+            REPLACE(content_query, '''', '''''')),
+        s3_bucket,
+        s3_path,
+        s3_region,
+        REGEXP_REPLACE(config.plugin_options, '[\(\)]', '', 'g')
+    ) INTO export_statistics;
+
+    rows_uploaded := export_statistics.rows_uploaded;
+    files_uploaded := export_statistics.files_uploaded;
+    bytes_uploaded := export_statistics.bytes_uploaded;
+    RETURN NEXT;
+
+EXCEPTION WHEN OTHERS THEN
+    IF NOT is_retryable_s3_sqlstate(SQLSTATE) THEN
+        RAISE;
+    END IF;
+
+    IF attempts >= max_attempts THEN
+        RAISE EXCEPTION 'Export to ''%'' failed after ''%'' attempts. SQLSTATE: %, Message: %',
+            s3_path, max_attempts, SQLSTATE, SQLERRM
+            USING ERRCODE = SQLSTATE;
+    END IF;
+
+    -- Exponential backoff: 10 s, 20 s, 40 s .. capped at 60 s
+    PERFORM pg_sleep(s3_retry_backoff_ms(attempts) / 1000.0);
+
+    RETURN QUERY SELECT * FROM execute_export_to_s3(
+        s3_bucket,
+        s3_path,
+        s3_region,
+        content_query,
+        max_attempts,
+        attempts + 1
+    );
 END;
 $BODY$;
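
For reference, a minimal invocation sketch of the two retrying entry points (all bucket, key, and table names below are hypothetical; the target table must already exist):

    -- Import with the default 3 retry attempts
    SELECT execute_import_from_s3(
        'public',                       -- schema
        'my_target_table'::REGCLASS,    -- target_tbl (hypothetical)
        'GEOJSON',                      -- format
        'my-import-bucket',             -- s3_bucket (hypothetical)
        'imports/part-000.geojson',     -- s3_key (hypothetical)
        'eu-west-1'                     -- s3_region
    );

    -- Export; the content query must yield the jsondata and geo columns
    SELECT * FROM execute_export_to_s3(
        'my-export-bucket',             -- s3_bucket (hypothetical)
        'exports/features.geojson',     -- s3_path (hypothetical)
        'eu-west-1',                    -- s3_region
        'SELECT jsondata, geo FROM my_source_table'  -- content_query (hypothetical)
    );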

@@ -736,7 +864,6 @@ BEGIN
 $wrappedinner$
 DECLARE
     export_statistics RECORD;
-    config RECORD;
     task_id INT := $wrappedouter$||task_id||$wrappedouter$::INT;
     content_query TEXT := $x$$wrappedouter$||coalesce(content_query,'')||$wrappedouter$$x$::TEXT;
     s3_bucket TEXT := '$wrappedouter$||s3_bucket||$wrappedouter$'::TEXT;
@@ -746,19 +873,12 @@ BEGIN
     lambda_function_arn TEXT := '$wrappedouter$||lambda_function_arn||$wrappedouter$'::TEXT;
     lambda_region TEXT := '$wrappedouter$||lambda_region||$wrappedouter$'::TEXT;
 BEGIN
-    SELECT * FROM s3_plugin_config('GEOJSON') INTO config;
-    EXECUTE format(
-        'SELECT * from aws_s3.query_export_to_s3( '
-        ||' ''%1$s'', '
-        ||' aws_commons.create_s3_uri(%2$L,%3$L,%4$L),'
-        ||' %5$L )',
-        format('select jsondata || jsonb_build_object(''''geometry'''', ST_AsGeoJSON(geo, 8)::jsonb) from (%1$s) X',
-            REPLACE(content_query, '''', '''''')),
-        s3_bucket,
-        s3_path,
-        s3_region,
-        REGEXP_REPLACE(config.plugin_options, '[\(\)]', '', 'g')
-    )INTO export_statistics;
+    SELECT * FROM execute_export_to_s3(
+        s3_bucket,
+        s3_path,
+        s3_region,
+        content_query
+    ) INTO export_statistics;
 
     PERFORM report_task_progress(
         lambda_function_arn,
@@ -808,7 +928,7 @@ $BODY$;
 * - failure_callback (TEXT): Callback to execute on failure.
 *
 * Behavior:
- * - Executes the import using perform_import_from_s3.
+ * - Executes the import using execute_import_from_s3.
 * - Reports import statistics and file size to Lambda via report_task_progress.
 * - On error, executes the failure callback.
 *
@@ -851,7 +971,7 @@ BEGIN
     lambda_function_arn TEXT := '$wrappedouter$||lambda_function_arn||$wrappedouter$'::TEXT;
     lambda_region TEXT := '$wrappedouter$||lambda_region||$wrappedouter$'::TEXT;
 BEGIN
-    SELECT perform_import_from_s3(
+    SELECT execute_import_from_s3(
         schema,
         target_tbl,
         format,
