Skip to content

Commit 9156b74

Browse files
committed
checkpointed confluence
1 parent 88d4a65 commit 9156b74

File tree

15 files changed

+181
-123
lines changed

15 files changed

+181
-123
lines changed

backend/onyx/background/indexing/checkpointing_utils.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
from onyx.configs.constants import FileOrigin
99
from onyx.connectors.interfaces import BaseConnector
10-
from onyx.connectors.interfaces import CheckpointConnector
10+
from onyx.connectors.interfaces import CheckpointedConnector
1111
from onyx.connectors.models import ConnectorCheckpoint
1212
from onyx.db.engine import get_db_current_time
1313
from onyx.db.index_attempt import get_index_attempt
@@ -61,7 +61,7 @@ def load_checkpoint(
6161
try:
6262
checkpoint_io = file_store.read_file(checkpoint_pointer, mode="rb")
6363
checkpoint_data = checkpoint_io.read().decode("utf-8")
64-
if isinstance(connector, CheckpointConnector):
64+
if isinstance(connector, CheckpointedConnector):
6565
return connector.validate_checkpoint_json(checkpoint_data)
6666
return ConnectorCheckpoint.model_validate_json(checkpoint_data)
6767
except RuntimeError:

backend/onyx/connectors/confluence/connector.py

+138-81
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1+
import copy
12
from datetime import datetime
23
from datetime import timedelta
34
from datetime import timezone
45
from typing import Any
56
from urllib.parse import quote
67

78
from requests.exceptions import HTTPError
9+
from typing_extensions import override
810

911
from onyx.configs.app_configs import CONFLUENCE_CONNECTOR_LABELS_TO_SKIP
1012
from onyx.configs.app_configs import CONFLUENCE_TIMEZONE_OFFSET
@@ -22,17 +24,19 @@
2224
from onyx.connectors.exceptions import CredentialExpiredError
2325
from onyx.connectors.exceptions import InsufficientPermissionsError
2426
from onyx.connectors.exceptions import UnexpectedValidationError
27+
from onyx.connectors.interfaces import CheckpointedConnector
28+
from onyx.connectors.interfaces import CheckpointOutput
29+
from onyx.connectors.interfaces import ConnectorCheckpoint
30+
from onyx.connectors.interfaces import ConnectorFailure
2531
from onyx.connectors.interfaces import CredentialsConnector
2632
from onyx.connectors.interfaces import CredentialsProviderInterface
27-
from onyx.connectors.interfaces import GenerateDocumentsOutput
2833
from onyx.connectors.interfaces import GenerateSlimDocumentOutput
29-
from onyx.connectors.interfaces import LoadConnector
30-
from onyx.connectors.interfaces import PollConnector
3134
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
3235
from onyx.connectors.interfaces import SlimConnector
3336
from onyx.connectors.models import BasicExpertInfo
3437
from onyx.connectors.models import ConnectorMissingCredentialError
3538
from onyx.connectors.models import Document
39+
from onyx.connectors.models import DocumentFailure
3640
from onyx.connectors.models import ImageSection
3741
from onyx.connectors.models import SlimDocument
3842
from onyx.connectors.models import TextSection
@@ -68,9 +72,16 @@
6872
ONE_HOUR = 3600
6973

7074

75+
def _should_propagate_error(e: Exception) -> bool:
76+
return "field 'updated' is invalid" in str(e)
77+
78+
79+
class ConfluenceCheckpoint(ConnectorCheckpoint):
80+
last_updated: SecondsSinceUnixEpoch
81+
82+
7183
class ConfluenceConnector(
72-
LoadConnector,
73-
PollConnector,
84+
CheckpointedConnector[ConfluenceCheckpoint],
7485
SlimConnector,
7586
CredentialsConnector,
7687
):
@@ -190,6 +201,8 @@ def _construct_page_query(
190201
"%Y-%m-%d %H:%M"
191202
)
192203
page_query += f" and lastmodified <= '{formatted_end_time}'"
204+
205+
page_query += " order by lastmodified asc"
193206
return page_query
194207

195208
def _construct_attachment_query(self, confluence_page_id: str) -> str:
@@ -215,11 +228,14 @@ def _get_comment_string_for_page_id(self, page_id: str) -> str:
215228
)
216229
return comment_string
217230

