From 359db5c02aaa8f4211b1386d809aba0463eaff85 Mon Sep 17 00:00:00 2001 From: Reuben Frankel Date: Wed, 11 Sep 2024 09:37:34 +0100 Subject: [PATCH 1/2] Output record count metric from batch files insert --- target_snowflake/connector.py | 6 ++++-- target_snowflake/sinks.py | 7 +++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/target_snowflake/connector.py b/target_snowflake/connector.py index 01910c4..74f3e73 100644 --- a/target_snowflake/connector.py +++ b/target_snowflake/connector.py @@ -553,7 +553,8 @@ def merge_from_stage( key_properties=key_properties, ) self.logger.debug("Merging with SQL: %s", merge_statement) - conn.execute(merge_statement, **kwargs) + result = conn.execute(merge_statement, **kwargs) + return result.rowcount def copy_from_stage( self, @@ -578,7 +579,8 @@ def copy_from_stage( file_format=file_format, ) self.logger.debug("Copying with SQL: %s", copy_statement) - conn.execute(copy_statement, **kwargs) + result = conn.execute(copy_statement, **kwargs) + return result.rowcount def drop_file_format(self, file_format: str) -> None: """Drop a file format in the schema. 
diff --git a/target_snowflake/sinks.py b/target_snowflake/sinks.py index 4c2313b..5fc1c5d 100644 --- a/target_snowflake/sinks.py +++ b/target_snowflake/sinks.py @@ -190,7 +190,7 @@ def insert_batch_files_via_internal_stage( if self.key_properties: # merge into destination table - self.connector.merge_from_stage( + record_count = self.connector.merge_from_stage( full_table_name=full_table_name, schema=self.schema, sync_id=sync_id, @@ -199,13 +199,16 @@ def insert_batch_files_via_internal_stage( ) else: - self.connector.copy_from_stage( + record_count = self.connector.copy_from_stage( full_table_name=full_table_name, schema=self.schema, sync_id=sync_id, file_format=file_format, ) + with self.record_counter_metric as counter: + counter.increment(record_count) + finally: self.logger.debug("Cleaning up after batch processing") self.connector.drop_file_format(file_format=file_format) From fb4cc82f3d41f0f774b956c843728559b0a101b5 Mon Sep 17 00:00:00 2001 From: Reuben Frankel Date: Thu, 3 Oct 2024 16:57:11 +0100 Subject: [PATCH 2/2] Emit record count for `BATCH` messages only `RECORD` messages are already counted by the SDK, and processing of them here goes through `insert_batch_files_via_internal_stage` by default --- target_snowflake/sinks.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/target_snowflake/sinks.py b/target_snowflake/sinks.py index 5fc1c5d..96772f6 100644 --- a/target_snowflake/sinks.py +++ b/target_snowflake/sinks.py @@ -171,7 +171,7 @@ def insert_batch_files_via_internal_stage( self, full_table_name: str, files: t.Sequence[str], - ) -> None: + ) -> int: """Process a batch file with the given batch context. 
Args: @@ -206,9 +206,6 @@ def insert_batch_files_via_internal_stage( file_format=file_format, ) - with self.record_counter_metric as counter: - counter.increment(record_count) - finally: self.logger.debug("Cleaning up after batch processing") self.connector.drop_file_format(file_format=file_format) @@ -220,6 +217,8 @@ def insert_batch_files_via_internal_stage( if os.path.exists(file_path): # noqa: PTH110 os.remove(file_path) # noqa: PTH107 + return record_count + def process_batch_files( self, encoding: BaseBatchFileEncoding, @@ -235,7 +234,7 @@ def process_batch_files( NotImplementedError: If the batch file encoding is not supported. """ if encoding.format == BatchFileFormat.JSONL: - self.insert_batch_files_via_internal_stage( + record_count = self.insert_batch_files_via_internal_stage( full_table_name=self.full_table_name, files=files, ) @@ -245,6 +244,9 @@ def process_batch_files( msg, ) + with self.record_counter_metric as counter: + counter.increment(record_count) + # TODO: remove after https://github.com/meltano/sdk/issues/1819 is fixed def _singer_validate_message(self, record: dict) -> None: """Ensure record conforms to Singer Spec.