1+ from enum import IntEnum
12import io
23import json
4+ import re
35import sys
46import time
57import numpy
1921# seven days.
2022CACHE_PUT_TTL_SECONDS = 604800
2123
22- HTTP_STATUS_ACCEPTED = 202
23- HTTP_STATUS_OK = 200
24- HTTP_STATUS_INTERNAL_ERROR = 500
25- HTTP_STATUS_INVALID_REQUEST = 400
26- HTTP_STATUS_NOT_FOUND = 404
24+
class HttpStatus(IntEnum):
    """HTTP status codes used as the ``status`` value of response dictionaries.

    Because this is an ``IntEnum``, every member is also an ``int`` and
    compares equal to its bare numeric code (``HttpStatus.OK == 200``).
    """

    # Success / still-in-progress outcomes.
    ACCEPTED = 202
    OK = 200

    # Failure outcomes.
    INTERNAL_ERROR = 500
    INVALID_REQUEST = 400
    NOT_FOUND = 404
2731
2832
2933class PresignedRangeReader (io .RawIOBase ):
@@ -100,6 +104,14 @@ def seek(self, offset, whence=0):
100104 return self .pos
101105
102106
# Matches the start of a secret (an http/https URL or an "apiv3_" token) and
# everything after it up to the end of that line. The scheme is matched
# case-insensitively because URL schemes are case-insensitive ("HTTPS://..."
# must be redacted too); the token prefix stays case-sensitive.
_SENSITIVE_TEXT_PATTERN = re.compile(r"(?:(?i:https?)://|apiv3_).*")


def create_http_response(status: HttpStatus, message: str) -> dict:
    """Build a response dictionary whose message has secrets redacted.

    Any http/https URL or ``apiv3_`` token found in ``message`` is replaced,
    together with the remainder of the line it appears on, by ``*****``.
    Text on other lines of a multi-line message is left untouched.

    Args:
        status: Status code stored under the ``"status"`` key.
        message: Human-readable text to sanitise and store under the
            ``"message"`` key.

    Returns:
        A dict of the form ``{"status": status, "message": sanitized_message}``.
    """
    # Coerce defensively: this helper sits on error paths, and a TypeError
    # raised here would hide the error that was actually being reported.
    sanitized_message = _SENSITIVE_TEXT_PATTERN.sub("*****", str(message))
    return {"status": status, "message": sanitized_message}