218-
def _convert_page_to_document(self, page: dict[str, Any]) -> Document | None:
231+
def _convert_page_to_document(
232+
self, page: dict[str, Any]
233+
) -> Document | ConnectorFailure:
219234
"""
220235
Converts a Confluence page to a Document object.
221236
Includes the page content, comments, and attachments.
222237
"""
238+
page_id = page_url = ""
223239
try:
224240
# Extract basic page information
225241
page_id = page["id"]
@@ -315,25 +331,103 @@ def _convert_page_to_document(self, page: dict[str, Any]) -> Document | None:
315331
)
316332
except Exception as e:
317333
logger.error(f"Error converting page {page.get('id', 'unknown')}: {e}")
318-
if not self.continue_on_failure:
334+
if _should_propagate_error(e):
319335
raise
320-
return None
336+
return ConnectorFailure(
337+
failed_document=DocumentFailure(
338+
document_id=page_id,
339+
document_link=page_url,
340+
),
341+
failure_message=f"Error converting page {page.get('id', 'unknown')}: {e}",
342+
exception=e,
343+
)
344+
345+
def _fetch_page_attachments(
346+
self, page: dict[str, Any], doc: Document
347+
) -> Document | ConnectorFailure:
348+
attachment_query = self._construct_attachment_query(page["id"])
349+
350+
for attachment in self.confluence_client.paginated_cql_retrieval(
351+
cql=attachment_query,
352+
expand=",".join(_ATTACHMENT_EXPANSION_FIELDS),
353+
):
354+
attachment["metadata"].get("mediaType", "")
355+
if not validate_attachment_filetype(
356+
attachment,
357+
):
358+
logger.info(f"Skipping attachment: {attachment['title']}")
359+
continue
360+
361+
logger.info(f"Processing attachment: {attachment['title']}")
362+
363+
# Attempt to get textual content or image summarization:
364+
object_url = build_confluence_document_id(
365+
self.wiki_base, attachment["_links"]["webui"], self.is_cloud
366+
)
367+
try:
368+
response = convert_attachment_to_content(
369+
confluence_client=self.confluence_client,
370+
attachment=attachment,
371+
page_id=page["id"],
372+
allow_images=self.allow_images,
373+
)
374+
if response is None:
375+
continue
376+
377+
content_text, file_storage_name = response
378+
379+
if content_text:
380+
doc.sections.append(
381+
TextSection(
382+
text=content_text,
383+
link=object_url,
384+
)
385+
)
386+
elif file_storage_name:
387+
doc.sections.append(
388+
ImageSection(
389+
link=object_url,
390+
image_file_name=file_storage_name,
391+
)
392+
)
393+
except Exception as e:
394+
logger.error(
395+
f"Failed to extract/summarize attachment {attachment['title']}",
396+
exc_info=e,
397+
)
398+
if not self.continue_on_failure:
399+
if _should_propagate_error(e):
400+
raise
401+
# TODO: should we remove continue_on_failure entirely now that we have checkpointing?
402+
return ConnectorFailure(
403+
failed_document=DocumentFailure(
404+
document_id=doc.id,
405+
document_link=object_url,
406+
),
407+
failure_message=f"Failed to extract/summarize attachment {attachment['title']} for doc {doc.id}",
408+
exception=e,
409+
)
410+
return doc
321411

322412
def _fetch_document_batches(
323413
self,
414+
checkpoint: ConfluenceCheckpoint,
324415
start: SecondsSinceUnixEpoch | None = None,
325416
end: SecondsSinceUnixEpoch | None = None,
326-
) -> GenerateDocumentsOutput:
417+
) -> CheckpointOutput[ConfluenceCheckpoint]:
327418
"""
328419
Yields batches of Documents. For each page:
329420
- Create a Document with 1 Section for the page text/comments
330421
- Then fetch attachments. For each attachment:
331422
- Attempt to convert it with convert_attachment_to_content(...)
332423
- If successful, create a new Section with the extracted text or summary.
333424
"""
334-
doc_batch: list[Document] = []
425+
doc_count = 0
335426

