Skip to content
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
97 commits
Select commit Hold shift + click to select a range
900aeb9
account, accountability, core
Mar 12, 2026
56c9a05
Merge branch 'develop' into 207255_Celery_tasks_refactor
MarekBiczysko Mar 16, 2026
11e8c78
generic import, cov
Mar 16, 2026
c44fad9
Merge branch '207255_Celery_tasks_refactor' of github.com:unicef/hope…
Mar 16, 2026
451ce51
geo
Mar 16, 2026
7a1d54d
grievance
Mar 16, 2026
a9bece9
cov
Mar 17, 2026
5671f78
Merge branch 'develop' into 207255_Celery_tasks_refactor
MarekBiczysko Mar 17, 2026
223d4fe
household
Mar 17, 2026
b45077e
fix group_key
Mar 17, 2026
c27c0e6
Merge branch '207255_Celery_tasks_refactor' of github.com:unicef/hope…
Mar 17, 2026
6944208
ut
Mar 17, 2026
1e58257
ut
Mar 17, 2026
9309cf9
ut
Mar 17, 2026
4e79125
cov
Mar 17, 2026
5f4cfaa
payment app, generic task jobs refactor
Mar 19, 2026
2474787
Merge branch 'develop' into 207255_Celery_tasks_refactor
MarekBiczysko Mar 19, 2026
14d575f
ut
Mar 19, 2026
7c72cd9
cov
Mar 19, 2026
aacf455
refactor
Mar 20, 2026
454e10a
Merge remote-tracking branch 'origin/develop' into 207255_Celery_task…
Mar 24, 2026
47d1b19
conflicts
Mar 24, 2026
4ae7c01
Merge branch 'develop' into 207255_Celery_tasks_refactor
MarekBiczysko Mar 24, 2026
42f6f09
207255_Celery_tasks_refactor_2
Mar 25, 2026
c8a6b6c
Merge branch '207255_Celery_tasks_refactor' into 207255_Celery_tasks_…
MarekBiczysko Mar 25, 2026
0387204
Merge branch 'develop' into 207255_Celery_tasks_refactor
MarekBiczysko Mar 25, 2026
df14bbb
ut
Mar 25, 2026
c6e0e47
Merge branch '207255_Celery_tasks_refactor' of github.com:unicef/hope…
Mar 25, 2026
8f3e8af
ut
Mar 25, 2026
21941b6
recover_missing_async_jobs_task
Mar 29, 2026
841e69c
DEFAULT_RECOVER_MISSING_ASYNC_JOBS_LIMIT
Mar 29, 2026
9846f2b
Merge remote-tracking branch 'origin/develop' into 207255_Celery_task…
Mar 30, 2026
7961c78
ut
Mar 30, 2026
a8aaa60
cov
Mar 30, 2026
a1fff80
Merge remote-tracking branch 'origin/develop' into 207255_Celery_task…
Mar 30, 2026
74f3d27
lint
Mar 30, 2026
d0b438d
Merge remote-tracking branch 'origin/develop' into 207255_Celery_task…
Mar 30, 2026
46194c9
remove registration_data revoke tasks, remove univeral update celey m…
Mar 30, 2026
b279496
fix universal update admin
Mar 30, 2026
a680852
fill async job program
Mar 30, 2026
b22044d
refactor pdu app
Mar 31, 2026
dd47c61
Merge branch 'develop' into 207255_Celery_tasks_refactor
MarekBiczysko Mar 31, 2026
f363e1f
remove pragma
Mar 31, 2026
baf0e22
Merge branch '207255_Celery_tasks_refactor' of github.com:unicef/hope…
Mar 31, 2026
9c4d77e
remove pragma
Mar 31, 2026
d688683
migration
Mar 31, 2026
f5a60ba
pdu fixes
Mar 31, 2026
182a29c
ut
Mar 31, 2026
0a210e8
Merge branch 'develop' into 207255_Celery_tasks_refactor
MarekBiczysko Apr 1, 2026
4a77d7f
fix timeout envs
Apr 2, 2026
5b6277a
remove outer layer celery tasks
Apr 3, 2026
2451f0b
ut fix
Apr 3, 2026
190b7fd
ut fix
Apr 7, 2026
7e1c9d5
review fixes
Apr 7, 2026
e1c2f2d
Merge branch '207255_Celery_tasks_refactor' of github.com:unicef/hope…
Apr 7, 2026
765b466
review fixes
Apr 7, 2026
3cdf63f
review fixes
Apr 7, 2026
5e3f9d0
review fixes
Apr 7, 2026
4ca8d62
review fixes
Apr 7, 2026
1ea4ebc
Merge remote-tracking branch 'origin/develop' into 207255_Celery_task…
Apr 7, 2026
ce9f054
ut
Apr 7, 2026
96a0cbc
ut
Apr 7, 2026
50e6cbb
ut
Apr 8, 2026
81843eb
Merge branch 'develop' into 207255_Celery_tasks_refactor
MarekBiczysko Apr 8, 2026
25457a0
ut
Apr 8, 2026
63e4965
Merge remote-tracking branch 'origin/develop' into 207255_Celery_task…
Apr 8, 2026
f01f4c7
ut
Apr 8, 2026
ac7816c
ut
Apr 8, 2026
e58004d
ut
Apr 8, 2026
9a77f46
ut
Apr 9, 2026
d3003a1
ut
Apr 9, 2026
22fb74a
ut
Apr 9, 2026
56af457
ut
Apr 9, 2026
c9e8fba
cov, e2e
Apr 9, 2026
da9f028
cov
Apr 9, 2026
cdc082d
cleanup
Apr 9, 2026
9f3cddd
cleanup
Apr 9, 2026
5678c3d
cleanup
Apr 9, 2026
8a0cffd
cleanup
Apr 9, 2026
5cc90ed
Merge branch 'develop' into 207255_Celery_tasks_refactor
MarekBiczysko Apr 9, 2026
e46a236
ut
Apr 9, 2026
67fe048
cov
Apr 9, 2026
c6f760d
cov
Apr 9, 2026
cb883f2
e2e
Apr 9, 2026
5b3877f
cov
Apr 10, 2026
1a1f2ba
Merge branch 'develop' into 207255_Celery_tasks_refactor
MarekBiczysko Apr 10, 2026
406edd3
ut, admin
Apr 10, 2026
8f59242
ut
Apr 10, 2026
baa51b5
cov
Apr 10, 2026
f188109
e2e
Apr 10, 2026
0e40658
Merge remote-tracking branch 'origin/develop' into 207255_Celery_task…
Apr 10, 2026
b0ebaad
merge fix
Apr 10, 2026
c6e03db
merge fix
Apr 10, 2026
c2b1453
mypy
Apr 10, 2026
22c3382
ut
Apr 10, 2026
ba2e28c
fix aurora schedule task names
Apr 13, 2026
57072e9
Merge branch 'develop' into 207255_Celery_tasks_refactor
MarekBiczysko Apr 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/hope/admin/household.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,15 +242,15 @@ def has_add_permission(self, request: HttpRequest, obj: Household | None = None)

