Skip to content
Open
2 changes: 1 addition & 1 deletion .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.8", "3.9", "3.10"]
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12", "3.13"]

steps:
- uses: actions/checkout@v3
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ coverage.xml
# dotenv
.env

# WebDAV dead property storage
*.props

# virtualenv
venv/
ENV/
Expand Down
156 changes: 127 additions & 29 deletions pywebdav/lib/WebDAVServer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import urllib.parse

from .propfind import PROPFIND
from .proppatch import PROPPATCH
from .report import REPORT
from .delete import DELETE
from .davcopy import COPY
Expand Down Expand Up @@ -313,9 +314,100 @@ def do_POST(self):

self.send_body(None, 405, 'Method Not Allowed', 'Method Not Allowed')

def _validate_lock_token(self, uri, ifheader):
"""
Validate If header against locks on a resource

Returns True if:
- Resource is not locked, OR
- Resource is locked AND If header contains a valid lock token

Returns False if:
- Resource is locked AND no If header provided, OR
- Resource is locked AND If header has invalid/missing token

Per RFC 4918 Section 10.4:
- Tagged lists (with resource) only apply if resource matches
- Untagged lists apply to the Request-URI
- For shared locks, ANY valid token from the set is sufficient
"""
if not self._l_isLocked(uri):
return True # Not locked - no validation needed

if not ifheader:
return False # Locked but no If header - FAIL

# Get all locks for this URI (supports shared locks)
uri_locks = self._l_getLocksForUri(uri)
taglist = IfParser(ifheader)

for tag in taglist:
# If tag has a resource, check if it matches the Request-URI
if tag.resource:
# Tagged list - only applies if resource matches
tag_uri = urllib.parse.unquote(tag.resource)
if tag_uri != uri:
continue # This tag doesn't apply to this resource

# Tag applies to this resource - check if any token is valid
for listitem in tag.list:
token = tokenFinder(listitem)
if token and self._l_hasLock(token):
# Check if this token is for one of the locks on this resource
for lock in uri_locks:
if lock.token == token:
return True # Valid token found

return False # No valid token found

def do_PROPPATCH(self):
# currently unsupported
return self.send_status(423)
""" Modify properties on a resource. """

dc = self.IFACE_CLASS

# Read the body containing the XML request
body = None
if 'Content-Length' in self.headers:
l = self.headers['Content-Length']
body = self.rfile.read(int(l))

if not body:
return self.send_status(400)

uri = urllib.parse.unquote(urllib.parse.urljoin(self.get_baseuri(dc), self.path))

# Validate lock token if resource is locked
if not self._validate_lock_token(uri, self.headers.get('If')):
return self.send_status(423)

# Parse and execute PROPPATCH
try:
pp = PROPPATCH(uri, dc, body)
except ExpatError:
# XML parse error
return self.send_status(400)
except DAV_Error as error:
(ec, dd) = error.args
return self.send_status(ec)

# Execute the property operations
try:
pp.validate_and_execute()
except DAV_NotFound:
return self.send_status(404)
except DAV_Error as error:
(ec, dd) = error.args
return self.send_status(ec)

# Generate Multi-Status response
try:
DATA = pp.create_response()
except DAV_Error as error:
(ec, dd) = error.args
return self.send_status(ec)

self.send_body_chunks_if_http11(DATA, 207, 'Multi-Status',
'Multiple responses')

def do_PROPFIND(self):
""" Retrieve properties on defined resource. """
Expand Down Expand Up @@ -420,9 +512,9 @@ def do_DELETE(self):
if uri.find('#') >= 0:
return self.send_status(404)

# locked resources are not allowed to delete
if self._l_isLocked(uri):
return self.send_body(None, 423, 'Locked', 'Locked')
# Validate lock token if resource is locked
if not self._validate_lock_token(uri, self.headers.get('If')):
return self.send_status(423)

# Handle If-Match
if 'If-Match' in self.headers:
Expand Down Expand Up @@ -542,34 +634,38 @@ def do_PUT(self):
self.log_request(412)
return

