Skip to content

Commit ae9f8c3

Browse files
authored
checkpointed confluence (#4473)
* checkpointed confluence * confluence checkpointing tested * fixed integration tests * attempt to fix connector test flakiness * fix rebase
1 parent 742041d commit ae9f8c3

File tree

19 files changed

+580
-126
lines changed

19 files changed

+580
-126
lines changed

backend/onyx/background/indexing/checkpointing_utils.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
from onyx.configs.constants import FileOrigin
99
from onyx.connectors.interfaces import BaseConnector
10-
from onyx.connectors.interfaces import CheckpointConnector
10+
from onyx.connectors.interfaces import CheckpointedConnector
1111
from onyx.connectors.models import ConnectorCheckpoint
1212
from onyx.db.engine import get_db_current_time
1313
from onyx.db.index_attempt import get_index_attempt
@@ -61,7 +61,7 @@ def load_checkpoint(
6161
try:
6262
checkpoint_io = file_store.read_file(checkpoint_pointer, mode="rb")
6363
checkpoint_data = checkpoint_io.read().decode("utf-8")
64-
if isinstance(connector, CheckpointConnector):
64+
if isinstance(connector, CheckpointedConnector):
6565
return connector.validate_checkpoint_json(checkpoint_data)
6666
return ConnectorCheckpoint.model_validate_json(checkpoint_data)
6767
except RuntimeError:

backend/onyx/connectors/confluence/connector.py

+142-81
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1+
import copy
12
from datetime import datetime
23
from datetime import timedelta
34
from datetime import timezone
45
from typing import Any
56
from urllib.parse import quote
67

78
from requests.exceptions import HTTPError
9+
from typing_extensions import override
810

911
from onyx.configs.app_configs import CONFLUENCE_CONNECTOR_LABELS_TO_SKIP
1012
from onyx.configs.app_configs import CONFLUENCE_TIMEZONE_OFFSET
@@ -22,17 +24,19 @@
2224
from onyx.connectors.exceptions import CredentialExpiredError
2325
from onyx.connectors.exceptions import InsufficientPermissionsError
2426
from onyx.connectors.exceptions import UnexpectedValidationError
27+
from onyx.connectors.interfaces import CheckpointedConnector
28+
from onyx.connectors.interfaces import CheckpointOutput
29+
from onyx.connectors.interfaces import ConnectorCheckpoint
30+
from onyx.connectors.interfaces import ConnectorFailure
2531
from onyx.connectors.interfaces import CredentialsConnector
2632
from onyx.connectors.interfaces import CredentialsProviderInterface
27-
from onyx.connectors.interfaces import GenerateDocumentsOutput
2833
from onyx.connectors.interfaces import GenerateSlimDocumentOutput
29-
from onyx.connectors.interfaces import LoadConnector
30-
from onyx.connectors.interfaces import PollConnector
3134
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
3235
from onyx.connectors.interfaces import SlimConnector
3336
from onyx.connectors.models import BasicExpertInfo
3437
from onyx.connectors.models import ConnectorMissingCredentialError
3538
from onyx.connectors.models import Document
39+
from onyx.connectors.models import DocumentFailure
3640
from onyx.connectors.models import ImageSection
3741
from onyx.connectors.models import SlimDocument
3842
from onyx.connectors.models import TextSection
@@ -68,9 +72,16 @@
6872
ONE_HOUR = 3600
6973

7074

75+
def _should_propagate_error(e: Exception) -> bool:
76+
return "field 'updated' is invalid" in str(e)
77+
78+
79+
class ConfluenceCheckpoint(ConnectorCheckpoint):
80+
last_updated: SecondsSinceUnixEpoch
81+
82+
7183
class ConfluenceConnector(
72-
LoadConnector,
73-
PollConnector,
84+
CheckpointedConnector[ConfluenceCheckpoint],
7485
SlimConnector,
7586
CredentialsConnector,
7687
):
@@ -211,6 +222,8 @@ def _construct_page_query(
211222
"%Y-%m-%d %H:%M"
212223
)
213224
page_query += f" and lastmodified <= '{formatted_end_time}'"
225+
226+
page_query += " order by lastmodified asc"
214227
return page_query
215228

216229
def _construct_attachment_query(self, confluence_page_id: str) -> str:
@@ -236,11 +249,14 @@ def _get_comment_string_for_page_id(self, page_id: str) -> str:
236249
)
237250
return comment_string
238251

239-
def _convert_page_to_document(self, page: dict[str, Any]) -> Document | None:
252+
def _convert_page_to_document(
253+
self, page: dict[str, Any]
254+
) -> Document | ConnectorFailure:
240255
"""
241256
Converts a Confluence page to a Document object.
242257
Includes the page content, comments, and attachments.
243258
"""
259+
page_id = page_url = ""
244260
try:
245261
# Extract basic page information
246262
page_id = page["id"]
@@ -336,25 +352,103 @@ def _convert_page_to_document(self, page: dict[str, Any]) -> Document | None:
336352
)
337353
except Exception as e:
338354
logger.error(f"Error converting page {page.get('id', 'unknown')}: {e}")
339-
if not self.continue_on_failure:
355+
if _should_propagate_error(e):
340356
raise
341-
return None
357+
return ConnectorFailure(
358+
failed_document=DocumentFailure(
359+
document_id=page_id,
360+
document_link=page_url,
361+
),
362+
failure_message=f"Error converting page {page.get('id', 'unknown')}: {e}",
363+
exception=e,
364+
)
365+
366+
def _fetch_page_attachments(
367+
self, page: dict[str, Any], doc: Document
368+
) -> Document | ConnectorFailure:
369+
attachment_query = self._construct_attachment_query(page["id"])
370+
371+
for attachment in self.confluence_client.paginated_cql_retrieval(
372+
cql=attachment_query,
373+
expand=",".join(_ATTACHMENT_EXPANSION_FIELDS),
374+
):
375+
attachment["metadata"].get("mediaType", "")
376+
if not validate_attachment_filetype(
377+
attachment,
378+
):
379+
logger.info(f"Skipping attachment: {attachment['title']}")
380+
continue
381+
382+
logger.info(f"Processing attachment: {attachment['title']}")
383+
384+
# Attempt to get textual content or image summarization:
385+
object_url = build_confluence_document_id(
386+
self.wiki_base, attachment["_links"]["webui"], self.is_cloud
387+
)
388+
try:
389+
response = convert_attachment_to_content(
390+
confluence_client=self.confluence_client,
391+
attachment=attachment,
392+
page_id=page["id"],
393+
allow_images=self.allow_images,
394+
)
395+
if response is None:
396+
continue
397+
398+
content_text, file_storage_name = response
399+
400+
if content_text:
401+
doc.sections.append(
402+
TextSection(
403+
text=content_text,
404+
link=object_url,
405+
)
406+
)
407+
elif file_storage_name:
408+
doc.sections.append(
409+
ImageSection(
410+
link=object_url,
411+
image_file_name=file_storage_name,
412+
)
413+
)
414+
except Exception as e:
415+
logger.error(
416+
f"Failed to extract/summarize attachment {attachment['title']}",
417+
exc_info=e,
418+
)
419+
if not self.continue_on_failure:
420+
if _should_propagate_error(e):
421+
raise
422+
# TODO: should we remove continue_on_failure entirely now that we have checkpointing?
423+
return ConnectorFailure(
424+
failed_document=DocumentFailure(
425+
document_id=doc.id,
426+
document_link=object_url,
427+
),
428+
failure_message=f"Failed to extract/summarize attachment {attachment['title']} for doc {doc.id}",
429+
exception=e,
430+
)
431+
return doc
342432