class HouseholdWithdrawFromListMixin:
@staticmethod
def get_household_queryset_from_list(household_id_list: list, program: Program) -> QuerySet:
def get_household_queryset_from_list(household_id_list: list[str], program: Program) -> QuerySet:
return Household.objects.filter(
unicef_id__in=household_id_list,
withdrawn=False,
program=program,
)

@transaction.atomic
def mass_withdraw_households_from_list_bulk(self, household_id_list: list, tag: str, program: Program) -> None:
def mass_withdraw_households_from_list_bulk(self, household_id_list: list[str], tag: str, program: Program) -> None:
households = self.get_household_queryset_from_list(household_id_list, program)
individuals = Individual.objects.filter(household__in=households, withdrawn=False, duplicate=False)

Expand Down Expand Up @@ -631,7 +631,7 @@ def mass_enroll_to_another_program(self, request: HttpRequest, qs: QuerySet) ->
program_for_enroll = form.cleaned_data["program_for_enroll"]
households_ids = list(qs.distinct("unicef_id").values_list("id", flat=True))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we do no pass the unicef_id to the task? we could save a query and a cast iteration

enroll_households_to_program_task.delay(
households_ids=households_ids,
households_ids=[str(_id) for (_id) in households_ids],
Comment thread
johniak marked this conversation as resolved.
Outdated
program_for_enroll_id=str(program_for_enroll.id),
user_id=str(request.user.id),
)
Expand Down
5 changes: 2 additions & 3 deletions src/hope/admin/individual.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,16 +226,15 @@ def sanity_check(self, request: HttpRequest, pk: UUID) -> TemplateResponse:
def revalidate_phone_number_sync(self, request: HttpRequest, queryset: QuerySet) -> None:
try:
ids = queryset.values_list("id", flat=True)
revalidate_phone_number_task(ids)
revalidate_phone_number_task([str(_id) for _id in ids])
self.message_user(request, f"Updated {len(ids)} records", messages.SUCCESS)
except Error as e:
self.message_user(request, str(e), messages.ERROR)