# locked resources are not allowed to be overwritten
# Validate lock token if resource is locked
ifheader = self.headers.get('If')
if (
(self._l_isLocked(uri)) and
(not ifheader)
):
return self.send_body(None, 423, 'Locked', 'Locked')

if self._l_isLocked(uri) and ifheader:
uri_token = self._l_getLockForUri(uri)
is_locked = self._l_isLocked(uri)

if is_locked:
# Resource is locked - must have valid lock token
if not self._validate_lock_token(uri, ifheader):
return self.send_status(423)
elif ifheader:
# Resource not locked but If header provided - validate it anyway
# Per RFC 4918, If header creates precondition that must be satisfied
# If it contains lock tokens that don't exist, fail with 412
taglist = IfParser(ifheader)
found = False
found_valid = False
for tag in taglist:
# Check if tag applies to this resource
if tag.resource:
tag_uri = urllib.parse.unquote(tag.resource)
if tag_uri != uri:
continue
# Check if any token in the tag is valid
for listitem in tag.list:
token = tokenFinder(listitem)
if (
token and
(self._l_hasLock(token)) and
(self._l_getLock(token) == uri_token)
):
found = True
if token and self._l_hasLock(token):
found_valid = True
break
if found:
if found_valid:
break
if not found:
res = self.send_body(None, 423, 'Locked', 'Locked')
self.log_request(423)
return res

# If If header specified but no valid tokens, fail with 412
if not found_valid:
return self.send_status(412)

# Handle expect
expect = self.headers.get('Expect', '')
Expand Down Expand Up @@ -676,9 +772,11 @@ def copymove(self, CLASS):
dest_uri = self.headers['Destination']
dest_uri = urllib.parse.unquote(dest_uri)

# check locks on source and dest
if self._l_isLocked(source_uri) or self._l_isLocked(dest_uri):
return self.send_body(None, 423, 'Locked', 'Locked')
# Per RFC 4918:
# - Source can be locked (locks don't transfer on COPY/MOVE)
# - Destination must not be locked OR must have valid token in If header
if not self._validate_lock_token(dest_uri, self.headers.get('If')):
return self.send_status(423)

# Overwrite?
overwrite = 1
Expand Down
10 changes: 9 additions & 1 deletion pywebdav/lib/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@

DAV_VERSION_2 = {
'version' : '1,2',
'options' :
'options' :
DAV_VERSION_1['options'] + ', LOCK, UNLOCK'
}

# WebDAV namespace
DAV_NAMESPACE = "DAV:"

# Property storage limits
MAX_PROPERTY_COUNT = 1000 # Maximum properties per resource
MAX_PROPERTY_VALUE_SIZE = 1024 * 1024 # 1MB per property value
MAX_PROPERTY_TOTAL_SIZE = 10 * 1024 * 1024 # 10MB total per resource
44 changes: 44 additions & 0 deletions pywebdav/lib/iface.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,50 @@ def get_prop(self,uri,ns,propname):
except AttributeError:
raise DAV_NotFound

###
### PROPERTY MANAGEMENT (for PROPPATCH)
###

def get_dead_props(self, uri):
""" return all dead properties for a resource

Dead properties are custom properties set by clients via PROPPATCH.
Returns a dict: {namespace: {propname: value, ...}, ...}

Base implementation has no storage, returns empty dict.
Override this in subclasses that support property storage.
"""
return {}

def set_prop(self, uri, ns, propname, value):
""" set a property value (dead property)

uri -- uri of the resource
ns -- namespace of the property
propname -- name of the property
value -- value to set (string)

Returns True on success, raises DAV_Error on failure.
Protected properties (DAV: namespace) should raise DAV_Forbidden.

Base implementation doesn't support property storage.
"""
raise DAV_Forbidden

def del_prop(self, uri, ns, propname):
""" delete a property (dead property)

uri -- uri of the resource
ns -- namespace of the property
propname -- name of the property

Returns True on success, raises DAV_Error on failure.
Should succeed even if property doesn't exist (idempotent).

Base implementation doesn't support property storage.
"""
raise DAV_Forbidden

###
### DATA methods (for GET and PUT)
###
Expand Down
Loading