07_background_task_execution
In Chapter 6: Worker Node Orchestration, we learned how EMBArk smartly distributes demanding firmware analyses across multiple worker machines to speed things up. But what about all the other operations that take time, whether it's an EMBA scan, generating a big report, or the system performing routine maintenance? You wouldn't want the entire EMBArk web interface to freeze every time a heavy operation starts, right?
Imagine you're at a busy restaurant. You place your order with a waiter. You expect the waiter to immediately take your order for a drink, and then continue serving other customers, even while the chef in the kitchen is busy preparing your meal. You don't want the waiter to stand there silently, waiting for your food to be cooked before they can do anything else!
This is exactly the problem "Background Task Execution" solves for EMBArk. It's like having a dedicated operations team working behind the scenes. For any task that might take a while (like running a firmware scan, creating a massive report, or updating dependencies), EMBArk hands it off to this background team. This means the main web server (your waiter) remains free to respond to your clicks and requests, keeping the user interface responsive while the heavy lifting happens elsewhere. Queues hold the pending tasks until a background worker is free to run them.
Let's say an analyst initiates a new EMBA scan, which could run for hours. At the same time, the EMBArk system needs to check for dependency updates or clean up old logs. How does EMBArk ensure all these operations happen without slowing down the web interface?
Here's how Background Task Execution helps:
- Submit a Long-Running Task: When the analyst clicks "Analyze," the web server immediately hands off the EMBA scan to the background system.
- Continue Browsing: The analyst can immediately navigate to other parts of EMBArk (e.g., check previous reports) while the scan runs.
- Scheduled Maintenance: At specific times, internal system tasks (like cleanup or updates) are automatically triggered and executed in the background without user intervention.
- No Freezing: The main web interface never freezes, always ready for user interaction.
To handle tasks efficiently in the background, EMBArk uses a few powerful concepts:
The core idea is to prevent long-running operations from tying up the main web server. When a request comes in that requires a lot of processing, the web server quickly acknowledges it and then passes the actual work to a specialized background system. This frees up the web server to handle new user requests instantly.
A task queue is the waiting list for all background tasks. When a task is offloaded, it's added to this queue. Dedicated worker processes or threads (separate from the request handling of the web server) constantly check this queue, pick up the next available task, and execute it. This ensures that tasks are processed in an organized manner, and the system can handle bursts of activity.
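The queue-and-worker idea described above can be sketched with Python's standard library alone. This is only an illustration of the pattern, not EMBArk's code; `worker_loop` and `run_jobs` are hypothetical names chosen for the example.

```python
import queue
import threading


def worker_loop(tasks: queue.Queue, results: list) -> None:
    """Pull callables off the queue until a None sentinel arrives."""
    while True:
        job = tasks.get()
        if job is None:            # sentinel: no more work for this worker
            tasks.task_done()
            break
        results.append(job())      # the work runs in the worker thread
        tasks.task_done()


def run_jobs(jobs, num_workers: int = 2) -> list:
    """Queue every job, let worker threads drain the queue, return results."""
    tasks: queue.Queue = queue.Queue()
    results: list = []
    workers = [
        threading.Thread(target=worker_loop, args=(tasks, results))
        for _ in range(num_workers)
    ]
    for worker in workers:
        worker.start()
    for job in jobs:
        tasks.put(job)             # returns immediately; the caller is not blocked
    for _ in workers:
        tasks.put(None)            # one sentinel per worker
    tasks.join()                   # wait until every queued item is processed
    for worker in workers:
        worker.join()
    return results
```

The caller only pays the cost of `tasks.put(...)`; the order in which results arrive depends on which worker finishes first.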
EMBArk uses two main types of "work crews" for background tasks:
- BoundedExecutor (Local Crew): This is a local, thread-based system for tasks that run on the same machine as the EMBArk web server. It manages a limited number of tasks that can run concurrently, preventing the local machine from getting overloaded.
- Celery (Distributed Crew): For more complex or scalable needs, especially when using Chapter 6: Worker Node Orchestration to distribute EMBA scans, EMBArk uses Celery. Celery is a distributed task queue system. It allows tasks to be sent to and executed by dedicated worker processes that can be on the same machine or on entirely different machines.
Some tasks don't need to be triggered by a user click; they need to run automatically at specific times or intervals (e.g., daily cleanup, hourly resource tracking). APScheduler is EMBArk's built-in planner that handles these scheduled tasks, ensuring they run exactly when they're supposed to, without manual intervention.
Let's see how EMBArk handles different types of background tasks.
Many long-running operations, like initiating an EMBA scan or zipping large log files, are submitted to the BoundedExecutor for local execution or to Celery if worker nodes are involved.
When a function needs to run in the background, it's "submitted" to the executor:
```python
# Simplified snippet from embark/uploader/boundedexecutor.py
from concurrent.futures import ThreadPoolExecutor
from threading import BoundedSemaphore

# Maximum concurrent running tasks for the local executor
MAX_WORKERS = 4
MAX_QUEUE = MAX_WORKERS

executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
semaphore = BoundedSemaphore(MAX_QUEUE)


class BoundedExecutor:
    # ... other methods ...

    @classmethod
    def submit(cls, function_cmd, *args, **kwargs):
        """
        Submits a function to the executor's threadpool for background execution.
        """
        # Try to take a slot without blocking; fails if all slots are in use
        queue_not_full = semaphore.acquire(blocking=False)
        if not queue_not_full:
            print("Executor queue full, task cannot be submitted.")
            return None
        try:
            # Submit the function to the thread pool
            future = executor.submit(function_cmd, *args, **kwargs)
        except Exception as exce:
            print(f"Executor task could not be submitted: {exce}")
            semaphore.release()  # Release semaphore on error
            raise
        # Ensure semaphore is released when the task finishes
        future.add_done_callback(lambda _future: semaphore.release())
        return future


# Example of submitting an EMBA command to run in the background
# (From uploader/executor.py, simplified)
# emba_cmd = "emba -f firmware.bin -l /logs"
# BoundedExecutor.submit(BoundedExecutor.run_emba_cmd, emba_cmd, analysis_id)
```

This submit method takes a function (function_cmd) and its arguments and hands it to the ThreadPoolExecutor, which runs at most MAX_WORKERS tasks at the same time. The BoundedSemaphore caps the number of accepted, unfinished tasks at MAX_QUEUE (equal to MAX_WORKERS in this snippet); when no slot is free, submit returns None instead of accepting the task. Together these limits prevent the system from being overwhelmed. The run_emba_cmd (from Chapter 5: EMBA Backend Integration) is a common example of a function submitted this way.
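To make the rejection behaviour concrete, here is a small self-contained sketch of the same semaphore-plus-thread-pool pattern. It is a demo under stated assumptions, not EMBArk's module: the limit of 2, `bounded_submit`, `slow_task`, and `demo` are all hypothetical names for illustration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from threading import BoundedSemaphore

MAX_WORKERS = 2  # hypothetical small limit so the demo fills up quickly

executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
semaphore = BoundedSemaphore(MAX_WORKERS)


def bounded_submit(func, *args, **kwargs):
    """Submit func if a slot is free; return None when all slots are taken."""
    if not semaphore.acquire(blocking=False):
        return None
    try:
        future = executor.submit(func, *args, **kwargs)
    except Exception:
        semaphore.release()
        raise
    # Free the slot once the task has finished (successfully or not)
    future.add_done_callback(lambda _future: semaphore.release())
    return future


def demo():
    gate = threading.Event()

    def slow_task(name):
        gate.wait(timeout=5)  # stand-in for a long-running scan
        return name

    first = bounded_submit(slow_task, "scan-1")
    second = bounded_submit(slow_task, "scan-2")
    third = bounded_submit(slow_task, "scan-3")  # both slots are busy here
    gate.set()                                   # let the running tasks finish
    return [first.result(), second.result()], third
```

In this sketch the third submission is turned away while the first two are still running, so a caller has to check for `None` and decide whether to retry later or report the failure.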
For tasks that need to run on a schedule, EMBArk uses APScheduler. You define a Python function for the task and then tell APScheduler when to run it.
```python
# Simplified snippet from embark/uploader/management/commands/runapscheduler.py
import logging

import psutil
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from django.conf import settings
from django.core.management.base import BaseCommand
from django.utils import timezone
from django_apscheduler.jobstores import DjangoJobStore
from django_apscheduler.models import DjangoJobExecution

logger = logging.getLogger("web")


def resource_tracker():
    """
    This job tracks the current CPU and memory usage and stores it in the database.
    """
    cpu_percentage = psutil.cpu_percent()
    memory_percentage = psutil.virtual_memory().percent
    print(f"Tracking resources: CPU={cpu_percentage}%, Memory={memory_percentage}%")
    # ... code to save to database (ResourceTimestamp model) ...


def delete_old_job_executions(max_age=1_209_600):  # Defaults to 14 days
    """
    This job deletes old APScheduler job execution entries and timestamps from the database.
    """
    print(f"Deleting old job executions and timestamps older than {max_age} seconds...")
    DjangoJobExecution.objects.delete_old_job_executions(max_age)
    # ... code to delete old ResourceTimestamp entries ...


class Command(BaseCommand):
    help = "Runs resource tracker as well as cleanup process."

    def handle(self, *args, **options):
        scheduler = BlockingScheduler(timezone=settings.TIME_ZONE)
        scheduler.add_jobstore(DjangoJobStore(), "default")

        # Schedule resource_tracker to run every hour
        scheduler.add_job(
            resource_tracker,
            trigger=CronTrigger(minute="00"),
            id="resource_tracker",
            replace_existing=True,
        )

        # Schedule delete_old_job_executions to run every day at midnight
        scheduler.add_job(
            delete_old_job_executions,
            trigger=CronTrigger(hour="00", minute="00"),
            id="delete_old_job_executions",
            replace_existing=True,
            args=(1_209_600,),  # Pass max_age argument
        )

        print("Starting scheduler...")
        scheduler.start()
```

This runapscheduler.py script defines two tasks: resource_tracker (to monitor system usage) and delete_old_job_executions (for cleanup). The Command class then uses BlockingScheduler and CronTrigger to tell APScheduler when to run these functions: resource_tracker runs every hour (minute="00"), and delete_old_job_executions runs daily at midnight (hour="00", minute="00"). These tasks run in a dedicated background process, separate from the web server.
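To see what those two cron expressions mean in wall-clock terms, the following standard-library sketch computes the next matching time strictly after a given moment. It is not how APScheduler computes fire times internally; `next_top_of_hour` and `next_midnight` are hypothetical helpers, and time zones and daylight-saving changes are ignored.

```python
from datetime import datetime, timedelta


def next_top_of_hour(now: datetime) -> datetime:
    """Next moment after `now` whose minute is 00 (the minute="00" schedule)."""
    return now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)


def next_midnight(now: datetime) -> datetime:
    """Next moment after `now` with hour 00 and minute 00 (the daily schedule)."""
    return now.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)


# The cleanup job's max_age of 1_209_600 seconds corresponds to 14 days
MAX_AGE = timedelta(seconds=1_209_600)
```

For example, starting from 10:30 on some day, the hourly job would next be due at 11:00 and the cleanup job at 00:00 the following day.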
For tasks that might be distributed across multiple machines (especially relevant for Chapter 6: Worker Node Orchestration), EMBArk uses Celery. You mark a function as a @shared_task and then call it using .delay().
```python
# Simplified snippet from embark/workers/tasks.py
from celery import shared_task
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)


@shared_task
def update_worker_info():
    """
    This task periodically updates system information for all worker nodes.
    """
    print("Updating worker information...")
    # ... code to connect to workers and update their status ...
    logger.info("Worker info update completed.")


@shared_task
def config_worker_scan_task(configuration_id: int):
    """
    This task scans the IP range of a configuration to find and set up worker nodes.
    """
    print(f"Scanning for workers using configuration ID {configuration_id}...")
    # ... code to scan IPs and create Worker objects ...
    logger.info("Worker scan task finished.")


# How these tasks are typically called (not in workers/tasks.py, but from other views/logic):
# from workers.tasks import update_worker_info, config_worker_scan_task
#
# # To trigger an update immediately in the background:
# update_worker_info.delay()
#
# # To trigger a worker scan for a specific configuration:
# config_worker_scan_task.delay(123)  # assuming 123 is a config ID
```

The @shared_task decorator tells Celery that these functions can be executed by Celery workers. When you call update_worker_info.delay(), Celery puts this task into a message queue. A separate Celery worker process (which might be running on a different machine) then picks up this task and executes it. This is how operations like checking worker node status (Chapter 6: Worker Node Orchestration) are handled in a scalable way.
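The split between "calling .delay()" and "a worker running the function" can be illustrated with a toy model built on the standard library. This is deliberately not Celery's implementation or API: `toy_task`, `run_worker_once`, and `config_scan` are hypothetical names, and an in-process `queue.Queue` stands in for the message broker.

```python
import json
import queue

task_queue: queue.Queue = queue.Queue()  # stand-in for the message broker
registry = {}                            # task name -> function


def toy_task(func):
    """Register func and give it a .delay() that only enqueues a message."""
    registry[func.__name__] = func

    def delay(*args, **kwargs):
        message = json.dumps(
            {"task": func.__name__, "args": args, "kwargs": kwargs}
        )
        task_queue.put(message)          # the caller returns right away

    func.delay = delay
    return func


def run_worker_once():
    """Take one message off the queue and execute the matching function."""
    message = json.loads(task_queue.get())
    func = registry[message["task"]]
    return func(*message["args"], **message["kwargs"])


@toy_task
def config_scan(configuration_id):
    return f"scanned configuration {configuration_id}"
```

The point of the sketch is that the caller only serializes a task name and its arguments; the function body runs wherever the worker happens to be, which is why task arguments such as a configuration ID need to be simple, serializable values.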
Let's look at how these different background execution systems work together.
When a task is initiated by the user or by a schedule, here's a simplified sequence of how EMBArk ensures it runs in the background:
```mermaid
sequenceDiagram
    participant User/Scheduler
    participant EMBArk Web Server
    participant Task Queue
    participant Background Worker
    User/Scheduler->>EMBArk Web Server: Triggers a task (e.g., EMBA scan, cleanup)
    EMBArk Web Server->>Task Queue: Submits the task
    EMBArk Web Server-->>User/Scheduler: Responds instantly (UI remains responsive)
    Note over Task Queue: Task waits if workers are busy
    Background Worker->>Task Queue: Picks up next task
    Background Worker->>Background Worker: Executes the task (e.g., runs EMBA, deletes logs)
    Background Worker-->>Task Queue: Marks task as complete
    Note over Task Queue: Task queue can be local (BoundedExecutor) or distributed (Celery).
```
- `embark/embark/celery_dtq.py` - The Celery Application: This file is the entry point for configuring Celery in EMBArk. It tells Celery how to find tasks and which settings to use.

  ```python
  # Simplified snippet from embark/embark/celery_dtq.py
  import os

  from celery import Celery

  # Set the default Django settings module for the 'celery' program.
  os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'embark.settings.deploy')

  app = Celery('embark')

  # Load task configurations from Django settings, using a 'CELERY' namespace.
  app.config_from_object('django.conf:settings', namespace='CELERY')

  # Automatically discover tasks in all installed Django apps.
  app.autodiscover_tasks()
  ```

  This code sets up the Celery `app` and tells it to load its configuration from the Django settings (`settings/deploy.py`, using the `CELERY_` prefix) and to look for `@shared_task` functions within all the `INSTALLED_APPS`.

- `embark/embark/settings/deploy.py` - Celery & Redis Configuration: This file defines the connection details for Redis (the message broker) and other Celery settings.

  ```python
  # Simplified snippet from embark/embark/settings/deploy.py
  # ... other settings ...
  REDIS_HOST = os.environ.get('REDIS_HOST', '127.0.0.1')
  REDIS_PORT = int(os.environ.get('REDIS_PORT', 6379))

  # Celery task queue settings
  CELERY_BROKER_URL = f"redis://{REDIS_HOST}:{REDIS_PORT}/0"
  CELERY_RESULT_BACKEND = f"redis://{REDIS_HOST}:{REDIS_PORT}/0"
  CELERY_TASK_TRACK_STARTED = True
  ```

  `CELERY_BROKER_URL` tells Celery where to send and receive task messages (Redis in this case). `CELERY_RESULT_BACKEND` specifies where task results are stored.

- `embark/uploader/boundedexecutor.py` - The Local Task Manager: As seen earlier, this file contains the `BoundedExecutor` class, which uses Python's `concurrent.futures.ThreadPoolExecutor` to manage a fixed number of local background threads.

  ```python
  # Simplified snippet from embark/uploader/boundedexecutor.py
  # ... imports for ThreadPoolExecutor, BoundedSemaphore ...

  # MAX_WORKERS controls how many tasks can run in parallel on the local machine
  MAX_WORKERS = 4
  # MAX_QUEUE caps how many submitted tasks may be unfinished before new submissions are rejected
  MAX_QUEUE = MAX_WORKERS

  executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
  semaphore = BoundedSemaphore(MAX_QUEUE)


  class BoundedExecutor:
      # ... submit, run_emba_cmd methods as described previously ...
      pass
  ```

  This sets up a local pool of threads. When `BoundedExecutor.submit()` is called, it tries to get a slot. If a slot is free, the task is handed to the thread pool and starts as soon as a thread is available. If all slots are taken, the non-blocking `semaphore.acquire()` fails and, in the `submit` method shown earlier, the submission is rejected (`submit` returns `None`) rather than queued, so callers must handle that case.

- `embark/uploader/management/commands/runapscheduler.py` - The Scheduler Runner: This script is executed as a dedicated process to run all the scheduled tasks.

  ```python
  # Simplified snippet from embark/uploader/management/commands/runapscheduler.py
  # ... imports for BlockingScheduler, CronTrigger, etc. ...

  class Command(BaseCommand):
      # ... handle method which adds jobs to the scheduler ...
      pass
  ```

  When `python3 ./manage.py runapscheduler` is executed, it starts a `BlockingScheduler` that continuously checks its configured jobs and triggers them at their scheduled times. The scheduler blocks its own process, which is why it is started as a separate background process rather than inside the web server.

- `dev-tools/debug-server-start.sh` & `run-server.sh` - Launching Background Processes: These startup scripts (for development and deployment, respectively) are responsible for launching the Celery worker and the APScheduler process.

  ```bash
  # Simplified snippet from dev-tools/debug-server-start.sh
  # ... database and environment setup ...

  # Start runapscheduler
  python3 ./manage.py runapscheduler | tee -a ../logs/scheduler.log &

  # Start celery worker (with beat for periodic tasks)
  celery -A embark worker --beat --scheduler django -l INFO --logfile=../logs/celery.log &
  CELERY_PID=$!  # Store PID to kill later
  ```

  The `&` at the end of each command tells the shell to run these commands in the background as separate processes. This is how Celery and APScheduler are launched independently of the web server, allowing them to perform their background duties. The `--beat --scheduler django` options tell the Celery worker to also act as a scheduler for tasks defined in `django_celery_beat` (another way to schedule tasks, integrated with Celery).
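The way the Redis connection settings above are assembled from environment variables can be checked in isolation with a short sketch. The helper name `broker_url` and its `env` parameter are hypothetical, introduced only so the logic can be exercised without touching the real environment; the defaults mirror the simplified settings snippet.

```python
import os
from typing import Mapping


def broker_url(env: Mapping[str, str] = os.environ) -> str:
    """Build a Redis URL the same way the simplified settings snippet does."""
    host = env.get("REDIS_HOST", "127.0.0.1")
    port = int(env.get("REDIS_PORT", 6379))  # env values arrive as strings
    return f"redis://{host}:{port}/0"
```

With no variables set this yields the local default, while setting `REDIS_HOST` and `REDIS_PORT` (for example in a container setup) points both the broker and the result backend at another Redis instance.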
Background Task Execution is essential for a smooth and efficient EMBArk experience. By intelligently offloading demanding operations to BoundedExecutor for local control, Celery for scalable distributed processing, and APScheduler for automated scheduling, EMBArk ensures that its web interface remains responsive at all times. This "operations team" diligently handles everything from complex firmware analyses to routine system maintenance, all without interrupting your workflow.
Now that we understand how tasks are executed and managed in the background, the next crucial step is to understand how all this data (users, firmware, analysis results, workers) is actually stored and retrieved. Dive into Chapter 8: Data Models (Persistence Layer) to learn about the blueprints that define EMBArk's data.
Generated by AI Codebase Knowledge Builder. References: [1], [2], [3], [4], [5], [6], [7]