diff --git a/pyproject.toml b/pyproject.toml
index 0b0b21f32..896a7917e 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -101,7 +101,7 @@ scaler_worker_adapter_ecs = "scaler.entry_points.worker_adapter_ecs:main"
 [tool.scikit-build]
 cmake.source-dir = "."
 wheel.packages = ["src/scaler"]
-build.targets = ["py_ymq", "py_object_storage_server", "py_one_to_many_dict"]
+build.targets = ["py_ymq", "py_object_storage_server", "py_one_to_many_dict", "py_stable_priority_queue"]
 
 [tool.scikit-build.cmake.define]
 CMAKE_C_COMPILER = { env = "CMAKE_C_COMPILER", default = "gcc" }
diff --git a/src/cpp/scaler/utility/CMakeLists.txt b/src/cpp/scaler/utility/CMakeLists.txt
index 81c46d4e2..5e7f22061 100644
--- a/src/cpp/scaler/utility/CMakeLists.txt
+++ b/src/cpp/scaler/utility/CMakeLists.txt
@@ -12,6 +12,19 @@ target_link_libraries(py_one_to_many_dict PRIVATE
     Python3::Module
 )
 
+add_library(py_stable_priority_queue SHARED
+    pymod/stable_priority_queue.cpp
+)
+
+
+target_include_directories(py_stable_priority_queue PRIVATE
+    ${PROJECT_SOURCE_DIR}/src/cpp
+)
+
+target_link_libraries(py_stable_priority_queue PRIVATE
+    Python3::Module
+)
+
 set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/src/scaler/utility)
 
 if(LINUX OR APPLE)
@@ -20,6 +33,12 @@ if(LINUX OR APPLE)
         OUTPUT_NAME "one_to_many_dict"
         LINKER_LANGUAGE CXX
     )
+
+    set_target_properties(py_stable_priority_queue PROPERTIES
+        PREFIX ""
+        OUTPUT_NAME "stable_priority_queue"
+        LINKER_LANGUAGE CXX
+    )
 elseif(WIN32)
     set_target_properties(py_one_to_many_dict PROPERTIES
         PREFIX ""
@@ -33,6 +52,17 @@ elseif(WIN32)
         RUNTIME_OUTPUT_DIRECTORY_MINSIZEREL ${CMAKE_BINARY_DIR}/src/scaler/utility
     )
 
