Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pieces/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def main():

args = parser.parse_args()
if args.verbose:
logging.basicConfig(level=logging.INFO)
logging.basicConfig(filename='logfile.log', level=logging.INFO)

loop = asyncio.get_event_loop()
client = TorrentClient(Torrent(args.torrent))
Expand Down
73 changes: 55 additions & 18 deletions pieces/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from asyncio import Queue
from collections import namedtuple, defaultdict
from hashlib import sha1
import bitstring

from pieces.protocol import PeerConnection, REQUEST_SIZE
from pieces.tracker import Tracker
Expand Down Expand Up @@ -69,11 +70,11 @@ async def start(self):
if the download is aborted this method will complete.
"""
self.peers = [PeerConnection(self.available_peers,
self.tracker.torrent.info_hash,
self.tracker.peer_id,
self.piece_manager,
self._on_block_retrieved)
for _ in range(MAX_PEER_CONNECTIONS)]
self.tracker.torrent.info_hash,
self.tracker.peer_id,
self.piece_manager,
self._on_block_retrieved)
for _ in range(MAX_PEER_CONNECTIONS)]

# The time we last made an announce call (timestamp)
previous = None
Expand All @@ -82,6 +83,7 @@ async def start(self):

while True:
if self.piece_manager.complete:
self.piece_manager._organize_files()
logging.info('Torrent fully downloaded!')
break
if self.abort:
Expand All @@ -95,6 +97,7 @@ async def start(self):
uploaded=self.piece_manager.bytes_uploaded,
downloaded=self.piece_manager.bytes_downloaded)

logging.debug(response)
if response:
previous = current
interval = response.interval
Expand Down Expand Up @@ -233,9 +236,6 @@ def data(self):
blocks_data = [b.data for b in retrieved]
return b''.join(blocks_data)

# The type used for keeping track of pending request that can be re-issued
PendingRequest = namedtuple('PendingRequest', ['block', 'added'])


class PieceManager:
"""
Expand Down Expand Up @@ -322,6 +322,13 @@ def bytes_uploaded(self) -> int:
# TODO Add support for sending data
return 0

def add_peer_without_bitfield(self, peer_id):
    """
    Register a peer that has not (yet) sent a BitField message.

    Sending a BitField is optional in the BitTorrent protocol; some peers
    rely solely on subsequent Have messages. Such a peer starts out with an
    all-zero bitfield, which is updated as Have messages arrive.

    :param peer_id: The id of the peer to register
    """
    # BitArray(length=n) is zero-filled by construction and, unlike
    # BitArray(int=0, length=n), also works for a degenerate zero-piece
    # torrent (length=0).
    self.peers[peer_id] = bitstring.BitArray(length=self.total_pieces)

def add_peer(self, peer_id, bitfield):
"""
Adds a peer and the bitfield representing the pieces the peer has.
Expand Down Expand Up @@ -369,7 +376,11 @@ def next_request(self, peer_id) -> Block:
if not block:
block = self._next_ongoing(peer_id)
if not block:
block = self._get_rarest_piece(peer_id).next_request()
piece = self._get_rarest_piece(peer_id)
if piece == None:
return None
else:
block = piece.next_request()
return block

def block_received(self, peer_id, piece_index, block_offset, data):
Expand All @@ -389,8 +400,8 @@ def block_received(self, peer_id, piece_index, block_offset, data):

# Remove from pending requests
for index, request in enumerate(self.pending_blocks):
if request.block.piece == piece_index and \
request.block.offset == block_offset:
if request['block'].piece == piece_index and \
request['block'].offset == block_offset:
del self.pending_blocks[index]
break

Expand Down Expand Up @@ -428,15 +439,15 @@ def _expired_requests(self, peer_id) -> Block:
"""
current = int(round(time.time() * 1000))
for request in self.pending_blocks:
if self.peers[peer_id][request.block.piece]:
if request.added + self.max_pending_time < current:
if self.peers[peer_id][request['block'].piece]:
if request['added'] + self.max_pending_time < current:
logging.info('Re-requesting block {block} for '
'piece {piece}'.format(
block=request.block.offset,
piece=request.block.piece))
block=request['block'].offset,
piece=request['block'].piece))
# Reset expiration timer
request.added = current
return request.block
request['added'] = current
return request['block']
return None

