Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/hope/admin/program.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ def bulk_upload_individuals_photos(self, request: HttpRequest, pk: int) -> Templ
owner=request.user,
action="hope.admin.program.bulk_upload_individuals_photos_action",
config={"file_id": str(file_temp.pk)},
group_key=f"bulk_upload_individuals_photos:{program.pk}:{file_temp.pk}",
group_key="program",
description=f"Bulk upload individuals photos for program {program.pk}",
)
self.message_user(
Expand Down
14 changes: 7 additions & 7 deletions src/hope/admin/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

from hope.admin.utils import HOPEModelAdminBase
from hope.apps.utils.security import is_root
from hope.contrib.aurora.celery_tasks import fresh_extract_records_task
from hope.contrib.aurora.celery_tasks import fresh_extract_records_async_task
from hope.contrib.aurora.models import Record, Registration
from hope.contrib.aurora.services.extract_record import extract
from hope.contrib.aurora.services.flex_registration_service import (
Expand Down Expand Up @@ -203,8 +203,8 @@ def get_queryset(self, request: HttpRequest) -> QuerySet:
@admin.action(description="Async extract")
def async_extract(self, request: HttpRequest, queryset: QuerySet) -> None:
try:
records_ids = queryset.values_list("id", flat=True)
fresh_extract_records_task.delay(list(records_ids))
records_ids = list(queryset.values_list("id", flat=True))
fresh_extract_records_async_task(records_ids)
self.message_user(
request,
f"Extracting data for {len(records_ids)} records",
Expand Down Expand Up @@ -249,7 +249,7 @@ def create_new_rdi(self, request: HttpRequest) -> HttpResponse:
rdi_name=f"{organization.slug} rdi {rdi_name}",
is_open=is_open,
)
create_task_for_processing_records(service, registration.pk, rdi.pk, list(records_ids))
create_task_for_processing_records(service, registration, rdi, list(records_ids))
url = reverse(
"admin:registration_data_registrationdataimport_change",
args=[rdi.pk],
Expand Down Expand Up @@ -287,8 +287,8 @@ def add_to_existing_rdi(self, request: HttpRequest) -> HttpResponse:
if request.method == "POST":
form = AmendRDIForm(request.POST, request=request)
if form.is_valid():
registration = form.cleaned_data["registration"]
rdi = form.cleaned_data.get("rdi")
registration: Registration = form.cleaned_data["registration"]
rdi: RegistrationDataImport | None = form.cleaned_data.get("rdi")
filters, exclude = form.cleaned_data["filters"]
ctx["filters"] = filters
ctx["exclude"] = exclude
Expand All @@ -300,7 +300,7 @@ def add_to_existing_rdi(self, request: HttpRequest) -> HttpResponse:
)
if records_ids := qs.values_list("id", flat=True):
try:
create_task_for_processing_records(service, registration.pk, rdi.pk, list(records_ids))
create_task_for_processing_records(service, registration, rdi, list(records_ids))
url = reverse(
"admin:registration_data_registrationdataimport_change",
args=[rdi.pk],
Expand Down
2 changes: 1 addition & 1 deletion src/hope/apps/account/celery_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def invalidate_permissions_cache_for_user_if_expired_role_async_task() -> bool:
job_name=invalidate_permissions_cache_for_user_if_expired_role_async_task.__name__,
action="hope.apps.account.celery_tasks.invalidate_permissions_cache_for_user_if_expired_role_async_task_action",
config={},
group_key="invalidate_permissions_cache_for_user_if_expired_role_async_task",
group_key="account",
description="Invalidate permissions cache for users with expired roles",
)
return True
4 changes: 2 additions & 2 deletions src/hope/apps/accountability/celery_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def export_survey_sample_async_task(survey: Survey, user: User) -> None:
program=survey.program,
action="hope.apps.accountability.celery_tasks.export_survey_sample_async_task_action",
config={"survey_id": survey_id, "user_id": user_id},
group_key=f"export_survey_sample_async_task:{survey_id}",
group_key="accountability",
description=f"Export survey sample for survey {survey_id}",
)

Expand All @@ -88,6 +88,6 @@ def send_survey_to_users_async_task(survey: Survey) -> None:
program=survey.program,
action="hope.apps.accountability.celery_tasks.send_survey_to_users_async_task_action",
config={"survey_id": survey_id},
group_key=f"send_survey_to_users_async_task:{survey_id}",
group_key="accountability",
description=f"Send survey to users for survey {survey_id}",
)
16 changes: 14 additions & 2 deletions src/hope/apps/core/celery_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
DEFAULT_RECOVER_MISSING_ASYNC_JOBS_MAX_AGE_SECONDS = 12 * 60 * 60


