
03_real_time_progress_monitoring

Benedikt Kuehne edited this page Jan 7, 2026 · 1 revision

Chapter 3: Real-time Progress Monitoring

In Chapter 2: Firmware Analysis Management, we learned how to upload a firmware and kick off a complex analysis. But what happens after you hit the "Analyze" button? Firmware analysis can take a long time – sometimes hours, sometimes even days! You wouldn't want to just stare at a blank screen, wondering if anything is happening, right?

Imagine you've ordered a package online. You don't just get a message saying "Your package is on its way." Instead, you get real-time updates: "Package left warehouse," "Out for delivery," "Delivered." You can track its journey step-by-step.

This is exactly the problem "Real-time Progress Monitoring" solves for EMBArk. It's like a live tracking display for your firmware analysis. It constantly watches the ongoing analysis, extracts crucial updates (like the current module being scanned, the overall percentage complete, or the current phase of the analysis), and instantly pushes this information to your browser. This way, you can watch your analysis unfold live, without needing to constantly refresh the page. This transparency is vital for long-running operations, keeping you informed and confident that things are progressing.

Solving Our Use Case: Watching Your Firmware Analysis Unfold Live

Let's revisit our analyst from the previous chapter. They've just uploaded a firmware and started an analysis. Now, they want to see its progress.

1. Starting an Analysis (Recap from Chapter 2): You've just pressed "Analyze" in the Uploader section. As we saw, this creates a FirmwareAnalysis record in the database and kicks off the EMBA scan in the background. Immediately after, EMBArk redirects you to the dashboard.

2. Navigating to the Dashboard: After clicking "Analyze", EMBArk automatically takes you to the service dashboard (e.g., embark-dashboard-service). This page is designed to show you all your active analyses.

3. Watching the Progress Live: On the dashboard, you'll see a section for "Running Analysis". For each active analysis, EMBArk displays a progress bar and lists the current phase and module being executed.

Here’s a simplified look at how the dashboard updates live in your browser:

// Simplified snippet from embark/static/scripts/serviceDashboard.js

// This function updates the progress bar visually
function makeProgress(percent, cur_ID) {
    "use strict";
    var rounded = Math.round(percent);
    var id = "#pBar_" + cur_ID; // e.g., #pBar_YOUR_ANALYSIS_ID
    // Update the width and text of the progress bar element
    $(id).attr('aria-valuenow', rounded).css('width', rounded + '%').text(rounded + '%');
}

// This function updates the list of phases displayed
function livelog_phase(phase_list, cur_ID) {
    "use strict";
    var id = "#log_phase_" + cur_ID; // e.g., #log_phase_YOUR_ANALYSIS_ID
    var $List = $(id);
    $List.empty(); // Clear previous entries
    for (var i = 0; i < phase_list.length; i++){
        var $entry = $('<li>' + phase_list[i] + '</li>');
        $List.append($entry); // Add new phase entries
    }
}

// This function updates the list of modules displayed
function livelog_module(module_list, cur_ID) {
    "use strict";
    var id = "#log_module_" + cur_ID; // e.g., #log_module_YOUR_ANALYSIS_ID
    var $List = $(id);
    $List.empty(); // Clear previous entries
    for (var i = 0; i < module_list.length; i++){
        var $entry = $('<li>' + module_list[i] + '</li>');
        $List.append($entry); // Add new module entries
    }
}

As EMBA runs, these JavaScript functions are called each time new data arrives from the server, so the progress bar fills up and the lists of phases and modules update in real time. You'll see messages like "Pre-checking phase," "Testing phase," and specific module names (e.g., "F02_toolchain finished") appear as the analysis progresses.

Under the Hood: The Real-time News Ticker

Let's look at how EMBArk achieves this "live news ticker" experience. It involves a few specialized components working together:

The Real-time Update Flow: A Simple Sequence

When an EMBA analysis is running and generating logs, here's a simplified sequence of how EMBArk gets those updates to your browser:

sequenceDiagram
    participant EMBA Analyzer
    participant LogReader
    participant EMBArk Web Server
    participant Web Browser
    participant User

    EMBA Analyzer->>LogReader: Writes log updates to `emba.log`
    LogReader->>EMBArk Web Server: Extracts status and sends to WebSocket (via Channels)
    EMBArk Web Server->>Web Browser: Pushes status updates instantly (WebSocket)
    Web Browser->>User: Displays live progress on dashboard

Note: The WebSocket connection between the Web Browser and the EMBArk Web Server is established when the user loads the dashboard.

Key Components and Code Elements

  1. embark/uploader/models.py - Storing the Status: The FirmwareAnalysis model is where the current progress status for each analysis is stored. This is crucial because it allows the LogReader to update the database, and the WebSocket consumer to retrieve the latest status.

    # Simplified snippet from embark/uploader/models.py
    from django.db import models
    import uuid
    # ... other imports ...
    
    def jsonfield_default_value():
        return {
            "percentage": 0,
            'analysis': "",
            'firmware_name': "",
            'last_update': "",
            'last_module': "",
            'module_list': [], # List of modules that have finished
            'last_phase': "",
            'phase_list': [],  # List of phases that have been entered
            'finished': False,
            'work': False
        }
    
    class FirmwareAnalysis(models.Model):
        # ... other fields ...
        id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=True)
        firmware_name = models.CharField(editable=True, default="File unknown", max_length=127)
        # This JSONField holds the real-time status updates
        status = models.JSONField(null=False, default=jsonfield_default_value)
        finished = models.BooleanField(default=False, blank=False)
        failed = models.BooleanField(default=False, blank=False)
        # ... other fields ...
    • status = models.JSONField(...): This field acts as a dynamic dictionary in the database. It stores the percentage complete, the last_module processed, a module_list (history of completed modules), last_phase (current phase), and phase_list (history of entered phases). This allows the LogReader to update these values and the ProgressConsumer to retrieve them efficiently.
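To make this bookkeeping concrete, here is a minimal sketch in plain Python (no Django) of how such a status dictionary could evolve as updates arrive. The helper `apply_update` is hypothetical and not part of EMBArk; it mirrors the append-on-change logic of `save_status` in `logreader.py`. The analysis ID and percentage are made-up sample values.

```python
import copy
import json

# Trimmed copy of the default status returned by jsonfield_default_value()
DEFAULT_STATUS = {
    "percentage": 0,
    "last_module": "",
    "module_list": [],
    "last_phase": "",
    "phase_list": [],
    "finished": False,
}


def apply_update(status, module=None, phase=None, percentage=None):
    """Hypothetical helper: return a new status dict with one update applied."""
    new_status = copy.deepcopy(status)
    if percentage is not None:
        new_status["percentage"] = percentage
    # Only record a module or phase when it actually changes
    if module is not None and module != new_status["last_module"]:
        new_status["last_module"] = module
        new_status["module_list"].append(module)
    if phase is not None and phase != new_status["last_phase"]:
        new_status["last_phase"] = phase
        new_status["phase_list"].append(phase)
    return new_status


status = apply_update(DEFAULT_STATUS, phase="Testing phase")
status = apply_update(status, module="F02_toolchain finished", percentage=40)
# A repeated module message does not create a second history entry
status = apply_update(status, module="F02_toolchain finished", percentage=40)

# The browser receives a mapping of analysis ID to status, serialized as JSON
payload = json.dumps({"example-analysis-id": status})
print(payload)
```

Keeping both the latest value (`last_module`, `last_phase`) and the history lists in one JSON document means a single read is enough to rebuild the whole dashboard entry.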
  2. embark/embark/logreader.py - The Log File Watcher: This component is a background process dedicated to watching the emba.log file generated by each running EMBA analysis. It's like a dedicated security camera constantly monitoring EMBA's activity.

    # Simplified snippet from embark/embark/logreader.py
    import builtins
    import logging
    import re
    import time
    from pathlib import Path
    
    from inotify_simple import INotify, flags # For watching file system events
    from asgiref.sync import async_to_sync
    from channels.layers import get_channel_layer # For sending messages to WebSockets
    from django.utils import timezone # For the last_update timestamp in save_status
    from uploader.models import FirmwareAnalysis
    
    logger = logging.getLogger(__name__)
    
    class LogReader:
        def __init__(self, firmware_id):
            self.firmware_id = firmware_id
            self.analysis = FirmwareAnalysis.objects.get(id=self.firmware_id)
            self.user = self.analysis.user
            self.room_group_name = f"services_{self.user}"
            self.channel_layer = get_channel_layer()
            self.status_msg = { # Local copy to build up the status
                "percentage": 0, "module": "", "phase": ""
            }
            # ... initialization ...
            if self.analysis:
                self.read_loop()
    
        def save_status(self):
            # Update the analysis object in the database
            self.analysis.status["percentage"] = self.status_msg["percentage"]
            self.analysis.status["last_update"] = str(timezone.now())
            if self.status_msg["module"] != self.analysis.status["last_module"]:
                self.analysis.status["last_module"] = self.status_msg["module"]
                self.analysis.status["module_list"].append(self.status_msg["module"])
            if self.status_msg["phase"] != self.analysis.status["last_phase"]:
                self.analysis.status["last_phase"] = self.status_msg["phase"]
                self.analysis.status["phase_list"].append(self.status_msg["phase"])
            self.analysis.save(update_fields=["status"])
    
            # Send the updated status to the WebSocket group for this user
            async_to_sync(self.channel_layer.group_send)(
                self.room_group_name, {
                    "type": 'send.message',
                    "message": {str(self.analysis.id): self.analysis.status}
                }
            )
    
        # Identifies EMBA phase (e.g., Pre-checking, Testing) from log messages
        @staticmethod
        def phase_identify(status_message):
            # ... regex patterns to match phases ...
            # ... logic to calculate max_module and phase_nmbr based on matched pattern ...
            return max_module, phase_nmbr
    
        # Updates current module status and calculates percentage
        def update_status(self, stream_item_list):
            percentage = 0
            max_module, phase_nmbr = self.phase_identify(self.status_msg)
            # ... logic to calculate percentage based on modules and phases ...
            self.status_msg["module"] = stream_item_list[0]
            self.status_msg["percentage"] = percentage
            self.save_status()
    
        # Updates current phase
        def update_phase(self, stream_item_list):
            self.status_msg["phase"] = stream_item_list[1]
            self.save_status()
    
        def read_loop(self):
            # Continuously watches for changes in the emba.log file
            # If a change is detected, it calls get_diff() to find new lines
            # and input_processing() to parse them.
            emba_log_path = f"{self.analysis.path_to_logs}/emba.log"
            while not self.finish:
                if Path(emba_log_path).exists():
                    events = self.inotify_events(emba_log_path)
                    for event in events:
                        # Simplified: If log file is modified
                        if flags.MODIFY in flags.from_mask(event.mask):
                            diff = self.get_diff(emba_log_path) # Get new lines
                            self.input_processing(diff)        # Process and send updates
                time.sleep(5) # Check every 5 seconds if no inotify event
    
        def get_diff(self, log_file):
            # Reads the difference between the current log file and its last known state
            # ... uses difflib to find new lines in emba.log ...
            return new_lines_from_log
    
        def input_processing(self, tmp_inp):
            # Uses regular expressions (regex) to extract status and phase messages
            # from the log file content and calls update_status/update_phase.
            status_pattern = r"\[\*\]*" # Matches module finished messages
            phase_pattern = r"\[\!\]*"  # Matches phase change messages
            cur_ar = tmp_inp.splitlines()
    
            # Observer for status messages (e.g., "F02_toolchain finished")
            # Filters lines matching the status_pattern and extracts info
            # Calls self.update_status(extracted_info)
            # ... RxPy stream processing for status ...
    
            # Observer for phase messages (e.g., "Testing phase")
            # Filters lines matching the phase_pattern and extracts info
            # Calls self.update_phase(extracted_info)
            # ... RxPy stream processing for phase ...
    
        @classmethod
        def inotify_events(cls, path):
            # Uses Linux's inotify to get real-time notifications when a file changes.
            inotify = INotify()
            watch_flags = flags.MODIFY | flags.CLOSE_WRITE
            try:
                inotify.add_watch(path, watch_flags)
                return inotify.read()
            except builtins.Exception:
                logger.error("inotify_event error for path: %s", path)
                return []
    • LogReader: This class is instantiated for each active firmware analysis. Its main job is to monitor the emba.log file generated by EMBA.
    • inotify_events: This function uses a Linux feature called inotify to efficiently watch the log file for any changes. Instead of constantly rereading the whole file, inotify alerts LogReader only when the file is modified.
    • get_diff: When a change is detected, this function finds just the new lines that have been added to the emba.log file.
    • input_processing: This is where the magic of extraction happens. It uses regex (patterns) to identify specific messages in the log lines:
      • [*] patterns indicate a module has finished (e.g., [*] F02_toolchain finished).
      • [!] patterns indicate a phase change (e.g., [!] Testing phase).
    • update_status / update_phase: Once a module or phase message is identified, these functions update the status_msg dictionary (update_status also recalculates the overall percentage complete) and then call save_status.
    • save_status: This function does two critical things:
      1. It updates the status JSON field in the FirmwareAnalysis database record for that specific analysis.
      2. It uses Django Channels (async_to_sync(self.channel_layer.group_send)) to push this updated status message to any connected WebSockets.
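The extraction step can be sketched without inotify or RxPy. The following is a simplified illustration, not EMBArk's implementation: it assumes log lines look like the two examples above (`[*] F02_toolchain finished` and `[!] Testing phase`), whereas real EMBA logs may carry extra formatting. The functions `new_lines` and `classify` are hypothetical, and `new_lines` tracks a character offset instead of using difflib.

```python
import re

# Assumed line shapes, based on the examples in this chapter
STATUS_RE = re.compile(r"^\[\*\]\s*(?P<text>.+)$")
PHASE_RE = re.compile(r"^\[!\]\s*(?P<text>.+)$")


def new_lines(log_text, offset):
    """Return (lines added since offset, new offset). Hypothetical stand-in for get_diff."""
    added = log_text[offset:]
    return added.splitlines(), len(log_text)


def classify(line):
    """Return ("module", text), ("phase", text), or None for any other line."""
    match = STATUS_RE.match(line)
    if match:
        return ("module", match.group("text"))
    match = PHASE_RE.match(line)
    if match:
        return ("phase", match.group("text"))
    return None


log = "[!] Testing phase\nsome unrelated output\n"
lines, offset = new_lines(log, 0)
events = [classify(line) for line in lines]

# Later, EMBA appends another line; only the new part is processed
log += "[*] F02_toolchain finished\n"
lines, offset = new_lines(log, offset)
events += [classify(line) for line in lines]

print([event for event in events if event])
```

Each non-empty result would then be handed to the equivalent of `update_phase` or `update_status`, which is where the database write and the WebSocket push happen.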
  3. embark/embark/consumers.py - The WebSocket Gateway: This file defines the ProgressConsumer, which is the server-side part that handles the WebSocket connections from your web browser.

    # Simplified snippet from embark/embark/consumers.py
    import json
    import logging
    from channels.db import database_sync_to_async
    from channels.generic.websocket import AsyncWebsocketConsumer
    from uploader.models import FirmwareAnalysis
    
    logger = logging.getLogger(__name__)
    
    class ProgressConsumer(AsyncWebsocketConsumer):
        # Called when a new WebSocket connection is established
        async def connect(self):
            # Create a unique "room group" for each user's progress updates
            self.room_group_name = f"services_{self.scope['user']}"
            await self.channel_layer.group_add(
                self.room_group_name,
                self.channel_name
            )
            await self.accept() # Accept the WebSocket connection
    
        # Called when the LogReader sends a message to this user's group
        async def send_message(self, event):
            message = event['message']
            # Send the received message as JSON to the connected browser
            await self.send(json.dumps(message, sort_keys=False))
    
        # Disconnect and remove from group
        async def disconnect(self, code):
            await self.channel_layer.group_discard(self.room_group_name, self.channel_name)
    
        # Called when receiving data from the frontend (e.g., "Reload")
        async def receive(self, text_data=None, bytes_data=None):
            if text_data == "Reload":
                # On initial load or explicit request, get all current analysis statuses
                # from the DB and send them to the browser. The queryset is built and
                # evaluated inside one synchronous function, because Django querysets
                # must not be iterated directly from async code.
                def get_statuses():
                    analysis_list = FirmwareAnalysis.objects.filter(user=self.scope['user']).exclude(failed=True)
                    return {str(analysis_.id): analysis_.status for analysis_ in analysis_list}
                message = await database_sync_to_async(get_statuses)()
                await self.channel_layer.group_send(self.room_group_name, {"type": 'send.message', "message": message})
    • ProgressConsumer: This class acts as the bridge between the backend LogReader and your browser.
    • connect: When you open the dashboard page, your browser establishes a WebSocket connection to this consumer. The consumer then "subscribes" you to a unique room_group_name (e.g., services_yourusername). This ensures you only receive updates for your own analyses.
    • send_message: This is the crucial part. When the LogReader (from logreader.py) has a new status update, it sends it to this room_group. The send_message method then takes that update and instantly pushes it as a JSON message through the WebSocket directly to your browser.
    • receive: Allows the frontend to send messages (like "Reload") to get an initial dump of all current statuses.
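The group mechanics can be modeled in a few lines of plain Python. This is an in-memory toy, not how Django Channels is implemented, and the classes `ToyChannelLayer` and `ToyConsumer` are hypothetical. It illustrates two behaviors described above: a message sent to a group reaches only the consumers in that group, and the `type` value `send.message` selects the handler named `send_message`, because Channels maps dots in the type to underscores in the method name.

```python
import asyncio
import json


class ToyChannelLayer:
    """Hypothetical in-memory stand-in for a channel layer (not the Channels API)."""

    def __init__(self):
        self.groups = {}  # group name -> set of consumers

    async def group_add(self, group, consumer):
        self.groups.setdefault(group, set()).add(consumer)

    async def group_discard(self, group, consumer):
        self.groups.get(group, set()).discard(consumer)

    async def group_send(self, group, event):
        # Dots in the event type become underscores in the handler name
        handler_name = event["type"].replace(".", "_")
        for consumer in list(self.groups.get(group, ())):
            await getattr(consumer, handler_name)(event)


class ToyConsumer:
    def __init__(self, layer, username):
        self.layer = layer
        self.room_group_name = f"services_{username}"
        self.sent = []  # what would go out over the WebSocket

    async def connect(self):
        await self.layer.group_add(self.room_group_name, self)

    async def disconnect(self):
        await self.layer.group_discard(self.room_group_name, self)

    async def send_message(self, event):
        self.sent.append(json.dumps(event["message"]))


async def demo():
    layer = ToyChannelLayer()
    alice, bob = ToyConsumer(layer, "alice"), ToyConsumer(layer, "bob")
    await alice.connect()
    await bob.connect()
    update = {"type": "send.message", "message": {"analysis-1": {"percentage": 10}}}
    await layer.group_send("services_alice", update)
    await alice.disconnect()
    await layer.group_send("services_alice", update)  # nobody is listening now
    return alice.sent, bob.sent


alice_sent, bob_sent = asyncio.run(demo())
print(alice_sent, bob_sent)
```

In this toy run, only the first update reaches `alice`, and `bob` never sees it, which is the isolation the per-user group name is meant to provide.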
  4. embark/embark/routing.py - WebSocket URL Configuration: Just like regular web pages have URLs (e.g., /dashboard), WebSockets also need URLs. This file maps the WebSocket endpoint to our ProgressConsumer.

    # Simplified snippet from embark/embark/routing.py
    from django.urls import path
    from channels.routing import URLRouter
    from embark.consumers import ProgressConsumer
    
    # url patterns for websocket communication
    ws_urlpatterns = URLRouter([
        path('ws/progress', ProgressConsumer.as_asgi(), name="websocket-progress"),
        # ... other WebSocket paths ...
    ])
    • path('ws/progress', ProgressConsumer.as_asgi(), ...): This line tells EMBArk that any WebSocket connection to the /ws/progress URL should be handled by our ProgressConsumer.
  5. embark/static/scripts/serviceDashboard.js - The Browser-Side Magic: This JavaScript file is loaded by your web browser when you visit the dashboard. It initiates the WebSocket connection and handles incoming real-time updates.

    // Simplified snippet from embark/static/scripts/serviceDashboard.js
    
    // Determine WebSocket URL based on current page protocol
    var loc = window.location;
    var wsStart = 'ws://';
    var wsPort = ':8001'; // Default for HTTP
    if (loc.protocol == 'https:') {
          wsStart = 'wss://';
          wsPort = ':8000'; // Default for HTTPS
    }
    // Establish the WebSocket connection
    var socket = new WebSocket(
            wsStart + location.hostname + wsPort + '/ws/progress'
    );
    
    // This method is called when the WebSocket connection is established
    socket.onopen = function () {
        console.log("[open] Connection established");
        socket.send("Reload"); // Request initial status updates
    };
    
    // This method is called whenever a message from the backend arrives
    socket.onmessage = function (event) {
        console.log("Received a update");
        var data = JSON.parse(event.data); // Parse the JSON data from the server
    
        try {
            for (const analysis_id in data){
                var status_dict = data[analysis_id]; // Get status for a specific analysis
                var container_id = "Container_" + status_dict.analysis;
    
                // Check if a container for this analysis already exists in the UI
                var existingContainer = document.getElementById(container_id);
    
                if (existingContainer == null) {
                    // If new analysis, add a new container to the dashboard
                    if (status_dict.finished == true){
                        add_container_to_finished(status_dict); // Add to finished section
                    } else {
                        add_container_to_running(status_dict); // Add to running section
                        // Update progress bar and log lists for the new container
                        livelog_module(status_dict.module_list, status_dict.analysis);
                        livelog_phase(status_dict.phase_list, status_dict.analysis);
                        makeProgress(status_dict.percentage, status_dict.analysis);
                    }
                } else if (status_dict.finished == true ){
                    // If analysis is now finished, move its container to the "Finished" section
                    existingContainer.remove();
                    add_container_to_finished(status_dict);
                } else {
                    // If container exists and analysis is still running, just update its content
                    livelog_module(status_dict.module_list, status_dict.analysis);
                    livelog_phase(status_dict.phase_list, status_dict.analysis);
                    makeProgress(status_dict.percentage, status_dict.analysis);
                }
            }
        } catch(error){
            console.error("Error processing WebSocket message:", error);
        }
    };
    
    // ... other socket event handlers (onclose, onerror) ...
    
    // Functions that add or update analysis containers in the HTML (implementations omitted here)
    function add_container_to_running(status_dict) { /* ... implementation ... */ }
    function add_container_to_finished(status_dict) { /* ... implementation ... */ }
    function set_container_to_work(status_dict) { /* ... implementation ... */ }
    • var socket = new WebSocket(...): This line establishes the actual WebSocket connection from your browser to the EMBArk server's ProgressConsumer.
    • socket.onopen: When the connection is successfully made, it sends a "Reload" message to the server, which responds with the current status of all your analyses that have not failed, both running and finished.
    • socket.onmessage: This is the heartbeat of the real-time monitoring. Every time the EMBArk server pushes a new status update through the WebSocket, this function is triggered. It parses the JSON data, identifies which analysis the update is for, and then calls functions like makeProgress, livelog_module, and livelog_phase to dynamically update the progress bar and log lists on your dashboard.
    • add_container_to_running, add_container_to_finished: These functions are responsible for creating the HTML "boxes" for each analysis on your dashboard. If an analysis finishes, the onmessage handler will remove it from the "Running" section and add it to the "Finished" section.
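The branching inside `socket.onmessage` is easier to reason about when separated from the DOM. The sketch below models it in Python (chosen to match the backend examples) with the dashboard reduced to two sets of analysis IDs. The function `handle_update` is hypothetical and only mirrors the three branches of the handler; it does not reproduce the real container rendering.

```python
def handle_update(running, finished, data):
    """Hypothetical model of the onmessage branches; returns the actions taken."""
    actions = []
    for status in data.values():
        analysis_id = status["analysis"]
        exists = analysis_id in running or analysis_id in finished
        if status["finished"]:
            # New or existing, the container ends up in the finished section
            running.discard(analysis_id)
            finished.add(analysis_id)
            actions.append(("finished", analysis_id))
        else:
            if not exists:
                # First message for this analysis: create its running container
                running.add(analysis_id)
                actions.append(("added", analysis_id))
            # In both cases, refresh the progress bar and log lists
            actions.append(("progress", analysis_id, status["percentage"]))
    return actions


running, finished = set(), set()
first = handle_update(
    running, finished, {"a1": {"analysis": "a1", "percentage": 25, "finished": False}}
)
second = handle_update(
    running, finished, {"a1": {"analysis": "a1", "percentage": 60, "finished": False}}
)
third = handle_update(
    running, finished, {"a1": {"analysis": "a1", "percentage": 100, "finished": True}}
)
print(first, second, third)
```

Because every message carries the full status for an analysis, the handler never needs to remember earlier messages; it only checks whether a container already exists.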

Conclusion

Real-time Progress Monitoring transforms EMBArk from a static analysis platform into a dynamic, transparent, and engaging tool. By leveraging inotify to efficiently watch log files, processing these logs with regex, storing status in the FirmwareAnalysis model, and using WebSockets with Django Channels to push updates to the browser, EMBArk keeps you informed every step of the way. You can always see exactly what your analyses are doing, without delay or constant page refreshes.

Now that you can watch your analyses unfold live, the next natural step is to understand the results. In the next chapter, we'll dive into Reporting & Visualization, where you'll learn how EMBArk presents the complex findings of its analyses in clear, actionable reports.


Generated by AI Codebase Knowledge Builder. References: [1], [2], [3], [4], [5], [6]
