1
+ import copy
1
2
from datetime import datetime
2
3
from datetime import timedelta
3
4
from datetime import timezone
4
5
from typing import Any
5
6
from urllib .parse import quote
6
7
7
8
from requests .exceptions import HTTPError
9
+ from typing_extensions import override
8
10
9
11
from onyx .configs .app_configs import CONFLUENCE_CONNECTOR_LABELS_TO_SKIP
10
12
from onyx .configs .app_configs import CONFLUENCE_TIMEZONE_OFFSET
22
24
from onyx .connectors .exceptions import CredentialExpiredError
23
25
from onyx .connectors .exceptions import InsufficientPermissionsError
24
26
from onyx .connectors .exceptions import UnexpectedValidationError
27
+ from onyx .connectors .interfaces import CheckpointedConnector
28
+ from onyx .connectors .interfaces import CheckpointOutput
29
+ from onyx .connectors .interfaces import ConnectorCheckpoint
30
+ from onyx .connectors .interfaces import ConnectorFailure
25
31
from onyx .connectors .interfaces import CredentialsConnector
26
32
from onyx .connectors .interfaces import CredentialsProviderInterface
27
- from onyx .connectors .interfaces import GenerateDocumentsOutput
28
33
from onyx .connectors .interfaces import GenerateSlimDocumentOutput
29
- from onyx .connectors .interfaces import LoadConnector
30
- from onyx .connectors .interfaces import PollConnector
31
34
from onyx .connectors .interfaces import SecondsSinceUnixEpoch
32
35
from onyx .connectors .interfaces import SlimConnector
33
36
from onyx .connectors .models import BasicExpertInfo
34
37
from onyx .connectors .models import ConnectorMissingCredentialError
35
38
from onyx .connectors .models import Document
39
+ from onyx .connectors .models import DocumentFailure
36
40
from onyx .connectors .models import ImageSection
37
41
from onyx .connectors .models import SlimDocument
38
42
from onyx .connectors .models import TextSection
68
72
ONE_HOUR = 3600
69
73
70
74
75
+ def _should_propagate_error (e : Exception ) -> bool :
76
+ return "field 'updated' is invalid" in str (e )
77
+
78
+
79
class ConfluenceCheckpoint(ConnectorCheckpoint):
    """Checkpoint state used by ConfluenceConnector's checkpointed loading."""

    # Seconds since the Unix epoch. The connector's dummy checkpoint sets this
    # to 0, and the page query falls back to the caller's `start` when it is 0.
    last_updated: SecondsSinceUnixEpoch
