Skip to content

feat: add drainTimeout option to protect against slow/frozen clients#1060

Merged
robertsLando merged 11 commits into
mainfrom
feat/drain-timeout
Feb 9, 2026
Merged

feat: add drainTimeout option to protect against slow/frozen clients#1060
robertsLando merged 11 commits into
mainfrom
feat/drain-timeout

Conversation

@getlarge

@getlarge getlarge commented Dec 2, 2025

Copy link
Copy Markdown
Member

When a client's TCP buffer fills and it stops reading, the broker waits indefinitely for the 'drain' event, blocking message delivery to ALL subscribers.

This proof of concept implements a drainTimeout option (disabled by default for backwards compatibility) that sets a maximum time to wait for drain. After the timeout, the slow client is disconnected and delivery continues.

Changes:

  • Add drainTimeout option to Aedes constructor (default: 0 = disabled)
  • Implement drain timeout logic in lib/write.js
  • Add TypeScript type definition
  • Add comprehensive tests (unit + E2E with readStop() + ToxiProxy)
  • Update documentation with recommendations

The solution based on timeouts might not be ideal but brings a quick fix on the table for this issue and is good enough to start the discussion.

…POC)

When a client's TCP buffer fills and it stops reading, the broker waits indefinitely for the 'drain' event, blocking message delivery to ALL subscribers.

This proof of concept implements a drainTimeout option (disabled by default  for backwards compatibility) that sets a maximum time to wait for drain.
After the timeout, the slow client is disconnected and delivery continues.

   Changes:
   - Add drainTimeout option to Aedes constructor (default: 0 = disabled)
   - Implement drain timeout logic in lib/write.js
   - Add TypeScript type definition
   - Add comprehensive tests (unit + E2E with readStop() + ToxiProxy)
   - Update documentation with recommendations
@codecov

codecov Bot commented Dec 2, 2025

Copy link
Copy Markdown

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 99.47%. Comparing base (2c6882e) to head (bb2e867).
⚠️ Report is 2 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1060      +/-   ##
==========================================
+ Coverage   99.44%   99.47%   +0.03%     
==========================================
  Files          15       15              
  Lines        1797     1901     +104     
==========================================
+ Hits         1787     1891     +104     
  Misses         10       10              