revalidate_phone_number_sync.short_description = "Re-validate phone number (sync)"

def revalidate_phone_number_async(self, request: HttpRequest, queryset: QuerySet) -> None:
ids = list(queryset.values_list("id", flat=True))
revalidate_phone_number_task.delay(ids)
revalidate_phone_number_task.delay([str(_id) for _id in queryset.values_list("id", flat=True)])
self.message_user(request, "Updating in progress", messages.SUCCESS)

revalidate_phone_number_async.short_description = "Re-validate phone number (async)"
Expand Down
2 changes: 1 addition & 1 deletion src/hope/admin/registration_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ def enroll_to_program(self, request: HttpRequest, pk: UUID) -> HttpResponse | No
program_for_enroll = form.cleaned_data["program_for_enroll"]
households_ids = list(qs.distinct("unicef_id").values_list("id", flat=True))
enroll_households_to_program_task.delay(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as before

households_ids=households_ids,
households_ids=[str(_id) for (_id) in households_ids],
program_for_enroll_id=str(program_for_enroll.id),
user_id=str(request.user.id),
)
Expand Down
35 changes: 27 additions & 8 deletions src/hope/apps/account/celery_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,44 @@

from django.db.models import Q
from django.utils import timezone
from django_celery_boost.models import AsyncJobModel

from hope.apps.account.signals import _invalidate_user_permissions_cache
from hope.apps.core.celery import app
from hope.apps.utils.logs import log_start_and_end
from hope.apps.utils.sentry import sentry_tags
from hope.models import AsyncRetryJob

logger = logging.getLogger(__name__)


def invalidate_permissions_cache_for_user_if_expired_role_action(job: AsyncRetryJob) -> bool:
# Invalidate permissions cache for users with roles that expired a day before
from hope.models import User

try:
day_ago = timezone.now() - datetime.timedelta(days=1)
users = User.objects.filter(
Q(role_assignments__expiry_date=day_ago.date()) | Q(partner__role_assignments__expiry_date=day_ago.date())
).distinct()
_invalidate_user_permissions_cache(users)
return True
except Exception:
logger.exception("Failed to invalidate permissions cache for users with expired roles")
raise


@app.task(bind=True, default_retry_delay=60, max_retries=3)
@log_start_and_end
@sentry_tags
def invalidate_permissions_cache_for_user_if_expired_role(self: Any) -> bool:
# Invalidate permissions cache for users with roles that expired a day before
from hope.models import User

day_ago = timezone.now() - datetime.timedelta(days=1)
users = User.objects.filter(
Q(role_assignments__expiry_date=day_ago.date()) | Q(partner__role_assignments__expiry_date=day_ago.date())
).distinct()
_invalidate_user_permissions_cache(users)
job = AsyncRetryJob.objects.create(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what paradigm this code is implementing?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Celery Beat cannot schedule a plain Python function, so the periodic entrypoint still has to be a real Celery task. That Celery task does not do the work itself and only creates an
AsyncRetryJob, which is then stored in the database and executed by the shared async job runner.

owner=None,
type=AsyncJobModel.JobType.JOB_TASK,
action="hope.apps.account.celery_tasks.invalidate_permissions_cache_for_user_if_expired_role_action",
config={},
group_key="invalidate_permissions_cache_for_user_if_expired_role",
description="Invalidate permissions cache for users with expired roles",
)
job.queue()
return True
123 changes: 87 additions & 36 deletions src/hope/apps/accountability/celery_tasks.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import logging

from django_celery_boost.models import AsyncJobModel

from hope.apps.accountability.services.export_survey_sample_service import (
ExportSurveySampleService,
)
Expand All @@ -8,17 +10,17 @@
from hope.apps.core.utils import send_email_notification
from hope.apps.utils.logs import log_start_and_end
from hope.apps.utils.sentry import sentry_tags, set_sentry_business_area_tag
from hope.models import BusinessArea, Survey
from hope.models import AsyncJob, BusinessArea, Survey

logger = logging.getLogger(__name__)


@app.task
@log_start_and_end
@sentry_tags
def export_survey_sample_task(survey_id: str, user_id: str) -> None:
def export_survey_sample_task_action(job: AsyncJob) -> None:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this code under transaction management?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see a reason to put it into transaction, this code generates and stores sample file and notifies user

from hope.models import User

survey_id = job.config["survey_id"]
user_id = job.config["user_id"]