81
+
82
+
71
83
class ConfluenceConnector (
72
- LoadConnector ,
73
- PollConnector ,
84
+ CheckpointedConnector [ConfluenceCheckpoint ],
74
85
SlimConnector ,
75
86
CredentialsConnector ,
76
87
):
@@ -190,6 +201,8 @@ def _construct_page_query(
190
201
"%Y-%m-%d %H:%M"
191
202
)
192
203
page_query += f" and lastmodified <= '{ formatted_end_time } '"
204
+
205
+ page_query += " order by lastmodified asc"
193
206
return page_query
194
207
195
208
def _construct_attachment_query (self , confluence_page_id : str ) -> str :
@@ -215,11 +228,14 @@ def _get_comment_string_for_page_id(self, page_id: str) -> str:
215
228
)
216
229
return comment_string
217
230
218
- def _convert_page_to_document (self , page : dict [str , Any ]) -> Document | None :
231
+ def _convert_page_to_document (
232
+ self , page : dict [str , Any ]
233
+ ) -> Document | ConnectorFailure :
219
234
"""
220
235
Converts a Confluence page to a Document object.
221
236
Includes the page content, comments, and attachments.
222
237
"""
238
+ page_id = page_url = ""
223
239
try :
224
240
# Extract basic page information
225
241
page_id = page ["id" ]
@@ -315,25 +331,103 @@ def _convert_page_to_document(self, page: dict[str, Any]) -> Document | None:
315
331
)
316
332
except Exception as e :
317
333
logger .error (f"Error converting page { page .get ('id' , 'unknown' )} : { e } " )
318
- if not self . continue_on_failure :
334
+ if _should_propagate_error ( e ) :
319
335
raise
320
- return None
336
+ return ConnectorFailure (
337
+ failed_document = DocumentFailure (
338
+ document_id = page_id ,
339
+ document_link = page_url ,
340
+ ),
341
+ failure_message = f"Error converting page { page .get ('id' , 'unknown' )} : { e } " ,
342
+ exception = e ,
343
+ )
344
+
345
def _fetch_page_attachments(
    self, page: dict[str, Any], doc: Document
) -> Document | ConnectorFailure:
    """
    Appends one section to `doc` for every usable attachment of `page`.

    Attachments are retrieved with the CQL built by
    `_construct_attachment_query`. Attachments rejected by
    `validate_attachment_filetype`, or for which
    `convert_attachment_to_content` returns None, are skipped. Extracted text
    becomes a TextSection; otherwise a stored file name becomes an
    ImageSection. `doc` is mutated in place.

    Returns:
        `doc` once every attachment has been handled, or a ConnectorFailure
        for the first attachment whose conversion fails while
        `continue_on_failure` is disabled.

    Raises:
        Exception: the original conversion error, when `continue_on_failure`
            is disabled and `_should_propagate_error` accepts the error.
    """
    page_id = page["id"]
    attachment_query = self._construct_attachment_query(page_id)

    for attachment in self.confluence_client.paginated_cql_retrieval(
        cql=attachment_query,
        expand=",".join(_ATTACHMENT_EXPANSION_FIELDS),
    ):
        is_supported = validate_attachment_filetype(attachment)
        title = attachment["title"]
        if not is_supported:
            logger.info(f"Skipping attachment: {title}")
            continue

        logger.info(f"Processing attachment: {title}")

        # Built before the try block so the link is also available when
        # reporting a failure for this attachment.
        object_url = build_confluence_document_id(
            self.wiki_base, attachment["_links"]["webui"], self.is_cloud
        )
        try:
            # Attempt to get textual content or image summarization:
            response = convert_attachment_to_content(
                confluence_client=self.confluence_client,
                attachment=attachment,
                page_id=page_id,
                allow_images=self.allow_images,
            )
            if response is None:
                continue

            content_text, file_storage_name = response

            if content_text:
                doc.sections.append(
                    TextSection(
                        text=content_text,
                        link=object_url,
                    )
                )
            elif file_storage_name:
                doc.sections.append(
                    ImageSection(
                        link=object_url,
                        image_file_name=file_storage_name,
                    )
                )
        except Exception as e:
            logger.error(
                f"Failed to extract/summarize attachment {title}",
                exc_info=e,
            )
            # With continue_on_failure enabled the error is only logged and
            # the loop moves on to the next attachment.
            if not self.continue_on_failure:
                if _should_propagate_error(e):
                    raise
                # TODO: should we remove continue_on_failure entirely now that we have checkpointing?
                return ConnectorFailure(
                    failed_document=DocumentFailure(
                        document_id=doc.id,
                        document_link=object_url,
                    ),
                    failure_message=f"Failed to extract/summarize attachment {title} for doc {doc.id}",
                    exception=e,
                )
    return doc