Flag Coverage Δ
unittests 99.47% <100.00%> (+0.03%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@robertsLando robertsLando requested a review from mcollina December 2, 2025 12:52
@robertsLando

Copy link
Copy Markdown
Member

@mcollina would be glad to know your opinion on this. I faced an issue on production that seems related to this and speaking with @getlarge about it I think the reason is what he describes here.

An alternative is a stall in mqemitter and/or fastparallel for some other reasons. This happens only when there is an high frequency of messages incoming to the broker

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds an opt-in drainTimeout safeguard to prevent a single slow/frozen TCP client from indefinitely blocking broker message delivery, by timing out stalled socket drains and disconnecting the offending client.

Changes:

  • Add drainTimeout option (default 0 = disabled) to broker defaults and TypeScript options.
  • Replace direct drain waiting in lib/write.js with a client-level waitForDrain() that supports timeouts and coalesces callbacks/timers.
  • Add new unit + E2E tests (including Docker-based ToxiProxy integration) and expand docs with guidance.

Reviewed changes

Copilot reviewed 8 out of 9 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
aedes.js Adds default drainTimeout: 0 to broker options.
lib/write.js Routes backpressure handling through client.waitForDrain().
lib/client.js Implements coalesced drain waiting + timeout-driven disconnect behavior.
types/instance.d.ts Exposes drainTimeout?: number in AedesOptions.
docs/Aedes.md Documents drainTimeout and its interaction with concurrency/backpressure.
test/helper.js Adds platform-skip helper used by new drain timeout tests.
test/drain-timeout.js Adds unit + readStop-based E2E coverage for drain timeout behavior.
test/drain-toxiproxy.js Adds Docker/ToxiProxy integration tests for slow/frozen client scenarios.
package.json Adds dev deps for Testcontainers + ToxiProxy client.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread test/helper.js Outdated
Comment thread lib/client.js
Comment thread test/drain-timeout.js Outdated
@robertsLando robertsLando changed the title fix: add drainTimeout option to protect against slow/frozen clients [WIP] feat: add drainTimeout option to protect against slow/frozen clients Feb 9, 2026
@robertsLando robertsLando requested a review from Copilot February 9, 2026 08:55
@robertsLando robertsLando changed the title feat: add drainTimeout option to protect against slow/frozen clients feat: add drainTimeout option to protect against slow/frozen clients Feb 9, 2026

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 10 out of 11 changed files in this pull request and generated 4 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread aedes.js
Comment thread lib/client.js
Comment thread test/drain-toxiproxy.js Outdated
Comment thread test/drain-toxiproxy.js
@robertsLando robertsLando merged commit af1ae4b into main Feb 9, 2026
36 of 37 checks passed
@seriousme

Copy link
Copy Markdown
Contributor

@robertsLando : do we really need toxi-proxy or can we use a nodejs native small proxy instead, reducing the number of dependancies?

@robertsLando

Copy link
Copy Markdown
Member

@seriousme those are dev deps, btw I'm open to drop them in case you want to do a refactor of tests 🙏🏼

@getlarge

Copy link
Copy Markdown
Member Author

@seriousme toxi-proxy is just the just the client for the actual proxy running in Docker (if I remember correctly, via testcontainers).
So you could replace toxi-proxy lib by a plain fetch.

@seriousme

Copy link
Copy Markdown
Contributor

@getlarge @robertsLando
I'm no guru on toxi-proxi but is seems like we can create a slow slow socket by creating a TCP proxy in pure Node.js.
And we might even call socket.pause() directly in the test itself. The plan is to try remove the need for both toxi-proxy and testcontainers . Once I finished the prod dependencies PR I will try to have a look again at the dev deps ;-)

@getlarge

Copy link
Copy Markdown
Member Author

@getlarge @robertsLando I'm no guru on toxi-proxi but is seems like we can create a slow slow socket by creating a TCP proxy in pure Node.js. And we might even call socket.pause() directly in the test itself. The plan is to try remove the need for both toxi-proxy and testcontainers . Once I finished the prod dependencies PR I will try to have a look again at the dev deps ;-)

Yes socket.pause is the trick, but the goal of toxi-proxy is to replicate more closely the network conditions. In reality, the socket will probably not abruptly pause.

Our first iteration (I believe this was with Claude, months ago) looked like this:

/**
 * Demo 2b: Socket Drain Deadlock - Many Clients Edition
 *
 * Same problem as 02-socket-drain-deadlock.ts, but with:
 * - Smaller messages (4KB - realistic MQTT/WebSocket payload)
 * - Many more clients (20+)
 * - Rapid-fire broadcasts to fill the buffer faster
 *
 * This version emphasizes the "1 out of N" problem:
 * - 19 healthy clients, ready to receive
 * - 1 slow client (stuck on 3G, or malicious)
 * - The slow client freezes ALL 19 healthy clients
 *
 * Run:
 *   npx tsx 02b-socket-drain-many-clients.ts           # Both modes
 *   npx tsx 02b-socket-drain-many-clients.ts patient   # Deadlock
 *   npx tsx 02b-socket-drain-many-clients.ts impatient # Recovery
 */

import { createServer, createConnection, Socket, Server } from "node:net";

// Configuration
const PORT = 4001;
const MESSAGE_SIZE = 16 * 1024; // 16KB - still realistic, but fills buffer faster
const DRAIN_TIMEOUT_MS = 2000;
const NUM_CLIENTS = 20;
const SLOW_CLIENT_ID = 7; // Client #7 is the bad apple
const NUM_BROADCASTS = 100; // Many broadcasts to fill buffer
const BROADCAST_DELAY_MS = 0; // No delay - rapid fire

/**
 * Sequential emitter - same as before.
 */
