Skip to content

Commit 0dddac6

Browse files
committed
Emit record count for BATCH messages only
`RECORD` messages are already counted by the SDK goes through, and processing of them here goes through `insert_batch_files_via_internal_stage` by default
1 parent 359db5c commit 0dddac6

File tree

1 file changed

+6
-4
lines changed

1 file changed

+6
-4
lines changed

Diff for: target_snowflake/sinks.py

+6-4
Original file line numberDiff line numberDiff line change
@@ -206,9 +206,6 @@ def insert_batch_files_via_internal_stage(
206206
file_format=file_format,
207207
)
208208

209-
with self.record_counter_metric as counter:
210-
counter.increment(record_count)
211-
212209
finally:
213210
self.logger.debug("Cleaning up after batch processing")
214211
self.connector.drop_file_format(file_format=file_format)
@@ -220,6 +217,8 @@ def insert_batch_files_via_internal_stage(
220217
if os.path.exists(file_path): # noqa: PTH110
221218
os.remove(file_path) # noqa: PTH107
222219

220+
return record_count
221+
223222
def process_batch_files(
224223
self,
225224
encoding: BaseBatchFileEncoding,
@@ -235,7 +234,7 @@ def process_batch_files(
235234
NotImplementedError: If the batch file encoding is not supported.
236235
"""
237236
if encoding.format == BatchFileFormat.JSONL:
238-
self.insert_batch_files_via_internal_stage(
237+
record_count = self.insert_batch_files_via_internal_stage(
239238
full_table_name=self.full_table_name,
240239
files=files,
241240
)
@@ -245,6 +244,9 @@ def process_batch_files(
245244
msg,
246245
)
247246

247+
with self.record_counter_metric as counter:
248+
counter.increment(record_count)
249+
248250
# TODO: remove after https://github.com/meltano/sdk/issues/1819 is fixed
249251
def _singer_validate_message(self, record: dict) -> None:
250252
"""Ensure record conforms to Singer Spec.

0 commit comments

Comments
 (0)