try:
survey = Survey.objects.get(id=survey_id)
user = User.objects.get(pk=user_id)
Expand All @@ -30,43 +32,92 @@ def export_survey_sample_task(survey_id: str, user_id: str) -> None:
if survey.business_area.enable_email_notification:
send_email_notification(service, user)

except Exception as e:
logger.warning(e)
except Exception as exc:
job.errors = {
"error": str(exc),
}
job.save(update_fields=["errors"])
logger.exception("Failed to export survey sample")
raise


def send_survey_to_users_action(job: AsyncJob) -> None:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here, is this code executed inside a trasanction?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A transaction is not needed here because the task’s critical work is an external RapidPro call, which cannot be rolled back by the database, and the only local write is a small follow-up metadata update.

survey_id = job.config["survey_id"]
try:
survey = Survey.objects.get(id=survey_id)
set_sentry_business_area_tag(survey.business_area.name)
if survey.category == Survey.CATEGORY_MANUAL:
return
phone_numbers = survey.recipients.filter(head_of_household__phone_no_valid=True).values_list(
"head_of_household__phone_no", flat=True
)
if survey.category == Survey.CATEGORY_SMS:
api = RapidProAPI(survey.business_area.slug, RapidProAPI.MODE_MESSAGE)
api.broadcast_message(phone_numbers, survey.body)
return
business_area = BusinessArea.objects.get(id=survey.business_area_id)
api = RapidProAPI(business_area.slug, RapidProAPI.MODE_VERIFICATION)

already_received = [
phone_number
for successful_call in survey.successful_rapid_pro_calls
for phone_number in successful_call["urns"]
]
phone_numbers = [phone_number for phone_number in phone_numbers if phone_number not in already_received]

successful_flows, error = api.start_flow(survey.flow_id, phone_numbers)
if error:
job.errors = {
"start_flow_error": str(error),
}
job.save(update_fields=["errors"])

for successful_flow in successful_flows:
survey.successful_rapid_pro_calls.append(
{
"flow_uuid": successful_flow.response["uuid"],
"urns": list(map(str, successful_flow.urns)),
}
)
survey.save()
except Exception as exc:
job.errors = {
"error": str(exc),
}
job.save(update_fields=["errors"])
logger.exception("Failed to send survey to users")
raise


@app.task
@log_start_and_end
@sentry_tags
def export_survey_sample_task(survey_id: str, user_id: str) -> None:
Comment thread
MarekBiczysko marked this conversation as resolved.
Outdated
survey = Survey.objects.get(id=survey_id)
job = AsyncJob.objects.create(
owner_id=user_id,
program=survey.program,
type=AsyncJobModel.JobType.JOB_TASK,
action="hope.apps.accountability.celery_tasks.export_survey_sample_task_action",
config={"survey_id": str(survey_id), "user_id": str(user_id)},
group_key=f"export_survey_sample_task:{survey_id}",
description=f"Export survey sample for survey {survey_id}",
)
job.queue()


@app.task
@log_start_and_end
@sentry_tags
def send_survey_to_users(survey_id: str) -> None:
survey = Survey.objects.get(id=survey_id)
set_sentry_business_area_tag(survey.business_area.name)
if survey.category == Survey.CATEGORY_MANUAL:
return
phone_numbers = survey.recipients.filter(head_of_household__phone_no_valid=True).values_list(
"head_of_household__phone_no", flat=True
job = AsyncJob.objects.create(
owner=survey.created_by,
program=survey.program,
type=AsyncJobModel.JobType.JOB_TASK,
action="hope.apps.accountability.celery_tasks.send_survey_to_users_action",
config={"survey_id": str(survey_id)},
group_key=f"send_survey_to_users:{survey_id}",
description=f"Send survey to users for survey {survey_id}",
)
if survey.category == Survey.CATEGORY_SMS:
api = RapidProAPI(survey.business_area.slug, RapidProAPI.MODE_MESSAGE)
api.broadcast_message(phone_numbers, survey.body)
return
business_area = BusinessArea.objects.get(id=survey.business_area_id)
api = RapidProAPI(business_area.slug, RapidProAPI.MODE_VERIFICATION)

already_received = [
phone_number
for successful_call in survey.successful_rapid_pro_calls
for phone_number in successful_call["urns"]
]
phone_numbers = [phone_number for phone_number in phone_numbers if phone_number not in already_received]

successful_flows, error = api.start_flow(survey.flow_id, phone_numbers)

for successful_flow in successful_flows:
survey.successful_rapid_pro_calls.append(
{
"flow_uuid": successful_flow.response["uuid"],
"urns": list(map(str, successful_flow.urns)),
}
)
survey.save()
job.queue()
Loading
Loading