class SequentialEmitter {
  private handlers: Array<{
    id: number;
    fn: (msg: Buffer, done: () => void) => void;
  }> = [];

  subscribe(
    id: number,
    fn: (msg: Buffer, done: () => void) => void
  ): () => void {
    this.handlers.push({ id, fn });
    return () => {
      this.handlers = this.handlers.filter((h) => h.id !== id);
    };
  }

  async emit(msg: Buffer): Promise<number> {
    let delivered = 0;
    for (const handler of this.handlers) {
      await new Promise<void>((resolve) => handler.fn(msg, resolve));
      delivered++;
    }
    return delivered;
  }

  get size() {
    return this.handlers.length;
  }
}

interface ClientState {
  id: number;
  socket: Socket;
  isSlow: boolean;
  unsubscribe: () => void;
}

const emitter = new SequentialEmitter();
const clients = new Map<number, ClientState>();

function patientWrite(client: ClientState, msg: Buffer, done: () => void): void {
  const ok = client.socket.write(msg);
  if (!ok) {
    // Only log for slow client to reduce noise
    if (client.isSlow) {
      console.log(`    [!] Client ${client.id} (SLOW) buffer full, waiting...`);
    }
    client.socket.once("drain", done);
  } else {
    done();
  }
}

function impatientWrite(client: ClientState, msg: Buffer, done: () => void): void {
  const ok = client.socket.write(msg);
  if (!ok) {
    if (client.isSlow) {
      console.log(
        `    [!] Client ${client.id} (SLOW) buffer full, waiting (max ${DRAIN_TIMEOUT_MS}ms)...`
      );
    }

    const timeout = setTimeout(() => {
      client.socket.removeListener("drain", onDrain);
      console.log(`    [!] Client ${client.id} DRAIN TIMEOUT - disconnecting`);
      client.socket.destroy(new Error("drain timeout"));
      done();
    }, DRAIN_TIMEOUT_MS);

    function onDrain() {
      clearTimeout(timeout);
      done();
    }
    client.socket.once("drain", onDrain);
  } else {
    done();
  }
}

async function broadcast(num: number): Promise<boolean> {
  const msg = Buffer.alloc(MESSAGE_SIZE, "M");

  // Log every 10th broadcast to reduce noise
  if (num % 10 === 1 || num === NUM_BROADCASTS) {
    console.log(
      `[Broadcast ${num}/${NUM_BROADCASTS}] Sending ${MESSAGE_SIZE / 1024}KB to ${emitter.size} clients...`
    );
  }

  const BROADCAST_TIMEOUT = 10000;

  try {
    await Promise.race([
      emitter.emit(msg),
      new Promise<never>((_, reject) =>
        setTimeout(() => reject(new Error("DEADLOCK")), BROADCAST_TIMEOUT)
      ),
    ]);
    return true;
  } catch {
    return false;
  }
}

function createClient(id: number, isSlow: boolean): Promise<Socket> {
  return new Promise((resolve, reject) => {
    const socket = createConnection({ port: PORT }, () => {
      if (isSlow) {
        console.log(`[Client ${id}] Connected (SLOW - the bad apple)`);
        socket.pause();
      }
      // Don't log healthy clients - too noisy with 20
      else if (id === 1 || id === NUM_CLIENTS) {
        console.log(`[Client ${id}] Connected`);
      }

      if (!isSlow) {
        socket.on("data", () => {}); // Consume data
      }

      resolve(socket);
    });
    socket.on("error", reject);
  });
}

