 import asyncio
 import re
-import time
 import traceback
 import urllib.parse
 from datetime import datetime
 from typing import TYPE_CHECKING, Optional, Tuple, List, Dict, Any, Union
 from uuid import UUID, uuid4

+from remotezip import RemoteIOError
+
 from fastapi import Depends, HTTPException, Request, Response
 import pymongo

@@ -81,50 +82,75 @@ async def set_ops(self, background_job_ops: BackgroundJobOps):
         """Set ops classes as needed"""
         self.background_job_ops = background_job_ops

-    async def add_crawl_pages_to_db_from_wacz(self, crawl_id: str, batch_size=100):
+    async def add_crawl_pages_to_db_from_wacz(
+        self, crawl_id: str, batch_size=100, num_retries=5
+    ):
         """Add pages to database from WACZ files"""
         pages_buffer: List[Page] = []
-        try:
-            crawl = await self.crawl_ops.get_crawl_out(crawl_id)
-            stream = await self.storage_ops.sync_stream_wacz_pages(
-                crawl.resources or []
-            )
-            new_uuid = crawl.type == "upload"
-            seed_count = 0
-            non_seed_count = 0
-            for page_dict in stream:
-                if not page_dict.get("url"):
-                    continue
+        retry = 0
+        while True:
+            try:
+                crawl = await self.crawl_ops.get_crawl_out(crawl_id)
+                stream = await self.storage_ops.sync_stream_wacz_pages(
+                    crawl.resources or []
+                )
+                new_uuid = crawl.type == "upload"
+                seed_count = 0
+                non_seed_count = 0
+                for page_dict in stream:
+                    if not page_dict.get("url"):
+                        continue
+
+                    page_dict["isSeed"] = page_dict.get("isSeed") or page_dict.get(
+                        "seed"
+                    )

-                page_dict["isSeed"] = page_dict.get("isSeed") or page_dict.get("seed")
+                    if page_dict.get("isSeed"):
+                        seed_count += 1
+                    else:
+                        non_seed_count += 1

-                if page_dict.get("isSeed"):
-                    seed_count += 1
-                else:
-                    non_seed_count += 1
+                    if len(pages_buffer) > batch_size:
+                        await self._add_pages_to_db(crawl_id, pages_buffer)
+                        pages_buffer = []
+
+                    pages_buffer.append(
+                        self._get_page_from_dict(
+                            page_dict, crawl_id, crawl.oid, new_uuid
+                        )
+                    )

-                if len(pages_buffer) > batch_size:
+                # Add any remaining pages in buffer to db
+                if pages_buffer:
                     await self._add_pages_to_db(crawl_id, pages_buffer)
-                    pages_buffer = []

-                pages_buffer.append(
-                    self._get_page_from_dict(page_dict, crawl_id, crawl.oid, new_uuid)
+                await self.set_archived_item_page_counts(crawl_id)
+
+                print(
+                    f"Added pages for crawl {crawl_id}: "
+                    + f"{seed_count} Seed, {non_seed_count} Non-Seed",
+                    flush=True,
                 )

-            # Add any remaining pages in buffer to db
-            if pages_buffer:
-                await self._add_pages_to_db(crawl_id, pages_buffer)
+            except RemoteIOError as rio:
+                msg = str(rio)
+                if msg.startswith("503") or msg.startswith("429"):
+                    if retry < num_retries:
+                        retry += 1
+                        print(f"Retrying, {retry} of {num_retries}, {msg}")
+                        await asyncio.sleep(5)
+                        continue

-            await self.set_archived_item_page_counts(crawl_id)
+                print(f"No more retries, {msg}")

-            print(
-                f"Added pages for crawl {crawl_id}: {seed_count} Seed, {non_seed_count} Non-Seed",
-                flush=True,
-            )
-            # pylint: disable=broad-exception-caught, raise-missing-from
-        except Exception as err:
-            traceback.print_exc()
-            print(f"Error adding pages for crawl {crawl_id} to db: {err}", flush=True)
+            # pylint: disable=broad-exception-caught, raise-missing-from
+            except Exception as err:
+                traceback.print_exc()
+                print(
+                    f"Error adding pages for crawl {crawl_id} to db: {err}", flush=True
+                )
+
+            break

     def _get_page_from_dict(
         self, page_dict: Dict[str, Any], crawl_id: str, oid: UUID, new_uuid: bool
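
The substantive change in this hunk is the retry wrapper: the whole stream-and-insert pass now runs inside `while True:`, a `RemoteIOError` whose message starts with `"503"` or `"429"` triggers up to `num_retries` full re-reads with a 5-second `asyncio.sleep` between attempts, and every other path falls through to the final `break`. A minimal standalone sketch of the same pattern, where `fetch_pages` and `TransientError` are hypothetical stand-ins for the WACZ page stream and `remotezip.RemoteIOError`:

```python
import asyncio


class TransientError(Exception):
    """Stand-in for remotezip.RemoteIOError (illustrative only)."""


async def fetch_with_retries(fetch_pages, num_retries: int = 5) -> None:
    """Re-run fetch_pages when it fails with a transient 503/429 error."""
    retry = 0
    while True:
        try:
            await fetch_pages()
        except TransientError as err:
            msg = str(err)
            # The error message is assumed to begin with the HTTP status code
            if msg.startswith("503") or msg.startswith("429"):
                if retry < num_retries:
                    retry += 1
                    print(f"Retrying, {retry} of {num_retries}, {msg}")
                    await asyncio.sleep(5)
                    continue  # restart the whole pass
            print(f"No more retries, {msg}")
        # Success, retries exhausted, or a non-retryable error: stop
        break
```

Note that each retry restarts the stream from the beginning, so pages already flushed to the database on an earlier attempt may be written again unless the insert path deduplicates.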
@@ -982,7 +1008,7 @@ async def process_finished_crawls():
                 break

             print("Running crawls remain, waiting for them to finish")
-            time.sleep(30)
+            await asyncio.sleep(30)

         await process_finished_crawls()

@@ -994,7 +1020,7 @@ async def process_finished_crawls():
             if in_progress is None:
                 break
             print("Unmigrated crawls remain, finishing job")
-            time.sleep(5)
+            await asyncio.sleep(5)


 # ============================================================================
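
The two `time.sleep` to `await asyncio.sleep` swaps in the migration loops fix a classic async bug: `time.sleep` blocks the entire event loop, stalling every other coroutine in the process for the full wait, while `await asyncio.sleep` suspends only the current task. A toy demonstration of the difference (all names here are illustrative):

```python
import asyncio
import time


async def ticker():
    # Prints once per second while the event loop is free
    for _ in range(3):
        print("tick")
        await asyncio.sleep(1)


async def blocking_wait():
    time.sleep(3)  # blocks the whole loop: pending ticks stall until this returns


async def cooperative_wait():
    await asyncio.sleep(3)  # yields to the loop: ticks keep printing


async def main():
    await asyncio.gather(ticker(), cooperative_wait())


asyncio.run(main())
```

Running `main()` with `blocking_wait()` in place of `cooperative_wait()` shows the ticker freeze for the full three seconds.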