321
411
322
412
def _fetch_document_batches (
323
413
self ,
414
+ checkpoint : ConfluenceCheckpoint ,
324
415
start : SecondsSinceUnixEpoch | None = None ,
325
416
end : SecondsSinceUnixEpoch | None = None ,
326
- ) -> GenerateDocumentsOutput :
417
+ ) -> CheckpointOutput [ ConfluenceCheckpoint ] :
327
418
"""
328
419
Yields batches of Documents. For each page:
329
420
- Create a Document with 1 Section for the page text/comments
330
421
- Then fetch attachments. For each attachment:
331
422
- Attempt to convert it with convert_attachment_to_content(...)
332
423
- If successful, create a new Section with the extracted text or summary.
333
424
"""
334
- doc_batch : list [ Document ] = []
425
+ doc_count = 0
335
426
336
- page_query = self ._construct_page_query (start , end )
427
+ checkpoint = copy .deepcopy (checkpoint )
428
+
429
+ # use "start" when last_updated is 0
430
+ page_query = self ._construct_page_query (checkpoint .last_updated or start , end )
337
431
logger .debug (f"page_query: { page_query } " )
338
432
339
433
for page in self .confluence_client .paginated_cql_retrieval (
@@ -342,94 +436,57 @@ def _fetch_document_batches(
342
436
limit = self .batch_size ,
343
437
):
344
438
# Build doc from page
345
- doc = self ._convert_page_to_document (page )
346
- if not doc :
439
+ doc_or_failure = self ._convert_page_to_document (page )
440
+
441
+ if isinstance (doc_or_failure , ConnectorFailure ):
442
+ yield doc_or_failure
347
443
continue
348
444
349
445
# Now get attachments for that page:
350
- attachment_query = self ._construct_attachment_query (page ["id" ])
351
- # We'll use the page's XML to provide context if we summarize an image
352
- page .get ("body" , {}).get ("storage" , {}).get ("value" , "" )
446
+ doc_or_failure = self ._fetch_page_attachments (page , doc_or_failure )
353
447
354
- for attachment in self .confluence_client .paginated_cql_retrieval (
355
- cql = attachment_query ,
356
- expand = "," .join (_ATTACHMENT_EXPANSION_FIELDS ),
357
- ):
358
- attachment ["metadata" ].get ("mediaType" , "" )
359
- if not validate_attachment_filetype (
360
- attachment ,
361
- ):
362
- logger .info (f"Skipping attachment: { attachment ['title' ]} " )
363
- continue
364
-
365
- logger .info (f"Processing attachment: { attachment ['title' ]} " )
366
-
367
- # Attempt to get textual content or image summarization:
368
- try :
369
- response = convert_attachment_to_content (
370
- confluence_client = self .confluence_client ,
371
- attachment = attachment ,
372
- page_id = page ["id" ],
373
- allow_images = self .allow_images ,
374
- )
375
- if response is None :
376
- continue
377
-
378
- content_text , file_storage_name = response
379
- object_url = build_confluence_document_id (
380
- self .wiki_base , attachment ["_links" ]["webui" ], self .is_cloud
381
- )
382
- if content_text :
383
- doc .sections .append (
384
- TextSection (
385
- text = content_text ,
386
- link = object_url ,
387
- )
388
- )
389
- elif file_storage_name :
390
- doc .sections .append (
391
- ImageSection (
392
- link = object_url ,
393
- image_file_name = file_storage_name ,
394
- )
395
- )
396
- except Exception as e :
397
- logger .error (
398
- f"Failed to extract/summarize attachment { attachment ['title' ]} " ,
399
- exc_info = e ,
400
- )
401
- if not self .continue_on_failure :
402
- raise
403
-
404
- doc_batch .append (doc )
448
+ if isinstance (doc_or_failure , ConnectorFailure ):
449
+ yield doc_or_failure
450
+ continue
405
451
406
- if len ( doc_batch ) >= self . batch_size :
407
- yield doc_batch
408
- doc_batch = []
452
+ # yield completed document
453
+ doc_count += 1
454
+ yield doc_or_failure
409
455
410
- if doc_batch :
411
- yield doc_batch
456
+ # create checkpoint after enough documents have been processed
457
+ if doc_count >= self .batch_size :
458
+ return checkpoint
412
459
413
- def load_from_state ( self ) -> GenerateDocumentsOutput :
414
- return self . _fetch_document_batches ()
460
+ checkpoint . has_more = False
461
+ return checkpoint
415
462
416
- def poll_source (
463
@override
def load_from_checkpoint(
    self,
    start: SecondsSinceUnixEpoch,
    end: SecondsSinceUnixEpoch,
    checkpoint: ConfluenceCheckpoint,
) -> CheckpointOutput[ConfluenceCheckpoint]:
    """
    Yields the documents and failures produced by `_fetch_document_batches`
    and returns the checkpoint it returns.

    If fetching fails with an error accepted by `_should_propagate_error`
    (Confluence rejecting the 'updated' field), the fetch is retried once
    with `start` moved back by ONE_HOUR. Any other error is re-raised.

    `_fetch_document_batches` is a generator function, so merely calling it
    cannot raise: its errors only surface while it is being iterated. The
    iteration is therefore delegated with `yield from` inside the `try`,
    otherwise the retry below could never run.
    """
    try:
        return (yield from self._fetch_document_batches(checkpoint, start, end))
    except Exception as e:
        if _should_propagate_error(e) and start is not None:
            logger.warning(
                "Confluence says we provided an invalid 'updated' field. This may indicate "
                "a real issue, but can also appear during edge cases like daylight "
                f"savings time changes. Retrying with a 1 hour offset. Error: {e}"
            )
            # NOTE(review): the retry restarts the fetch, so anything
            # yielded before the error would be yielded again -- confirm
            # the consumer tolerates repeated documents.
            return (
                yield from self._fetch_document_batches(
                    checkpoint, start - ONE_HOUR, end
                )
            )
        raise
432
481
482
@override
def build_dummy_checkpoint(self) -> ConfluenceCheckpoint:
    """Return the starting checkpoint: `last_updated` of 0 with `has_more` set."""
    initial_checkpoint = ConfluenceCheckpoint(has_more=True, last_updated=0)
    return initial_checkpoint
485
+
486
@override
def validate_checkpoint_json(self, checkpoint_json: str) -> ConfluenceCheckpoint:
    """Build a ConfluenceCheckpoint from JSON text via `model_validate_json`."""
    parsed_checkpoint = ConfluenceCheckpoint.model_validate_json(checkpoint_json)
    return parsed_checkpoint
489
+
433
490
def retrieve_all_slim_documents (
434
491
self ,
435
492
start : SecondsSinceUnixEpoch | None = None ,
0 commit comments