Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions src/tribler/core/database/restapi/database_endpoint.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
import binascii
import json
import typing
from binascii import unhexlify
Expand Down Expand Up @@ -92,6 +93,7 @@ def __init__(self, middlewares: tuple = (), client_max_size: int = MAX_REQUEST_S
web.delete("/torrents/{infohash}/tags", self.remove_tag),
web.patch("/torrents/{infohash}/tags", self.update_tags),
web.get("/torrents/popular", self.get_popular_torrents),
web.get("/torrents/health", self.get_torrent_health_history),
web.get("/search/local", self.local_search),
web.get("/search/completions", self.completions),
]
Expand Down Expand Up @@ -179,6 +181,38 @@ async def get_torrent_health(self, request: RequestType) -> RESTResponse:
_ = self.register_anonymous_task("health_check", asyncio.ensure_future(health_check_coro))
return RESTResponse({"checking": True})

@docs(
    tags=["Metadata"],
    summary="Fetch the health of recently checked swarms.",
    responses={
        200: {
            "schema": schema(
                HealthHistoryResponse={
                    "health_history": [schema(Health={
                        "infohash": String,
                        "seeders": Integer,
                        "leechers": Integer,
                        "last_check": Integer,
                        # Advertise the tracker field too: the handler includes it in every entry,
                        # so the documented schema should match the actual response body.
                        "tracker": String,
                    })]
                }
            )
        }
    },
)
async def get_torrent_health_history(self, request: RequestType) -> RESTResponse:
    """
    Fetch the swarm health of all recently checked torrents.

    Each entry contains the hex-encoded infohash, seeder/leecher counts, the
    timestamp of the last check, and the tracker that reported the result.
    Returns an empty ``health_history`` list when no torrent checker is
    available (for example, during startup or shutdown).
    """
    if self.torrent_checker is None:
        return RESTResponse({"health_history": []})

    return RESTResponse({"health_history": [{"infohash": binascii.hexlify(health.infohash).decode(),
                                             "seeders": health.seeders,
                                             "leechers": health.leechers,
                                             "last_check": health.last_check,
                                             "tracker": health.tracker}
                                            for health in self.torrent_checker.torrents_checked.values()]})

def add_download_progress_to_metadata_list(self, contents_list: list[dict]) -> None:
"""
Retrieve the download status from libtorrent and attach it to the torrent descriptions in the content list.
Expand Down
13 changes: 10 additions & 3 deletions src/tribler/core/libtorrent/download_manager/download_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,20 +502,21 @@ def process_alert(self, alert: lt.alert, hops: int = 0) -> None: # noqa: C901,
bytearray(decoded[b"r"][b"BFsd"]),
bytearray(decoded[b"r"][b"BFpe"]))

async def get_metainfo(self, infohash: bytes, timeout: float = 7, hops: int | None = -1, # noqa: C901,PLR0912
url: str | None = None) -> MetainfoLookupResult | None:
async def get_metainfo(self, infohash: bytes, timeout: float = 7, hops: int | None = -1, # noqa: C901,PLR0912,PLR0915
health_check: bool = False, url: str | None = None) -> MetainfoLookupResult | None:
"""
Lookup metainfo for a given infohash. The mechanism works by joining the swarm for the infohash connecting
to a few peers, and downloading the metadata for the torrent.

