11import abc
22import math
3+ import time
34from datetime import datetime
45from multiprocessing import Pool
56from pathlib import Path
67from typing import Iterator , cast
78
9+ import elasticsearch
810import tqdm
911from elasticsearch import Elasticsearch
1012from elasticsearch .helpers import bulk , parallel_bulk
@@ -523,6 +525,16 @@ def run_items_import(
523525 index = generate_index_object (next_index , config )
524526 # create the index
525527 index .save (using = es_client )
528+ # it may take some time to create the index
529+ for i in range (60 ):
530+ try :
531+ index .refresh ()
532+ break
533+ except elasticsearch .NotFoundError :
534+ logger .info ("Index not ready, waiting 10 seconds" )
535+ time .sleep (10 )
536+ else :
537+ raise RuntimeError ("Index not ready after 600 seconds" )
526538 else :
527539 # use current index
528540 next_index = config .index .name
@@ -543,7 +555,15 @@ def run_items_import(
543555 # run in parallel
544556 num_errors = 0
545557 with Pool (num_processes ) as pool :
546- for i , success , errors in pool .starmap (import_parallel , args ):
558+ if num_processes > 1 :
559+ logger .info ("Running in parallel with %d processes" , num_processes )
560+ result_iter = iter (pool .starmap (import_parallel , args ))
561+ else :
562+ # run sequentially, it's easier to debug if we need it
563+ # we won't use the pool in this case
564+ logger .info ("Running in a single processes" )
565+ result_iter = iter (map (lambda a : import_parallel (* a ), args ))
566+ for i , success , errors in result_iter :
547567 # Note: we log here instead of in sub-process because
548568 # it's easier to avoid mixing logs, and it works better for pytest
549569 logger .info ("[%d] Indexed %d documents" , i , success )
0 commit comments