Skip to content

Commit 11854b1

Browse files
committed
confluence checkpointing tested
1 parent 16b8f11 commit 11854b1

File tree

2 files changed

+387
-0
lines changed

2 files changed

+387
-0
lines changed

backend/onyx/connectors/confluence/connector.py

+4
Original file line number | Diff line number | Diff line change
@@ -442,6 +442,10 @@ def _fetch_document_batches(
442442
yield doc_or_failure
443443
continue
444444

445+
checkpoint.last_updated = datetime_from_string(
446+
page["version"]["when"]
447+
).timestamp()
448+
445449
# Now get attachments for that page:
446450
doc_or_failure = self._fetch_page_attachments(page, doc_or_failure)
447451

Original file line number | Diff line number | Diff line change
@@ -0,0 +1,383 @@
1+
import time
2+
from collections.abc import Callable
3+
from collections.abc import Generator
4+
from datetime import datetime
5+
from datetime import timezone
6+
from typing import Any
7+
from typing import cast
8+
from unittest.mock import MagicMock
9+
from unittest.mock import patch
10+
11+
import pytest
12+
from requests.exceptions import HTTPError
13+
14+
from onyx.configs.constants import DocumentSource
15+
from onyx.connectors.confluence.connector import ConfluenceCheckpoint
16+
from onyx.connectors.confluence.connector import ConfluenceConnector
17+
from onyx.connectors.confluence.onyx_confluence import OnyxConfluence
18+
from onyx.connectors.exceptions import CredentialExpiredError
19+
from onyx.connectors.exceptions import InsufficientPermissionsError
20+
from onyx.connectors.exceptions import UnexpectedValidationError
21+
from onyx.connectors.models import ConnectorFailure
22+
from onyx.connectors.models import Document
23+
from onyx.connectors.models import DocumentFailure
24+
from onyx.connectors.models import SlimDocument
25+
from tests.unit.onyx.connectors.utils import load_everything_from_checkpoint_connector
26+
27+
# NOTE(review): presumably the batch size the confluence_connector fixture is
# meant to run with (batch_size / _SLIM_DOC_BATCH_SIZE) — confirm.
PAGE_SIZE = 2
28+
29+
30+
@pytest.fixture
def confluence_base_url() -> str:
    """Provide the wiki base URL handed to the connector in these tests."""
    base_url = "https://example.atlassian.net/wiki"
    return base_url
33+
34+
35+
@pytest.fixture
def space_key() -> str:
    """Provide the Confluence space key handed to the connector in these tests."""
    key = "TEST"
    return key
38+
39+
40+
@pytest.fixture
def mock_confluence_client() -> MagicMock:
    """Build a MagicMock specced against OnyxConfluence with empty defaults.

    ``paginated_cql_retrieval`` returns an empty list and ``get_all_spaces``
    returns an empty ``results`` payload until a test overrides them.
    """
    client = MagicMock(spec=OnyxConfluence)
    client.paginated_cql_retrieval.return_value = []
    # Replace get_all_spaces with a standalone mock carrying the default payload.
    client.get_all_spaces = MagicMock(return_value={"results": []})
    return client
49+
50+
51+
@pytest.fixture
def confluence_connector(
    confluence_base_url: str, space_key: str, mock_confluence_client: MagicMock
) -> Generator[ConfluenceConnector, None, None]:
    """Yield a ConfluenceConnector backed by the mocked Confluence client.

    Args:
        confluence_base_url: Wiki base URL passed as ``wiki_base``.
        space_key: Space key passed as ``space``.
        mock_confluence_client: Mock assigned to the connector's private
            ``_confluence_client`` attribute.

    Yields:
        The configured connector. While the fixture is active,
        ``_SLIM_DOC_BATCH_SIZE`` in the connector module is patched to
        ``PAGE_SIZE``.
    """
    connector = ConfluenceConnector(
        wiki_base=confluence_base_url,
        space=space_key,
        is_cloud=True,
        labels_to_skip=["secret", "sensitive"],
        timezone_offset=0.0,
        # Use the module constant rather than repeating the literal.
        batch_size=PAGE_SIZE,
    )
    # Assign the mock client directly on the private attribute.
    connector._confluence_client = mock_confluence_client
    with patch(
        "onyx.connectors.confluence.connector._SLIM_DOC_BATCH_SIZE", PAGE_SIZE
    ):
        yield connector
68+
69+
70+
@pytest.fixture
def create_mock_page() -> Callable[..., dict[str, Any]]:
    """Provide a factory that builds mock Confluence page dictionaries."""

    def _create_mock_page(
        id: str = "123",
        title: str = "Test Page",
        updated: str = "2023-01-01T12:00:00.000+0000",
        content: str = "Test Content",
        labels: list[str] | None = None,
    ) -> dict[str, Any]:
        """Build one mock page; a missing or empty ``labels`` yields no labels."""
        label_results = []
        for label_name in labels or []:
            label_results.append({"name": label_name})

        page: dict[str, Any] = {
            "id": id,
            "title": title,
            "version": {"when": updated},
            "body": {"storage": {"value": content}},
            "metadata": {"labels": {"results": label_results}},
            "space": {"key": "TEST"},
            "_links": {"webui": f"/spaces/TEST/pages/{id}"},
        }
        return page

    return _create_mock_page
93+
94+
95+
def test_get_cql_query_with_space(confluence_connector: ConfluenceConnector) -> None:
    """The page query for a space-scoped connector has space and time filters."""
    window_start = datetime(2023, 1, 1, tzinfo=timezone.utc).timestamp()
    window_end = datetime(2023, 1, 2, tzinfo=timezone.utc).timestamp()

    cql = confluence_connector._construct_page_query(window_start, window_end)

    # The space clause and both time bounds must all be present.
    expected_fragments = (
        f"space='{confluence_connector.space}'",
        "lastmodified >= '2023-01-01 00:00'",
        "lastmodified <= '2023-01-02 00:00'",
    )
    for fragment in expected_fragments:
        assert fragment in cql
    assert " and " in cql.lower()
107+
108+
109+
def test_get_cql_query_without_space(confluence_base_url: str) -> None:
    """The page query for a connector without a space has only time filters."""
    # No space argument is passed to this connector.
    spaceless = ConfluenceConnector(wiki_base=confluence_base_url, is_cloud=True)

    connector_tz = spaceless.timezone
    window_start = datetime(2023, 1, 1, tzinfo=connector_tz).timestamp()
    window_end = datetime(2023, 1, 2, tzinfo=connector_tz).timestamp()

    cql = spaceless._construct_page_query(window_start, window_end)

    assert "space=" not in cql
    for time_bound in (
        "lastmodified >= '2023-01-01 00:00'",
        "lastmodified <= '2023-01-02 00:00'",
    ):
        assert time_bound in cql
123+
124+
125+
def test_load_from_checkpoint_happy_path(
    confluence_connector: ConfluenceConnector,
    create_mock_page: Callable[..., dict[str, Any]],
) -> None:
    """Load three pages over two retrieval rounds and check docs and checkpoints."""
    first_updated = datetime(2023, 1, 1, 12, 0, tzinfo=timezone.utc)
    last_updated = datetime(2023, 1, 3, 12, 0, tzinfo=timezone.utc)
    first_stamp = first_updated.isoformat()
    page_one = create_mock_page(id="1", title="Page 1", updated=first_stamp)
    page_two = create_mock_page(id="2", title="Page 2", updated=first_stamp)
    page_three = create_mock_page(
        id="3", title="Page 3", updated=last_updated.isoformat()
    )

    # Script the successive return values of paginated_cql_retrieval.
    client = confluence_connector._confluence_client
    assert client is not None, "bad test setup"
    cql_mock = cast(MagicMock, client.paginated_cql_retrieval)
    cql_mock.side_effect = [
        [page_one, page_two],
        [],  # comments
        [],  # attachments
        [],  # comments
        [],  # attachments
        [page_three],
        [],  # comments
        [],  # attachments
    ]

    outputs = load_everything_from_checkpoint_connector(
        confluence_connector, 0, time.time()
    )

    assert len(outputs) == 2

    # First output: pages 1 and 2, with more data still pending.
    first_output = outputs[0]
    assert len(first_output.items) == 2
    doc_one = first_output.items[0]
    assert isinstance(doc_one, Document)
    assert doc_one.id == f"{confluence_connector.wiki_base}/spaces/TEST/pages/1"
    doc_two = first_output.items[1]
    assert isinstance(doc_two, Document)
    assert doc_two.id == f"{confluence_connector.wiki_base}/spaces/TEST/pages/2"
    expected_first_checkpoint = ConfluenceCheckpoint(
        last_updated=first_updated.timestamp(),
        has_more=True,
    )
    assert first_output.next_checkpoint == expected_first_checkpoint

    # Second output: page 3 only, and nothing left to fetch.
    second_output = outputs[1]
    assert len(second_output.items) == 1
    doc_three = second_output.items[0]
    assert isinstance(doc_three, Document)
    assert doc_three.id == f"{confluence_connector.wiki_base}/spaces/TEST/pages/3"
    expected_second_checkpoint = ConfluenceCheckpoint(
        last_updated=last_updated.timestamp(),
        has_more=False,
    )
    assert second_output.next_checkpoint == expected_second_checkpoint
189+
190+
191+
def test_load_from_checkpoint_with_page_processing_error(
    confluence_connector: ConfluenceConnector,
    create_mock_page: Callable[..., dict[str, Any]],
) -> None:
    """One page converts to a Document and the other to a ConnectorFailure."""
    good_page = create_mock_page(id="1", title="Page 1")
    bad_page = create_mock_page(id="2", title="Page 2")

    # Every paginated_cql_retrieval call returns the same two pages.
    client = confluence_connector._confluence_client
    assert client is not None, "bad test setup"
    cql_mock = cast(MagicMock, client.paginated_cql_retrieval)
    cql_mock.return_value = [good_page, bad_page]

    def _convert_stub(page: dict[str, Any]) -> Document | ConnectorFailure:
        """Stand in for _convert_page_to_document; only page "1" succeeds."""
        if page["id"] != "1":
            return ConnectorFailure(
                failed_document=DocumentFailure(
                    document_id=page["id"],
                    document_link=f"{confluence_connector.wiki_base}/spaces/TEST/pages/{page['id']}",
                ),
                failure_message="Failed to process Confluence page",
                exception=Exception("Test error"),
            )
        return Document(
            id=f"{confluence_connector.wiki_base}/spaces/TEST/pages/1",
            sections=[],
            source=DocumentSource.CONFLUENCE,
            semantic_identifier="Page 1",
            metadata={},
        )

    with patch(
        "onyx.connectors.confluence.connector.ConfluenceConnector._convert_page_to_document",
        side_effect=_convert_stub,
    ):
        outputs = load_everything_from_checkpoint_connector(
            confluence_connector, 0, time.time()
        )

    assert len(outputs) == 1
    only_output = outputs[0]
    assert len(only_output.items) == 2

    # The first item is the successfully converted page.
    succeeded = only_output.items[0]
    assert isinstance(succeeded, Document)
    assert succeeded.id == f"{confluence_connector.wiki_base}/spaces/TEST/pages/1"

    # The second item carries the conversion failure.
    failed = only_output.items[1]
    assert isinstance(failed, ConnectorFailure)
    assert "Failed to process Confluence page" in failed.failure_message
253+
254+
255+
def test_retrieve_all_slim_documents(
    confluence_connector: ConfluenceConnector,
    create_mock_page: Callable[..., dict[str, Any]],
) -> None:
    """Two mocked pages come back as one batch of two SlimDocuments."""
    page_one = create_mock_page(id="1")
    page_two = create_mock_page(id="2")

    # Script cql_paginate_all_expansions: the pages first, then two empty results.
    client = confluence_connector._confluence_client
    assert client is not None, "bad test setup"
    expansions_mock = cast(MagicMock, client.cql_paginate_all_expansions)
    expansions_mock.side_effect = [[page_one, page_two], [], []]

    batches = list(confluence_connector.retrieve_all_slim_documents(0, 100))
    assert expansions_mock.call_count == 3

    # Exactly one batch, holding both slim documents in page order.
    assert len(batches) == 1
    only_batch = batches[0]
    assert len(only_batch) == 2
    assert isinstance(only_batch[0], SlimDocument)
    assert only_batch[0].id == f"{confluence_connector.wiki_base}/spaces/TEST/pages/1"
    assert only_batch[1].id == f"{confluence_connector.wiki_base}/spaces/TEST/pages/2"
281+
282+
283+
@pytest.mark.parametrize(
    "status_code,expected_exception,expected_message",
    [
        (
            401,
            CredentialExpiredError,
            "Invalid or expired Confluence credentials",
        ),
        (
            403,
            InsufficientPermissionsError,
            "Insufficient permissions to access Confluence resources",
        ),
        (404, UnexpectedValidationError, "Unexpected Confluence error"),
    ],
)
def test_validate_connector_settings_errors(
    confluence_connector: ConfluenceConnector,
    status_code: int,
    expected_exception: type[Exception],
    expected_message: str,
) -> None:
    """An HTTPError from get_all_spaces maps to the expected connector exception."""
    client = confluence_connector._confluence_client
    assert client is not None, "bad test setup"

    # Make get_all_spaces raise an HTTPError carrying the parametrized status.
    failing_response = MagicMock(status_code=status_code)
    spaces_mock = cast(MagicMock, client.get_all_spaces)
    spaces_mock.side_effect = HTTPError(response=failing_response)

    with pytest.raises(expected_exception) as excinfo:
        confluence_connector.validate_connector_settings()
    # Checked after the raises block so the assertion actually executes.
    assert expected_message in str(excinfo.value)
316+
317+
318+
def test_validate_connector_settings_success(
    confluence_connector: ConfluenceConnector,
) -> None:
    """Validation completes and calls get_all_spaces exactly once."""
    client = confluence_connector._confluence_client
    assert client is not None, "bad test setup"

    spaces_payload = {"results": [{"key": "TEST"}]}
    spaces_mock = cast(MagicMock, client.get_all_spaces)
    spaces_mock.return_value = spaces_payload

    confluence_connector.validate_connector_settings()
    spaces_mock.assert_called_once()
329+
330+
331+
def test_checkpoint_progress(
    confluence_connector: ConfluenceConnector,
    create_mock_page: Callable[..., dict[str, Any]],
) -> None:
    """The final checkpoint's last_updated equals the later page's timestamp."""
    earlier_timestamp = datetime(2023, 1, 1, 12, 0, tzinfo=timezone.utc)
    later_timestamp = datetime(2023, 1, 2, 12, 0, tzinfo=timezone.utc)
    older_page = create_mock_page(
        id="1", title="Page 1", updated=earlier_timestamp.isoformat()
    )
    newer_page = create_mock_page(
        id="2", title="Page 2", updated=later_timestamp.isoformat()
    )

    # Script the successive return values of paginated_cql_retrieval.
    client = confluence_connector._confluence_client
    assert client is not None, "bad test setup"
    cql_mock = cast(MagicMock, client.paginated_cql_retrieval)
    cql_mock.side_effect = [
        [older_page, newer_page],  # Return both pages
        [],  # No comments for page 1
        [],  # No attachments for page 1
        [],  # No comments for page 2
        [],  # No attachments for page 2
        [],  # No more pages
    ]

    window_end = datetime(2023, 1, 3, tzinfo=timezone.utc).timestamp()
    outputs = load_everything_from_checkpoint_connector(
        confluence_connector, 0, window_end
    )

    final_checkpoint = outputs[-1].next_checkpoint
    assert final_checkpoint == ConfluenceCheckpoint(
        last_updated=later_timestamp.timestamp(),
        has_more=False,
    )

    # Same instant as later_timestamp, built independently as epoch seconds.
    expected_epoch = datetime(2023, 1, 2, 12, 0, tzinfo=timezone.utc).timestamp()
    assert final_checkpoint.last_updated == expected_epoch
    assert not final_checkpoint.has_more  # No more pages to process

    assert len(outputs) == 2
    # Both pages were emitted as documents in the first output.
    first_items = outputs[0].items
    assert len(first_items) == 2
    assert isinstance(first_items[0], Document)
    assert isinstance(first_items[1], Document)

0 commit comments

Comments
 (0)