@@ -171,7 +171,7 @@ def insert_batch_files_via_internal_stage(
171
171
self ,
172
172
full_table_name : str ,
173
173
files : t .Sequence [str ],
174
- ) -> None :
174
+ ) -> int :
175
175
"""Process a batch file with the given batch context.
176
176
177
177
Args:
@@ -206,9 +206,6 @@ def insert_batch_files_via_internal_stage(
206
206
file_format = file_format ,
207
207
)
208
208
209
- with self .record_counter_metric as counter :
210
- counter .increment (record_count )
211
-
212
209
finally :
213
210
self .logger .debug ("Cleaning up after batch processing" )
214
211
self .connector .drop_file_format (file_format = file_format )
@@ -220,6 +217,8 @@ def insert_batch_files_via_internal_stage(
220
217
if os .path .exists (file_path ): # noqa: PTH110
221
218
os .remove (file_path ) # noqa: PTH107
222
219
220
+ return record_count
221
+
223
222
def process_batch_files (
224
223
self ,
225
224
encoding : BaseBatchFileEncoding ,
@@ -235,7 +234,7 @@ def process_batch_files(
235
234
NotImplementedError: If the batch file encoding is not supported.
236
235
"""
237
236
if encoding .format == BatchFileFormat .JSONL :
238
- self .insert_batch_files_via_internal_stage (
237
+ record_count = self .insert_batch_files_via_internal_stage (
239
238
full_table_name = self .full_table_name ,
240
239
files = files ,
241
240
)
@@ -245,6 +244,9 @@ def process_batch_files(
245
244
msg ,
246
245
)
247
246
247
+ with self .record_counter_metric as counter :
248
+ counter .increment (record_count )
249
+
248
250
# TODO: remove after https://github.com/meltano/sdk/issues/1819 is fixed
249
251
def _singer_validate_message (self , record : dict ) -> None :
250
252
"""Ensure record conforms to Singer Spec.
0 commit comments