1
+ from typing import Any
2
+
1
3
from onyx .db .models import UserRole
2
4
from tests .integration .common_utils .managers .api_key import APIKeyManager
3
5
from tests .integration .common_utils .managers .cc_pair import CCPairManager
11
13
from tests .integration .common_utils .test_models import DATestUser
12
14
13
15
14
- def test_multi_tenant_access_control (reset_multitenant : None ) -> None :
15
- # Creating an admin user (first user created is automatically an admin and also proviions the tenant
16
+ def setup_test_tenants (reset_multitenant : None ) -> dict [str , Any ]:
17
+ """Helper function to set up test tenants with documents and users."""
18
+ # Creating an admin user for Tenant 1
16
19
admin_user1 : DATestUser = UserManager .create (
17
20
18
21
)
19
-
20
22
assert UserManager .is_role (admin_user1 , UserRole .ADMIN )
21
23
22
24
# Create Tenant 2 and its Admin User
@@ -35,6 +37,16 @@ def test_multi_tenant_access_control(reset_multitenant: None) -> None:
35
37
api_key_1 .headers .update (admin_user1 .headers )
36
38
LLMProviderManager .create (user_performing_action = admin_user1 )
37
39
40
+ # Create connectors for Tenant 2
41
+ cc_pair_2 : DATestCCPair = CCPairManager .create_from_scratch (
42
+ user_performing_action = admin_user2 ,
43
+ )
44
+ api_key_2 : DATestAPIKey = APIKeyManager .create (
45
+ user_performing_action = admin_user2 ,
46
+ )
47
+ api_key_2 .headers .update (admin_user2 .headers )
48
+ LLMProviderManager .create (user_performing_action = admin_user2 )
49
+
38
50
# Seed documents for Tenant 1
39
51
cc_pair_1 .documents = []
40
52
doc1_tenant1 = DocumentManager .seed_doc_with_content (
@@ -49,16 +61,6 @@ def test_multi_tenant_access_control(reset_multitenant: None) -> None:
49
61
)
50
62
cc_pair_1 .documents .extend ([doc1_tenant1 , doc2_tenant1 ])
51
63
52
- # Create connectors for Tenant 2
53
- cc_pair_2 : DATestCCPair = CCPairManager .create_from_scratch (
54
- user_performing_action = admin_user2 ,
55
- )
56
- api_key_2 : DATestAPIKey = APIKeyManager .create (
57
- user_performing_action = admin_user2 ,
58
- )
59
- api_key_2 .headers .update (admin_user2 .headers )
60
- LLMProviderManager .create (user_performing_action = admin_user2 )
61
-
62
64
# Seed documents for Tenant 2
63
65
cc_pair_2 .documents = []
64
66
doc1_tenant2 = DocumentManager .seed_doc_with_content (
@@ -84,21 +86,36 @@ def test_multi_tenant_access_control(reset_multitenant: None) -> None:
84
86
user_performing_action = admin_user2
85
87
)
86
88
89
+ return {
90
+ "admin_user1" : admin_user1 ,
91
+ "admin_user2" : admin_user2 ,
92
+ "chat_session1" : chat_session1 ,
93
+ "chat_session2" : chat_session2 ,
94
+ "tenant1_doc_ids" : tenant1_doc_ids ,
95
+ "tenant2_doc_ids" : tenant2_doc_ids ,
96
+ }
97
+
98
+
99
+ def test_tenant1_can_access_own_documents (reset_multitenant : None ) -> None :
100
+ """Test that Tenant 1 can access its own documents but not Tenant 2's."""
101
+ test_data = setup_test_tenants (reset_multitenant )
102
+
87
103
# User 1 sends a message and gets a response
88
104
response1 = ChatSessionManager .send_message (
89
- chat_session_id = chat_session1 .id ,
105
+ chat_session_id = test_data [ " chat_session1" ] .id ,
90
106
message = "What is in Tenant 1's documents?" ,
91
- user_performing_action = admin_user1 ,
107
+ user_performing_action = test_data [ " admin_user1" ] ,
92
108
)
109
+
93
110
# Assert that the search tool was used
94
111
assert response1 .tool_name == "run_search"
95
112
96
113
response_doc_ids = {doc ["document_id" ] for doc in response1 .tool_result or []}
97
- assert tenant1_doc_ids .issubset (
114
+ assert test_data [ " tenant1_doc_ids" ] .issubset (
98
115
response_doc_ids
99
116
), "Not all Tenant 1 document IDs are in the response"
100
117
assert not response_doc_ids .intersection (
101
- tenant2_doc_ids
118
+ test_data [ " tenant2_doc_ids" ]
102
119
), "Tenant 2 document IDs should not be in the response"
103
120
104
121
# Assert that the contents are correct
@@ -107,21 +124,28 @@ def test_multi_tenant_access_control(reset_multitenant: None) -> None:
107
124
for doc in response1 .tool_result or []
108
125
), "Tenant 1 Document Content not found in any document"
109
126
127
+
128
+ def test_tenant2_can_access_own_documents (reset_multitenant : None ) -> None :
129
+ """Test that Tenant 2 can access its own documents but not Tenant 1's."""
130
+ test_data = setup_test_tenants (reset_multitenant )
131
+
110
132
# User 2 sends a message and gets a response
111
133
response2 = ChatSessionManager .send_message (
112
- chat_session_id = chat_session2 .id ,
134
+ chat_session_id = test_data [ " chat_session2" ] .id ,
113
135
message = "What is in Tenant 2's documents?" ,
114
- user_performing_action = admin_user2 ,
136
+ user_performing_action = test_data [ " admin_user2" ] ,
115
137
)
138
+
116
139
# Assert that the search tool was used
117
140
assert response2 .tool_name == "run_search"
141
+
118
142
# Assert that the tool_result contains Tenant 2's documents
119
143
response_doc_ids = {doc ["document_id" ] for doc in response2 .tool_result or []}
120
- assert tenant2_doc_ids .issubset (
144
+ assert test_data [ " tenant2_doc_ids" ] .issubset (
121
145
response_doc_ids
122
146
), "Not all Tenant 2 document IDs are in the response"
123
147
assert not response_doc_ids .intersection (
124
- tenant1_doc_ids
148
+ test_data [ " tenant1_doc_ids" ]
125
149
), "Tenant 1 document IDs should not be in the response"
126
150
127
151
# Assert that the contents are correct
@@ -130,28 +154,91 @@ def test_multi_tenant_access_control(reset_multitenant: None) -> None:
130
154
for doc in response2 .tool_result or []
131
155
), "Tenant 2 Document Content not found in any document"
132
156
157
+
158
+ def test_tenant1_cannot_access_tenant2_documents (reset_multitenant : None ) -> None :
159
+ """Test that Tenant 1 cannot access Tenant 2's documents."""
160
+ test_data = setup_test_tenants (reset_multitenant )
161
+
133
162
# User 1 tries to access Tenant 2's documents
134
163
response_cross = ChatSessionManager .send_message (
135
- chat_session_id = chat_session1 .id ,
164
+ chat_session_id = test_data [ " chat_session1" ] .id ,
136
165
message = "What is in Tenant 2's documents?" ,
137
- user_performing_action = admin_user1 ,
166
+ user_performing_action = test_data [ " admin_user1" ] ,
138
167
)
168
+
139
169
# Assert that the search tool was used
140
170
assert response_cross .tool_name == "run_search"
171
+
141
172
# Assert that the tool_result is empty or does not contain Tenant 2's documents
142
173
response_doc_ids = {doc ["document_id" ] for doc in response_cross .tool_result or []}
174
+
143
175
# Ensure none of Tenant 2's document IDs are in the response
144
- assert not response_doc_ids .intersection (tenant2_doc_ids )
176
+ assert not response_doc_ids .intersection (test_data ["tenant2_doc_ids" ])
177
+
178
+
179
+ def test_tenant2_cannot_access_tenant1_documents (reset_multitenant : None ) -> None :
180
+ """Test that Tenant 2 cannot access Tenant 1's documents."""
181
+ test_data = setup_test_tenants (reset_multitenant )
145
182
146
183
# User 2 tries to access Tenant 1's documents
147
184
response_cross2 = ChatSessionManager .send_message (
148
- chat_session_id = chat_session2 .id ,
185
+ chat_session_id = test_data [ " chat_session2" ] .id ,
149
186
message = "What is in Tenant 1's documents?" ,
150
- user_performing_action = admin_user2 ,
187
+ user_performing_action = test_data [ " admin_user2" ] ,
151
188
)
189
+
152
190
# Assert that the search tool was used
153
191
assert response_cross2 .tool_name == "run_search"
192
+
154
193
# Assert that the tool_result is empty or does not contain Tenant 1's documents
155
194
response_doc_ids = {doc ["document_id" ] for doc in response_cross2 .tool_result or []}
195
+
156
196
# Ensure none of Tenant 1's document IDs are in the response
157
- assert not response_doc_ids .intersection (tenant1_doc_ids )
197
+ assert not response_doc_ids .intersection (test_data ["tenant1_doc_ids" ])
198
+
199
+
200
+ def test_multi_tenant_access_control (reset_multitenant : None ) -> None :
201
+ """Legacy test for multi-tenant access control."""
202
+ test_data = setup_test_tenants (reset_multitenant )
203
+
204
+ # User 1 sends a message and gets a response with only Tenant 1's documents
205
+ response1 = ChatSessionManager .send_message (
206
+ chat_session_id = test_data ["chat_session1" ].id ,
207
+ message = "What is in Tenant 1's documents?" ,
208
+ user_performing_action = test_data ["admin_user1" ],
209
+ )
210
+ assert response1 .tool_name == "run_search"
211
+ response_doc_ids = {doc ["document_id" ] for doc in response1 .tool_result or []}
212
+ assert test_data ["tenant1_doc_ids" ].issubset (response_doc_ids )
213
+ assert not response_doc_ids .intersection (test_data ["tenant2_doc_ids" ])
214
+
215
+ # User 2 sends a message and gets a response with only Tenant 2's documents
216
+ response2 = ChatSessionManager .send_message (
217
+ chat_session_id = test_data ["chat_session2" ].id ,
218
+ message = "What is in Tenant 2's documents?" ,
219
+ user_performing_action = test_data ["admin_user2" ],
220
+ )
221
+ assert response2 .tool_name == "run_search"
222
+ response_doc_ids = {doc ["document_id" ] for doc in response2 .tool_result or []}
223
+ assert test_data ["tenant2_doc_ids" ].issubset (response_doc_ids )
224
+ assert not response_doc_ids .intersection (test_data ["tenant1_doc_ids" ])
225
+
226
+ # User 1 tries to access Tenant 2's documents and fails
227
+ response_cross = ChatSessionManager .send_message (
228
+ chat_session_id = test_data ["chat_session1" ].id ,
229
+ message = "What is in Tenant 2's documents?" ,
230
+ user_performing_action = test_data ["admin_user1" ],
231
+ )
232
+ assert response_cross .tool_name == "run_search"
233
+ response_doc_ids = {doc ["document_id" ] for doc in response_cross .tool_result or []}
234
+ assert not response_doc_ids .intersection (test_data ["tenant2_doc_ids" ])
235
+
236
+ # User 2 tries to access Tenant 1's documents and fails
237
+ response_cross2 = ChatSessionManager .send_message (
238
+ chat_session_id = test_data ["chat_session2" ].id ,
239
+ message = "What is in Tenant 1's documents?" ,
240
+ user_performing_action = test_data ["admin_user2" ],
241
+ )
242
+ assert response_cross2 .tool_name == "run_search"
243
+ response_doc_ids = {doc ["document_id" ] for doc in response_cross2 .tool_result or []}
244
+ assert not response_doc_ids .intersection (test_data ["tenant1_doc_ids" ])
0 commit comments