Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ scaler_worker_adapter_ecs = "scaler.entry_points.worker_adapter_ecs:main"
[tool.scikit-build]
cmake.source-dir = "."
wheel.packages = ["src/scaler"]
build.targets = ["py_ymq", "py_object_storage_server", "py_one_to_many_dict"]
build.targets = ["py_ymq", "py_object_storage_server", "py_one_to_many_dict", "py_stable_priority_queue"]

[tool.scikit-build.cmake.define]
CMAKE_C_COMPILER = { env = "CMAKE_C_COMPILER", default = "gcc" }
Expand Down
37 changes: 37 additions & 0 deletions src/cpp/scaler/utility/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,19 @@ target_link_libraries(py_one_to_many_dict PRIVATE
Python3::Module
)

add_library(py_stable_priority_queue SHARED
pymod/stable_priority_queue.cpp
)


target_include_directories(py_stable_priority_queue PRIVATE
${PROJECT_SOURCE_DIR}/src/cpp
)

target_link_libraries(py_stable_priority_queue PRIVATE
Python3::Module
)

set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/src/scaler/utility)

if(LINUX OR APPLE)
Expand All @@ -20,6 +33,12 @@ if(LINUX OR APPLE)
OUTPUT_NAME "one_to_many_dict"
LINKER_LANGUAGE CXX
)

set_target_properties(py_stable_priority_queue PROPERTIES
PREFIX ""
OUTPUT_NAME "stable_priority_queue"
LINKER_LANGUAGE CXX
)
elseif(WIN32)
set_target_properties(py_one_to_many_dict PROPERTIES
PREFIX ""
Expand All @@ -33,6 +52,17 @@ elseif(WIN32)
RUNTIME_OUTPUT_DIRECTORY_MINSIZEREL ${CMAKE_BINARY_DIR}/src/scaler/utility
)

set_target_properties(py_stable_priority_queue PROPERTIES
PREFIX ""
OUTPUT_NAME "stable_priority_queue"
SUFFIX ".pyd"
LINKER_LANGUAGE CXX
RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/src/scaler/utility/queues
RUNTIME_OUTPUT_DIRECTORY_RELEASE ${CMAKE_BINARY_DIR}/src/scaler/utility/queues
RUNTIME_OUTPUT_DIRECTORY_DEBUG ${CMAKE_BINARY_DIR}/src/scaler/utility/queues
RUNTIME_OUTPUT_DIRECTORY_RELWITHDEBINFO ${CMAKE_BINARY_DIR}/src/scaler/utility/queues
RUNTIME_OUTPUT_DIRECTORY_MINSIZEREL ${CMAKE_BINARY_DIR}/src/scaler/utility/queues
)
endif()

