Skip to content
Draft
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/hope/admin/program.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ def bulk_upload_individuals_photos(self, request: HttpRequest, pk: int) -> Templ
owner=request.user,
action="hope.admin.program.bulk_upload_individuals_photos_action",
config={"file_id": str(file_temp.pk)},
group_key=f"bulk_upload_individuals_photos:{program.pk}:{file_temp.pk}",
group_key="program",
description=f"Bulk upload individuals photos for program {program.pk}",
)
self.message_user(
Expand Down
14 changes: 7 additions & 7 deletions src/hope/admin/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

from hope.admin.utils import HOPEModelAdminBase
from hope.apps.utils.security import is_root
from hope.contrib.aurora.celery_tasks import fresh_extract_records_task
from hope.contrib.aurora.celery_tasks import fresh_extract_records_async_task
from hope.contrib.aurora.models import Record, Registration
from hope.contrib.aurora.services.extract_record import extract
from hope.contrib.aurora.services.flex_registration_service import (
Expand Down Expand Up @@ -203,8 +203,8 @@ def get_queryset(self, request: HttpRequest) -> QuerySet:
@admin.action(description="Async extract")
def async_extract(self, request: HttpRequest, queryset: QuerySet) -> None:
try:
records_ids = queryset.values_list("id", flat=True)
fresh_extract_records_task.delay(list(records_ids))
records_ids = list(queryset.values_list("id", flat=True))
fresh_extract_records_async_task(records_ids)
self.message_user(
request,
f"Extracting data for {len(records_ids)} records",
Expand Down Expand Up @@ -249,7 +249,7 @@ def create_new_rdi(self, request: HttpRequest) -> HttpResponse:
rdi_name=f"{organization.slug} rdi {rdi_name}",
is_open=is_open,
)
create_task_for_processing_records(service, registration.pk, rdi.pk, list(records_ids))
create_task_for_processing_records(service, registration, rdi, list(records_ids))
url = reverse(
"admin:registration_data_registrationdataimport_change",
args=[rdi.pk],
Expand Down Expand Up @@ -287,8 +287,8 @@ def add_to_existing_rdi(self, request: HttpRequest) -> HttpResponse:
if request.method == "POST":
form = AmendRDIForm(request.POST, request=request)
if form.is_valid():
registration = form.cleaned_data["registration"]
rdi = form.cleaned_data.get("rdi")
registration: Registration = form.cleaned_data["registration"]
rdi: RegistrationDataImport | None = form.cleaned_data.get("rdi")
filters, exclude = form.cleaned_data["filters"]
ctx["filters"] = filters
ctx["exclude"] = exclude
Expand All @@ -300,7 +300,7 @@ def add_to_existing_rdi(self, request: HttpRequest) -> HttpResponse:
)
if records_ids := qs.values_list("id", flat=True):
try:
create_task_for_processing_records(service, registration.pk, rdi.pk, list(records_ids))
create_task_for_processing_records(service, registration, rdi, list(records_ids))
url = reverse(
"admin:registration_data_registrationdataimport_change",
args=[rdi.pk],
Comment thread
MarekBiczysko marked this conversation as resolved.
Expand Down
2 changes: 1 addition & 1 deletion src/hope/apps/account/celery_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def invalidate_permissions_cache_for_user_if_expired_role_async_task() -> bool:
job_name=invalidate_permissions_cache_for_user_if_expired_role_async_task.__name__,
action="hope.apps.account.celery_tasks.invalidate_permissions_cache_for_user_if_expired_role_async_task_action",
config={},
group_key="invalidate_permissions_cache_for_user_if_expired_role_async_task",
group_key="account",
description="Invalidate permissions cache for users with expired roles",
)
return True
4 changes: 2 additions & 2 deletions src/hope/apps/accountability/celery_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def export_survey_sample_async_task(survey: Survey, user: User) -> None:
program=survey.program,
action="hope.apps.accountability.celery_tasks.export_survey_sample_async_task_action",
config={"survey_id": survey_id, "user_id": user_id},
group_key=f"export_survey_sample_async_task:{survey_id}",
group_key="accountability",
description=f"Export survey sample for survey {survey_id}",
)

Expand All @@ -88,6 +88,6 @@ def send_survey_to_users_async_task(survey: Survey) -> None:
program=survey.program,
action="hope.apps.accountability.celery_tasks.send_survey_to_users_async_task_action",
config={"survey_id": survey_id},
group_key=f"send_survey_to_users_async_task:{survey_id}",
group_key="accountability",
description=f"Send survey to users for survey {survey_id}",
)
16 changes: 14 additions & 2 deletions src/hope/apps/core/celery_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
DEFAULT_RECOVER_MISSING_ASYNC_JOBS_MAX_AGE_SECONDS = 12 * 60 * 60


