From a25ae51cf80ff50cd5674841a73a15734df4197c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Spo=CC=88rri?= Date: Thu, 25 May 2023 08:11:21 +0200 Subject: [PATCH 01/20] GEDS: Send heartbeats and add api to enable server decomissioning. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pascal Spörri --- src/libgeds/GEDS.cpp | 85 ++++++++++- src/libgeds/GEDS.h | 4 + src/libgeds/MetadataService.cpp | 51 +++++++ src/libgeds/MetadataService.h | 7 + src/libgeds/Server.cpp | 30 ++++ src/metadataservice/CMakeLists.txt | 8 + src/metadataservice/GRPCServer.cpp | 49 ++++++- src/metadataservice/MDSHttpServer.cpp | 67 +++++++++ src/metadataservice/MDSHttpServer.h | 37 +++++ src/metadataservice/MDSHttpSession.cpp | 147 +++++++++++++++++++ src/metadataservice/MDSHttpSession.h | 46 ++++++ src/metadataservice/NodeInformation.cpp | 101 +++++++++++++ src/metadataservice/NodeInformation.h | 69 +++++++++ src/metadataservice/Nodes.cpp | 187 ++++++++++++++++++++++++ src/metadataservice/Nodes.h | 46 ++++++ src/protos/geds.proto | 30 +++- src/utility/MDSKVSBucket.cpp | 10 ++ src/utility/MDSKVSBucket.h | 2 + 18 files changed, 972 insertions(+), 4 deletions(-) create mode 100644 src/metadataservice/MDSHttpServer.cpp create mode 100644 src/metadataservice/MDSHttpServer.h create mode 100644 src/metadataservice/MDSHttpSession.cpp create mode 100644 src/metadataservice/MDSHttpSession.h create mode 100644 src/metadataservice/NodeInformation.cpp create mode 100644 src/metadataservice/NodeInformation.h create mode 100644 src/metadataservice/Nodes.cpp create mode 100644 src/metadataservice/Nodes.h diff --git a/src/libgeds/GEDS.cpp b/src/libgeds/GEDS.cpp index 36ffec6c..f119f9c8 100644 --- a/src/libgeds/GEDS.cpp +++ b/src/libgeds/GEDS.cpp @@ -159,6 +159,8 @@ absl::Status GEDS::start() { // Update state. _state = ServiceState::Running; + (void)_metadataService.configureNode(_hostname, geds::rpc::NodeState::Register); + startStorageMonitoringThread(); startPubSubStreamThread(); @@ -923,6 +925,77 @@ void GEDS::relocate(std::shared_ptr handle, bool force) { (void)handle->relocate(); } +absl::Status GEDS::downloadObject(const std::string &bucket, const std::string &key) { + auto oldFile = openAsFileHandle(bucket, key); + if (!oldFile.ok()) { + return oldFile.status(); + } + auto newFile = createAsFileHandle(bucket, key, true /* overwrite */); + if (!newFile.ok()) { + return newFile.status(); + } + return (*oldFile)->download(*newFile); +} + +absl::Status GEDS::downloadObjects(std::vector objects) { + struct PullHelper { + std::mutex mutex; + std::condition_variable cv; + size_t nTasks; + size_t nErrors; + auto lock() { return std::unique_lock(mutex); } + }; + auto h = std::make_shared(); + { + auto lock = h->lock(); + h->nTasks = objects.size(); + h->nErrors = 0; + } + + auto self = shared_from_this(); + size_t off = 3 * _config.io_thread_pool_size; + + for (size_t offset = 0; offset < objects.size(); offset += off) { + auto rbegin = offset; + auto rend = rbegin + off; + if (rend > objects.size()) { + rend = objects.size(); + } + for (auto i = rbegin; i < rend; i++) { + auto file = objects[i]; + boost::asio::post(_ioThreadPool, [self, &file, h]() { + bool error = false; + try { + auto status = self->downloadObject(file.bucket, file.key); + if (!status.ok()) { + LOG_ERROR("Unable to download ", file.bucket, "/", file.key); + error = true; + } + } catch (...) { + LOG_ERROR("Encountered an exception when downloading ", file.bucket, "/", file.key); + error = true; + } + { + auto lock = h->lock(); + h->nTasks -= 1; + if (error) { + h->nErrors += 1; + } + } + h->cv.notify_all(); + }); + } + } + auto relocateLock = h->lock(); + h->cv.wait(relocateLock, [h]() { return h->nTasks == 0; }); + LOG_INFO("Downloaded ", objects.size(), " objects, errors: ", h->nErrors); + if (h->nErrors) { + return absl::UnknownError("Some objects were not downloaded: Observed " + + std::to_string(h->nErrors) + " errors!"); + } + return absl::OkStatus(); +} + void GEDS::startStorageMonitoringThread() { _storageMonitoringThread = std::thread([&]() { auto statsLocalStorageUsed = geds::Statistics::createGauge("GEDS: Local Storage used"); @@ -970,8 +1043,16 @@ void GEDS::startStorageMonitoringThread() { *statsLocalMemoryFree = _memoryCounters.free; } - auto targetStorage = (size_t)(0.5 * (double)_config.available_local_storage); - if (storageUsed > targetStorage) { + { + // Send heartbeat. + auto status = _metadataService.heartBeat(_hostname, _storageCounters, _memoryCounters); + if (!status.ok()) { + LOG_ERROR("Unable to send heartbeat to metadata service: ", status.message()); + } + } + + auto targetStorage = (size_t)(0.7 * (double)_config.available_local_storage); + if (memoryUsed > targetStorage) { std::sort(std::begin(relocatable), std::end(relocatable), [](std::shared_ptr a, std::shared_ptr b) { if (a->openCount() == 0 && b->openCount() == 0) { diff --git a/src/libgeds/GEDS.h b/src/libgeds/GEDS.h index 33768196..e6cb9378 100644 --- a/src/libgeds/GEDS.h +++ b/src/libgeds/GEDS.h @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -327,6 +328,9 @@ class GEDS : public std::enable_shared_from_this, utility::RWConcurrentObj absl::Status subscribe(const geds::SubscriptionEvent &event); absl::Status unsubscribe(const geds::SubscriptionEvent &event); + + absl::Status downloadObject(const std::string &bucket, const std::string &key); + absl::Status downloadObjects(std::vector objects); }; #endif // GEDS_GEDS_H diff --git a/src/libgeds/MetadataService.cpp b/src/libgeds/MetadataService.cpp index 0d487f13..14fc2468 100644 --- a/src/libgeds/MetadataService.cpp +++ b/src/libgeds/MetadataService.cpp @@ -143,6 +143,57 @@ absl::StatusOr MetadataService::getConnectionInformation() { return response.remoteaddress(); } +absl::Status MetadataService::configureNode(const std::string &identifier, + geds::rpc::NodeState state) { + METADATASERVICE_CHECK_CONNECTED; + + geds::rpc::NodeStatus request; + geds::rpc::StatusResponse response; + grpc::ClientContext context; + + auto node = request.mutable_node(); + node->set_identifier(identifier); + + request.set_state(state); + + auto status = _stub->ConfigureNode(&context, request, &response); + if (!status.ok()) { + return absl::UnavailableError("Unable to execute ConfigureNode: " + status.error_message()); + } + return convertStatus(response); +} + +absl::Status MetadataService::heartBeat(const std::string &identifier, + const StorageCounter &storage, + const StorageCounter &memory) { + METADATASERVICE_CHECK_CONNECTED; + + geds::rpc::HeartbeatMessage request; + geds::rpc::StatusResponse response; + grpc::ClientContext context; + + auto node = request.mutable_node(); + node->set_identifier(identifier); + + { + auto lock = memory.getReadLock(); + request.set_memoryallocated(memory.allocated); + request.set_memoryused(memory.used); + } + + { + auto lock = storage.getReadLock(); + request.set_storageused(storage.used); + request.set_storageallocated(storage.allocated); + } + + auto status = _stub->Heartbeat(&context, request, &response); + if (!status.ok()) { + return absl::UnavailableError("Unable to send heart beat: " + status.error_message()); + } + return convertStatus(response); +} + absl::Status MetadataService::createBucket(const std::string_view &bucket) { METADATASERVICE_CHECK_CONNECTED; geds::rpc::Bucket request; diff --git a/src/libgeds/MetadataService.h b/src/libgeds/MetadataService.h index ac2fcf79..696c8846 100644 --- a/src/libgeds/MetadataService.h +++ b/src/libgeds/MetadataService.h @@ -22,7 +22,9 @@ #include "ObjectStoreConfig.h" #include "PubSub.h" +#include "StorageCounter.h" #include "geds.grpc.pb.h" +#include "geds.pb.h" namespace geds { @@ -45,6 +47,11 @@ class MetadataService { absl::Status disconnect(); + absl::Status configureNode(const std::string &identifier, geds::rpc::NodeState state); + + absl::Status heartBeat(const std::string &identifier, const StorageCounter &storage, + const StorageCounter &memory); + absl::StatusOr getConnectionInformation(); absl::Status registerObjectStoreConfig(const ObjectStoreConfig &mapping); diff --git a/src/libgeds/Server.cpp b/src/libgeds/Server.cpp index 663fd29b..a2bc1251 100644 --- a/src/libgeds/Server.cpp +++ b/src/libgeds/Server.cpp @@ -30,6 +30,7 @@ #include "Platform.h" #include "Ports.h" #include "Status.h" +#include "absl/status/status.h" #include "geds.grpc.pb.h" #include "geds.pb.h" @@ -63,6 +64,35 @@ class ServerImpl final : public geds::rpc::GEDSService::Service { return grpc::Status::OK; } + ::grpc::Status DownloadObjects(::grpc::ServerContext *context, + const ::geds::rpc::MultiObjectID *request, + ::geds::rpc::StatusResponse *response) override { + LOG_INFO(context->peer(), " has requested to pull ", request->objects().size(), " objects."); + + std::vector objects; + const auto &data = request->objects(); + objects.reserve(data.size()); + for (const auto &o : data) { + objects.emplace_back(geds::ObjectID(o.bucket(), o.key())); + } + auto status = _geds->downloadObjects(objects); + convertStatus(response, status); + return grpc::Status::OK; + }; + + ::grpc::Status DeleteObjectsLocally(::grpc::ServerContext *context, + const ::geds::rpc::MultiObjectID *request, + ::geds::rpc::StatusResponse *response) override { + LOG_INFO(context->peer(), " has requested to delete ", request->objects().size(), " objects."); + + (void)context; + (void)request; + (void)response; + auto status = absl::UnimplementedError("DeleteObjectsLocally: NYI"); + convertStatus(response, status); + return grpc::Status::OK; + } + public: ServerImpl(std::shared_ptr geds, Server &server) : _geds(geds), _server(server) {} diff --git a/src/metadataservice/CMakeLists.txt b/src/metadataservice/CMakeLists.txt index 07d70ae8..55b846d1 100644 --- a/src/metadataservice/CMakeLists.txt +++ b/src/metadataservice/CMakeLists.txt @@ -6,6 +6,14 @@ set(SOURCES GRPCServer.cpp GRPCServer.h + MDSHttpServer.cpp + MDSHttpServer.h + MDSHttpSession.cpp + MDSHttpSession.h + NodeInformation.cpp + NodeInformation.h + Nodes.cpp + Nodes.h ObjectStoreHandler.cpp ObjectStoreHandler.h S3Helper.cpp diff --git a/src/metadataservice/GRPCServer.cpp b/src/metadataservice/GRPCServer.cpp index fae039c5..90d6c154 100644 --- a/src/metadataservice/GRPCServer.cpp +++ b/src/metadataservice/GRPCServer.cpp @@ -20,6 +20,8 @@ #include "FormatISO8601.h" #include "Logging.h" +#include "MDSHttpServer.h" +#include "Nodes.h" #include "ObjectStoreConfig.h" #include "ObjectStoreHandler.h" #include "ParseGRPC.h" @@ -37,8 +39,16 @@ class MetadataServiceImpl final : public geds::rpc::MetadataService::Service { std::shared_ptr _kvs; ObjectStoreHandler _objectStoreHandler; + Nodes _nodes; + geds::MDSHttpServer _httpServer; + public: - MetadataServiceImpl(std::shared_ptr kvs) : _kvs(kvs) {} + MetadataServiceImpl(std::shared_ptr kvs) : _kvs(kvs), _httpServer(4383, _nodes) { + auto status = _httpServer.start(); + if (!status.ok()) { + LOG_ERROR("Unable to start HTTP Server!"); + } + } geds::ObjectID convert(const ::geds::rpc::ObjectID *r) { return geds::ObjectID(r->bucket(), r->key()); @@ -53,6 +63,43 @@ class MetadataServiceImpl final : public geds::rpc::MetadataService::Service { } protected: + grpc::Status ConfigureNode(::grpc::ServerContext *context, const ::geds::rpc::NodeStatus *request, + ::geds::rpc::StatusResponse *response) override { + LOG_ACCESS("ConfigureNode"); + const auto &identifier = request->node().identifier(); + const auto state = request->state(); + + absl::Status status; + if (state == geds::rpc::NodeState::Register) { + status = _nodes.registerNode(identifier); + } else if (state == geds::rpc::NodeState::Unregister) { + status = _nodes.unregisterNode(identifier); + } else { + LOG_ERROR("Invalid state ", state); + status = absl::InvalidArgumentError("Invalid state: " + std::to_string(state)); + } + + convertStatus(response, status); + return grpc::Status::OK; + } + + grpc::Status Heartbeat(::grpc::ServerContext *context, + const ::geds::rpc::HeartbeatMessage *request, + ::geds::rpc::StatusResponse *response) override { + LOG_ACCESS("Heartbeat: ", request->node().identifier()); + + const auto &identifier = request->node().identifier(); + NodeHeartBeat val; + val.memoryAllocated = request->memoryallocated(); + val.memoryUsed = request->memoryused(); + val.storageAllocated = request->storageallocated(); + val.storageUsed = request->storageused(); + + auto status = _nodes.heartbeat(identifier, std::move(val)); + convertStatus(response, status); + return grpc::Status::OK; + } + grpc::Status GetConnectionInformation(::grpc::ServerContext *context, const ::geds::rpc::EmptyParams * /* unused request */, ::geds::rpc::ConnectionInformation *response) override { diff --git a/src/metadataservice/MDSHttpServer.cpp b/src/metadataservice/MDSHttpServer.cpp new file mode 100644 index 00000000..80db7c5f --- /dev/null +++ b/src/metadataservice/MDSHttpServer.cpp @@ -0,0 +1,67 @@ +/** + * Copyright 2022- IBM Inc. All rights reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "MDSHttpServer.h" + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "Logging.h" +#include "MDSHttpSession.h" +#include "Nodes.h" + +namespace geds { + +MDSHttpServer::MDSHttpServer(uint16_t port, Nodes &nodes) : _port(port), _nodes(nodes) {} + +absl::Status MDSHttpServer::start() { + if (_acceptor != nullptr) { + return absl::UnknownError("The server is already running!"); + } + try { + auto host = boost::asio::ip::make_address("0.0.0.0"); + _acceptor = std::unique_ptr( + new boost::asio::ip::tcp::acceptor(_ioContext, {host, _port})); + _thread = std::thread([&] { + accept(); + _ioContext.run(); + }); + } catch (boost::exception &e) { + // Workaround until GEDS properly supports multiple processes. + auto diag = boost::diagnostic_information(e, false); + return absl::InternalError("Unable to start webserver: " + diag); + } + return absl::OkStatus(); +} + +void MDSHttpServer::stop() { + _ioContext.stop(); + _acceptor->close(); + _thread.join(); +} + +void MDSHttpServer::accept() { + _acceptor->async_accept(boost::asio::make_strand(_ioContext), + [&](boost::beast::error_code ec, boost::asio::ip::tcp::socket socket) { + if (ec) { + LOG_ERROR("Unable to accept ", ec.message(), " ABORT."); + return; + } + std::make_shared(std::move(socket), _nodes)->start(); + accept(); + }); +} + +} // namespace geds diff --git a/src/metadataservice/MDSHttpServer.h b/src/metadataservice/MDSHttpServer.h new file mode 100644 index 00000000..983cfac1 --- /dev/null +++ b/src/metadataservice/MDSHttpServer.h @@ -0,0 +1,37 @@ +/** + * Copyright 2022- IBM Inc. All rights reserved + * SPDX-License-Identifier: Apache-2.0 + */ +#pragma once + +#include +#include + +#include +#include + +#include "MDSHttpSession.h" + +class Nodes; + +namespace geds { + +class MDSHttpServer { + uint16_t _port; + + boost::asio::io_context _ioContext{1}; + std::unique_ptr _acceptor = nullptr; + std::thread _thread; + + Nodes &_nodes; + +public: + MDSHttpServer(uint16_t port, Nodes &nodes); + absl::Status start(); + void stop(); + +private: + void accept(); +}; + +} // namespace geds diff --git a/src/metadataservice/MDSHttpSession.cpp b/src/metadataservice/MDSHttpSession.cpp new file mode 100644 index 00000000..e961841b --- /dev/null +++ b/src/metadataservice/MDSHttpSession.cpp @@ -0,0 +1,147 @@ +/** + * Copyright 2022- IBM Inc. All rights reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "MDSHttpSession.h" + +#include + +#include "Logging.h" +#include "Nodes.h" +#include "Statistics.h" + +namespace geds { + +MDSHttpSession::MDSHttpSession(boost::asio::ip::tcp::socket &&socket, Nodes &nodes) + : _stream(std::move(socket)), _nodes(nodes) {} + +MDSHttpSession::~MDSHttpSession() { close(); } + +void MDSHttpSession::start() { + LOG_DEBUG("Start connection"); + auto self = shared_from_this(); + boost::asio::dispatch(_stream.get_executor(), + boost::beast::bind_front_handler(&MDSHttpSession::awaitRequest, self)); +} + +void MDSHttpSession::awaitRequest() { + auto self = shared_from_this(); + _request = {}; + _stream.expires_after(std::chrono::seconds(10)); + + boost::beast::http::async_read( + _stream, _buffer, _request, + [self](boost::beast::error_code ec, std::size_t /* bytes_transferred */) { + if (ec == boost::beast::http::error::end_of_stream) { + return; + } + if (ec) { + LOG_ERROR("Failed reading stream", ec.message()); + return; + } + self->handleRequest(); + }); +} + +void MDSHttpSession::prepareHtmlReply() { + _response.result(boost::beast::http::status::ok); + _response.set(boost::beast::http::field::server, BOOST_BEAST_VERSION_STRING); + _response.set(boost::beast::http::field::content_type, "text/html"); + _response.keep_alive(_request.keep_alive()); + + boost::beast::ostream(_response.body()) << "" + << "" + << "GEDS Service" + << "" + << "" + << "
"
+                                          << "GEDS Metadata Service\n"
+                                          << "=====================\n"
+                                          << "\n";
+
+  const auto &info = _nodes.information();
+  if (info.size()) {
+    boost::beast::ostream(_response.body()) << "Registered nodes:\n";
+    info.forall([&](const std::shared_ptr &node) {
+      const auto &[heartBeat, ts] = node->lastHeartBeat();
+      boost::beast::ostream(_response.body())
+          << " - " << node->identifier << " "                         //
+          << "Allocated: " << heartBeat.storageAllocated << " "       //
+          << "Used: " << heartBeat.storageUsed << " "                 //
+          << "Memory Allocated: " << heartBeat.memoryAllocated << " " //
+          << "Memory Used: " << heartBeat.memoryUsed << "\n";
+    });
+  }
+  boost::beast::ostream(_response.body()) << "
" + << "" << std::endl; + handleWrite(); +} + +void MDSHttpSession::handleRequest() { + if (_request.method() != boost::beast::http::verb::get) { + return prepareError(boost::beast::http::status::bad_request, "Invalid method."); + } + if (_request.target().empty() || _request.target()[0] != '/') { + return prepareError(boost::beast::http::status::bad_request, "Invalid path."); + } + + if (_request.target() == "/") { + return prepareHtmlReply(); + } + if (_request.target() == "/metrics") { + return prepareMetricsReply(); + } + + return prepareError(boost::beast::http::status::not_found, "Invalid path"); +} + +void MDSHttpSession::prepareMetricsReply() { + _response.result(boost::beast::http::status::ok); + _response.set(boost::beast::http::field::server, BOOST_BEAST_VERSION_STRING); + _response.set(boost::beast::http::field::content_type, "plain/text"); + _response.keep_alive(_request.keep_alive()); + + std::stringstream stream; + Statistics::get().prometheusMetrics(stream); + boost::beast::ostream(_response.body()) << stream.str(); + handleWrite(); +} + +void MDSHttpSession::prepareError(boost::beast::http::status status, std::string message) { + _response.result(status); + _response.set(boost::beast::http::field::server, BOOST_BEAST_VERSION_STRING); + _response.set(boost::beast::http::field::content_type, "text/html"); + _response.keep_alive(_request.keep_alive()); + boost::beast::ostream(_response.body()) << message; + + return handleWrite(); +} + +void MDSHttpSession::handleWrite() { + auto self = shared_from_this(); + _response.content_length(_response.body().size()); + + boost::beast::http::async_write( + _stream, _response, + [self](boost::beast::error_code ec, std::size_t /* unused bytesTransferred */) { + if (ec) { + LOG_ERROR("Error ", ec.message()); + return; + } + if (self->_request.keep_alive()) { + self->awaitRequest(); + } + self->_buffer.clear(); + }); +} + +void MDSHttpSession::close() { + LOG_DEBUG("Closing connection"); + + boost::beast::error_code ec; + _stream.socket().shutdown(boost::asio::ip::tcp::socket::shutdown_send, ec); + _stream.socket().close(); +} + +} // namespace geds diff --git a/src/metadataservice/MDSHttpSession.h b/src/metadataservice/MDSHttpSession.h new file mode 100644 index 00000000..f9714a11 --- /dev/null +++ b/src/metadataservice/MDSHttpSession.h @@ -0,0 +1,46 @@ +/** + * Copyright 2022- IBM Inc. All rights reserved + * SPDX-License-Identifier: Apache-2.0 + */ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +class Nodes; + +namespace geds { + +class MDSHttpSession : public std::enable_shared_from_this { + boost::beast::tcp_stream _stream; + boost::beast::flat_buffer _buffer{4096}; + boost::beast::http::request _request; + boost::beast::http::response _response; + + Nodes &_nodes; + +public: + MDSHttpSession(boost::asio::ip::tcp::socket &&socket, Nodes &nodes); + ~MDSHttpSession(); + void start(); + + void awaitRequest(); + void handleRequest(); + void prepareHtmlReply(); + void prepareMetricsReply(); + void prepareError(boost::beast::http::status status, std::string message); + void handleWrite(); + + void close(); +}; +} // namespace geds diff --git a/src/metadataservice/NodeInformation.cpp b/src/metadataservice/NodeInformation.cpp new file mode 100644 index 00000000..ca82a228 --- /dev/null +++ b/src/metadataservice/NodeInformation.cpp @@ -0,0 +1,101 @@ +/** + * Copyright 2022- IBM Inc. All rights reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "NodeInformation.h" + +#include "Logging.h" +#include "Status.h" + +NodeInformation::NodeInformation(std::string identifierArg) + : identifier(std::move(identifierArg)) {} + +absl::Status NodeInformation::connect() { + auto channelLock = std::lock_guard(_connectionMutex); + auto lock = getWriteLock(); + if (_channel.get() != nullptr) { + // Already connected. + return absl::OkStatus(); + } + try { + _channel = grpc::CreateChannel(identifier, grpc::InsecureChannelCredentials()); + auto success = + _channel->WaitForConnected(std::chrono::system_clock::now() + std::chrono::seconds(10)); + if (!success) { + // Destroy channel. + _channel = nullptr; + return absl::UnavailableError("Could not connect to " + identifier + "."); + } + _stub = geds::rpc::GEDSService::NewStub(_channel); + } catch (const std::exception &e) { + _channel = nullptr; + return absl::UnavailableError("Could not open channel with " + identifier + + ". Reason: " + e.what()); + } + return absl::OkStatus(); +} + +absl::Status NodeInformation::disconnect() { + auto channelLock = std::lock_guard(_connectionMutex); + auto lock = getWriteLock(); + if (_channel.get() == nullptr) { + // Already disconnected. + return absl::OkStatus(); + } + _stub.release(); + _channel = nullptr; + return absl::OkStatus(); +} + +void NodeInformation::setState(NodeState state) { + auto lock = getWriteLock(); + _state = state; +} +NodeState NodeInformation::state() { + auto lock = getReadLock(); + return _state; +} + +void NodeInformation::updateHeartBeat(const NodeHeartBeat &heartBeat) { + auto lock = getWriteLock(); + _heartBeat = heartBeat; + _lastCheckin = std::chrono::system_clock::now(); +} + +std::tuple> +NodeInformation::lastHeartBeat() { + auto lock = getReadLock(); + return std::make_tuple(_heartBeat, _lastCheckin); +} + +std::chrono::time_point NodeInformation::lastCheckin() { + auto lock = getReadLock(); + return _lastCheckin; +} + +absl::Status +NodeInformation::downloadObjects(const std::vector> &objects) { + if (!_connectionMutex.try_lock()) { + return absl::UnavailableError("Unable to pull objects: Lock is unavailable!"); + } + + geds::rpc::MultiObjectID request; + geds::rpc::StatusResponse response; + grpc::ClientContext context; + + for (const auto &o : objects) { + auto obj = request.add_objects(); + obj->set_bucket(o->bucket); + obj->set_key(o->key); + } + + auto status = _stub->DownloadObjects(&context, request, &response); + if (!status.ok()) { + LOG_ERROR("Unable to execute object pull grpc call, status: ", status.error_code(), " ", + status.error_details()); + return absl::UnknownError("Unable to execute object pull!"); + } + + return convertStatus(response); +} diff --git a/src/metadataservice/NodeInformation.h b/src/metadataservice/NodeInformation.h new file mode 100644 index 00000000..040ce943 --- /dev/null +++ b/src/metadataservice/NodeInformation.h @@ -0,0 +1,69 @@ +/** + * Copyright 2023- IBM Inc. All rights reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "RWConcurrentObjectAdaptor.h" +#include "geds.grpc.pb.h" + +enum class NodeState { Registered, Decomissioning, Unknown }; + +struct NodeHeartBeat { + size_t memoryAllocated{0}; + size_t memoryUsed{0}; + size_t storageAllocated{0}; + size_t storageUsed{0}; +}; + +struct RelocatableObject { + const std::string &bucket; + std::string key; + size_t size; +}; + +class NodeInformation : public utility::RWConcurrentObjectAdaptor { + std::mutex _connectionMutex; + std::shared_ptr _channel{nullptr}; + std::unique_ptr _stub{nullptr}; + + // Subject to state mutex + NodeState _state{NodeState::Unknown}; + NodeHeartBeat _heartBeat; + std::chrono::time_point _lastCheckin; + // End subject to state mutex +public: + NodeInformation(std::string identifier); + NodeInformation(NodeInformation &) = delete; + NodeInformation(NodeInformation &&) = delete; + NodeInformation &operator=(NodeInformation &) = delete; + NodeInformation &operator=(NodeInformation &&) = delete; + + absl::Status connect(); + absl::Status disconnect(); + + // Subject to state mutex + absl::Status queryHeartBeat(); + const std::string identifier; + + void setState(NodeState state); + NodeState state(); + + void updateHeartBeat(const NodeHeartBeat &heartBeat); + std::tuple> lastHeartBeat(); + std::chrono::time_point lastCheckin(); + // End subject to state mutex + + absl::Status downloadObjects(const std::vector> &objects); +}; diff --git a/src/metadataservice/Nodes.cpp b/src/metadataservice/Nodes.cpp new file mode 100644 index 00000000..a0f774d1 --- /dev/null +++ b/src/metadataservice/Nodes.cpp @@ -0,0 +1,187 @@ +/** + * Copyright 2023- IBM Inc. All rights reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "Nodes.h" + +#include +#include +#include +#include +#include +#include +#include + +#include "Logging.h" +#include "geds.grpc.pb.h" + +absl::Status Nodes::registerNode(const std::string &identifier) { + auto val = std::make_shared(identifier); + auto existing = _nodes.insertOrExists(identifier, val); + if (existing.get() != val.get()) { + // auto diff = std::chrono::duration_cast(now - existing->lastCheckin); + if (existing->state() == NodeState::Decomissioning) { + // Allow reregistering decomissioned nodes. + auto connect = val->connect(); + if (!connect.ok()) { + LOG_ERROR("Unable to establish backchannel to " + identifier + + " unable to decomission node!"); + } + _nodes.insertOrReplace(identifier, val); + + return absl::OkStatus(); + } + auto message = "Node " + identifier + " was already registered!"; + LOG_ERROR(message); + return absl::AlreadyExistsError(message); + } + + auto connect = val->connect(); + if (!connect.ok()) { + LOG_ERROR("Unable to establish backchannel to " + identifier + " unable to decomission node!"); + } + return absl::OkStatus(); +} + +absl::Status Nodes::unregisterNode(const std::string &identifier) { + auto removed = _nodes.getAndRemove(identifier); + if (!removed.value()) { + auto message = "Unable to remove " + identifier + " not found!"; + LOG_ERROR(message); + return absl::NotFoundError(message); + } + return absl::OkStatus(); +} + +absl::Status Nodes::heartbeat(const std::string &identifier, const NodeHeartBeat &heartbeat) { + auto val = _nodes.get(identifier); + if (!val.value()) { + auto message = "Unable to process heartbeat " + identifier + " not found!"; + LOG_ERROR(message); + return absl::NotFoundError(message); + } + (*val)->updateHeartBeat(heartbeat); + return absl::OkStatus(); +} + +absl::Status Nodes::decomissionNodes(const std::vector &nodes, + std::shared_ptr kvs) { + if (!_isDecommissioning.try_lock()) { + return absl::UnavailableError("Already decomissioning."); + } + + for (const auto &node : nodes) { + auto existing = _nodes.get(node); + if (!existing.has_value()) { + LOG_ERROR("Unable to decomission: Node " + node + " since it does not exist!"); + continue; + } + (*existing)->setState(NodeState::Decomissioning); + } + + // Prefix nodes with geds:// + std::vector prefixedNodes(nodes.size()); + for (const auto &node : nodes) { + prefixedNodes.emplace_back("geds://" + node); + } + + // Find all buckets. + auto buckets = kvs->listBuckets(); + if (!buckets.ok()) { + LOG_ERROR("Unable to list buckets when decomissioning: ", buckets.status().message()); + _isDecommissioning.unlock(); + return buckets.status(); + } + + // Find all objects that need to be relocated. + std::vector> objects; + for (const auto &bucketName : *buckets) { + auto bucket = kvs->getBucket(bucketName); + if (!bucket.ok()) { + continue; + } + (*bucket)->forall([&objects, &prefixedNodes, &bucketName](const utility::Path &path, + const geds::ObjectInfo &obj) { + // Do not relocate cached blocks. + if (path.name.starts_with("_$cachedblock$/")) { + return; + } + for (const auto &n : prefixedNodes) { + if (obj.location.starts_with(n)) { + objects.emplace_back( + new RelocatableObject{.bucket = bucketName, .key = path.name, .size = obj.size}); + } + } + }); + } + + std::sort(objects.begin(), objects.end(), + [](const std::shared_ptr &a, + const std::shared_ptr &b) { return a->size > b->size; }); + + // List all available nodes. + struct NodeTargetInfo { + std::shared_ptr node; + std::vector> objects; + size_t available; + size_t target; + }; + std::vector> targetNodes; + _nodes.forall([&targetNodes](std::shared_ptr &info) { + if (info->state() == NodeState::Registered) { + const auto [hb, _] = info->lastHeartBeat(); + + size_t storageAvailable = + (hb.storageUsed > hb.storageAllocated) ? 0 : hb.storageAllocated - hb.storageUsed; + targetNodes.emplace_back(new NodeTargetInfo{ + .node = info, .objects = {}, .available = storageAvailable, .target = 0}); + } + }); + if (targetNodes.size() == 0) { + return absl::UnavailableError("No target nodes available!"); + } + + // Sort target nodes based on available size. + auto targetNodeCompare = [](const std::shared_ptr &a, + const std::shared_ptr &b) { + auto aAvail = (a->available > a->target) ? a->available - a->target : 0; + auto bAvail = (b->available > b->target) ? b->available - b->target : 0; + return aAvail > bAvail || a->available > b->available; + }; + std::sort(targetNodes.begin(), targetNodes.end(), targetNodeCompare); + + // Simple binpacking by filling up the empty nodes. + std::vector> nonRelocatable; + for (auto &obj : objects) { + bool foundTarget = false; + for (auto &target : targetNodes) { + if (target->target + obj->size < target->available) { + foundTarget = true; + target->objects.push_back(obj); + target->target += obj->size; + } + } + if (!foundTarget) { + LOG_ERROR("Unable to relocate ", obj->bucket, "/", obj->key, " (", obj->size, " bytes)"); + nonRelocatable.push_back(obj); + } + } + + std::vector threads; + threads.reserve(targetNodes.size()); + for (auto &target : targetNodes) { + threads.emplace_back([target] { + auto status = target->node->downloadObjects(target->objects); + if (!status.ok()) { + LOG_ERROR("Unable to relocate objects to ", target->node->identifier); + } + }); + } + for (auto &thread : threads) { + thread.join(); + } + + _isDecommissioning.unlock(); + return absl::OkStatus(); +} diff --git a/src/metadataservice/Nodes.h b/src/metadataservice/Nodes.h new file mode 100644 index 00000000..a16d913b --- /dev/null +++ b/src/metadataservice/Nodes.h @@ -0,0 +1,46 @@ +/** + * Copyright 2023- IBM Inc. All rights reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "ConcurrentMap.h" +#include "MDSKVS.h" +#include "NodeInformation.h" +#include "geds.grpc.pb.h" + +class Nodes { + mutable std::shared_mutex _mutex; + + utility::ConcurrentMap> _nodes; + + std::mutex _isDecommissioning; + +public: + Nodes() = default; + Nodes(Nodes &) = delete; + Nodes(Nodes &&) = delete; + Nodes &operator=(Nodes &) = delete; + Nodes &operator=(Nodes &&) = delete; + ~Nodes() = default; + + const utility::ConcurrentMap> &information() const { + return _nodes; + }; + + absl::Status registerNode(const std::string &identifier); + absl::Status unregisterNode(const std::string &identifier); + absl::Status heartbeat(const std::string &identifier, const NodeHeartBeat &heartbeat); + absl::Status decomissionNodes(const std::vector &nodes, std::shared_ptr kvs); +}; diff --git a/src/protos/geds.proto b/src/protos/geds.proto index c877b8a9..75e87f98 100644 --- a/src/protos/geds.proto +++ b/src/protos/geds.proto @@ -31,6 +31,7 @@ message ObjectID { string bucket = 1; string key = 2; } +message MultiObjectID { repeated ObjectID objects = 1; } message ObjectInfo { string location = 1; @@ -95,7 +96,30 @@ message SubscriptionStreamResponse { Object object = 2; } +enum NodeState { + Register = 0; + Unregister = 1; + PrepareDecomission = 2; +} + +message NodeIdentifier { string identifier = 1; } + +message NodeStatus { + NodeIdentifier node = 1; + NodeState state = 2; +} + +message HeartbeatMessage { + NodeIdentifier node = 1; + uint64 memoryAllocated = 2; + uint64 memoryUsed = 3; + uint64 storageAllocated = 4; + uint64 storageUsed = 5; +} + service MetadataService { + rpc ConfigureNode(NodeStatus) returns (StatusResponse); + rpc Heartbeat(HeartbeatMessage) returns (StatusResponse); rpc GetConnectionInformation(EmptyParams) returns (ConnectionInformation); rpc RegisterObjectStore(ObjectStoreConfig) returns (StatusResponse); rpc ListObjectStores(EmptyParams) returns (AvailableObjectStoreConfigs); @@ -128,4 +152,8 @@ message TransportEndpoint { message AvailTransportEndpoints { repeated TransportEndpoint endpoint = 1; } -service GEDSService { rpc GetAvailEndpoints(EmptyParams) returns (AvailTransportEndpoints); } +service GEDSService { + rpc GetAvailEndpoints(EmptyParams) returns (AvailTransportEndpoints); + rpc DownloadObjects(MultiObjectID) returns (StatusResponse); + rpc DeleteObjectsLocally(MultiObjectID) returns (StatusResponse); +} diff --git a/src/utility/MDSKVSBucket.cpp b/src/utility/MDSKVSBucket.cpp index ce87c234..4025b870 100644 --- a/src/utility/MDSKVSBucket.cpp +++ b/src/utility/MDSKVSBucket.cpp @@ -104,3 +104,13 @@ MDSKVSBucket::listObjects(const std::string &keyPrefix, char delimiter) { return std::make_pair(std::move(result), std::vector{commonPrefixes.begin(), commonPrefixes.end()}); } + +void MDSKVSBucket::forall( + std::function action) { + auto lock = getReadLock(); + for (const auto &v : _map) { + const auto &key = v.first; + const auto &value = v.second; + action(key, value->obj); + } +} diff --git a/src/utility/MDSKVSBucket.h b/src/utility/MDSKVSBucket.h index 4eae24a8..bcba00fe 100644 --- a/src/utility/MDSKVSBucket.h +++ b/src/utility/MDSKVSBucket.h @@ -52,4 +52,6 @@ class MDSKVSBucket : public utility::RWConcurrentObjectAdaptor { absl::StatusOr lookup(const std::string &key); absl::StatusOr, std::vector>> listObjects(const std::string &keyPrefix, char delimiter = 0); + + void forall(std::function action); }; From b528989dafacd422683de369f1de095005d25c6a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Spo=CC=88rri?= Date: Wed, 10 May 2023 11:50:02 +0200 Subject: [PATCH 02/20] Integrate UUID with HeartBeat and Expose api/nodes json endpoint. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pascal Spörri --- src/libgeds/GEDS.cpp | 5 ++- src/libgeds/MetadataService.cpp | 14 +++---- src/libgeds/MetadataService.h | 5 ++- src/metadataservice/CMakeLists.txt | 2 + src/metadataservice/GRPCServer.cpp | 13 +++--- src/metadataservice/MDSHttpSession.cpp | 40 +++++++++++++----- src/metadataservice/MDSHttpSession.h | 1 + src/metadataservice/NodeInformation.cpp | 36 +++++++++++++---- src/metadataservice/NodeInformation.h | 22 +++++++--- src/metadataservice/Nodes.cpp | 54 +++++++++++++++++-------- src/metadataservice/Nodes.h | 15 +++++-- src/protos/geds.proto | 8 +++- 12 files changed, 154 insertions(+), 61 deletions(-) diff --git a/src/libgeds/GEDS.cpp b/src/libgeds/GEDS.cpp index f119f9c8..65779f95 100644 --- a/src/libgeds/GEDS.cpp +++ b/src/libgeds/GEDS.cpp @@ -159,7 +159,8 @@ absl::Status GEDS::start() { // Update state. _state = ServiceState::Running; - (void)_metadataService.configureNode(_hostname, geds::rpc::NodeState::Register); + (void)_metadataService.configureNode(uuid, _hostname, _server.port(), + geds::rpc::NodeState::Register); startStorageMonitoringThread(); startPubSubStreamThread(); @@ -1045,7 +1046,7 @@ void GEDS::startStorageMonitoringThread() { { // Send heartbeat. - auto status = _metadataService.heartBeat(_hostname, _storageCounters, _memoryCounters); + auto status = _metadataService.heartBeat(uuid, _storageCounters, _memoryCounters); if (!status.ok()) { LOG_ERROR("Unable to send heartbeat to metadata service: ", status.message()); } diff --git a/src/libgeds/MetadataService.cpp b/src/libgeds/MetadataService.cpp index 14fc2468..e6420843 100644 --- a/src/libgeds/MetadataService.cpp +++ b/src/libgeds/MetadataService.cpp @@ -12,7 +12,6 @@ #include #include #include -#include #include "GEDS.h" #include "Logging.h" @@ -143,8 +142,8 @@ absl::StatusOr MetadataService::getConnectionInformation() { return response.remoteaddress(); } -absl::Status MetadataService::configureNode(const std::string &identifier, - geds::rpc::NodeState state) { +absl::Status MetadataService::configureNode(const std::string &uuid, const std::string &identifier, + uint16_t port, geds::rpc::NodeState state) { METADATASERVICE_CHECK_CONNECTED; geds::rpc::NodeStatus request; @@ -153,8 +152,10 @@ absl::Status MetadataService::configureNode(const std::string &identifier, auto node = request.mutable_node(); node->set_identifier(identifier); + node->set_port(port); request.set_state(state); + request.set_uuid(uuid); auto status = _stub->ConfigureNode(&context, request, &response); if (!status.ok()) { @@ -163,8 +164,7 @@ absl::Status MetadataService::configureNode(const std::string &identifier, return convertStatus(response); } -absl::Status MetadataService::heartBeat(const std::string &identifier, - const StorageCounter &storage, +absl::Status MetadataService::heartBeat(const std::string &uuid, const StorageCounter &storage, const StorageCounter &memory) { METADATASERVICE_CHECK_CONNECTED; @@ -172,9 +172,7 @@ absl::Status MetadataService::heartBeat(const std::string &identifier, geds::rpc::StatusResponse response; grpc::ClientContext context; - auto node = request.mutable_node(); - node->set_identifier(identifier); - + request.set_uuid(uuid); { auto lock = memory.getReadLock(); request.set_memoryallocated(memory.allocated); diff --git a/src/libgeds/MetadataService.h b/src/libgeds/MetadataService.h index 696c8846..3eccf0a8 100644 --- a/src/libgeds/MetadataService.h +++ b/src/libgeds/MetadataService.h @@ -47,9 +47,10 @@ class MetadataService { absl::Status disconnect(); - absl::Status configureNode(const std::string &identifier, geds::rpc::NodeState state); + absl::Status configureNode(const std::string &uuid, const std::string &identifier, uint16_t port, + geds::rpc::NodeState state); - absl::Status heartBeat(const std::string &identifier, const StorageCounter &storage, + absl::Status heartBeat(const std::string &uuid, const StorageCounter &storage, const StorageCounter &memory); absl::StatusOr getConnectionInformation(); diff --git a/src/metadataservice/CMakeLists.txt b/src/metadataservice/CMakeLists.txt index 55b846d1..3b67d783 100644 --- a/src/metadataservice/CMakeLists.txt +++ b/src/metadataservice/CMakeLists.txt @@ -23,7 +23,9 @@ set(SOURCES add_library(libmetadataservice STATIC ${SOURCES}) target_link_libraries(libmetadataservice PUBLIC + magic_enum::magic_enum ${GRPC_LIBRARIES} + ${Boost_LIBRARIES} geds_utility geds_proto geds_s3 diff --git a/src/metadataservice/GRPCServer.cpp b/src/metadataservice/GRPCServer.cpp index 90d6c154..6a232b15 100644 --- a/src/metadataservice/GRPCServer.cpp +++ b/src/metadataservice/GRPCServer.cpp @@ -5,6 +5,7 @@ #include "GRPCServer.h" +#include #include #include #include @@ -67,13 +68,15 @@ class MetadataServiceImpl final : public geds::rpc::MetadataService::Service { ::geds::rpc::StatusResponse *response) override { LOG_ACCESS("ConfigureNode"); const auto &identifier = request->node().identifier(); + uint16_t port = request->node().port(); const auto state = request->state(); + const auto &uuid = request->uuid(); absl::Status status; if (state == geds::rpc::NodeState::Register) { - status = _nodes.registerNode(identifier); + status = _nodes.registerNode(uuid, identifier, port); } else if (state == geds::rpc::NodeState::Unregister) { - status = _nodes.unregisterNode(identifier); + status = _nodes.unregisterNode(uuid); } else { LOG_ERROR("Invalid state ", state); status = absl::InvalidArgumentError("Invalid state: " + std::to_string(state)); @@ -86,16 +89,16 @@ class MetadataServiceImpl final : public geds::rpc::MetadataService::Service { grpc::Status Heartbeat(::grpc::ServerContext *context, const ::geds::rpc::HeartbeatMessage *request, ::geds::rpc::StatusResponse *response) override { - LOG_ACCESS("Heartbeat: ", request->node().identifier()); + LOG_ACCESS("Heartbeat: ", request->uuid()); - const auto &identifier = request->node().identifier(); + const auto &uuid = request->uuid(); NodeHeartBeat val; val.memoryAllocated = request->memoryallocated(); val.memoryUsed = request->memoryused(); val.storageAllocated = request->storageallocated(); val.storageUsed = request->storageused(); - auto status = _nodes.heartbeat(identifier, std::move(val)); + auto status = _nodes.heartbeat(uuid, std::move(val)); convertStatus(response, status); return grpc::Status::OK; } diff --git a/src/metadataservice/MDSHttpSession.cpp b/src/metadataservice/MDSHttpSession.cpp index e961841b..cc483fd5 100644 --- a/src/metadataservice/MDSHttpSession.cpp +++ b/src/metadataservice/MDSHttpSession.cpp @@ -66,7 +66,8 @@ void MDSHttpSession::prepareHtmlReply() { info.forall([&](const std::shared_ptr &node) { const auto &[heartBeat, ts] = node->lastHeartBeat(); boost::beast::ostream(_response.body()) - << " - " << node->identifier << " " // + << " - " << node->uuid << ": " // + << node->host << ":" << node->port << " " // << "Allocated: " << heartBeat.storageAllocated << " " // << "Used: " << heartBeat.storageUsed << " " // << "Memory Allocated: " << heartBeat.memoryAllocated << " " // @@ -78,22 +79,41 @@ void MDSHttpSession::prepareHtmlReply() { handleWrite(); } +void MDSHttpSession::prepareApiNodesReply() { + _response.result(boost::beast::http::status::ok); + _response.set(boost::beast::http::field::server, BOOST_BEAST_VERSION_STRING); + _response.set(boost::beast::http::field::content_type, "text/csv"); + _response.keep_alive(_request.keep_alive()); + + auto data = boost::json::value_from(_nodes); + boost::beast::ostream(_response.body()) << boost::json::serialize(data); + + handleWrite(); +} + void MDSHttpSession::handleRequest() { - if (_request.method() != boost::beast::http::verb::get) { - return prepareError(boost::beast::http::status::bad_request, "Invalid method."); - } if (_request.target().empty() || _request.target()[0] != '/') { return prepareError(boost::beast::http::status::bad_request, "Invalid path."); } - if (_request.target() == "/") { - return prepareHtmlReply(); + if (_request.method() == boost::beast::http::verb::get) { + if (_request.target() == "/") { + return prepareHtmlReply(); + } + if (_request.target() == "/api/nodes") { + return prepareApiNodesReply(); + } + if (_request.target() == "/metrics") { + return prepareMetricsReply(); + } + return prepareError(boost::beast::http::status::not_found, "Invalid path"); } - if (_request.target() == "/metrics") { - return prepareMetricsReply(); + if (_request.method() == boost::beast::http::verb::post) { + if (_request.target() == "/api/decomission") { + } + return prepareError(boost::beast::http::status::not_found, "Invalid path"); } - - return prepareError(boost::beast::http::status::not_found, "Invalid path"); + return prepareError(boost::beast::http::status::bad_request, "Invalid method."); } void MDSHttpSession::prepareMetricsReply() { diff --git a/src/metadataservice/MDSHttpSession.h b/src/metadataservice/MDSHttpSession.h index f9714a11..116f3e39 100644 --- a/src/metadataservice/MDSHttpSession.h +++ b/src/metadataservice/MDSHttpSession.h @@ -38,6 +38,7 @@ class MDSHttpSession : public std::enable_shared_from_this { void handleRequest(); void prepareHtmlReply(); void prepareMetricsReply(); + void prepareApiNodesReply(); void prepareError(boost::beast::http::status status, std::string message); void handleWrite(); diff --git a/src/metadataservice/NodeInformation.cpp b/src/metadataservice/NodeInformation.cpp index ca82a228..e7711d97 100644 --- a/src/metadataservice/NodeInformation.cpp +++ b/src/metadataservice/NodeInformation.cpp @@ -8,8 +8,8 @@ #include "Logging.h" #include "Status.h" -NodeInformation::NodeInformation(std::string identifierArg) - : identifier(std::move(identifierArg)) {} +NodeInformation::NodeInformation(std::string uuid, std::string hostArg, uint16_t port) + : uuid(std::move(uuid)), host(std::move(hostArg)), port(port) {} absl::Status NodeInformation::connect() { auto channelLock = std::lock_guard(_connectionMutex); @@ -18,20 +18,20 @@ absl::Status NodeInformation::connect() { // Already connected. return absl::OkStatus(); } + auto loc = host + ":" + std::to_string(port); try { - _channel = grpc::CreateChannel(identifier, grpc::InsecureChannelCredentials()); + _channel = grpc::CreateChannel(loc, grpc::InsecureChannelCredentials()); auto success = _channel->WaitForConnected(std::chrono::system_clock::now() + std::chrono::seconds(10)); if (!success) { // Destroy channel. _channel = nullptr; - return absl::UnavailableError("Could not connect to " + identifier + "."); + return absl::UnavailableError("Could not connect to " + loc + "."); } _stub = geds::rpc::GEDSService::NewStub(_channel); } catch (const std::exception &e) { _channel = nullptr; - return absl::UnavailableError("Could not open channel with " + identifier + - ". Reason: " + e.what()); + return absl::UnavailableError("Could not open channel with " + loc + ". Reason: " + e.what()); } return absl::OkStatus(); } @@ -52,7 +52,7 @@ void NodeInformation::setState(NodeState state) { auto lock = getWriteLock(); _state = state; } -NodeState NodeInformation::state() { +NodeState NodeInformation::state() const { auto lock = getReadLock(); return _state; } @@ -64,7 +64,7 @@ void NodeInformation::updateHeartBeat(const NodeHeartBeat &heartBeat) { } std::tuple> -NodeInformation::lastHeartBeat() { +NodeInformation::lastHeartBeat() const { auto lock = getReadLock(); return std::make_tuple(_heartBeat, _lastCheckin); } @@ -99,3 +99,23 @@ NodeInformation::downloadObjects(const std::vector const &n) { + const auto &[h, timeStamp] = n->lastHeartBeat(); + + auto storageFree = h.storageUsed > h.storageAllocated ? 0 : h.storageAllocated - h.storageUsed; + auto memoryFree = h.memoryUsed > h.memoryAllocated ? 0 : h.memoryAllocated - h.memoryUsed; + + jv = {{"uuid", n->uuid}, + {"host", n->host}, + {"port", n->port}, + {"storageAllocated", h.storageAllocated}, + {"storageUsed", h.storageUsed}, + {"storageFree", storageFree}, + {"memoryAllocated", h.memoryAllocated}, + {"memoryUsed", h.memoryUsed}, + {"memoryFree", memoryFree}, + {"lastCheckIn", toISO8601String(timeStamp)}, + {"state", std::string{magic_enum::enum_name(n->state())}}}; +} diff --git a/src/metadataservice/NodeInformation.h b/src/metadataservice/NodeInformation.h index 040ce943..73742a65 100644 --- a/src/metadataservice/NodeInformation.h +++ b/src/metadataservice/NodeInformation.h @@ -6,16 +6,21 @@ #pragma once #include +#include #include #include #include #include +#include #include +#include #include -#include +#include +#include "FormatISO8601.h" #include "RWConcurrentObjectAdaptor.h" +#include "boost/json/detail/value_from.hpp" #include "geds.grpc.pb.h" enum class NodeState { Registered, Decomissioning, Unknown }; @@ -44,7 +49,7 @@ class NodeInformation : public utility::RWConcurrentObjectAdaptor { std::chrono::time_point _lastCheckin; // End subject to state mutex public: - NodeInformation(std::string identifier); + NodeInformation(std::string uuid, std::string host, uint16_t port); NodeInformation(NodeInformation &) = delete; NodeInformation(NodeInformation &&) = delete; NodeInformation &operator=(NodeInformation &) = delete; @@ -55,15 +60,22 @@ class NodeInformation : public utility::RWConcurrentObjectAdaptor { // Subject to state mutex absl::Status queryHeartBeat(); - const std::string identifier; + + const std::string uuid; + const std::string host; + const uint16_t port; void setState(NodeState state); - NodeState state(); + NodeState state() const; void updateHeartBeat(const NodeHeartBeat &heartBeat); - std::tuple> lastHeartBeat(); + std::tuple> + lastHeartBeat() const; std::chrono::time_point lastCheckin(); // End subject to state mutex absl::Status downloadObjects(const std::vector> &objects); }; + +void tag_invoke(boost::json::value_from_tag, boost::json::value &jv, + std::shared_ptr const &n); diff --git a/src/metadataservice/Nodes.cpp b/src/metadataservice/Nodes.cpp index a0f774d1..2468e484 100644 --- a/src/metadataservice/Nodes.cpp +++ b/src/metadataservice/Nodes.cpp @@ -6,58 +6,66 @@ #include "Nodes.h" #include +#include #include +#include #include #include #include #include #include +#include +#include +#include + #include "Logging.h" +#include "NodeInformation.h" #include "geds.grpc.pb.h" -absl::Status Nodes::registerNode(const std::string &identifier) { - auto val = std::make_shared(identifier); - auto existing = _nodes.insertOrExists(identifier, val); +absl::Status Nodes::registerNode(const std::string &uuid, const std::string &host, uint16_t port) { + auto val = std::make_shared(uuid, host, port); + auto existing = _nodes.insertOrExists(uuid, val); if (existing.get() != val.get()) { // auto diff = std::chrono::duration_cast(now - existing->lastCheckin); if (existing->state() == NodeState::Decomissioning) { // Allow reregistering decomissioned nodes. auto connect = val->connect(); if (!connect.ok()) { - LOG_ERROR("Unable to establish backchannel to " + identifier + - " unable to decomission node!"); + LOG_ERROR(connect.message()); } - _nodes.insertOrReplace(identifier, val); + _nodes.insertOrReplace(uuid, val); return absl::OkStatus(); } - auto message = "Node " + identifier + " was already registered!"; + auto message = "Node " + uuid + " was already registered!"; LOG_ERROR(message); return absl::AlreadyExistsError(message); } auto connect = val->connect(); - if (!connect.ok()) { - LOG_ERROR("Unable to establish backchannel to " + identifier + " unable to decomission node!"); + if (connect.ok()) { + val->setState(NodeState::Registered); + } else { + LOG_ERROR(connect.message()); } return absl::OkStatus(); } -absl::Status Nodes::unregisterNode(const std::string &identifier) { - auto removed = _nodes.getAndRemove(identifier); +absl::Status Nodes::unregisterNode(const std::string &uuid) { + auto removed = _nodes.getAndRemove(uuid); if (!removed.value()) { - auto message = "Unable to remove " + identifier + " not found!"; + auto message = "Unable to remove " + uuid + " not found!"; LOG_ERROR(message); return absl::NotFoundError(message); } return absl::OkStatus(); } -absl::Status Nodes::heartbeat(const std::string &identifier, const NodeHeartBeat &heartbeat) { - auto val = _nodes.get(identifier); +absl::Status Nodes::heartbeat(const std::string &uuid, const NodeHeartBeat &heartbeat) { + auto val = _nodes.get(uuid); if (!val.value()) { - auto message = "Unable to process heartbeat " + identifier + " not found!"; + auto message = "Unable to process heartbeat " + uuid + " not found!"; LOG_ERROR(message); return absl::NotFoundError(message); } @@ -174,7 +182,7 @@ absl::Status Nodes::decomissionNodes(const std::vector &nodes, threads.emplace_back([target] { auto status = target->node->downloadObjects(target->objects); if (!status.ok()) { - LOG_ERROR("Unable to relocate objects to ", target->node->identifier); + LOG_ERROR("Unable to relocate objects to ", target->node->host); } }); } @@ -185,3 +193,17 @@ absl::Status Nodes::decomissionNodes(const std::vector &nodes, _isDecommissioning.unlock(); return absl::OkStatus(); } + +void tag_invoke(boost::json::value_from_tag, boost::json::value &jv, Nodes const &n) { + boost::json::array nv; + const auto &info = n.information(); + info.forall([&nv](const std::shared_ptr &node) { + nv.emplace_back(boost::json::value_from(node)); + }); + jv = {{"nodes", nv}}; +} + +void tag_invoke(boost::json::value_from_tag t, boost::json::value &jv, + std::shared_ptr const &n) { + tag_invoke(t, jv, *n); +} diff --git a/src/metadataservice/Nodes.h b/src/metadataservice/Nodes.h index a16d913b..38a97b89 100644 --- a/src/metadataservice/Nodes.h +++ b/src/metadataservice/Nodes.h @@ -6,6 +6,7 @@ #pragma once #include +#include #include #include #include @@ -14,6 +15,7 @@ #include #include +#include #include "ConcurrentMap.h" #include "MDSKVS.h" @@ -39,8 +41,15 @@ class Nodes { return _nodes; }; - absl::Status registerNode(const std::string &identifier); - absl::Status unregisterNode(const std::string &identifier); - absl::Status heartbeat(const std::string &identifier, const NodeHeartBeat &heartbeat); + absl::Status registerNode(const std::string &uuid, const std::string &host, uint16_t port); + absl::Status unregisterNode(const std::string &uuid); + absl::Status heartbeat(const std::string &uuid, const NodeHeartBeat &heartbeat); absl::Status decomissionNodes(const std::vector &nodes, std::shared_ptr kvs); + + std::string toJson() const; }; + +void tag_invoke(boost::json::value_from_tag, boost::json::value &jv, Nodes const &n); + +void tag_invoke(boost::json::value_from_tag t, boost::json::value &jv, + std::shared_ptr const &n); diff --git a/src/protos/geds.proto b/src/protos/geds.proto index 75e87f98..a564fa6f 100644 --- a/src/protos/geds.proto +++ b/src/protos/geds.proto @@ -102,15 +102,19 @@ enum NodeState { PrepareDecomission = 2; } -message NodeIdentifier { string identifier = 1; } +message NodeIdentifier { + string identifier = 1; + uint32 port = 2; +} message NodeStatus { NodeIdentifier node = 1; NodeState state = 2; + string uuid = 3; } message HeartbeatMessage { - NodeIdentifier node = 1; + string uuid = 1; uint64 memoryAllocated = 2; uint64 memoryUsed = 3; uint64 storageAllocated = 4; From 3e68bc3198676f961381b1b2ed0ebf524f55c4bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Spo=CC=88rri?= Date: Wed, 10 May 2023 13:18:19 +0200 Subject: [PATCH 03/20] GEDS: Seal objects once they have been downloaded. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pascal Spörri --- src/libgeds/GEDS.cpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/libgeds/GEDS.cpp b/src/libgeds/GEDS.cpp index 65779f95..bf190624 100644 --- a/src/libgeds/GEDS.cpp +++ b/src/libgeds/GEDS.cpp @@ -935,7 +935,11 @@ absl::Status GEDS::downloadObject(const std::string &bucket, const std::string & if (!newFile.ok()) { return newFile.status(); } - return (*oldFile)->download(*newFile); + auto status = (*oldFile)->download(*newFile); + if (!status.ok()) { + return status; + } + return (*newFile)->seal(); } absl::Status GEDS::downloadObjects(std::vector objects) { From 12fca78706fdbc3fb10c130c79b3b41c4fd5f86c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Spo=CC=88rri?= Date: Thu, 25 May 2023 08:14:25 +0200 Subject: [PATCH 04/20] Implement: Purge Local Objects MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pascal Spörri --- src/libgeds/GEDS.cpp | 20 +++++++++++++++++++ src/libgeds/GEDS.h | 13 +++++++++++++ src/libgeds/Server.cpp | 12 ++++++++---- src/metadataservice/NodeInformation.cpp | 26 +++++++++++++++++++++++++ src/metadataservice/NodeInformation.h | 1 + src/metadataservice/Nodes.cpp | 9 ++++++++- 6 files changed, 76 insertions(+), 5 deletions(-) diff --git a/src/libgeds/GEDS.cpp b/src/libgeds/GEDS.cpp index bf190624..5bc5b225 100644 --- a/src/libgeds/GEDS.cpp +++ b/src/libgeds/GEDS.cpp @@ -1121,3 +1121,23 @@ absl::Status GEDS::unsubscribe(const geds::SubscriptionEvent &event) { } return _metadataService.unsubscribe(event); } + +absl::Status GEDS::purgeLocalObject(const std::string &bucket, const std::string &key) { + const auto path = getPath(bucket, key); + auto result = _fileHandles.getAndRemove(path); + if (!result.has_value()) { + return absl::NotFoundError("The object with the path " + path.name + + " does not exist locally."); + } + return absl::OkStatus(); +} + +absl::Status GEDS::purgeLocalObjects(std::vector objects) { + for (const auto &obj : objects) { + auto status = purgeLocalObject(obj.bucket, obj.key); + if (!status.ok()) { + LOG_ERROR(status.message()); + } + } + return absl::OkStatus(); +} diff --git a/src/libgeds/GEDS.h b/src/libgeds/GEDS.h index e6cb9378..fcfd75cd 100644 --- a/src/libgeds/GEDS.h +++ b/src/libgeds/GEDS.h @@ -329,8 +329,21 @@ class GEDS : public std::enable_shared_from_this, utility::RWConcurrentObj absl::Status subscribe(const geds::SubscriptionEvent &event); absl::Status unsubscribe(const geds::SubscriptionEvent &event); + /** + * @brief Pull object to this GEDS instance. + */ absl::Status downloadObject(const std::string &bucket, const std::string &key); absl::Status downloadObjects(std::vector objects); + + /** + * @brief Purge locally stored object without updating the Metadata server. + */ + absl::Status purgeLocalObject(const std::string &bucket, const std::string &key); + + /** + * @brief Purge locally stored objects if they exist locally. Missing files will be logged. + */ + absl::Status purgeLocalObjects(std::vector objects); }; #endif // GEDS_GEDS_H diff --git a/src/libgeds/Server.cpp b/src/libgeds/Server.cpp index a2bc1251..6dd9dff7 100644 --- a/src/libgeds/Server.cpp +++ b/src/libgeds/Server.cpp @@ -85,10 +85,14 @@ class ServerImpl final : public geds::rpc::GEDSService::Service { ::geds::rpc::StatusResponse *response) override { LOG_INFO(context->peer(), " has requested to delete ", request->objects().size(), " objects."); - (void)context; - (void)request; - (void)response; - auto status = absl::UnimplementedError("DeleteObjectsLocally: NYI"); + std::vector objects; + const auto &data = request->objects(); + objects.reserve(data.size()); + for (const auto &o : data) { + objects.emplace_back(geds::ObjectID(o.bucket(), o.key())); + } + + auto status = _geds->purgeLocalObjects(objects); convertStatus(response, status); return grpc::Status::OK; } diff --git a/src/metadataservice/NodeInformation.cpp b/src/metadataservice/NodeInformation.cpp index e7711d97..22467399 100644 --- a/src/metadataservice/NodeInformation.cpp +++ b/src/metadataservice/NodeInformation.cpp @@ -100,6 +100,32 @@ NodeInformation::downloadObjects(const std::vector> &objects) { + if (!_connectionMutex.try_lock()) { + return absl::UnavailableError("Unable to pull objects: Lock is unavailable!"); + } + + geds::rpc::MultiObjectID request; + geds::rpc::StatusResponse response; + grpc::ClientContext context; + + for (const auto &o : objects) { + auto obj = request.add_objects(); + obj->set_bucket(o->bucket); + obj->set_key(o->key); + } + + auto status = _stub->DeleteObjectsLocally(&context, request, &response); + if (!status.ok()) { + LOG_ERROR("Unable to execute object delete grpc call, status: ", status.error_code(), " ", + status.error_details()); + return absl::UnknownError("Unable to execute object delete!"); + } + + return convertStatus(response); +} + void tag_invoke(boost::json::value_from_tag, boost::json::value &jv, std::shared_ptr const &n) { const auto &[h, timeStamp] = n->lastHeartBeat(); diff --git a/src/metadataservice/NodeInformation.h b/src/metadataservice/NodeInformation.h index 73742a65..98360af6 100644 --- a/src/metadataservice/NodeInformation.h +++ b/src/metadataservice/NodeInformation.h @@ -75,6 +75,7 @@ class NodeInformation : public utility::RWConcurrentObjectAdaptor { // End subject to state mutex absl::Status downloadObjects(const std::vector> &objects); + absl::Status purgeLocalObjects(const std::vector> &objects); }; void tag_invoke(boost::json::value_from_tag, boost::json::value &jv, diff --git a/src/metadataservice/Nodes.cpp b/src/metadataservice/Nodes.cpp index 2468e484..ad15abd7 100644 --- a/src/metadataservice/Nodes.cpp +++ b/src/metadataservice/Nodes.cpp @@ -182,7 +182,14 @@ absl::Status Nodes::decomissionNodes(const std::vector &nodes, threads.emplace_back([target] { auto status = target->node->downloadObjects(target->objects); if (!status.ok()) { - LOG_ERROR("Unable to relocate objects to ", target->node->host); + LOG_ERROR("Unable to relocate objects to ", target->node->host, + " uuid: ", target->node->uuid); + return; + } + status = target->node->purgeLocalObjects(target->objects); + if (!status.ok()) { + LOG_ERROR("Unable to cleanup local objects on ", target->node->host, + " uuid: ", target->node->uuid); } }); } From 3d4d473033f9724a94a9bf6a2cfa1c1fb4eeb906 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Spo=CC=88rri?= Date: Wed, 10 May 2023 15:34:38 +0200 Subject: [PATCH 05/20] Implement HTTP Endpoints to support server decommissioning. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pascal Spörri --- src/metadataservice/GRPCServer.cpp | 2 +- src/metadataservice/MDSHttpServer.cpp | 22 +++++----- src/metadataservice/MDSHttpServer.h | 4 +- src/metadataservice/MDSHttpSession.cpp | 54 ++++++++++++++++++++++--- src/metadataservice/MDSHttpSession.h | 6 ++- src/metadataservice/NodeInformation.cpp | 4 ++ src/metadataservice/NodeInformation.h | 4 +- src/metadataservice/Nodes.cpp | 53 +++++++++++++----------- src/metadataservice/Nodes.h | 3 +- src/protos/geds.proto | 3 +- 10 files changed, 111 insertions(+), 44 deletions(-) diff --git a/src/metadataservice/GRPCServer.cpp b/src/metadataservice/GRPCServer.cpp index 6a232b15..2ddb5592 100644 --- a/src/metadataservice/GRPCServer.cpp +++ b/src/metadataservice/GRPCServer.cpp @@ -44,7 +44,7 @@ class MetadataServiceImpl final : public geds::rpc::MetadataService::Service { geds::MDSHttpServer _httpServer; public: - MetadataServiceImpl(std::shared_ptr kvs) : _kvs(kvs), _httpServer(4383, _nodes) { + MetadataServiceImpl(std::shared_ptr kvs) : _kvs(kvs), _httpServer(4383, _nodes, _kvs) { auto status = _httpServer.start(); if (!status.ok()) { LOG_ERROR("Unable to start HTTP Server!"); diff --git a/src/metadataservice/MDSHttpServer.cpp b/src/metadataservice/MDSHttpServer.cpp index 80db7c5f..4b666c79 100644 --- a/src/metadataservice/MDSHttpServer.cpp +++ b/src/metadataservice/MDSHttpServer.cpp @@ -24,7 +24,8 @@ namespace geds { -MDSHttpServer::MDSHttpServer(uint16_t port, Nodes &nodes) : _port(port), _nodes(nodes) {} +MDSHttpServer::MDSHttpServer(uint16_t port, Nodes &nodes, std::shared_ptr kvs) + : _port(port), _nodes(nodes), _kvs(kvs) {} absl::Status MDSHttpServer::start() { if (_acceptor != nullptr) { @@ -53,15 +54,16 @@ void MDSHttpServer::stop() { } void MDSHttpServer::accept() { - _acceptor->async_accept(boost::asio::make_strand(_ioContext), - [&](boost::beast::error_code ec, boost::asio::ip::tcp::socket socket) { - if (ec) { - LOG_ERROR("Unable to accept ", ec.message(), " ABORT."); - return; - } - std::make_shared(std::move(socket), _nodes)->start(); - accept(); - }); + _acceptor->async_accept( + boost::asio::make_strand(_ioContext), + [&](boost::beast::error_code ec, boost::asio::ip::tcp::socket socket) { + if (ec) { + LOG_ERROR("Unable to accept ", ec.message(), " ABORT."); + return; + } + std::make_shared(std::move(socket), _nodes, _kvs)->start(); + accept(); + }); } } // namespace geds diff --git a/src/metadataservice/MDSHttpServer.h b/src/metadataservice/MDSHttpServer.h index 983cfac1..a32a6208 100644 --- a/src/metadataservice/MDSHttpServer.h +++ b/src/metadataservice/MDSHttpServer.h @@ -11,6 +11,7 @@ #include #include "MDSHttpSession.h" +#include "MDSKVS.h" class Nodes; @@ -24,9 +25,10 @@ class MDSHttpServer { std::thread _thread; Nodes &_nodes; + std::shared_ptr _kvs; public: - MDSHttpServer(uint16_t port, Nodes &nodes); + MDSHttpServer(uint16_t port, Nodes &nodes, std::shared_ptr kvs); absl::Status start(); void stop(); diff --git a/src/metadataservice/MDSHttpSession.cpp b/src/metadataservice/MDSHttpSession.cpp index cc483fd5..6882afcf 100644 --- a/src/metadataservice/MDSHttpSession.cpp +++ b/src/metadataservice/MDSHttpSession.cpp @@ -5,7 +5,14 @@ #include "MDSHttpSession.h" +#include #include +#include +#include + +#include +#include +#include #include "Logging.h" #include "Nodes.h" @@ -13,8 +20,9 @@ namespace geds { -MDSHttpSession::MDSHttpSession(boost::asio::ip::tcp::socket &&socket, Nodes &nodes) - : _stream(std::move(socket)), _nodes(nodes) {} +MDSHttpSession::MDSHttpSession(boost::asio::ip::tcp::socket &&socket, Nodes &nodes, + std::shared_ptr kvs) + : _stream(std::move(socket)), _nodes(nodes), _kvs(kvs) {} MDSHttpSession::~MDSHttpSession() { close(); } @@ -82,7 +90,7 @@ void MDSHttpSession::prepareHtmlReply() { void MDSHttpSession::prepareApiNodesReply() { _response.result(boost::beast::http::status::ok); _response.set(boost::beast::http::field::server, BOOST_BEAST_VERSION_STRING); - _response.set(boost::beast::http::field::content_type, "text/csv"); + _response.set(boost::beast::http::field::content_type, "application/json"); _response.keep_alive(_request.keep_alive()); auto data = boost::json::value_from(_nodes); @@ -91,6 +99,40 @@ void MDSHttpSession::prepareApiNodesReply() { handleWrite(); } +void MDSHttpSession::prepareApiDecommissionReply(const std::string &body) { + _response.result(boost::beast::http::status::ok); + _response.set(boost::beast::http::field::server, BOOST_BEAST_VERSION_STRING); + _response.set(boost::beast::http::field::content_type, "application/json"); + _response.keep_alive(_request.keep_alive()); + + boost::json::error_code ec; + auto parsed = boost::json::parse(body, ec); + if (ec) { + return prepareError(boost::beast::http::status::bad_request, ec.message()); + } + if (!parsed.is_array()) { + return prepareError(boost::beast::http::status::bad_request, "Expected array!"); + } + + std::vector hostsToDecommission; + for (const auto &value : parsed.as_array()) { + if (!value.is_string()) { + return prepareError(boost::beast::http::status::bad_request, "Unexpected element in array!"); + } + hostsToDecommission.push_back(boost::json::value_to(value)); + } + + auto status = _nodes.decommissionNodes(hostsToDecommission, _kvs); + if (!status.ok()) { + return prepareError(boost::beast::http::status::internal_server_error, + std::string{status.message()}); + } + + boost::beast::ostream(_response.body()) + << R"({"status": "success", "nodes": )" << body << R"(}\n)"; + handleWrite(); +} + void MDSHttpSession::handleRequest() { if (_request.target().empty() || _request.target()[0] != '/') { return prepareError(boost::beast::http::status::bad_request, "Invalid path."); @@ -109,7 +151,9 @@ void MDSHttpSession::handleRequest() { return prepareError(boost::beast::http::status::not_found, "Invalid path"); } if (_request.method() == boost::beast::http::verb::post) { - if (_request.target() == "/api/decomission") { + auto body = boost::beast::buffers_to_string(_request.body().data()); + if (_request.target() == "/api/decommission") { + return prepareApiDecommissionReply(body); } return prepareError(boost::beast::http::status::not_found, "Invalid path"); } @@ -133,7 +177,7 @@ void MDSHttpSession::prepareError(boost::beast::http::status status, std::string _response.set(boost::beast::http::field::server, BOOST_BEAST_VERSION_STRING); _response.set(boost::beast::http::field::content_type, "text/html"); _response.keep_alive(_request.keep_alive()); - boost::beast::ostream(_response.body()) << message; + boost::beast::ostream(_response.body()) << message << "\n"; return handleWrite(); } diff --git a/src/metadataservice/MDSHttpSession.h b/src/metadataservice/MDSHttpSession.h index 116f3e39..460e8e67 100644 --- a/src/metadataservice/MDSHttpSession.h +++ b/src/metadataservice/MDSHttpSession.h @@ -17,6 +17,8 @@ #include #include +#include "MDSKVS.h" + class Nodes; namespace geds { @@ -28,9 +30,10 @@ class MDSHttpSession : public std::enable_shared_from_this { boost::beast::http::response _response; Nodes &_nodes; + std::shared_ptr _kvs; public: - MDSHttpSession(boost::asio::ip::tcp::socket &&socket, Nodes &nodes); + MDSHttpSession(boost::asio::ip::tcp::socket &&socket, Nodes &nodes, std::shared_ptr kvs); ~MDSHttpSession(); void start(); @@ -39,6 +42,7 @@ class MDSHttpSession : public std::enable_shared_from_this { void prepareHtmlReply(); void prepareMetricsReply(); void prepareApiNodesReply(); + void prepareApiDecommissionReply(const std::string &body); void prepareError(boost::beast::http::status status, std::string message); void handleWrite(); diff --git a/src/metadataservice/NodeInformation.cpp b/src/metadataservice/NodeInformation.cpp index 22467399..a4d4dfa6 100644 --- a/src/metadataservice/NodeInformation.cpp +++ b/src/metadataservice/NodeInformation.cpp @@ -57,6 +57,10 @@ NodeState NodeInformation::state() const { return _state; } +std::string NodeInformation::gedsHostUri() const { + return "geds://" + host + ":" + std::to_string(port); +} + void NodeInformation::updateHeartBeat(const NodeHeartBeat &heartBeat) { auto lock = getWriteLock(); _heartBeat = heartBeat; diff --git a/src/metadataservice/NodeInformation.h b/src/metadataservice/NodeInformation.h index 98360af6..f2803c1e 100644 --- a/src/metadataservice/NodeInformation.h +++ b/src/metadataservice/NodeInformation.h @@ -23,7 +23,7 @@ #include "boost/json/detail/value_from.hpp" #include "geds.grpc.pb.h" -enum class NodeState { Registered, Decomissioning, Unknown }; +enum class NodeState { Registered, Decommissioning, ReadyForShutdown, Unknown }; struct NodeHeartBeat { size_t memoryAllocated{0}; @@ -68,6 +68,8 @@ class NodeInformation : public utility::RWConcurrentObjectAdaptor { void setState(NodeState state); NodeState state() const; + std::string gedsHostUri() const; + void updateHeartBeat(const NodeHeartBeat &heartBeat); std::tuple> lastHeartBeat() const; diff --git a/src/metadataservice/Nodes.cpp b/src/metadataservice/Nodes.cpp index ad15abd7..b64cac6a 100644 --- a/src/metadataservice/Nodes.cpp +++ b/src/metadataservice/Nodes.cpp @@ -21,6 +21,7 @@ #include "Logging.h" #include "NodeInformation.h" +#include "absl/status/status.h" #include "geds.grpc.pb.h" absl::Status Nodes::registerNode(const std::string &uuid, const std::string &host, uint16_t port) { @@ -28,8 +29,8 @@ absl::Status Nodes::registerNode(const std::string &uuid, const std::string &hos auto existing = _nodes.insertOrExists(uuid, val); if (existing.get() != val.get()) { // auto diff = std::chrono::duration_cast(now - existing->lastCheckin); - if (existing->state() == NodeState::Decomissioning) { - // Allow reregistering decomissioned nodes. + if (existing->state() == NodeState::Decommissioning) { + // Allow reregistering decommissioned nodes. auto connect = val->connect(); if (!connect.ok()) { LOG_ERROR(connect.message()); @@ -73,32 +74,31 @@ absl::Status Nodes::heartbeat(const std::string &uuid, const NodeHeartBeat &hear return absl::OkStatus(); } -absl::Status Nodes::decomissionNodes(const std::vector &nodes, - std::shared_ptr kvs) { - if (!_isDecommissioning.try_lock()) { - return absl::UnavailableError("Already decomissioning."); +absl::Status Nodes::decommissionNodes(const std::vector &nodes, + std::shared_ptr kvs) { + auto lock = std::unique_lock(_isDecommissioning, std::try_to_lock); + if (!lock.owns_lock()) { + return absl::UnavailableError("Already decommissioning."); } + // Mark hosts as decommissioning and determine host uris to collect objects. + std::vector gedsHostUris; + std::vector> hostsToDecommision; for (const auto &node : nodes) { auto existing = _nodes.get(node); if (!existing.has_value()) { - LOG_ERROR("Unable to decomission: Node " + node + " since it does not exist!"); - continue; + return absl::UnavailableError("Unable to decommission node: " + node + + " since it does not exist!"); } - (*existing)->setState(NodeState::Decomissioning); - } - - // Prefix nodes with geds:// - std::vector prefixedNodes(nodes.size()); - for (const auto &node : nodes) { - prefixedNodes.emplace_back("geds://" + node); + (*existing)->setState(NodeState::Decommissioning); + hostsToDecommision.emplace_back(*existing); + gedsHostUris.emplace_back((*existing)->gedsHostUri()); } // Find all buckets. auto buckets = kvs->listBuckets(); if (!buckets.ok()) { - LOG_ERROR("Unable to list buckets when decomissioning: ", buckets.status().message()); - _isDecommissioning.unlock(); + LOG_ERROR("Unable to list buckets when decommissioning: ", buckets.status().message()); return buckets.status(); } @@ -109,14 +109,14 @@ absl::Status Nodes::decomissionNodes(const std::vector &nodes, if (!bucket.ok()) { continue; } - (*bucket)->forall([&objects, &prefixedNodes, &bucketName](const utility::Path &path, - const geds::ObjectInfo &obj) { + (*bucket)->forall([&objects, &gedsHostUris, &bucketName](const utility::Path &path, + const geds::ObjectInfo &obj) { // Do not relocate cached blocks. if (path.name.starts_with("_$cachedblock$/")) { return; } - for (const auto &n : prefixedNodes) { - if (obj.location.starts_with(n)) { + for (const auto &uri : gedsHostUris) { + if (obj.location == uri) { objects.emplace_back( new RelocatableObject{.bucket = bucketName, .key = path.name, .size = obj.size}); } @@ -176,12 +176,14 @@ absl::Status Nodes::decomissionNodes(const std::vector &nodes, } } + std::atomic failures; std::vector threads; threads.reserve(targetNodes.size()); for (auto &target : targetNodes) { - threads.emplace_back([target] { + threads.emplace_back([target, &failures] { auto status = target->node->downloadObjects(target->objects); if (!status.ok()) { + failures += 1; LOG_ERROR("Unable to relocate objects to ", target->node->host, " uuid: ", target->node->uuid); return; @@ -197,7 +199,12 @@ absl::Status Nodes::decomissionNodes(const std::vector &nodes, thread.join(); } - _isDecommissioning.unlock(); + if (failures == 0) { + for (auto &host : hostsToDecommision) { + host->setState(NodeState::ReadyForShutdown); + } + } + return absl::OkStatus(); } diff --git a/src/metadataservice/Nodes.h b/src/metadataservice/Nodes.h index 38a97b89..76d0e807 100644 --- a/src/metadataservice/Nodes.h +++ b/src/metadataservice/Nodes.h @@ -44,7 +44,8 @@ class Nodes { absl::Status registerNode(const std::string &uuid, const std::string &host, uint16_t port); absl::Status unregisterNode(const std::string &uuid); absl::Status heartbeat(const std::string &uuid, const NodeHeartBeat &heartbeat); - absl::Status decomissionNodes(const std::vector &nodes, std::shared_ptr kvs); + absl::Status decommissionNodes(const std::vector &nodes, + std::shared_ptr kvs); std::string toJson() const; }; diff --git a/src/protos/geds.proto b/src/protos/geds.proto index a564fa6f..f35f1ed7 100644 --- a/src/protos/geds.proto +++ b/src/protos/geds.proto @@ -99,7 +99,8 @@ message SubscriptionStreamResponse { enum NodeState { Register = 0; Unregister = 1; - PrepareDecomission = 2; + PrepareDecommission = 2; + ReadyForShutdown = 3; } message NodeIdentifier { From be7bdfe2a73c59fe508d11e713ffac4a0aee552b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Spo=CC=88rri?= Date: Wed, 10 May 2023 17:31:52 +0200 Subject: [PATCH 06/20] Python: Create.py add a second file. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pascal Spörri --- src/python/create.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/python/create.py b/src/python/create.py index 3a0dba2b..5e031688 100644 --- a/src/python/create.py +++ b/src/python/create.py @@ -36,11 +36,12 @@ l = file.read(message_read, 0, len(message_read)) print(f"Read: {message_read.decode()}") -# Print path -# print(f"File: {file.path()}") +file.seal() -# Seal file +file2 = instance.create("bucket2", "testfile2") +file2.write(message_read, 0, len(message)) +file2.seal() -file.seal() +# Seal file sleep(1000000) From deb36edea0a3a3240c8a1efc64681d77cfff73d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Spo=CC=88rri?= Date: Thu, 11 May 2023 10:53:23 +0200 Subject: [PATCH 07/20] Fix server decommissioning and add API to reregister client. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pascal Spörri --- src/libgeds/GEDS.cpp | 10 +++-- src/libgeds/GEDSFileHandle.cpp | 1 + src/metadataservice/GRPCServer.cpp | 3 +- src/metadataservice/MDSHttpSession.cpp | 61 ++++++++++++++++++++++---- src/metadataservice/MDSHttpSession.h | 1 + src/metadataservice/Nodes.cpp | 15 ++++--- src/metadataservice/Nodes.h | 1 + 7 files changed, 74 insertions(+), 18 deletions(-) diff --git a/src/libgeds/GEDS.cpp b/src/libgeds/GEDS.cpp index 5bc5b225..c6058f18 100644 --- a/src/libgeds/GEDS.cpp +++ b/src/libgeds/GEDS.cpp @@ -967,8 +967,8 @@ absl::Status GEDS::downloadObjects(std::vector objects) { rend = objects.size(); } for (auto i = rbegin; i < rend; i++) { - auto file = objects[i]; - boost::asio::post(_ioThreadPool, [self, &file, h]() { + const auto &file = objects[i]; + boost::asio::post(_ioThreadPool, [self, file, h]() { bool error = false; try { auto status = self->downloadObject(file.bucket, file.key); @@ -1126,13 +1126,15 @@ absl::Status GEDS::purgeLocalObject(const std::string &bucket, const std::string const auto path = getPath(bucket, key); auto result = _fileHandles.getAndRemove(path); if (!result.has_value()) { - return absl::NotFoundError("The object with the path " + path.name + - " does not exist locally."); + auto message = "The object with the path " + path.name + " does not exist locally."; + LOG_ERROR(message); + return absl::NotFoundError(message); } return absl::OkStatus(); } absl::Status GEDS::purgeLocalObjects(std::vector objects) { + LOG_DEBUG("Purging ", objects.size(), "."); for (const auto &obj : objects) { auto status = purgeLocalObject(obj.bucket, obj.key); if (!status.ok()) { diff --git a/src/libgeds/GEDSFileHandle.cpp b/src/libgeds/GEDSFileHandle.cpp index bc1157da..91f888e6 100644 --- a/src/libgeds/GEDSFileHandle.cpp +++ b/src/libgeds/GEDSFileHandle.cpp @@ -120,6 +120,7 @@ absl::Status GEDSFileHandle::download(std::shared_ptr destinatio } pos += *count; } while (pos < *totalSize); + return absl::OkStatus(); } diff --git a/src/metadataservice/GRPCServer.cpp b/src/metadataservice/GRPCServer.cpp index 2ddb5592..5f083744 100644 --- a/src/metadataservice/GRPCServer.cpp +++ b/src/metadataservice/GRPCServer.cpp @@ -89,7 +89,8 @@ class MetadataServiceImpl final : public geds::rpc::MetadataService::Service { grpc::Status Heartbeat(::grpc::ServerContext *context, const ::geds::rpc::HeartbeatMessage *request, ::geds::rpc::StatusResponse *response) override { - LOG_ACCESS("Heartbeat: ", request->uuid()); + // LOG_ACCESS("Heartbeat: ", request->uuid()); + (void)context; const auto &uuid = request->uuid(); NodeHeartBeat val; diff --git a/src/metadataservice/MDSHttpSession.cpp b/src/metadataservice/MDSHttpSession.cpp index 6882afcf..418e732d 100644 --- a/src/metadataservice/MDSHttpSession.cpp +++ b/src/metadataservice/MDSHttpSession.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include "Logging.h" #include "Nodes.h" @@ -74,16 +75,18 @@ void MDSHttpSession::prepareHtmlReply() { info.forall([&](const std::shared_ptr &node) { const auto &[heartBeat, ts] = node->lastHeartBeat(); boost::beast::ostream(_response.body()) - << " - " << node->uuid << ": " // - << node->host << ":" << node->port << " " // - << "Allocated: " << heartBeat.storageAllocated << " " // - << "Used: " << heartBeat.storageUsed << " " // - << "Memory Allocated: " << heartBeat.memoryAllocated << " " // + << " - " << node->uuid << ": " // + << node->host << ":" << node->port << " " // + << std::string{magic_enum::enum_name(node->state())} << " -- " // + << "Allocated: " << heartBeat.storageAllocated << " " // + << "Used: " << heartBeat.storageUsed << " " // + << "Memory Allocated: " << heartBeat.memoryAllocated << " " // << "Memory Used: " << heartBeat.memoryUsed << "\n"; }); } boost::beast::ostream(_response.body()) << "" << "" << std::endl; + boost::beast::ostream(_response.body()) << "\n"; handleWrite(); } @@ -95,7 +98,7 @@ void MDSHttpSession::prepareApiNodesReply() { auto data = boost::json::value_from(_nodes); boost::beast::ostream(_response.body()) << boost::json::serialize(data); - + boost::beast::ostream(_response.body()) << "\n"; handleWrite(); } @@ -128,8 +131,46 @@ void MDSHttpSession::prepareApiDecommissionReply(const std::string &body) { std::string{status.message()}); } - boost::beast::ostream(_response.body()) - << R"({"status": "success", "nodes": )" << body << R"(}\n)"; + boost::beast::ostream(_response.body()) << R"({"status": "success", "nodes": )" << body << "}"; + boost::beast::ostream(_response.body()) << "\n"; + handleWrite(); +} + +void MDSHttpSession::prepareApiReregisterReply(const std::string &body) { + _response.result(boost::beast::http::status::ok); + _response.set(boost::beast::http::field::server, BOOST_BEAST_VERSION_STRING); + _response.set(boost::beast::http::field::content_type, "application/json"); + _response.keep_alive(_request.keep_alive()); + + // Parse body + boost::json::error_code ec; + auto parsed = boost::json::parse(body, ec); + if (ec) { + return prepareError(boost::beast::http::status::bad_request, ec.message()); + } + if (!parsed.is_array()) { + return prepareError(boost::beast::http::status::bad_request, "Expected array!"); + } + + // Send reply + size_t count = 0; + boost::beast::ostream(_response.body()) << R"({"status": "success", "nodes": [)"; + for (const auto &value : parsed.as_array()) { + if (!value.is_string()) { + continue; + } + auto host = boost::json::value_to(value); + auto status = _nodes.reregisterNode(host); + if (status.ok()) { + if (count > 1) { + boost::beast::ostream(_response.body()) << ", "; + } + boost::beast::ostream(_response.body()) << "\"" << host << "\""; + count++; + } + } + + boost::beast::ostream(_response.body()) << "\n"; handleWrite(); } @@ -155,6 +196,9 @@ void MDSHttpSession::handleRequest() { if (_request.target() == "/api/decommission") { return prepareApiDecommissionReply(body); } + if (_request.target() == "/api/reregister") { + return prepareApiReregisterReply(body); + } return prepareError(boost::beast::http::status::not_found, "Invalid path"); } return prepareError(boost::beast::http::status::bad_request, "Invalid method."); @@ -169,6 +213,7 @@ void MDSHttpSession::prepareMetricsReply() { std::stringstream stream; Statistics::get().prometheusMetrics(stream); boost::beast::ostream(_response.body()) << stream.str(); + boost::beast::ostream(_response.body()) << "\n"; handleWrite(); } diff --git a/src/metadataservice/MDSHttpSession.h b/src/metadataservice/MDSHttpSession.h index 460e8e67..1a90e2d9 100644 --- a/src/metadataservice/MDSHttpSession.h +++ b/src/metadataservice/MDSHttpSession.h @@ -43,6 +43,7 @@ class MDSHttpSession : public std::enable_shared_from_this { void prepareMetricsReply(); void prepareApiNodesReply(); void prepareApiDecommissionReply(const std::string &body); + void prepareApiReregisterReply(const std::string &body); void prepareError(boost::beast::http::status status, std::string message); void handleWrite(); diff --git a/src/metadataservice/Nodes.cpp b/src/metadataservice/Nodes.cpp index b64cac6a..8945ace7 100644 --- a/src/metadataservice/Nodes.cpp +++ b/src/metadataservice/Nodes.cpp @@ -53,6 +53,15 @@ absl::Status Nodes::registerNode(const std::string &uuid, const std::string &hos return absl::OkStatus(); } +absl::Status Nodes::reregisterNode(const std::string &uuid) { + auto exists = _nodes.get(uuid); + if (!exists.has_value()) { + return absl::NotFoundError("Node " + uuid + " does not exist!"); + } + (*exists)->setState(NodeState::Registered); + return absl::OkStatus(); +} + absl::Status Nodes::unregisterNode(const std::string &uuid) { auto removed = _nodes.getAndRemove(uuid); if (!removed.value()) { @@ -168,6 +177,7 @@ absl::Status Nodes::decommissionNodes(const std::vector &nodes, foundTarget = true; target->objects.push_back(obj); target->target += obj->size; + break; } } if (!foundTarget) { @@ -188,11 +198,6 @@ absl::Status Nodes::decommissionNodes(const std::vector &nodes, " uuid: ", target->node->uuid); return; } - status = target->node->purgeLocalObjects(target->objects); - if (!status.ok()) { - LOG_ERROR("Unable to cleanup local objects on ", target->node->host, - " uuid: ", target->node->uuid); - } }); } for (auto &thread : threads) { diff --git a/src/metadataservice/Nodes.h b/src/metadataservice/Nodes.h index 76d0e807..5b6df20a 100644 --- a/src/metadataservice/Nodes.h +++ b/src/metadataservice/Nodes.h @@ -42,6 +42,7 @@ class Nodes { }; absl::Status registerNode(const std::string &uuid, const std::string &host, uint16_t port); + absl::Status reregisterNode(const std::string &uuid); absl::Status unregisterNode(const std::string &uuid); absl::Status heartbeat(const std::string &uuid, const NodeHeartBeat &heartbeat); absl::Status decommissionNodes(const std::vector &nodes, From f6a149b42132c432d6b252ceefe0c1f9a1c59b66 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Spo=CC=88rri?= Date: Thu, 11 May 2023 11:44:32 +0200 Subject: [PATCH 08/20] MDSKVS: Make forall const. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pascal Spörri --- src/utility/MDSKVSBucket.cpp | 2 +- src/utility/MDSKVSBucket.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/utility/MDSKVSBucket.cpp b/src/utility/MDSKVSBucket.cpp index 4025b870..07b104b6 100644 --- a/src/utility/MDSKVSBucket.cpp +++ b/src/utility/MDSKVSBucket.cpp @@ -106,7 +106,7 @@ MDSKVSBucket::listObjects(const std::string &keyPrefix, char delimiter) { } void MDSKVSBucket::forall( - std::function action) { + std::function action) const { auto lock = getReadLock(); for (const auto &v : _map) { const auto &key = v.first; diff --git a/src/utility/MDSKVSBucket.h b/src/utility/MDSKVSBucket.h index bcba00fe..dd3a515b 100644 --- a/src/utility/MDSKVSBucket.h +++ b/src/utility/MDSKVSBucket.h @@ -53,5 +53,5 @@ class MDSKVSBucket : public utility::RWConcurrentObjectAdaptor { absl::StatusOr, std::vector>> listObjects(const std::string &keyPrefix, char delimiter = 0); - void forall(std::function action); + void forall(std::function action) const; }; From 86ca92e7603564bbec338f8b40d6403ad329b0f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Spo=CC=88rri?= Date: Thu, 11 May 2023 11:44:46 +0200 Subject: [PATCH 09/20] MDS HTTP Server: Expose objects as API endpoint. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pascal Spörri --- src/metadataservice/MDSHttpSession.cpp | 53 ++++++++++++++++++++++++++ src/metadataservice/MDSHttpSession.h | 2 + 2 files changed, 55 insertions(+) diff --git a/src/metadataservice/MDSHttpSession.cpp b/src/metadataservice/MDSHttpSession.cpp index 418e732d..c9bef3b3 100644 --- a/src/metadataservice/MDSHttpSession.cpp +++ b/src/metadataservice/MDSHttpSession.cpp @@ -11,14 +11,52 @@ #include #include +#include +#include #include #include +#include #include #include "Logging.h" +#include "MDSKVS.h" +#include "MDSKVSBucket.h" #include "Nodes.h" #include "Statistics.h" +void tag_invoke(boost::json::value_from_tag, boost::json::value &jv, + const std::shared_ptr &n) { + auto &nv = jv.emplace_array(); + n->forall([&nv](const utility::Path &key, const geds::ObjectInfo &info) { + nv.push_back({key.name, + {{"location", info.location}, + {"size", info.size}, + {"metadata", info.metadata.has_value() + ? (std::to_string(info.metadata->size()) + " bytes") + : std::string{""}}}}); + }); +} + +void tag_invoke(boost::json::value_from_tag, boost::json::value &jv, + const std::shared_ptr &n) { + auto &nv = jv.emplace_array(); + auto buckets = n->listBuckets(); + if (!buckets.ok()) { + jv = nv; + return; + } + for (const auto &bucket : *buckets) { + auto objs = n->getBucket(bucket); + if (!objs.ok()) { + continue; + } + auto b = *objs; + auto value = boost::json::value_from(b); + nv.push_back({bucket, value}); + } + jv = nv; +} + namespace geds { MDSHttpSession::MDSHttpSession(boost::asio::ip::tcp::socket &&socket, Nodes &nodes, @@ -90,6 +128,18 @@ void MDSHttpSession::prepareHtmlReply() { handleWrite(); } +void MDSHttpSession::prepareApiListReply() { + _response.result(boost::beast::http::status::ok); + _response.set(boost::beast::http::field::server, BOOST_BEAST_VERSION_STRING); + _response.set(boost::beast::http::field::content_type, "application/json"); + _response.keep_alive(_request.keep_alive()); + + auto data = boost::json::value_from(_kvs); + boost::beast::ostream(_response.body()) << boost::json::serialize(data); + boost::beast::ostream(_response.body()) << "\n"; + handleWrite(); +} + void MDSHttpSession::prepareApiNodesReply() { _response.result(boost::beast::http::status::ok); _response.set(boost::beast::http::field::server, BOOST_BEAST_VERSION_STRING); @@ -183,6 +233,9 @@ void MDSHttpSession::handleRequest() { if (_request.target() == "/") { return prepareHtmlReply(); } + if (_request.target() == "/api/list") { + return prepareApiListReply(); + } if (_request.target() == "/api/nodes") { return prepareApiNodesReply(); } diff --git a/src/metadataservice/MDSHttpSession.h b/src/metadataservice/MDSHttpSession.h index 1a90e2d9..272f5024 100644 --- a/src/metadataservice/MDSHttpSession.h +++ b/src/metadataservice/MDSHttpSession.h @@ -41,6 +41,7 @@ class MDSHttpSession : public std::enable_shared_from_this { void handleRequest(); void prepareHtmlReply(); void prepareMetricsReply(); + void prepareApiListReply(); void prepareApiNodesReply(); void prepareApiDecommissionReply(const std::string &body); void prepareApiReregisterReply(const std::string &body); @@ -49,4 +50,5 @@ class MDSHttpSession : public std::enable_shared_from_this { void close(); }; + } // namespace geds From 9b911b48cf4f71c2b8df633f0a929eb45d83a473 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Spo=CC=88rri?= Date: Thu, 25 May 2023 08:16:55 +0200 Subject: [PATCH 10/20] Automatically decomission nodes when they disconnect. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pascal Spörri --- src/libgeds/GEDS.cpp | 17 +++++++++++++---- src/libgeds/TcpTransport.cpp | 4 ++-- src/metadataservice/GRPCServer.cpp | 5 +++++ 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/src/libgeds/GEDS.cpp b/src/libgeds/GEDS.cpp index c6058f18..8910005e 100644 --- a/src/libgeds/GEDS.cpp +++ b/src/libgeds/GEDS.cpp @@ -178,11 +178,20 @@ absl::Status GEDS::stop() { LOG_INFO("Stopping"); LOG_INFO("Printing statistics"); - geds::Statistics::print(); + // Update state + _state = ServiceState::Stopped; + // Relocate to S3 if available. relocate(true); - auto result = _metadataService.disconnect(); + // Decomission node. + auto result = _metadataService.configureNode(uuid, _hostname, _server.port(), + geds::rpc::NodeState::Unregister); + if (!result.ok()) { + LOG_ERROR("Unable to unregister: ", result.message()); + } + + result = _metadataService.disconnect(); if (!result.ok()) { LOG_ERROR("cannot disconnect metadata service: ", result.message()); } @@ -198,14 +207,14 @@ absl::Status GEDS::stop() { _fileTransfers.clear(); _tcpTransport->stop(); - _state = ServiceState::Stopped; - _storageMonitoringThread.join(); if (_pubSubStreamThread.joinable()) { _pubSubStreamThread.join(); } + geds::Statistics::print(); + return result; } diff --git a/src/libgeds/TcpTransport.cpp b/src/libgeds/TcpTransport.cpp index f703ee35..994fabf7 100644 --- a/src/libgeds/TcpTransport.cpp +++ b/src/libgeds/TcpTransport.cpp @@ -311,7 +311,7 @@ void TcpTransport::tcpTxThread(unsigned int id) { } epoll_wfd[id] = poll_fd; do { - int cnt = ::epoll_wait(poll_fd, events, EPOLL_MAXEVENTS, -1); + int cnt = ::epoll_wait(poll_fd, events, EPOLL_MAXEVENTS, 500); for (int i = 0; i < cnt; i++) { struct epoll_event *ev = &events[i]; @@ -657,7 +657,7 @@ void TcpTransport::tcpRxThread(unsigned int id) { epoll_rfd[id] = poll_fd; do { - int cnt = ::epoll_wait(poll_fd, events, EPOLL_MAXEVENTS, -1); + int cnt = ::epoll_wait(poll_fd, events, EPOLL_MAXEVENTS, 500); for (int i = 0; i < cnt; i++) { struct epoll_event *ev = &events[i]; diff --git a/src/metadataservice/GRPCServer.cpp b/src/metadataservice/GRPCServer.cpp index 5f083744..5bb0358b 100644 --- a/src/metadataservice/GRPCServer.cpp +++ b/src/metadataservice/GRPCServer.cpp @@ -76,6 +76,11 @@ class MetadataServiceImpl final : public geds::rpc::MetadataService::Service { if (state == geds::rpc::NodeState::Register) { status = _nodes.registerNode(uuid, identifier, port); } else if (state == geds::rpc::NodeState::Unregister) { + std::vector toDecommission = {uuid}; + auto decommissionStatus = _nodes.decommissionNodes(toDecommission, _kvs); + if (!decommissionStatus.ok()) { + LOG_ERROR("Unable to decommission node: ", decommissionStatus.message()); + } status = _nodes.unregisterNode(uuid); } else { LOG_ERROR("Invalid state ", state); From d7fa293ca4595da0e0226422a034ae69f20c99b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Spo=CC=88rri?= Date: Thu, 25 May 2023 08:19:26 +0200 Subject: [PATCH 11/20] GEDS: Allow relocating objects which are in-use. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pascal Spörri --- src/libgeds/GEDS.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libgeds/GEDS.cpp b/src/libgeds/GEDS.cpp index 8910005e..b5622c84 100644 --- a/src/libgeds/GEDS.cpp +++ b/src/libgeds/GEDS.cpp @@ -1066,7 +1066,7 @@ void GEDS::startStorageMonitoringThread() { } auto targetStorage = (size_t)(0.7 * (double)_config.available_local_storage); - if (memoryUsed > targetStorage) { + if (storageUsed > targetStorage) { std::sort(std::begin(relocatable), std::end(relocatable), [](std::shared_ptr a, std::shared_ptr b) { if (a->openCount() == 0 && b->openCount() == 0) { From 0beb942f69363b215b2e5304dedf3a630d32d6a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Spo=CC=88rri?= Date: Thu, 25 May 2023 08:21:57 +0200 Subject: [PATCH 12/20] GEDSConfig: Support storage_spilling_fraction. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pascal Spörri --- src/libgeds/GEDS.cpp | 18 +++++------------- src/libgeds/GEDSConfig.cpp | 32 ++++++++++++++++++++++++++++++++ src/libgeds/GEDSConfig.h | 14 ++++++++++++++ 3 files changed, 51 insertions(+), 13 deletions(-) diff --git a/src/libgeds/GEDS.cpp b/src/libgeds/GEDS.cpp index b5622c84..b4060aaa 100644 --- a/src/libgeds/GEDS.cpp +++ b/src/libgeds/GEDS.cpp @@ -1034,7 +1034,7 @@ void GEDS::startStorageMonitoringThread() { auto memSize = fh->localMemorySize(); storageUsed += storageSize; memoryUsed += memSize; - if (fh->isRelocatable() && memoryUsed == 0) { + if (fh->isRelocatable() && fh->openCount() == 0) { relocatable.push_back(fh); } } @@ -1065,26 +1065,18 @@ void GEDS::startStorageMonitoringThread() { } } - auto targetStorage = (size_t)(0.7 * (double)_config.available_local_storage); + auto targetStorage = + (size_t)(_config.storage_spilling_fraction * (double)_config.available_local_storage); if (storageUsed > targetStorage) { std::sort(std::begin(relocatable), std::end(relocatable), [](std::shared_ptr a, std::shared_ptr b) { - if (a->openCount() == 0 && b->openCount() == 0) { - return a->lastReleased() < b->lastReleased(); - } - if (a->openCount() == 0) { - return true; - } - if (b->openCount() == 0) { - return false; - } - return a->lastOpened() < b->lastOpened(); + return a->lastReleased() < b->lastReleased(); }); std::vector> tasks; size_t relocateBytes = 0; for (auto &f : relocatable) { - if (relocateBytes > targetStorage) { + if (relocateBytes > (storageUsed - targetStorage)) { break; } relocateBytes += f->localStorageSize(); diff --git a/src/libgeds/GEDSConfig.cpp b/src/libgeds/GEDSConfig.cpp index f3aea55a..5e0df1e5 100644 --- a/src/libgeds/GEDSConfig.cpp +++ b/src/libgeds/GEDSConfig.cpp @@ -4,7 +4,11 @@ */ #include "GEDSConfig.h" + +#include + #include "Logging.h" +#include "absl/status/status.h" absl::Status GEDSConfig::set(const std::string &key, const std::string &value) { LOG_DEBUG("Trying to set '", key, "' to '", value, "'"); @@ -16,6 +20,14 @@ absl::Status GEDSConfig::set(const std::string &key, const std::string &value) { localStoragePath = value; } else if (key == "pub_sub_enabled" && value == "true") { pubSubEnabled = true; + } else if (key == "node_type") { + if (value == "Standard") { + node_type = GEDSNodeType::Standard; + } else if (value == "Storage") { + node_type = GEDSNodeType::Storage; + } else { + return absl::NotFoundError("Invalid node type " + value); + } } else { LOG_ERROR("Configuration " + key + " not supported (type: string)."); return absl::NotFoundError("Key " + key + " not found."); @@ -68,6 +80,15 @@ absl::Status GEDSConfig::set(const std::string &key, int64_t value) { return set(key, (size_t)value); } +absl::Status GEDSConfig::set(const std::string &key, double value) { + if (key == "storage_spilling_fraction") { + storage_spilling_fraction = value; + return absl::OkStatus(); + } + LOG_ERROR("Configuration " + key + " not supported (type: double)."); + return absl::NotFoundError("Key " + key + " not found."); +} + absl::StatusOr GEDSConfig::getString(const std::string &key) const { LOG_INFO("Get ", key, " as string"); if (key == "listen_address") { @@ -79,6 +100,9 @@ absl::StatusOr GEDSConfig::getString(const std::string &key) const if (key == "local_storage_path") { return localStoragePath; } + if (key == "node_type") { + return std::string{magic_enum::enum_name(node_type)}; + } LOG_ERROR("Configuration " + key + " not supported (type: string)."); return absl::NotFoundError("Key " + key + " not found."); } @@ -114,3 +138,11 @@ absl::StatusOr GEDSConfig::getSignedInt(const std::string &key) const { } return value.status(); } + +absl::StatusOr GEDSConfig::getDouble(const std::string &key) const { + if (key == "storage_spilling_fraction") { + return storage_spilling_fraction; + } + LOG_ERROR("Configuration " + key + " not supported (type: double)."); + return absl::NotFoundError("Key " + key + " (double) not found."); +} diff --git a/src/libgeds/GEDSConfig.h b/src/libgeds/GEDSConfig.h index ef1b7d09..c91b277c 100644 --- a/src/libgeds/GEDSConfig.h +++ b/src/libgeds/GEDSConfig.h @@ -16,6 +16,8 @@ #include "Ports.h" +enum class GEDSNodeType { Standard, Storage }; + struct GEDSConfig { /** * @brief The hostname of the metadata service/ @@ -79,6 +81,16 @@ struct GEDSConfig { size_t available_local_memory = 16 * 1024 * 1024 * (size_t)1024; + /** + * @brief Fraction of the storage where GEDS should start spilling. + */ + double storage_spilling_fraction = 0.7; + + /** + * @brief Node type. + */ + GEDSNodeType node_type = GEDSNodeType::Standard; + /** * @brief Publish/Subscribe is enabled. */ @@ -90,8 +102,10 @@ struct GEDSConfig { absl::Status set(const std::string &key, const std::string &value); absl::Status set(const std::string &key, size_t value); absl::Status set(const std::string &key, int64_t value); + absl::Status set(const std::string &key, double value); absl::StatusOr getString(const std::string &key) const; absl::StatusOr getUnsignedInt(const std::string &key) const; absl::StatusOr getSignedInt(const std::string &key) const; + absl::StatusOr getDouble(const std::string &key) const; }; From 4607f419c58ce09696e580a4f442d6e16d5ff687 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Spo=CC=88rri?= Date: Fri, 26 May 2023 17:19:40 +0200 Subject: [PATCH 13/20] State for running the experiments. --- src/libgeds/GEDS.cpp | 5 ++- src/libgeds/GEDSAbstractFileHandle.h | 2 +- src/metadataservice/S3Helper.cpp | 59 +++++++++++++++------------- 3 files changed, 35 insertions(+), 31 deletions(-) diff --git a/src/libgeds/GEDS.cpp b/src/libgeds/GEDS.cpp index b4060aaa..d72498b9 100644 --- a/src/libgeds/GEDS.cpp +++ b/src/libgeds/GEDS.cpp @@ -182,7 +182,7 @@ absl::Status GEDS::stop() { _state = ServiceState::Stopped; // Relocate to S3 if available. - relocate(true); + // relocate(true); // Decomission node. auto result = _metadataService.configureNode(uuid, _hostname, _server.port(), @@ -508,7 +508,8 @@ GEDS::reopenFileHandle(const std::string &bucket, const std::string &key, bool i if (location.compare(0, gedsPrefix.size(), gedsPrefix) == 0) { fileHandle = GEDSRemoteFileHandle::factory(shared_from_this(), object); } else if (location.compare(0, s3Prefix.size(), s3Prefix) == 0) { - fileHandle = GEDSCachedFileHandle::factory(shared_from_this(), object); + // fileHandle = GEDSCachedFileHandle::factory(shared_from_this(), object); + fileHandle = GEDSS3FileHandle::factory(shared_from_this(), object); } else { return absl::UnknownError("The remote location format " + location + " is not known."); } diff --git a/src/libgeds/GEDSAbstractFileHandle.h b/src/libgeds/GEDSAbstractFileHandle.h index ec939825..a3eea31d 100644 --- a/src/libgeds/GEDSAbstractFileHandle.h +++ b/src/libgeds/GEDSAbstractFileHandle.h @@ -211,7 +211,7 @@ template class GEDSAbstractFileHandle : public GEDSFileHandle { } auto status = (*fh)->seal(); if (!status.ok()) { - LOG_ERROR("Unable to seal relocated file!"); + LOG_ERROR("Unable to seal relocated file: ", status.message()); (void)(*s3Endpoint)->deleteObject(bucket, key); return status; } diff --git a/src/metadataservice/S3Helper.cpp b/src/metadataservice/S3Helper.cpp index 121628f7..ee1eec25 100644 --- a/src/metadataservice/S3Helper.cpp +++ b/src/metadataservice/S3Helper.cpp @@ -13,6 +13,8 @@ absl::Status PopulateKVS(std::shared_ptr config, std::shared_ptr kvs) { + + return absl::OkStatus(); // Ensure the bucket already exists { auto status = kvs->createBucket(config->bucket); @@ -20,33 +22,34 @@ absl::Status PopulateKVS(std::shared_ptr config, return status; } } - auto bucket = kvs->getBucket(config->bucket); - if (!bucket.ok()) { - return bucket.status(); - } - auto s3Endpoint = geds::s3::Endpoint(config->endpointURL, config->accessKey, config->secretKey); - auto files = s3Endpoint.list(config->bucket, ""); - if (!files.ok()) { - LOG_ERROR("Unable to list s3 endpoint for ", config->bucket, ": ", files.status().message()); - return files.status(); - } - for (const auto &f : *files) { - if (f.isDirectory) { - continue; - } - LOG_DEBUG("Adding: ", config->bucket, "/", f.key); - bool slash = f.key.size() > 0 && f.key[0] == '/'; - auto objInfo = - geds::ObjectInfo{.location = "s3://" + config->bucket + (slash ? "" : "/") + f.key, - .size = f.size, - .sealedOffset = f.size, - .metadata = std::nullopt}; - auto status = (*bucket)->createObject( - geds::Object{.id = geds::ObjectID{config->bucket, f.key}, .info = objInfo}); - if (!status.ok() && status.code() != absl::StatusCode::kAlreadyExists) { - LOG_ERROR("Unable to create entry for ", config->bucket, "/", f.key, ": ", status.message()); - continue; - } - } + return absl::OkStatus(); + // auto bucket = kvs->getBucket(config->bucket); + // if (!bucket.ok()) { + // return bucket.status(); + // } + // auto s3Endpoint = geds::s3::Endpoint(config->endpointURL, config->accessKey, config->secretKey); + // auto files = s3Endpoint.list(config->bucket, ""); + // if (!files.ok()) { + // LOG_ERROR("Unable to list s3 endpoint for ", config->bucket, ": ", files.status().message()); + // return files.status(); + // } + // for (const auto &f : *files) { + // if (f.isDirectory) { + // continue; + // } + // LOG_DEBUG("Adding: ", config->bucket, "/", f.key); + // bool slash = f.key.size() > 0 && f.key[0] == '/'; + // auto objInfo = + // geds::ObjectInfo{.location = "s3://" + config->bucket + (slash ? "" : "/") + f.key, + // .size = f.size, + // .sealedOffset = f.size, + // .metadata = std::nullopt}; + // auto status = (*bucket)->createObject( + // geds::Object{.id = geds::ObjectID{config->bucket, f.key}, .info = objInfo}); + // if (!status.ok() && status.code() != absl::StatusCode::kAlreadyExists) { + // LOG_ERROR("Unable to create entry for ", config->bucket, "/", f.key, ": ", status.message()); + // continue; + // } + // } return absl::OkStatus(); } From c176464d7ff9295977897f58ba2dab379a2d0397 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Spo=CC=88rri?= Date: Tue, 30 May 2023 15:26:14 +0200 Subject: [PATCH 14/20] GEDS: Fix object relocation error. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pascal Spörri --- src/libgeds/TcpTransport.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libgeds/TcpTransport.cpp b/src/libgeds/TcpTransport.cpp index 994fabf7..1e5748a4 100644 --- a/src/libgeds/TcpTransport.cpp +++ b/src/libgeds/TcpTransport.cpp @@ -570,7 +570,7 @@ bool TcpPeer::processEndpointRecv(int sock) { auto message = "Error from GET_REPLY: " + std::to_string(ctx->hdr.error) + "length: " + std::to_string(datalen) + " Ep: " + std::to_string(tep->sock); LOG_DEBUG(message); - ctx->p->set_value(absl::AbortedError(message)); + ctx->p->set_value(absl::UnknownError(message)); ctx->p = nullptr; ctx->state = PROC_IDLE; ctx->progress = 0; From fd212f16f56c0415357f0dda04cd2cd798ee87af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Spo=CC=88rri?= Date: Wed, 7 Jun 2023 15:39:26 +0200 Subject: [PATCH 15/20] GEDS: Fix lost connection issue in FileTransferService. --- src/libgeds/FileTransferService.cpp | 25 +++++++++++++++++-------- src/libgeds/FileTransferService.h | 2 +- src/libgeds/GEDSRemoteFileHandle.cpp | 9 +++++++++ src/libgeds/GEDSRemoteFileHandle.h | 3 +++ 4 files changed, 30 insertions(+), 9 deletions(-) diff --git a/src/libgeds/FileTransferService.cpp b/src/libgeds/FileTransferService.cpp index 8115e6c2..51ab9b72 100644 --- a/src/libgeds/FileTransferService.cpp +++ b/src/libgeds/FileTransferService.cpp @@ -59,7 +59,6 @@ absl::Status FileTransferService::connect() { if (_connectionState != ConnectionState::Disconnected) { return absl::FailedPreconditionError("Cannot reinitialize service."); } - auto lock = getWriteLock(); try { assert(_channel.get() == nullptr); _channel = grpc::CreateChannel(nodeAddress, grpc::InsecureChannelCredentials()); @@ -88,6 +87,7 @@ absl::Status FileTransferService::connect() { } } _connectionState = ConnectionState::Connected; + LOG_INFO("Connected to ", nodeAddress); return absl::OkStatus(); } @@ -95,9 +95,9 @@ absl::Status FileTransferService::disconnect() { if (_connectionState != ConnectionState::Connected) { return absl::UnknownError("The service is in the wrong state!"); } - auto lock = getWriteLock(); - _tcpPeer.reset(); + _tcpPeer = nullptr; _channel = nullptr; + _connectionState = ConnectionState::Disconnected; return absl::OkStatus(); } @@ -137,18 +137,19 @@ FileTransferService::availTransportEndpoints() { absl::StatusOr FileTransferService::readBytes(const std::string &bucket, const std::string &key, uint8_t *buffer, size_t position, size_t length) { - CHECK_CONNECTED - + std::shared_ptr peer; std::future> fut; // Create a scope for the std::shared_ptr so that the peer is automatically cleaned up. { auto lock = getReadLock(); - if (_tcpPeer.expired()) { + CHECK_CONNECTED + + if (!_tcpPeer) { return absl::UnavailableError("No TCP for " + nodeAddress); } LOG_DEBUG("Found TCP peer for ", nodeAddress); - auto peer = _tcpPeer.lock(); + peer = _tcpPeer; lock.unlock(); auto prom = peer->sendRpcRequest((uint64_t)buffer, bucket + "/" + key, position, length); fut = prom->get_future(); @@ -160,7 +161,15 @@ absl::StatusOr FileTransferService::readBytes(const std::string &bucket, // Close the FileTransferService on error. if (status.status().code() == absl::StatusCode::kAborted) { auto lock = getWriteLock(); - _tcpPeer.reset(); + if (peer == _tcpPeer) { + LOG_ERROR("Encountered an error on the TCP Transport. Trying to reconnect! Node: ", + nodeAddress); + disconnect().IgnoreError(); + auto s = connect(); + if (!s.ok()) { + LOG_ERROR("Unable to reconnect: ", s.message()); + } + } } return status.status(); } diff --git a/src/libgeds/FileTransferService.h b/src/libgeds/FileTransferService.h index ea15db9f..362c419d 100644 --- a/src/libgeds/FileTransferService.h +++ b/src/libgeds/FileTransferService.h @@ -45,7 +45,7 @@ class FileTransferService : public utility::RWConcurrentObjectAdaptor { std::unique_ptr _stub; std::shared_ptr _geds; std::shared_ptr _tcp; - std::weak_ptr _tcpPeer; + std::shared_ptr _tcpPeer; absl::StatusOr>> availTransportEndpoints(); diff --git a/src/libgeds/GEDSRemoteFileHandle.cpp b/src/libgeds/GEDSRemoteFileHandle.cpp index 31147217..a77708b5 100644 --- a/src/libgeds/GEDSRemoteFileHandle.cpp +++ b/src/libgeds/GEDSRemoteFileHandle.cpp @@ -41,14 +41,23 @@ GEDSRemoteFileHandle::factory(std::shared_ptr gedsService, const geds::Obj return std::shared_ptr( new GEDSRemoteFileHandle(gedsService, object, fileTransferService.value())); } + absl::StatusOr GEDSRemoteFileHandle::readBytes(uint8_t *bytes, size_t position, size_t length) { + return readBytes(bytes, position, length, true); +} + +absl::StatusOr GEDSRemoteFileHandle::readBytes(uint8_t *bytes, size_t position, + size_t length, bool retry) { if (length == 0) { return 0; } auto lock = lockShared(); auto read = _fileTransferService->read(bucket, key, bytes, position, length); if (!read.ok()) { + if (read.status().code() == absl::StatusCode::kAborted && retry) { + return readBytes(bytes, position, length, false); + } return read; } if (*read != length) { diff --git a/src/libgeds/GEDSRemoteFileHandle.h b/src/libgeds/GEDSRemoteFileHandle.h index 2a119c01..963fdda1 100644 --- a/src/libgeds/GEDSRemoteFileHandle.h +++ b/src/libgeds/GEDSRemoteFileHandle.h @@ -11,6 +11,7 @@ #include "Object.h" #include "Statistics.h" + class GEDSRemoteFileHandle : public GEDSFileHandle { std::shared_ptr _fileTransferService; geds::ObjectInfo _info; @@ -37,6 +38,8 @@ class GEDSRemoteFileHandle : public GEDSFileHandle { absl::Status seal() override; absl::StatusOr readBytes(uint8_t *bytes, size_t position, size_t length) override; + + absl::StatusOr readBytes(uint8_t *bytes, size_t position, size_t length, bool retry); }; #endif From 38ffb3fbfae550a6e37dab4277c5aed79ce78f26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Spo=CC=88rri?= Date: Wed, 7 Jun 2023 15:41:12 +0200 Subject: [PATCH 16/20] GEDS: Workaround Start/Stop. --- src/libgeds/GEDS.cpp | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/src/libgeds/GEDS.cpp b/src/libgeds/GEDS.cpp index d72498b9..a4515af2 100644 --- a/src/libgeds/GEDS.cpp +++ b/src/libgeds/GEDS.cpp @@ -116,6 +116,10 @@ GEDS::~GEDS() { } absl::Status GEDS::start() { + if (_state == ServiceState::Running) { + LOG_INFO("GEDS is already running."); + return absl::OkStatus(); + } std::cout << "Starting GEDS (" << utility::GEDSVersion() << ")\n" << "- prefix: " << _pathPrefix << "\n" << "- metadata service: " << _metadataService.serverAddress << std::endl; @@ -178,18 +182,23 @@ absl::Status GEDS::stop() { LOG_INFO("Stopping"); LOG_INFO("Printing statistics"); + // Relocate to S3 if available. + if (_config.force_relocation_when_stopping) { + relocate(true); + return absl::OkStatus(); + } + // Update state _state = ServiceState::Stopped; - // Relocate to S3 if available. - // relocate(true); + absl::Status result; - // Decomission node. - auto result = _metadataService.configureNode(uuid, _hostname, _server.port(), - geds::rpc::NodeState::Unregister); - if (!result.ok()) { - LOG_ERROR("Unable to unregister: ", result.message()); - } + // // Decomission node. + // auto result = _metadataService.configureNode(uuid, _hostname, _server.port(), + // geds::rpc::NodeState::Unregister); + // if (!result.ok()) { + // LOG_ERROR("Unable to unregister: ", result.message()); + // } result = _metadataService.disconnect(); if (!result.ok()) { From 7cb1b6366865528b6f0bc0f5d942c482a9823ee6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Spo=CC=88rri?= Date: Wed, 7 Jun 2023 15:41:47 +0200 Subject: [PATCH 17/20] GEDS: Workarounds for object relocation. --- src/libgeds/GEDS.cpp | 22 +++++++++++++++++----- src/libgeds/GEDS.h | 2 +- src/libgeds/GEDSConfig.cpp | 2 ++ src/libgeds/GEDSConfig.h | 12 +++++++++++- 4 files changed, 31 insertions(+), 7 deletions(-) diff --git a/src/libgeds/GEDS.cpp b/src/libgeds/GEDS.cpp index a4515af2..52152fe3 100644 --- a/src/libgeds/GEDS.cpp +++ b/src/libgeds/GEDS.cpp @@ -914,6 +914,10 @@ void GEDS::relocate(std::vector> &relocatable, b } h->cv.notify_one(); }); + + const auto tp_size = _config.io_thread_pool_size; + std::unique_lock lock(h->mutex); + h->cv.wait(lock, [h, tp_size]() { return h->nTasks <= (tp_size + 1); }); } std::unique_lock lock(h->mutex); h->cv.wait(lock, [h]() { return h->nTasks == 0; }); @@ -930,19 +934,26 @@ void GEDS::relocate(std::shared_ptr handle, bool force) { } static auto stats = geds::Statistics::createCounter("GEDS: Storage Relocated"); - *stats += handle->localStorageSize(); + auto fsize = handle->localStorageSize(); // Remove cached files. const auto path = getPath(handle->bucket, handle->key); if (handle->key.starts_with(GEDSCachedFileHandle::CacheBlockMarker)) { - _fileHandles.removeIf(path, [handle](const std::shared_ptr &existing) { - return handle.get() == existing.get(); - }); + auto status = + _fileHandles.removeIf(path, [handle](const std::shared_ptr &existing) { + return handle.get() == existing.get(); + }); + if (status) { + *stats += fsize; + } return; } // Relocate all other files. - (void)handle->relocate(); + auto status = handle->relocate(); + if (status.ok()) { + *stats += fsize; + } } absl::Status GEDS::downloadObject(const std::string &bucket, const std::string &key) { @@ -1010,6 +1021,7 @@ absl::Status GEDS::downloadObjects(std::vector objects) { }); } } + auto relocateLock = h->lock(); h->cv.wait(relocateLock, [h]() { return h->nTasks == 0; }); LOG_INFO("Downloaded ", objects.size(), " objects, errors: ", h->nErrors); diff --git a/src/libgeds/GEDS.h b/src/libgeds/GEDS.h index fcfd75cd..e817c7c6 100644 --- a/src/libgeds/GEDS.h +++ b/src/libgeds/GEDS.h @@ -328,7 +328,7 @@ class GEDS : public std::enable_shared_from_this, utility::RWConcurrentObj absl::Status subscribe(const geds::SubscriptionEvent &event); absl::Status unsubscribe(const geds::SubscriptionEvent &event); - + /** * @brief Pull object to this GEDS instance. */ diff --git a/src/libgeds/GEDSConfig.cpp b/src/libgeds/GEDSConfig.cpp index 5e0df1e5..1af90d60 100644 --- a/src/libgeds/GEDSConfig.cpp +++ b/src/libgeds/GEDSConfig.cpp @@ -64,6 +64,8 @@ absl::Status GEDSConfig::set(const std::string &key, size_t value) { available_local_memory = value; } else if (key == "pub_sub_enabled") { pubSubEnabled = value != 0; + } else if (key == "force_relocation_when_stopping") { + force_relocation_when_stopping = value != 0; } else { LOG_ERROR("Configuration " + key + " not supported (type: signed/unsigned integer)."); return absl::NotFoundError("Key " + key + " not found."); diff --git a/src/libgeds/GEDSConfig.h b/src/libgeds/GEDSConfig.h index c91b277c..2dba1761 100644 --- a/src/libgeds/GEDSConfig.h +++ b/src/libgeds/GEDSConfig.h @@ -96,8 +96,18 @@ struct GEDSConfig { */ bool pubSubEnabled = false; + /** + * @brief Force relocation when stopping. + */ + bool force_relocation_when_stopping = false; + GEDSConfig(std::string metadataServiceAddressArg) - : metadataServiceAddress(std::move(metadataServiceAddressArg)) {} + : metadataServiceAddress(std::move(metadataServiceAddressArg)) { + if (available_local_storage <= 4 * 1024 * 1024 * (size_t)1024) { + io_thread_pool_size = std::min(io_thread_pool_size, 6); + storage_spilling_fraction = 0.9; + } + } absl::Status set(const std::string &key, const std::string &value); absl::Status set(const std::string &key, size_t value); From f2690d0fde12513b366d39fdb6931c49487d3e06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Spo=CC=88rri?= Date: Wed, 7 Jun 2023 15:42:08 +0200 Subject: [PATCH 18/20] TcpTransport: Increase concurrency. --- src/libgeds/TcpTransport.cpp | 2 +- src/libgeds/TcpTransport.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/libgeds/TcpTransport.cpp b/src/libgeds/TcpTransport.cpp index 1e5748a4..3f88239d 100644 --- a/src/libgeds/TcpTransport.cpp +++ b/src/libgeds/TcpTransport.cpp @@ -58,7 +58,7 @@ void TcpTransport::start() { "=d"(registers[3]) : "a"(1), "c"(0)); bool hyperthreading = registers[3] & (1 << 28); // NOLINT - num_proc = std::thread::hardware_concurrency(); + num_proc = std::thread::hardware_concurrency() * 2; if (hyperthreading) num_proc /= 2; diff --git a/src/libgeds/TcpTransport.h b/src/libgeds/TcpTransport.h index f15896d4..fab6abf5 100644 --- a/src/libgeds/TcpTransport.h +++ b/src/libgeds/TcpTransport.h @@ -195,7 +195,7 @@ class TcpPeer : public std::enable_shared_from_this, utility::RWConcurr ~TcpPeer(); }; constexpr unsigned int MAX_PEERS = 8096; -constexpr unsigned int MAX_IO_THREADS = 8; +constexpr unsigned int MAX_IO_THREADS = 16; constexpr unsigned int EPOLL_MAXEVENTS = MAX_PEERS / MAX_IO_THREADS; class TcpTransport : public std::enable_shared_from_this { From 67edf15f77523e311640043d98b1b1a715ae0371 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Spo=CC=88rri?= Date: Wed, 7 Jun 2023 15:42:33 +0200 Subject: [PATCH 19/20] GEDS: Fixup codestyle. --- src/metadataservice/S3Helper.cpp | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/metadataservice/S3Helper.cpp b/src/metadataservice/S3Helper.cpp index ee1eec25..829f084c 100644 --- a/src/metadataservice/S3Helper.cpp +++ b/src/metadataservice/S3Helper.cpp @@ -22,14 +22,13 @@ absl::Status PopulateKVS(std::shared_ptr config, return status; } } - return absl::OkStatus(); + return absl::OkStatus(); // auto bucket = kvs->getBucket(config->bucket); // if (!bucket.ok()) { // return bucket.status(); // } - // auto s3Endpoint = geds::s3::Endpoint(config->endpointURL, config->accessKey, config->secretKey); - // auto files = s3Endpoint.list(config->bucket, ""); - // if (!files.ok()) { + // auto s3Endpoint = geds::s3::Endpoint(config->endpointURL, config->accessKey, + // config->secretKey); auto files = s3Endpoint.list(config->bucket, ""); if (!files.ok()) { // LOG_ERROR("Unable to list s3 endpoint for ", config->bucket, ": ", files.status().message()); // return files.status(); // } @@ -47,8 +46,8 @@ absl::Status PopulateKVS(std::shared_ptr config, // auto status = (*bucket)->createObject( // geds::Object{.id = geds::ObjectID{config->bucket, f.key}, .info = objInfo}); // if (!status.ok() && status.code() != absl::StatusCode::kAlreadyExists) { - // LOG_ERROR("Unable to create entry for ", config->bucket, "/", f.key, ": ", status.message()); - // continue; + // LOG_ERROR("Unable to create entry for ", config->bucket, "/", f.key, ": ", + // status.message()); continue; // } // } return absl::OkStatus(); From 663865c5df929d9c60eeff659a133f0c3d392e31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Spo=CC=88rri?= Date: Wed, 7 Jun 2023 17:28:50 +0200 Subject: [PATCH 20/20] Allow overriding TcpPeers. --- src/libgeds/FileTransferService.cpp | 12 +++++++++++- src/libgeds/TcpTransport.cpp | 9 ++++++--- src/libgeds/TcpTransport.h | 2 +- 3 files changed, 18 insertions(+), 5 deletions(-) diff --git a/src/libgeds/FileTransferService.cpp b/src/libgeds/FileTransferService.cpp index 51ab9b72..7c5b6f12 100644 --- a/src/libgeds/FileTransferService.cpp +++ b/src/libgeds/FileTransferService.cpp @@ -56,6 +56,9 @@ FileTransferService::~FileTransferService() { } absl::Status FileTransferService::connect() { + if (_connectionState == ConnectionState::Connected) { + return absl::OkStatus(); + } if (_connectionState != ConnectionState::Disconnected) { return absl::FailedPreconditionError("Cannot reinitialize service."); } @@ -77,7 +80,7 @@ absl::Status FileTransferService::connect() { for (auto &addr : *endpoints) { if (std::get<1>(addr) == FileTransferProtocol::Socket) { struct sockaddr saddr = std::get<0>(addr); - auto peer = _tcp->getPeer(&saddr); + auto peer = _tcp->getPeer(&saddr, true); if (peer) { _tcpPeer = peer; @@ -86,6 +89,12 @@ absl::Status FileTransferService::connect() { } } } + if (_tcpPeer == nullptr) { + _channel = nullptr; + _stub = nullptr; + auto message = "Unable to establish a connection to " + nodeAddress; + return absl::UnknownError(message); + } _connectionState = ConnectionState::Connected; LOG_INFO("Connected to ", nodeAddress); return absl::OkStatus(); @@ -160,6 +169,7 @@ absl::StatusOr FileTransferService::readBytes(const std::string &bucket, } // Close the FileTransferService on error. if (status.status().code() == absl::StatusCode::kAborted) { + LOG_ERROR("FileTransfer was aborted: ", status.status().message()); auto lock = getWriteLock(); if (peer == _tcpPeer) { LOG_ERROR("Encountered an error on the TCP Transport. Trying to reconnect! Node: ", diff --git a/src/libgeds/TcpTransport.cpp b/src/libgeds/TcpTransport.cpp index 3f88239d..686426a4 100644 --- a/src/libgeds/TcpTransport.cpp +++ b/src/libgeds/TcpTransport.cpp @@ -238,8 +238,9 @@ bool TcpPeer::processEndpointSend(std::shared_ptr tep) { ctx->progress = 0; } } else { - if (errno != EWOULDBLOCK) + if (errno != EWOULDBLOCK) { LOG_ERROR("Send failed, errno: ", errno); + } break; } } @@ -681,6 +682,7 @@ void TcpTransport::tcpRxThread(unsigned int id) { continue; } if (ev->events & (EPOLLHUP | EPOLLRDHUP | EPOLLERR)) { + LOG_INFO("Disabling endpoint with ", sock, " epoll."); deactivateEndpoint(poll_fd, sock, RX_CLOSED); if (tcpPeer->SocketStateChange(sock, RX_CLOSED)) { tcpPeers.remove(tcpPeer->Id); @@ -693,6 +695,7 @@ void TcpTransport::tcpRxThread(unsigned int id) { } if (!tcpPeer->processEndpointRecv(sock)) { + LOG_INFO("Disabling endpoint with ", sock, " socket shutdown"); shutdown(sock, SHUT_RDWR); deactivateEndpoint(poll_fd, sock, RX_CLOSED); if (tcpPeer->SocketStateChange(sock, RX_CLOSED)) { @@ -789,7 +792,7 @@ bool TcpTransport::addEndpointPassive(int sock) { return true; } -std::shared_ptr TcpTransport::getPeer(sockaddr *peer) { +std::shared_ptr TcpTransport::getPeer(sockaddr *peer, bool override) { auto inaddr = (sockaddr_in *)peer; std::string hostname = inet_ntoa(inaddr->sin_addr); size_t addrlen = sizeof *peer; @@ -802,7 +805,7 @@ std::shared_ptr TcpTransport::getPeer(sockaddr *peer) { */ std::shared_ptr tcpPeer; auto it = tcpPeers.get(epId); - if (it.has_value()) { + if (it.has_value() && !override) { tcpPeer = *it; return tcpPeer; } diff --git a/src/libgeds/TcpTransport.h b/src/libgeds/TcpTransport.h index fab6abf5..f1c1da53 100644 --- a/src/libgeds/TcpTransport.h +++ b/src/libgeds/TcpTransport.h @@ -243,7 +243,7 @@ class TcpTransport : public std::enable_shared_from_this { void start(); void stop(); - std::shared_ptr getPeer(sockaddr *); + std::shared_ptr getPeer(sockaddr *, bool override = false); bool addEndpointPassive(int sock); }; } // namespace geds