class NonRetriableTaskError(Exception):
pass


def upload_new_kobo_template_and_update_flex_fields_task_with_retry_async_task_action(job: AsyncRetryJob) -> None:
from hope.apps.core.tasks.upload_new_template_and_update_flex_fields import ( # pragma: no cover
KoboRetriableError,
Expand Down Expand Up @@ -53,7 +57,7 @@ def upload_new_kobo_template_and_update_flex_fields_task_with_retry_async_task(x
job_name=upload_new_kobo_template_and_update_flex_fields_task_with_retry_async_task.__name__,
action="hope.apps.core.celery_tasks.upload_new_kobo_template_and_update_flex_fields_task_with_retry_async_task_action",
config={"xlsx_kobo_template_id": xlsx_kobo_template_id},
group_key=f"upload_new_kobo_template_and_update_flex_fields_task_with_retry_async_task:{xlsx_kobo_template_id}",
group_key="core",
description=f"Retry upload Kobo template {xlsx_kobo_template_id} and update flex fields",
)

Expand All @@ -77,7 +81,7 @@ def upload_new_kobo_template_and_update_flex_fields_async_task(xlsx_kobo_templat
job_name=upload_new_kobo_template_and_update_flex_fields_async_task.__name__,
action="hope.apps.core.celery_tasks.upload_new_kobo_template_and_update_flex_fields_async_task_action",
config={"xlsx_kobo_template_id": xlsx_kobo_template_id},
group_key=f"upload_new_kobo_template_and_update_flex_fields_async_task:{xlsx_kobo_template_id}",
group_key="core",
description=f"Upload Kobo template {xlsx_kobo_template_id} and update flex fields",
)

Expand Down Expand Up @@ -118,6 +122,14 @@ def async_retry_job_task(self: Any, pk: int, version: int | None = None, *args:
job.errors.pop(ASYNC_EXCEPTION_KEY, None)
job.save(update_fields=["errors"])
return result
except NonRetriableTaskError as exc:
job.errors = {
**job.errors,
ASYNC_EXCEPTION_KEY: str(exc),
}
job.save(update_fields=["errors"])
logger.warning(f"Async retry job action failed without retry for job {job.pk} ({job.action})")
raise
except Exception as exc:
job.errors = {
**job.errors,
Expand Down
8 changes: 4 additions & 4 deletions src/hope/apps/core/tasks_schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@
"task": "hope.apps.grievance.celery_tasks.periodic_grievances_notifications_async_task",
"schedule": crontab(minute="*/20"),
},
"extract_records_task": {
"task": "hope.contrib.aurora.celery_tasks.extract_records_task",
"extract_records_async_task": {
"task": "hope.contrib.aurora.celery_tasks.extract_records_async_task",
"schedule": crontab(minute=0, hour=0),
},
"remove_old_cash_plan_payment_verification_xlsx_async_task": {
"task": "hope.apps.payment.celery_tasks.remove_old_cash_plan_payment_verification_xlsx_async_task",
"schedule": crontab(minute=0, hour=0),
},
"clean_old_record_files_task": {
"task": "hope.contrib.aurora.celery_tasks.clean_old_record_files_task",
"clean_old_record_files_async_task": {
"task": "hope.contrib.aurora.celery_tasks.clean_old_record_files_async_task",
"schedule": crontab(minute=0, hour=0, day_of_month=1, month_of_year="2-12/2"),
},
"periodic_sync_payment_gateway_fsp_async_task": {
Expand Down
2 changes: 1 addition & 1 deletion src/hope/apps/generic_import/celery_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,6 @@ def process_generic_import_async_task(
"registration_data_import_id": registration_data_import_id,
"import_data_id": import_data_id,
},
group_key=f"process_generic_import_async_task:{registration_data_import_id},{import_data_id}",
group_key="generic_import",
description=f"Process generic import for registration data import {registration_data_import_id}",
)
2 changes: 1 addition & 1 deletion src/hope/apps/geo/celery_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def import_areas_from_csv_async_task(csv_data: str, delay_mptt_updates: bool = F
job_name=import_areas_from_csv_async_task.__name__,
action="hope.apps.geo.celery_tasks.import_areas_from_csv_async_task_action",
config={"csv_data": csv_data, "delay_mptt_updates": delay_mptt_updates},
group_key="import_areas_from_csv_async_task",
group_key="geo",
description="Import areas from CSV",
)

Expand Down
4 changes: 2 additions & 2 deletions src/hope/apps/grievance/celery_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def deduplicate_and_check_against_sanctions_list_task_single_individual_async_ta
"should_populate_index": should_populate_index,
"individual_id": individual_id,
},
group_key=f"grievance_single_individual_deduplication:{individual_id}",
group_key="grievance",
description=f"Deduplicate and sanctions-check grievance individual {individual_id}",
)

Expand Down Expand Up @@ -112,6 +112,6 @@ def periodic_grievances_notifications_async_task() -> None:
job_name=periodic_grievances_notifications_async_task.__name__,
action="hope.apps.grievance.celery_tasks.periodic_grievances_notifications_async_task_action",
config={},
group_key="periodic_grievances_notifications_async_task",
group_key="grievance",
description="Send periodic grievance notifications",
)
21 changes: 8 additions & 13 deletions src/hope/apps/household/celery_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from django.utils import timezone

from hope.apps.core.celery import app
from hope.apps.core.utils import stable_ids_hash
from hope.apps.household.documents import (
get_household_doc,
get_individual_doc,
Expand Down Expand Up @@ -71,7 +70,7 @@ def recalculate_population_fields_chunk_async_task(households_ids: list[str], pr
program_id=program_id,
action="hope.apps.household.celery_tasks.recalculate_population_fields_chunk_async_task_action",
config={"households_ids": households_ids, "program_id": program_id},
group_key=f"recalculate_population_fields_chunk_async_task:{program_id}:{stable_ids_hash(households_ids)}",
group_key="household",
description="Recalculate population fields chunk",
)

Expand Down Expand Up @@ -114,9 +113,7 @@ def recalculate_population_fields_async_task(household_ids: list[str], program_i
"household_ids": serialized_household_ids,
"program_id": program_id,
},
group_key=(
f"recalculate_population_fields_async_task:{program_id}:{stable_ids_hash(serialized_household_ids)}"
),
group_key="household",
description="Schedule population fields recalculation",
)

Expand All @@ -142,7 +139,7 @@ def interval_recalculate_population_fields_async_task() -> None:
AsyncJob.queue_task(
job_name=interval_recalculate_population_fields_async_task.__name__,
action="hope.apps.household.celery_tasks.interval_recalculate_population_fields_async_task_action",
group_key="interval_recalculate_population_fields_async_task",
group_key="household",
description="Run interval population fields recalculation",
)

Expand Down Expand Up @@ -186,7 +183,7 @@ def calculate_children_fields_for_not_collected_individual_data_async_task() ->
job_name=calculate_children_fields_for_not_collected_individual_data_async_task.__name__,
action="hope.apps.household.celery_tasks.calculate_children_fields_for_not_collected_individual_data_async_task_action",
config={},
group_key="calculate_children_fields_for_not_collected_individual_data_async_task",
group_key="household",
description="Calculate children fields for households",
)