113+
114+
103115def process_request (
104116 influxdb3_local , query_parameters , request_headers , request_body , args = None
105117):
@@ -113,10 +125,9 @@ def process_request(
113125 db_name = parse_arg (influxdb3_local , "db_name" , args )
114126 migration_id = parse_arg (influxdb3_local , "migration_id" , args )
115127 except RuntimeError as e :
116- return {
117- "status" : HTTP_STATUS_INVALID_REQUEST ,
118- "message" : f"Invalid argument: { str (e )} " ,
119- }
128+ return create_http_response (
129+ HttpStatus .INVALID_REQUEST , f"Invalid argument: { str (e )} "
130+ )
120131
121132 if "verify" in query_parameters :
122133 response = verify_previous_migrations (influxdb3_local , migration_id )
@@ -148,16 +159,15 @@ def process_request(
148159 return migrate_parquet_file (
149160 influxdb3_local , migration_id , db_name , data .get ("parquet_path" , "" )
150161 )
151- return { "status" : 403 , "message" : " Invalid request"}
162+ return create_http_response ( HttpStatus . INVALID_REQUEST , "Invalid request" )
152163
153164
154165def migrate_parquet_file (influxdb3_local , migration_id , db_name , current_parquet_path ):
155166 if not current_parquet_path :
156167 influxdb3_local .warn ("Parquet path is empty, aborting" )
157- return {
158- "status" : HTTP_STATUS_INVALID_REQUEST ,
159- "message" : "Invalid body, Parquet path missing" ,
160- }
168+ return create_http_response (
169+ HttpStatus .INVALID_REQUEST , "Invalid body, Parquet path is missing"
170+ )
161171
162172 migration_records = influxdb3_local .cache .get (f"{ migration_id } -records" , default = {})
163173
@@ -167,7 +177,7 @@ def migrate_parquet_file(influxdb3_local, migration_id, db_name, current_parquet
167177 try :
168178 migration_records = get_migration_metadata (influxdb3_local , migration_id )
169179 except Exception as e :
170- return { "status" : HTTP_STATUS_INTERNAL_ERROR , "message" : str (e )}
180+ return create_http_response ( HttpStatus . INTERNAL_ERROR , str (e ))
171181 influxdb3_local .cache .put (
172182 key = f"{ migration_id } -records" ,
173183 value = migration_records ,
@@ -177,21 +187,21 @@ def migrate_parquet_file(influxdb3_local, migration_id, db_name, current_parquet
177187 else :
178188 verification = verify_previous_migrations (influxdb3_local , migration_id )
179189 if (
180- verification ["status" ] != HTTP_STATUS_OK
181- and verification ["status" ] != HTTP_STATUS_ACCEPTED
190+ verification ["status" ] != HttpStatus . OK
191+ and verification ["status" ] != HttpStatus . ACCEPTED
182192 ):
183193 influxdb3_local .error (f"{ migration_id } : Previous migration failed" )
184194 return verification
185195
186196 if not current_parquet_path :
187197 error_message = f"{ migration_id } : Migration failed: Parquet file path missing, aborting migration"
188198 influxdb3_local .error (error_message )
189- return { "status" : HTTP_STATUS_INVALID_REQUEST , "message" : error_message }
199+ return create_http_response ( HttpStatus . INVALID_REQUEST , error_message )
190200
191201 if current_parquet_path not in migration_records :
192202 error_message = f"{ migration_id } : Migration failed: Parquet path { current_parquet_path } not found"
193203 influxdb3_local .error (error_message )
194- return { "status" : HTTP_STATUS_NOT_FOUND , "message" : error_message }
204+ return create_http_response ( HttpStatus . NOT_FOUND , error_message )
195205
196206 try :
197207 presigned_get_url = migration_records [current_parquet_path ]["presigned_get_url" ]
@@ -212,11 +222,11 @@ def migrate_parquet_file(influxdb3_local, migration_id, db_name, current_parquet
212222 f"{ migration_id } : Migration complete, pending verification"
213223 )
214224 except Exception as e :
215- error_message = f" { migration_id } : Migration failed: { str ( e ) } "
216- influxdb3_local . error ( error_message )
217- return { "status" : HTTP_STATUS_INTERNAL_ERROR , "message" : error_message }
225+ return create_http_response (
226+ HttpStatus . INTERNAL_ERROR , f" { migration_id } : Migration failed: { str ( e ) } "
227+ )
218228
219- return { "status" : HTTP_STATUS_ACCEPTED , "message" : " Request processed"}
229+ return create_http_response ( HttpStatus . ACCEPTED , "Request processed" )
220230
221231
222232def get_migration_metadata (influxdb3_local , migration_id ):
@@ -237,10 +247,10 @@ def get_migration_metadata(influxdb3_local, migration_id):
237247 query : str = f'SELECT * FROM "{ METADATA_TABLE_NAME } "'
238248 tracking_data = influxdb3_local .query (query )
239249 if not tracking_data :
240- return {
241- "status" : "error" ,
242- "message" : f"{ migration_id } : Query for migration metadata failed" ,
243- }
250+ return create_http_response (
251+ HttpStatus . INTERNAL_ERROR ,
252+ f"{ migration_id } : Query for migration metadata failed" ,
253+ )
244254
245255 for record in tracking_data :
246256 s3_key = record ["s3_key" ]
@@ -269,7 +279,7 @@ def verify_previous_migrations(influxdb3_local, migration_id):
269279 if migration_record ["status" ] == MIGRATION_FAILED :
270280 error_message = f"{ migration_id } : Migration failed: Previous migration of { parquet_path } failed"
271281 influxdb3_local .error (error_message )
272- return { "status" : HTTP_STATUS_INTERNAL_ERROR , "message" : error_message }
282+ return create_http_response ( HttpStatus . INTERNAL_ERROR , error_message )
273283
274284 # Since each invocation writes to a buffer, we can only determine whether a previous migration
275285 # has succeeded or failed. Migrations can fail during the writing stage, after an invocation has
@@ -279,16 +289,15 @@ def verify_previous_migrations(influxdb3_local, migration_id):
279289 f"{ migration_id } : Verifying migration for Parquet file { parquet_path } "
280290 )
281291 if not parquet_path :
282- return {
283- "status" : HTTP_STATUS_INVALID_REQUEST ,
284- "message" : "Parquet file path was empty" ,
285- }
292+ return create_http_response (
293+ HttpStatus .INVALID_REQUEST , "Parquet file path was empty"
294+ )
286295 table_name_parts = parquet_path .split ("/" )
287296 if len (table_name_parts ) < 2 :
288- return {
289- "status" : HTTP_STATUS_INVALID_REQUEST ,
290- "message" : " Parquet file path was incorrectly formatted. Path should start wtih database-name/table-name" ,
291- }
297+ return create_http_response (
298+ HttpStatus . INVALID_REQUEST ,
299+ "Parquet file path was incorrectly formatted. Path should start wtih database-name/table-name" ,
300+ )
292301 table_name = table_name_parts [1 ]
293302 reader = PresignedRangeReader (migration_record ["presigned_get_url" ])
294303 parquet_file = pq .ParquetFile (reader )
@@ -304,7 +313,7 @@ def verify_previous_migrations(influxdb3_local, migration_id):
304313 if not query_response :
305314 error_message = f"{ migration_id } : Unable to verify record count for table { table_name } from Parquet file { parquet_path } "
306315 influxdb3_local .error (error_message )
307- return { "status" : HTTP_STATUS_INTERNAL_ERROR , "message" : error_message }
316+ return create_http_response ( HttpStatus . INTERNAL_ERROR , error_message )
308317
309318 actual_row_count = query_response [0 ]["row_count" ]
310319 if expected_row_count != actual_row_count :
@@ -317,7 +326,7 @@ def verify_previous_migrations(influxdb3_local, migration_id):
317326 value = migration_records ,
318327 ttl = CACHE_PUT_TTL_SECONDS ,
319328 )
320- return { "status" : HTTP_STATUS_INTERNAL_ERROR , "message" : error_message }
329+ return create_http_response ( HttpStatus . INTERNAL_ERROR , error_message )
321330 else :
322331 migration_record ["status" ] = MIGRATION_COMPLETED
323332 migration_records [parquet_path ] = migration_record
@@ -346,11 +355,11 @@ def verify_previous_migrations(influxdb3_local, migration_id):
346355 if all_parquet_files_migrated :
347356 success_message = f"{ migration_id } : All Parquet files migrated"
348357 influxdb3_local .info (success_message )
349- return { "status" : HTTP_STATUS_OK , "message" : success_message }
350- return {
351- "status" : HTTP_STATUS_ACCEPTED ,
352- "message" : " Verified outstanding migrations. Migrations are still in progress or pending verification. " ,
353- }
358+ return create_http_response ( HttpStatus . OK , success_message )
359+ return create_http_response (
360+ HttpStatus . ACCEPTED ,
361+ "Verified outstanding migrations. Migrations are still in progress or pending verification" ,
362+ )
354363
355364
356365def write_to_ingestion_buffer (
@@ -392,8 +401,8 @@ def put_done_file(influxdb3_local, s3_key: str, presigned_done_url: str):
392401 response : requests .Response = requests .put (presigned_done_url , data = b"" )
393402 response .raise_for_status ()
394403 influxdb3_local .info (f"Put done file for { s3_key } " )
395- except Exception :
396- influxdb3_local .error (f"Error putting done file for { s3_key } " )
404+ except Exception as e :
405+ influxdb3_local .error (f"Error putting done file for { s3_key } : { str ( e ) } " )
397406 return
398407
399408
0 commit comments