1- from urllib .parse import urlsplit
2-
1+ from celery import group
32from django .db import transaction
4- from django .dispatch import receiver
53from django .db .models .signals import post_save
6-
7- import xram_memory .artifact .tasks as background_tasks
4+ from django .dispatch import receiver
5+ from retrying import retry
6+ from urllib .parse import urlsplit
87from xram_memory .artifact .models import Document , News , Newspaper
8+ from xram_memory .utils import celery_is_avaliable
9+ import random
10+ import xram_memory .artifact .tasks as background_tasks
911
1012
1113@receiver (post_save )
@@ -31,76 +33,116 @@ def set_mimetype_filesize_for_documents(sender, **kwargs):
3133 del instance ._save_in_signal
3234
3335
34- @receiver (post_save )
35- def news_add_newspaper (sender , ** kwargs ):
36- instance = kwargs ['instance' ]
37- if hasattr (instance , '_save_in_signal_add_newspaper' ):
38- return
39- if isinstance (instance , (News )) and not instance .newspaper :
40- instance ._save_in_signal_add_newspaper = True
36+ # TODO: mover para o modelo da notícia
37+ def associate_newspaper (news_instance : News ):
38+ """
39+ Com base na URL da notícia, associa ela com um jornal existente ou cria este jornal e, por fim, faz a associação.
40+ """
41+ news_instance ._save_in_signal = True
42+ try :
43+ base_url = "{uri.scheme}://{uri.netloc}" .format (
44+ uri = urlsplit (news_instance .url ))
45+ news_instance .newspaper = Newspaper .objects .get (url = base_url )
46+ news_instance .save ()
47+ except Newspaper .DoesNotExist :
48+ # crie um jornal (newspaper ) básico agora
49+ newspaper = None
4150 try :
42- base_url = "{uri.scheme}://{uri.netloc}" .format (
43- uri = urlsplit (instance .url ))
44- instance .newspaper = Newspaper .objects .get (url = base_url )
45- instance .save ()
46- except Newspaper .DoesNotExist :
47- # crie um jornal (newspaper ) básico agora
48- newspaper = None
49- try :
50- newspaper = Newspaper .objects .create (
51- title = base_url ,
52- url = base_url ,
53- created_by = instance .created_by ,
54- modified_by = instance .modified_by
55- )
56- except :
57- pass
58- else :
59- instance .newspaper = newspaper
60- instance .save ()
61- finally :
62- del instance ._save_in_signal_add_newspaper
51+ newspaper = Newspaper .objects .create (
52+ title = base_url ,
53+ url = base_url ,
54+ created_by = news_instance .created_by ,
55+ modified_by = news_instance .modified_by
56+ )
57+ except :
58+ pass
59+ else :
60+ news_instance .newspaper = newspaper
61+ news_instance .save ()
62+ finally :
63+ del news_instance ._save_in_signal
64+
65+
66+ def try_task (task , args ):
67+ """
68+ Emula o comportamento de tentar novamente do celery para uma tarefas que será executada sincronicamente.
69+ """
70+ expect_to_throw = tuple (getattr (task , 'throws' , ()))
71+ autoretry_for = tuple (getattr (task , 'autoretry_for' , ()))
72+ stop_max_attempt_number = 3
73+ wait_exponential_multiplier = 1000
74+ wait_exponential_max = 30 * 1000
6375
76+ def need_to_retry_for (exception ):
77+ return isinstance (exception , autoretry_for )
6478
65- @receiver (post_save )
66- def newspaper_add_basic_info (sender , ** kwargs ):
67- instance = kwargs ['instance' ]
68- # Não entre em loop infinito
69- if hasattr (instance , '_save_in_signal_newspaper_add_basic_info' ):
70- return
71- if isinstance (instance , (Newspaper )) and not instance .has_basic_info :
72- transaction .on_commit (lambda instance = instance : background_tasks .newspaper_set_basic_info .delay (
73- instance .pk ))
79+ @retry (stop_max_attempt_number = stop_max_attempt_number ,
80+ retry_on_exception = need_to_retry_for , wait_exponential_multiplier = wait_exponential_multiplier ,
81+ wait_exponential_max = wait_exponential_max )
82+ def retry_task (the_task , arguments ):
83+ the_task (* arguments )
7484
85+ retry_task (task , args )
7586
76- @receiver (post_save )
77- def news_add_basic_info (sender , ** kwargs ):
78- instance = kwargs ['instance' ]
79- # Não agende a captura em pdf se o sinal foi enviado durante o cadastro de um jornal
80- if hasattr (instance , '_save_in_signal_add_newspaper' ):
81- return
82- if isinstance (instance , (News )) and getattr (instance , '_set_basic_info' , False ):
83- transaction .on_commit (lambda instance = instance :
84- background_tasks .news_set_basic_info .delay (instance .pk ))
8587
88+ def determine_additional_tasks_to_run (news_instance , execute_async = True ):
89+ """
90+ Com base nas opções definidas pelo usuário, determine quais tarefas de processamento adicional executar.
91+ """
92+ fields_and_task_info = {
93+ '_set_basic_info' : (background_tasks .news_set_basic_info , (news_instance .pk , not execute_async )),
94+ '_fetch_archived_url' : (background_tasks .news_add_archived_url , (news_instance .pk ,)),
95+ '_add_pdf_capture' : (background_tasks .news_add_pdf_capture , (news_instance .pk ,)),
96+ }
97+ tasks = []
98+
99+ for field , task_info in fields_and_task_info .items ():
100+ if getattr (news_instance , field , False ):
101+ tasks .append (task_info )
86102
103+ return tasks
104+
105+ # Sinais para o processamento de News
87106@receiver (post_save )
88- def news_add_archived_url (sender , ** kwargs ):
107+ def news_additional_processing (sender , ** kwargs ):
108+ """
109+ De acorodo com as opções selecionadas pelo usuário, executa ou agenda tarefas para obter informações adicionais
110+ sobre determinada Notícia.
111+ """
89112 instance = kwargs ['instance' ]
90- # Não agende a captura em pdf se o sinal foi enviado durante o cadastro de um jornal
91- if hasattr (instance , '_save_in_signal_add_newspaper' ):
113+ if hasattr (instance , '_save_in_signal' ):
92114 return
93- if isinstance (instance , (News )) and getattr (instance , '_fetch_archived_url' , False ):
94- transaction .on_commit (lambda instance = instance :
95- background_tasks .news_add_archived_url .delay (instance .pk ))
115+ if isinstance (instance , News ):
116+ # Se esta notícia não tem jornal, associe ela a um
117+ if not instance .newspaper :
118+ associate_newspaper (instance )
119+
120+ execute_async = celery_is_avaliable ()
121+ tasks = determine_additional_tasks_to_run (instance , execute_async )
122+ if len (tasks ):
123+ if execute_async :
124+ transaction .on_commit (lambda instance = instance , tasks = tasks : group (
125+ [task .s (* args ) for task , args in tasks ]).apply_async ()
126+ )
127+ else :
128+ for task , args in tasks :
129+ transaction .on_commit (
130+ lambda task = task , args = args : try_task (task , args ))
96131
97132
133+ # Sinais para o processamento de Newspaper
98134@receiver (post_save )
99- def news_add_pdf_capture (sender , ** kwargs ):
135+ def newspaper_additional_processing (sender , ** kwargs ):
136+ """
137+ Agenda ou executa tarefa para obter informações básicas sobre um Jornal.
138+ """
100139 instance = kwargs ['instance' ]
101- # Não agende a captura em pdf se o sinal foi enviado durante o cadastro de um jornal
102- if hasattr (instance , '_save_in_signal_add_newspaper' ):
140+ if hasattr (instance , '_save_in_signal' ):
103141 return
104- if isinstance (instance , (News )) and getattr (instance , '_add_pdf_capture' , False ):
105- transaction .on_commit (lambda instance = instance :
106- background_tasks .news_add_pdf_capture .delay (instance .pk ))
142+ if isinstance (instance , Newspaper ) and not instance .has_basic_info :
143+ if celery_is_avaliable ():
144+ transaction .on_commit (
145+ lambda instance = instance : background_tasks .newspaper_set_basic_info .delay (instance .pk ))
146+ else :
147+ transaction .on_commit (
148+ lambda instance = instance : try_task (background_tasks .newspaper_set_basic_info , (instance .pk ,)))
0 commit comments