Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
6519b76
WIP
emyller May 5, 2025
2ce1196
WIP: New PoC for processing tasks from multiple databases
emyller May 8, 2025
32cba38
Consume tasks from multiple databases
emyller May 8, 2025
1f95cc0
Merge remote-tracking branch 'github/main' into feat/separate-task-pr…
emyller May 8, 2025
ec6c2e5
WIP: Fix managing tasks when in multi-database mode
emyller May 10, 2025
c6c4acf
Cover the database router with tests
emyller May 12, 2025
768dd77
Improve guardrail testing
emyller May 12, 2025
09ec777
Improve database configuration for testing
emyller May 12, 2025
0dbc71f
Make ALL tests contextualized with given database
emyller May 12, 2025
521c7c9
Improve typing
emyller May 13, 2025
ce81064
Make database use more explicit
emyller May 14, 2025
bda53cd
Improve router docstrings
emyller May 14, 2025
afecfcc
Remove redundancy
emyller May 15, 2025
2b32dd2
Revert "Remove redundancy"
emyller May 15, 2025
6bb822c
Merge remote-tracking branch 'github/main' into feat/separate-task-pr…
emyller May 15, 2025
3fef273
Move task processor database settings validation to the right place
emyller May 15, 2025
0131747
Improve database configuration in tests
emyller May 15, 2025
5386854
Merge remote-tracking branch 'github/main' into feat/separate-task-pr…
emyller May 19, 2025
3768746
Delete obsolete comment
emyller May 19, 2025
dc82f8e
Refactor validation
emyller May 19, 2025
0bb02eb
Remove unnecessary documentation
emyller May 19, 2025
a6b54fb
Improve test coverage
emyller May 19, 2025
49da185
Try and improve fixture name
emyller May 19, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion docker/docker-compose.local.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ name: flagsmith

volumes:
pg_data:
task_processor_pg_data:

services:
db:
image: postgres:15.5-alpine
pull_policy: always
restart: unless-stopped
volumes:
- pg_data:/var/lib/postgresql/data
Expand All @@ -17,3 +17,21 @@ services:
environment:
POSTGRES_DB: flagsmith
POSTGRES_PASSWORD: password
healthcheck:
test: pg_isready -Upostgres
interval: 1s
timeout: 30s