async function runScenario(
  name: string,
  useTimeout: boolean
): Promise<{ broadcasts: number; deadlocked: boolean }> {
  console.log(`\n${"=".repeat(70)}`);
  console.log(`SCENARIO: ${name}`);
  console.log(
    `Clients: ${NUM_CLIENTS} | Slow client: #${SLOW_CLIENT_ID} | Broadcasts: ${NUM_BROADCASTS} | Message: ${MESSAGE_SIZE / 1024}KB`
  );
  console.log(`${"=".repeat(70)}`);

  const writeFn = useTimeout ? impatientWrite : patientWrite;

  const server: Server = createServer((socket) => {
    const id = clients.size + 1;
    const isSlow = id === SLOW_CLIENT_ID;

    const client: ClientState = {
      id,
      socket,
      isSlow,
      unsubscribe: () => {},
    };

    clients.set(id, client);

    client.unsubscribe = emitter.subscribe(id, (msg, done) => {
      writeFn(client, msg, done);
    });

    socket.on("close", () => {
      if (isSlow) console.log(`[Server] Client ${id} (SLOW) disconnected`);
      clients.delete(id);
      client.unsubscribe();
    });

    socket.on("error", () => {});
  });

  await new Promise<void>((resolve) => server.listen(PORT, resolve));
  console.log(`[Server] Listening on port ${PORT}`);

  // Connect all clients
  console.log(`[Setup] Connecting ${NUM_CLIENTS} clients...`);
  const clientSockets: Socket[] = [];
  for (let i = 1; i <= NUM_CLIENTS; i++) {
    const isSlow = i === SLOW_CLIENT_ID;
    const socket = await createClient(i, isSlow);
    clientSockets.push(socket);
  }
  console.log(`[Setup] All ${NUM_CLIENTS} clients connected\n`);

  await new Promise((r) => setTimeout(r, 100));

  // Run broadcasts
  let successfulBroadcasts = 0;
  let deadlocked = false;
  const start = Date.now();

  for (let i = 1; i <= NUM_BROADCASTS; i++) {
    const success = await broadcast(i);
    if (success) {
      successfulBroadcasts++;
    } else {
      deadlocked = true;
      console.log(`\n${"!".repeat(70)}`);
      console.log(
        `DEADLOCK at broadcast ${i}! Client #${SLOW_CLIENT_ID} froze ${NUM_CLIENTS - 1} healthy clients.`
      );
      console.log(`${"!".repeat(70)}`);
      break;
    }
    await new Promise((r) => setTimeout(r, BROADCAST_DELAY_MS));
  }

  const elapsed = Date.now() - start;

  // Cleanup
  for (const socket of clientSockets) {
    socket.destroy();
  }
  server.close();
  clients.clear();

  console.log(`\n${"-".repeat(70)}`);
  console.log(
    `Results: ${successfulBroadcasts}/${NUM_BROADCASTS} broadcasts in ${elapsed}ms`
  );
  console.log(`Deadlocked: ${deadlocked ? "YES" : "No"}`);

  return { broadcasts: successfulBroadcasts, deadlocked };
}

async function main() {
  const mode = process.argv[2] || "both";

  console.log(`
╔══════════════════════════════════════════════════════════════════════╗
║  Socket Drain Deadlock - Many Clients Edition                        ║
╠══════════════════════════════════════════════════════════════════════╣
${NUM_CLIENTS} clients connected. ${NUM_CLIENTS - 1} are healthy and responsive.              ║
║  1 client (#${SLOW_CLIENT_ID}) is slow/stuck (stopped reading).                     ║
║                                                                      ║
║  The server delivers messages SEQUENTIALLY to each client.           ║
║  When it reaches client #${SLOW_CLIENT_ID}, the buffer fills and never drains.     ║
║  Result: ALL ${NUM_CLIENTS - 1} healthy clients are blocked by 1 bad client.          ║
╚══════════════════════════════════════════════════════════════════════╝
`);

  if (mode === "patient" || mode === "both") {
    const result = await runScenario("PATIENT (no drain timeout)", false);

    if (result.deadlocked) {
      console.log(`
┌───────────────────────────────────────────────────────────────────────┐
│ ONE BAD CLIENT FROZE ${(NUM_CLIENTS - 1).toString().padEnd(2)} HEALTHY CLIENTS                            │
├───────────────────────────────────────────────────────────────────────┤
│ This is the Aedes MQTT broker bug:                                    │
│ - 1 mobile client on flaky 3G connection                              │
│ - Entire broker freezes                                               │
│ - Thousands of healthy clients get disconnect errors                  │
│                                                                       │
│ Without a drain timeout, patience becomes a vulnerability.            │
└───────────────────────────────────────────────────────────────────────┘
`);
    }
  }

  await new Promise((r) => setTimeout(r, 500));

  if (mode === "impatient" || mode === "both") {
    const result = await runScenario("IMPATIENT (2s drain timeout)", true);

    console.log(`
┌───────────────────────────────────────────────────────────────────────┐
│ SLOW CLIENT KICKED, SYSTEM RECOVERED                                  │
├───────────────────────────────────────────────────────────────────────┤
│ When client #${SLOW_CLIENT_ID}'s buffer filled, server waited max ${DRAIN_TIMEOUT_MS}ms.             │
│ Timeout fired → slow client disconnected → broadcasts continued.     │
│                                                                       │
│ Result: ${result.broadcasts}/${NUM_BROADCASTS} broadcasts succeeded.                              │
${NUM_CLIENTS - 1} healthy clients were protected from 1 bad client.                 │
│                                                                       │
│ The "impatient" timeout is a SECURITY BOUNDARY:                       │
│ - One bad actor can't DoS the entire system                           │
│ - Healthy clients stay connected                                      │
│ - Slow client can reconnect when conditions improve                   │
└───────────────────────────────────────────────────────────────────────┘
`);
  }

  console.log("Done.");
}

