Conversation
Pull Request Overview
This PR introduces a new QuickBooks source connector for the Estuary Flow platform. The connector implements OAuth2 authentication, supports 28 different QuickBooks entity types, and provides both backfill and incremental sync capabilities.
Key changes:
- Implementation of QuickBooks API connector with OAuth2 authentication
- Support for 28 QuickBooks entities including Accounts, Invoices, Customers, etc.
- Backfill and incremental data sync with configurable window sizes
- Snapshot testing setup for connector spec validation
Reviewed Changes
Copilot reviewed 39 out of 41 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| source_quickbooks/__init__.py | Main connector class implementing spec, discover, validate, and open methods |
| source_quickbooks/models.py | Data models for all 28 QuickBooks entities and endpoint configuration |
| source_quickbooks/api.py | API interaction logic for querying, backfilling, and fetching entities |
| source_quickbooks/resources.py | Resource configuration and credential validation logic |
| tests/test_snapshots.py | Snapshot test for connector spec validation |
| test.flow.yaml | Flow configuration file for testing all resource bindings |
| acmeCo/*.schema.yaml | JSON schemas for all QuickBooks entity collections |
| pyproject.toml | Project dependencies and configuration |
```python
try:
    await anext(
        query_entity(
            Account, EPOCH, datetime.now(tz=UTC), http, config.realm_id, log
        )
    )
except HTTPError as err:
    msg = f"Encountered error validating credentials.\n\n{err.message}"
    if err.code == 401:
        msg = f"Invalid credentials. Please confirm the provided credentials are correct.\n\n{err.message}"

    raise ValidationError([msg])
```
The validate_credentials function only catches HTTPError but doesn't handle the case where the query returns zero results. The anext() call will raise StopAsyncIteration if the Account query returns no results, which would not be caught and would propagate as an unhandled exception rather than a ValidationError.
```python
        inc=common.ResourceState.Incremental(cursor=cutoff),
    ),
    initial_config=ResourceConfig(
        name=resource.resource_name, interval=timedelta(minutes=5)
```
The default interval of 5 minutes is hardcoded here and duplicated across all resources in test.flow.yaml (PT5M). Consider defining this as a constant (e.g., DEFAULT_RESOURCE_INTERVAL) to ensure consistency and ease of maintenance.
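A sketch of the suggested constant (the name `DEFAULT_RESOURCE_INTERVAL` comes from the comment above; the helper and its location are illustrative):

```python
from datetime import timedelta

# Suggested shared constant, replacing the hardcoded timedelta(minutes=5)
# repeated per resource. The helper is hypothetical; it renders the same
# value in the ISO 8601 duration form used in test.flow.yaml.
DEFAULT_RESOURCE_INTERVAL = timedelta(minutes=5)

def iso8601_duration(td: timedelta) -> str:
    # Render a whole-minute interval the way Flow specs express it.
    return f"PT{int(td.total_seconds()) // 60}M"

assert iso8601_duration(DEFAULT_RESOURCE_INTERVAL) == "PT5M"
```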
```python
from estuary_cdk.capture.common import (
    BaseDocument,
    ResourceConfig,
    ResourceState,
)
```
Import of 'ResourceConfig' is not used.
```python
def dt_to_ts(dt: AwareDatetime) -> str:
    return dt.isoformat(timespec="seconds").replace("+00:00", "Z")
```
I'm adding source-specific timestamp handling logic to the standard utility function dt_to_ts, which might be counterintuitive. Let me know if you'd like it changed.
As long as this transformation is generally useful and how we'd want to transform all datetimes to strings in source-quickbooks, I think it's fine.
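For reference, the transformation under discussion renders an aware UTC datetime in the Zulu form rather than with a `+00:00` offset:

```python
from datetime import datetime, timezone

def dt_to_ts(dt: datetime) -> str:
    # Same body as the function quoted above.
    return dt.isoformat(timespec="seconds").replace("+00:00", "Z")

assert dt_to_ts(datetime(2025, 1, 2, 3, 4, 5, tzinfo=timezone.utc)) == "2025-01-02T03:04:05Z"
```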
Force-pushed 17bc996 to 5e35804
Alex-Bair left a comment:
Had a question about the operators used in query_entity.
Also, the connector will need to be added to the CI process in .github/workflows/python.yaml so it gets built and deployed.
Force-pushed 5e35804 to 4ad8a6a
Alex-Bair left a comment:
After our discussion this morning, I had a few more comments around the backfill strategy & what I think caused those 401/403 errors you mentioned.
LMK if anything isn't clear or you have questions about any of my comments.
```python
},
accessTokenHeaders={
    "Content-Type": "application/x-www-form-urlencoded",
    "Authorization": r"Basic {{#basicauth}}{{{ client_id }}}:{{{ client_secret }}}{{/basicauth}}",
```
Nice job figuring out how to use a template to place the client id and secret in an Authorization header during the access token request! This template is used by the access-token.ts Supabase function to fetch the initial set of tokens when users press the "AUTHENTICATE WITH INTUIT" button in the UI. However, this template does not affect how token exchange is performed within the connector, and I think that may explain the 401/403 errors you were seeing during testing.
The TokenSource._fetch_oauth2_token function is where the headers & body for token exchange requests are constructed and used. Right now, connectors cannot control whether the client id and secret are used in an Authorization header or in the body; that's hardcoded based on the OAuth2 credentials class. Since the RotatingOAuth2Credentials always puts the client id and secret in the request body, I'd anticipate the token exchange in TokenSource._fetch_oauth2_token is failing for source-quickbooks since Intuit expects the client id and secret to be base64 encoded in an Authorization header.
What we want is to allow connectors to choose whether the client id and secret gets placed in an Authorization header or the request body within TokenSource._fetch_oauth2_token regardless of what type of OAuth2 credentials class is used. Could you take a crack at figuring out how to do that? This would be a relatively contained change that'd be a good introduction to working in the CDK.
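A rough sketch of what that CDK change could look like (the enum, function name, and signature are hypothetical illustrations, not the CDK's actual API): let the credentials class declare where the client id and secret belong, and branch on that when constructing the token exchange request.

```python
import base64
from enum import Enum

class ClientCredentialsLocation(Enum):
    # Hypothetical: a connector-selectable setting rather than behavior
    # hardcoded to the OAuth2 credentials class.
    REQUEST_BODY = "body"
    AUTHORIZATION_HEADER = "header"

def build_token_request(
    client_id: str,
    client_secret: str,
    refresh_token: str,
    location: ClientCredentialsLocation,
) -> tuple[dict, dict]:
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    body = {"grant_type": "refresh_token", "refresh_token": refresh_token}
    if location is ClientCredentialsLocation.AUTHORIZATION_HEADER:
        # Intuit expects "Basic base64(client_id:client_secret)".
        creds = base64.b64encode(f"{client_id}:{client_secret}".encode()).decode()
        headers["Authorization"] = f"Basic {creds}"
    else:
        body["client_id"] = client_id
        body["client_secret"] = client_secret
    return headers, body

headers, body = build_token_request(
    "id", "secret", "rt", ClientCredentialsLocation.AUTHORIZATION_HEADER
)
assert headers["Authorization"] == "Basic aWQ6c2VjcmV0"
assert "client_id" not in body
```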
```python
async def backfill_entity(
    model: Type[T],
    http: HTTPSession,
    realm_id: str,
    window_size: timedelta,
    log: Logger,
    page: PageCursor,
    cutoff: LogCursor,
) -> AsyncGenerator[T | PageCursor, None]:
    assert isinstance(page, str)
    assert isinstance(cutoff, datetime)

    page_ts = datetime.fromisoformat(page)
    max_date_to_fetch = min(page_ts + window_size, cutoff)

    log.info(
        "Initiating backfill fetch",
        {
            "page_ts": page_ts,
            "max_date_to_fetch": max_date_to_fetch,
            "window_size": window_size,
            "cutoff": cutoff,
        },
    )

    if page_ts >= cutoff:
        return

    async for item in query_entity(
        model, page_ts, max_date_to_fetch, http, realm_id, log
    ):
        yield item

    yield max_date_to_fetch.isoformat(timespec="seconds")
```
What's the motivation for using a window_size during the backfill? Based on query_entity, it looks like we can both filter and sort the results returned from the API, meaning we could request all entities between some start and the cutoff and treat the items from query_entity as a stream of ordered results.
Instead of potentially requiring users to fiddle with the window_size, I'd prefer if we used a backfill strategy like the one in source-klaviyo-native's backfill_incremental_resources. Klaviyo's API is similar; we can filter and sort results so the connector receives results in ascending order of some cursor field. The connector iterates through the results, yielding them and checkpointing when it's safe to do so (i.e. we've captured all results at that cursor value and earlier). And if the backfill_incremental_resources function runs for more than 5 minutes, it exits after its next checkpoint, then the CDK will re-invoke it with the most recent checkpoint passed in as page.
For additional context, we've usually used date windows when the results from the API aren't sorted & it's unlikely the fetch_page function could process all results in a reasonable time frame. We use date windows to split the backfill range into smaller, more manageable chunks, then checkpoint each chunk as it's completed.
If the API supports sorting results in ascending order and there's not some other API limitation that makes date windows more appealing, we usually try to avoid using date windows.
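Sketched with plain integers as cursors (the names and event tuples are illustrative, not the CDK's real interface), the suggested cursor-ordered strategy looks roughly like:

```python
def backfill_ordered(records, start, cutoff):
    # records: (cursor, payload) pairs already sorted ascending by cursor,
    # as the API would return them when filtering and sorting server-side.
    last_cursor = start
    for cursor, payload in records:
        if cursor >= cutoff:
            break
        if cursor > last_cursor:
            # Safe to checkpoint: every record at last_cursor or earlier
            # has already been yielded.
            yield ("CHECKPOINT", last_cursor)
            last_cursor = cursor
        yield ("DOC", payload)
    yield ("CHECKPOINT", last_cursor)

events = list(backfill_ordered([(1, "a"), (1, "b"), (2, "c")], 0, 10))
```

Note the checkpoint only advances once every record sharing a cursor value has been emitted, which is what makes resuming from `page` safe.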
Force-pushed 01afe58 to 5aa95c5
```python
# Though technically a numerical value, realm IDs tend to be large enough that precision loss
# can alter what's actually used.
# This value is presented to end users as their company ID
realm_id: str = Field(
    description="ID for the Company to Request Data From",
    title="Company ID",
    json_schema_extra={"order": 0},
)
```
I tried to parse this from QuickBooks' OAuth flow, but it looks like the CDK doesn't support reading URL parameters from redirect URI requests yet.
Force-pushed eb1f1b3 to 52cb681
Alex-Bair left a comment:
LGTM after:
1. Merging #3503 next week and rebasing this PR after that one is merged. We'll want this PR to end up having the source-quickbooks change without all of the CDK changes that'll get merged with the other PR.
2. Adding a config.yaml (or just some mock credentials directly in test.flow.yaml) so CI checks run & pass.
```yaml
- python
- "-m"
- source_quickbooks
config: config.yaml
```
It looks like there's no config.yaml file with some mock credentials, and that's causing the checks in CI to fail.
```python
        yield item

    yield cutoff.isoformat(timespec="seconds")
```
At the end of backfill_entity, instead of:
- yielding the cutoff
- reinvoking backfill_entity with the cutoff as the page argument
- immediately returning

we could just return. Both signal that the backfill is finished, but the latter has fewer steps.
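As a toy illustration of the "just return" option (a sync generator with integer timestamps; the names are hypothetical, not the connector's), returning directly ends the backfill without the extra yield/re-invoke round trip:

```python
def backfill_done_by_return(items, cutoff):
    # items: (timestamp, payload) pairs. Once we reach the cutoff we simply
    # return instead of yielding the cutoff as one more page cursor that a
    # follow-up invocation would immediately discard.
    for ts, payload in items:
        if ts >= cutoff:
            return
        yield payload

assert list(backfill_done_by_return([(1, "a"), (5, "b")], 5)) == ["a"]
```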
Force-pushed 4fde7db to c73f4be
Force-pushed c73f4be to 4c76882
Description:
This PR adds a new connector for the QuickBooks accounting service. The collected objects are
Closes #3437
Workflow steps:
(How does one use this feature, and how has it changed)
Documentation links affected:
A documentation page will have to be prepared.
Notes for reviewers: