Skip to content

Commit acaade1

Browse files
committed
WIP
1 parent 676449b commit acaade1

1 file changed

Lines changed: 80 additions & 60 deletions

File tree

src/pclient/client.py

Lines changed: 80 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,8 @@ def __init__(
128128
self._me = Path(__file__).resolve()
129129

130130
self._logger.debug(
131-
f"__init()__ values for {self._me!s} process {self._pid}:"
131+
f"[{self._pid}]"
132+
f" __init()__ values for {self._me!s}:"
132133
f" URL: {self._url!s}"
133134
f" output: {self._output!s}"
134135
f" working dir: {self._working_dir!s}"
@@ -162,13 +163,14 @@ async def _master_fetch(self) -> None:
162163
with tempfile.TemporaryDirectory(dir=work_dir) as wd:
163164
cwd = Path(wd)
164165
os.chdir(cwd)
166+
self._logger.debug(f"[{self._pid}] Working directory is now {wd}")
165167
if self._byte_range:
166168
raise ValueError(
167169
"_master_fetch can only be called from parent process"
168170
)
169171
# Truncate output file
170172
with output.open("wb") as f:
171-
pass
173+
f.truncate()
172174
self._lock.acquire()
173175
self._ordinal = 1 # Reset event counter
174176
self._timings = [] # and timings
@@ -178,8 +180,8 @@ async def _master_fetch(self) -> None:
178180
await self._get_filesize()
179181
if self._filesize is None:
180182
self._logger.warning(
181-
f"{url} does not support byte ranges; single-threaded"
182-
f" output in {output}"
183+
f"[{self._pid}] {url} does not support byte ranges;"
184+
f" single-threaded output in {output}"
183185
)
184186
else:
185187
await self._parallel_fetch()
@@ -238,7 +240,9 @@ async def _get_filesize(self) -> None:
238240
ev2 = f"Get range for {url}"
239241
self._start_stamp(ev2)
240242
headers = {"Range": "bytes=0-0"}
241-
self._logger.debug(f"About to fetch {url} for filesize")
243+
self._logger.debug(
244+
f"[{self._pid}] About to fetch {url} for filesize"
245+
)
242246
r = await c.get(url, headers=headers)
243247
self._end_stamp(ev2)
244248
r.raise_for_status()
@@ -266,28 +270,29 @@ async def _get_filesize(self) -> None:
266270
self._end_stamp(event)
267271
if self._filesize and self._filesize != filesize:
268272
self._logger.warning(
269-
f"Stored filesize {self._filesize} != {filesize}"
273+
f"[{self._pid}] Stored filesize {self._filesize}"
274+
f" != {filesize}"
270275
)
271276
self._filesize = filesize
277+
self._logger.debug(f"[{self._pid}] Stored filesize {filesize}")
272278

273279
async def _chunk_range(self, inp: _ByteRange) -> None:
    """Split a byte range into chunk-sized pieces and store them.

    Both ``inp.first`` and ``inp.last`` are treated as inclusive offsets.
    Each resulting chunk covers at most ``self._chunk_size`` bytes; only
    the final chunk may be shorter.  The resulting deque replaces
    ``self._chunks`` while ``self._lock`` is held.

    Raises
    ------
    ValueError
        If ``self._chunk_size`` is not positive (the loop below could
        never advance).
    """
    step = self._chunk_size
    if step < 1:
        raise ValueError(f"Chunk size must be positive, not {step}")
    event = f"Chunking range {inp}"
    self._start_stamp(event)
    chunks: collections.deque[_ByteRange] = collections.deque()
    offset = inp.first
    while offset <= inp.last:
        # The end offset is inclusive, so a full chunk ends at
        # offset + step - 1.  Clamping with min() avoids the previous
        # off-by-one, where a remainder of exactly one byte was folded
        # into an oversized (step + 1 byte) final chunk.
        end = min(offset + step - 1, inp.last)
        chunks.append(_ByteRange(first=offset, last=end))
        offset += step
    self._lock.acquire()
    try:
        self._chunks = chunks
    finally:
        # Always give the lock back, even if the assignment raises.
        self._lock.release()
    self._logger.debug(f"[{self._pid}] Chunks: {self._chunks}")
    self._end_stamp(event)
292297

293298
async def _write_file(self, output: Path, r: httpx.Response) -> None:
@@ -301,19 +306,20 @@ async def _write_file(self, output: Path, r: httpx.Response) -> None:
301306
self._end_stamp(event)
302307
return
303308
with output.open("wb") as f:
304-
written = 0
305309
async for data in r.aiter_bytes():
306-
while written < len(data):
307-
written += f.write(data[written:])
310+
self._logger.debug(
311+
f"[{self._pid}] writing {len(data)} bytes to {output!s}"
312+
)
313+
f.write(data)
308314
self._end_stamp(event)
309315

310316
def _start_stamp(self, event: str) -> None:
    """Log that ``event`` is starting, then record its start stamp."""
    message = f"[{self._pid}] Start {event}"
    self._logger.debug(message)
    self._stamp(event=event, start=True)
313319

314320
def _end_stamp(self, event: str) -> None:
    """Record the end stamp for ``event``, then log that it stopped."""
    self._stamp(start=False, event=event)
    message = f"[{self._pid}] Stop {event}"
    self._logger.debug(message)
317323

318324
def _stamp(self, event: str, *, start: bool) -> None:
319325
now = datetime.datetime.now(tz=datetime.UTC)
@@ -373,13 +379,16 @@ def _add_durations(self) -> None:
373379
del e_d[ev_id]
374380
if e_d:
375381
self._logger.warning(
376-
f"Found start but no finish for {list(e_d.keys())}"
382+
f"[{self._pid}] Found start but no finish"
383+
f" for {list(e_d.keys())}"
377384
)
378385

379386
async def _divide_subprocs(self, byte_range: _ByteRange) -> None:
380-
size = byte_range.last - byte_range.first
387+
size = byte_range.last - byte_range.first + 1
381388
if self._max_procs < 2:
382-
self._logger.warning("Cannot subdivide process any further.")
389+
self._logger.warning(
390+
f"[{self._pid}] Cannot subdivide process any further."
391+
)
383392
return
384393
chunks_per_process = int(size / (self._max_procs * self._chunk_size))
385394
leftover_chunks = int(
@@ -432,7 +441,9 @@ async def _spawn_subprocs(
432441
if self._debug:
433442
args.append("--debug")
434443

435-
self._logger.debug(f"Spawn subprocess: {self._me!s} {args}")
444+
self._logger.debug(
445+
f"[{self._pid}] Spawn subprocess: {self._me!s} {args}"
446+
)
436447
proc = await asyncio.subprocess.create_subprocess_exec(
437448
str(self._me), *args
438449
)
@@ -441,7 +452,8 @@ async def _spawn_subprocs(
441452
await proc.wait()
442453
if proc.returncode != 0:
443454
self._logger.error(
444-
f"Process {proc} exited with rc {proc.returncode}"
455+
f"[{self._pid}] Process {proc} exited with"
456+
f" rc {proc.returncode}"
445457
)
446458

447459
async def _fetch_singleproc_parts(self, byte_range: _ByteRange) -> None:
@@ -457,48 +469,63 @@ async def _fetch_singleproc_parts(self, byte_range: _ByteRange) -> None:
457469
self._start_stamp(event)
458470
await self._chunk_range(byte_range)
459471
while True:
460-
self._logger.debug("Acquiring lock to get chunk")
472+
self._logger.debug(f"[{self._pid}] Acquiring lock to get chunk")
461473
self._lock.acquire()
462474
if len(self._chunks) < 1 and len(tasks) < 1:
463475
# We have handled each chunk and all our tasks are done.
464-
self._logger.debug("Chunks and tasks finished; releasing lock")
476+
self._logger.debug(
477+
f"[{self._pid}] Chunks and tasks finished; releasing lock"
478+
)
465479
self._lock.release()
466480
break
467481
tl = len(tasks)
468-
self._logger.debug(f"{tl} task[s] exist[s]")
482+
self._logger.debug(f"[{self._pid}] {tl} task[s] exist[s]")
469483
while tl < self._max_threads:
470484
if len(self._chunks) > 0:
471485
chunk = self._chunks.popleft()
472-
self._logger.debug(f"Got chunk {chunk} to work on")
486+
self._logger.debug(
487+
f"[{self._pid}] Got chunk {chunk} to work on"
488+
)
473489
tasks.add(
474490
asyncio.create_task(
475491
self._fetch_range(chunk, base_file)
476492
)
477493
)
478-
self._logger.debug(f"Added task for {chunk}, {base_file}")
494+
self._logger.debug(
495+
f"[{self._pid}] Added task for {chunk}, {base_file}"
496+
)
479497
tl += 1
480498
else:
481499
# If we run out of chunks, we can't refill the thread
482500
# count.
483-
self._logger.debug("Chunks exhausted")
501+
self._logger.debug(f"[{self._pid}] Chunks exhausted")
484502
break
485503
# Check each task for completion
486504
remove = set()
487505
for task in tasks:
488-
self._logger.debug(f"Checking task {task} for completion")
506+
self._logger.debug(
507+
f"[{self._pid}] Checking task {task} for completion"
508+
)
489509
if task.done():
490-
self._logger.debug(f"Task {task} complete")
510+
self._logger.debug(f"[{self._pid}] Task {task} complete")
491511
remove.add(task) # Don't alter the set while iterating it.
492512
for task in remove:
493-
self._logger.debug(f"Removing completed task {task}")
513+
self._logger.debug(
514+
f"[{self._pid}] Removing completed task {task}"
515+
)
494516
tasks.remove(task)
495-
self._logger.debug(f"{len(tasks)} remain[s]; releasing lock.")
517+
self._logger.debug(
518+
f"[{self._pid}] {len(tasks)} task[s] remain[s];"
519+
" releasing lock."
520+
)
496521
self._lock.release()
497522
if len(tasks) == self._max_threads:
498-
self._logger.debug("All task slots are full; waiting.")
523+
self._logger.debug(
524+
f"[{self._pid}] All task slots are full; waiting."
525+
)
499526
# Pause a bit.
500527
await asyncio.sleep(0.1)
501-
self._logger.debug("Left chunk task loop.")
528+
self._logger.debug(f"[{self._pid}] Left chunk task loop.")
502529
self._end_stamp(event)
503530
if self._report:
504531
await self._generate_report()
@@ -517,6 +544,8 @@ async def _fetch_range(
517544
directory where we want to write this file.
518545
"""
519546
url = self._url
547+
if self._filesize is None:
548+
self._logger.debug(f"[{self._pid}] filesize is unknown; using 1e9")
520549
filesize = self._filesize or 1e9
521550
digits = len(str(filesize))
522551
n_format = f"{{:0{digits}d}}"
@@ -543,6 +572,7 @@ async def _reassemble_file_parts(
543572
event = f"Reassemble {base_name}"
544573
self._start_stamp(event)
545574
parts = Path().glob(f"{base_name}.*.part")
575+
self._logger.debug(f"[{self._pid}] file parts: {parts}")
546576
async with asyncio.TaskGroup() as tg:
547577
tasks: set[asyncio.Task] = set()
548578
for partfile in parts:
@@ -561,43 +591,32 @@ async def _write_file_chunk_to_target(self, inp: Path, outp: Path) -> None:
561591
if len(parts) < 3 or parts[-1] != "part":
562592
raise ValueError(f"{fname} doesn't look like a part file")
563593
first, last = map(int, parts[-2].split("-"))
564-
size = last - first
565-
read_size = size + 1
594+
size = last - first + 1
566595
if size > self._chunk_size:
567-
self._logger.warning(f"Chunk size {size} > {self._chunk_size}")
568-
read_size = self._chunk_size
569-
self._copy_file_part(inp, outp, read_size, offset=first)
596+
self._logger.warning(
597+
f"[{self._pid}] Chunk size {size} > {self._chunk_size}"
598+
)
599+
size = self._chunk_size
600+
self._copy_file_part(inp, outp, size, offset=first)
570601
self._end_stamp(event)
571602

572603
def _copy_file_part(
    self, inp: Path, outp: Path, read_size: int, offset: int
) -> None:
    """Copy a part file into the target file at a specific offset.

    Parameters
    ----------
    inp
        Part file; its entire contents are copied.
    outp
        Target file.  It is created if missing and is never truncated,
        so parts written earlier are preserved.
    read_size
        Size of the part as computed by the caller; used only in the
        timing-event label.
    offset
        Absolute byte position in ``outp`` where the data is written.

    Raises
    ------
    ValueError
        If fewer bytes were written than were read from ``inp``.
    """
    event = (
        f"copy file part {inp!s} ({read_size} bytes) to"
        f" {outp!s} at {offset}"
    )
    self._start_stamp(event)
    data = inp.read_bytes()
    self._logger.debug(
        f"[{self._pid}] Read {len(data)} bytes from {inp!s}"
    )
    # "r+b" requires the file to exist; make sure it does without
    # touching any content that is already there.
    outp.touch(exist_ok=True)
    # Open for in-place update.  Append mode would force every write to
    # the end of the file regardless of seek(), and Path.write_bytes()
    # would truncate the target and discard previously written parts.
    with outp.open("r+b") as outp_f:
        outp_f.seek(offset)
        bw = outp_f.write(data)
    if bw < len(data):
        raise ValueError(f"Short write: {bw}/{len(data)} to {outp!s}")
    self._end_stamp(event)
602621

603622

@@ -700,6 +719,7 @@ def main() -> None:
700719
report=args.report,
701720
debug=args.debug,
702721
byte_range=args.byte_range,
722+
filesize=args.filesize,
703723
)
704724

705725

0 commit comments

Comments
 (0)