Skip to content
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
983783c
Object Storage Server should handle SIGTERM
gxuu Nov 15, 2025
65d0d70
YMQ should not crash when exceeds retry
gxuu Nov 15, 2025
e7ecded
Add YMQAsyncBinder and YMQAsyncConnector
gxuu Nov 15, 2025
fdb81b8
Scheduler use YMQ when backend is ymq
gxuu Nov 15, 2025
6a1f06e
Reformat
gxuu Nov 15, 2025
cb6ca8e
Fix halting bug
gxuu Nov 16, 2025
a2cb1ea
Apply suggestion from @rafa-be
sharpener6 Nov 17, 2025
6620480
Reformat
gxuu Nov 17, 2025
7d33c0b
Update src/scaler/io/utility.py
gxuu Nov 18, 2025
43a0d1d
Update src/scaler/io/utility.py 2
gxuu Nov 18, 2025
4a8a2b0
Resolve comment
gxuu Nov 19, 2025
90896f7
Resolve comment: don't expose _ymq
gxuu Nov 19, 2025
927ac5b
Revert "Resolve comment"
gxuu Nov 19, 2025
1740c53
Remove try catch
gxuu Nov 19, 2025
cc110a1
Bump Version Number
gxuu Nov 20, 2025
60d18cc
Bump Version Number
gxuu Nov 24, 2025
92958b4
Resolve requirement from Sharpener6
gxuu Nov 24, 2025
37de534
Merge branch 'master' into nov-ymq-integrate
gxuu Nov 25, 2025
46ab630
Merge branch 'master' into nov-ymq-integrate
gxuu Dec 1, 2025
f893aa5
Merge branch 'main' into nov-ymq-integrate
gxuu Dec 1, 2025
43df020
Bump Version Number
gxuu Dec 1, 2025
d10a76c
Merge branch 'main' into nov-ymq-integrate
gxuu Dec 5, 2025
802cbe1
Merge branch 'master' into nov-ymq-integrate
gxuu Dec 9, 2025
ff311f3
s
gxuu Dec 9, 2025
0d5fe4f
Merge branch 'nov-ymq-integrate' of github.com:gxuu/scaler into nov-y…
gxuu Dec 9, 2025
a14c627
Bump Version Number
gxuu Dec 9, 2025
c526211
Merge branch 'main' into nov-ymq-integrate
gxuu Dec 9, 2025
723cecf
Merge branch 'main' into nov-ymq-integrate
gxuu Dec 15, 2025
9b43965
Merge branch 'main' into nov-ymq-integrate
gxuu Dec 19, 2025
446be2b
Merge branch 'main' into nov-ymq-integrate
gxuu Jan 5, 2026
ac8a931
tp_dealloc should happen later than Py_TYPE
gxuu Jan 5, 2026
67cdb72
Merge branch 'main' into nov-ymq-integrate
sharpener6 Jan 6, 2026
63c04a3
Reduce retry limit as 8 time is too much
gxuu Jan 6, 2026
1052de7
Destory method should not be async
gxuu Jan 6, 2026
670a27c
Merge branch 'nov-ymq-integrate' of github.com:gxuu/scaler into nov-y…
gxuu Jan 6, 2026
ae8d575
Clearup comment in code
gxuu Jan 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion src/cpp/scaler/object_storage/object_storage_server.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "scaler/object_storage/object_storage_server.h"