336-
page_query = self._construct_page_query(start, end)
427+
checkpoint = copy.deepcopy(checkpoint)
428+
429+
# use "start" when last_updated is 0
430+
page_query = self._construct_page_query(checkpoint.last_updated or start, end)
337431
logger.debug(f"page_query: {page_query}")
338432

339433
for page in self.confluence_client.paginated_cql_retrieval(
@@ -342,94 +436,57 @@ def _fetch_document_batches(
342436
limit=self.batch_size,
343437
):
344438
# Build doc from page
345-
doc = self._convert_page_to_document(page)
346-
if not doc:
439+
doc_or_failure = self._convert_page_to_document(page)
440+
441+
if isinstance(doc_or_failure, ConnectorFailure):
442+
yield doc_or_failure
347443
continue
348444

349445
# Now get attachments for that page:
350-
attachment_query = self._construct_attachment_query(page["id"])
351-
# We'll use the page's XML to provide context if we summarize an image
352-
page.get("body", {}).get("storage", {}).get("value", "")
446+
doc_or_failure = self._fetch_page_attachments(page, doc_or_failure)
353447

354-
for attachment in self.confluence_client.paginated_cql_retrieval(
355-
cql=attachment_query,
356-
expand=",".join(_ATTACHMENT_EXPANSION_FIELDS),
357-
):
358-
attachment["metadata"].get("mediaType", "")
359-
if not validate_attachment_filetype(
360-
attachment,
361-
):
362-
logger.info(f"Skipping attachment: {attachment['title']}")
363-
continue
364-
365-
logger.info(f"Processing attachment: {attachment['title']}")
366-
367-
# Attempt to get textual content or image summarization:
368-
try:
369-
response = convert_attachment_to_content(
370-
confluence_client=self.confluence_client,
371-
attachment=attachment,
372-
page_id=page["id"],
373-
allow_images=self.allow_images,
374-
)
375-
if response is None:
376-
continue
377-
378-
content_text, file_storage_name = response
379-
object_url = build_confluence_document_id(
380-
self.wiki_base, attachment["_links"]["webui"], self.is_cloud
381-
)
382-
if content_text:
383-
doc.sections.append(
384-
TextSection(
385-
text=content_text,
386-
link=object_url,
387-
)
388-
)
389-
elif file_storage_name:
390-
doc.sections.append(
391-
ImageSection(
392-
link=object_url,
393-
image_file_name=file_storage_name,
394-
)
395-
)
396-
except Exception as e:
397-
logger.error(
398-
f"Failed to extract/summarize attachment {attachment['title']}",
399-
exc_info=e,
400-
)
401-
if not self.continue_on_failure:
402-
raise
403-
404-
doc_batch.append(doc)
448+
if isinstance(doc_or_failure, ConnectorFailure):
449+
yield doc_or_failure
450+
continue
405451

406-
if len(doc_batch) >= self.batch_size:
407-
yield doc_batch
408-
doc_batch = []
452+
# yield completed document
453+
doc_count += 1
454+
yield doc_or_failure
409455

410-
if doc_batch:
411-
yield doc_batch
456+
# create checkpoint after enough documents have been processed
457+
if doc_count >= self.batch_size:
458+
return checkpoint
412459

413-
def load_from_state(self) -> GenerateDocumentsOutput:
414-
return self._fetch_document_batches()
460+
checkpoint.has_more = False
461+
return checkpoint
415462

416-
def poll_source(
463+
@override
464+
def load_from_checkpoint(
417465
self,
418-
start: SecondsSinceUnixEpoch | None = None,
419-
end: SecondsSinceUnixEpoch | None = None,
420-
) -> GenerateDocumentsOutput:
466+
start: SecondsSinceUnixEpoch,
467+
end: SecondsSinceUnixEpoch,
468+
checkpoint: ConfluenceCheckpoint,
469+
) -> CheckpointOutput[ConfluenceCheckpoint]:
421470
try:
422-
return self._fetch_document_batches(start, end)
471+
return self._fetch_document_batches(checkpoint, start, end)
423472
except Exception as e:
424-
if "field 'updated' is invalid" in str(e) and start is not None:
473+
if _should_propagate_error(e) and start is not None:
425474
logger.warning(
426475
"Confluence says we provided an invalid 'updated' field. This may indicate"
427476
"a real issue, but can also appear during edge cases like daylight"
428477
f"savings time changes. Retrying with a 1 hour offset. Error: {e}"
429478
)
430-
return self._fetch_document_batches(start - ONE_HOUR, end)
479+
return self._fetch_document_batches(checkpoint, start - ONE_HOUR, end)
431480
raise
432481

482+
@override
483+
def build_dummy_checkpoint(self) -> ConfluenceCheckpoint:
484+
return ConfluenceCheckpoint(last_updated=0, has_more=True)
485+
486+
@override
487+
def validate_checkpoint_json(self, checkpoint_json: str) -> ConfluenceCheckpoint:
488+
return ConfluenceCheckpoint.model_validate_json(checkpoint_json)
489+
433490
def retrieve_all_slim_documents(
434491
self,
435492
start: SecondsSinceUnixEpoch | None = None,

backend/onyx/connectors/connector_runner.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from typing import TypeVar
77

88
from onyx.connectors.interfaces import BaseConnector
9-
from onyx.connectors.interfaces import CheckpointConnector
9+
from onyx.connectors.interfaces import CheckpointedConnector
1010
from onyx.connectors.interfaces import CheckpointOutput
1111
from onyx.connectors.interfaces import LoadConnector
1212
from onyx.connectors.interfaces import PollConnector
@@ -97,9 +97,9 @@ def run(
9797
]:
9898
"""Adds additional exception logging to the connector."""
9999
try:
100-
if isinstance(self.connector, CheckpointConnector):
100+
if isinstance(self.connector, CheckpointedConnector):
101101
if self.time_range is None:
102-
raise ValueError("time_range is required for CheckpointConnector")
102+
raise ValueError("time_range is required for CheckpointedConnector")
103103

104104
start = time.monotonic()
105105
checkpoint_connector_generator = self.connector.load_from_checkpoint(

backend/onyx/connectors/factory.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
from onyx.connectors.highspot.connector import HighspotConnector
3535
from onyx.connectors.hubspot.connector import HubSpotConnector
3636
from onyx.connectors.interfaces import BaseConnector
37-
from onyx.connectors.interfaces import CheckpointConnector
37+
from onyx.connectors.interfaces import CheckpointedConnector
3838
from onyx.connectors.interfaces import CredentialsConnector
3939
from onyx.connectors.interfaces import EventConnector
4040
from onyx.connectors.interfaces import LoadConnector
@@ -148,7 +148,7 @@ def identify_connector_class(
148148
# all connectors should be checkpoint connectors
149149
and (
150150
not issubclass(connector, PollConnector)
151-
and not issubclass(connector, CheckpointConnector)
151+
and not issubclass(connector, CheckpointedConnector)
152152
)
153153
),
154154
(

backend/onyx/connectors/github/connector.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
from onyx.connectors.exceptions import CredentialExpiredError
2626
from onyx.connectors.exceptions import InsufficientPermissionsError
2727
from onyx.connectors.exceptions import UnexpectedValidationError
28-
from onyx.connectors.interfaces import CheckpointConnector
28+
from onyx.connectors.interfaces import CheckpointedConnector
2929
from onyx.connectors.interfaces import CheckpointOutput
3030
from onyx.connectors.interfaces import ConnectorCheckpoint
3131
from onyx.connectors.interfaces import ConnectorFailure
@@ -141,7 +141,7 @@ class GithubConnectorCheckpoint(ConnectorCheckpoint):
141141
cached_repo: SerializedRepository | None = None
142142

143143

144-
class GithubConnector(CheckpointConnector[GithubConnectorCheckpoint]):
144+
class GithubConnector(CheckpointedConnector[GithubConnectorCheckpoint]):
145145
def __init__(
146146
self,
147147
repo_owner: str,

0 commit comments

Comments
 (0)