Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 24 additions & 8 deletions chaos/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,20 @@ async def spawn_worker() -> Process:
sent_tasks = await r.zcard("hello:sent")
received_tasks = await r.zcard("hello:received")

stream_length = await r.xlen(docket.stream_key)
pending = await r.xpending(
docket.stream_key, docket.worker_group_name
)

logger.info(
"sent: %d, received: %d, clients: %d",
"sent: %d, received: %d, stream: %d, pending: %d, clients: %d",
sent_tasks,
received_tasks,
stream_length,
pending["pending"],
connected_clients,
)
if sent_tasks >= tasks:
if sent_tasks >= tasks and received_tasks >= sent_tasks:
break
except redis.exceptions.ConnectionError as e:
logger.error(
Expand All @@ -177,23 +184,31 @@ async def spawn_worker() -> Process:

elif chaos_chance < 0.10:
worker_index = random.randrange(len(worker_processes))
worker_to_kill = worker_processes.pop(worker_index)
worker_to_kill = worker_processes[worker_index]

logger.warning("CHAOS: Killing worker %d...", worker_index)
try:
worker_to_kill.terminate()
worker_to_kill.kill()
except ProcessLookupError:
logger.warning(" What is dead may never die!")

logger.warning("CHAOS: Replacing worker %d...", worker_index)
worker_processes.append(await spawn_worker())
elif chaos_chance < 0.15:
logger.warning("CHAOS: Queuing a toxic task...")
try:
await docket.add(toxic)()
except redis.exceptions.ConnectionError:
pass

# Check if any worker processes have died and replace them
for i in range(len(worker_processes)):
process = worker_processes[i]
if process.returncode is not None:
logger.warning(
"Worker %d has died with code %d, replacing it...",
i,
process.returncode,
)
worker_processes[i] = await spawn_worker()

await asyncio.sleep(0.25)

async with docket.redis() as r:
Expand Down Expand Up @@ -225,5 +240,6 @@ async def spawn_worker() -> Process:

if __name__ == "__main__":
mode = sys.argv[1] if len(sys.argv) > 1 else "chaos"
tasks = int(sys.argv[2]) if len(sys.argv) > 2 else 20000
assert mode in ("performance", "chaos")
asyncio.run(main(mode=mode))
asyncio.run(main(mode=mode, tasks=tasks))
9 changes: 8 additions & 1 deletion chaos/tasks.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import asyncio
import logging
import random
import sys
import time

Expand Down Expand Up @@ -29,7 +31,12 @@ async def hello(


async def toxic():
sys.exit(42)
if random.random() < 0.25:
sys.exit(42)
elif random.random() < 0.5:
raise Exception("Boom")
else:
await asyncio.sleep(random.uniform(0.01, 0.05))


chaos_tasks = [hello, toxic]