Skip to content

Commit e02fb06

Browse files
committed
source-monday: update items backfill to use dict-based page cursors
This addresses a fundamental limitation and issue with the prior implementation by using a dictionary for tracking boards that need their items backfilled. This leverages the CDK so that the initial page of all boards is first yielded and only patches for processed boards are yielded on subsequent invocations of `fetch_items_page`. This improves the reliability, efficiency and consistency in backfilling items, while working around the API limitations that Monday.com presents.
1 parent c9ed568 commit e02fb06

File tree

3 files changed

+102
-51
lines changed

3 files changed

+102
-51
lines changed

source-monday/source_monday/api.py

Lines changed: 56 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
FullRefreshResource,
1919
Board,
2020
Item,
21+
ItemsBackfillCursor,
2122
)
2223
from source_monday.utils import parse_monday_17_digit_timestamp
2324

@@ -305,74 +306,79 @@ async def fetch_items_page(
305306
cutoff: LogCursor,
306307
) -> AsyncGenerator[Item | PageCursor, None]:
307308
"""
308-
Fetches items for a page of boards using BoardItemIterator for consistent
309-
performance and complexity control.
310-
311-
Since boards can only be sorted by created_at (DESC) but return updated_at,
312-
we use the updated_at field which reflects when boards were created or modified.
313-
The page cursor is an ISO 8601 string of the oldest updated_at timestamp
314-
from previously fetched boards plus one millisecond, allowing us to resume
315-
from the last page without being affected by changes in board counts.
316-
"""
309+
Fetches items for a page of boards using the `BoardItemIterator` for consistent performance
310+
and complexity control. The `BoardItemIterator` abstracts away Monday's short-lived cursors
311+
and multiple GraphQL queries necessary to get all items from a board.
317312
318-
assert page is None or isinstance(page, str)
319-
assert isinstance(cutoff, datetime)
313+
This function uses ItemsBackfillCursor dataclass to provide type safety with methods for
314+
cursor operations. The cursor tracks board IDs that need processing:
320315
321-
page_cutoff = datetime.fromisoformat(page) if page else cutoff
316+
Example progression:
317+
1. Initial: ItemsBackfillCursor(boards={"board1": False, "board2": False})
318+
2. After processing board1: {"board1": null} (merge patch to remove)
319+
3. Remaining state: ItemsBackfillCursor(boards={"board2": False})
320+
"""
322321

323-
item_iterator = BoardItemIterator(http, log)
322+
assert page is None or isinstance(page, dict)
323+
assert isinstance(cutoff, datetime)
324324

325325
total_items_count = 0
326326
emitted_items_count = 0
327327

328-
# Pre-filter boards to avoid fetching items from deleted boards (which causes GraphQL query failures)
329-
# This also filters out boards that are after our page cursor cutoff, so we only fetch items
330-
# from boards that are relevant to the current page
331-
qualifying_board_ids = set()
332-
max_items_count = 0
333-
async for board in fetch_boards_minimal(http, log):
334-
if board.updated_at <= page_cutoff and board.state != "deleted":
335-
qualifying_board_ids.add(board.id)
336-
if board.items_count is not None:
337-
max_items_count = max(max_items_count, board.items_count)
338-
339-
if len(qualifying_board_ids) >= BOARDS_PER_ITEMS_PAGE:
340-
break
328+
if page is None:
329+
board_ids = []
330+
async for board in fetch_boards_minimal(http, log):
331+
# Cannot query items for deleted boards, so we skip them
332+
# otherwise, the API returns ambiguous internal server errors.
333+
if board.updated_at <= cutoff and board.state != "deleted":
334+
board_ids.append(board.id)
335+
336+
if not board_ids:
337+
log.debug("No boards found for items backfill")
338+
return
339+
340+
cursor = ItemsBackfillCursor.from_board_ids(board_ids)
341+
# Emit a cursor to checkpoint the complete dictionary of boards
342+
yield cursor.create_initial_cursor()
343+
else:
344+
# Create typed cursor from existing page data
345+
cursor = ItemsBackfillCursor.from_cursor_dict(page)
341346

347+
board_ids = cursor.get_next_boards(BOARDS_PER_ITEMS_PAGE)
342348

343-
log.debug(f"Found {len(qualifying_board_ids)} qualifying boards for items backfill")
349+
if not board_ids:
350+
log.debug("No boards to process for items backfill")
351+
return
344352

345-
items_generator, get_board_cursor = await item_iterator.get_items_from_boards(list(qualifying_board_ids))
353+
log.debug(
354+
f"{len(cursor.boards)} boards to process for items backfill. Backfilling items for {len(board_ids)} boards.",
355+
{
356+
"cutoff": cutoff.isoformat(),
357+
"board_ids_for_page": board_ids,
358+
},
359+
)
346360

347-
async for item in items_generator:
361+
async for item in get_items_from_boards(http, log, board_ids):
348362
total_items_count += 1
349363
if item.updated_at <= cutoff and item.state != "deleted":
350364
emitted_items_count += 1
351365
yield item
352366

353-
board_cursor = get_board_cursor()
354-
355-
if board_cursor:
356-
if board_cursor > page_cutoff or board_cursor > cutoff:
357-
raise ValueError(
358-
f"Board cursor {board_cursor} is after cutoff {cutoff} or page cutoff {page_cutoff}. "
359-
"This may indicate a race condition or unexpected updated_at values in boards."
360-
)
361-
362-
# Note: we are subtracting since the board (page cursor) is moving backwards in time
363-
# to the oldest updated_at until we have no more boards to fetch items from
364-
next_board_cursor = (board_cursor - timedelta(milliseconds=1)).isoformat()
367+
log.debug(
368+
f"Items backfill completed for {len(board_ids)} boards with {emitted_items_count} items out of {total_items_count} total items.",
369+
{
370+
"completed_boards": board_ids,
371+
"remaining_boards": len(cursor.boards) - len(board_ids),
372+
"emitted_items_count": emitted_items_count,
373+
"total_items_count": total_items_count,
374+
},
375+
)
365376

366-
log.debug(
367-
f"Items backfill completed for page {page}",
368-
{
369-
"qualifying_items": emitted_items_count,
370-
"total_items": total_items_count,
371-
"next_page_cutoff": next_board_cursor,
372-
},
373-
)
377+
completion_patch = cursor.create_completion_patch(board_ids)
378+
yield completion_patch
374379

375-
yield next_board_cursor
380+
if len(cursor.boards) <= len(board_ids):
381+
return
376382

377383

378384
async def snapshot_resource_paginated(

source-monday/source_monday/graphql/items.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ async def get_items_from_boards(
131131
board_ids: list[str],
132132
items_limit: int = ITEMS_PER_BOARD,
133133
) -> AsyncGenerator[Item, None]:
134-
if not board_ids or len(board_ids) == 0:
134+
if len(board_ids) == 0:
135135
log.error("get_items_from_boards requires a non-empty list of board IDs.")
136136
raise ValueError("get_items_from_boards requires a non-empty list of board IDs.")
137137

source-monday/source_monday/models.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,15 @@
1010
TypeVar,
1111
)
1212

13+
from dataclasses import dataclass
14+
1315
from estuary_cdk.capture.common import (
1416
BaseDocument,
1517
ConnectorState as GenericConnectorState,
1618
LogCursor,
1719
PageCursor,
1820
ResourceState,
21+
make_cursor_dict,
1922
)
2023
from estuary_cdk.flow import (
2124
AccessToken,
@@ -179,6 +182,48 @@ class Board(BaseModel, extra="allow"):
179182
)
180183

181184

185+
@dataclass
186+
class ItemsBackfillCursor:
187+
"""
188+
Cursor structure for items backfill operations.
189+
190+
Tracks which boards need to be processed during backfill:
191+
- Keys: Board IDs (strings) that need processing
192+
- Values: Always False
193+
194+
When boards are completed, they're removed via JSON merge patch:
195+
{"completed_board_id": null}
196+
"""
197+
198+
boards: dict[str, bool]
199+
200+
@classmethod
201+
def from_cursor_dict(
202+
cls, cursor_dict: dict[str, Literal[False] | None]
203+
) -> "ItemsBackfillCursor":
204+
boards = {k: v for k, v in cursor_dict.items() if v is not None}
205+
return cls(boards=boards)
206+
207+
@classmethod
208+
def from_board_ids(cls, board_ids: list[str]) -> "ItemsBackfillCursor":
209+
if not board_ids:
210+
return cls(boards={})
211+
212+
boards = {board_id: False for board_id in board_ids}
213+
return cls(boards=boards)
214+
215+
def get_next_boards(self, limit: int) -> list[str]:
216+
return list(self.boards.keys())[:limit]
217+
218+
def create_initial_cursor(self) -> dict[str, Literal[False]]:
219+
return make_cursor_dict(self.boards)
220+
221+
def create_completion_patch(
222+
self, completed_board_ids: list[str]
223+
) -> dict[str, None]:
224+
return make_cursor_dict({board_id: None for board_id in completed_board_ids})
225+
226+
182227
IncrementalResourceFetchChangesFn = Callable[
183228
[HTTPSession, Logger, LogCursor],
184229
AsyncGenerator[BaseDocument | LogCursor, None],

0 commit comments

Comments
 (0)