Conversation
|
Also can you improve the current workers queue too long error on chronos to include qlength |
Codecov Report❌ Patch coverage is
📢 Thoughts on this report? Let us know! |
| # Worker tuning: fetch one task at a time for fair scheduling | ||
| worker_prefetch_multiplier=1, | ||
| # Task execution limits | ||
| task_soft_time_limit=300, |
There was a problem hiding this comment.
We weren't setting task_soft_time_limit and task_time_limit earlier so by default they were None - they concern the normal celery workers and the dispatch worker itself as we run it with these flags --soft-time-limit=0 --time-limit=0
HenryTraill
left a comment
There was a problem hiding this comment.
Lets breakdown the events array and add each event to the new queues to make the round robin equal
Codex - xHigh reasoning review on backwards compatibility:Backward Compatibility Sign-off (PR Summary)Approved for rollout with backward compatibility, with one operational requirement: keep dispatcher running while round-robin queues are draining. Why this is backward compatible
Mixed-version safety
Operational guardrails
Bottom lineNo payload-format backward-compatibility break was identified for in-flight webhook tasks. Primary risk is operational sequencing (flag/dispatcher/rollback order), not task schema incompatibility. |
|
@HenryTraill We need to make sure on Heroku that the dispatcher process never auto scales past one single instance else if we have multiple dispatchers there is risk of duplicate webhooks. |



Close: #119
Why we are doing this
Prevent noisy-neighbor delays by queueing work per branch and dispatching fairly across branches.
Design
dispatch_branch_task()receives webhook work and enqueues it into a Redis LIST for that branch.jobs:branch:{branch_id}(per-branch FIFO queue)jobs:branches:active(set of branches with pending jobs)jobs:dispatcher:cursor(last branch processed)task_send_webhooks, which performs the outbound HTTP calls and writes webhook logs.Mermaid Diagram
flowchart TD A["TC2 webhook payload"] --> B["dispatch_branch_task"] B --> C["Redis list: jobs:branch:BRANCH_ID"] B --> D["Redis set: jobs:branches:active"] E["Dispatcher worker loop (job_dispatcher_task)"] --> D E --> F["Read cursor: jobs:dispatcher:cursor"] E --> G{"Broker queue below threshold?"} G -- "No" --> H["Sleep cycle_delay"] H --> E G -- "Yes" --> I["dispatch_cycle (round robin)"] I --> C I --> J["task.apply_async"] J --> K["Celery broker queue"] K --> L["task_send_webhooks"] L --> M["Client webhook endpoints"] L --> N["WebhookLog rows"] I --> O["Ack queue head"] O --> P["Update cursor"] P --> EExpected Outcome