Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 221 additions & 0 deletions xandikos/tests/test_auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
# Xandikos
# Copyright (C) 2016-2017 Jelmer Vernooij <[email protected]>, et al.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; version 3
# of the License or (at your option) any later version of
# the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.

"""Tests for authentication handling in xandikos."""

import asyncio
import tempfile
import shutil
import unittest
from unittest.mock import MagicMock, AsyncMock
from wsgiref.util import setup_testing_defaults

from xandikos.webdav import WebDAVApp
from xandikos.web import MultiUserXandikosBackend


class MockBackend:
"""Mock backend for testing."""

def __init__(self):
self.set_principal_calls = []
self.resources = {}

def set_principal(self, user):
self.set_principal_calls.append(user)

def get_resource(self, path):
return self.resources.get(path)


class AuthenticationTests(unittest.TestCase):
"""Tests for authentication header handling."""

def setUp(self):
self.backend = MockBackend()
self.app = WebDAVApp(self.backend)
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)

def tearDown(self):
self.loop.close()

def test_wsgi_x_remote_user_header(self):
"""Test that HTTP_X_REMOTE_USER is handled in WSGI."""
environ = {
"REQUEST_METHOD": "OPTIONS",
"PATH_INFO": "/",
"HTTP_X_REMOTE_USER": "testuser",
}
setup_testing_defaults(environ)

# Mock the resource
mock_resource = MagicMock()
mock_resource.resource_types = []
self.backend.resources["/"] = mock_resource

# Mock start_response
responses = []

def start_response(status, headers):
responses.append((status, headers))
return lambda x: None

# Call the WSGI handler
list(self.app.handle_wsgi_request(environ, start_response))

# Check that we got a response
self.assertTrue(len(responses) > 0)

# Check that set_principal was called with the user
self.assertEqual(["testuser"], self.backend.set_principal_calls)

# Check that REMOTE_USER was set in environ for the request
# (The environ is recreated in handle_wsgi_request, so we can't check it directly)

def test_wsgi_no_remote_user(self):
"""Test WSGI without authentication header."""
environ = {
"REQUEST_METHOD": "OPTIONS",
"PATH_INFO": "/",
}
setup_testing_defaults(environ)

# Mock the resource
mock_resource = MagicMock()
mock_resource.resource_types = []
self.backend.resources["/"] = mock_resource

# Mock start_response
responses = []

def start_response(status, headers):
responses.append((status, headers))
return lambda x: None

# Call the WSGI handler
self.app.handle_wsgi_request(environ, start_response)

# Check that set_principal was NOT called
self.assertEqual([], self.backend.set_principal_calls)

def test_aiohttp_x_remote_user_header(self):
"""Test that X-Remote-User header is handled in aiohttp."""
# Create a mock aiohttp request
mock_request = AsyncMock()
mock_headers = MagicMock()
mock_headers.get.side_effect = (
lambda k, d=None: "aiohttpuser" if k == "X-Remote-User" else d
)
mock_headers.__getitem__.side_effect = (
lambda k: "aiohttpuser" if k == "X-Remote-User" else None
)
mock_request.headers = mock_headers
mock_request.method = "OPTIONS"
mock_request.path = "/"
mock_request.url = "http://example.com/"
mock_request.raw_path = "/"
mock_request.match_info = {"path_info": "/"}
mock_request.content_type = "text/plain"
mock_request.content_length = 0
mock_request.can_read_body = False

# Mock the resource
mock_resource = MagicMock()
mock_resource.resource_types = []
self.backend.resources["/"] = mock_resource

# Call the aiohttp handler
self.loop.run_until_complete(self.app.aiohttp_handler(mock_request, "/"))

# Check that set_principal was called with the user
self.assertEqual(["aiohttpuser"], self.backend.set_principal_calls)

def test_aiohttp_no_remote_user(self):
"""Test aiohttp without authentication header."""
# Create a mock aiohttp request
mock_request = AsyncMock()
mock_headers = MagicMock()
mock_headers.get.return_value = None
mock_request.headers = mock_headers
mock_request.method = "OPTIONS"
mock_request.path = "/"
mock_request.url = "http://example.com/"
mock_request.raw_path = "/"
mock_request.match_info = {"path_info": "/"}
mock_request.content_type = "text/plain"
mock_request.content_length = 0
mock_request.can_read_body = False

