-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathocs.py
More file actions
158 lines (131 loc) · 6.49 KB
/
ocs.py
File metadata and controls
158 lines (131 loc) · 6.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
"""OCS (Open Collaboration Services) client for NextCloud API.
Reference: https://docs.nextcloud.com/server/latest/developer_manual/client_apis/index.html
"""
import logging
from urllib.parse import unquote

import defusedxml.ElementTree as ET

from app.clients.base import BaseAPIClient
from app.models.activity import Activity, FileActivity, FileActivityResponse, FileInfo
from app.models.search import FileSearchResult
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class OCSClient(BaseAPIClient):
    """Client for the NextCloud OCS API.

    Provides the file activity feed (with cursor-based pagination), the list of
    favorite files (fetched through a WebDAV REPORT request), and file search.
    """

    service_name = "NextCloud OCS"

    def _auth_headers(self) -> dict[str, str]:
        """Return the base client's auth headers plus the headers OCS requests need."""
        headers = super()._auth_headers()
        headers["OCS-APIRequest"] = "true"
        headers["Accept"] = "application/json"
        return headers

    async def get_file_activities(
        self,
        limit: int = 50,
        since: int = 0,
        is_favorite: bool = False,
    ) -> FileActivityResponse:
        """Get file activities with cursor-based pagination.

        When ``is_favorite`` is true, favorite files are fetched via WebDAV REPORT
        instead of the activity feed, and ``limit``/``since`` are ignored.
        Otherwise the activity feed is fetched, filtered down to activities whose
        ``object_type`` is ``"files"``, and returned with all files per activity
        preserved.

        Args:
            limit: Maximum number of activities to request; omitted from the
                request when falsy.
            since: Pagination cursor; omitted from the request when falsy.
            is_favorite: Return favorite files instead of the activity feed.

        Returns:
            The matching activities, plus ``last_given`` taken from the
            ``x-activity-last-given`` response header (``None`` when absent).
        """
        if is_favorite:
            return await self._get_favorite_files()

        params: dict[str, str] = {"format": "json"}
        if since:
            params["since"] = str(since)
        if limit:
            params["limit"] = str(limit)

        activities, headers = await self._get_resource_with_headers(
            path="ocs/v2.php/apps/activity/api/v2/activity/files",
            model_type=list[Activity],
            params=params,
            response_parser=lambda data: data.get("ocs", {}).get("data", []),
        )

        # Filter by object_type == "files" (includes files + files_sharing apps)
        file_activities: list[FileActivity] = [
            FileActivity(
                activity_id=activity.activity_id,
                datetime=activity.datetime,
                action=activity.type,
                files=activity.extract_files(),
            )
            for activity in activities
            if activity.object_type == "files"
        ]

        # The cursor for the next page comes back in a response header, which is
        # looked up here by its lowercase name.
        last_given_str = headers.get("x-activity-last-given")
        last_given = int(last_given_str) if last_given_str else None
        return FileActivityResponse(results=file_activities, last_given=last_given)

    @staticmethod
    def _first_ok_prop(response_element, dav_ns: str):
        """Return the ``<d:prop>`` of the first 200-status ``<d:propstat>``, or ``None``."""
        for propstat in response_element.findall(f"{{{dav_ns}}}propstat"):
            if "200" not in (propstat.findtext(f"{{{dav_ns}}}status") or ""):
                continue
            prop = propstat.find(f"{{{dav_ns}}}prop")
            if prop is not None:
                return prop
        return None

    async def _get_favorite_files(self) -> FileActivityResponse:
        """Fetch favorite files using Nextcloud WebDAV REPORT.

        Resolves the current user ID through the OCS user endpoint, then issues a
        ``REPORT`` with a favorite filter against that user's DAV files root and
        converts each returned entry into a single-file ``FileActivity``.

        This is best-effort: if the user lookup fails, the user ID is missing,
        the REPORT returns a status other than 200/207, or the response is not
        well-formed XML, a warning is logged and an empty result is returned.

        Reference: https://docs.nextcloud.com/server/latest/developer_manual/client_apis/webdav/index.html
        """
        # Resolve current user ID
        user_url = self._build_url("ocs/v2.php/cloud/user")
        user_response = await self.client.get(user_url, params={"format": "json"}, headers=self._auth_headers())
        if user_response.status_code != 200:
            logger.warning(
                "Failed to resolve current user for favorites (status %s), returning empty results",
                user_response.status_code,
            )
            return FileActivityResponse(results=[], last_given=None)

        user_id = user_response.json().get("ocs", {}).get("data", {}).get("id", "")
        if not user_id:
            # Without a user ID the DAV URL would be ".../files//", which cannot
            # address anyone's files.
            logger.warning("Current user response did not include a user ID, returning empty results")
            return FileActivityResponse(results=[], last_given=None)

        # WebDAV REPORT to filter favorite files
        xml_body = (
            '<?xml version="1.0"?>'
            '<oc:filter-files xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns" xmlns:nc="http://nextcloud.org/ns">'
            "<d:prop>"
            "<d:getlastmodified/><d:getcontenttype/><d:displayname/><oc:fileid/><oc:favorite/>"
            "</d:prop>"
            "<oc:filter-rules><oc:favorite>1</oc:favorite></oc:filter-rules>"
            "</oc:filter-files>"
        )
        report_url = self._build_url(f"remote.php/dav/files/{user_id}/")
        headers = self._auth_headers()
        headers["Content-Type"] = "application/xml"
        headers["Depth"] = "infinity"
        response = await self.client.request("REPORT", report_url, content=xml_body.encode(), headers=headers)
        if response.status_code not in (200, 207):
            logger.warning(
                "Failed to fetch favorites via WebDAV REPORT (status %s), returning empty results",
                response.status_code,
            )
            return FileActivityResponse(results=[], last_given=None)

        # Parse WebDAV multistatus XML response
        dav_ns = "DAV:"
        oc_ns = "http://owncloud.org/ns"
        try:
            root = ET.fromstring(response.text)
        except ET.ParseError:
            logger.warning("Failed to parse WebDAV REPORT response as XML, returning empty results")
            return FileActivityResponse(results=[], last_given=None)

        # Everything after this marker in an href is the path relative to the
        # user's files root. Searching for it (rather than requiring it at the
        # start) also handles servers installed under a sub-directory.
        dav_marker = f"/remote.php/dav/files/{user_id}/"

        file_activities: list[FileActivity] = []
        for resp in root.findall(f"{{{dav_ns}}}response"):
            prop = self._first_ok_prop(resp, dav_ns)
            if prop is None:
                continue

            # hrefs are URIs and therefore percent-encoded; decode so that paths
            # and fallback names contain the real characters.
            href = unquote(resp.findtext(f"{{{dav_ns}}}href") or "")

            display_name = prop.findtext(f"{{{dav_ns}}}displayname") or href.rstrip("/").split("/")[-1]
            file_id_str = prop.findtext(f"{{{oc_ns}}}fileid")
            file_id = int(file_id_str) if file_id_str else None

            _, marker_found, relative_path = href.partition(dav_marker)
            path = relative_path if marker_found else href
            link = f"{self.base_url}/f/{file_id}" if file_id else None

            file_activities.append(FileActivity(files=[FileInfo(id=file_id, name=display_name, path=path, link=link)]))

        return FileActivityResponse(results=file_activities, last_given=None)

    async def search_files(
        self, term: str, path: str = "ocs/v2.php/search/providers/files/search"
    ) -> FileActivityResponse:
        """Search files by term and wrap each hit as a single-file activity.

        Args:
            term: Search term sent to the search provider.
            path: Endpoint path of the search provider to query.

        Returns:
            One ``FileActivity`` per search entry, carrying the entry's name and
            URL; ``last_given`` is always ``None``.
        """
        search_results: list[FileSearchResult] = await self._get_resource(
            path=path,
            model_type=list[FileSearchResult],
            params={"format": "json", "term": term},
            response_parser=lambda data: data.get("ocs", {}).get("data", {}).get("entries", []),
        )
        file_activities = [FileActivity(files=[FileInfo(name=entry.name, link=entry.url)]) for entry in search_results]
        return FileActivityResponse(results=file_activities, last_given=None)