-
Notifications
You must be signed in to change notification settings - Fork 432
Expand file tree
/
Copy pathtorrent.py
More file actions
146 lines (120 loc) · 5.12 KB
/
torrent.py
File metadata and controls
146 lines (120 loc) · 5.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import logging
from sqlalchemy import delete, func
from sqlmodel import Session, select
from module.models import Torrent
# Module-level logger named after this module; TorrentDatabase uses it for debug messages.
logger = logging.getLogger(__name__)
class TorrentDatabase:
    """Data-access helper for ``Torrent`` rows bound to a single session.

    Every mutating method commits immediately. If a commit fails, the
    session is rolled back before the exception is re-raised so the injected
    session stays usable for later calls.
    """

    def __init__(self, session: Session):
        """Store the session that all queries and commits will go through."""
        self.session = session

    def _commit(self) -> None:
        """Commit pending work, rolling the session back if the commit fails.

        A failed flush/commit leaves a SQLAlchemy session inactive until
        ``rollback()`` is called, so the rollback is issued here. The
        original exception is always re-raised to the caller.
        """
        try:
            self.session.commit()
        except Exception:
            self.session.rollback()
            raise

    def add(self, data: Torrent):
        """Add a single torrent to the session and commit."""
        self.session.add(data)
        self._commit()
        logger.debug("Insert %s in database.", data.name)

    def add_all(self, datas: list[Torrent]):
        """Add several torrents to the session and commit them together."""
        self.session.add_all(datas)
        self._commit()
        logger.debug("Insert %s torrents in database.", len(datas))

    def update(self, data: Torrent):
        """Persist changes made to a single torrent and commit."""
        self.session.add(data)
        self._commit()
        logger.debug("Update %s in database.", data.name)

    def update_all(self, datas: list[Torrent]):
        """Persist changes made to several torrents and commit them together."""
        self.session.add_all(datas)
        self._commit()
        logger.debug("Update %s torrents in database.", len(datas))

    def update_one_user(self, data: Torrent):
        """Persist a single torrent and commit (same steps as ``update``)."""
        self.session.add(data)
        self._commit()
        logger.debug("Update %s in database.", data.name)

    def search(self, _id: int) -> Torrent | None:
        """Return the torrent whose ``id`` equals ``_id``, or ``None``."""
        result = self.session.execute(select(Torrent).where(Torrent.id == _id))
        return result.scalar_one_or_none()

    def search_all(self) -> list[Torrent]:
        """Return every torrent row."""
        result = self.session.execute(select(Torrent))
        return list(result.scalars().all())

    def search_rss(self, rss_id: int) -> list[Torrent]:
        """Return all torrents whose ``rss_id`` equals ``rss_id``."""
        result = self.session.execute(select(Torrent).where(Torrent.rss_id == rss_id))
        return list(result.scalars().all())

    def check_new(self, torrents_list: list[Torrent]) -> list[Torrent]:
        """Return the items of ``torrents_list`` whose URL is not stored yet.

        An empty input returns ``[]`` without touching the database. The
        input order is preserved in the result.
        """
        if not torrents_list:
            return []
        urls = [t.url for t in torrents_list]
        statement = select(Torrent.url).where(Torrent.url.in_(urls))
        # A set gives O(1) membership tests for the filter below.
        existing_urls = set(self.session.execute(statement).scalars().all())
        return [t for t in torrents_list if t.url not in existing_urls]

    def search_by_qb_hash(self, qb_hash: str) -> Torrent | None:
        """Find torrent by qBittorrent hash.

        Returns ``None`` when nothing matches. ``scalar_one_or_none`` raises
        ``sqlalchemy.exc.MultipleResultsFound`` if several rows match.
        """
        result = self.session.execute(select(Torrent).where(Torrent.qb_hash == qb_hash))
        return result.scalar_one_or_none()

    def search_by_qb_hashes(self, qb_hashes: list[str]) -> list[Torrent]:
        """Find torrents by multiple qBittorrent hashes (batch query).

        An empty ``qb_hashes`` returns ``[]`` without touching the database.
        """
        if not qb_hashes:
            return []
        result = self.session.execute(
            select(Torrent).where(Torrent.qb_hash.in_(qb_hashes))
        )
        return list(result.scalars().all())

    def delete_by_bangumi_id(self, bangumi_id: int) -> int:
        """Delete all torrent records associated with a bangumi.

        Rows are loaded and deleted through the session one by one.
        Returns the number of deleted records; nothing is committed when no
        row matches.
        """
        statement = select(Torrent).where(Torrent.bangumi_id == bangumi_id)
        torrents = list(self.session.execute(statement).scalars().all())
        if not torrents:
            return 0
        for torrent in torrents:
            self.session.delete(torrent)
        self._commit()
        count = len(torrents)
        logger.debug("Deleted %s torrent records for bangumi_id %s.", count, bangumi_id)
        return count

    def search_by_url(self, url: str) -> Torrent | None:
        """Find torrent by URL.

        Returns ``None`` when nothing matches. ``scalar_one_or_none`` raises
        ``sqlalchemy.exc.MultipleResultsFound`` if several rows match.
        """
        result = self.session.execute(select(Torrent).where(Torrent.url == url))
        return result.scalar_one_or_none()

    def update_qb_hash(self, torrent_id: int, qb_hash: str) -> bool:
        """Update the qb_hash for a torrent.

        Returns ``True`` when the torrent exists and was updated, ``False``
        when no torrent has the given id.
        """
        torrent = self.search(torrent_id)
        if torrent is None:
            return False
        torrent.qb_hash = qb_hash
        self.session.add(torrent)
        self._commit()
        logger.debug("Updated qb_hash for torrent %s: %s", torrent_id, qb_hash)
        return True

    def search_by_bangumi_id(self, bangumi_id: int) -> list[Torrent]:
        """Return all torrents whose ``bangumi_id`` equals ``bangumi_id``."""
        result = self.session.execute(
            select(Torrent).where(Torrent.bangumi_id == bangumi_id)
        )
        return list(result.scalars().all())

    def search_orphans(self) -> list[Torrent]:
        """Return all torrents whose ``bangumi_id`` is NULL."""
        result = self.session.execute(
            select(Torrent).where(Torrent.bangumi_id.is_(None))
        )
        return list(result.scalars().all())

    def count_orphans(self) -> int:
        """Return how many torrents have a NULL ``bangumi_id``."""
        result = self.session.execute(
            select(func.count()).select_from(Torrent).where(Torrent.bangumi_id.is_(None))
        )
        return result.scalar_one()

    def delete_one(self, torrent_id: int) -> bool:
        """Delete the torrent with the given id.

        Returns ``True`` when a row was deleted, ``False`` when no torrent
        has that id.
        """
        torrent = self.search(torrent_id)
        if torrent is None:
            return False
        self.session.delete(torrent)
        self._commit()
        logger.debug("Deleted torrent %s.", torrent_id)
        return True

    def delete_obj(self, torrent: Torrent) -> None:
        """Delete an already-loaded torrent object and commit."""
        self.session.delete(torrent)
        self._commit()
        logger.debug("Deleted torrent %s.", torrent.id)

    def delete_orphans(self) -> int:
        """Delete every torrent whose ``bangumi_id`` is NULL in one statement.

        Returns the row count reported for the DELETE statement.
        """
        result = self.session.execute(
            delete(Torrent).where(Torrent.bangumi_id.is_(None))
        )
        # Read the affected-row count while the statement's transaction is
        # still open, before the commit.
        count = result.rowcount
        self._commit()
        if count > 0:
            logger.debug("Deleted %s orphan torrents.", count)
        return count