343433
def _fetch_document_batches(
344434
self,
435+
checkpoint: ConfluenceCheckpoint,
345436
start: SecondsSinceUnixEpoch | None = None,
346437
end: SecondsSinceUnixEpoch | None = None,
347-
) -> GenerateDocumentsOutput:
438+
) -> CheckpointOutput[ConfluenceCheckpoint]:
348439
"""
349440
Yields batches of Documents. For each page:
350441
- Create a Document with 1 Section for the page text/comments
351442
- Then fetch attachments. For each attachment:
352443
- Attempt to convert it with convert_attachment_to_content(...)
353444
- If successful, create a new Section with the extracted text or summary.
354445
"""
355-
doc_batch: list[Document] = []
446+
doc_count = 0
356447

357-
page_query = self._construct_page_query(start, end)
448+
checkpoint = copy.deepcopy(checkpoint)
449+
450+
# use "start" when last_updated is 0
451+
page_query = self._construct_page_query(checkpoint.last_updated or start, end)
358452
logger.debug(f"page_query: {page_query}")
359453

360454
for page in self.confluence_client.paginated_cql_retrieval(
@@ -363,94 +457,61 @@ def _fetch_document_batches(
363457
limit=self.batch_size,
364458
):
365459
# Build doc from page
366-
doc = self._convert_page_to_document(page)
367-
if not doc:
368-
continue
369-
370-
# Now get attachments for that page:
371-
attachment_query = self._construct_attachment_query(page["id"])
372-
# We'll use the page's XML to provide context if we summarize an image
373-
page.get("body", {}).get("storage", {}).get("value", "")
374-
375-
for attachment in self.confluence_client.paginated_cql_retrieval(
376-
cql=attachment_query,
377-
expand=",".join(_ATTACHMENT_EXPANSION_FIELDS),
378-
):
379-
attachment["metadata"].get("mediaType", "")
380-
if not validate_attachment_filetype(
381-
attachment,
382-
):
383-
logger.info(f"Skipping attachment: {attachment['title']}")
384-
continue
460+
doc_or_failure = self._convert_page_to_document(page)
385461

386-
logger.info(f"Processing attachment: {attachment['title']}")
462+
if isinstance(doc_or_failure, ConnectorFailure):
463+
yield doc_or_failure
464+
continue
387465

388-
# Attempt to get textual content or image summarization:
389-
try:
390-
response = convert_attachment_to_content(
391-
confluence_client=self.confluence_client,
392-
attachment=attachment,
393-
page_id=page["id"],
394-
allow_images=self.allow_images,
395-
)
396-
if response is None:
397-
continue
466+
checkpoint.last_updated = datetime_from_string(
467+
page["version"]["when"]
468+
).timestamp()
398469

399-
content_text, file_storage_name = response
400-
object_url = build_confluence_document_id(
401-
self.wiki_base, attachment["_links"]["webui"], self.is_cloud
402-
)
403-
if content_text:
404-
doc.sections.append(
405-
TextSection(
406-
text=content_text,
407-
link=object_url,
408-
)
409-
)
410-
elif file_storage_name:
411-
doc.sections.append(
412-
ImageSection(
413-
link=object_url,
414-
image_file_name=file_storage_name,
415-
)
416-
)
417-
except Exception as e:
418-
logger.error(
419-
f"Failed to extract/summarize attachment {attachment['title']}",
420-
exc_info=e,
421-
)
422-
if not self.continue_on_failure:
423-
raise
470+
# Now get attachments for that page:
471+
doc_or_failure = self._fetch_page_attachments(page, doc_or_failure)
424472

425-
doc_batch.append(doc)
473+
if isinstance(doc_or_failure, ConnectorFailure):
474+
yield doc_or_failure
475+
continue
426476

427-
if len(doc_batch) >= self.batch_size:
428-
yield doc_batch
429-
doc_batch = []
477+
# yield completed document
478+
doc_count += 1
479+
yield doc_or_failure
430480

431-
if doc_batch:
432-
yield doc_batch
481+
# create checkpoint after enough documents have been processed
482+
if doc_count >= self.batch_size:
483+
return checkpoint
433484

434-
def load_from_state(self) -> GenerateDocumentsOutput:
435-
return self._fetch_document_batches()
485+
checkpoint.has_more = False
486+
return checkpoint
436487

437-
def poll_source(
488+
@override
489+
def load_from_checkpoint(
438490
self,
439-
start: SecondsSinceUnixEpoch | None = None,
440-
end: SecondsSinceUnixEpoch | None = None,
441-
) -> GenerateDocumentsOutput:
491+
start: SecondsSinceUnixEpoch,
492+
end: SecondsSinceUnixEpoch,
493+
checkpoint: ConfluenceCheckpoint,
494+
) -> CheckpointOutput[ConfluenceCheckpoint]:
442495
try:
443-
return self._fetch_document_batches(start, end)
496+
return self._fetch_document_batches(checkpoint, start, end)
444497
except Exception as e:
445-
if "field 'updated' is invalid" in str(e) and start is not None:
498+
if _should_propagate_error(e) and start is not None:
446499
logger.warning(
447500
"Confluence says we provided an invalid 'updated' field. This may indicate"
448501
"a real issue, but can also appear during edge cases like daylight"
449502
f"savings time changes. Retrying with a 1 hour offset. Error: {e}"
450503
)
451-
return self._fetch_document_batches(start - ONE_HOUR, end)
504+
return self._fetch_document_batches(checkpoint, start - ONE_HOUR, end)
452505
raise
453506

507+
@override
508+
def build_dummy_checkpoint(self) -> ConfluenceCheckpoint:
509+
return ConfluenceCheckpoint(last_updated=0, has_more=True)
510+
511+
@override
512+
def validate_checkpoint_json(self, checkpoint_json: str) -> ConfluenceCheckpoint:
513+
return ConfluenceCheckpoint.model_validate_json(checkpoint_json)
514+
454515
def retrieve_all_slim_documents(
455516
self,
456517
start: SecondsSinceUnixEpoch | None = None,

backend/onyx/connectors/connector_runner.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from typing import TypeVar
77

88
from onyx.connectors.interfaces import BaseConnector
9-
from onyx.connectors.interfaces import CheckpointConnector
9+
from onyx.connectors.interfaces import CheckpointedConnector
1010
from onyx.connectors.interfaces import CheckpointOutput
1111
from onyx.connectors.interfaces import LoadConnector
1212
from onyx.connectors.interfaces import PollConnector
@@ -95,9 +95,9 @@ def run(self, checkpoint: CT) -> Generator[
9595
]:
9696
"""Adds additional exception logging to the connector."""
9797
try:
98-
if isinstance(self.connector, CheckpointConnector):
98+
if isinstance(self.connector, CheckpointedConnector):
9999
if self.time_range is None:
100-
raise ValueError("time_range is required for CheckpointConnector")
100+
raise ValueError("time_range is required for CheckpointedConnector")
101101

102102
start = time.monotonic()
103103
checkpoint_connector_generator = self.connector.load_from_checkpoint(

backend/onyx/connectors/factory.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
from onyx.connectors.highspot.connector import HighspotConnector
3535
from onyx.connectors.hubspot.connector import HubSpotConnector
3636
from onyx.connectors.interfaces import BaseConnector
37-
from onyx.connectors.interfaces import CheckpointConnector
37+
from onyx.connectors.interfaces import CheckpointedConnector
3838
from onyx.connectors.interfaces import CredentialsConnector
3939
from onyx.connectors.interfaces import EventConnector
4040
from onyx.connectors.interfaces import LoadConnector
@@ -148,7 +148,7 @@ def identify_connector_class(
148148
# all connectors should be checkpoint connectors
149149
and (
150150
not issubclass(connector, PollConnector)
151-
and not issubclass(connector, CheckpointConnector)
151+
and not issubclass(connector, CheckpointedConnector)
152152
)
153153
),
154154
(

backend/onyx/connectors/github/connector.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
from onyx.connectors.exceptions import CredentialExpiredError
2626
from onyx.connectors.exceptions import InsufficientPermissionsError
2727
from onyx.connectors.exceptions import UnexpectedValidationError
28-
from onyx.connectors.interfaces import CheckpointConnector
28+
from onyx.connectors.interfaces import CheckpointedConnector
2929
from onyx.connectors.interfaces import CheckpointOutput
3030
from onyx.connectors.interfaces import ConnectorCheckpoint
3131
from onyx.connectors.interfaces import ConnectorFailure
@@ -143,7 +143,7 @@ class GithubConnectorCheckpoint(ConnectorCheckpoint):
143143
cached_repo: SerializedRepository | None = None
144144

145145

146-
class GithubConnector(CheckpointConnector[GithubConnectorCheckpoint]):
146+
class GithubConnector(CheckpointedConnector[GithubConnectorCheckpoint]):
147147
def __init__(
148148
self,
149149
repo_owner: str,

0 commit comments

Comments
 (0)