Skip to content

Commit 03d535d

Browse files
authored
Merge branch 'lbryio:master' into send-max-pr1628
2 parents 5e87378 + 2735484 commit 03d535d

File tree

16 files changed

+487
-38
lines changed

16 files changed

+487
-38
lines changed

lbry/conf.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -681,6 +681,10 @@ class Config(CLIConfig):
681681
('cdn.reflector.lbry.com', 5567)
682682
])
683683

684+
tracker_servers = Servers("BitTorrent-compatible (BEP15) UDP trackers for helping P2P discovery", [
685+
('tracker.lbry.com', 1337)
686+
])
687+
684688
lbryum_servers = Servers("SPV wallet servers", [
685689
('spv11.lbry.com', 50001),
686690
('spv12.lbry.com', 50001),

lbry/extras/daemon/components.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
from lbry.torrent.torrent_manager import TorrentManager
2828
from lbry.wallet import WalletManager
2929
from lbry.wallet.usage_payment import WalletServerPayer
30+
from lbry.torrent.tracker import TrackerClient
31+
3032
try:
3133
from lbry.torrent.session import TorrentSession
3234
except ImportError:
@@ -48,6 +50,7 @@
4850
PEER_PROTOCOL_SERVER_COMPONENT = "peer_protocol_server"
4951
UPNP_COMPONENT = "upnp"
5052
EXCHANGE_RATE_MANAGER_COMPONENT = "exchange_rate_manager"
53+
TRACKER_ANNOUNCER_COMPONENT = "tracker_announcer_component"
5154
LIBTORRENT_COMPONENT = "libtorrent_component"
5255

5356

@@ -708,3 +711,49 @@ async def start(self):
708711

709712
async def stop(self):
710713
self.exchange_rate_manager.stop()
714+
715+
716+
class TrackerAnnouncerComponent(Component):
717+
component_name = TRACKER_ANNOUNCER_COMPONENT
718+
depends_on = [FILE_MANAGER_COMPONENT]
719+
720+
def __init__(self, component_manager):
721+
super().__init__(component_manager)
722+
self.file_manager = None
723+
self.announce_task = None
724+
self.tracker_client: typing.Optional[TrackerClient] = None
725+
726+
@property
727+
def component(self):
728+
return self.tracker_client
729+
730+
@property
731+
def running(self):
732+
return self._running and self.announce_task and not self.announce_task.done()
733+
734+
async def announce_forever(self):
735+
while True:
736+
sleep_seconds = 60.0
737+
announce_sd_hashes = []
738+
for file in self.file_manager.get_filtered():
739+
if not file.downloader:
740+
continue
741+
announce_sd_hashes.append(bytes.fromhex(file.sd_hash))
742+
await self.tracker_client.announce_many(*announce_sd_hashes)
743+
await asyncio.sleep(sleep_seconds)
744+
745+
async def start(self):
746+
node = self.component_manager.get_component(DHT_COMPONENT) \
747+
if self.component_manager.has_component(DHT_COMPONENT) else None
748+
node_id = node.protocol.node_id if node else None
749+
self.tracker_client = TrackerClient(node_id, self.conf.tcp_port, lambda: self.conf.tracker_servers)
750+
await self.tracker_client.start()
751+
self.file_manager = self.component_manager.get_component(FILE_MANAGER_COMPONENT)
752+
self.announce_task = asyncio.create_task(self.announce_forever())
753+
754+
async def stop(self):
755+
self.file_manager = None
756+
if self.announce_task and not self.announce_task.done():
757+
self.announce_task.cancel()
758+
self.announce_task = None
759+
self.tracker_client.stop()

lbry/extras/daemon/daemon.py

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
from lbry.extras import system_info
4545
from lbry.extras.daemon import analytics
4646
from lbry.extras.daemon.components import WALLET_COMPONENT, DATABASE_COMPONENT, DHT_COMPONENT, BLOB_COMPONENT
47-
from lbry.extras.daemon.components import FILE_MANAGER_COMPONENT, DISK_SPACE_COMPONENT
47+
from lbry.extras.daemon.components import FILE_MANAGER_COMPONENT, DISK_SPACE_COMPONENT, TRACKER_ANNOUNCER_COMPONENT
4848
from lbry.extras.daemon.components import EXCHANGE_RATE_MANAGER_COMPONENT, UPNP_COMPONENT
4949
from lbry.extras.daemon.componentmanager import RequiredCondition
5050
from lbry.extras.daemon.componentmanager import ComponentManager
@@ -4996,7 +4996,6 @@ async def jsonrpc_blob_delete(self, blob_hash):
49964996
DHT / Blob Exchange peer commands.
49974997
"""
49984998

4999-
@requires(DHT_COMPONENT)
50004999
async def jsonrpc_peer_list(self, blob_hash, page=None, page_size=None):
50015000
"""
50025001
Get peers for blob hash
@@ -5018,21 +5017,29 @@ async def jsonrpc_peer_list(self, blob_hash, page=None, page_size=None):
50185017
if not is_valid_blobhash(blob_hash):
50195018
# TODO: use error from lbry.error
50205019
raise Exception("invalid blob hash")
5021-
peers = []
50225020
peer_q = asyncio.Queue(loop=self.component_manager.loop)
5023-
await self.dht_node._peers_for_value_producer(blob_hash, peer_q)
5021+
if self.component_manager.has_component(TRACKER_ANNOUNCER_COMPONENT):
5022+
tracker = self.component_manager.get_component(TRACKER_ANNOUNCER_COMPONENT)
5023+
tracker_peers = await tracker.get_kademlia_peer_list(bytes.fromhex(blob_hash))
5024+
log.info("Found %d peers for %s from trackers.", len(tracker_peers), blob_hash[:8])
5025+
peer_q.put_nowait(tracker_peers)
5026+
elif not self.component_manager.has_component(DHT_COMPONENT):
5027+
raise Exception("Peer list needs, at least, either a DHT component or a Tracker component for discovery.")
5028+
peers = []
5029+
if self.component_manager.has_component(DHT_COMPONENT):
5030+
await self.dht_node._peers_for_value_producer(blob_hash, peer_q)
50245031
while not peer_q.empty():
50255032
peers.extend(peer_q.get_nowait())
5026-
results = [
5027-
{
5028-
"node_id": hexlify(peer.node_id).decode(),
5033+
results = {
5034+
(peer.address, peer.tcp_port): {
5035+
"node_id": hexlify(peer.node_id).decode() if peer.node_id else None,
50295036
"address": peer.address,
50305037
"udp_port": peer.udp_port,
50315038
"tcp_port": peer.tcp_port,
50325039
}
50335040
for peer in peers
5034-
]
5035-
return paginate_list(results, page, page_size)
5041+
}
5042+
return paginate_list(list(results.values()), page, page_size)
50365043

50375044
@requires(DATABASE_COMPONENT)
50385045
async def jsonrpc_blob_announce(self, blob_hash=None, stream_hash=None, sd_hash=None):

lbry/file/source.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', storage: '
4545
self.purchase_receipt = None
4646
self._added_on = added_on
4747
self.analytics_manager = analytics_manager
48+
self.downloader = None
4849

4950
self.saving = asyncio.Event(loop=self.loop)
5051
self.finished_writing = asyncio.Event(loop=self.loop)

lbry/stream/downloader.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
from lbry.utils import lru_cache_concurrent
99
from lbry.stream.descriptor import StreamDescriptor
1010
from lbry.blob_exchange.downloader import BlobDownloader
11+
from lbry.torrent.tracker import enqueue_tracker_search
12+
1113
if typing.TYPE_CHECKING:
1214
from lbry.conf import Config
1315
from lbry.dht.node import Node
@@ -91,6 +93,7 @@ async def start(self, node: typing.Optional['Node'] = None, connection_id: int =
9193
self.accumulate_task.cancel()
9294
_, self.accumulate_task = self.node.accumulate_peers(self.search_queue, self.peer_queue)
9395
await self.add_fixed_peers()
96+
enqueue_tracker_search(bytes.fromhex(self.sd_hash), self.peer_queue)
9497
# start searching for peers for the sd hash
9598
self.search_queue.put_nowait(self.sd_hash)
9699
log.info("searching for peers for stream %s", self.sd_hash)

lbry/stream/managed_stream.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,8 @@
1616

1717
if typing.TYPE_CHECKING:
1818
from lbry.conf import Config
19-
from lbry.schema.claim import Claim
2019
from lbry.blob.blob_manager import BlobManager
2120
from lbry.blob.blob_info import BlobInfo
22-
from lbry.dht.node import Node
2321
from lbry.extras.daemon.analytics import AnalyticsManager
2422
from lbry.wallet.transaction import Transaction
2523

0 commit comments

Comments
 (0)