diff --git a/tap_quickbooks/quickbooks/reportstreams/GeneralLedgerReport.py b/tap_quickbooks/quickbooks/reportstreams/GeneralLedgerReport.py index 8ebf921..34d72dd 100644 --- a/tap_quickbooks/quickbooks/reportstreams/GeneralLedgerReport.py +++ b/tap_quickbooks/quickbooks/reportstreams/GeneralLedgerReport.py @@ -142,6 +142,7 @@ def sync(self, catalog_entry): params = { "accounting_method": self.accounting_method, "columns": ",".join(cols), + "sort_by": "tx_date" } if full_sync or self.qb.gl_full_sync: @@ -211,6 +212,67 @@ def sync(self, catalog_entry): self.gl_weekly = False self.gl_daily = True elif self.gl_daily: + batch_size = 10 + stitched_rows = [] + row_categories = [] + # Add tx_date to each batch to keep rows sorted + column_batches = [["tx_date"] + cols[i:i+batch_size] for i in range(0, len(cols), batch_size)] + batch_params_list = [] + for batch in column_batches: + batch_params = params.copy() + batch_params["columns"] = ",".join(batch) + batch_params["start_date"] = start_date.strftime("%Y-%m-%d") + batch_params["end_date"] = end_date.strftime("%Y-%m-%d") + batch_params_list.append(batch_params) + with concurrent.futures.ThreadPoolExecutor(max_workers=len(batch_params_list)) as executor: + resp_batches = list( + executor.map( + lambda x: self.concurrent_get(report_entity="GeneralLedger", params=x), + batch_params_list + ) + ) + columns_from_metadata = ['Date'] + for i, resp_batch in enumerate(resp_batches): + # remove tx_date and categories while appending to columns_from_metadata + # tx_date will be added automatically as it's already a column that will be fetched in a batch + # categories will be added in the end after all the columns are stitched together + columns_from_metadata += self._get_column_metadata(resp_batch)[1:-1] + + row_group = resp_batch.get("Rows") + row_array = row_group.get("Row") + + start_date = end_date + if row_array is None: + continue + + output = [] + categories = [] + for row in row_array: + self._recursive_row_search(row, output, categories) + + for i, raw_row in enumerate(output): + # if the row was never inserted in stitched_rows, append it + if len(stitched_rows) <= i: + stitched_rows.append(raw_row[:-1]) + # row_categories maintains a set of categories to avoid duplication + row_categories.append({*raw_row[-1]}) + # if the row was already inserted, join new columns to the right + else: + stitched_rows[i] += raw_row[1:-1] + row_categories[i].update(raw_row[-1]) + + if stitched_rows: + # join categories to the right of the rows + for i, row in enumerate(stitched_rows): + row += [list(row_categories[i])] + + # add the categories column at the end + columns_from_metadata.append("Categories") + + # we are ready to yield the full rows now + yield from self.clean_row(stitched_rows, columns_from_metadata) + break + else: # If we already are at gl_daily we have to give up raise Exception(r)