Expand All @@ -208,7 +205,7 @@ def revalidate_phone_number_async_task(individual_ids: list[UUID]) -> None:
job_name=revalidate_phone_number_async_task.__name__,
action="hope.apps.household.celery_tasks.revalidate_phone_number_async_task_action",
config={"individual_ids": serialized_individual_ids},
group_key=f"revalidate_phone_number_async_task:{stable_ids_hash(serialized_individual_ids)}",
group_key="household",
description="Revalidate phone numbers for individuals",
)

Expand Down Expand Up @@ -263,7 +260,7 @@ def enroll_households_to_program_async_task(
"program_for_enroll_id": program_for_enroll_id,
"user_id": user_id,
},
group_key=f"enroll_households_to_program_async_task:{program_for_enroll_id}:{stable_ids_hash(households_ids)}",
group_key="household",
description=f"Enroll households to program {program_for_enroll_id}",
)

Expand All @@ -289,9 +286,7 @@ def mass_withdraw_households_from_list_async_task(
program_id=serialized_program_id,
action="hope.apps.household.celery_tasks.mass_withdraw_households_from_list_async_task_action",
config={"household_id_list": household_id_list, "tag": tag, "program_id": serialized_program_id},
group_key=(
f"mass_withdraw_households_from_list_async_task:{serialized_program_id}:{tag}:{stable_ids_hash(household_id_list)}"
),
group_key="household",
description=f"Mass withdraw households from list for program {serialized_program_id}",
)

Expand All @@ -314,6 +309,6 @@ def cleanup_indexes_in_inactive_programs_async_task() -> None:
job_name=cleanup_indexes_in_inactive_programs_async_task.__name__,
action="hope.apps.household.celery_tasks.cleanup_indexes_in_inactive_programs_async_task_action",
config={},
group_key="cleanup_indexes_in_inactive_programs_async_task",
group_key="household",
description="Cleanup indexes in inactive programs",
)
Loading
Loading