install(
Expand All @@ -41,3 +71,10 @@ install(
LIBRARY DESTINATION scaler/utility
ARCHIVE DESTINATION scaler/utility
)

install(
TARGETS py_stable_priority_queue
RUNTIME DESTINATION scaler/utility/queues
LIBRARY DESTINATION scaler/utility/queues
ARCHIVE DESTINATION scaler/utility/queues
)
190 changes: 190 additions & 0 deletions src/cpp/scaler/utility/pymod/stable_priority_queue.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
#include "scaler/utility/stable_priority_queue.h"

#include "scaler/utility/pymod/compatibility.h"

namespace scaler {
namespace utility {
namespace stable_priority_queue {
namespace pymod {

using scaler::utility::pymod::OwnedPyObject;

extern "C" {
struct PyStablePriorityQueue {
PyObject_HEAD;
scaler::utility::StablePriorityQueue<OwnedPyObject<PyObject>> queue;
};

static PyObject* PyStablePriorityQueueNew(PyTypeObject* type, PyObject* args, PyObject* kwds)
{
PyStablePriorityQueue* self {};
self = (PyStablePriorityQueue*)type->tp_alloc(type, 0);
return (PyObject*)self;
}

static int PyStablePriorityQueueInit(PyStablePriorityQueue* self, PyObject* args, PyObject* kwds)
{
new (&((PyStablePriorityQueue*)self)->queue) scaler::utility::StablePriorityQueue<OwnedPyObject<PyObject>>();
return 0;
}

static void PyStablePriorityQueueDealloc(PyObject* self)
{
((PyStablePriorityQueue*)self)->queue.~StablePriorityQueue<OwnedPyObject<PyObject>>();
Py_TYPE(self)->tp_free((PyObject*)self);
}

static PyObject* PyStablePriorityQueuePut(PyStablePriorityQueue* self, PyObject* args)
{
int64_t priority {};
PyObject* data {};

if (!PyArg_ParseTuple(args, "LO", &priority, &data)) {
return nullptr;
}

self->queue.put({priority, OwnedPyObject<>::fromBorrowed(data)});
Py_RETURN_NONE;
}

static PyObject* PyStablePriorityQueueGet(PyStablePriorityQueue* self, PyObject* args)
{
(void)args;

auto [priorityAndData, exists] = self->queue.get();
if (!exists) {
PyErr_SetString(PyExc_ValueError, "cannot get from an empty queue");
return nullptr;
}

auto [priority, data] = std::move(priorityAndData);
OwnedPyObject<> res = PyTuple_Pack(2, PyLong_FromLongLong(priority), data.take());
if (!res) {
return nullptr;
}

return res.take();
}

static PyObject* PyStablePriorityQueueRemove(PyStablePriorityQueue* self, PyObject* args)
{
PyObject* data {};
if (!PyArg_ParseTuple(args, "O", &data)) {
return nullptr;
}

self->queue.remove(OwnedPyObject<>::fromBorrowed(data));
Py_RETURN_NONE;
}

static PyObject* PyStablePriorityQueueDecreasePriority(PyStablePriorityQueue* self, PyObject* args)
{
PyObject* data {};
if (!PyArg_ParseTuple(args, "O", &data)) {
return nullptr;
}

self->queue.decreasePriority(OwnedPyObject<>::fromBorrowed(data));

Py_RETURN_NONE;
}

static PyObject* PyStablePriorityQueueMaxPriorityItem(PyStablePriorityQueue* self, PyObject* args)
{
(void)args;

auto [priorityAndData, exists] = self->queue.maxPriorityItem();
if (!exists) {
PyErr_SetString(PyExc_ValueError, "cannot return max priority item from empty queue");
return nullptr;
}

auto [priority, data] = std::move(priorityAndData);
OwnedPyObject<> res = PyTuple_Pack(2, PyLong_FromLongLong(priority), data.take());
if (!res) {
return nullptr;
}

return res.take();
}

// Define the methods for the StablePriorityQueue Python class
static PyMethodDef PyStablePriorityQueueMethods[] = {
{"put", (PyCFunction)PyStablePriorityQueuePut, METH_VARARGS, "Put a priority-item list to the queue"},
{"get",
(PyCFunction)PyStablePriorityQueueGet,
METH_VARARGS,
"Pop and Return priority-item list with the highest priority in the queue"},
{"remove", (PyCFunction)PyStablePriorityQueueRemove, METH_VARARGS, "Remove an item from the queue"},
{"decrease_priority",
(PyCFunction)PyStablePriorityQueueDecreasePriority,
METH_VARARGS,
"Decrease priority of an item"},
{"max_priority_item",
(PyCFunction)PyStablePriorityQueueMaxPriorityItem,
METH_VARARGS,
"Return priority-item list with the highest priority in the queue"},
{nullptr},
};

static Py_ssize_t PyStablePriorityQueueSize(PyObject* self)
{
return ((PyStablePriorityQueue*)self)->queue.size();
}

static PyType_Slot PyStablePriorityQueueSlots[] = {
{Py_tp_new, (void*)PyStablePriorityQueueNew},
{Py_tp_init, (void*)PyStablePriorityQueueInit},
{Py_tp_dealloc, (void*)PyStablePriorityQueueDealloc},
{Py_tp_methods, (void*)PyStablePriorityQueueMethods},
{Py_sq_length, (void*)PyStablePriorityQueueSize},
{Py_tp_doc, (void*)"StablePriorityQueue"},
{0, nullptr},
};

static PyType_Spec PyStablePriorityQueueSpec = {
.name = "stable_priority_queue.StablePriorityQueue",
.basicsize = sizeof(PyStablePriorityQueue),
.itemsize = 0,
.flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
.slots = PyStablePriorityQueueSlots,
};

static PyModuleDef stable_priority_queue_module = {
.m_base = PyModuleDef_HEAD_INIT,
.m_name = "stable_priority_queue",
.m_doc = PyDoc_STR("A module that wraps a C++ StablePriorityQueue class"),
.m_size = 0,
.m_slots = nullptr,
.m_free = nullptr,
};
}
} // namespace pymod
} // namespace stable_priority_queue
} // namespace utility
} // namespace scaler

PyMODINIT_FUNC PyInit_stable_priority_queue(void)
{
using scaler::utility::stable_priority_queue::pymod::PyStablePriorityQueueSpec;
using scaler::utility::stable_priority_queue::pymod::stable_priority_queue_module;

PyObject* m = PyModule_Create(&stable_priority_queue_module);
if (!m) {
return nullptr;
}

PyObject* type = PyType_FromSpec(&PyStablePriorityQueueSpec);
if (!type) {
Py_DECREF(m);
return nullptr;
}

if (PyModule_AddObject(m, "StablePriorityQueue", type) < 0) {
Py_DECREF(type);
Py_DECREF(m);
return nullptr;
}

return m;
}
94 changes: 94 additions & 0 deletions src/cpp/scaler/utility/stable_priority_queue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@

#pragma once

#include <cstdint>
#include <map>
#include <unordered_map>

namespace scaler {
namespace utility {

template <typename T>
struct StablePriorityQueue {
using PriorityType = int64_t;
using CounterType = uint64_t;
using MapKeyType = std::pair<PriorityType, CounterType>;
using ItemType = std::pair<PriorityType, T>;

std::unordered_map<T, MapKeyType> _locator;
std::map<MapKeyType, T> _queue;
CounterType _itemCounter;

StablePriorityQueue(): _itemCounter {} {}

constexpr uint64_t size() const { return _queue.size(); }

void put(ItemType item)
{
const auto& [priority, data] = item;
MapKeyType mapKey = {priority, _itemCounter};
_locator[data] = mapKey;
_queue[mapKey] = std::move(data);
++_itemCounter;
}

std::pair<ItemType, bool> get()
{
ItemType res {};
if (_queue.empty()) {
return {res, false};
}

auto it = _queue.begin();
MapKeyType key = std::move(it->first);
T data = std::move(it->second);

_queue.erase(it);
_locator.erase(data);

res.first = key.first;
res.second = std::move(data);
return {res, true};
}

void remove(const T& data)
{
const auto it = _locator.find(data);
if (it == _locator.end()) {
return;
}

_queue.erase(it->second);
_locator.erase(data);
}

void decreasePriority(const T& data)
{
auto it = _locator.find(data);
if (it == _locator.end()) {
return;
}

auto& key = it->second;
auto oldData = std::move(_queue[key]);
_queue.erase(key);

--key.first;
key.second = _itemCounter;
_queue[key] = std::move(oldData);

++_itemCounter;
}

std::pair<ItemType, bool> maxPriorityItem() const
{
if (_queue.empty()) {
return {{}, false};
}
auto kvit = _queue.cbegin();
return {ItemType {kvit->first.first, kvit->second}, true};
}
};

} // namespace utility
} // namespace scaler
Loading
Loading