:param infohash: The (binary) infohash to lookup metainfo for.
:param timeout: A timeout in seconds.
:param hops: the number of tunnel hops to use for this lookup. If None, use config default.
:param health_check: Whether we're doing a health check.
:param url: Optional URL. Can contain trackers info, etc.
:return: The metainfo
"""
infohash_hex = hexlify(infohash)
if infohash in self.metainfo_cache:
if infohash in self.metainfo_cache and not health_check:
logger.info("Returning metainfo from cache for %s", infohash_hex)
return self.metainfo_cache[infohash]

Expand Down Expand Up @@ -549,6 +550,7 @@ async def get_metainfo(self, infohash: bytes, timeout: float = 7, hops: int | No
return None
self.metainfo_requests[infohash] = MetainfoLookup(download, 1)

t_start = time.time()
try:
await wait_for(shield(download.future_metainfo), timeout)
except (CancelledError, TimeoutError) as e:
Expand All @@ -557,6 +559,11 @@ async def get_metainfo(self, infohash: bytes, timeout: float = 7, hops: int | No
return None
else:
logger.info("Successfully retrieved metainfo for %s", infohash_hex)
# Spend the remaining time we have left determining the swarm size.
if health_check:
download.set_max_download_rate(10)
download.set_max_upload_rate(10)
await asyncio.sleep(timeout - (time.time() - t_start))
seeders, leechers = download.get_state().get_num_seeds_peers()
self.metainfo_cache[infohash] = MetainfoLookupResult(time=time.time(),
tdef=tdef,
Expand Down
44 changes: 16 additions & 28 deletions src/tribler/core/torrent_checker/torrent_checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
from tribler.core.notifier import Notification, Notifier
from tribler.core.torrent_checker.healthdataclasses import HEALTH_FRESHNESS_SECONDS, HealthInfo, TrackerResponse
from tribler.core.torrent_checker.torrentchecker_session import (
FakeDHTSession,
TrackerSession,
UdpSocketManager,
create_tracker_session,
Expand All @@ -34,18 +33,18 @@
TORRENT_SELECTION_INTERVAL = 10 # The interval for checking the health of a random torrent
MIN_TORRENT_CHECK_INTERVAL = 900 # How much time we should wait before checking a torrent again
TORRENT_CHECK_RETRY_INTERVAL = 30 # Interval when the torrent was successfully checked for the last time
MAX_TORRENTS_CHECKED_PER_SESSION = 5 # (5 random + 5 per tracker = 10 torrents) per 10 seconds
MAX_TORRENTS_CHECKED_PER_SESSION = 5 # Maximum simultaneous health checks.
SWARM_HEALTH_CHECK_TIMEOUT = 240 # Number of seconds to spend in a swarm when doing a health check.

TORRENT_SELECTION_POOL_SIZE = 2 # How many torrents to check (popular or random) during periodic check
USER_CHANNEL_TORRENT_SELECTION_POOL_SIZE = 5 # How many torrents to check from user's channel during periodic check
TORRENT_SELECTION_POOL_SIZE = 5 # How many torrents to check (popular or random) during periodic check
TORRENTS_CHECKED_RETURN_SIZE = 240 # Estimated torrents checked on default 4 hours idle run


def aggregate_responses_for_infohash(infohash: bytes, responses: list[TrackerResponse]) -> HealthInfo:
"""
Finds the "best" health info (with the max number of seeders) for a specified infohash.
"""
result = HealthInfo(infohash, last_check=0)
result = HealthInfo(infohash, last_check=int(time.time()), self_checked=True)
for response in responses:
for health in response.torrent_health_list:
if health.infohash == infohash and health > result:
Expand Down Expand Up @@ -91,7 +90,6 @@ async def initialize(self) -> None:
"""
Start all the looping tasks for the checker and creata socket.
"""
self.register_task("check random tracker", self.check_random_tracker, interval=TRACKER_SELECTION_INTERVAL)
self.register_task("check local torrents", self.check_local_torrents, interval=TORRENT_SELECTION_INTERVAL)
await self.create_socket_or_schedule()

Expand Down Expand Up @@ -207,10 +205,6 @@ async def get_tracker_response(self, session: TrackerSession) -> TrackerResponse
self._logger.info("Got response from %s in %f seconds: %s", session.__class__.__name__, round(t2 - t1, 3),
str(result))

with db_session:
for health in result.torrent_health_list:
self.update_torrent_health(health)

return result

@property
Expand Down Expand Up @@ -278,7 +272,7 @@ async def check_local_torrents(self) -> tuple[list, list]:
"""
selected_torrents = self.torrents_to_check()
self._logger.info("Check %d local torrents", len(selected_torrents))
results = [await self.check_torrent_health(t.infohash) for t in selected_torrents]
results = await asyncio.gather(*[self.check_torrent_health(t.infohash) for t in selected_torrents])
self._logger.info("Results for local torrents check: %s", str(results))
return selected_torrents, results

