-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
Copy pathtest_search_permissions.py
244 lines (203 loc) · 9.86 KB
/
test_search_permissions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
from typing import Any
from onyx.db.models import UserRole
from tests.integration.common_utils.managers.api_key import APIKeyManager
from tests.integration.common_utils.managers.cc_pair import CCPairManager
from tests.integration.common_utils.managers.chat import ChatSessionManager
from tests.integration.common_utils.managers.document import DocumentManager
from tests.integration.common_utils.managers.llm_provider import LLMProviderManager
from tests.integration.common_utils.managers.user import UserManager
from tests.integration.common_utils.test_models import DATestAPIKey
from tests.integration.common_utils.test_models import DATestCCPair
from tests.integration.common_utils.test_models import DATestChatSession
from tests.integration.common_utils.test_models import DATestUser
def setup_test_tenants(reset_multitenant: None) -> dict[str, Any]:
"""Helper function to set up test tenants with documents and users."""
# Creating an admin user for Tenant 1
admin_user1: DATestUser = UserManager.create(
)
assert UserManager.is_role(admin_user1, UserRole.ADMIN)
# Create Tenant 2 and its Admin User
admin_user2: DATestUser = UserManager.create(
)
assert UserManager.is_role(admin_user2, UserRole.ADMIN)
# Create connectors for Tenant 1
cc_pair_1: DATestCCPair = CCPairManager.create_from_scratch(
user_performing_action=admin_user1,
)
api_key_1: DATestAPIKey = APIKeyManager.create(
user_performing_action=admin_user1,
)
api_key_1.headers.update(admin_user1.headers)
LLMProviderManager.create(user_performing_action=admin_user1)
# Create connectors for Tenant 2
cc_pair_2: DATestCCPair = CCPairManager.create_from_scratch(
user_performing_action=admin_user2,
)
api_key_2: DATestAPIKey = APIKeyManager.create(
user_performing_action=admin_user2,
)
api_key_2.headers.update(admin_user2.headers)
LLMProviderManager.create(user_performing_action=admin_user2)
# Seed documents for Tenant 1
cc_pair_1.documents = []
doc1_tenant1 = DocumentManager.seed_doc_with_content(
cc_pair=cc_pair_1,
content="Tenant 1 Document Content",
api_key=api_key_1,
)
doc2_tenant1 = DocumentManager.seed_doc_with_content(
cc_pair=cc_pair_1,
content="Tenant 1 Document Content",
api_key=api_key_1,
)
cc_pair_1.documents.extend([doc1_tenant1, doc2_tenant1])
# Seed documents for Tenant 2
cc_pair_2.documents = []
doc1_tenant2 = DocumentManager.seed_doc_with_content(
cc_pair=cc_pair_2,
content="Tenant 2 Document Content",
api_key=api_key_2,
)
doc2_tenant2 = DocumentManager.seed_doc_with_content(
cc_pair=cc_pair_2,
content="Tenant 2 Document Content",
api_key=api_key_2,
)
cc_pair_2.documents.extend([doc1_tenant2, doc2_tenant2])
tenant1_doc_ids = {doc1_tenant1.id, doc2_tenant1.id}
tenant2_doc_ids = {doc1_tenant2.id, doc2_tenant2.id}
# Create chat sessions for each user
chat_session1: DATestChatSession = ChatSessionManager.create(
user_performing_action=admin_user1
)
chat_session2: DATestChatSession = ChatSessionManager.create(
user_performing_action=admin_user2
)
return {
"admin_user1": admin_user1,
"admin_user2": admin_user2,
"chat_session1": chat_session1,
"chat_session2": chat_session2,
"tenant1_doc_ids": tenant1_doc_ids,
"tenant2_doc_ids": tenant2_doc_ids,
}
def test_tenant1_can_access_own_documents(reset_multitenant: None) -> None:
"""Test that Tenant 1 can access its own documents but not Tenant 2's."""
test_data = setup_test_tenants(reset_multitenant)
# User 1 sends a message and gets a response
response1 = ChatSessionManager.send_message(
chat_session_id=test_data["chat_session1"].id,
message="What is in Tenant 1's documents?",
user_performing_action=test_data["admin_user1"],
)
# Assert that the search tool was used
assert response1.tool_name == "run_search"
response_doc_ids = {doc["document_id"] for doc in response1.tool_result or []}
assert test_data["tenant1_doc_ids"].issubset(
response_doc_ids
), "Not all Tenant 1 document IDs are in the response"
assert not response_doc_ids.intersection(
test_data["tenant2_doc_ids"]
), "Tenant 2 document IDs should not be in the response"
# Assert that the contents are correct
assert any(
doc["content"] == "Tenant 1 Document Content"
for doc in response1.tool_result or []
), "Tenant 1 Document Content not found in any document"
def test_tenant2_can_access_own_documents(reset_multitenant: None) -> None:
"""Test that Tenant 2 can access its own documents but not Tenant 1's."""
test_data = setup_test_tenants(reset_multitenant)
# User 2 sends a message and gets a response
response2 = ChatSessionManager.send_message(
chat_session_id=test_data["chat_session2"].id,
message="What is in Tenant 2's documents?",
user_performing_action=test_data["admin_user2"],
)
# Assert that the search tool was used
assert response2.tool_name == "run_search"
# Assert that the tool_result contains Tenant 2's documents
response_doc_ids = {doc["document_id"] for doc in response2.tool_result or []}
assert test_data["tenant2_doc_ids"].issubset(
response_doc_ids
), "Not all Tenant 2 document IDs are in the response"
assert not response_doc_ids.intersection(
test_data["tenant1_doc_ids"]
), "Tenant 1 document IDs should not be in the response"
# Assert that the contents are correct
assert any(
doc["content"] == "Tenant 2 Document Content"
for doc in response2.tool_result or []
), "Tenant 2 Document Content not found in any document"
def test_tenant1_cannot_access_tenant2_documents(reset_multitenant: None) -> None:
"""Test that Tenant 1 cannot access Tenant 2's documents."""
test_data = setup_test_tenants(reset_multitenant)
# User 1 tries to access Tenant 2's documents
response_cross = ChatSessionManager.send_message(
chat_session_id=test_data["chat_session1"].id,
message="What is in Tenant 2's documents?",
user_performing_action=test_data["admin_user1"],
)
# Assert that the search tool was used
assert response_cross.tool_name == "run_search"
# Assert that the tool_result is empty or does not contain Tenant 2's documents
response_doc_ids = {doc["document_id"] for doc in response_cross.tool_result or []}
# Ensure none of Tenant 2's document IDs are in the response
assert not response_doc_ids.intersection(test_data["tenant2_doc_ids"])
def test_tenant2_cannot_access_tenant1_documents(reset_multitenant: None) -> None:
"""Test that Tenant 2 cannot access Tenant 1's documents."""
test_data = setup_test_tenants(reset_multitenant)
# User 2 tries to access Tenant 1's documents
response_cross2 = ChatSessionManager.send_message(
chat_session_id=test_data["chat_session2"].id,
message="What is in Tenant 1's documents?",
user_performing_action=test_data["admin_user2"],
)
# Assert that the search tool was used
assert response_cross2.tool_name == "run_search"
# Assert that the tool_result is empty or does not contain Tenant 1's documents
response_doc_ids = {doc["document_id"] for doc in response_cross2.tool_result or []}
# Ensure none of Tenant 1's document IDs are in the response
assert not response_doc_ids.intersection(test_data["tenant1_doc_ids"])
def test_multi_tenant_access_control(reset_multitenant: None) -> None:
"""Legacy test for multi-tenant access control."""
test_data = setup_test_tenants(reset_multitenant)
# User 1 sends a message and gets a response with only Tenant 1's documents
response1 = ChatSessionManager.send_message(
chat_session_id=test_data["chat_session1"].id,
message="What is in Tenant 1's documents?",
user_performing_action=test_data["admin_user1"],
)
assert response1.tool_name == "run_search"
response_doc_ids = {doc["document_id"] for doc in response1.tool_result or []}
assert test_data["tenant1_doc_ids"].issubset(response_doc_ids)
assert not response_doc_ids.intersection(test_data["tenant2_doc_ids"])
# User 2 sends a message and gets a response with only Tenant 2's documents
response2 = ChatSessionManager.send_message(
chat_session_id=test_data["chat_session2"].id,
message="What is in Tenant 2's documents?",
user_performing_action=test_data["admin_user2"],
)
assert response2.tool_name == "run_search"
response_doc_ids = {doc["document_id"] for doc in response2.tool_result or []}
assert test_data["tenant2_doc_ids"].issubset(response_doc_ids)
assert not response_doc_ids.intersection(test_data["tenant1_doc_ids"])
# User 1 tries to access Tenant 2's documents and fails
response_cross = ChatSessionManager.send_message(
chat_session_id=test_data["chat_session1"].id,
message="What is in Tenant 2's documents?",
user_performing_action=test_data["admin_user1"],
)
assert response_cross.tool_name == "run_search"
response_doc_ids = {doc["document_id"] for doc in response_cross.tool_result or []}
assert not response_doc_ids.intersection(test_data["tenant2_doc_ids"])
# User 2 tries to access Tenant 1's documents and fails
response_cross2 = ChatSessionManager.send_message(
chat_session_id=test_data["chat_session2"].id,
message="What is in Tenant 1's documents?",
user_performing_action=test_data["admin_user2"],
)
assert response_cross2.tool_name == "run_search"
response_doc_ids = {doc["document_id"] for doc in response_cross2.tool_result or []}
assert not response_doc_ids.intersection(test_data["tenant1_doc_ids"])