diff --git a/xandikos/tests/test_auth.py b/xandikos/tests/test_auth.py new file mode 100644 index 00000000..a465275b --- /dev/null +++ b/xandikos/tests/test_auth.py @@ -0,0 +1,221 @@ +# Xandikos +# Copyright (C) 2016-2017 Jelmer Vernooij , et al. +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; version 3 +# of the License or (at your option) any later version of +# the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, +# MA 02110-1301, USA. + +"""Tests for authentication handling in xandikos.""" + +import asyncio +import tempfile +import shutil +import unittest +from unittest.mock import MagicMock, AsyncMock +from wsgiref.util import setup_testing_defaults + +from xandikos.webdav import WebDAVApp +from xandikos.web import MultiUserXandikosBackend + + +class MockBackend: + """Mock backend for testing.""" + + def __init__(self): + self.set_principal_calls = [] + self.resources = {} + + def set_principal(self, user): + self.set_principal_calls.append(user) + + def get_resource(self, path): + return self.resources.get(path) + + +class AuthenticationTests(unittest.TestCase): + """Tests for authentication header handling.""" + + def setUp(self): + self.backend = MockBackend() + self.app = WebDAVApp(self.backend) + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + def tearDown(self): + self.loop.close() + + def test_wsgi_x_remote_user_header(self): + """Test that HTTP_X_REMOTE_USER is handled in WSGI.""" + environ = { + "REQUEST_METHOD": "OPTIONS", + "PATH_INFO": "/", + "HTTP_X_REMOTE_USER": "testuser", + } + setup_testing_defaults(environ) + + # Mock the resource + mock_resource = MagicMock() + mock_resource.resource_types = [] + self.backend.resources["/"] = mock_resource + + # Mock start_response + responses = [] + + def start_response(status, headers): + responses.append((status, headers)) + return lambda x: None + + # Call the WSGI handler + list(self.app.handle_wsgi_request(environ, start_response)) + + # Check that we got a response + self.assertTrue(len(responses) > 0) + + # Check that set_principal was called with the user + self.assertEqual(["testuser"], self.backend.set_principal_calls) + + # Check that REMOTE_USER was set in environ for the request + # (The environ is recreated in handle_wsgi_request, so we can't check it directly) + + def test_wsgi_no_remote_user(self): + """Test WSGI without authentication header.""" + environ = { + "REQUEST_METHOD": "OPTIONS", + "PATH_INFO": "/", + } + setup_testing_defaults(environ) + + # Mock the resource + mock_resource = MagicMock() + mock_resource.resource_types = [] + self.backend.resources["/"] = mock_resource + + # Mock start_response + responses = [] + + def start_response(status, headers): + responses.append((status, headers)) + return lambda x: None + + # Call the WSGI handler + self.app.handle_wsgi_request(environ, start_response) + + # Check that set_principal was NOT called + self.assertEqual([], self.backend.set_principal_calls) + + def test_aiohttp_x_remote_user_header(self): + """Test that X-Remote-User header is handled in aiohttp.""" + # Create a mock aiohttp request + mock_request = AsyncMock() + mock_headers = MagicMock() + mock_headers.get.side_effect = ( + lambda k, d=None: "aiohttpuser" if k == "X-Remote-User" else d + ) + mock_headers.__getitem__.side_effect = ( + lambda k: "aiohttpuser" if k == "X-Remote-User" else None + ) + mock_request.headers = mock_headers + mock_request.method = "OPTIONS" + mock_request.path = "/" + mock_request.url = "http://example.com/" + mock_request.raw_path = "/" + mock_request.match_info = {"path_info": "/"} + mock_request.content_type = "text/plain" + mock_request.content_length = 0 + mock_request.can_read_body = False + + # Mock the resource + mock_resource = MagicMock() + mock_resource.resource_types = [] + self.backend.resources["/"] = mock_resource + + # Call the aiohttp handler + self.loop.run_until_complete(self.app.aiohttp_handler(mock_request, "/")) + + # Check that set_principal was called with the user + self.assertEqual(["aiohttpuser"], self.backend.set_principal_calls) + + def test_aiohttp_no_remote_user(self): + """Test aiohttp without authentication header.""" + # Create a mock aiohttp request + mock_request = AsyncMock() + mock_headers = MagicMock() + mock_headers.get.return_value = None + mock_request.headers = mock_headers + mock_request.method = "OPTIONS" + mock_request.path = "/" + mock_request.url = "http://example.com/" + mock_request.raw_path = "/" + mock_request.match_info = {"path_info": "/"} + mock_request.content_type = "text/plain" + mock_request.content_length = 0 + mock_request.can_read_body = False + + # Mock the resource + mock_resource = MagicMock() + mock_resource.resource_types = [] + self.backend.resources["/"] = mock_resource + + # Call the aiohttp handler + self.loop.run_until_complete(self.app.aiohttp_handler(mock_request, "/")) + + # Check that set_principal was NOT called + self.assertEqual([], self.backend.set_principal_calls) + + +class IntegrationTests(unittest.TestCase): + """Integration tests with real backends.""" + + def setUp(self): + self.d = tempfile.mkdtemp() + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + def tearDown(self): + shutil.rmtree(self.d) + self.loop.close() + + def test_multiuser_backend_with_aiohttp_auth(self): + """Test MultiUserXandikosBackend with aiohttp authentication.""" + backend = MultiUserXandikosBackend(self.d) + app = WebDAVApp(backend) + + # Create a mock aiohttp request with auth + mock_request = AsyncMock() + mock_headers = MagicMock() + mock_headers.get.side_effect = ( + lambda k, d=None: "alice" if k == "X-Remote-User" else d + ) + mock_headers.__getitem__.side_effect = ( + lambda k: "alice" if k == "X-Remote-User" else None + ) + mock_request.headers = mock_headers + mock_request.method = "PROPFIND" + mock_request.path = "/alice/" + mock_request.url = "http://example.com/alice/" + mock_request.raw_path = "/alice/" + mock_request.match_info = {"path_info": "/alice/"} + mock_request.content_type = "application/xml" + mock_request.content_length = 0 + mock_request.can_read_body = False + + # Call the aiohttp handler + self.loop.run_until_complete(app.aiohttp_handler(mock_request, "/")) + + # Check that the principal was created + resource = backend.get_resource("/alice/") + self.assertIsNotNone(resource) + # _mark_as_principal normalizes the path, removing trailing slashes + self.assertIn("/alice", backend._user_principals) diff --git a/xandikos/tests/test_web.py b/xandikos/tests/test_web.py index 3c13702e..0d4efa44 100644 --- a/xandikos/tests/test_web.py +++ b/xandikos/tests/test_web.py @@ -27,7 +27,7 @@ from .. import caldav from ..icalendar import ICalendarFile from ..store.git import TreeGitStore -from ..web import CalendarCollection, XandikosBackend +from ..web import CalendarCollection, XandikosBackend, MultiUserXandikosBackend EXAMPLE_VCALENDAR1 = b"""\ BEGIN:VCALENDAR @@ -175,3 +175,92 @@ def start_response(code, _headers): self.assertEqual(["200 OK"], codes) self.assertEqual(b"".join([commit_hash, b"\t", default_branch, b"\n"]), body) + + +class MultiUserXandikosBackendTests(unittest.TestCase): + """Tests for MultiUserXandikosBackend.""" + + def setUp(self): + self.d = tempfile.mkdtemp() + + def tearDown(self): + shutil.rmtree(self.d) + + def test_create_principal_default_paths(self): + """Test creating a principal with default path format.""" + backend = MultiUserXandikosBackend(self.d) + backend.set_principal("testuser") + + # Check that principal was created with default format + principal_path = "/testuser/" + resource = backend.get_resource(principal_path) + self.assertIsNotNone(resource) + # _mark_as_principal normalizes the path, removing trailing slashes + self.assertIn("/testuser", backend._user_principals) + + # Check autocreate is enabled + self.assertTrue(backend.autocreate) + + def test_create_principal_custom_paths(self): + """Test creating a principal with custom path format.""" + backend = MultiUserXandikosBackend( + self.d, principal_path_prefix="/users/", principal_path_suffix="/principal/" + ) + backend.set_principal("customuser") + + # Check that principal was created with custom format + principal_path = "/users/customuser/principal/" + resource = backend.get_resource(principal_path) + self.assertIsNotNone(resource) + # _mark_as_principal normalizes the path, removing trailing slashes + self.assertIn("/users/customuser/principal", backend._user_principals) + + def test_create_principal_override_paths(self): + """Test overriding path format at method level.""" + backend = MultiUserXandikosBackend(self.d) + backend.set_principal( + "override", + principal_path_prefix="/special/", + principal_path_suffix="/user/", + ) + + # Check that principal was created with overridden format + principal_path = "/special/override/user/" + resource = backend.get_resource(principal_path) + self.assertIsNotNone(resource) + # _mark_as_principal normalizes the path, removing trailing slashes + self.assertIn("/special/override/user", backend._user_principals) + + def test_multiple_users(self): + """Test creating multiple users.""" + backend = MultiUserXandikosBackend(self.d) + + # Create multiple users + users = ["alice", "bob", "charlie"] + for user in users: + backend.set_principal(user) + + # Verify all users were created + for user in users: + principal_path = f"/{user}/" + resource = backend.get_resource(principal_path) + self.assertIsNotNone(resource) + # _mark_as_principal normalizes the path, removing trailing slashes + self.assertIn(f"/{user}", backend._user_principals) + + def test_idempotent_principal_creation(self): + """Test that setting the same principal twice is idempotent.""" + backend = MultiUserXandikosBackend(self.d) + + # Create principal first time + backend.set_principal("testuser") + principal_path = "/testuser/" + resource1 = backend.get_resource(principal_path) + + # Create same principal again + backend.set_principal("testuser") + resource2 = backend.get_resource(principal_path) + + # Should be the same resource + self.assertIsNotNone(resource1) + self.assertIsNotNone(resource2) diff --git a/xandikos/web.py b/xandikos/web.py index a56101ac..1ae03e99 100644 --- a/xandikos/web.py +++ b/xandikos/web.py @@ -1059,12 +1059,18 @@ def open_store_from_path(path: str, **kwargs): class XandikosBackend(webdav.Backend): def __init__( - self, path, *, paranoid: bool = False, index_threshold: Optional[int] = None + self, + path, + *, + paranoid: bool = False, + index_threshold: Optional[int] = None, + autocreate: bool = False, ) -> None: self.path = path self._user_principals: set[str] = set() self.paranoid = paranoid self.index_threshold = index_threshold + self.autocreate = autocreate def _map_to_file_path(self, relpath): return os.path.join(self.path, relpath.lstrip("/")) @@ -1188,6 +1194,30 @@ async def move_collection( shutil.move(source_file_path, dest_file_path) +class MultiUserXandikosBackend(XandikosBackend): + """Backend that automatically creates principals for authenticated users.""" + + def __init__(self, path, principal_path_prefix="/", principal_path_suffix="/"): + super().__init__(path, autocreate=True) + self.principal_path_prefix = principal_path_prefix + self.principal_path_suffix = principal_path_suffix + + def set_principal( + self, user, principal_path_prefix=None, principal_path_suffix=None + ): + """Set the principal for a user, creating it if necessary.""" + if principal_path_prefix is None: + principal_path_prefix = self.principal_path_prefix + if principal_path_suffix is None: + principal_path_suffix = self.principal_path_suffix + + principal = principal_path_prefix + user + principal_path_suffix + + if not self.get_resource(principal): + self.create_principal(principal, create_defaults=True) + self._mark_as_principal(principal) + + class XandikosApp(webdav.WebDAVApp): """A wsgi App that provides a Xandikos web server.""" diff --git a/xandikos/webdav.py b/xandikos/webdav.py index cdc409a7..cb59c7af 100644 --- a/xandikos/webdav.py +++ b/xandikos/webdav.py @@ -2656,6 +2656,20 @@ def _get_allowed_methods(self, request): return ret async def _handle_request(self, request, environ, start_response=None): + # Handle remote user authentication + remote_user = None + if hasattr(request, "headers"): + # aiohttp request + remote_user = request.headers.get("X-Remote-User") + + if "ORIGINAL_ENVIRON" in environ: + # WSGI request + remote_user = environ["ORIGINAL_ENVIRON"].get("HTTP_X_REMOTE_USER") + + if remote_user and hasattr(self.backend, "set_principal"): + environ["REMOTE_USER"] = remote_user + self.backend.set_principal(remote_user) + try: do = self.methods[request.method] except KeyError: