6 changes: 6 additions & 0 deletions conf/default/cuckoo.conf.default
@@ -154,6 +154,12 @@ upload_max_size = 100000000
# Prevent upload of files that exceed upload_max_size?
do_upload_max_size = no

# Enable multi-worker mode: one ResultServer process per VM
multiworker = no
# Base port for auto-assigning per-VM ports (multiworker mode)
# VM 1 gets base_port, VM 2 gets base_port+1, etc.
base_port = 2042

[processing]
# Set the maximum size of analyses generated files to process. This is used
# to avoid the processing of big files which may take a lot of processing
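A note on the new options: with multiworker enabled, the abstracts.py change below assigns each machine base_port plus its zero-based index in the machinery config's machines list, unless the machine defines its own resultserver_port. A minimal sketch of the resulting mapping, using hypothetical VM names:

    # Sketch only: illustrates the auto-assignment rule from the config comment above.
    # VM names are hypothetical; the real list comes from the machinery config's "machines" option.
    base_port = 2042
    machines = ["cuckoo1", "cuckoo2", "cuckoo3"]
    for machine_index, machine_id in enumerate(machines):
        print(machine_id, "->", base_port + machine_index)
    # cuckoo1 -> 2042
    # cuckoo2 -> 2043
    # cuckoo3 -> 2044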
19 changes: 13 additions & 6 deletions lib/cuckoo/common/abstracts.py
@@ -149,8 +149,11 @@ def initialize(self) -> None:

def _initialize(self) -> None:
"""Read configuration."""
multiworker_enabled = cfg.resultserver.get("multiworker", False)
base_port = cfg.resultserver.get("base_port", 2042)

mmanager_opts = self.options.get(self.module_name)
for machine_id in mmanager_opts["machines"]:
for machine_index, machine_id in enumerate(mmanager_opts["machines"]):
try:
machine_opts = self.options.get(machine_id.strip())
machine = Dictionary()
@@ -176,12 +179,16 @@ def _initialize(self) -> None:
machine.resultserver_ip = machine_opts.get("resultserver_ip", cfg.resultserver.ip)
machine.resultserver_port = machine_opts.get("resultserver_port")
if machine.resultserver_port is None:
# The ResultServer port might have been dynamically changed,
# get it from the ResultServer singleton. Also avoid import
# recursion issues by importing ResultServer here.
from lib.cuckoo.core.resultserver import ResultServer
if multiworker_enabled:
# In multiworker mode, auto-assign sequential ports
machine.resultserver_port = base_port + machine_index
else:
# The ResultServer port might have been dynamically changed,
# get it from the ResultServer singleton. Also avoid import
# recursion issues by importing ResultServer here.
from lib.cuckoo.core.resultserver import ResultServer

machine.resultserver_port = ResultServer().port
machine.resultserver_port = ResultServer().port

# Strip parameters.
for key, value in machine.items():
20 changes: 10 additions & 10 deletions lib/cuckoo/core/analysis_manager.py
@@ -583,15 +583,15 @@ def route_network(self):
self.machine.ip,
str(routing.inetsim.server),
str(routing.inetsim.dnsport),
str(self.cfg.resultserver.port),
str(self.machine.resultserver_port),
str(routing.inetsim.ports),
)

elif self.route == "tor":
self.rooter_response = rooter(
"socks5_enable",
self.machine.ip,
str(self.cfg.resultserver.port),
str(self.machine.resultserver_port),
str(routing.tor.dnsport),
str(routing.tor.proxyport),
)
@@ -600,14 +600,14 @@ def route_network(self):
self.rooter_response = rooter(
"socks5_enable",
self.machine.ip,
str(self.cfg.resultserver.port),
str(self.machine.resultserver_port),
str(self.socks5s[self.route]["dnsport"]),
str(self.socks5s[self.route]["port"]),
)
self.rooter_response = rooter("libvirt_fwo_enable", self.machine.interface, self.machine.ip)

elif self.route in ("none", "None", "drop"):
self.rooter_response = rooter("drop_enable", self.machine.ip, str(self.cfg.resultserver.port))
self.rooter_response = rooter("drop_enable", self.machine.ip, str(self.machine.resultserver_port))
elif self.route[:3] == "tun" and is_network_interface(self.route):
self.log.info("Network interface %s is tunnel", self.interface)
self.rooter_response = rooter("interface_route_tun_enable", self.machine.ip, self.route, str(self.task.id))
@@ -648,7 +648,7 @@ def route_network(self):
self.machine.ip,
self.cfg.resultserver.ip,
"tcp",
str(self.cfg.resultserver.port),
str(self.machine.resultserver_port),
)
self.rooter_response = rooter(
"forward_enable", input_interface, self.machine.interface, self.cfg.resultserver.ip, self.machine.ip
@@ -696,7 +696,7 @@ def unroute_network(self):
self.machine.ip,
self.cfg.resultserver.ip,
"tcp",
str(self.cfg.resultserver.port),
str(self.machine.resultserver_port),
)
self.rooter_response = rooter(
"forward_disable", input_interface, self.machine.interface, self.cfg.resultserver.ip, self.machine.ip
@@ -720,15 +720,15 @@ def unroute_network(self):
self.machine.ip,
routing.inetsim.server,
str(routing.inetsim.dnsport),
str(self.cfg.resultserver.port),
str(self.machine.resultserver_port),
str(routing.inetsim.ports),
)

elif self.route == "tor":
self.rooter_response = rooter(
"socks5_disable",
self.machine.ip,
str(self.cfg.resultserver.port),
str(self.machine.resultserver_port),
str(routing.tor.dnsport),
str(routing.tor.proxyport),
)
@@ -737,14 +737,14 @@ def unroute_network(self):
self.rooter_response = rooter(
"socks5_disable",
self.machine.ip,
str(self.cfg.resultserver.port),
str(self.machine.resultserver_port),
str(self.socks5s[self.route]["dnsport"]),
str(self.socks5s[self.route]["port"]),
)
self.rooter_response = rooter("libvirt_fwo_disable", self.machine.interface, self.machine.ip)

elif self.route in ("none", "None", "drop"):
self.rooter_response = rooter("drop_disable", self.machine.ip, str(self.cfg.resultserver.port))
self.rooter_response = rooter("drop_disable", self.machine.ip, str(self.machine.resultserver_port))
elif self.route[:3] == "tun":
self.log.info("Disable tunnel interface: %s", self.interface)
self.rooter_response = rooter("interface_route_tun_disable", self.machine.ip, self.route, str(self.task.id))
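The changes in this file follow one pattern: every rooter() call that previously passed the global cfg.resultserver.port now passes the port of the machine assigned to the task, so routing rules stay correct when each VM talks to its own worker port. A minimal, hypothetical illustration (IPs and ports are stand-ins, not taken from a real deployment):

    # Sketch only: shows why rooter() needs the per-machine port in multiworker mode.
    machines = [
        {"ip": "192.168.56.101", "resultserver_port": 2042},
        {"ip": "192.168.56.102", "resultserver_port": 2043},
    ]
    for machine in machines:
        # Legacy behaviour would pass 2042 for both VMs; the second VM's uploads to 2043
        # would then not be exempted from (for example) "drop" routing.
        print("drop_enable", machine["ip"], str(machine["resultserver_port"]))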
194 changes: 191 additions & 3 deletions lib/cuckoo/core/resultserver.py
@@ -5,7 +5,9 @@
import errno
import json
import logging
import multiprocessing
import os
import signal
import socket
import struct
from contextlib import suppress
@@ -598,13 +600,150 @@ def negotiate_protocol(self, task_id, ctx):
return klass(task_id, ctx, version)


class SingleVMResultServerWorker(GeventResultServerWorker):
"""A GeventResultServerWorker variant for multiworker mode.

Instead of maintaining an IP-to-task mapping, this worker reads
the current task_id from a shared multiprocessing.Value, since
it only serves a single VM.
"""

def __init__(self, sock, shared_task_id, vm_ip, **kwargs):
super().__init__(sock, **kwargs)
self._shared_task_id = shared_task_id
self._vm_ip = vm_ip

def handle(self, sock, addr):
"""Override handle to use shared task_id instead of IP lookup."""
task_id = self._shared_task_id.value
if not task_id:
log.warning("Worker for %s has no active task, rejecting connection from %s", self._vm_ip, addr[0])
return

self.task_id = task_id
self.storagepath = os.path.join(CUCKOO_ROOT, "storage", "analyses", str(task_id))
self.create_folders()

ctx = HandlerContext(task_id, self.storagepath, sock)
task_log_start(task_id)
try:
try:
protocol = self.negotiate_protocol(task_id, ctx)
except EOFError:
return

if not protocol:
return
Review comment on lines +644 to +645 (Contributor, severity: high):
There is a potential race condition here. The task could be cancelled and a new one started for this VM while negotiate_protocol() is blocking on I/O. The original GeventResultServerWorker handles this by re-checking the task ID after negotiation. This check is missing here and should be added to prevent results from being associated with the wrong task. This could lead to incorrect analysis results.

            if self._shared_task_id.value != task_id:
                log.warning(
                    "Task #%d for VM %s was cancelled during negotiation, new task is %s",
                    task_id, self._vm_ip, self._shared_task_id.value or "none"
                )
                return

            if not protocol:
                return


# Register context for cancellation
with self.task_mgmt_lock:
s = self.handlers.setdefault(task_id, set())
s.add(ctx)

try:
with protocol:
protocol.handle()
except CuckooOperationalError as e:
log.exception(e)
finally:
with self.task_mgmt_lock:
s.discard(ctx)
ctx.cancel()
if ctx.buf:
log.warning("Task #%s with protocol %s has unprocessed data", task_id, protocol)
finally:
task_log_stop(task_id)

def cancel_task(self, task_id):
"""Cancel all handlers for a task."""
with self.task_mgmt_lock:
ctxs = self.handlers.pop(task_id, set())
for ctx in ctxs:
ctx.cancel()
task_log_stop_force(task_id)


class ResultServerWorkerProcess(multiprocessing.Process):
"""Dedicated ResultServer process for a single VM.

Each worker runs its own gevent event loop and StreamServer,
listening on a unique port assigned to one VM. This bypasses
the GIL limitation of the single-process model by giving each
VM its own Python process for handling result uploads.
"""

def __init__(self, ip, port, listen_ip):
super().__init__(daemon=True, name=f"ResultServerWorker-{ip}:{port}")
self.ip = ip
self.port = port
self.listen_ip = listen_ip
self._task_id = multiprocessing.Value("i", 0)
self._ready = multiprocessing.Event()

def run(self):
"""Entry point for the worker process. Sets up a standalone
gevent StreamServer and serves forever."""
signal.signal(signal.SIGINT, signal.SIG_IGN)

# Reinitialize gevent's event loop after fork to avoid
# inheriting the parent's loop state (watchers, callbacks, etc.)
import gevent
import gevent.socket

gevent.reinit()
gevent.get_hub()

sock = gevent.socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
sock.bind((self.listen_ip, self.port))
except OSError as e:
log.error("Worker for %s failed to bind on %s:%s: %s", self.ip, self.listen_ip, self.port, e)
return
sock.listen(32)

server = SingleVMResultServerWorker(sock, self._task_id, self.ip, spawn="default")
self._ready.set()
log.info("ResultServer worker for VM %s started on port %d (pid %d)", self.ip, self.port, os.getpid())
server.serve_forever()

def set_task(self, task_id):
"""Register a task with this worker (called from parent process)."""
self._task_id.value = task_id
self._ready.wait(timeout=10)

def clear_task(self, task_id):
"""Clear the current task (called from parent process)."""
self._task_id.value = 0

def get_task_id(self):
return self._task_id.value


class ResultServer(metaclass=Singleton):
"""Manager for the ResultServer worker and task state."""
"""Manager for the ResultServer worker and task state.

Supports two modes:
- Legacy (default): Single gevent StreamServer on one port for all VMs
- Multiworker: One dedicated process per VM, each on its own port
"""

def __init__(self):
if not categories_need_VM:
return

self.multiworker = cfg.resultserver.get("multiworker", False)
self.ip = cfg.resultserver.ip

if self.multiworker:
self.workers = {} # ip -> ResultServerWorkerProcess
self.port = cfg.resultserver.get("base_port", 2042)
log.info("ResultServer starting in multiworker mode (base_port=%d)", self.port)
else:
self._start_legacy_server()

def _start_legacy_server(self):
"""Start the traditional single-port ResultServer."""
ip = cfg.resultserver.ip
port = cfg.resultserver.port
pool_size = cfg.resultserver.pool_size
@@ -647,11 +786,60 @@ def __init__(self):

def add_task(self, task, machine):
"""Register a task/machine with the ResultServer."""
self.instance.add_task(task.id, machine.ip)
if self.multiworker:
self._add_task_multiworker(task, machine)
else:
self.instance.add_task(task.id, machine.ip)

def del_task(self, task, machine):
"""Delete running task and cancel existing handlers."""
self.instance.del_task(task.id, machine.ip)
if self.multiworker:
self._del_task_multiworker(task, machine)
else:
self.instance.del_task(task.id, machine.ip)

def _add_task_multiworker(self, task, machine):
"""Start or reuse a worker process for this VM and register the task."""
worker = self.workers.get(machine.ip)

# Spawn a new worker if none exists or the previous one died
if worker is None or not worker.is_alive():
if worker is not None:
log.warning("Worker for %s (pid %d) died, spawning replacement", machine.ip, worker.pid)
worker.close()
worker = ResultServerWorkerProcess(
ip=machine.ip,
port=int(machine.resultserver_port),
listen_ip=self.ip,
)
worker.start()
self.workers[machine.ip] = worker
log.debug("Task #%s: Started worker for %s on port %d", task.id, machine.ip, machine.resultserver_port)

worker.set_task(task.id)
log.debug("Task #%s: Registered with worker for %s", task.id, machine.ip)

def _del_task_multiworker(self, task, machine):
"""Clear task from worker. Worker stays alive for reuse."""
worker = self.workers.get(machine.ip)
if worker and worker.is_alive():
worker.clear_task(task.id)
log.debug("Task #%s: Cleared from worker for %s", task.id, machine.ip)
else:
log.warning("Task #%s: No alive worker found for %s", task.id, machine.ip)

def shutdown_workers(self):
"""Terminate all multiworker processes."""
if not getattr(self, "multiworker", False):
return
for ip, worker in self.workers.items():
if worker.is_alive():
log.info("Terminating ResultServer worker for %s (pid %d)", ip, worker.pid)
worker.terminate()
worker.join(timeout=5)
if worker.is_alive():
worker.kill()
self.workers.clear()

def create_server(self, sock, pool_size):
if pool_size:
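Taken together, the scheduler-side flow in multiworker mode looks roughly like the sketch below. The Task and Machine stand-ins are hypothetical (in CAPE they come from the scheduler and database layers); the ResultServer calls are the ones added in this diff:

    # Sketch only: rough multiworker control flow, assuming multiworker = yes in cuckoo.conf.
    from types import SimpleNamespace
    from lib.cuckoo.core.resultserver import ResultServer

    task = SimpleNamespace(id=101)
    machine = SimpleNamespace(ip="192.168.56.101", resultserver_port=2042)

    rs = ResultServer()         # no legacy single-port server is bound in multiworker mode
    rs.add_task(task, machine)  # spawns (or reuses) the per-VM worker process and registers task 101
    # ... analysis runs; the guest uploads results to port 2042, handled inside the worker process ...
    rs.del_task(task, machine)  # clears the task; the worker process stays alive for reuse
    rs.shutdown_workers()       # on scheduler shutdown, terminate all worker processes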