#include <algorithm>
#include <csignal>
#include <cstdint>
#include <exception>
#include <future>
Expand All @@ -14,9 +15,32 @@
namespace scaler {
namespace object_storage {

// Global atomic flag to indicate a termination request. It is written from a signal
// handler, so the atomic must be lock-free for that write to be async-signal-safe.
static std::atomic<bool> sigRequestStop {false};
static_assert(std::atomic<bool>::is_always_lock_free, "signal handler requires a lock-free atomic flag");

// Signal handler for SIGTERM: records the termination request and returns immediately.
// The signal number is intentionally unused; the handler only sets the flag.
extern "C" void handleSigTerm([[maybe_unused]] int signum)
{
    sigRequestStop.store(true);
}

// Registers handleSigTerm as the SIGTERM handler via sigaction. The handler's extra
// signal mask is empty and no sigaction flags are set. If registration fails, the
// error is reported with perror and execution continues.
void setupSignalHandling()
{
    struct sigaction termAction {};
    sigemptyset(&termAction.sa_mask);
    termAction.sa_flags   = 0;
    termAction.sa_handler = handleSigTerm;

    const int rc = sigaction(SIGTERM, &termAction, nullptr);
    if (rc != -1) {
        return;
    }

    perror("sigaction");
}

// Constructs the server. Construction has two side effects, performed in this order:
// initServerReadyFds() and setupSignalHandling().
ObjectStorageServer::ObjectStorageServer()
{
// NOTE(review): presumably prepares the descriptors used to signal server readiness;
// the helper is defined outside this view -- confirm.
initServerReadyFds();
// Installs the SIGTERM handler (process-wide, via sigaction) every time a server is constructed.
setupSignalHandling();
}

ObjectStorageServer::~ObjectStorageServer()
Expand Down Expand Up @@ -138,7 +162,7 @@ void ObjectStorageServer::processRequests(std::function<bool()> running)

auto maybeMessageFuture = ymq::futureRecvMessage(_ioSocket);
while (maybeMessageFuture.wait_for(100ms) == std::future_status::timeout) {
if (!running()) {
if (!running() || sigRequestStop) {
_logger.log(scaler::ymq::Logger::LoggingLevel::info, "ObjectStorageServer: stopped by user");
pendingRequests.clear();
return;
Expand Down
14 changes: 9 additions & 5 deletions src/cpp/scaler/object_storage/pymod_object_storage_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,23 @@ static PyObject* PyObjectStorageServerRun(PyObject* self, PyObject* args)
logging_paths.push_back(PyUnicode_AsUTF8(path_obj));
}

auto running = []() -> bool {
int res {};
auto running = [&] -> bool {
AcquireGIL gil;
(void)gil;
return PyErr_CheckSignals() == 0;
res = PyErr_CheckSignals();
return res == 0;
};

((PyObjectStorageServer*)self)
->server.run(
addr, std::to_string(port), identity, log_level, log_format, std::move(logging_paths), std::move(running));

// TODO: Ideally, run should return a bool and we return failure with nullptr.
return nullptr;
// Py_RETURN_NONE;
if (!res) {
Py_RETURN_NONE;
} else {
return nullptr;
}
}

static PyObject* PyObjectStorageServerWaitUntilReady(PyObject* self, [[maybe_unused]] PyObject* args)
Expand Down
15 changes: 14 additions & 1 deletion src/cpp/scaler/ymq/io_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,23 @@ void IOContext::removeIOSocket(std::shared_ptr<IOSocket>& socket) noexcept
std::promise<void> promise;
auto future = promise.get_future();

// TODO: `count` and `maxCount` are needed as a safety net so that
// we don't wait forever when querying numOfConnections.
// If the remote end is using YMQ as its internal communication tool, then
// we don't need this safety net. This is because YMQ closes a connection
// if the remote end shuts down write. This results in an event on the local end.
// If the remote end does not close the connection upon the local end shutting
// down write, the local end will never get any event for the remote socket close,
// and therefore the connection will stay alive in the system.
// This needs to be revisited; we have opened an issue, and the issue link is:
// https://github.com/finos/opengris-scaler/issues/445
int count = 0;
auto waitToRemoveIOSocket = [&](const auto& self) -> void {
constexpr static int maxCount = 8;
rawSocket->_eventLoopThread->_eventLoop.executeNow([&] {
rawSocket->_eventLoopThread->_eventLoop.executeLater([&] {
if (rawSocket->numOfConnections()) {
if (rawSocket->numOfConnections() && count < maxCount) {
++count;
self(self);
return;
}
Expand Down
86 changes: 52 additions & 34 deletions src/cpp/scaler/ymq/io_socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,17 +144,17 @@ void IOSocket::connectTo(SocketAddress addr, ConnectReturnCallback onConnectRetu
_tcpClient->onCreated();

} else if (addr.nativeHandleType() == SocketAddress::Type::IPC) {
if (_domainClient) {
if (_ipcClient) {
unrecoverableError({
Error::ErrorCode::MultipleConnectToNotSupported,
"Originated from",
"IOSocket::connectTo",
});
}

_domainClient.emplace(
_ipcClient.emplace(
_eventLoopThread.get(), this->identity(), std::move(addr), std::move(callback), maxRetryTimes);
_domainClient->onCreated();
_ipcClient->onCreated();

} else {
std::unreachable(); // current protocol supports only tcp and ipc
Expand All @@ -171,35 +171,34 @@ void IOSocket::connectTo(

void IOSocket::bindTo(std::string netOrDomainAddr, BindReturnCallback onBindReturn) noexcept
{
_eventLoopThread->_eventLoop.executeNow(
[this, netOrDomainAddr = std::move(netOrDomainAddr), callback = std::move(onBindReturn)] mutable {
assert(netOrDomainAddr.size());
const auto socketAddress = stringToSocketAddress(netOrDomainAddr);

if (socketAddress.nativeHandleType() == SocketAddress::Type::TCP) {
if (_tcpServer) {
callback(std::unexpected {Error::ErrorCode::MultipleBindToNotSupported});
return;
}
_eventLoopThread->_eventLoop.executeNow([this,
netOrDomainAddr = std::move(netOrDomainAddr),
callback = std::move(onBindReturn)] mutable {
assert(netOrDomainAddr.size());
const auto socketAddress = stringToSocketAddress(netOrDomainAddr);

if (socketAddress.nativeHandleType() == SocketAddress::Type::TCP) {
if (_tcpServer) {
callback(std::unexpected {Error::ErrorCode::MultipleBindToNotSupported});
return;
}

_tcpServer.emplace(
_eventLoopThread.get(), this->identity(), std::move(socketAddress), std::move(callback));
_tcpServer->onCreated();
_tcpServer.emplace(_eventLoopThread.get(), this->identity(), std::move(socketAddress), std::move(callback));
_tcpServer->onCreated();

} else if (socketAddress.nativeHandleType() == SocketAddress::Type::IPC) {
if (_domainServer) {
callback(std::unexpected {Error::ErrorCode::MultipleBindToNotSupported});
return;
}
} else if (socketAddress.nativeHandleType() == SocketAddress::Type::IPC) {
if (_ipcServer) {
callback(std::unexpected {Error::ErrorCode::MultipleBindToNotSupported});
return;
}

_domainServer.emplace(
_eventLoopThread.get(), this->identity(), std::move(socketAddress), std::move(callback));
_domainServer->onCreated();
_ipcServer.emplace(_eventLoopThread.get(), this->identity(), std::move(socketAddress), std::move(callback));
_ipcServer->onCreated();

} else {
std::unreachable(); // current protocol supports only tcp and ipc
}
});
} else {
std::unreachable(); // current protocol supports only tcp and ipc
}
});
}

void IOSocket::closeConnection(Identity remoteSocketIdentity) noexcept
Expand All @@ -213,11 +212,27 @@ void IOSocket::closeConnection(Identity remoteSocketIdentity) noexcept
});
}

// Handles a connector whose retries have been exhausted: flags this socket as
// disconnected, completes every pending receive with ConnectorSocketClosedByRemoteEnd,
// and retires the most recently added unestablished connection.
// Preconditions (checked by assert only): at least one unestablished connection exists
// and this socket is of type Connector.
void IOSocket::onConnectorMaxedOutRetry() noexcept
{
    assert(!_unestablishedConnection.empty());
    assert(this->_socketType == IOSocketType::Connector);

    _connectorDisconnected = true;
    fillPendingRecvMessagesWithErr(Error::ErrorCode::ConnectorSocketClosedByRemoteEnd);

    // Move the last entry into a no-op task queued on the event loop, so the connection
    // object is released together with that task rather than inside this call. The
    // emptied slot is then dropped from the container.
    auto& lastSlot = _unestablishedConnection.back();
    _eventLoopThread->_eventLoop.executeLater([retired = std::move(lastSlot)]() {});
    _unestablishedConnection.pop_back();
}

// TODO: The function should be separated into onConnectionAborted, onConnectionDisconnected,
// and probably onConnectionAbortedBeforeEstablished(?)
void IOSocket::onConnectionDisconnected(MessageConnection* conn, bool keepInBook) noexcept
{
if (!conn->_remoteIOSocketIdentity) {
// TODO: This should perhaps do retry?
if (IOSocketType::Connector == this->_socketType) {
_connectorDisconnected = true;
fillPendingRecvMessagesWithErr(Error::ErrorCode::ConnectorSocketClosedByRemoteEnd);
}
auto connIt = std::ranges::find_if(_unestablishedConnection, [&](const auto& x) { return x.get() == conn; });
assert(connIt != _unestablishedConnection.end());
_eventLoopThread->_eventLoop.executeLater([conn = std::move(*connIt)] {});
Expand Down Expand Up @@ -330,11 +345,14 @@ void IOSocket::onConnectionCreated(
_unestablishedConnection.back()->onCreated();
}

void IOSocket::removeConnectedStreamClient() noexcept
void IOSocket::removeConnectedStreamClient(const StreamClient* client) noexcept
{
if (this->_tcpClient && this->_tcpClient->_connected) {
if (this->_tcpClient && &(*this->_tcpClient) == client) {
this->_tcpClient.reset();
}
if (this->_ipcClient && &(*this->_ipcClient) == client) {
this->_ipcClient.reset();
}
}

void IOSocket::requestStop() noexcept
Expand All @@ -352,11 +370,11 @@ void IOSocket::requestStop() noexcept
_tcpClient->disconnect();
}

if (_domainClient) {
_domainClient->disconnect();
if (_ipcClient) {
_ipcClient->disconnect();
}
if (_domainServer) {
_domainServer->disconnect();
if (_ipcServer) {
_ipcServer->disconnect();
}
}

Expand Down
14 changes: 8 additions & 6 deletions src/cpp/scaler/ymq/io_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ class IOSocket {
// NOTE: BELOW FOUR FUNCTIONS ARE USERSPACE API
void sendMessage(Message message, SendMessageCallback onMessageSent) noexcept;
void recvMessage(RecvMessageCallback onRecvMessage) noexcept;

void bindTo(std::string netOrDomainAddr, BindReturnCallback onBindReturn) noexcept;
void connectTo(
std::string netOrDomainAddr, ConnectReturnCallback onConnectReturn, size_t maxRetryTimes = 8) noexcept;
Expand All @@ -70,6 +69,9 @@ class IOSocket {
// From Connection Class only
void onConnectionIdentityReceived(MessageConnection* conn) noexcept;

// From CONNECTOR only
void onConnectorMaxedOutRetry() noexcept;

// NOTE: These two functions are called respectively by sendMessage and server/client.
// Notice that in each case only the needed information is passed in; so it's less
// likely the user passes in combinations that do not make sense. These two calls are
Expand All @@ -78,8 +80,8 @@ class IOSocket {
void onConnectionCreated(
int fd, SocketAddress localAddr, SocketAddress remoteAddr, bool responsibleForRetry) noexcept;

// From TCPClient class only
void removeConnectedStreamClient() noexcept;
// From StreamClient class only
void removeConnectedStreamClient(const StreamClient* client) noexcept;

void requestStop() noexcept;

Expand All @@ -100,9 +102,9 @@ class IOSocket {
// NOTE: Owning one TCPServer means the user cannot bindTo multiple addresses.
std::optional<StreamServer> _tcpServer;

// NOTE: User may choose to bind to one IP address + one UDS address
std::optional<StreamServer> _domainServer;
std::optional<StreamClient> _domainClient;
// NOTE: User may choose to bind to one IPv4 address + one IPC address
std::optional<StreamServer> _ipcServer;
std::optional<StreamClient> _ipcClient;

// Remote identity to connection map
std::map<std::string, std::unique_ptr<MessageConnection>> _identityToConnection;
Expand Down
11 changes: 8 additions & 3 deletions src/cpp/scaler/ymq/stream_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ void StreamClient::onCreated()

_rawClient.zeroNativeHandle();
_connected = true;
_eventLoopThread->_eventLoop.executeLater([sock] { sock->removeConnectedStreamClient(); });
_eventLoopThread->_eventLoop.executeLater([sock, this] { sock->removeConnectedStreamClient(this); });

if (_retryTimes == 0) {
_onConnectReturn({});
Expand Down Expand Up @@ -105,14 +105,19 @@ void StreamClient::onWrite()
_rawClient.zeroNativeHandle();
_connected = true;

_eventLoopThread->_eventLoop.executeLater([sock] { sock->removeConnectedStreamClient(); });
_eventLoopThread->_eventLoop.executeLater([sock, this] { sock->removeConnectedStreamClient(this); });
}

void StreamClient::retry()
{
if (_retryTimes > _maxRetryTimes) {
_logger.log(Logger::LoggingLevel::error, "Retried times has reached maximum: ", _maxRetryTimes);
// exit(1);
disconnect();

const std::string id = this->_localIOSocketIdentity;
auto sock = this->_eventLoopThread->_identityToIOSocket.at(id);
sock->onConnectorMaxedOutRetry();
_eventLoopThread->_eventLoop.executeLater([sock, this] { sock->removeConnectedStreamClient(this); });
return;
}

Expand Down
17 changes: 12 additions & 5 deletions src/scaler/client/agent/client_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
from scaler.config.types.zmq import ZMQConfig
from scaler.io.async_connector import ZMQAsyncConnector
from scaler.io.mixins import AsyncConnector
from scaler.io.utility import create_async_connector
from scaler.io.ymq.ymq import YMQException
from scaler.protocol.python.common import ObjectStorageAddress
from scaler.protocol.python.message import (
ClientDisconnect,
Expand Down Expand Up @@ -78,8 +80,9 @@ def __init__(
callback=self.__on_receive_from_client,
identity=None,
)
self._connector_external: AsyncConnector = ZMQAsyncConnector(
context=zmq.asyncio.Context.shadow(self._context),

self._connector_external: AsyncConnector = create_async_connector(
zmq.asyncio.Context.shadow(self._context),
name="client_agent_external",
socket_type=zmq.DEALER,
address=self._scheduler_address,
Expand Down Expand Up @@ -194,7 +197,11 @@ async def __get_loops(self):
finally:
self._stop_event.set() # always set the stop event before setting futures' exceptions

await self._object_manager.clear_all_objects(clear_serializer=True)
if not isinstance(exception, YMQException):
try:
await self._object_manager.clear_all_objects(clear_serializer=True)
except YMQException: # Above call triggers YMQ, which may raise
pass

self._connector_external.destroy()
self._connector_internal.destroy()
Expand All @@ -211,8 +218,8 @@ async def __get_loops(self):
elif isinstance(exception, (ClientQuitException, ClientShutdownException)):
logging.info("ClientAgent: client quitting")
self._future_manager.set_all_futures_with_exception(exception)
elif isinstance(exception, TimeoutError):
elif isinstance(exception, (TimeoutError, YMQException)):
logging.error(f"ClientAgent: client timeout when connecting to {self._scheduler_address.to_address()}")
self._future_manager.set_all_futures_with_exception(exception)
self._future_manager.set_all_futures_with_exception(TimeoutError())
else:
raise exception
Loading
Loading