|
18 | 18 | FullRefreshResource, |
19 | 19 | Board, |
20 | 20 | Item, |
| 21 | + ItemsBackfillCursor, |
21 | 22 | ) |
22 | 23 | from source_monday.utils import parse_monday_17_digit_timestamp |
23 | 24 |
|
@@ -305,74 +306,79 @@ async def fetch_items_page( |
305 | 306 | cutoff: LogCursor, |
306 | 307 | ) -> AsyncGenerator[Item | PageCursor, None]: |
307 | 308 | """ |
308 | | - Fetches items for a page of boards using BoardItemIterator for consistent |
309 | | - performance and complexity control. |
310 | | -
|
311 | | - Since boards can only be sorted by created_at (DESC) but return updated_at, |
312 | | - we use the updated_at field which reflects when boards were created or modified. |
313 | | - The page cursor is an ISO 8601 string of the oldest updated_at timestamp |
314 | | - from previously fetched boards plus one millisecond, allowing us to resume |
315 | | - from the last page without being affected by changes in board counts. |
316 | | - """ |
| 309 | + Fetches items for a page of boards using the `BoardItemIterator` for consistent performance |
| 310 | + and complexity control. The `BoardItemIterator` abstracts away Monday's short-lived cursors |
| 311 | + and multiple GraphQL queries necessary to get all items from a board. |
317 | 312 |
|
318 | | - assert page is None or isinstance(page, str) |
319 | | - assert isinstance(cutoff, datetime) |
| 313 | + This function uses the `ItemsBackfillCursor` dataclass to provide type safety with methods for |
| 314 | + cursor operations. The cursor tracks board IDs that need processing: |
320 | 315 |
|
321 | | - page_cutoff = datetime.fromisoformat(page) if page else cutoff |
| 316 | + Example progression: |
| 317 | + 1. Initial: ItemsBackfillCursor(boards={"board1": False, "board2": False}) |
| 318 | + 2. After processing board1: {"board1": null} (merge patch to remove) |
| 319 | + 3. Remaining state: ItemsBackfillCursor(boards={"board2": False}) |
| 320 | + """ |
322 | 321 |
|
323 | | - item_iterator = BoardItemIterator(http, log) |
| 322 | + assert page is None or isinstance(page, dict) |
| 323 | + assert isinstance(cutoff, datetime) |
324 | 324 |
|
325 | 325 | total_items_count = 0 |
326 | 326 | emitted_items_count = 0 |
327 | 327 |
|
328 | | - # Pre-filter boards to avoid fetching items from deleted boards (which causes GraphQL query failures) |
329 | | - # This also filters out boards that are after our page cursor cutoff, so we only fetch items |
330 | | - # from boards that are relevant to the current page |
331 | | - qualifying_board_ids = set() |
332 | | - max_items_count = 0 |
333 | | - async for board in fetch_boards_minimal(http, log): |
334 | | - if board.updated_at <= page_cutoff and board.state != "deleted": |
335 | | - qualifying_board_ids.add(board.id) |
336 | | - if board.items_count is not None: |
337 | | - max_items_count = max(max_items_count, board.items_count) |
338 | | - |
339 | | - if len(qualifying_board_ids) >= BOARDS_PER_ITEMS_PAGE: |
340 | | - break |
| 328 | + if page is None: |
| 329 | + board_ids = [] |
| 330 | + async for board in fetch_boards_minimal(http, log): |
| 331 | + # Cannot query items for deleted boards, so we skip them |
| 332 | + # otherwise, the API returns ambiguous internal server errors. |
| 333 | + if board.updated_at <= cutoff and board.state != "deleted": |
| 334 | + board_ids.append(board.id) |
| 335 | + |
| 336 | + if not board_ids: |
| 337 | + log.debug("No boards found for items backfill") |
| 338 | + return |
| 339 | + |
| 340 | + cursor = ItemsBackfillCursor.from_board_ids(board_ids) |
| 341 | + # Emit a cursor to checkpoint the complete dictionary of boards |
| 342 | + yield cursor.create_initial_cursor() |
| 343 | + else: |
| 344 | + # Create typed cursor from existing page data |
| 345 | + cursor = ItemsBackfillCursor.from_cursor_dict(page) |
341 | 346 |
|
| 347 | + board_ids = cursor.get_next_boards(BOARDS_PER_ITEMS_PAGE) |
342 | 348 |
|
343 | | - log.debug(f"Found {len(qualifying_board_ids)} qualifying boards for items backfill") |
| 349 | + if not board_ids: |
| 350 | + log.debug("No boards to process for items backfill") |
| 351 | + return |
344 | 352 |
|
345 | | - items_generator, get_board_cursor = await item_iterator.get_items_from_boards(list(qualifying_board_ids)) |
| 353 | + log.debug( |
| 354 | + f"{len(cursor.boards)} boards to process for items backfill. Backfilling items for {len(board_ids)} boards.", |
| 355 | + { |
| 356 | + "cutoff": cutoff.isoformat(), |
| 357 | + "board_ids_for_page": board_ids, |
| 358 | + }, |
| 359 | + ) |
346 | 360 |
|
347 | | - async for item in items_generator: |
| 361 | + async for item in get_items_from_boards(http, log, board_ids): |
348 | 362 | total_items_count += 1 |
349 | 363 | if item.updated_at <= cutoff and item.state != "deleted": |
350 | 364 | emitted_items_count += 1 |
351 | 365 | yield item |
352 | 366 |
|
353 | | - board_cursor = get_board_cursor() |
354 | | - |
355 | | - if board_cursor: |
356 | | - if board_cursor > page_cutoff or board_cursor > cutoff: |
357 | | - raise ValueError( |
358 | | - f"Board cursor {board_cursor} is after cutoff {cutoff} or page cutoff {page_cutoff}. " |
359 | | - "This may indicate a race condition or unexpected updated_at values in boards." |
360 | | - ) |
361 | | - |
362 | | - # Note: we are subtracting since the board (page cursor) is moving backwards in time |
363 | | - # to the oldest updated_at until we have no more boards to fetch items from |
364 | | - next_board_cursor = (board_cursor - timedelta(milliseconds=1)).isoformat() |
| 367 | + log.debug( |
| 368 | + f"Items backfill completed for {len(board_ids)} boards with {emitted_items_count} items out of {total_items_count} total items.", |
| 369 | + { |
| 370 | + "completed_boards": board_ids, |
| 371 | + "remaining_boards": len(cursor.boards) - len(board_ids), |
| 372 | + "emitted_items_count": emitted_items_count, |
| 373 | + "total_items_count": total_items_count, |
| 374 | + }, |
| 375 | + ) |
365 | 376 |
|
366 | | - log.debug( |
367 | | - f"Items backfill completed for page {page}", |
368 | | - { |
369 | | - "qualifying_items": emitted_items_count, |
370 | | - "total_items": total_items_count, |
371 | | - "next_page_cutoff": next_board_cursor, |
372 | | - }, |
373 | | - ) |
| 377 | + completion_patch = cursor.create_completion_patch(board_ids) |
| 378 | + yield completion_patch |
374 | 379 |
|
375 | | - yield next_board_cursor |
| 380 | + if len(cursor.boards) <= len(board_ids): |
| 381 | + return |
376 | 382 |
|
377 | 383 |
|
378 | 384 | async def snapshot_resource_paginated( |
|
0 commit comments