class NonRetriableTaskError(Exception):
pass


def upload_new_kobo_template_and_update_flex_fields_task_with_retry_async_task_action(job: AsyncRetryJob) -> None:
from hope.apps.core.tasks.upload_new_template_and_update_flex_fields import ( # pragma: no cover
KoboRetriableError,
Expand Down Expand Up @@ -53,7 +57,7 @@ def upload_new_kobo_template_and_update_flex_fields_task_with_retry_async_task(x
job_name=upload_new_kobo_template_and_update_flex_fields_task_with_retry_async_task.__name__,
action="hope.apps.core.celery_tasks.upload_new_kobo_template_and_update_flex_fields_task_with_retry_async_task_action",
config={"xlsx_kobo_template_id": xlsx_kobo_template_id},
group_key=f"upload_new_kobo_template_and_update_flex_fields_task_with_retry_async_task:{xlsx_kobo_template_id}",
group_key="core",
description=f"Retry upload Kobo template {xlsx_kobo_template_id} and update flex fields",
)

Expand All @@ -77,7 +81,7 @@ def upload_new_kobo_template_and_update_flex_fields_async_task(xlsx_kobo_templat
job_name=upload_new_kobo_template_and_update_flex_fields_async_task.__name__,
action="hope.apps.core.celery_tasks.upload_new_kobo_template_and_update_flex_fields_async_task_action",
config={"xlsx_kobo_template_id": xlsx_kobo_template_id},
group_key=f"upload_new_kobo_template_and_update_flex_fields_async_task:{xlsx_kobo_template_id}",
group_key="core",
description=f"Upload Kobo template {xlsx_kobo_template_id} and update flex fields",
)

Expand Down Expand Up @@ -118,6 +122,14 @@ def async_retry_job_task(self: Any, pk: int, version: int | None = None, *args:
job.errors.pop(ASYNC_EXCEPTION_KEY, None)
job.save(update_fields=["errors"])
return result
except NonRetriableTaskError as exc:
job.errors = {
**job.errors,
ASYNC_EXCEPTION_KEY: str(exc),
}
job.save(update_fields=["errors"])
logger.warning(f"Async retry job action failed without retry for job {job.pk} ({job.action})")
raise
except Exception as exc:
job.errors = {
**job.errors,
Expand Down
8 changes: 4 additions & 4 deletions src/hope/apps/core/tasks_schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@
"task": "hope.apps.grievance.celery_tasks.periodic_grievances_notifications_async_task",
"schedule": crontab(minute="*/20"),
},
"extract_records_task": {
"task": "hope.contrib.aurora.celery_tasks.extract_records_task",
"extract_records_async_task": {
"task": "hope.contrib.aurora.celery_tasks.extract_records_async_task",
"schedule": crontab(minute=0, hour=0),
},
"remove_old_cash_plan_payment_verification_xlsx_async_task": {
"task": "hope.apps.payment.celery_tasks.remove_old_cash_plan_payment_verification_xlsx_async_task",
"schedule": crontab(minute=0, hour=0),
},
"clean_old_record_files_task": {
"task": "hope.contrib.aurora.celery_tasks.clean_old_record_files_task",
"clean_old_record_files_async_task": {
"task": "hope.contrib.aurora.celery_tasks.clean_old_record_files_async_task",
"schedule": crontab(minute=0, hour=0, day_of_month=1, month_of_year="2-12/2"),
},
"periodic_sync_payment_gateway_fsp_async_task": {
Expand Down
2 changes: 1 addition & 1 deletion src/hope/apps/generic_import/celery_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,6 @@ def process_generic_import_async_task(
"registration_data_import_id": registration_data_import_id,
"import_data_id": import_data_id,
},
group_key=f"process_generic_import_async_task:{registration_data_import_id},{import_data_id}",
group_key="generic_import",
description=f"Process generic import for registration data import {registration_data_import_id}",
)
2 changes: 1 addition & 1 deletion src/hope/apps/geo/celery_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def import_areas_from_csv_async_task(csv_data: str, delay_mptt_updates: bool = F
job_name=import_areas_from_csv_async_task.__name__,
action="hope.apps.geo.celery_tasks.import_areas_from_csv_async_task_action",
config={"csv_data": csv_data, "delay_mptt_updates": delay_mptt_updates},
group_key="import_areas_from_csv_async_task",
group_key="geo",
description="Import areas from CSV",
)

Expand Down
4 changes: 2 additions & 2 deletions src/hope/apps/grievance/celery_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def deduplicate_and_check_against_sanctions_list_task_single_individual_async_ta
"should_populate_index": should_populate_index,
"individual_id": individual_id,
},
group_key=f"grievance_single_individual_deduplication:{individual_id}",
group_key="grievance",
description=f"Deduplicate and sanctions-check grievance individual {individual_id}",
)

Expand Down Expand Up @@ -112,6 +112,6 @@ def periodic_grievances_notifications_async_task() -> None:
job_name=periodic_grievances_notifications_async_task.__name__,
action="hope.apps.grievance.celery_tasks.periodic_grievances_notifications_async_task_action",
config={},
group_key="periodic_grievances_notifications_async_task",
group_key="grievance",
description="Send periodic grievance notifications",
)
21 changes: 8 additions & 13 deletions src/hope/apps/household/celery_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from django.utils import timezone

from hope.apps.core.celery import app
from hope.apps.core.utils import stable_ids_hash
from hope.apps.household.documents import (
get_household_doc,
get_individual_doc,
Expand Down Expand Up @@ -71,7 +70,7 @@ def recalculate_population_fields_chunk_async_task(households_ids: list[str], pr
program_id=program_id,
action="hope.apps.household.celery_tasks.recalculate_population_fields_chunk_async_task_action",
config={"households_ids": households_ids, "program_id": program_id},
group_key=f"recalculate_population_fields_chunk_async_task:{program_id}:{stable_ids_hash(households_ids)}",
group_key="household",
description="Recalculate population fields chunk",
)

Expand Down Expand Up @@ -114,9 +113,7 @@ def recalculate_population_fields_async_task(household_ids: list[str], program_i
"household_ids": serialized_household_ids,
"program_id": program_id,
},
group_key=(
f"recalculate_population_fields_async_task:{program_id}:{stable_ids_hash(serialized_household_ids)}"
),
group_key="household",
description="Schedule population fields recalculation",
)

Expand All @@ -142,7 +139,7 @@ def interval_recalculate_population_fields_async_task() -> None:
AsyncJob.queue_task(
job_name=interval_recalculate_population_fields_async_task.__name__,
action="hope.apps.household.celery_tasks.interval_recalculate_population_fields_async_task_action",
group_key="interval_recalculate_population_fields_async_task",
group_key="household",
description="Run interval population fields recalculation",
)

Expand Down Expand Up @@ -186,7 +183,7 @@ def calculate_children_fields_for_not_collected_individual_data_async_task() ->
job_name=calculate_children_fields_for_not_collected_individual_data_async_task.__name__,
action="hope.apps.household.celery_tasks.calculate_children_fields_for_not_collected_individual_data_async_task_action",
config={},
group_key="calculate_children_fields_for_not_collected_individual_data_async_task",
group_key="household",
description="Calculate children fields for households",
)

