
Chapter 6: Worker Node Orchestration

In Chapter 5: EMBA Backend Integration, we learned how EMBArk effectively drives the EMBA analysis tool on a single machine. That's great for individual firmware analyses! But what if you have many firmwares to analyze, or if a single analysis is so large it takes too long on one machine? Waiting hours or even days for a single scan to complete can really slow things down.

Imagine EMBArk isn't just a single drone, but a drone fleet. Each drone can fly its own mission, but you need a central command center to manage them all. This command center needs to know which drones are busy, which are free, assign new missions, and make sure each drone has the right equipment.

This is the problem "Worker Node Orchestration" solves for EMBArk. It’s like the central project manager for distributed firmware analysis. When an analysis job is too big or there are many jobs, the Orchestrator steps in. It assigns analysis tasks to a team of specialized "workers" – which are actually other computers (worker nodes) set up to run EMBA. It keeps a close eye on which worker is free, which is busy, queues up new tasks, and even helps make sure workers have the right tools (dependencies) to do their job. This allows EMBArk to run many analyses at the same time across different machines, making the overall scanning process much, much faster!

Solving Our Use Case: Distributing Firmware Analyses for Speed

Let's say our analyst has five new firmware images they need to analyze as quickly as possible. Instead of running them one by one, they want EMBArk to automatically use any available worker nodes to speed up the process.

Here's how Worker Node Orchestration helps:

  1. Add Worker Nodes: The analyst first tells EMBArk about the other computers that can help with analysis (the "worker nodes").
  2. Submit Analyses: The analyst uploads all five firmwares and starts their analyses.
  3. Automatic Distribution: The Orchestrator automatically picks a free worker node for each analysis and sends the firmware and instructions to it. If all workers are busy, new analyses wait in a queue.
  4. Concurrent Processing: Multiple workers run EMBA analyses simultaneously, dramatically reducing the total time.

Understanding the Key Concepts

To manage this distributed team of analysis workers, EMBArk relies on a few core ideas:

1. The Project Manager: The Orchestrator

This is the brain of distributed processing. The Orchestrator is a special object that knows about all available workers and manages the list of tasks waiting to be done. It's constantly looking for free workers to assign new tasks to.

2. The Team Members: Worker Nodes

These are the actual external machines that run the EMBA analysis. In EMBArk, each worker node is represented by a Worker object in the database. They report their status (free, busy, reachable) back to the Orchestrator.

3. The Job List: Tasks & Queue

When you start a firmware analysis, it doesn't immediately run if no worker is free. Instead, it becomes an OrchestratorTask and is added to a waiting list (a queue). The Orchestrator picks tasks from this queue as workers become available.

# Simplified snippet from embark/workers/orchestrator.py
from dataclasses import dataclass
from uuid import UUID

@dataclass
class OrchestratorTask:
    firmware_analysis_id: UUID
    emba_cmd: str
    src_path: str
    target_path: str

    @classmethod
    def to_dict(cls, task):
        # Converts a task object into a dictionary for storage
        return {
            "firmware_analysis_id": str(task.firmware_analysis_id),
            "emba_cmd": task.emba_cmd,
            "src_path": task.src_path,
            "target_path": task.target_path
        }

The OrchestratorTask is a simple blueprint (dataclass) that holds all the information needed to run an analysis on a worker, like the unique ID of the analysis, the EMBA command, and where the firmware file is located.
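Later on, the Orchestrator reloads queued tasks from the database, so the class also needs the reverse conversion. The real implementation may differ, but a minimal sketch of such a from_dict helper, built only from the fields shown above, could look like this:

# Hypothetical sketch: the reverse of to_dict(), used when the orchestrator
# reloads its queue from the database (see _sync_orchestrator_state later).
from dataclasses import dataclass
from uuid import UUID

@dataclass
class OrchestratorTask:
    firmware_analysis_id: UUID
    emba_cmd: str
    src_path: str
    target_path: str

    @classmethod
    def from_dict(cls, data: dict) -> "OrchestratorTask":
        # Rebuild a task object from its stored dictionary form
        return cls(
            firmware_analysis_id=UUID(data["firmware_analysis_id"]),
            emba_cmd=data["emba_cmd"],
            src_path=data["src_path"],
            target_path=data["target_path"],
        )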

4. The Toolbox: Dependencies & Configuration

For workers to do their job, they need EMBA itself and all its supporting tools (dependencies). EMBArk helps manage this. Configuration objects store SSH login details and IP ranges for discovering workers, while WorkerDependencyVersion tracks the versions of tools installed on each Worker. This ensures every worker has the correct "toolbox."

# Simplified snippet from embark/workers/models.py
from django.db import models
from users.models import User

class Configuration(models.Model):
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    name = models.CharField(max_length=150)
    ssh_user = models.CharField(max_length=150)
    ssh_password = models.CharField(max_length=150)
    ip_range = models.CharField(max_length=20) # e.g., 192.168.1.0/24
    # ... other SSH key fields ...

class Worker(models.Model):
    ip_address = models.GenericIPAddressField(unique=True)
    status = models.CharField(max_length=1) # e.g., 'U'nconfigured, 'C'onfigured, 'E'rror
    analysis_id = models.UUIDField(blank=True, null=True) # Current analysis ID
    configurations = models.ManyToManyField(Configuration) # Which configs apply to this worker
    # ... other system info and reachability fields ...

class WorkerDependencyVersion(models.Model):
    emba = models.CharField(max_length=100, null=True) # EMBA version
    emba_head = models.CharField(max_length=40, null=True) # Git commit hash
    # ... other dependency versions (NVD, EPSS, APT packages) ...

The Configuration model defines how EMBArk connects to and finds workers (SSH credentials, IP range). Worker represents an actual remote machine and stores its current status and the ID of the analysis it's running. WorkerDependencyVersion keeps track of the software installed on that worker, ensuring it has the correct version of EMBA and its tools.
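With these fields in place, picking out workers that are ready for a new task becomes a simple query. The following is a hypothetical sketch based on the model comments above (the status letter and field names are assumptions), not EMBArk's actual query:

# Hypothetical query sketch: select workers that look ready for a new task.
# The status letter 'C' (configured), the reachable flag and analysis_id
# follow the model comments above and are assumptions, not exact internals.
from workers.models import Worker

def get_idle_workers():
    return Worker.objects.filter(
        status="C",                 # 'C'onfigured and ready
        reachable=True,             # last SSH check succeeded
        analysis_id__isnull=True,   # not currently bound to an analysis
    )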

How EMBArk Orchestrates a Distributed Analysis

Let's walk through how our analyst uses EMBArk to set up workers and distribute analyses.

1. Adding a Worker Configuration

First, the analyst adds a Configuration in the "Workers" section of EMBArk. This tells EMBArk how to find and connect to potential worker nodes.

# Simplified snippet from embark/workers/views.py

@permission_required("users.worker_permission", login_url='/')
def create_config(request):
    user = get_user(request)
    config_form = ConfigurationForm(request.POST)
    if config_form.is_valid():
        new_config = config_form.save(commit=False)
        new_config.user = user
        # Generate SSH keys for secure connection
        key = RSA.generate(settings.WORKER_SSH_KEY_SIZE)
        new_config.ssh_private_key = key.export_key().decode("utf-8")
        new_config.ssh_public_key = key.publickey().export_key().decode("utf-8")
        new_config.save()
        messages.success(request, 'Configuration created successfully.')
    # ... error handling ...
    return safe_redirect(request, '/worker/')

This function creates a new Configuration record in the database, including the username, password (for initial setup), and a range of IP addresses to scan for workers. It also generates unique SSH keys for more secure future connections.
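The key-pair generation itself is standard pycryptodome usage. As a standalone illustration (the 4096-bit size here is an arbitrary example, not necessarily the project's WORKER_SSH_KEY_SIZE setting):

# Standalone illustration of the RSA key-pair generation shown above.
# Requires the pycryptodome package; 4096 bits is an example value.
from Crypto.PublicKey import RSA

key = RSA.generate(4096)
private_key = key.export_key().decode("utf-8")              # PEM, stored as ssh_private_key
public_key = key.publickey().export_key().decode("utf-8")   # PEM, stored as ssh_public_key

# For a worker's ~/.ssh/authorized_keys, the public key can also be exported
# in OpenSSH format: key.publickey().export_key(format="OpenSSH")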

2. Scanning for Workers

Once a configuration is added, the analyst can "Scan" the specified IP range. EMBArk will then try to connect to each IP address in the range using the provided SSH credentials. If successful, it creates a Worker entry for that machine.

# Simplified snippet from embark/workers/tasks.py

@shared_task
def config_worker_scan_task(configuration_id: int):
    config = Configuration.objects.get(id=configuration_id)
    ip_network = ipaddress.ip_network(config.ip_range, strict=False)
    ip_addresses = [str(ip) for ip in ip_network.hosts() if not is_ip_local_host(str(ip))]

    # Use a thread pool to scan IPs concurrently
    with ThreadPoolExecutor(max_workers=50) as executor:
        # _scan_for_worker attempts SSH connection and creates Worker objects
        executor.map(partial(_scan_for_worker, config, ssh_auth_check=True), ip_addresses)

    config.scan_status = Configuration.ScanStatus.FINISHED
    config.save()
    # ... error handling ...

The config_worker_scan_task is a background task that iterates through all IPs in the configured range. For each IP, _scan_for_worker tries to establish an SSH connection. If successful, it creates or updates a Worker database entry for that IP, linking it to the Configuration.
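The _scan_for_worker helper itself is not shown above. A hypothetical sketch of what such a helper might do, assuming a paramiko-based reachability check followed by a Worker record update (the real implementation will differ in details):

# Hypothetical sketch of a _scan_for_worker-style helper: an SSH reachability
# check followed by creating/updating a Worker record. The real helper will
# differ in details (e.g. how ssh_auth_check is handled).
import paramiko
from workers.models import Configuration, Worker

def _scan_for_worker(config: Configuration, ip_address: str, ssh_auth_check: bool = True) -> None:
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        client.connect(ip_address, username=config.ssh_user,
                       password=config.ssh_password, timeout=5)
    except (paramiko.SSHException, OSError):
        return  # not reachable with these credentials -> not a usable worker
    finally:
        client.close()

    worker, _ = Worker.objects.get_or_create(ip_address=ip_address)
    worker.reachable = True
    worker.save()
    worker.configurations.add(config)   # link this worker to the configuration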

3. Submitting an Analysis (Distributed)

When the analyst now submits a firmware analysis through the web UI (as seen in Chapter 2: Firmware Analysis Management), the submit_firmware function checks if worker nodes are available.

# Simplified snippet from embark/uploader/executor.py

from workers.orchestrator import get_orchestrator, OrchestratorTask # New import
from uploader.models import FirmwareAnalysis, FirmwareFile

def submit_firmware(firmware_analysis: FirmwareAnalysis, firmware_file: FirmwareFile):
    orchestrator = get_orchestrator()
    if orchestrator.get_free_workers(): # Check if any worker is free
        # Build EMBA command and paths (similar to local execution)
        emba_cmd = firmware_analysis.construct_emba_command(...)
        src_path = firmware_file.file.path
        target_path = f"{settings.WORKER_FIRMWARE_DIR}/{os.path.basename(src_path)}"

        # Create an OrchestratorTask
        task = OrchestratorTask(firmware_analysis.id, emba_cmd, src_path, target_path)

        # Queue the task for the orchestrator to assign
        orchestrator.queue_task(task)
        return True
    else:
        # Fallback to local execution or mark as queued if no workers are available
        # ... or simply let it queue up and wait for a worker ...
        return False # Indicating it's queued or couldn't be submitted immediately

Crucially, submit_firmware now interacts with the Orchestrator. Instead of running EMBA locally, it builds an OrchestratorTask and hands it to the Orchestrator via queue_task for processing.
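get_orchestrator() is used throughout the codebase to obtain the orchestrator instance. One plausible shape, shown here purely as an assumption, is a simple module-level accessor inside embark/workers/orchestrator.py (where the Orchestrator class is in scope); cross-process consistency comes from the Redis lock and the persisted OrchestratorState, not from this in-process object:

# Hypothetical sketch of get_orchestrator(): a module-level accessor for a
# single in-process Orchestrator. The real function may differ; consistency
# across processes is ensured by the Redis lock and the OrchestratorState
# model, not by this object itself.
_ORCHESTRATOR = None

def get_orchestrator() -> "Orchestrator":
    global _ORCHESTRATOR
    if _ORCHESTRATOR is None:
        _ORCHESTRATOR = Orchestrator()   # assumes the Orchestrator class defined in this module
    return _ORCHESTRATOR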

4. Orchestrator Assigns and Worker Executes

The Orchestrator continuously checks for new tasks in its queue and free workers. When both are available, it assigns a task.

# Simplified snippet from embark/workers/orchestrator.py

from collections import deque
from typing import Dict, List
from workers.models import Worker, OrchestratorState

class Orchestrator:
    # ... (initialization of free_workers, busy_workers, tasks queue) ...

    def _assign_tasks(self):
        # This function is called periodically and after new tasks/workers are added
        if self.tasks and self.free_workers:
            next_task = self.tasks.popleft() # Get the oldest task
            free_worker = next(iter(self.free_workers.values())) # Get an arbitrary free worker
            self._assign_worker(free_worker, next_task) # Assign it!
            self._assign_tasks() # Recursively try to assign more tasks

    def _assign_worker(self, worker: Worker, task: OrchestratorTask):
        # pylint: disable=import-outside-toplevel
        from workers.tasks import start_analysis

        if worker.ip_address in self.free_workers:
            worker.analysis_id = task.firmware_analysis_id # Mark worker as busy with this analysis
            worker.save()

            # Trigger the remote analysis on the worker using a Celery task
            start_analysis.delay(worker.id, task.emba_cmd, task.src_path, task.target_path)

            self.busy_workers[worker.ip_address] = worker # Move worker to busy list
            del self.free_workers[worker.ip_address]
        # ... error handling ...

The _assign_tasks method is the core of the Orchestrator's job. It takes a task from the tasks queue and a worker from the free_workers pool. Then, _assign_worker updates the Worker's status, moves it to busy_workers, and crucially, calls start_analysis.delay(). This kicks off a background task that uses SSH to connect to the remote worker and begin the EMBA analysis.
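The monitoring task we'll see next calls release_worker and add_worker on the Orchestrator. Their implementation isn't shown here; a minimal sketch of the bookkeeping they have to perform (locking and state persistence omitted for brevity) might look like this:

# Hypothetical sketch of the release_worker / add_worker bookkeeping used by
# the monitoring task below. The real methods also take the Redis lock and
# persist OrchestratorState; that is omitted here.
from workers.models import Worker

class Orchestrator:
    # ... (free_workers, busy_workers, tasks as shown above) ...

    def release_worker(self, worker: Worker) -> None:
        # Forget the finished analysis and drop the worker from the busy pool
        worker.analysis_id = None
        worker.save()
        self.busy_workers.pop(worker.ip_address, None)

    def add_worker(self, worker: Worker) -> None:
        # Put the worker (back) into the free pool so queued tasks can use it
        if worker.ip_address not in self.busy_workers:
            self.free_workers[worker.ip_address] = worker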

5. Worker Performs Analysis & Reports Back

The start_analysis task runs on the main EMBArk server. It connects to the chosen worker, transfers the firmware, and launches EMBA. Another task, monitor_worker_and_fetch_logs, then continuously checks on the worker.

# Simplified snippet from embark/workers/tasks.py

import paramiko # For SSH connections
import shutil # For file operations

@shared_task
def start_analysis(worker_id, emba_cmd: str, src_path: str, target_path: str):
    worker = Worker.objects.get(id=worker_id)
    client = worker.ssh_connect() # Establish SSH connection

    # Create directory and copy firmware to worker via SFTP
    exec_blocking_ssh(client, f"sudo mkdir -p {settings.WORKER_FIRMWARE_DIR}")
    sftp_client = client.open_sftp()
    sftp_client.put(src_path, target_path) # Upload firmware
    sftp_client.close()

    # Launch EMBA on the remote worker
    client.exec_command(f"sudo sh -c '{emba_cmd}' >./emba_run.log 2>&1") # Starts EMBA in background
    logger.info("Firmware analysis has been started on the worker %s.", worker.name)

    # Start monitoring this worker
    monitor_worker_and_fetch_logs.delay(worker.id)
    client.close()

@shared_task
def monitor_worker_and_fetch_logs(worker_id) -> None:
    worker = Worker.objects.get(id=worker_id)
    orchestrator = get_orchestrator()
    while True: # Loop until analysis is done or worker unreachable
        _fetch_analysis_logs(worker) # Download new log parts
        is_running = _is_emba_running(worker) # Check if EMBA Docker is still active
        analysis = FirmwareAnalysis.objects.get(id=worker.analysis_id)
        if not is_running or analysis.finished or not orchestrator.is_busy(worker):
            break # Analysis finished!
        time.sleep(1) # Wait a bit before checking again

    # Finalize analysis, release worker, and re-add to orchestrator
    analysis.finished = True
    analysis.save()
    orchestrator.release_worker(worker)
    orchestrator.add_worker(worker) # Make worker available for new tasks
    orchestrator.assign_tasks() # Try to assign queued tasks
    # ... (error handling and worker soft-reset) ...

The start_analysis task uses paramiko (an SSH library) to connect to the worker, copy the firmware file (via SFTP), and execute the EMBA command. monitor_worker_and_fetch_logs then enters a loop, periodically checking whether EMBA is still running (_is_emba_running) and downloading log updates (_fetch_analysis_logs). Once EMBA finishes, the Orchestrator releases the worker (release_worker), marks it as free, and puts it back into the free_workers pool, ready for a new task!
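Neither _is_emba_running nor _fetch_analysis_logs is shown above. As a hypothetical sketch, the running-check could be a quick remote probe over SSH for an active EMBA Docker container (the actual helper may use a different marker, such as EMBA's own status files):

# Hypothetical sketch of an _is_emba_running-style probe. The container name
# filter 'emba' is an assumption; the real helper may check different markers.
from workers.models import Worker

def _is_emba_running(worker: Worker) -> bool:
    client = worker.ssh_connect()
    try:
        _, stdout, _ = client.exec_command(
            "sudo docker ps --filter name=emba --format '{{.Names}}'"
        )
        return bool(stdout.read().strip())   # any output -> a container is still up
    finally:
        client.close()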

Under the Hood: The Distributed Project Management Team

Let's look at the internal components and how they interact to make distributed analysis happen.

The Orchestration Flow: A Simple Sequence

When an analyst triggers a firmware analysis, and worker nodes are available, here's a simplified sequence of events:

sequenceDiagram
    participant Analyst
    participant WebApp as EMBArk Web App
    participant Orchestrator as Orchestrator (Server)
    participant Worker as Worker Node
    participant EMBA as EMBA Process

    Analyst->>WebApp: "Analyze Firmware" (multiple requests)
    WebApp->>Orchestrator: Queues new analysis tasks
    Orchestrator->>Orchestrator: Continuously checks tasks & free workers
    Orchestrator->>Worker: Assigns task (via Celery `start_analysis` task)
    Note over Worker: SSH connection established
    Worker->>Worker: Copies firmware via SFTP
    Worker->>EMBA: Spawns new EMBA Docker container
    Note over Worker: `monitor_worker_and_fetch_logs` watches EMBA process & fetches logs
    EMBA-->>Worker: EMBA completes, generates logs
    Worker->>Orchestrator: Informs of completion (releases worker)
    Orchestrator->>Orchestrator: Worker becomes free, assigns next queued task
    WebApp-->>Analyst: Shows analysis results (Chapter 4)

Key Components and Code Elements

  1. embark/workers/models.py - Defining Workers, Configurations, and State: This file defines the blueprints for all worker-related data in the database.

    # Simplified snippet from embark/workers/models.py
    
    class OrchestratorState(models.Model):
        # Holds the dynamic state of the orchestrator
        free_workers = models.ManyToManyField(Worker, related_name='free_workers')
        busy_workers = models.ManyToManyField(Worker, related_name='busy_workers')
        tasks = models.JSONField(default=list, null=True) # The queue of tasks

    The OrchestratorState model is critical for persistence. It stores which workers are currently free or busy, and the tasks that are waiting in the queue. This means if EMBArk restarts, the Orchestrator can reload its state and continue managing analyses.

  2. embark/workers/orchestrator.py - The Orchestrator Logic: This is where the core "project manager" logic resides.

    # Simplified snippet from embark/workers/orchestrator.py
    from redis import Redis # For locking across processes
    from collections import deque # For the task queue
    
    class Orchestrator:
        free_workers: Dict[str, Worker] = {}
        busy_workers: Dict[str, Worker] = {}
        tasks: deque = deque() # A double-ended queue for tasks
    
        def _sync_orchestrator_state(self):
            # Loads the latest state from the database into memory
            state = OrchestratorState.objects.first()
            self.free_workers = {w.ip_address: w for w in state.free_workers.all()}
            self.busy_workers = {w.ip_address: w for w in state.busy_workers.all()}
            self.tasks = deque([OrchestratorTask.from_dict(t) for t in state.tasks])
    
        def _update_orchestrator_state(self):
            # Saves the current in-memory state back to the database
            state = OrchestratorState.objects.first()
            state.free_workers.set(list(self.free_workers.values()))
            state.busy_workers.set(list(self.busy_workers.values()))
            state.tasks = [OrchestratorTask.to_dict(t) for t in self.tasks]
            state.save()
    
        def queue_task(self, task: OrchestratorTask):
            with REDIS_CLIENT.lock(LOCK_KEY, LOCK_TIMEOUT): # Ensures only one process modifies state at a time
                self._sync_orchestrator_state()
                self.tasks.append(task)
                self._assign_tasks() # Try to assign immediately
                self._update_orchestrator_state()
    
        # ... (other methods like _assign_tasks, _assign_worker, _release_worker as described above) ...

    The Orchestrator class maintains lists of free_workers and busy_workers and a tasks queue. The _sync_orchestrator_state and _update_orchestrator_state methods handle loading and saving this information to the OrchestratorState model in the database, ensuring persistence. Redis locks are used to prevent multiple processes from trying to update the Orchestrator's state at the exact same time, which could lead to errors.

  3. embark/workers/tasks.py - The Remote Action Executors: This file contains all the Celery tasks that perform actions on remote worker nodes.

    # Simplified snippet from embark/workers/tasks.py
    from celery import shared_task
    from workers.orchestrator import get_orchestrator
    import paramiko # For SSH connection
    
    @shared_task
    def update_worker_info():
        # Periodically checks on all workers
        workers = Worker.objects.all()
        for worker in workers:
            try:
                update_system_info(worker) # Try to connect via SSH
                worker.reachable = True
            except paramiko.SSHException:
                _handle_unreachable_worker(worker) # Mark as unreachable
            worker.save()
    
    @shared_task
    def _handle_unreachable_worker(worker: Worker, force: bool = False):
        # If a worker is unreachable for too long, remove it from orchestrator
        # and reassign any running analysis to a new worker.
        orchestrator = get_orchestrator()
        orchestrator.remove_worker(worker, check=False)
        if worker.analysis_id:
            old_analysis = FirmwareAnalysis.objects.get(id=worker.analysis_id)
            new_analysis = _new_analysis_from(old_analysis) # Create new analysis task
            submit_firmware(new_analysis, old_analysis.firmware) # Resubmit
        worker.reachable = False
        worker.save()
    
    # ... (start_analysis and monitor_worker_and_fetch_logs as described above) ...

    These shared_tasks (background jobs handled by Celery) are crucial. update_worker_info periodically pings all workers to check if they are reachable. If a worker becomes unreachable, _handle_unreachable_worker removes it from the Orchestrator and automatically reschedules any analysis that was running on it (by creating a new FirmwareAnalysis and submitting it again). start_analysis and monitor_worker_and_fetch_logs handle the actual remote execution and monitoring, using paramiko for SSH communication.

  4. embark/workers/views.py - The User Interface for Workers: This file handles the web pages where users can manage worker configurations and view worker statuses.

    # Simplified snippet from embark/workers/views.py
    from django.shortcuts import render
    from workers.models import Configuration, Worker, DependencyVersion
    
    @permission_required("users.worker_permission", login_url='/')
    def worker_main(request):
        # Displays the main worker overview page
        user = get_user(request)
        configs = Configuration.objects.filter(user=user)
        workers = Worker.objects.filter(configurations__in=configs)
        # Add computed properties for display
        for config in configs:
            config.total_workers = config.workers.count()
            config.reachable_workers = config.workers.filter(reachable=True).count()
        # ... fetch available dependency versions ...
        return render(request, 'workers/index.html', {
            'configs': configs,
            'workers': workers,
            'availableVersion': DependencyVersion.objects.first()
        })
    
    # ... (other views for creating configs, triggering scans, updating dependencies, resetting workers) ...

    The worker_main view fetches all Configuration and Worker data relevant to the logged-in user and presents it on the workers/index.html template. This is where the analyst can see their configurations, the status of each worker (reachable, busy, configured), and trigger actions such as scanning for new workers or updating dependencies; a minimal sketch of such a trigger view follows below.
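    As a hypothetical sketch (view name, scan-status value, and error handling are assumptions, not taken from the real file), a scan-trigger view only has to validate the request and dispatch the Celery task we saw earlier:

    # Hypothetical sketch of a scan-trigger view: names and error handling are
    # assumptions; the real view in embark/workers/views.py may differ.
    from django.contrib import messages
    from workers.models import Configuration
    from workers.tasks import config_worker_scan_task

    @permission_required("users.worker_permission", login_url='/')
    def scan_config(request, configuration_id: int):
        config = Configuration.objects.get(id=configuration_id)
        config.scan_status = Configuration.ScanStatus.SCANNING   # assumed enum value
        config.save()
        config_worker_scan_task.delay(config.id)                 # run the scan in the background
        messages.info(request, 'Worker scan started.')
        return safe_redirect(request, '/worker/')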

Conclusion

Worker Node Orchestration is how EMBArk achieves scalability and efficiency in firmware analysis. By introducing the Orchestrator as a central manager, Worker nodes as distributed analysis powerhouses, and Configurations to manage access and tool versions, EMBArk can automatically distribute and execute many complex analyses concurrently. This dramatically speeds up the overall scanning process, transforming EMBArk into a powerful, distributed analysis platform.

Now that we understand how EMBArk distributes work across multiple machines, the next chapter looks at how EMBArk handles tasks that run behind the scenes, independently of the web interface. Dive into Chapter 7: Background Task Execution to learn about the unsung heroes that keep EMBArk running smoothly.


