Migrates Countly drill_events* collections from MongoDB into a single ClickHouse table. Supports multi-pod horizontal scaling, range-parallel processing, async writes, pause/resume, crash recovery, backpressure monitoring, and a real-time dashboard.
Quick Start
```shell
cp .env.example .env   # edit with your connection details
docker compose up --build
curl http://localhost:8080/healthz
```
Configuration
Copy .env.example and adjust. All values below show defaults where applicable.
Service
| Variable | Default | Description |
|---|---|---|
| SERVICE_NAME | (required) | Service identifier |
| SERVICE_PORT | 8080 | HTTP server port |
| SERVICE_HOST | 0.0.0.0 | HTTP server bind address |
| GRACEFUL_SHUTDOWN_TIMEOUT_MS | 60000 | Max wait for graceful shutdown (ms) |
| RERUN_MODE | resume | resume, new-run, or clone-run |
| LOG_LEVEL | info | fatal, error, warn, info, debug, trace |
MongoDB Source
| Variable | Default | Description |
|---|---|---|
| MONGO_URI | (required) | MongoDB connection string |
| MONGO_DB | countly_drill | Source database |
| MONGO_COUNTLY_DB | countly | Countly database (for hash resolution) |
| MONGO_COLLECTION_PREFIX | drill_events | Prefix to discover collections |
| MONGO_READ_PREFERENCE | primary | Read preference |
| MONGO_READ_CONCERN | majority | Read concern level |
| MONGO_RETRY_READS | true | Enable retry reads |
| MONGO_APP_NAME | (optional) | Connection app name |
| MONGO_BATCH_ROWS_TARGET | 10000 | Target docs per ClickHouse write batch |
| MONGO_PAGE_SIZE | 10000 | MongoDB page size per cursor read |
| MONGO_CURSOR_BATCH_SIZE | 10000 | MongoDB cursor batch size |
| MONGO_MAX_TIME_MS | 600000 | Cursor timeout (ms) |
Range-Parallel Processing
| Variable | Default | Description |
|---|---|---|
| RANGE_PARALLEL_THRESHOLD | 500000 | Estimated doc count to trigger range splitting |
| RANGE_COUNT | 100 | Number of time-ranges to split a collection into |
| RANGE_LEASE_TTL_SEC | 300 | Range lease TTL for dead-pod reclaim (s) |
Transform
| Variable | Default | Description |
|---|---|---|
| TRANSFORM_VERSION | v1 | Data transform version tag |
ClickHouse Target
| Variable | Default | Description |
|---|---|---|
| CLICKHOUSE_URL | (required) | ClickHouse HTTP endpoint |
| CLICKHOUSE_DB | countly_drill | Target database |
| CLICKHOUSE_TABLE | drill_events | Target table |
| CLICKHOUSE_USERNAME | default | Username |
| CLICKHOUSE_PASSWORD | (empty) | Password |
| CLICKHOUSE_QUERY_TIMEOUT_MS | 120000 | Query timeout (ms) |
| CLICKHOUSE_MAX_RETRIES | 8 | Max insert retry attempts |
| CLICKHOUSE_RETRY_BASE_DELAY_MS | 1000 | Backoff base delay (ms) |
| CLICKHOUSE_RETRY_MAX_DELAY_MS | 30000 | Backoff max delay (ms) |
| CLICKHOUSE_USE_DEDUP_TOKEN | true | Enable insert dedup tokens |
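With the defaults above, insert retries back off exponentially from CLICKHOUSE_RETRY_BASE_DELAY_MS up to the CLICKHOUSE_RETRY_MAX_DELAY_MS cap. A sketch of that schedule, assuming plain doubling (the service may also add jitter):

```python
def retry_delay_ms(attempt, base_ms=1000, max_ms=30000):
    """Exponential backoff: base * 2^attempt, capped at max_ms."""
    return min(base_ms * (2 ** attempt), max_ms)

# With the defaults: 1000, 2000, 4000, 8000, 16000, then capped at 30000.
```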
Backpressure
| Variable | Default | Description |
|---|---|---|
| BACKPRESSURE_ENABLED | true | Enable ClickHouse parts monitoring |
| BACKPRESSURE_PARTS_TO_THROW_INSERT | 300 | Parts threshold to pause inserts |
| BACKPRESSURE_MAX_PARTS_IN_TOTAL | 500 | Max total parts allowed |
| BACKPRESSURE_PARTITION_PCT_HIGH | 0.70 | Partition high watermark |
| BACKPRESSURE_PARTITION_PCT_LOW | 0.55 | Partition low watermark |
| BACKPRESSURE_TOTAL_PCT_HIGH | 0.70 | Total high watermark |
| BACKPRESSURE_TOTAL_PCT_LOW | 0.55 | Total low watermark |
| BACKPRESSURE_POLL_INTERVAL_MS | 5000 | Pressure polling interval (ms) |
| BACKPRESSURE_MAX_PAUSE_EPISODE_MS | 180000 | Max pause before force resume (ms) |
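The watermark pairs implement hysteresis: inserts pause when the parts count crosses the high watermark and resume only after it falls back below the low watermark, which avoids rapid pause/resume flapping. A sketch, assuming the percentages apply to the parts limits above:

```python
def next_state(paused, parts, limit, high=0.70, low=0.55):
    """Hysteresis around a parts limit: pause above high*limit,
    resume below low*limit, hold the current state in between."""
    if not paused and parts >= limit * high:
        return True   # crossed the high watermark: pause inserts
    if paused and parts <= limit * low:
        return False  # fell below the low watermark: resume
    return paused     # inside the dead band: no change

# With limit 500: pause at 350 parts, resume only once back at 275.
```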
State
| Variable | Default | Description |
|---|---|---|
| MANIFEST_DB | countly_drill | MongoDB database for run manifests |
| REDIS_URL | (required) | Redis connection URL |
| REDIS_KEY_PREFIX | mig | Redis key namespace |
| TIMELINE_SNAPSHOT_INTERVAL | 10 | Timeline snapshot every N batches |
Memory / GC
| Variable | Default | Description |
|---|---|---|
| GC_ENABLED | true | Enable manual GC |
| GC_RSS_SOFT_LIMIT_MB | 3072 | RSS soft limit to trigger GC |
| GC_RSS_HARD_LIMIT_MB | 6144 | RSS hard limit warning |
| GC_HEAP_USED_RATIO | 0.70 | Heap usage ratio trigger |
| GC_EVERY_N_BATCHES | 50 | Run GC every N batches |
Worker / Multi-Pod
| Variable | Default | Description |
|---|---|---|
| POD_ID | hostname() | Unique pod identifier |
| MULTI_POD_ENABLED | true | Enable distributed locking |
| LOCK_TTL_SECONDS | 300 | Collection lock TTL in seconds (crash safety net) |
| LOCK_RENEW_MS | 60000 | Lock renewal interval (ms) |
| PROGRESS_UPDATE_MS | 5000 | Progress reporting interval to Redis (ms) |
| POD_HEARTBEAT_MS | 30000 | Pod heartbeat interval (ms) |
| POD_DEAD_AFTER_SEC | 180 | Pod considered dead after this silence (s) |
Async Write
| Variable | Default | Description |
|---|---|---|
| ASYNC_WRITE_FLUSH_INTERVAL_MS | 5000 | Flush queued batch records to MongoDB every N ms |
| ASYNC_WRITE_FLUSH_BATCH_SIZE | 10 | Flush after N batch records queued |
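Queued batch records are flushed to MongoDB when either trigger fires: the interval elapses or the queue reaches the batch size. A minimal sketch of that dual-trigger check (names illustrative):

```python
def should_flush(queued, elapsed_ms, batch_size=10, interval_ms=5000):
    """Flush when the queue holds batch_size records OR interval_ms
    has passed since the last flush, and there is anything to write."""
    if queued == 0:
        return False
    return queued >= batch_size or elapsed_ms >= interval_ms
```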
Multi-Collection Migration
The service automatically discovers all MongoDB collections matching MONGO_COLLECTION_PREFIX* (e.g. drill_events, drill_events5a2b3c4d...).
- Collections are processed in priority order (base collection first, then alphabetical)
- Each collection gets its own run ID and an isolated Redis key prefix
- Already-completed collections are skipped on restart
- A missing {cd: 1, _id: 1} compound index is created automatically in the background
- If a collection fails, the service logs the error and continues to the next
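Each collection is walked with keyset pagination over the (cd, _id) compound index. A sketch of the page filter (hypothetical helper; the service's exact query shape may differ):

```python
def next_page_filter(last_cd, last_id):
    """Keyset-pagination filter over the (cd, _id) compound index.

    Resumes strictly after the last seen (cd, _id) pair, so page
    boundaries never skip or repeat documents with equal cd values.
    """
    if last_cd is None:  # first page: no lower bound
        return {}
    return {
        "$or": [
            {"cd": {"$gt": last_cd}},
            {"cd": last_cd, "_id": {"$gt": last_id}},
        ]
    }

# Usage sketch with a pymongo collection:
#   coll.find(next_page_filter(cd, oid)).sort([("cd", 1), ("_id", 1)]).limit(page_size)
```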
Multi-Pod Mode
Multiple pods can process collections in parallel using Redis-based distributed locking. Each pod autonomously discovers collections, acquires locks, and processes work.
How It Works
- Pod heartbeat: each pod writes a TTL-based heartbeat key (mig:pod:{podId}) to Redis every 30s
- Collection locking: before processing a collection, a pod acquires a lock via an atomic Lua script. If the lock is held by another live pod, it moves on to the next collection
- Dead-pod detection: if a pod's heartbeat expires (180s by default), other pods can steal its locks
- Global progress: each pod reports per-collection progress to Redis; the /stats endpoint and dashboard merge data from all pods
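The lock claim is a compare-and-set: take the lock only if it is free or its holder's heartbeat is gone. A pure-Python simulation of that atomic check (the real check runs as a single Redis Lua script; the dict-backed store and names here are illustrative):

```python
def try_claim_lock(store, collection, pod_id, live_pods):
    """Simulate the atomic lock-claim script against a dict store.

    Grants the lock if it is unheld, already ours, or held by a pod
    whose heartbeat key has expired (a dead pod)."""
    key = f"mig:lock:{collection}"
    holder = store.get(key)
    if holder is None or holder == pod_id or holder not in live_pods:
        store[key] = pod_id  # claim (or steal from the dead holder)
        return True
    return False  # held by another live pod; move on

store = {}
assert try_claim_lock(store, "drill_events", "pod-1", {"pod-1", "pod-2"})
assert not try_claim_lock(store, "drill_events", "pod-2", {"pod-1", "pod-2"})
# pod-1's heartbeat disappears, so pod-2 may steal the lock:
assert try_claim_lock(store, "drill_events", "pod-2", {"pod-2"})
```

In Redis this whole check-and-set must run server-side as one script; doing the GET and SET as separate round trips would let two pods claim the same collection.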
Range-Parallel Processing
Collections exceeding RANGE_PARALLEL_THRESHOLD (default 500K docs) are split into RANGE_COUNT equal time-ranges based on [min(cd), max(cd)]. Multiple pods can process different ranges of the same collection concurrently.
- Range initialization: the first pod to reach a large collection divides the time span into N ranges via Redis SETNX
- Atomic claiming: pods claim ranges via a Redis Lua script that atomically marks a pending range as processing
- Exclusive boundaries: ranges use [start, end) (exclusive upper bound) to prevent duplicate processing. The final range uses [start, max]
- Stale reclaim: if a pod dies mid-range, other pods reclaim its ranges after the lease TTL expires
- Batch sequence isolation: each range gets 10,000 batch sequence slots to prevent collisions
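The time-span split can be sketched as follows (illustrative; the service's boundary arithmetic may differ, e.g. operating on timestamps rather than plain integers):

```python
def split_ranges(min_cd, max_cd, n):
    """Split [min_cd, max_cd] into n ranges.

    The first n-1 ranges are half-open [start, end), so no document
    can fall into two ranges; the final range is closed [start, max_cd]
    to keep the overall maximum covered."""
    span = max_cd - min_cd
    bounds = [min_cd + span * i // n for i in range(n + 1)]
    return [
        (bounds[i], bounds[i + 1], i == n - 1)  # (start, end, end_inclusive)
        for i in range(n)
    ]
```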
| Method | Path | Description |
|---|---|---|
| GET | /stats | Full dashboard JSON: progress, throughput, integrity, cluster, live batches |
| GET | /viz | Real-time HTML dashboard |
Control (Single Pod)
| Method | Path | Description |
|---|---|---|
| POST | /control/pause | Pause after current batch |
| POST | /control/resume | Resume from pause |
| POST | /control/stop-after-batch | Stop cleanly after current batch |
| POST | /control/gc | Trigger manual GC (body: {"mode":"now"}) |
| POST | /control/reindex/:collection | Trigger index creation for a collection |
| POST | /control/retry-collection/:collection | Re-queue a failed/skipped collection |
| POST | /control/retry-batch/:runId/:batchSeq | Retry a specific failed/skipped batch |
| POST | /control/retry-skipped-batches/:runId | Retry all skipped_empty batches in a run |
Control (Multi-Pod)
| Method | Path | Description |
|---|---|---|
| POST | /control/global/pause | Pause all pods |
| POST | /control/global/resume | Resume all pods |
| POST | /control/global/stop | Stop all pods after current batch |
| GET | /control/locks | List all active collection locks |
| POST | /control/locks/release/:collection | Force-release a lock (admin) |
| GET | /control/pods | List all pods with status and locks held |
| POST | /control/pods/remove/:podId | Remove a dead pod and release its locks |
| POST | /control/drain | Graceful drain for K8s scale-down |
Runs
| Method | Path | Description |
|---|---|---|
| GET | /runs | List all runs (?status=, ?limit=, ?offset=) |
| GET | /runs/current | Current active run with coverage |
| GET | /runs/current/batches | Batches for current run (?status=, ?limit=) |
| GET | /runs/:id | Run details by ID |
| GET | /runs/:id/batches | Batch list (?status=, ?limit=) |
| GET | /runs/:id/failures | Failed batches, retry errors, digest mismatches |
| GET | /runs/:id/timeline | Performance snapshots over time |
| GET | /runs/:id/coverage | Document range coverage and completion % |
| DELETE | /runs/:id/cache | Clean up Redis cache for a run |
Operations
Pause and Resume
```shell
# Single pod
curl -X POST http://localhost:8080/control/pause
curl -X POST http://localhost:8080/control/resume

# All pods (multi-pod mode)
curl -X POST http://localhost:8080/control/global/pause
curl -X POST http://localhost:8080/control/global/resume
```
Graceful Stop
```shell
curl -X POST http://localhost:8080/control/stop-after-batch
```
Or send SIGTERM / SIGINT. The service will:
1. Finish the current batch
2. Flush the async write queue to MongoDB
3. Release all collection locks
4. Close all connections
5. Exit cleanly
Lock Management
```shell
# List all active locks
curl http://localhost:8080/control/locks

# Force-release a stuck lock
curl -X POST http://localhost:8080/control/locks/release/drill_events5a2b...

# List pods and their status
curl http://localhost:8080/control/pods

# Remove a dead pod and release its locks
curl -X POST http://localhost:8080/control/pods/remove/pod-3
```
Retry Failed Work
```shell
# Re-queue a failed collection
curl -X POST http://localhost:8080/control/retry-collection/drill_events5a2b...

# Retry a specific failed batch
curl -X POST http://localhost:8080/control/retry-batch/{runId}/{batchSeq}

# Retry all skipped batches in a run
curl -X POST http://localhost:8080/control/retry-skipped-batches/{runId}
```
Crash Recovery
On restart after a crash, the service automatically:
1. Checks Redis for the last committed cursor (hot-path authority)
2. Falls back to the MongoDB manifest if Redis is empty
3. Replays any inflight batches with digest verification
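The cursor-resolution order above can be sketched as a simple fallback chain (illustrative names; the real lookups go to Redis and the MongoDB manifest):

```python
def resolve_cursor(redis_cursor, manifest_cursor):
    """Pick the resume point: Redis (hot-path authority) wins;
    otherwise fall back to the MongoDB manifest; otherwise start fresh."""
    if redis_cursor is not None:
        return ("redis", redis_cursor)
    if manifest_cursor is not None:
        return ("manifest", manifest_cursor)
    return ("fresh", None)
```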
```shell
# JSON stats
curl -s http://localhost:8080/stats | jq '.summary'

# Cluster overview
curl -s http://localhost:8080/stats | jq '.cluster.pods'

# Or use the dashboard
open http://localhost:8080/viz
```
Architecture
```
MongoDB (drill_events*)
   |
   | cursor pagination on (cd, _id) compound index
   v
+------------------------------------------------------+
| CollectionOrchestrator                               |
|  |-- discovers collections, index-aware scheduling   |
|  |-- multi-pod: Redis lock per collection            |
|  |                                                   |
|  +-- Small collections (< 500K docs):                |
|  |     BatchRunner --> ClickHouse                    |
|  |                                                   |
|  +-- Large collections (>= 500K docs):               |
|        RangeCoordinator                              |
|        |-- splits [min(cd), max(cd)] into N ranges   |
|        |-- atomic range claiming via Redis Lua script|
|        +-- per-range BatchRunner --> ClickHouse      |
+------------------------------------------------------+
      |                  |                  |
      | batch inserts    | hot-path commit  | async flush
      | (dedup tokens)   |                  |
      v                  v                  v
 ClickHouse            Redis            MongoDB manifest
 (drill_events)   (cursor, bitmap,    (batch records,
                   live phases,        run metadata,
                   range stats,        audit trail)
                   pod heartbeats,
                   collection locks)
```
- Redis (hot-path authority): committed cursor, batch completion bitmap, live batch phases, range stats, pod heartbeats, collection locks, global commands
- MongoDB manifest (async, authoritative for audit): batch records flushed asynchronously every 5s or every 10 batches; run metadata, skip samples, error history
- Async batch writer: Redis is the commit point for each batch; MongoDB writes are queued and bulk-flushed in the background. On graceful shutdown, the queue is drained
- Backpressure: monitors the ClickHouse active parts count; pauses inserts when thresholds are exceeded (hysteresis with high/low watermarks)
- Dedup tokens: each batch carries mig:{runId}:{batchSeq} for idempotent retries
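Token construction is trivial but worth pinning down, because a retried insert must send the exact same token to be deduplicated. A sketch using the format from the note above (presumably supplied to ClickHouse via the insert_deduplication_token setting):

```python
def dedup_token(run_id, batch_seq):
    """Build the per-batch dedup token; a retry of the same batch must
    reuse an identical token for ClickHouse to drop the duplicate."""
    return f"mig:{run_id}:{batch_seq}"
```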