def _next_ongoing(self, peer_id) -> Block:
Expand All @@ -450,7 +461,7 @@ def _next_ongoing(self, peer_id) -> Block:
block = piece.next_request()
if block:
self.pending_blocks.append(
PendingRequest(block, int(round(time.time() * 1000))))
{'block':block, 'added':int(round(time.time()*1000))})
return block
return None

Expand All @@ -468,6 +479,8 @@ def _get_rarest_piece(self, peer_id):
if self.peers[p][piece.index]:
piece_count[piece] += 1

if len(piece_count) == 0:
return None
rarest_piece = min(piece_count, key=lambda p: piece_count[p])
self.missing_pieces.remove(rarest_piece)
self.ongoing_pieces.append(rarest_piece)
Expand Down Expand Up @@ -499,3 +512,27 @@ def _write(self, piece):
pos = piece.index * self.torrent.piece_length
os.lseek(self.fd, pos, os.SEEK_SET)
os.write(self.fd, piece.data)

def _organize_files(self):
"""
Organize the file strucuture from a single file to as provided in meta_info.
"""
if not self.torrent.multi_file:
return
#if not self.complete:
# raise RuntimeError('organize_files called before completing download!')
pos = 0
for file in self.torrent.files:
os.lseek(self.fd, pos, os.SEEK_SET)
buffer = os.read(self.fd, file.length)
pos += file.length

os.makedirs(os.path.dirname(file.path), exist_ok=True)
tmp_fd = os.open(file.path, os.O_RDWR | os.O_CREAT)
os.write(tmp_fd, buffer)
os.close(tmp_fd)

# Remove the redundant tmp files
os.remove(self.torrent.output_file)

return
19 changes: 15 additions & 4 deletions pieces/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ async def _start(self):
# Sending BitField is optional and not needed when client does
# not have any pieces. Thus we do not send any bitfield message

# Some peers don't send any BitField and rely solely on Have messages.
# Thus we add the peer without any BitField.
self.piece_manager.add_peer_without_bitfield(self.remote_id)
# The default state for a connection is that peer is not
# interested and we are choked
self.my_state.append('choked')
Expand All @@ -116,6 +119,7 @@ async def _start(self):
# Start reading responses as a stream of messages for as
# long as the connection is open and data is transmitted
async for message in PeerStreamIterator(self.reader, buffer):
logging.debug('Message received is of type {}'.format(type(message)))
if 'stopped' in self.my_state:
break
if type(message) is BitField:
Expand Down Expand Up @@ -154,9 +158,10 @@ async def _start(self):
if 'choked' not in self.my_state:
if 'interested' in self.my_state:
if 'pending_request' not in self.my_state:
self.my_state.append('pending_request')
await self._request_piece()

chk = await self._request_piece()
if chk:
self.my_state.append('pending_request')

except ProtocolError as e:
logging.exception('Protocol error')
except (ConnectionRefusedError, TimeoutError):
Expand Down Expand Up @@ -207,6 +212,9 @@ async def _request_piece(self):

self.writer.write(message)
await self.writer.drain()
return True
else:
return False

