Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions tap_quickbooks/quickbooks/reportstreams/GeneralLedgerReport.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ def sync(self, catalog_entry):
params = {
"accounting_method": self.accounting_method,
"columns": ",".join(cols),
"sort_by": "tx_date"
}

if full_sync or self.qb.gl_full_sync:
Expand Down Expand Up @@ -211,6 +212,67 @@ def sync(self, catalog_entry):
self.gl_weekly = False
self.gl_daily = True
elif self.gl_daily:
batch_size = 10
stitched_rows = []
row_categories = []
# Add tx_date to each batch to keep rows sorted
column_batches = [["tx_date"] + cols[i:i+batch_size] for i in range(0, len(cols), batch_size)]
batch_params_list = []
for batch in column_batches:
batch_params = params.copy()
batch_params["columns"] = ",".join(batch)
batch_params["start_date"] = start_date.strftime("%Y-%m-%d")
batch_params["end_date"] = end_date.strftime("%Y-%m-%d")
batch_params_list.append(batch_params)
with concurrent.futures.ThreadPoolExecutor(max_workers=len(batch_params_list)) as executor:
resp_batches = list(
executor.map(
lambda x: self.concurrent_get(report_entity="GeneralLedger", params=x),
batch_params_list
)
)
columns_from_metadata = ['Date']
for i, resp_batch in enumerate(resp_batches):
# remove tx_date and categories while appending to columns_from_metadata
# tx_date will be added automatically as it's already a column that will be fetched in a batch
# categories will be added in the end after all the columns are stitched together
columns_from_metadata += self._get_column_metadata(resp_batch)[1:-1]

row_group = resp_batch.get("Rows")
row_array = row_group.get("Row")

start_date = end_date
if row_array is None:
continue

output = []
categories = []
for row in row_array:
self._recursive_row_search(row, output, categories)

for i, raw_row in enumerate(output):
# if the row was never inserted in stitched_rows, append it
if len(stitched_rows) <= i:
stitched_rows.append(raw_row[:-1])
# row_categories maintains a set of categories to avoid duplication
row_categories.append({*raw_row[-1]})
# if the row was already inserted, join new columns to the right
else:
stitched_rows[i] += raw_row[1:-1]
row_categories[i].update(raw_row[-1])

if stitched_rows:
# join categories to the right of the rows
for i, row in enumerate(stitched_rows):
row += [list(row_categories[i])]

# add the categories column at the end
columns_from_metadata.append("Categories")

# we are ready to yield the full rows now
yield from self.clean_row(stitched_rows, columns_from_metadata)
break
else:
# If we already are at gl_daily we have to give up
raise Exception(r)

Expand Down