Expand Down Expand Up @@ -346,21 +340,19 @@ async def check_torrent_health(self, infohash: bytes, timeout: float = 20, scrap
if session := self.create_session_for_request(tracker_url, timeout=timeout):
session.add_infohash(infohash)
coroutines.append(self.get_tracker_response(session))

session = FakeDHTSession(self.download_manager, timeout)
session.add_infohash(infohash)
self._logger.info("DHT session has been created for %s: %s", infohash_hex, str(session))
self.sessions["DHT"].append(session)
coroutines.append(self.get_tracker_response(session))

responses = await asyncio.gather(*coroutines, return_exceptions=True)
self._logger.info("%d responses for %s have been received: %s", len(responses), infohash_hex, str(responses))
self._logger.info("Received %s tracker responses for %s: %s", len(responses), infohash_hex, str(responses))
successful_responses = [response for response in responses if not isinstance(response, Exception)]
health = aggregate_responses_for_infohash(infohash, cast("list[TrackerResponse]", successful_responses))
if health.last_check == 0:
self.notify(health) # We don't need to store this in the db, but we still need to notify the GUI
else:
self.update_torrent_health(health)

if health.seeders == 0 and health.leechers == 0:
self._logger.info("Contacting trackers yielded no results, joining swarm %s", infohash_hex)
if metainfo := await self.download_manager.get_metainfo(infohash,timeout=SWARM_HEALTH_CHECK_TIMEOUT,
health_check=True):
health = HealthInfo(infohash, seeders=metainfo.get("seeders", 0), leechers=metainfo.get("leechers", 0),
last_check=int(time.time()), self_checked=True)

self.update_torrent_health(health)
return health

def create_session_for_request(self, tracker_url: str, timeout: float = 20) -> TrackerSession | None:
Expand Down Expand Up @@ -427,11 +419,7 @@ def update_torrent_health(self, health: HealthInfo) -> bool:
torrent_state.set(seeders=health.seeders, leechers=health.leechers, last_check=health.last_check,
self_checked=True)

if health.seeders > 0 or health.leechers > 0:
self.torrents_checked[health.infohash] = health
else:
self.torrents_checked.pop(health.infohash, None)

self.torrents_checked[health.infohash] = health
self.notify(health)
return True

Expand Down
53 changes: 4 additions & 49 deletions src/tribler/core/torrent_checker/torrentchecker_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
if TYPE_CHECKING:
from ipv8.messaging.interfaces.udp.endpoint import DomainAddress

from tribler.core.libtorrent.download_manager.download_manager import DownloadManager

# Although these are the actions for UDP trackers, they can still be used as
# identifiers.
Expand Down Expand Up @@ -188,14 +187,15 @@ def process_scrape_response(self, body: bytes | None) -> TrackerResponse:
leechers = file_info.get(b"incomplete", 0)

unprocessed_infohashes.discard(infohash)
health_list.append(HealthInfo(infohash, seeders, leechers, last_check=now, self_checked=True))
health_list.append(HealthInfo(infohash, seeders, leechers, last_check=now,
self_checked=True, tracker=self.tracker_url))

elif b"failure reason" in response_dict:
self._logger.info("%s Failure as reported by tracker [%s]", self, repr(response_dict[b"failure reason"]))
self.failed(msg=repr(response_dict[b"failure reason"]))

# handle the infohashes with no result (seeders/leechers = 0/0)
health_list.extend(HealthInfo(infohash=infohash, last_check=now, self_checked=True)
health_list.extend(HealthInfo(infohash=infohash, last_check=now, self_checked=True, tracker=self.tracker_url)
for infohash in unprocessed_infohashes)

self.is_finished = True
Expand Down Expand Up @@ -453,7 +453,7 @@ async def scrape(self) -> TrackerResponse:
# Sow complete as seeders. "complete: number of peers with the entire file, i.e. seeders (integer)"
# - https://wiki.theory.org/BitTorrentSpecification#Tracker_.27scrape.27_Convention
response_list.append(HealthInfo(infohash, seeders=complete, leechers=incomplete,
last_check=now, self_checked=True))
last_check=now, self_checked=True, tracker=self.tracker_url))

