Skip to content

Commit 5cc6220

Browse files
authored
feat: checkpointing mid drive (#8606)
1 parent 15da1e0 commit 5cc6220

File tree

3 files changed

+642
-30
lines changed

3 files changed

+642
-30
lines changed

backend/onyx/connectors/sharepoint/connector.py

Lines changed: 174 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,12 @@ class SharepointConnectorCheckpoint(ConnectorCheckpoint):
244244
current_drive_name: str | None = None
245245
# Drive's web_url from the API - used as raw_node_id for DRIVE hierarchy nodes
246246
current_drive_web_url: str | None = None
247+
# Resolved drive ID — avoids re-resolving on checkpoint resume
248+
current_drive_id: str | None = None
249+
# Next delta API page URL for per-page checkpointing within a drive.
250+
# When set, Phase 3b fetches one page at a time so progress is persisted
251+
# between pages. None means BFS path or no active delta traversal.
252+
current_drive_delta_next_link: str | None = None
247253

248254
process_site_pages: bool = False
249255

@@ -1403,6 +1409,87 @@ def _iter_delta_pages(
14031409
if not page_url:
14041410
break
14051411

1412+
def _build_delta_start_url(
1413+
self,
1414+
drive_id: str,
1415+
start: datetime | None = None,
1416+
page_size: int = 200,
1417+
) -> str:
1418+
"""Build the initial delta API URL with query parameters embedded.
1419+
1420+
Embeds ``$top`` (and optionally a timestamp ``token``) directly in the
1421+
URL so that the returned string is fully self-contained and can be
1422+
stored in a checkpoint without needing a separate params dict.
1423+
"""
1424+
base_url = f"{self.graph_api_base}/drives/{drive_id}/root/delta"
1425+
params = [f"$top={page_size}"]
1426+
if start is not None and start > _EPOCH:
1427+
token = quote(start.isoformat(timespec="seconds"))
1428+
params.append(f"token={token}")
1429+
return f"{base_url}?{'&'.join(params)}"
1430+
1431+
def _fetch_one_delta_page(
1432+
self,
1433+
page_url: str,
1434+
drive_id: str,
1435+
start: datetime | None = None,
1436+
end: datetime | None = None,
1437+
page_size: int = 200,
1438+
) -> tuple[list[DriveItemData], str | None]:
1439+
"""Fetch a single page of delta API results.
1440+
1441+
Returns ``(items, next_page_url)``. *next_page_url* is ``None`` when
1442+
the delta enumeration is complete (deltaLink with no nextLink).
1443+
1444+
On 410 Gone (expired token) returns ``([], full_resync_url)`` so
1445+
the caller can store the resync URL in the checkpoint and retry on
1446+
the next cycle.
1447+
"""
1448+
try:
1449+
data = self._graph_api_get_json(page_url)
1450+
except requests.HTTPError as e:
1451+
if e.response is not None and e.response.status_code == 410:
1452+
logger.warning(
1453+
"Delta token expired (410 Gone) for drive '%s'. "
1454+
"Will restart with full delta enumeration.",
1455+
drive_id,
1456+
)
1457+
full_url = (
1458+
f"{self.graph_api_base}/drives/{drive_id}/root/delta"
1459+
f"?$top={page_size}"
1460+
)
1461+
return [], full_url
1462+
raise
1463+
1464+
items: list[DriveItemData] = []
1465+
for item in data.get("value", []):
1466+
if "folder" in item or "deleted" in item:
1467+
continue
1468+
if start is not None or end is not None:
1469+
raw_ts = item.get("lastModifiedDateTime")
1470+
if raw_ts:
1471+
mod_dt = datetime.fromisoformat(raw_ts.replace("Z", "+00:00"))
1472+
if start is not None and mod_dt < start:
1473+
continue
1474+
if end is not None and mod_dt > end:
1475+
continue
1476+
items.append(DriveItemData.from_graph_json(item))
1477+
1478+
next_url = data.get("@odata.nextLink")
1479+
if next_url:
1480+
return items, next_url
1481+
return items, None
1482+
1483+
@staticmethod
1484+
def _clear_drive_checkpoint_state(
1485+
checkpoint: "SharepointConnectorCheckpoint",
1486+
) -> None:
1487+
"""Reset all drive-level fields in the checkpoint."""
1488+
checkpoint.current_drive_name = None
1489+
checkpoint.current_drive_id = None
1490+
checkpoint.current_drive_web_url = None
1491+
checkpoint.current_drive_delta_next_link = None
1492+
14061493
def _fetch_slim_documents_from_sharepoint(self) -> GenerateSlimDocumentOutput:
14071494
site_descriptors = self.site_descriptors or self.fetch_sites()
14081495

@@ -1844,22 +1931,22 @@ def _load_from_checkpoint(
18441931
# Return checkpoint to allow persistence after drive initialization
18451932
return checkpoint
18461933

1847-
# Phase 3: Process documents from current drive
1934+
# Phase 3a: Initialize the next drive for processing
18481935
if (
18491936
checkpoint.current_site_descriptor
18501937
and checkpoint.cached_drive_names
18511938
and len(checkpoint.cached_drive_names) > 0
18521939
and checkpoint.current_drive_name is None
18531940
):
1854-
18551941
checkpoint.current_drive_name = checkpoint.cached_drive_names.popleft()
18561942

18571943
start_dt = datetime.fromtimestamp(start, tz=timezone.utc)
18581944
end_dt = datetime.fromtimestamp(end, tz=timezone.utc)
18591945
site_descriptor = checkpoint.current_site_descriptor
18601946

18611947
logger.info(
1862-
f"Processing drive '{checkpoint.current_drive_name}' in site: {site_descriptor.url}"
1948+
f"Processing drive '{checkpoint.current_drive_name}' "
1949+
f"in site: {site_descriptor.url}"
18631950
)
18641951
logger.debug(f"Time range: {start_dt} to {end_dt}")
18651952

@@ -1868,46 +1955,110 @@ def _load_from_checkpoint(
18681955
logger.warning("Current drive name is None, skipping")
18691956
return checkpoint
18701957

1871-
driveitems: Iterable[DriveItemData] = iter(())
1872-
drive_web_url: str | None = None
18731958
try:
18741959
logger.info(
18751960
f"Fetching drive items for drive name: {current_drive_name}"
18761961
)
18771962
result = self._resolve_drive(site_descriptor, current_drive_name)
1878-
if result is not None:
1879-
drive_id, drive_web_url = result
1880-
driveitems = self._get_drive_items_for_drive_id(
1881-
site_descriptor, drive_id, start_dt, end_dt
1882-
)
1883-
checkpoint.current_drive_web_url = drive_web_url
1963+
if result is None:
1964+
logger.warning(f"Drive '{current_drive_name}' not found, skipping")
1965+
self._clear_drive_checkpoint_state(checkpoint)
1966+
return checkpoint
1967+
1968+
drive_id, drive_web_url = result
1969+
checkpoint.current_drive_id = drive_id
1970+
checkpoint.current_drive_web_url = drive_web_url
18841971
except Exception as e:
18851972
logger.error(
1886-
f"Failed to retrieve items from drive '{current_drive_name}' in site: {site_descriptor.url}: {e}"
1973+
f"Failed to retrieve items from drive '{current_drive_name}' "
1974+
f"in site: {site_descriptor.url}: {e}"
18871975
)
18881976
yield _create_entity_failure(
18891977
f"{site_descriptor.url}|{current_drive_name}",
1890-
f"Failed to access drive '{current_drive_name}' in site '{site_descriptor.url}': {str(e)}",
1978+
f"Failed to access drive '{current_drive_name}' "
1979+
f"in site '{site_descriptor.url}': {str(e)}",
18911980
(start_dt, end_dt),
18921981
e,
18931982
)
1894-
checkpoint.current_drive_name = None
1895-
checkpoint.current_drive_web_url = None
1983+
self._clear_drive_checkpoint_state(checkpoint)
18961984
return checkpoint
18971985

1898-
# Normalize drive name (e.g., "Documents" -> "Shared Documents")
1899-
current_drive_name = SHARED_DOCUMENTS_MAP.get(
1986+
display_drive_name = SHARED_DOCUMENTS_MAP.get(
19001987
current_drive_name, current_drive_name
19011988
)
19021989

19031990
if drive_web_url:
19041991
yield from self._yield_drive_hierarchy_node(
19051992
site_descriptor.url,
19061993
drive_web_url,
1907-
current_drive_name,
1994+
display_drive_name,
19081995
checkpoint,
19091996
)
19101997

1998+
# For non-folder-scoped drives, use delta API with per-page
1999+
# checkpointing. Build the initial URL and fall through to 3b.
2000+
if not site_descriptor.folder_path:
2001+
checkpoint.current_drive_delta_next_link = self._build_delta_start_url(
2002+
drive_id, start_dt
2003+
)
2004+
# else: BFS path — delta_next_link stays None;
2005+
# Phase 3b will use _iter_drive_items_paged.
2006+
2007+
# Phase 3b: Process items from the current drive
2008+
if (
2009+
checkpoint.current_site_descriptor
2010+
and checkpoint.current_drive_name is not None
2011+
and checkpoint.current_drive_id is not None
2012+
):
2013+
site_descriptor = checkpoint.current_site_descriptor
2014+
start_dt = datetime.fromtimestamp(start, tz=timezone.utc)
2015+
end_dt = datetime.fromtimestamp(end, tz=timezone.utc)
2016+
current_drive_name = SHARED_DOCUMENTS_MAP.get(
2017+
checkpoint.current_drive_name, checkpoint.current_drive_name
2018+
)
2019+
drive_web_url = checkpoint.current_drive_web_url
2020+
2021+
# --- determine item source ---
2022+
driveitems: Iterable[DriveItemData]
2023+
has_more_delta_pages = False
2024+
2025+
if checkpoint.current_drive_delta_next_link:
2026+
# Delta path: fetch one page at a time for checkpointing
2027+
try:
2028+
page_items, next_url = self._fetch_one_delta_page(
2029+
page_url=checkpoint.current_drive_delta_next_link,
2030+
drive_id=checkpoint.current_drive_id,
2031+
start=start_dt,
2032+
end=end_dt,
2033+
)
2034+
except Exception as e:
2035+
logger.error(
2036+
f"Failed to fetch delta page for drive "
2037+
f"'{current_drive_name}': {e}"
2038+
)
2039+
yield _create_entity_failure(
2040+
f"{site_descriptor.url}|{current_drive_name}",
2041+
f"Failed to fetch delta page for drive "
2042+
f"'{current_drive_name}': {str(e)}",
2043+
(start_dt, end_dt),
2044+
e,
2045+
)
2046+
self._clear_drive_checkpoint_state(checkpoint)
2047+
return checkpoint
2048+
2049+
driveitems = page_items
2050+
has_more_delta_pages = next_url is not None
2051+
if next_url:
2052+
checkpoint.current_drive_delta_next_link = next_url
2053+
else:
2054+
# BFS path (folder-scoped): process all items at once
2055+
driveitems = self._iter_drive_items_paged(
2056+
drive_id=checkpoint.current_drive_id,
2057+
folder_path=site_descriptor.folder_path,
2058+
start=start_dt,
2059+
end=end_dt,
2060+
)
2061+
19112062
item_count = 0
19122063
for driveitem in driveitems:
19132064
item_count += 1
@@ -1949,8 +2100,6 @@ def _load_from_checkpoint(
19492100
if include_permissions:
19502101
ctx = self._create_rest_client_context(site_descriptor.url)
19512102

1952-
# Re-acquire token in case it expired during a long traversal
1953-
# MSAL has a cache that returns the same token while still valid.
19542103
access_token = self._get_graph_access_token()
19552104
doc_or_failure = _convert_driveitem_to_document_with_permissions(
19562105
driveitem,
@@ -1986,8 +2135,11 @@ def _load_from_checkpoint(
19862135
)
19872136

19882137
logger.info(f"Processed {item_count} items in drive '{current_drive_name}'")
1989-
checkpoint.current_drive_name = None
1990-
checkpoint.current_drive_web_url = None
2138+
2139+
if has_more_delta_pages:
2140+
return checkpoint
2141+
2142+
self._clear_drive_checkpoint_state(checkpoint)
19912143

19922144
# Phase 4: Progression logic - determine next step
19932145
# If we have more drives in current site, continue with current site

0 commit comments

Comments
 (0)