Skip to content

Commit 320f78e

Browse files
committed
fix(HttpWorker): 修复子工作线程进度计算和请求头处理逻辑
1 parent 563e3ae commit 320f78e

1 file changed

Lines changed: 25 additions & 16 deletions

File tree

features/http_pack/task.py

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -127,14 +127,15 @@ def reassignSubworker(self):
127127
if self.stage.fileSize <= 0:
128128
return
129129

130-
slowestSubworker = max(self.subworkers, key=lambda subworker: subworker.end - subworker.progress)
131-
remainingBytes = slowestSubworker.end - slowestSubworker.progress
130+
slowestSubworker = max(self.subworkers, key=lambda subworker: subworker.end - subworker.progress + 1)
131+
remainingBytes = slowestSubworker.end - slowestSubworker.progress + 1
132132
if remainingBytes < cfg.maxReassignSize.value * 1048576:
133133
return
134134
base = remainingBytes // 2
135135
remainder = remainingBytes % 2
136-
slowestSubworker.end = slowestSubworker.progress + base + remainder
137-
newSubworker = HttpSubworker(slowestSubworker.end + 1, slowestSubworker.end + 1, slowestSubworker.end + base)
136+
oldEnd = slowestSubworker.end
137+
slowestSubworker.end = slowestSubworker.progress + base + remainder - 1
138+
newSubworker = HttpSubworker(slowestSubworker.end + 1, slowestSubworker.end + 1, oldEnd)
138139
self.subworkers.insert(self.subworkers.index(slowestSubworker) + 1, newSubworker)
139140
self.taskGroup.create_task(self.handleSubworker(newSubworker))
140141

@@ -168,8 +169,9 @@ async def handleSubworker(self, subworker: HttpSubworker):
168169

169170
await cfg.checkSpeedLimitation()
170171
pwrite(self.fileHandle, chunk, subworker.progress)
171-
subworker.progress += 65536
172-
cfg.globalSpeed += 65536
172+
chunkSize = len(chunk)
173+
subworker.progress += chunkSize
174+
cfg.globalSpeed += chunkSize
173175
finally:
174176
await res.close()
175177

@@ -186,10 +188,12 @@ async def handleSubworker(self, subworker: HttpSubworker):
186188
try:
187189
ftruncate(self.fileHandle, 0)
188190
subworker.progress = 0
191+
requestHeaders = self.requestHeaders.copy()
192+
requestHeaders.pop("range", None)
189193

190194
res = await self.client.get(
191195
self.stage.url,
192-
headers=self.requestHeaders,
196+
headers=requestHeaders,
193197
cookies=self.requestCookies,
194198
proxies=self.stage.proxies,
195199
verify=cfg.SSLVerify.value,
@@ -198,17 +202,18 @@ async def handleSubworker(self, subworker: HttpSubworker):
198202
)
199203
try:
200204
res.raise_for_status()
201-
if res.status_code not in {200, 206}:
205+
if res.status_code != 200:
202206
raise Exception(f"服务器返回了异常状态码:{res.status_code}")
203207

204-
async for chunk in await res.iter_raw(chunk_size=65536):
208+
async for chunk in await res.iter_content(chunk_size=65536):
205209
if not chunk:
206210
continue
207211

208212
await cfg.checkSpeedLimitation()
209213
pwrite(self.fileHandle, chunk, subworker.progress)
210-
subworker.progress += 65536
211-
cfg.globalSpeed += 65536
214+
chunkSize = len(chunk)
215+
subworker.progress += chunkSize
216+
cfg.globalSpeed += chunkSize
212217
finally:
213218
await res.close()
214219

@@ -221,7 +226,7 @@ async def handleSubworker(self, subworker: HttpSubworker):
221226
)
222227
await asyncio.sleep(5)
223228
else: # 正常下载
224-
while subworker.progress < subworker.end:
229+
while subworker.progress <= subworker.end:
225230
try:
226231
res = await self.client.get(
227232
self.stage.url,
@@ -240,20 +245,24 @@ async def handleSubworker(self, subworker: HttpSubworker):
240245
async for chunk in await res.iter_raw(chunk_size=65536):
241246
if not chunk:
242247
continue
248+
remainingBytes = subworker.end - subworker.progress + 1
249+
if len(chunk) > remainingBytes:
250+
chunk = chunk[:remainingBytes]
243251
await cfg.checkSpeedLimitation()
244252
offset = subworker.progress
245253
pwrite(self.fileHandle, chunk, offset)
246-
subworker.progress += 65536
247-
cfg.globalSpeed += 65536
248-
if subworker.progress >= subworker.end:
254+
chunkSize = len(chunk)
255+
subworker.progress += chunkSize
256+
cfg.globalSpeed += chunkSize
257+
if subworker.progress > subworker.end:
249258
break
250259
except Exception as e:
251260
raise e
252261
finally:
253262
await res.close()
254263

255264
if subworker.progress > subworker.end:
256-
subworker.progress = subworker.end
265+
subworker.progress = subworker.end + 1
257266

258267
except Exception as e:
259268
logger.opt(exception=e).error(

0 commit comments

Comments
 (0)