From f6035f5304b718c6b0df6a629a8021d1fe0327ea Mon Sep 17 00:00:00 2001 From: Sourav <70096119+ydavs@users.noreply.github.com> Date: Tue, 23 Aug 2022 06:29:52 +0000 Subject: [PATCH 1/3] Fixed Issue. Issue : Header length not consumed for KeepAlive messages which eventually leads to an infinite loop. --- pieces/protocol.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pieces/protocol.py b/pieces/protocol.py index 51833e5..3f55a66 100644 --- a/pieces/protocol.py +++ b/pieces/protocol.py @@ -316,6 +316,9 @@ def parse(self): message_length = struct.unpack('>I', self.buffer[0:4])[0] if message_length == 0: + logging.debug('Got a KeepAlive message') + # Call consume + self.buffer = self.buffer[header_length + message_length:] return KeepAlive() if len(self.buffer) >= message_length: From cd05c8c2cbcb79be0e8bae1d726295188f8611c2 Mon Sep 17 00:00:00 2001 From: Sourav <70096119+ydavs@users.noreply.github.com> Date: Tue, 23 Aug 2022 09:08:18 +0000 Subject: [PATCH 2/3] Fixed case for peers that don't send BitField --- pieces/client.py | 17 ++++++++++++++++- pieces/protocol.py | 16 ++++++++++++---- 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/pieces/client.py b/pieces/client.py index baa388e..1663dbb 100644 --- a/pieces/client.py +++ b/pieces/client.py @@ -23,6 +23,7 @@ from asyncio import Queue from collections import namedtuple, defaultdict from hashlib import sha1 +import bitstring from pieces.protocol import PeerConnection, REQUEST_SIZE from pieces.tracker import Tracker @@ -322,6 +323,13 @@ def bytes_uploaded(self) -> int: # TODO Add support for sending data return 0 + def add_peer_without_bitfield(self, peer_id): + """ + Adds a peer without requiring a bitfield(Note sending a bitfield is optional.). + """ + bitfield = bitstring.BitArray(int=0, length=self.total_pieces) + self.peers[peer_id] = bitfield + def add_peer(self, peer_id, bitfield): """ Adds a peer and the bitfield representing the pieces the peer has. @@ -369,7 +377,11 @@ def next_request(self, peer_id) -> Block: if not block: block = self._next_ongoing(peer_id) if not block: - block = self._get_rarest_piece(peer_id).next_request() + piece = self._get_rarest_piece(peer_id) + if piece == None: + return None + else: + block = piece.next_request() return block def block_received(self, peer_id, piece_index, block_offset, data): @@ -468,6 +480,8 @@ def _get_rarest_piece(self, peer_id): if self.peers[p][piece.index]: piece_count[piece] += 1 + if len(piece_count) == 0: + return None rarest_piece = min(piece_count, key=lambda p: piece_count[p]) self.missing_pieces.remove(rarest_piece) self.ongoing_pieces.append(rarest_piece) @@ -499,3 +513,4 @@ def _write(self, piece): pos = piece.index * self.torrent.piece_length os.lseek(self.fd, pos, os.SEEK_SET) os.write(self.fd, piece.data) + \ No newline at end of file diff --git a/pieces/protocol.py b/pieces/protocol.py index 3f55a66..f8f0460 100644 --- a/pieces/protocol.py +++ b/pieces/protocol.py @@ -105,6 +105,9 @@ async def _start(self): # Sending BitField is optional and not needed when client does # not have any pieces. Thus we do not send any bitfield message + # Some peers don't sent any BitField and rely on Have messages. + # Thus we add a peer wihout any BitField. + self.piece_manager.add_peer_without_bitfield(self.remote_id) # The default state for a connection is that peer is not # interested and we are choked self.my_state.append('choked') @@ -116,6 +119,7 @@ async def _start(self): # Start reading responses as a stream of messages for as # long as the connection is open and data is transmitted async for message in PeerStreamIterator(self.reader, buffer): + logging.debug('Message received is of type {}'.format(type(message))) if 'stopped' in self.my_state: break if type(message) is BitField: @@ -154,9 +158,10 @@ async def _start(self): if 'choked' not in self.my_state: if 'interested' in self.my_state: if 'pending_request' not in self.my_state: - self.my_state.append('pending_request') - await self._request_piece() - + chk = await self._request_piece() + if chk: + self.my_state.append('pending_request') + except ProtocolError as e: logging.exception('Protocol error') except (ConnectionRefusedError, TimeoutError): @@ -207,6 +212,9 @@ async def _request_piece(self): self.writer.write(message) await self.writer.drain() + return True + else: + return False async def _handshake(self): """ @@ -321,7 +329,7 @@ def parse(self): self.buffer = self.buffer[header_length + message_length:] return KeepAlive() - if len(self.buffer) >= message_length: + if len(self.buffer) >= message_length+header_length: message_id = struct.unpack('>b', self.buffer[4:5])[0] def _consume(): From 937c2c9184c225cb1e07d7a4c4c7121de08dad98 Mon Sep 17 00:00:00 2001 From: Sourav <70096119+ydavs@users.noreply.github.com> Date: Thu, 25 Aug 2022 11:44:38 +0000 Subject: [PATCH 3/3] Added support for multi-file torrents. Fixed Issue. Re-requesting failed because PendingRequest implemented as namedTuple(immutable), changed it to dict. --- pieces/cli.py | 2 +- pieces/client.py | 58 ++++++++++++++++++++++++++++++++--------------- pieces/torrent.py | 43 +++++++++++++++++++++++++++-------- pieces/tracker.py | 19 +++------------- 4 files changed, 77 insertions(+), 45 deletions(-) diff --git a/pieces/cli.py b/pieces/cli.py index 0175ed1..192159f 100644 --- a/pieces/cli.py +++ b/pieces/cli.py @@ -35,7 +35,7 @@ def main(): args = parser.parse_args() if args.verbose: - logging.basicConfig(level=logging.INFO) + logging.basicConfig(filename='logfile.log', level=logging.INFO) loop = asyncio.get_event_loop() client = TorrentClient(Torrent(args.torrent)) diff --git a/pieces/client.py b/pieces/client.py index 1663dbb..1d7f5a9 100644 --- a/pieces/client.py +++ b/pieces/client.py @@ -70,11 +70,11 @@ async def start(self): if the download is aborted this method will complete. """ self.peers = [PeerConnection(self.available_peers, - self.tracker.torrent.info_hash, - self.tracker.peer_id, - self.piece_manager, - self._on_block_retrieved) - for _ in range(MAX_PEER_CONNECTIONS)] + self.tracker.torrent.info_hash, + self.tracker.peer_id, + self.piece_manager, + self._on_block_retrieved) + for _ in range(MAX_PEER_CONNECTIONS)] # The time we last made an announce call (timestamp) previous = None @@ -83,6 +83,7 @@ async def start(self): while True: if self.piece_manager.complete: + self.piece_manager._organize_files() logging.info('Torrent fully downloaded!') break if self.abort: @@ -96,6 +97,7 @@ async def start(self): uploaded=self.piece_manager.bytes_uploaded, downloaded=self.piece_manager.bytes_downloaded) + logging.debug(response) if response: previous = current interval = response.interval @@ -234,9 +236,6 @@ def data(self): blocks_data = [b.data for b in retrieved] return b''.join(blocks_data) -# The type used for keeping track of pending request that can be re-issued -PendingRequest = namedtuple('PendingRequest', ['block', 'added']) - class PieceManager: """ @@ -401,8 +400,8 @@ def block_received(self, peer_id, piece_index, block_offset, data): # Remove from pending requests for index, request in enumerate(self.pending_blocks): - if request.block.piece == piece_index and \ - request.block.offset == block_offset: + if request['block'].piece == piece_index and \ + request['block'].offset == block_offset: del self.pending_blocks[index] break @@ -440,15 +439,15 @@ def _expired_requests(self, peer_id) -> Block: """ current = int(round(time.time() * 1000)) for request in self.pending_blocks: - if self.peers[peer_id][request.block.piece]: - if request.added + self.max_pending_time < current: + if self.peers[peer_id][request['block'].piece]: + if request['added'] + self.max_pending_time < current: logging.info('Re-requesting block {block} for ' 'piece {piece}'.format( - block=request.block.offset, - piece=request.block.piece)) + block=request['block'].offset, + piece=request['block'].piece)) # Reset expiration timer - request.added = current - return request.block + request['added'] = current + return request['block'] return None def _next_ongoing(self, peer_id) -> Block: @@ -462,7 +461,7 @@ def _next_ongoing(self, peer_id) -> Block: block = piece.next_request() if block: self.pending_blocks.append( - PendingRequest(block, int(round(time.time() * 1000)))) + {'block':block, 'added':int(round(time.time()*1000))}) return block return None @@ -513,4 +512,27 @@ def _write(self, piece): pos = piece.index * self.torrent.piece_length os.lseek(self.fd, pos, os.SEEK_SET) os.write(self.fd, piece.data) - \ No newline at end of file + + def _organize_files(self): + """ + Organize the file strucuture from a single file to as provided in meta_info. + """ + if not self.torrent.multi_file: + return + #if not self.complete: + # raise RuntimeError('organize_files called before completing download!') + pos = 0 + for file in self.torrent.files: + os.lseek(self.fd, pos, os.SEEK_SET) + buffer = os.read(self.fd, file.length) + pos += file.length + + os.makedirs(os.path.dirname(file.path), exist_ok=True) + tmp_fd = os.open(file.path, os.O_RDWR | os.O_CREAT) + os.write(tmp_fd, buffer) + os.close(tmp_fd) + + # Remove the redundant tmp files + os.remove(self.torrent.output_file) + + return \ No newline at end of file diff --git a/pieces/torrent.py b/pieces/torrent.py index 6945e9f..27b5354 100644 --- a/pieces/torrent.py +++ b/pieces/torrent.py @@ -17,11 +17,12 @@ from hashlib import sha1 from collections import namedtuple +import os from . import bencoding # Represents the files within the torrent (i.e. the files to write to disk) -TorrentFile = namedtuple('TorrentFile', ['name', 'length']) +TorrentFile = namedtuple('TorrentFile', ['path', 'length']) class Torrent: @@ -47,12 +48,18 @@ def _identify_files(self): Identifies the files included in this torrent """ if self.multi_file: - # TODO Add support for multi-file torrents - raise RuntimeError('Multi-file torrents is not supported!') - self.files.append( - TorrentFile( - self.meta_info[b'info'][b'name'].decode('utf-8'), - self.meta_info[b'info'][b'length'])) + name = self.meta_info[b'info'][b'name'].decode('utf-8') + if not os.path.exists(name): + os.mkdir(name); + for file in self.meta_info[b'info'][b'files']: + curr_path = (b'/'.join(file[b'path'])).decode('utf-8') + curr_path = os.path.join(self.meta_info[b'info'][b'name'].decode('utf-8'), curr_path) + self.files.append(TorrentFile(curr_path, file[b'length'])) + else: + self.files.append( + TorrentFile( + self.meta_info[b'info'][b'name'].decode('utf-8'), + self.meta_info[b'info'][b'length'])) @property def announce(self) -> str: @@ -60,6 +67,16 @@ def announce(self) -> str: The announce URL to the tracker. """ return self.meta_info[b'announce'].decode('utf-8') + + @property + def announce_list(self) -> list: + """ + Returns announce list of trackers. + """ + if b'announce-list' in self.meta_info.keys(): + return [[a.decode('utf-8') for a in x] for x in self.meta_info[b'announce-list']] + else: + return None @property def multi_file(self) -> bool: @@ -86,8 +103,9 @@ def total_size(self) -> int: :return: The total size (in bytes) for this torrent's data. """ if self.multi_file: - raise RuntimeError('Multi-file torrents is not supported!') - return self.files[0].length + return sum(file.length for file in self.files) + else: + return self.files[0].length @property def pieces(self): @@ -106,7 +124,12 @@ def pieces(self): @property def output_file(self): - return self.meta_info[b'info'][b'name'].decode('utf-8') + if self.multi_file: + if not os.path.exists('.pieces_tmp'): + os.mkdir('.pieces_tmp') + return os.path.join('.pieces_tmp', 'tmp') + else: + return self.meta_info[b'info'][b'name'].decode('utf-8') def __str__(self): return 'Filename: {0}\n' \ diff --git a/pieces/tracker.py b/pieces/tracker.py index da81efd..b0ab218 100644 --- a/pieces/tracker.py +++ b/pieces/tracker.py @@ -80,6 +80,7 @@ def peers(self): # where the peers field is a list of dictionaries and one where all # the peers are encoded in a single string peers = self.response[b'peers'] + logging.debug('Type of peers is {0}, and len is {1}'.format(type(peers), len(peers))) if type(peers) == list: # TODO Implement support for dictionary peer list logging.debug('Dictionary model peers are returned by tracker') @@ -144,6 +145,8 @@ async def connect(self, params['event'] = 'started' url = self.torrent.announce + '?' + urlencode(params) + logging.debug(self.torrent.announce_list) + logging.debug(self.torrent.announce) logging.info('Connecting to tracker at: ' + url) async with self.http_client.get(url) as response: @@ -171,22 +174,6 @@ def raise_for_error(self, tracker_response): except UnicodeDecodeError: pass - def _construct_tracker_parameters(self): - """ - Constructs the URL parameters used when issuing the announce call - to the tracker. - """ - return { - 'info_hash': self.torrent.info_hash, - 'peer_id': self.peer_id, - 'port': 6889, - # TODO Update stats when communicating with tracker - 'uploaded': 0, - 'downloaded': 0, - 'left': 0, - 'compact': 1} - - def _calculate_peer_id(): """ Calculate and return a unique Peer ID.