
Commit 3d0ac25

HGI-9408: Fix column batching logic (#192)
1 parent 2726cc5 commit 3d0ac25

File tree: 2 files changed, +181 -1 lines

.DS_Store: -6 KB (binary file not shown)

tap_quickbooks/quickbooks/reportstreams/GeneralLedgerReport.py

Lines changed: 181 additions & 1 deletion
@@ -79,6 +79,8 @@ def sync(self, catalog_entry):
         cols = [
             "tx_date",
             "subt_nat_amount",
+            "subt_nat_amount_nt",
+            "subt_nat_amount_home_nt",
             "credit_amt",
             "debt_amt",
             "subt_nat_home_amount",
@@ -142,13 +144,15 @@ def sync(self, catalog_entry):
         params = {
             "accounting_method": self.accounting_method,
             "columns": ",".join(cols),
+            "sort_by": "tx_date"
         }

         if full_sync or self.qb.gl_full_sync:
             LOGGER.info(f"Starting full sync of GeneralLedgerReport")
             start_date = self.start_date
             start_date = start_date.replace(tzinfo=None)
             min_time = datetime.datetime.min.time()
+
             today = datetime.date.today()
             today = datetime.datetime.combine(today, min_time)
@@ -202,18 +206,194 @@ def sync(self, catalog_entry):
                 # parse data and set the new start_date
                 for r in resp:
                     if r.get("error") == "Too much data for current period":
-                        start_date = datetime.datetime.strptime(
+                        error_start_date = datetime.datetime.strptime(
                             r.get("start_date"), "%Y-%m-%d"
                         )
+                        error_end_date = datetime.datetime.strptime(
+                            r.get("end_date"), "%Y-%m-%d"
+                        )
                         if not self.gl_weekly and not self.gl_daily:
                             self.gl_weekly = True
+                            start_date = error_start_date
                         elif self.gl_weekly and not self.gl_daily:
                             self.gl_weekly = False
                             self.gl_daily = True
+                            start_date = error_start_date
                         elif self.gl_daily:
+                            # Set start_date to track which period we're processing
+                            start_date = error_start_date
+                            batch_size = 10
+
+                            # Define identity columns that will be included in every batch
+                            # These are used to match rows across batches
+                            identity_cols = ["tx_date", "txn_type", "subt_nat_amount", "subt_nat_home_amount", "subt_nat_amount_nt", "subt_nat_amount_home_nt", "credit_amt", "debt_amt", "credit_home_amt", "debt_home_amt"]
+                            # Add doc_num and account_name if they exist in cols
+                            if "doc_num" in cols:
+                                identity_cols.append("doc_num")
+                            if "account_name" in cols:
+                                identity_cols.append("account_name")
+
+                            # Remove identity columns from cols to avoid duplication
+                            # Keep original order for non-identity columns
+                            other_cols = [c for c in cols if c not in identity_cols]
+
+                            # Create batches: each batch includes identity_cols + a slice of other_cols
+                            column_batches = []
+                            for i in range(0, len(other_cols), batch_size):
+                                batch = identity_cols + other_cols[i:i+batch_size]
+                                column_batches.append(batch)
+
+                            batch_params_list = []
+                            for batch in column_batches:
+                                batch_params = params.copy()
+                                batch_params["columns"] = ",".join(batch)
+                                batch_params["start_date"] = error_start_date.strftime("%Y-%m-%d")
+                                batch_params["end_date"] = error_end_date.strftime("%Y-%m-%d")
+                                batch_params_list.append(batch_params)
+
+                            with concurrent.futures.ThreadPoolExecutor(max_workers=len(batch_params_list)) as executor:
+                                resp_batches = list(
+                                    executor.map(
+                                        lambda x: self.concurrent_get(report_entity="GeneralLedger", params=x),
+                                        batch_params_list
+                                    )
+                                )
+
+                            # Dictionary to store rows by their identity key
+                            # Key: tuple of identity column values, Value: list of row entries
+                            # Using list to handle duplicate keys (pop in order)
+                            rows_by_key = {}
+                            key_order = []  # Maintain order of keys as they appear in first batch
+
+                            # Build complete column metadata list as we process batches
+                            all_columns = []
+                            identity_col_mappings = {}  # Map identity col name to its English schema name
+
+                            for id_col in identity_cols:
+                                identity_col_mappings[id_col] = eng_schema.get(id_col, id_col)
+
+                            for batch_idx, resp_batch in enumerate(resp_batches):
+                                row_group = resp_batch.get("Rows")
+                                row_array = row_group.get("Row")
+
+                                if row_array is None:
+                                    continue
+
+                                # Get column metadata for this batch
+                                batch_metadata = self._get_column_metadata(resp_batch, eng_schema)[:-1]  # Exclude Categories
+
+                                # Build complete column list as we process batches
+                                if batch_idx == 0:
+                                    all_columns = batch_metadata.copy()
+                                else:
+                                    # Add new columns from this batch to all_columns
+                                    for col in batch_metadata:
+                                        if col not in all_columns:
+                                            all_columns.append(col)
+
+                                output = []
+                                categories = []
+                                for row in row_array:
+                                    self._recursive_row_search(row, output, categories)
+
+                                # Find identity column indices in this batch's metadata
+                                identity_indices = []
+                                for id_col in identity_cols:
+                                    id_col_mapped = identity_col_mappings[id_col]
+                                    if id_col_mapped in batch_metadata:
+                                        identity_indices.append(batch_metadata.index(id_col_mapped))
+                                    else:
+                                        identity_indices.append(None)
+
+                                # Process each row in this batch
+                                for raw_row in output:
+                                    # Extract identity key from raw_row
+                                    identity_values = []
+                                    for idx in identity_indices:
+                                        if idx is not None and idx < len(raw_row) - 1:  # -1 for categories
+                                            cell = raw_row[idx]
+                                            # Extract value from dict or use directly
+                                            if isinstance(cell, dict):
+                                                identity_values.append(cell.get("value", ""))
+                                            else:
+                                                identity_values.append(cell if cell is not None else "")
+                                        else:
+                                            identity_values.append("")
+                                    row_key = tuple(identity_values)
+
+                                    # Extract all column data for this row, maintaining batch_metadata order
+                                    row_data_by_col = {}
+                                    for col_idx, col_name in enumerate(batch_metadata):
+                                        if col_idx < len(raw_row) - 1:  # -1 for categories
+                                            row_data_by_col[col_name] = raw_row[col_idx]
+
+                                    categories_data = set(raw_row[-1]) if raw_row[-1] else set()
+
+                                    # Store row data
+                                    if batch_idx == 0:
+                                        # First batch: initialize row entry
+                                        if row_key not in rows_by_key:
+                                            rows_by_key[row_key] = []
+                                            key_order.append(row_key)
+
+                                        rows_by_key[row_key].append({
+                                            'column_data': row_data_by_col.copy(),
+                                            'categories': categories_data,
+                                            'batches_processed': [0]
+                                        })
+                                    else:
+                                        # Subsequent batches: find matching row and merge data
+                                        if row_key in rows_by_key and rows_by_key[row_key]:
+                                            # Find the first unprocessed row with this key
+                                            for row_entry in rows_by_key[row_key]:
+                                                if batch_idx not in row_entry['batches_processed']:
+                                                    # Merge column data from this batch
+                                                    row_entry['column_data'].update(row_data_by_col)
+                                                    row_entry['categories'].update(categories_data)
+                                                    row_entry['batches_processed'].append(batch_idx)
+                                                    break
+
+                            # Reconstruct stitched rows in key_order with correct column ordering
+                            stitched_rows = []
+                            row_categories = []
+                            for row_key in key_order:
+                                if row_key in rows_by_key:
+                                    for row_entry in rows_by_key[row_key]:
+                                        # Build row in all_columns order
+                                        stitched_row = []
+                                        for col in all_columns:
+                                            if col in row_entry['column_data']:
+                                                stitched_row.append(row_entry['column_data'][col])
+                                            else:
+                                                # Column not present in any batch for this row
+                                                stitched_row.append(None)
+
+                                        stitched_rows.append(stitched_row)
+                                        row_categories.append(list(row_entry['categories']))
+
+                            columns_from_metadata = all_columns
+
+                            if stitched_rows:
+                                # Join categories to the right of the rows
+                                for i, row in enumerate(stitched_rows):
+                                    row.append(row_categories[i])
+
+                                # Add the categories column at the end
+                                columns_from_metadata.append("Categories")
+
+                                # We are ready to yield the full rows now
+                                yield from self.clean_row(stitched_rows, columns_from_metadata)
+
+                            # After successful batching, continue to process other responses
+                            # The while loop will naturally continue after all responses are processed
+                            start_date = error_end_date + datetime.timedelta(days=1)
+                            break
+                        else:
                             # If we already are at gl_daily we have to give up
                             raise Exception(r)

+                        # For mode switching (weekly/daily), break to retry with new mode
+                        # The start_date is already set to error_start_date above
                         break
                 else:
                     self.gl_weekly = False
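
The heart of the new elif self.gl_daily: branch is a batch-and-stitch strategy: when QuickBooks still reports "Too much data for current period" at daily granularity, the requested columns are split into batches that all share a set of identity columns, the batches are fetched concurrently, and the partial rows are re-joined on their identity key. Below is a minimal, self-contained sketch of that idea; fetch_batch and the sample column values are hypothetical stand-ins for the tap's concurrent_get call and real GeneralLedger data, so read it as an illustration of the stitching logic rather than the tap's implementation.

    import concurrent.futures

    # Identity columns repeated in every batch; the remaining columns are split
    # into fixed-size slices. All names below are illustrative examples.
    IDENTITY_COLS = ["tx_date", "doc_num", "subt_nat_amount"]
    OTHER_COLS = ["memo", "account_name", "klass_name", "dept_name"]
    BATCH_SIZE = 2

    def fetch_batch(columns):
        # Hypothetical stand-in for the report request; returns one fake row
        # limited to the requested columns.
        sample = {
            "tx_date": "2024-01-05", "doc_num": "1001", "subt_nat_amount": "25.00",
            "memo": "test", "account_name": "Sales", "klass_name": "East", "dept_name": "Ops",
        }
        return [{col: sample[col] for col in columns}]

    # 1. Build column batches: every batch carries the identity columns.
    batches = [IDENTITY_COLS + OTHER_COLS[i:i + BATCH_SIZE]
               for i in range(0, len(OTHER_COLS), BATCH_SIZE)]

    # 2. Fetch the batches concurrently, one request per column batch.
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(batches)) as executor:
        responses = list(executor.map(fetch_batch, batches))

    # 3. Stitch partial rows back together on the identity-column key.
    rows_by_key = {}
    key_order = []
    for batch_idx, rows in enumerate(responses):
        for row in rows:
            key = tuple(row.get(c, "") for c in IDENTITY_COLS)
            if batch_idx == 0:
                if key not in rows_by_key:
                    rows_by_key[key] = {}
                    key_order.append(key)
                rows_by_key[key].update(row)
            elif key in rows_by_key:
                rows_by_key[key].update(row)

    all_columns = IDENTITY_COLS + OTHER_COLS
    stitched = [[rows_by_key[k].get(c) for c in all_columns] for k in key_order]
    print(all_columns)
    print(stitched)

The actual change keeps extra bookkeeping the sketch omits: duplicate identity keys are stored as a list and consumed in order, each row tracks which batches have already been merged into it, and a Categories column is appended before the rows are passed to clean_row.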

0 commit comments