+    set_target_properties(py_stable_priority_queue PROPERTIES
+        PREFIX ""
+        OUTPUT_NAME "stable_priority_queue"
+        SUFFIX ".pyd"
+        LINKER_LANGUAGE CXX
+        RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/src/scaler/utility/queues
+        RUNTIME_OUTPUT_DIRECTORY_RELEASE ${CMAKE_BINARY_DIR}/src/scaler/utility/queues
+        RUNTIME_OUTPUT_DIRECTORY_DEBUG ${CMAKE_BINARY_DIR}/src/scaler/utility/queues
+        RUNTIME_OUTPUT_DIRECTORY_RELWITHDEBINFO ${CMAKE_BINARY_DIR}/src/scaler/utility/queues
+        RUNTIME_OUTPUT_DIRECTORY_MINSIZEREL ${CMAKE_BINARY_DIR}/src/scaler/utility/queues
+    )
 endif()
 
 install(
@@ -41,3 +71,10 @@ install(
     LIBRARY DESTINATION scaler/utility
     ARCHIVE DESTINATION scaler/utility
 )
+
+install(
+    TARGETS py_stable_priority_queue
+    RUNTIME DESTINATION scaler/utility/queues
+    LIBRARY DESTINATION scaler/utility/queues
+    ARCHIVE DESTINATION scaler/utility/queues
+)
diff --git a/src/cpp/scaler/utility/pymod/stable_priority_queue.cpp b/src/cpp/scaler/utility/pymod/stable_priority_queue.cpp
new file mode 100644
index 000000000..4e640c091
--- /dev/null
+++ b/src/cpp/scaler/utility/pymod/stable_priority_queue.cpp
@@ -0,0 +1,206 @@
+#include "scaler/utility/stable_priority_queue.h"
+
+#include "scaler/utility/pymod/compatibility.h"
+
+namespace scaler {
+namespace utility {
+namespace stable_priority_queue {
+namespace pymod {
+
+using scaler::utility::pymod::OwnedPyObject;
+
+extern "C" {
+// Python object layout: the standard object header followed by the wrapped C++ queue.
+struct PyStablePriorityQueue {
+    PyObject_HEAD  // the macro already ends its declaration; no trailing ';' is needed
+    scaler::utility::StablePriorityQueue<OwnedPyObject<>> queue;
+};
+
+// tp_new: allocates the object and constructs the queue in place. Doing this here (not in tp_init)
+// guarantees the destructor call in tp_dealloc always sees a constructed queue, even when a
+// subclass overrides __init__ without chaining up or when __init__ is invoked more than once.
+static PyObject* PyStablePriorityQueueNew(PyTypeObject* type, PyObject* args, PyObject* kwds)
+{
+    (void)args;
+    (void)kwds;
+
+    PyStablePriorityQueue* self = (PyStablePriorityQueue*)type->tp_alloc(type, 0);
+    if (!self) {
+        return nullptr;
+    }
+
+    new (&self->queue) scaler::utility::StablePriorityQueue<OwnedPyObject<>>();
+    return (PyObject*)self;
+}
+
+// tp_init: intentionally a no-op, the queue is fully set up by tp_new.
+static int PyStablePriorityQueueInit(PyStablePriorityQueue* self, PyObject* args, PyObject* kwds)
+{
+    (void)self;
+    (void)args;
+    (void)kwds;
+    return 0;
+}
+
+// tp_dealloc: runs the queue's destructor, then releases the object's memory.
+static void PyStablePriorityQueueDealloc(PyObject* self)
+{
+    ((PyStablePriorityQueue*)self)->queue.~StablePriorityQueue<OwnedPyObject<>>();
+    Py_TYPE(self)->tp_free((PyObject*)self);
+}
+
+// put(priority, data): queues `data` under the given integer priority.
+static PyObject* PyStablePriorityQueuePut(PyStablePriorityQueue* self, PyObject* args)
+{
+    long long priority {};  // the "L" format unit stores through a long long*
+    PyObject* data {};
+
+    if (!PyArg_ParseTuple(args, "LO", &priority, &data)) {
+        return nullptr;
+    }
+
+    self->queue.put({static_cast<int64_t>(priority), OwnedPyObject<>::fromBorrowed(data)});
+    Py_RETURN_NONE;
+}
+
+// get(): pops the front item and returns it as a (priority, data) tuple; ValueError when empty.
+static PyObject* PyStablePriorityQueueGet(PyStablePriorityQueue* self, PyObject* args)
+{
+    (void)args;
+
+    auto [priorityAndData, exists] = self->queue.get();
+    if (!exists) {
+        PyErr_SetString(PyExc_ValueError, "cannot get from an empty queue");
+        return nullptr;
+    }
+
+    auto [priority, data] = std::move(priorityAndData);
+    // PyTuple_Pack() would take its own reference to every element and leak the ones held here.
+    // Py_BuildValue() creates the integer itself, and the "N" unit adopts the reference handed
+    // over by take() without incrementing it.
+    return Py_BuildValue("(LN)", static_cast<long long>(priority), data.take());
+}
+
+// remove(data): drops `data` from the queue; a no-op when it is not queued.
+static PyObject* PyStablePriorityQueueRemove(PyStablePriorityQueue* self, PyObject* args)
+{
+    PyObject* data {};
+    if (!PyArg_ParseTuple(args, "O", &data)) {
+        return nullptr;
+    }
+
+    self->queue.remove(OwnedPyObject<>::fromBorrowed(data));
+    Py_RETURN_NONE;
+}
+
+// decrease_priority(data): lowers the priority value of `data` by one; a no-op when not queued.
+static PyObject* PyStablePriorityQueueDecreasePriority(PyStablePriorityQueue* self, PyObject* args)
+{
+    PyObject* data {};
+    if (!PyArg_ParseTuple(args, "O", &data)) {
+        return nullptr;
+    }
+
+    self->queue.decreasePriority(OwnedPyObject<>::fromBorrowed(data));
+
+    Py_RETURN_NONE;
+}
+
+// max_priority_item(): returns the front (priority, data) tuple without removing it.
+static PyObject* PyStablePriorityQueueMaxPriorityItem(PyStablePriorityQueue* self, PyObject* args)
+{
+    (void)args;
+
+    auto [priorityAndData, exists] = self->queue.maxPriorityItem();
+    if (!exists) {
+        PyErr_SetString(PyExc_ValueError, "cannot return max priority item from empty queue");
+        return nullptr;
+    }
+
+    auto [priority, data] = std::move(priorityAndData);
+    // Same ownership hand-over as in get(): "N" adopts the reference released by take().
+    return Py_BuildValue("(LN)", static_cast<long long>(priority), data.take());
+}
+
+// Define the methods for the StablePriorityQueue Python class
+static PyMethodDef PyStablePriorityQueueMethods[] = {
+    {"put", (PyCFunction)PyStablePriorityQueuePut, METH_VARARGS, "Put a priority-item list to the queue"},
+    {"get",
+     (PyCFunction)PyStablePriorityQueueGet,
+     METH_VARARGS,
+     "Pop and Return priority-item list with the highest priority in the queue"},
+    {"remove", (PyCFunction)PyStablePriorityQueueRemove, METH_VARARGS, "Remove an item from the queue"},
+    {"decrease_priority",
+     (PyCFunction)PyStablePriorityQueueDecreasePriority,
+     METH_VARARGS,
+     "Decrease priority of an item"},
+    {"max_priority_item",
+     (PyCFunction)PyStablePriorityQueueMaxPriorityItem,
+     METH_VARARGS,
+     "Return priority-item list with the highest priority in the queue"},
+    {nullptr},
+};
+
+// sq_length: number of queued items, i.e. the value of len(queue).
+static Py_ssize_t PyStablePriorityQueueSize(PyObject* self)
+{
+    return static_cast<Py_ssize_t>(((PyStablePriorityQueue*)self)->queue.size());
+}
+
+static PyType_Slot PyStablePriorityQueueSlots[] = {
+    {Py_tp_new, (void*)PyStablePriorityQueueNew},
+    {Py_tp_init, (void*)PyStablePriorityQueueInit},
+    {Py_tp_dealloc, (void*)PyStablePriorityQueueDealloc},
+    {Py_tp_methods, (void*)PyStablePriorityQueueMethods},
+    {Py_sq_length, (void*)PyStablePriorityQueueSize},
+    {Py_tp_doc, (void*)"StablePriorityQueue"},
+    {0, nullptr},
+};
+
+static PyType_Spec PyStablePriorityQueueSpec = {
+    .name = "stable_priority_queue.StablePriorityQueue",
+    .basicsize = sizeof(PyStablePriorityQueue),
+    .itemsize = 0,
+    .flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
+    .slots = PyStablePriorityQueueSlots,
+};
+
+static PyModuleDef stable_priority_queue_module = {
+    .m_base = PyModuleDef_HEAD_INIT,
+    .m_name = "stable_priority_queue",
+    .m_doc = PyDoc_STR("A module that wraps a C++ StablePriorityQueue class"),
+    .m_size = 0,
+    .m_slots = nullptr,
+    .m_free = nullptr,
+};
+}
+}  // namespace pymod
+}  // namespace stable_priority_queue
+}  // namespace utility
+}  // namespace scaler
+
+// Module entry point: creates the module and registers the StablePriorityQueue type on it.
+PyMODINIT_FUNC PyInit_stable_priority_queue(void)
+{
+    using scaler::utility::stable_priority_queue::pymod::PyStablePriorityQueueSpec;
+    using scaler::utility::stable_priority_queue::pymod::stable_priority_queue_module;
+
+    PyObject* m = PyModule_Create(&stable_priority_queue_module);
+    if (!m) {
+        return nullptr;
+    }
+
+    PyObject* type = PyType_FromSpec(&PyStablePriorityQueueSpec);
+    if (!type) {
+        Py_DECREF(m);
+        return nullptr;
+    }
+
+    if (PyModule_AddObject(m, "StablePriorityQueue", type) < 0) {
+        Py_DECREF(type);
+        Py_DECREF(m);
+        return nullptr;
+    }
+
+    return m;
+}
diff --git a/src/cpp/scaler/utility/stable_priority_queue.h b/src/cpp/scaler/utility/stable_priority_queue.h
new file mode 100644
index 000000000..1da0db6be
--- /dev/null
+++ b/src/cpp/scaler/utility/stable_priority_queue.h
@@ -0,0 +1,107 @@
+
+#pragma once
+
+#include <cstdint>
+#include <map>
+#include <unordered_map>
+#include <utility>
+
+namespace scaler {
+namespace utility {
+
+// Priority queue that pops the smallest priority value first and, among equal priority values,
+// the item that was inserted (or last re-prioritized) earliest.
+//
+// T is used as an std::unordered_map key, so it must be hashable and equality-comparable.
+template <typename T>
+struct StablePriorityQueue {
+    using PriorityType = int64_t;
+    using CounterType = uint64_t;
+    using MapKeyType = std::pair<PriorityType, CounterType>;
+    using ItemType = std::pair<PriorityType, T>;
+
+    // Maps every queued item to its key in `_queue`.
+    std::unordered_map<T, MapKeyType> _locator;
+    // Items ordered by (priority, insertion counter); begin() is the front of the queue.
+    std::map<MapKeyType, T> _queue;
+    // Incremented on every put/decreasePriority; breaks ties between equal priorities.
+    CounterType _itemCounter;
+
+    StablePriorityQueue(): _itemCounter {} {}
+
+    // Number of queued items. Not constexpr: std::map::size() is not a constant expression.
+    uint64_t size() const { return _queue.size(); }
+
+    // Queues `item` (priority, data) behind the already queued items of the same priority.
+    // NOTE(review): putting data that is already queued overwrites its locator entry and leaves the
+    // older queue entry untracked; callers presumably put each item at most once - confirm.
+    void put(ItemType item)
+    {
+        const MapKeyType mapKey {item.first, _itemCounter};
+        _locator[item.second] = mapKey;
+        // `item` is our own copy, so the payload is moved into the map rather than copied again.
+        _queue[mapKey] = std::move(item.second);
+        ++_itemCounter;
+    }
+
+    // Pops the front item. Returns {item, true}, or {default item, false} when the queue is empty.
+    std::pair<ItemType, bool> get()
+    {
+        if (_queue.empty()) {
+            return {ItemType {}, false};
+        }
+
+        auto it = _queue.begin();
+        ItemType res {it->first.first, std::move(it->second)};
+
+        _queue.erase(it);
+        _locator.erase(res.second);
+
+        return {std::move(res), true};
+    }
+
+    // Removes `data` from the queue; does nothing when it is not queued.
+    void remove(const T& data)
+    {
+        const auto it = _locator.find(data);
+        if (it == _locator.end()) {
+            return;
+        }
+
+        _queue.erase(it->second);
+        _locator.erase(it);  // erase by iterator: no second hash lookup
+    }
+
+    // Decrements the priority value of `data` by one (moving it towards the front) and re-queues it
+    // behind the items already at the new priority. Does nothing when `data` is not queued.
+    void decreasePriority(const T& data)
+    {
+        auto it = _locator.find(data);
+        if (it == _locator.end()) {
+            return;
+        }
+
+        auto& key = it->second;
+        auto oldData = std::move(_queue[key]);
+        _queue.erase(key);
+
+        --key.first;
+        key.second = _itemCounter;
+        _queue[key] = std::move(oldData);
+
+        ++_itemCounter;
+    }
+
+    // Returns a copy of the front item without removing it, or {default item, false} when empty.
+    std::pair<ItemType, bool> maxPriorityItem() const
+    {
+        if (_queue.empty()) {
+            return {{}, false};
+        }
+        auto kvit = _queue.cbegin();
+        return {ItemType {kvit->first.first, kvit->second}, true};
+    }
+};
+
+}  // namespace utility
+}  // namespace scaler
diff --git a/src/scaler/utility/queues/async_priority_queue.py b/src/scaler/utility/queues/async_priority_queue.py
index f0d444989..ccaa33734 100644
--- a/src/scaler/utility/queues/async_priority_queue.py
+++ b/src/scaler/utility/queues/async_priority_queue.py
@@ -1,8 +1,7 @@
 from asyncio import Queue
-from dataclasses import dataclass
-from typing import Any, Dict, Tuple, Union
+from typing import Any, Tuple, Union
 
-from sortedcontainers import SortedDict
+from scaler.utility.queues.stable_priority_queue import StablePriorityQueue
 
 PriorityType = Union[int, Tuple["PriorityType", ...]]
 
@@ -13,62 +12,31 @@ class AsyncPriorityQueue(Queue):
     Input entries are typically list of the form: [priority, data].
     """
 
-    @dataclass(frozen=True)
-    class MapKey:
-        priority: int
-        count: int
-
-        def __lt__(self, other):
-            return (self.priority, self.count) < (other.priority, other.count)
-
-        def __hash__(self):
-            return hash((self.priority, self.count))
-
-    @dataclass
-    class LocatorValue:
-        map_key: "AsyncPriorityQueue.MapKey"
-        data: bytes
-
     def __len__(self):
         return len(self._queue)
 
     def _init(self, maxsize):
-        self._locator: Dict[bytes, AsyncPriorityQueue.LocatorValue] = {}
-        self._queue: Dict[AsyncPriorityQueue.MapKey, bytes] = SortedDict()
-        self._item_counter: int = 0
+        self._queue = StablePriorityQueue()
 
     def _put(self, item):
         if not isinstance(item, list):
             item = list(item)
 
         priority, data = item
-        map_key = AsyncPriorityQueue.MapKey(priority=priority, count=self._item_counter)
-        self._locator[data] = AsyncPriorityQueue.LocatorValue(map_key=map_key, data=data)
-        self._queue[map_key] = data
-        self._item_counter += 1
+        self._queue.put(priority, data)
 
     def _get(self):
-        map_key, data = self._queue.popitem(0)  # type: ignore[call-arg]
-        self._locator.pop(data)
-        return map_key.priority, data
+        return self._queue.get()
 
     def remove(self, data):
-        loc_value = self._locator.pop(data)
-        self._queue.pop(loc_value.map_key)
+        self._queue.remove(data)
 
     def decrease_priority(self, data):
         # Decrease the priority *value* of an item in the queue, effectively move data closer to the front.
         # Notes:
         # - *priority* in the signature means the priority *value* of the item.
-        # - Time complexity is O(log n) due to the underlying SortedDict structure.
-
-        loc_value = self._locator[data]
-        map_key = AsyncPriorityQueue.MapKey(priority=loc_value.map_key.priority - 1, count=self._item_counter)
-        new_loc_value = AsyncPriorityQueue.LocatorValue(map_key=map_key, data=data)
-        self._locator[data] = new_loc_value
-        self._queue.pop(loc_value.map_key)
-        self._queue[map_key] = data
-        self._item_counter += 1
+        # - Time complexity is O(log n) due to the underlying std::map in the C++ StablePriorityQueue.
+        self._queue.decrease_priority(data)
 
     def max_priority_item(self) -> Tuple[PriorityType, Any]:
         """Return the current item at the front of the queue without removing it from the queue.
@@ -79,5 +47,4 @@ def max_priority_item(self) -> Tuple[PriorityType, Any]:
         - *priority* means the priority in the queue
         - Time complexity is O(1) as we are peeking in the head
         """
-        loc_value = self._queue.peekitem(0)  # type: ignore[attr-defined]
-        return (loc_value[0].priority, loc_value[1])
+        return self._queue.max_priority_item()
diff --git a/src/scaler/worker/agent/task_manager.py b/src/scaler/worker/agent/task_manager.py
index a8f62890f..a4607bb5e 100644
--- a/src/scaler/worker/agent/task_manager.py
+++ b/src/scaler/worker/agent/task_manager.py
@@ -9,9 +9,6 @@
 from scaler.utility.queues.async_priority_queue import AsyncPriorityQueue
 from scaler.worker.agent.mixins import ProcessorManager, TaskManager
 
-_SUSPENDED_TASKS_PRIORITY = 1
-_QUEUED_TASKS_PRIORITY = 2
-
 
 class VanillaTaskManager(Looper, TaskManager):
     def __init__(self, task_timeout_seconds: int):
@@ -132,10 +129,19 @@ def __enqueue_task(self, task: Task, is_suspended: bool):
 
         # Higher-priority tasks have a higher priority value. But as the queue is sorted by increasing order, we negate
         # the inserted value that it will be at the head of the queue.
-        if is_suspended:
-            queue_priority = (-task_priority, _SUSPENDED_TASKS_PRIORITY)
-        else:
-            queue_priority = (-task_priority, _QUEUED_TASKS_PRIORITY)
+        # The (task_priority, is_suspended) pair is folded into a single integer. The mapping
+        # 10 * task_priority + is_suspended is strictly increasing in that pair, so ordering is preserved:
+        #   1. Task(priority=0) [suspended] = -(0 * 10 + 1) = -1
+        #   2. Task(priority=3) [suspended] = -(3 * 10 + 1) = -31
+        #   3. Task(priority=3)             = -(3 * 10 + 0) = -30
+        #   4. Task(priority=0)             = -(0 * 10 + 0) = 0
+        # We want to execute the tasks in this order: 2-3-1-4.
+        # Indeed -31 < -30 < -1 < 0, so the tasks will be executed in order 2-3-1-4.
+        # `queue_priority` used to be a tuple of the form (-task_priority, !is_suspended).
+        # Because it is difficult to implement the semantics in C++ implementation, the two
+        # items are combined together, extending the last digit of task_priority to determine
+        # whether a task is suspended.
+        queue_priority = -(task_priority * 10 + int(is_suspended))
 
         self._queued_task_ids.put_nowait((queue_priority, task.task_id))
         self._queued_task_id_to_task[task.task_id] = task