# Mock the resource
mock_resource = MagicMock()
mock_resource.resource_types = []
self.backend.resources["/"] = mock_resource

# Call the aiohttp handler
self.loop.run_until_complete(self.app.aiohttp_handler(mock_request, "/"))

# Check that set_principal was NOT called
self.assertEqual([], self.backend.set_principal_calls)


class IntegrationTests(unittest.TestCase):
"""Integration tests with real backends."""

def setUp(self):
self.d = tempfile.mkdtemp()
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)

def tearDown(self):
shutil.rmtree(self.d)
self.loop.close()

def test_multiuser_backend_with_aiohttp_auth(self):
"""Test MultiUserXandikosBackend with aiohttp authentication."""
backend = MultiUserXandikosBackend(self.d)
app = WebDAVApp(backend)

# Create a mock aiohttp request with auth
mock_request = AsyncMock()
mock_headers = MagicMock()
mock_headers.get.side_effect = (
lambda k, d=None: "alice" if k == "X-Remote-User" else d
)
mock_headers.__getitem__.side_effect = (
lambda k: "alice" if k == "X-Remote-User" else None
)
mock_request.headers = mock_headers
mock_request.method = "PROPFIND"
mock_request.path = "/alice/"
mock_request.url = "http://example.com/alice/"
mock_request.raw_path = "/alice/"
mock_request.match_info = {"path_info": "/alice/"}
mock_request.content_type = "application/xml"
mock_request.content_length = 0
mock_request.can_read_body = False

# Call the aiohttp handler
self.loop.run_until_complete(app.aiohttp_handler(mock_request, "/"))

# Check that the principal was created
resource = backend.get_resource("/alice/")
self.assertIsNotNone(resource)
# _mark_as_principal normalizes the path, removing trailing slashes
self.assertIn("/alice", backend._user_principals)
91 changes: 90 additions & 1 deletion xandikos/tests/test_web.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from .. import caldav
from ..icalendar import ICalendarFile
from ..store.git import TreeGitStore
from ..web import CalendarCollection, XandikosBackend
from ..web import CalendarCollection, XandikosBackend, MultiUserXandikosBackend