task-processor-db:
image: postgres:15.5-alpine
restart: unless-stopped
volumes:
- task_processor_pg_data:/var/lib/postgresql/data
ports:
- 5433:5432
environment:
POSTGRES_HOST_AUTH_METHOD: trust
healthcheck:
test: pg_isready -Upostgres
interval: 1s
timeout: 30s
3 changes: 2 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 10 additions & 2 deletions settings/dev.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,17 @@
env(
"DATABASE_URL",
default="postgresql://postgres:password@localhost:5432/flagsmith",
)
)
),
),
"task_processor": dj_database_url.parse(
env(
"TASK_PROCESSOR_DATABASE_URL",
default="postgresql://postgres@localhost:5433/postgres",
),
),
}
DATABASE_ROUTERS = ["task_processor.routers.TaskProcessorRouter"]
TASK_PROCESSOR_DATABASES = ["default"]
INSTALLED_APPS = [
"django.contrib.auth",
"django.contrib.contenttypes",
Expand Down
17 changes: 11 additions & 6 deletions src/task_processor/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,21 @@
from django.db.models import Manager

if typing.TYPE_CHECKING:
from django.db.models.query import RawQuerySet

from task_processor.models import RecurringTask, Task


class TaskManager(Manager["Task"]):
    """Manager for ``Task`` backed by a database-side queue function."""

    def get_tasks_to_process(self, num_tasks: int) -> typing.List["Task"]:
        """
        Fetch up to ``num_tasks`` tasks by calling the ``get_tasks_to_process``
        database function.

        The raw queryset is materialised into a list so the query executes
        immediately on the database this manager is bound to (important when
        the manager was obtained via ``db_manager(...)``).
        """
        return list(
            self.raw(
                "SELECT * FROM get_tasks_to_process(%s)",
                [num_tasks],
            ),
        )


class RecurringTaskManager(Manager["RecurringTask"]):
    """Manager for ``RecurringTask`` backed by a database-side queue function."""

    def get_tasks_to_process(self) -> typing.List["RecurringTask"]:
        """
        Fetch due recurring tasks by calling the
        ``get_recurringtasks_to_process`` database function.

        Materialised into a list so the query executes immediately on the
        database this manager is bound to.
        """
        return list(
            self.raw("SELECT * FROM get_recurringtasks_to_process()"),
        )
21 changes: 12 additions & 9 deletions src/task_processor/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from django.utils import timezone

from task_processor import metrics
from task_processor.managers import TaskManager
from task_processor.models import (
AbstractBaseTask,
RecurringTask,
Expand All @@ -27,14 +28,14 @@
UNREGISTERED_RECURRING_TASK_GRACE_PERIOD = timedelta(minutes=30)


def run_tasks(num_tasks: int = 1) -> list[TaskRun]:
def run_tasks(database: str, num_tasks: int = 1) -> list[TaskRun]:
if num_tasks < 1:
raise ValueError("Number of tasks to process must be at least one")

tasks = Task.objects.get_tasks_to_process(num_tasks)

task_manager: TaskManager = Task.objects.db_manager(database)
tasks = task_manager.get_tasks_to_process(num_tasks)
if tasks:
logger.debug(f"Running {len(tasks)} task(s)")
logger.debug(f"Running {len(tasks)} task(s) from database '{database}'")

executed_tasks = []
task_runs = []
Expand All @@ -47,25 +48,27 @@ def run_tasks(num_tasks: int = 1) -> list[TaskRun]:
task_runs.append(task_run)

if executed_tasks:
Task.objects.bulk_update(
Task.objects.using(database).bulk_update(
executed_tasks,
fields=["completed", "num_failures", "is_locked"],
)

if task_runs:
TaskRun.objects.bulk_create(task_runs)
logger.debug(f"Finished running {len(task_runs)} task(s)")
TaskRun.objects.using(database).bulk_create(task_runs)
logger.debug(
f"Finished running {len(task_runs)} task(s) from database '{database}'"
)

return task_runs

return []


def run_recurring_tasks() -> list[RecurringTaskRun]:
def run_recurring_tasks(database: str) -> list[RecurringTaskRun]:
# NOTE: We will probably see a lot of delay in the execution of recurring tasks
# if the tasks take longer then `run_every` to execute. This is not
# a problem for now, but we should be mindful of this limitation
tasks = RecurringTask.objects.get_tasks_to_process()
tasks = RecurringTask.objects.db_manager(database).get_tasks_to_process()
if tasks:
logger.debug(f"Running {len(tasks)} recurring task(s)")

Expand Down
77 changes: 77 additions & 0 deletions src/task_processor/routers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
from typing import Any

from django.conf import settings
from django.db.models import Model


class TaskProcessorRouter:
    """
    Routing of database operations for task processor models.

    Every hook returns ``None`` ("no opinion") unless the separate task
    processor database has been opted into via
    ``settings.TASK_PROCESSOR_DATABASES``.
    """

    route_app_labels = ["task_processor"]

    @property
    def is_enabled(self) -> bool:
        # The separate-database mode is opted into by listing
        # "task_processor" in settings.TASK_PROCESSOR_DATABASES.
        return "task_processor" in settings.TASK_PROCESSOR_DATABASES

    def db_for_read(self, model: type[Model], **hints: Any) -> str | None:
        """
        If enabled, route reads of task processor models to the separate
        "task_processor" database.
        """
        if not self.is_enabled:
            return None

        if model._meta.app_label in self.route_app_labels:
            return "task_processor"

        return None

    def db_for_write(self, model: type[Model], **hints: Any) -> str | None:
        """
        Attempts to write task processor models go to the 'task_processor'
        database.
        """
        if not self.is_enabled:
            return None

        if model._meta.app_label in self.route_app_labels:
            return "task_processor"

        return None

    def allow_relation(self, obj1: Model, obj2: Model, **hints: Any) -> bool | None:
        """
        Relations between objects are allowed if both objects are
        in the task processor database.
        """
        if not self.is_enabled:
            return None

        both_objects_from_task_processor = (
            obj1._meta.app_label in self.route_app_labels
            and obj2._meta.app_label in self.route_app_labels
        )

        if both_objects_from_task_processor:
            return True

        return None

    def allow_migrate(
        self,
        db: str,
        app_label: str,
        **hints: Any,
    ) -> bool | None:
        """
        Allow migrations for task processor models to run in both databases.

        NOTE: Even if, from a fresh install, the task processor tables are not
        required in both databases, this is required to allow for easier
        transition between a single database and a multi-database setup.
        """
        if not self.is_enabled:
            return None

        if app_label in self.route_app_labels:
            return db in ["default", "task_processor"]

        return None
53 changes: 41 additions & 12 deletions src/task_processor/threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from datetime import datetime, timedelta
from threading import Thread

from django.conf import settings
from django.db import close_old_connections
from django.utils import timezone

Expand Down Expand Up @@ -95,18 +96,46 @@ def run(self) -> None:
time.sleep(self.sleep_interval_millis / 1000)

def run_iteration(self) -> None:
    """
    Consume and execute tasks from the queue, and run recurring tasks.

    This method tries to consume tasks from multiple databases as to ensure
    that any remaining tasks are processed after opting in or out of a
    separate database setup.
    """
    database_is_separate = "task_processor" in settings.TASK_PROCESSOR_DATABASES
    if database_is_separate:
        # Sanity-check related settings so misconfiguration fails loudly.
        # NOTE(review): `assert` statements are stripped under `python -O`;
        # consider raising ImproperlyConfigured instead — confirm.
        assert (
            "task_processor.routers.TaskProcessorRouter"
            in settings.DATABASE_ROUTERS
        ), (
            "DATABASE_ROUTERS must include 'task_processor.routers.TaskProcessorRouter' "
            "when using a separate task processor database."
        )  # This is for our own sanity
        assert "task_processor" in settings.DATABASES, (
            "DATABASES must include 'task_processor' when using a separate task processor database."
        )  # ¯\_(ツ)_/¯ One has to read the documentation and fix it: https://docs.flagsmith.com/deployment/configuration/task-processor

    for database in settings.TASK_PROCESSOR_DATABASES:
        try:
            run_tasks(database, self.queue_pop_size)

            # Recurring tasks are only run on one database: "default" in a
            # single-database setup, "task_processor" when the separate
            # database is enabled (hence the XOR).
            if (database == "default") ^ database_is_separate:
                run_recurring_tasks(database)
        except Exception as exception:
            # To prevent task threads from dying if they get an error retrieving the tasks from the
            # database this will allow the thread to continue trying to retrieve tasks if it can
            # successfully re-establish a connection to the database.
            # TODO: is this also what is causing tasks to get stuck as locked? Can we unlock
            # tasks here?
            exception_repr = f"{exception.__class__.__module__}.{repr(exception)}"
            logger.error(
                f"Error handling tasks from database '{database}': {exception_repr}",
                exc_info=exception,
            )

    close_old_connections()

def stop(self) -> None:
    # Flag the worker to stop; presumably polled by run()'s loop between
    # iterations — TODO confirm against the full class definition.
    self._stopped = True
Loading
Loading