Commit bac76fa

HGI-6716: Add column batching for GL dailies (#157)
1 parent 22098c7 commit bac76fa

File tree

1 file changed: +62 -0 lines changed

tap_quickbooks/quickbooks/reportstreams/GeneralLedgerReport.py

Lines changed: 62 additions & 0 deletions
@@ -142,6 +142,7 @@ def sync(self, catalog_entry):
         params = {
             "accounting_method": self.accounting_method,
             "columns": ",".join(cols),
+            "sort_by": "tx_date"
         }

         if full_sync or self.qb.gl_full_sync:
@@ -211,6 +212,67 @@ def sync(self, catalog_entry):
                 self.gl_weekly = False
                 self.gl_daily = True
             elif self.gl_daily:
+                batch_size = 10
+                stitched_rows = []
+                row_categories = []
+                # Add tx_date to each batch to keep rows sorted
+                column_batches = [["tx_date"] + cols[i:i+batch_size] for i in range(0, len(cols), batch_size)]
+                batch_params_list = []
+                for batch in column_batches:
+                    batch_params = params.copy()
+                    batch_params["columns"] = ",".join(batch)
+                    batch_params["start_date"] = start_date.strftime("%Y-%m-%d")
+                    batch_params["end_date"] = end_date.strftime("%Y-%m-%d")
+                    batch_params_list.append(batch_params)
+                with concurrent.futures.ThreadPoolExecutor(max_workers=len(batch_params_list)) as executor:
+                    resp_batches = list(
+                        executor.map(
+                            lambda x: self.concurrent_get(report_entity="GeneralLedger", params=x),
+                            batch_params_list
+                        )
+                    )
+                columns_from_metadata = ['Date']
+                for i, resp_batch in enumerate(resp_batches):
+                    # remove tx_date and categories while appending to columns_from_metadata
+                    # tx_date will be added automatically as it's already a column that will be fetched in a batch
+                    # categories will be added in the end after all the columns are stitched together
+                    columns_from_metadata += self._get_column_metadata(resp_batch)[1:-1]
+
+                    row_group = resp_batch.get("Rows")
+                    row_array = row_group.get("Row")
+
+                    start_date = end_date
+                    if row_array is None:
+                        continue
+
+                    output = []
+                    categories = []
+                    for row in row_array:
+                        self._recursive_row_search(row, output, categories)
+
+                    for i, raw_row in enumerate(output):
+                        # if the row was never inserted in stitched_rows, append it
+                        if len(stitched_rows) <= i:
+                            stitched_rows.append(raw_row[:-1])
+                            # row_categories maintains a set of categories to avoid duplication
+                            row_categories.append({*raw_row[-1]})
+                        # if the row was already inserted, join new columns to the right
+                        else:
+                            stitched_rows[i] += raw_row[1:-1]
+                            row_categories[i].update(raw_row[-1])
+
+                if stitched_rows:
+                    # join categories to the right of the rows
+                    for i, row in enumerate(stitched_rows):
+                        row += [list(row_categories[i])]
+
+                    # add the categories column at the end
+                    columns_from_metadata.append("Categories")
+
+                    # we are ready to yield the full rows now
+                    yield from self.clean_row(stitched_rows, columns_from_metadata)
+                break
+            else:
                 # If we already are at gl_daily we have to give up
                 raise Exception(r)
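
For reference, a minimal standalone sketch of the column-batching and row-stitching pattern this commit introduces. The column names, fetch_batch, and the fake response rows are hypothetical stand-ins for the QuickBooks GeneralLedger request and its parsed rows; the real code fetches each batch via self.concurrent_get as shown in the diff above.

from concurrent.futures import ThreadPoolExecutor

cols = ["account_name", "debt_amt", "credit_amt", "memo"]  # hypothetical report columns
batch_size = 2

# tx_date is prepended to every batch (and the report is sorted by it) so each
# request returns the same rows in the same order, letting batches be stitched by position.
column_batches = [["tx_date"] + cols[i:i + batch_size] for i in range(0, len(cols), batch_size)]

def fetch_batch(batch):
    # Stand-in for the real report request: one value per requested column for
    # each row, with the row's categories as the final element.
    return [[f"{col}-row{r}" for col in batch] + [[f"Category{r}"]] for r in range(2)]

with ThreadPoolExecutor(max_workers=len(column_batches)) as executor:
    resp_batches = list(executor.map(fetch_batch, column_batches))

stitched_rows, row_categories = [], []
for resp in resp_batches:
    for i, raw_row in enumerate(resp):
        if len(stitched_rows) <= i:
            # first batch: keep tx_date plus the batch columns, set categories aside
            stitched_rows.append(raw_row[:-1])
            row_categories.append({*raw_row[-1]})
        else:
            # later batches: skip the duplicated tx_date, extend the row to the right
            stitched_rows[i] += raw_row[1:-1]
            row_categories[i].update(raw_row[-1])

# re-attach the merged categories as the final column
for i, row in enumerate(stitched_rows):
    row.append(list(row_categories[i]))

print(stitched_rows)

Because every batch shares the tx_date column and the sort order, the positional stitch (stitched_rows[i] += raw_row[1:-1]) lines up the fragments of each row correctly, and the categories are deduplicated in a set and re-attached once at the end.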

0 commit comments