EXAMPLE_VCALENDAR1 = b"""\
BEGIN:VCALENDAR
Expand Down Expand Up @@ -175,3 +175,92 @@ def start_response(code, _headers):

self.assertEqual(["200 OK"], codes)
self.assertEqual(b"".join([commit_hash, b"\t", default_branch, b"\n"]), body)


class MultiUserXandikosBackendTests(unittest.TestCase):
"""Tests for MultiUserXandikosBackend."""

def setUp(self):
self.d = tempfile.mkdtemp()

def tearDown(self):
shutil.rmtree(self.d)

def test_create_principal_default_paths(self):
"""Test creating a principal with default path format."""
backend = MultiUserXandikosBackend(self.d)
backend.set_principal("testuser")

# Check that principal was created with default format
principal_path = "/testuser/"
resource = backend.get_resource(principal_path)
self.assertIsNotNone(resource)
# _mark_as_principal normalizes the path, removing trailing slashes
self.assertIn("/testuser", backend._user_principals)

# Check autocreate is enabled
self.assertTrue(backend.autocreate)

def test_create_principal_custom_paths(self):
"""Test creating a principal with custom path format."""
backend = MultiUserXandikosBackend(
self.d, principal_path_prefix="/users/", principal_path_suffix="/principal/"
)
backend.set_principal("customuser")

# Check that principal was created with custom format
principal_path = "/users/customuser/principal/"
resource = backend.get_resource(principal_path)
self.assertIsNotNone(resource)
# _mark_as_principal normalizes the path, removing trailing slashes
self.assertIn("/users/customuser/principal", backend._user_principals)

def test_create_principal_override_paths(self):
"""Test overriding path format at method level."""
backend = MultiUserXandikosBackend(self.d)
backend.set_principal(
"override",
principal_path_prefix="/special/",
principal_path_suffix="/user/",
)

# Check that principal was created with overridden format
principal_path = "/special/override/user/"
resource = backend.get_resource(principal_path)
self.assertIsNotNone(resource)
# _mark_as_principal normalizes the path, removing trailing slashes
self.assertIn("/special/override/user", backend._user_principals)

def test_multiple_users(self):
"""Test creating multiple users."""
backend = MultiUserXandikosBackend(self.d)

# Create multiple users
users = ["alice", "bob", "charlie"]
for user in users:
backend.set_principal(user)

# Verify all users were created
for user in users:
principal_path = f"/{user}/"
resource = backend.get_resource(principal_path)
self.assertIsNotNone(resource)
# _mark_as_principal normalizes the path, removing trailing slashes
self.assertIn(f"/{user}", backend._user_principals)

def test_idempotent_principal_creation(self):
"""Test that setting the same principal twice is idempotent."""
backend = MultiUserXandikosBackend(self.d)

# Create principal first time
backend.set_principal("testuser")
principal_path = "/testuser/"
resource1 = backend.get_resource(principal_path)

# Create same principal again
backend.set_principal("testuser")
resource2 = backend.get_resource(principal_path)

# Should be the same resource
self.assertIsNotNone(resource1)
self.assertIsNotNone(resource2)
32 changes: 31 additions & 1 deletion xandikos/web.py
Original file line number Diff line number Diff line change
Expand Up @@ -1059,12 +1059,18 @@ def open_store_from_path(path: str, **kwargs):

class XandikosBackend(webdav.Backend):
def __init__(
self, path, *, paranoid: bool = False, index_threshold: Optional[int] = None
self,
path,
*,
paranoid: bool = False,
index_threshold: Optional[int] = None,
autocreate: bool = False,
) -> None:
self.path = path
self._user_principals: set[str] = set()
self.paranoid = paranoid
self.index_threshold = index_threshold
self.autocreate = autocreate

def _map_to_file_path(self, relpath):
return os.path.join(self.path, relpath.lstrip("/"))
Expand Down Expand Up @@ -1188,6 +1194,30 @@ async def move_collection(
shutil.move(source_file_path, dest_file_path)


class MultiUserXandikosBackend(XandikosBackend):
"""Backend that automatically creates principals for authenticated users."""

def __init__(self, path, principal_path_prefix="/", principal_path_suffix="/"):
super().__init__(path, autocreate=True)
self.principal_path_prefix = principal_path_prefix
self.principal_path_suffix = principal_path_suffix

def set_principal(
self, user, principal_path_prefix=None, principal_path_suffix=None
):
"""Set the principal for a user, creating it if necessary."""
if principal_path_prefix is None:
principal_path_prefix = self.principal_path_prefix
if principal_path_suffix is None:
principal_path_suffix = self.principal_path_suffix

principal = principal_path_prefix + user + principal_path_suffix

if not self.get_resource(principal):
self.create_principal(principal, create_defaults=True)
self._mark_as_principal(principal)


class XandikosApp(webdav.WebDAVApp):
"""A wsgi App that provides a Xandikos web server."""

Expand Down
14 changes: 14 additions & 0 deletions xandikos/webdav.py
Original file line number Diff line number Diff line change
Expand Up @@ -2656,6 +2656,20 @@ def _get_allowed_methods(self, request):
return ret

async def _handle_request(self, request, environ, start_response=None):
# Handle remote user authentication
remote_user = None
if hasattr(request, "headers"):
# aiohttp request
remote_user = request.headers.get("X-Remote-User")

if "ORIGINAL_ENVIRON" in environ:
# WSGI request
remote_user = environ["ORIGINAL_ENVIRON"].get("HTTP_X_REMOTE_USER")

if remote_user and hasattr(self.backend, "set_principal"):
environ["REMOTE_USER"] = remote_user
self.backend.set_principal(remote_user)

try:
do = self.methods[request.method]
except KeyError:
Expand Down
Loading