Expand All @@ -208,7 +205,7 @@ def revalidate_phone_number_async_task(individual_ids: list[UUID]) -> None:
job_name=revalidate_phone_number_async_task.__name__,
action="hope.apps.household.celery_tasks.revalidate_phone_number_async_task_action",
config={"individual_ids": serialized_individual_ids},
group_key=f"revalidate_phone_number_async_task:{stable_ids_hash(serialized_individual_ids)}",
group_key="household",
description="Revalidate phone numbers for individuals",
)

Expand Down Expand Up @@ -263,7 +260,7 @@ def enroll_households_to_program_async_task(
"program_for_enroll_id": program_for_enroll_id,
"user_id": user_id,
},
group_key=f"enroll_households_to_program_async_task:{program_for_enroll_id}:{stable_ids_hash(households_ids)}",
group_key="household",
description=f"Enroll households to program {program_for_enroll_id}",
)

Expand All @@ -289,9 +286,7 @@ def mass_withdraw_households_from_list_async_task(
program_id=serialized_program_id,
action="hope.apps.household.celery_tasks.mass_withdraw_households_from_list_async_task_action",
config={"household_id_list": household_id_list, "tag": tag, "program_id": serialized_program_id},
group_key=(
f"mass_withdraw_households_from_list_async_task:{serialized_program_id}:{tag}:{stable_ids_hash(household_id_list)}"
),
group_key="household",
description=f"Mass withdraw households from list for program {serialized_program_id}",
)

Expand All @@ -314,6 +309,6 @@ def cleanup_indexes_in_inactive_programs_async_task() -> None:
job_name=cleanup_indexes_in_inactive_programs_async_task.__name__,
action="hope.apps.household.celery_tasks.cleanup_indexes_in_inactive_programs_async_task_action",
config={},
group_key="cleanup_indexes_in_inactive_programs_async_task",
group_key="household",
description="Cleanup indexes in inactive programs",
)
Loading
Loading