async def _handshake(self):
"""
Expand Down Expand Up @@ -316,9 +324,12 @@ def parse(self):
message_length = struct.unpack('>I', self.buffer[0:4])[0]

if message_length == 0:
logging.debug('Got a KeepAlive message')
# Call consume
self.buffer = self.buffer[header_length + message_length:]
return KeepAlive()

if len(self.buffer) >= message_length:
if len(self.buffer) >= message_length+header_length:
message_id = struct.unpack('>b', self.buffer[4:5])[0]

def _consume():
Expand Down
43 changes: 33 additions & 10 deletions pieces/torrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@

from hashlib import sha1
from collections import namedtuple
import os

from . import bencoding

# Represents the files within the torrent (i.e. the files to write to disk)
TorrentFile = namedtuple('TorrentFile', ['name', 'length'])
TorrentFile = namedtuple('TorrentFile', ['path', 'length'])


class Torrent:
Expand All @@ -47,19 +48,35 @@ def _identify_files(self):
Identifies the files included in this torrent
"""
if self.multi_file:
# TODO Add support for multi-file torrents
raise RuntimeError('Multi-file torrents is not supported!')
self.files.append(
TorrentFile(
self.meta_info[b'info'][b'name'].decode('utf-8'),
self.meta_info[b'info'][b'length']))
name = self.meta_info[b'info'][b'name'].decode('utf-8')
if not os.path.exists(name):
os.mkdir(name);
for file in self.meta_info[b'info'][b'files']:
curr_path = (b'/'.join(file[b'path'])).decode('utf-8')
curr_path = os.path.join(self.meta_info[b'info'][b'name'].decode('utf-8'), curr_path)
self.files.append(TorrentFile(curr_path, file[b'length']))
else:
self.files.append(
TorrentFile(
self.meta_info[b'info'][b'name'].decode('utf-8'),
self.meta_info[b'info'][b'length']))

@property
def announce(self) -> str:
    """
    The tracker's announce URL from the meta-info.
    """
    raw_url = self.meta_info[b'announce']
    return raw_url.decode('utf-8')

@property
def announce_list(self) -> list:
    """
    The tiers of tracker URLs from the optional 'announce-list' field
    (BEP 12), as a list of lists of strings, or None when the torrent
    does not carry that field.
    """
    tiers = self.meta_info.get(b'announce-list')
    if tiers is None:
        # Optional extension field; absent for single-tracker torrents.
        return None
    return [[url.decode('utf-8') for url in tier] for tier in tiers]

@property
def multi_file(self) -> bool:
Expand All @@ -86,8 +103,9 @@ def total_size(self) -> int:
:return: The total size (in bytes) for this torrent's data.
"""
if self.multi_file:
raise RuntimeError('Multi-file torrents is not supported!')
return self.files[0].length
return sum(file.length for file in self.files)
else:
return self.files[0].length

@property
def pieces(self):
Expand All @@ -106,7 +124,12 @@ def pieces(self):

@property
def output_file(self):
    """
    Path of the single file the downloaded data is written to.

    For a multi-file torrent this is a temporary scratch file inside
    '.pieces_tmp' that is later split up per the meta-info; for a
    single-file torrent it is the file name from the meta-info.

    NOTE(review): accessing this property creates the '.pieces_tmp'
    directory as a side effect — surprising for a property, but the
    directory must exist before the scratch file is opened.
    """
    if self.multi_file:
        if not os.path.exists('.pieces_tmp'):
            os.mkdir('.pieces_tmp')
        return os.path.join('.pieces_tmp', 'tmp')
    return self.meta_info[b'info'][b'name'].decode('utf-8')

def __str__(self):
return 'Filename: {0}\n' \
Expand Down
19 changes: 3 additions & 16 deletions pieces/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ def peers(self):
# where the peers field is a list of dictionaries and one where all
# the peers are encoded in a single string
peers = self.response[b'peers']
logging.debug('Type of peers is {0}, and len is {1}'.format(type(peers), len(peers)))
if type(peers) == list:
# TODO Implement support for dictionary peer list
logging.debug('Dictionary model peers are returned by tracker')
Expand Down Expand Up @@ -144,6 +145,8 @@ async def connect(self,
params['event'] = 'started'

url = self.torrent.announce + '?' + urlencode(params)
logging.debug(self.torrent.announce_list)
logging.debug(self.torrent.announce)
logging.info('Connecting to tracker at: ' + url)

async with self.http_client.get(url) as response:
Expand Down Expand Up @@ -171,22 +174,6 @@ def raise_for_error(self, tracker_response):
except UnicodeDecodeError:
pass

def _construct_tracker_parameters(self):
"""
Constructs the URL parameters used when issuing the announce call
to the tracker.
"""
return {
'info_hash': self.torrent.info_hash,
'peer_id': self.peer_id,
'port': 6889,
# TODO Update stats when communicating with tracker
'uploaded': 0,
'downloaded': 0,
'left': 0,
'compact': 1}


def _calculate_peer_id():
"""
Calculate and return a unique Peer ID.
Expand Down