main().catch(console.error);

@seriousme

Copy link
Copy Markdown
Contributor

Well Gemini suggests a more compact and readable solution:

const net = require('net');

// Configuration
const BROKER_PORT = 1883;       // The actual Aedes broker port
const PROXY_PORT = 10000;       // The port the 'slow client' connects to
const CHUNK_SIZE = 10;          // Send only 10 bytes at a time
const DELAY_MS = 100;           // Wait 100ms between each chunk (simulates a slow connection)

const proxyServer = net.createServer((clientSocket) => {
  // 1. Establish a connection to the actual Aedes broker
  const brokerSocket = net.connect(BROKER_PORT, '127.0.0.1');

  // Traffic from Client -> Broker (usually fast, left unmodified)
  clientSocket.pipe(brokerSocket);

  // Traffic from Broker -> Client (this is what we will throttle/delay)
  brokerSocket.on('data', (data) => {
    // Pause the broker socket so the buffer doesn't overflow in Node.js memory
    brokerSocket.pause();

    let offset = 0;

    function sendChunk() {
      if (offset < data.length) {
        const chunk = data.subarray(offset, offset + CHUNK_SIZE);
        offset += CHUNK_SIZE;

        // Write a small chunk of data to the client
        clientSocket.write(chunk, () => {
          // Once the chunk is successfully written, wait DELAY_MS before sending the next one
          setTimeout(sendChunk, DELAY_MS);
        });
      } else {
        // All data from this 'data' emission has been processed, resume the broker socket
        brokerSocket.resume();
      }
    }

    sendChunk();
  });

  // Error handling and socket cleanup
  clientSocket.on('error', () => { brokerSocket.destroy(); });
  brokerSocket.on('error', () => { clientSocket.destroy(); });
  clientSocket.on('close', () => { brokerSocket.end(); });
  brokerSocket.on('close', () => { clientSocket.end(); });
});

proxyServer.listen(PROXY_PORT, () => {
  console.log(`Slow proxy listening on port ${PROXY_PORT} and forwarding to ${BROKER_PORT}`);
}); 

@seriousme

Copy link
Copy Markdown
Contributor

Btw the new tests succeed but also give out warnings like:

(node:42002) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 3 end listeners added to [Socket]. MaxListeners is 2. Use emitter.setMaxListeners() to increase limit
(node:42002) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 3 end listeners added to [Socket]. MaxListeners is 2. Use emitter.setMaxListeners() to increase limit

Since warnings do not fail tests the LLM this will go unnoticed unless you run the tests yourself, when you do run them yourself its a bit uncanny.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants