This repository was archived by the owner on Feb 13, 2020. It is now read-only.
SQL-based Work Queue enhancements #393
Open
Description
@cyrusdaboo originally submitted this as ticket:839
twext.enterprise.queue implements an SQL-based distributed work queue. The basic behavior is this:
- Each host registers itself in an SQL NODE_INFO table, and listens on a port for AMP connections.
- An item of work is defined by a row in an SQL table, with multiple tables used for different work types.
- An item of work is enqueued using txn.enqueue() which includes an argument for a notBefore time. A work item will not execute until notBefore.
- The process that enqueues the work item sets up a reactor.callLater() to trigger at the notBefore time, with the callback executing the work item in that process. If the work fails, it is “orphaned” (see next item).
- Each host has a master process that regularly polls (default 1 minute interval) all work queue tables to see if there are any “orphaned” work items (ones which are past their notBefore time by some amount - currently 10 minutes). Any items found in this sweep are dispatched to a local process based on the current work load of each process. If there is too much local load, the master polls the other hosts (using the AMP protocol) to find one willing to accept the work and hands it off to that one.
- Work items have a “group” class property that is used to create a lock when the work item is dequeued, preventing concurrent execution of work items in the same group.
- Work items can coalesce: during execution of the selected work item, its work table is scanned for other matching items, which are deleted.
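The enqueue/notBefore/callLater pattern above can be sketched roughly as follows. This is a minimal, simplified stand-in, not the real implementation: the table name, column names, and the `scheduled` list are assumptions for illustration (the actual code lives in twext.enterprise.queue and uses Twisted's `reactor.callLater` and its own schema).

```python
import sqlite3
import time

# In-memory stand-in for one of the per-work-type SQL tables.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE PUSH_NOTIFICATION_WORK ("
    "  WORK_ID INTEGER PRIMARY KEY,"
    "  NOT_BEFORE REAL NOT NULL,"  # epoch seconds; item must not run earlier
    "  PAYLOAD TEXT NOT NULL)"
)

# Stand-in for pending reactor.callLater(delay, perform, work_id) calls.
scheduled = []

def enqueue(payload, not_before):
    """Insert a work row, then schedule a local callback at notBefore
    in the enqueuing process (the behavior described above)."""
    cur = conn.execute(
        "INSERT INTO PUSH_NOTIFICATION_WORK (NOT_BEFORE, PAYLOAD)"
        " VALUES (?, ?)",
        (not_before, payload),
    )
    delay = max(0.0, not_before - time.time())
    scheduled.append((delay, cur.lastrowid))
    return cur.lastrowid

work_id = enqueue("push:/CalDAV/user01/", time.time() + 3.0)
print(work_id, len(scheduled))  # → 1 1
```

If the scheduled callback never fires (process crash, overload), the row simply stays in the table past its notBefore time, which is what makes the orphan-polling sweep described above sufficient as a safety net.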
There are several issues with the current implementation that we need to address:
- Dequeue locking: when an item of work is dequeued (see twext.enterprise.queue.ultimatelyPerform), first its group id is locked using a NamedLock, then its record (row) is deleted from the work table. Both of those actions will block if some other process is already processing a work item in the same group, or, if there is no group, if the work item itself is being processed. The problem is that this blocks the “orphaned” work item polling loop - and it is possible for all hosts to end up blocked behind one long-lived item at the “top” of a work table.
- There needs to be a way to enqueue a work item without scheduling it via callLater in the current process. For work items known to require significant work, it would be better to have them scheduled on a node/process with low load - there is no guarantee the process creating the work item will have low load at the time the callLater triggers.
- Item (2) also suggests we need some way to indicate priority and estimated load for different types of work item. e.g. we know push notification work is a light load and needs to occur close to real time (within 3 seconds of being scheduled), whereas other work (scheduling, group cacher, etc.) involves heavier loads and typically executes minutes or hours after being enqueued.
- The default “orphan” notBefore limit seems high - waiting 10 minutes for a high-priority item is not good, particularly if we fix item (2), where we would be deliberately “orphaning” work.
- Scalability - some types of work may involve processing very large data sets, e.g. scans over all calendar homes, all attachments, etc. Dumping 100K work items into the queue system is not ideal, particularly in the absence of any kind of prioritization mechanism. A better approach would be to create a limited set (pool) of work items that run in parallel and process the overall (large) work set. However, that needs to be done in a way that minimizes lock contention (i.e., addresses item (1) in some fashion).
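One possible remedy for the dequeue-locking problem in item (1) is to make the lock acquisition non-blocking, so the orphan-polling sweep skips a contended group and moves on rather than stalling behind it. The sketch below is an assumption about shape, not the existing implementation: a per-group `threading.Lock` stands in for the SQL NamedLock, and `ultimately_perform` is a hypothetical try-lock variant of ultimatelyPerform.

```python
import threading

# Per-group locks standing in for SQL NamedLocks keyed by group id.
group_locks = {}

def ultimately_perform(group, run):
    """Run `run()` only if the group lock is free right now.

    Returns False without blocking when another worker holds the lock,
    leaving the work row in place for a later orphan-polling sweep."""
    lock = group_locks.setdefault(group, threading.Lock())
    if not lock.acquire(blocking=False):
        return False  # contended: skip instead of blocking the sweep
    try:
        run()  # real code would also delete the work row here
        return True
    finally:
        lock.release()

done = []
first = ultimately_perform("calendar:home01", lambda: done.append("a"))
# Simulate a concurrent holder of the same group lock:
group_locks["calendar:home01"].acquire()
second = ultimately_perform("calendar:home01", lambda: done.append("b"))
print(first, second, done)  # → True False ['a']
```

At the SQL level the analogous technique would be a non-blocking row claim (for example, PostgreSQL's `SELECT ... FOR UPDATE SKIP LOCKED`), so a single long-lived item at the top of a work table can no longer block every host's polling loop.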