# close this socket and remove its transaction ID from the list
self.remove_transaction_id()
Expand All @@ -463,51 +463,6 @@ async def scrape(self) -> TrackerResponse:
return TrackerResponse(url=self.tracker_url, torrent_health_list=response_list)


class FakeDHTSession(TrackerSession):
"""
Fake TrackerSession that manages DHT requests.
"""

def __init__(self, download_manager: DownloadManager, timeout: float) -> None:
"""
Create a new fake DHT tracker session.
"""
super().__init__("DHT", "DHT", ("DHT", 0), "DHT", timeout)

self.download_manager = download_manager

async def connect_to_tracker(self) -> TrackerResponse:
"""
Query the bittorrent DHT.
"""
health_list = []
now = int(time.time())
for infohash in self.infohash_list:
lookup = await self.download_manager.get_metainfo(infohash, timeout=self.timeout)
if lookup is None:
continue
health = HealthInfo(infohash, seeders=lookup["seeders"], leechers=lookup["leechers"],
last_check=now, self_checked=True)
health_list.append(health)

return TrackerResponse(url="DHT", torrent_health_list=health_list)


class FakeBep33DHTSession(FakeDHTSession):
"""
Fake session for a BEP33 lookup.
"""

async def connect_to_tracker(self) -> TrackerResponse:
"""
Query the bittorrent DHT using BEP33 to avoid joining the swarm.
"""
dht_health_manager = self.download_manager.dht_health_manager
health_infos = [await dht_health_manager.get_health(infohash, timeout=self.timeout)
for infohash in self.infohash_list] if dht_health_manager is not None else []
return TrackerResponse(url="DHT", torrent_health_list=health_infos)


def create_tracker_session(tracker_url: str, timeout: float, proxy: tuple | None,
socket_manager: UdpSocketManager) -> TrackerSession:
"""
Expand Down
15 changes: 7 additions & 8 deletions src/tribler/core/tunnel/community.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import hashlib
import time
from binascii import hexlify, unhexlify
from collections import Counter
from typing import TYPE_CHECKING, cast

from ipv8.messaging.anonymization.hidden_services import HiddenTunnelCommunity, HiddenTunnelSettings
Expand Down Expand Up @@ -330,7 +329,7 @@ def monitor_hidden_swarms(self, new_states: dict[bytes, DownloadStatus], hops: d
if intro_points_todo <= 0:
return

ip_counter = Counter([c.info_hash for c in intro_points])
ip_hashes = {c.info_hash for c in intro_points}
for info_hash in set(list(new_states) + list(self.download_states)):
new_state = new_states.get(info_hash)
old_state = self.download_states.get(info_hash, None)
Expand All @@ -348,12 +347,12 @@ def on_join_swarm(addr: Address, ih: bytes = info_hash) -> None:
self.leave_swarm(info_hash)

# Ensure we have enough introduction points for this infohash. Currently, we only create 1.
if new_state == DownloadStatus.SEEDING:
for _ in range(1 - ip_counter.get(info_hash, 0)):
if intro_points_todo <= 0:
return
self.logger.info("Create introducing circuit for %s", hexlify(info_hash))
self.create_introduction_point(info_hash)
if new_state == DownloadStatus.SEEDING and info_hash not in ip_hashes:
self.logger.info("Create introducing circuit for %s", hexlify(info_hash))
self.create_introduction_point(info_hash)
intro_points_todo -= 1
if intro_points_todo <= 0:
return

def on_e2e_finished(self, address: Address, info_hash: bytes) -> None:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def setUp(self) -> None:

self.tracker_manager = TrackerManager(state_dir=Path("."), metadata_store=self.metadata_store)
self.torrent_checker = TorrentChecker(config=TriblerConfigManager(), tracker_manager=self.tracker_manager,
download_manager=MagicMock(get_metainfo=AsyncMock()),
download_manager=MagicMock(get_metainfo=AsyncMock(return_value={})),
notifier=MagicMock(), metadata_store=self.metadata_store)

async def tearDown(self) -> None:
Expand Down
Loading