source-monday: refactor and simplify to fix data discrepancies #3020

JustinASmith merged 1 commit into main
Conversation
@Alex-Bair This is marked as Ready for Review. However, I still need to finish up the PR description (mostly AI generated at the moment) and force push with a single commit detailing the changes concisely.
```python
yield item
log.debug(
    f"Item {item.id} marked as deleted (updated: {item.updated_at})"
window_has_updates = True
```
Should window_has_updates be reassigned to True whether or not the item was deleted? If so, both that reassignment and the max_update_at_in_window one could be moved one level up, like:
```python
if window_start <= item.updated_at <= window_end:
    window_has_updates = True
    max_updated_at_in_window = max(
        max_updated_at_in_window, item.updated_at
    )
    if item.state == "deleted":
        item.meta_ = Item.Meta(op="d")
        yield item
    else:
        item_ids_to_fetch.add(item.id)
```
Good question! Though, the way I have it should be correct: I only set window_has_updates to True when we actually yield something from the function. I could probably rename this, or just use a docs_count counter and check docs_count > 0 to determine when we need to move the cursor forward.
I'll use a counter to make it a bit more clear when it is used and why.
And I will remove items_yielded since it is not correctly tracking in all phases of this function's logic.
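As a sketch of the counter approach discussed above (hypothetical names — `FakeItem` stands in for the connector's `Item` model, and `process_window` for the real generator):

```python
import asyncio
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any, AsyncGenerator


@dataclass
class FakeItem:
    # Minimal stand-in for the connector's Item model.
    id: str
    updated_at: datetime
    state: str
    meta_: dict = field(default_factory=dict)


async def process_window(items, window_start, window_end) -> AsyncGenerator[Any, None]:
    # Count yielded documents instead of tracking a boolean, so the decision
    # to move the cursor forward is tied directly to what was emitted.
    docs_count = 0
    max_updated_at_in_window = window_start
    for item in items:
        if window_start <= item.updated_at <= window_end and item.state == "deleted":
            item.meta_ = {"op": "d"}
            yield item
            docs_count += 1
            max_updated_at_in_window = max(max_updated_at_in_window, item.updated_at)
    if docs_count > 0:
        # Advance the cursor just past the newest emitted document.
        yield max_updated_at_in_window + timedelta(seconds=1)
```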
```python
    yield max_updated_at_in_window + timedelta(seconds=1)
else:
    log.debug("Incremental sync complete. No updates found.")
    yield window_end
```
Why do we need to move the cursor forward if there are no updates? This would be necessary if the cursor expires after a while or the source system has limited data retention (like Stripe only keeping ~30 days worth of events), and it might be necessary here to reduce API calls somehow? My suspicion is that fetch_items_changes checks a fixed date window so the binding can make smaller, incremental updates that require fewer API calls if it falls behind the present?
Great question! This is actually a critical design decision related to Monday.com's API limitations. Monday.com only retains the most recent 10,000 activity logs per board. If we don't move the cursor forward when there are no updates, we risk getting stuck in a scenario where:
- Log Retention Issue: If we keep checking the same time window (or barely move the cursor) repeatedly without updates, and meanwhile new activity logs are being generated on the board, the older logs (that we haven't processed yet) could get purged due to the 10,000 log limit.
- Data Loss Prevention: If we stayed at the same cursor position and the board had high activity, we could miss processing logs that existed during our previous check but were later purged due to the 10k retention limit.
Since the check to see if there are updates uses both a minimal query to check updated_at and the activity_logs, I am confident we can safely move the log cursor forward. However, if you see something I do not, now that you know my reasoning, let me know!
I'll add a comment in the code to explain this Monday.com-specific behavior for future me :)
Gotcha, that makes sense then that we have to advance the cursor. Thanks for adding that comment!
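The advance-even-when-empty rule can be boiled down to a small helper (a sketch of the reasoning above, not the connector's actual code; `next_cursor` is a hypothetical name):

```python
from datetime import datetime, timedelta
from typing import Optional


def next_cursor(window_end: datetime, max_updated_at_in_window: Optional[datetime]) -> datetime:
    # Monday.com retains only the most recent 10,000 activity logs per board,
    # so the cursor must move forward even when a window has no updates;
    # otherwise a busy board could purge logs before we ever process them.
    if max_updated_at_in_window is not None:
        return max_updated_at_in_window + timedelta(seconds=1)
    return window_end
```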
Alex-Bair left a comment
Had a few more comments/questions. I like the overall direction you're heading, simplifying the connector so it's easier to figure out what's happening & troubleshoot when something goes wrong.
```python
    )
    raise ValueError("Query modification failed. Cannot find query body.")

if complexity_cache.next_query_delay > 0:
```
Correct me if I missed it, but it doesn't look like accessing or setting complexity_cache is coordinated between multiple execute_query invocations. Does this complexity budget tracking strategy work without some kind of coordination? I imagine having separate execute_query coroutines accessing complexity_cache without any coordination would experience read-modify-write race conditions. Multiple coroutines can read the same shared state, make decisions based on that state, and then modify it - but the state they read becomes stale as soon as another coroutine modifies it.
If this works better than the current strategy, let's go with it. Managing complexity limits for GraphQL APIs seems pretty tricky, and I bet properly managing this with some kind of complexity reservation system/leaky bucket algorithm would take some time to work out.
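One way to add that coordination, if it ever becomes necessary, is an `asyncio.Lock` around the read-modify-write — a minimal sketch under that assumption, not the connector's code:

```python
import asyncio


class ComplexityBudget:
    """Sketch of coordinated complexity tracking: the lock serializes the
    read-modify-write so two coroutines cannot both spend the same budget."""

    def __init__(self, limit: int) -> None:
        self.remaining = limit
        self._lock = asyncio.Lock()

    async def reserve(self, cost: int) -> bool:
        # Atomically check and deduct; stale reads are impossible because
        # the check and the write happen under the same lock.
        async with self._lock:
            if self.remaining < cost:
                return False
            self.remaining -= cost
            return True
```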
I removed complexity tracking and rate limiting except for setting a LOW_COMPLEXITY_BUDGET threshold of 10K to delay and wait for Monday's rate limit bucket to refill before continuing to request data. This seemed to work well enough and reduced the complexity of my prior implementation.
I also implemented a Leaky Bucket Rate Limiter, but that too was not worth the complexity. The main goal was to reduce the chance we hit many HTTP 429 errors and subsequently rack up longer and longer delays in the CDK's Rate Limiter, which is just an exponential backoff strategy.
Also, there should not be an issue with multiple coroutines using this since this is handled per-query. There may still be HTTP 429 errors that come up, but this threshold-and-delay approach was working fine in testing, even with many coroutines using the query executor.
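The threshold-and-delay approach described above might look roughly like this (a hedged sketch; `maybe_wait_for_refill` and its parameters are assumptions, not the connector's real API):

```python
import asyncio

# Assumed threshold from the discussion above: pause when Monday reports
# less than 10K complexity budget remaining.
LOW_COMPLEXITY_BUDGET = 10_000


async def maybe_wait_for_refill(remaining_complexity: int, reset_in_seconds: float) -> bool:
    # Per-query check: if the remaining budget is below the threshold, wait
    # for Monday's rate limit bucket to refill before the next request.
    if remaining_complexity < LOW_COMPLEXITY_BUDGET:
        await asyncio.sleep(reset_in_seconds)
        return True
    return False
```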
Force-pushed fcefbbd to 61dd163
Pull Request Overview
This PR refactors the Monday.com connector to improve modularity, performance, and data consistency by:
- Splitting the monolithic GraphQL file into domain-specific modules under `source_monday/graphql/`
- Adding an item cache system and streaming processor for memory-efficient large-response handling
- Updating default sync intervals and enhancing timestamp parsing and complexity tracking
Reviewed Changes
Copilot reviewed 17 out of 18 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| tests/snapshots/* | Updated expected sync intervals in snapshots |
| test.flow.yaml | Aligned default intervals for all resources |
| source_monday/utils.py | Added robust 17-digit timestamp parser |
| source_monday/resources.py | Changed full/incremental fetch to use query constants and updated default intervals |
| source_monday/models.py | Streamlined models, added complexity info fields |
| source_monday/graphql/query_executor.py | Centralized GraphQL executor with complexity limits and retries |
| source_monday/graphql/items/* | New modular item fetch logic and cache adapter |
| source_monday/graphql/boards.py | Batch-aware board fetchers |
| source_monday/graphql/activity_logs.py | Activity log fetch with improved timestamp filtering |
| source_monday/graphql/constants.py | Introduced centralized GraphQL queries |
| source_monday/api.py | Switched to new fetch and cache session API |
| source_monday/__init__.py | Minor signature adjustment for request_class |
| pyproject.toml | Added aiostream dependency |
Comments suppressed due to low confidence (2)
source-monday/source_monday/graphql/items/item_cache.py:216
- [nitpick] This helper returns an empty async generator but is undocumented. Adding a one-line docstring to explain that it exists purely to satisfy the async generator signature would improve readability.
async def _empty_generator(self) -> AsyncGenerator[Item, None]:
source-monday/source_monday/graphql/items/item_cache.py:1
- The new `ItemCacheSession` and its methods (`process_page`, `_stream_items_from_cache`, etc.) are critical for backfill performance but lack direct test coverage. Consider adding unit tests that simulate multiple boards/items and edge cases such as no remaining boards or items.
import itertools
```python
    return
except GraphQLQueryError as e:
    complexity_error = False
```
The complexity_error flag is initialized but never set to True when a complexity-related error is detected. This prevents the retry logic from distinguishing complexity errors; consider setting `complexity_error = True` inside the relevant `if error.extensions.complexity > error.extensions.maxComplexity` or other complexity branches.
@Alex-Bair this is a good catch by copilot. I am going to remove this complexity_error variable, keeping the same logic. This is what I'll force push once you are done reviewing.
diff --git a/source-monday/source_monday/graphql/query_executor.py b/source-monday/source_monday/graphql/query_executor.py
index d5f424e0..153c34cc 100644
--- a/source-monday/source_monday/graphql/query_executor.py
+++ b/source-monday/source_monday/graphql/query_executor.py
@@ -167,7 +167,6 @@ async def execute_query(
return
except GraphQLQueryError as e:
- complexity_error = False
for error in e.errors:
if error.extensions:
if error.extensions.complexity and error.extensions.maxComplexity:
@@ -198,9 +197,10 @@ async def execute_query(
},
)
await asyncio.sleep(COMPLEXITY_RESET_WAIT_SECONDS)
+ attempt += 1
break
- if not complexity_error and attempt == MAX_RETRY_ATTEMPTS:
+ if attempt == MAX_RETRY_ATTEMPTS:
log.error(
"GraphQL streaming query failed permanently",
{
@@ -213,23 +213,21 @@ async def execute_query(
)
raise
- if not complexity_error:
- retry_delay = attempt * 2
- log.warning(
- "GraphQL query failed - retrying with exponential backoff",
- {
- "error": str(e),
- "attempt": attempt,
- "max_attempts": MAX_RETRY_ATTEMPTS,
- "query_preview": modified_query[:100] + "..." if len(modified_query) > 100 else modified_query,
- "variables": variables,
- "json_path": json_path,
- "response_model": cls.__name__,
- "retry_delay_seconds": retry_delay,
- },
- )
- await asyncio.sleep(retry_delay)
-
+ retry_delay = attempt * 2
+ log.warning(
+ "GraphQL query failed - retrying with exponential backoff",
+ {
+ "error": str(e),
+ "attempt": attempt,
+ "max_attempts": MAX_RETRY_ATTEMPTS,
+ "query_preview": modified_query[:100] + "..." if len(modified_query) > 100 else modified_query,
+ "variables": variables,
+ "json_path": json_path,
+ "response_model": cls.__name__,
+ "retry_delay_seconds": retry_delay,
+ },
+ )
+ await asyncio.sleep(retry_delay)
attempt += 1
except Exception as e:
Alex-Bair left a comment
LGTM % question around fewer fields in the capture snapshot for items.
```python
    return dt.strftime(DATETIME_STRING_FORMAT)


def _str_to_dt(string: str) -> datetime:
```
nit: It doesn't look like _str_to_dt is used anywhere?
```python
async def _get_or_create_item_cache_session(
    http: HTTPSession, log: Logger, cutoff: datetime
) -> ItemCacheSession:
    global _item_cache_session

    if _item_cache_session is None or _item_cache_session.cutoff != cutoff:
        log.debug(f"Creating new item cache session for cutoff: {cutoff}")
        _item_cache_session = ItemCacheSession(
            http=http,
            log=log,
            cutoff=cutoff,
            boards_batch_size=100,
            items_batch_size=500,
        )
        await _item_cache_session.initialize()

    return _item_cache_session
```
nit: Instead of having _get_or_create_item_cache_session initialize the global variable _item_cache_session, can we instead initialize an ItemCacheSession when we're creating the items resource in resources.py and thread it into fetch_items_page? I think that'd be clearer, especially since the ItemCacheSession is only used within fetch_items_page and nowhere else.
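A sketch of that suggestion, with hypothetical names (`open_items_resource`, `fetch_items_page`) since resources.py isn't shown here: build the session once at resource-creation time and bind it with `functools.partial`.

```python
import functools


def open_items_resource(http, log, cutoff, session_cls, fetch_items_page):
    # Create the ItemCacheSession up front instead of lazily via a
    # module-level global, then thread it into the fetch function.
    session = session_cls(http=http, log=log, cutoff=cutoff)
    return functools.partial(fetch_items_page, session=session)
```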
nit: None of the changes in this file are needed, are they?
I blame Claude Code. 😆 I'll revert that since it was just things that satisfy mypy and/or reordering imports.
```python
else:
    log.warning(
        f"Item {item.id} updated_at {item.updated_at} is outside the sync window "
        f"({window_start} to {window_end}). This should not happen with the query filter."
    )
```
nit: Trying to confirm my understanding. We have to add a day to end since the API uses a daily granularity, and we're expecting there to be some items returned that have been updated after window_end. We're filtering those out client-side, which makes sense. But that would mean we're expecting to reach this warning log in the else branch occasionally. The log message sounds like this should never happen, but it seems like it's actually expected?
Nice catch. That is true: it is expected, and I'll remove that log message 🤦. That is just what is going to happen with the daily granularity constraint and needing to move window_end a day forward.
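The daily-granularity workaround can be summarized in two helpers (a sketch of the behavior discussed above, not the connector's code):

```python
from datetime import datetime, timedelta


def api_query_end(window_end: datetime) -> datetime:
    # Monday's updated_at filter has daily granularity, so the server-side
    # query end is pushed a day forward; some items past window_end are
    # therefore expected in the response.
    return window_end + timedelta(days=1)


def in_window(updated_at: datetime, window_start: datetime, window_end: datetime) -> bool:
    # Client-side filter applied to each returned item; items beyond
    # window_end are silently dropped rather than logged as anomalies.
    return window_start <= updated_at <= window_end
```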
```python
    yield max_updated_at_in_window + timedelta(seconds=1)
else:
    log.debug("Incremental sync complete. No updates found.")
    yield window_end
```
Gotcha, that makes sense then that we have to advance the cursor. Thanks for adding that comment!
```python
json_path: str,
query: str,
variables: dict[str, Any] | None = None,
remainder_cls: type[TRemainder] = GraphQLResponseRemainder,  # type: ignore[assignment]
```
nit: It doesn't look like we need the `# type: ignore[assignment]`, do we? If I remove it, I don't see any type errors/warnings.
```diff
 "assets": [],
 "board": {
-    "id": "9323816949",
-    "name": "jjjjj"
+    "id": "8431289505"
 },
 "column_values": [
     {
         "id": "person",
         "text": "",
         "type": "people",
         "value": null
     },
     {
         "id": "status",
         "text": "Working on it",
         "type": "status",
         "value": "{\"index\":0,\"post_id\":null,\"changed_at\":\"2019-03-01T17:24:57.321Z\"}"
     },
     {
         "id": "date4",
         "text": "2025-06-04",
         "type": "date",
         "value": "{\"date\":\"2025-06-04\",\"icon\":null,\"changed_at\":\"2025-06-06T20:38:20.505Z\"}"
     }
 ],
 "created_at": "2025-06-06T20:38:19Z",
 "creator_id": "71985416",
 "group": {
     "id": "topics"
 },
 "id": "9323816986",
 "name": "Item 1",
 "parent_item": null,
 "state": "archived",
 "subitems": [],
 "subscribers": [
     {
         "id": "71985416"
     }
 ],
 "updated_at": "redacted",
 "updates": []
```
There are a lot of fields that are no longer returned for items. What's the reason for that?
Oops. That was because I simplified the query for final testing to get data quicker and forgot to add it back. Note that this was only for a final test; I tested with full queries too.
Note that the captured board/items are different since the prior snapshot was for a deleted board, and deleted boards should not be captured since their items/subitems cannot be queried.
…erformance and memory improvements

• Restructure monolithic graphql.py (1109 lines) into specialized modules:
  - graphql/activity_logs.py for activity log operations
  - graphql/boards.py for board management
  - graphql/items/ package with items.py and item_cache.py
  - graphql/query_executor.py for centralized query execution with IncrementalJsonProcessor
  - graphql/constants.py for shared constants
• Implement IncrementalJsonProcessor for memory-efficient streaming of large GraphQL responses
• Add item cache system for efficient items backfill operations and reduced API calls
• Enhance API client with improved error handling, complexity management, and debugging capabilities
• Improve code quality, API compliance, and maintainability through modular design
• Update models and resources to support modular architecture
• Add utility functions for common operations
• Update test snapshots to reflect architectural changes

This refactoring addresses data discrepancy (missing data) issues and maintainability concerns while significantly improving performance and memory efficiency for data synchronization operations.
Force-pushed 61dd163 to e85e629
Description:
This PR refactors the Monday.com source connector's GraphQL architecture to address data discrepancy issues and improve performance through modular design and intelligent caching.
Key Changes
🏗️ Architectural Refactoring
- Split graphql.py (1109 lines) into specialized modules:
  - graphql/activity_logs.py - Activity log operations
  - graphql/boards.py - Board management
  - graphql/items/ - Items processing with caching
  - graphql/query_executor.py - Centralized query execution with IncrementalJsonProcessor
  - graphql/constants.py - Shared constants

🚀 Performance & Memory Improvements
- IncrementalJsonProcessor for large GraphQL responses

🔧 Data Consistency Fixes
🛠️ Code Quality Improvements
Impact
Before: Data discrepancies due to performance bottlenecks preventing checkpoints on large accounts, inefficient API usage causing rate limits, and complex tightly-coupled code.
After: Reliable data synchronization with meaningful progress tracking, optimized API usage with proactive rate limit management, and maintainable modular architecture.
Testing
Workflow steps: No changes to user workflow - connector maintains same interface and functionality.
Documentation links affected: None - internal refactoring only.
Notes for reviewers:
- New source_monday/graphql/ directory