Skip to content

WIP: Support server decommissioning. #69

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions src/benchmarks/benchmark_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ struct Latency {
/// Number of bytes in one kilobyte (2^10).
constexpr size_t KILOBYTE = 1024;
/// Number of bytes in one megabyte (2^20).
constexpr size_t MEGABYTE = KILOBYTE * KILOBYTE;

/// Pre-allocated read buffers, one per benchmark thread (indexed by thread id).
std::vector<std::vector<uint8_t>> buffers;

/**
 * @brief Compute the payload size in bytes for a benchmark factor.
 *
 * The result is KILOBYTE * 2^factor. The shift is performed on a size_t
 * operand: shifting the int literal `1` is undefined once `factor` reaches the
 * width of int and silently produced wrong sizes for factor >= 31.
 *
 * @param factor Power-of-two exponent; must be small enough that the result
 *               fits into size_t.
 * @return Payload size in bytes.
 */
size_t getPayloadSize(size_t factor) { return KILOBYTE << factor; }

void runBenchmarkThread(std::shared_ptr<GEDS> geds, size_t threadId, size_t factor,
Expand All @@ -77,10 +79,12 @@ void runBenchmarkThread(std::shared_ptr<GEDS> geds, size_t threadId, size_t fact
} else {
key = std::to_string(factor) + "-" + std::to_string(threadId) + ".data";
}
std::vector<uint8_t> buffer(payloadSize);
// std::vector<uint8_t> buffer(payloadSize);

auto file = geds->open(FLAGS_bucket.CurrentValue(), key);
if (file.ok()) {
auto openTime = std::chrono::steady_clock::now();
auto buffer = buffers[threadId].data();
auto status = file->read(buffer, 0, payloadSize);
auto lastByteTime = std::chrono::steady_clock::now();
constexpr auto milliseconds = 1e6;
Expand All @@ -93,7 +97,6 @@ void runBenchmarkThread(std::shared_ptr<GEDS> geds, size_t threadId, size_t fact
lastByteTime - startTime)
.count() /
milliseconds});

if (!status.ok()) {
auto message = "Error: " + std::string{status.status().message()};
LOG_ERROR(message);
Expand Down Expand Up @@ -183,8 +186,16 @@ int main(int argc, char **argv) {
exit(EXIT_FAILURE);
}

for (size_t i = 0; i <= absl::GetFlag(FLAGS_maxFactor); i++) {
for (size_t j = 1; j < absl::GetFlag(FLAGS_maxThreads); j++) {
auto threadCount = absl::GetFlag(FLAGS_maxThreads);
buffers.resize(threadCount);
auto factorCount = absl::GetFlag(FLAGS_maxFactor);
auto maxPayload = getPayloadSize(factorCount);
for (auto &buffer : buffers) {
buffer.resize(maxPayload);
}

for (size_t i = 0; i <= factorCount; i++) {
for (size_t j = 1; j < threadCount; j++) {
auto result = benchmark(geds, i, j);
std::cout << result.payloadSize << ": " << result.threads << " " << result.rate << " MB/s"
<< std::endl;
Expand Down
5 changes: 4 additions & 1 deletion src/libgeds/Filesystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,11 @@ absl::Status removeFile(const std::string &path) {
int err = unlink(path.c_str());
if (err != 0 && (errno != ENOENT)) {
int error = errno;
return absl::UnknownError("Unable to delete file " + path + ": " + std::strerror(error));
auto message = "Unable to delete file " + path + ": " + std::strerror(error);
LOG_ERROR(message);
return absl::UnknownError(message);
}
LOG_DEBUG("Removed ", path);
return absl::OkStatus();
}

Expand Down
191 changes: 149 additions & 42 deletions src/libgeds/GEDS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ absl::Status GEDS::start() {
// Update state.
_state = ServiceState::Running;

(void)_metadataService.configureNode(uuid, _hostname, _server.port(),
geds::rpc::NodeState::Register);

startStorageMonitoringThread();

auto st = syncObjectStoreConfigs();
Expand All @@ -173,11 +176,20 @@ absl::Status GEDS::stop() {
GEDS_CHECK_SERVICE_RUNNING
LOG_INFO("Stopping");
LOG_INFO("Printing statistics");
_state = ServiceState::Stopped;

geds::Statistics::print();
auto result = _metadataService.disconnect();
// Relocate to S3 if available.
relocate(true);
// Unregister.
auto result = _metadataService.configureNode(uuid, _hostname, _server.port(),
geds::rpc::NodeState::Unregister);
if (!result.ok()) {
LOG_ERROR("Unable to unregister: ", result.message());
}
result = _metadataService.disconnect();
if (!result.ok()) {
LOG_ERROR("cannot disconnect metadata service");
LOG_ERROR("cannot disconnect metadata service: ", result.message());
}
result = _server.stop();
if (!result.ok()) {
Expand All @@ -191,8 +203,6 @@ absl::Status GEDS::stop() {
_fileTransfers.clear();
_tcpTransport->stop();

_state = ServiceState::Stopped;

_storageMonitoringThread.join();

return result;
Expand Down Expand Up @@ -855,45 +865,34 @@ void GEDS::relocate(std::vector<std::shared_ptr<GEDSFileHandle>> &relocatable, b
struct RelocateHelper {
std::mutex mutex;
std::condition_variable cv;
size_t nTasks;
auto lock() { return std::unique_lock<std::mutex>(mutex); }
std::atomic<size_t> nTasks;
};
auto h = std::make_shared<RelocateHelper>();
{
auto lock = h->lock();
std::lock_guard lock(h->mutex);
h->nTasks = relocatable.size();
}

LOG_INFO("Relocating ", relocatable.size(), " objects.");

auto self = shared_from_this();
size_t off = 3 * _config.io_thread_pool_size;

for (size_t offset = 0; offset < relocatable.size(); offset += off) {
auto rbegin = offset;
auto rend = rbegin + off;
if (rend > relocatable.size()) {
rend = relocatable.size();
}
for (auto i = rbegin; i < rend; i++) {
auto fh = relocatable[i];
boost::asio::post(_ioThreadPool, [self, fh, h, force]() {
try {
self->relocate(fh, force);
} catch (...) {
LOG_ERROR("Encountered an exception during relocation ", fh->identifier);
}
{
auto lock = h->lock();
h->nTasks -= 1;
}
h->cv.notify_all();
});
}
auto relocateLock = h->lock();
h->cv.wait(relocateLock, [h]() { return h->nTasks == 0; });
LOG_INFO("Relocated ", relocatable.size(), " objects.");
for (auto fh : relocatable) {
boost::asio::post(_ioThreadPool, [self, fh, h, force]() {
try {
self->relocate(fh, force);
} catch (...) {
LOG_ERROR("Encountered an exception during relocation ", fh->identifier);
}
{
std::lock_guard lock(h->mutex);
h->nTasks -= 1;
}
h->cv.notify_one();
});
}
std::unique_lock lock(h->mutex);
h->cv.wait(lock, [h]() { return h->nTasks == 0; });
LOG_INFO("Relocated ", relocatable.size(), " objects.");
}

void GEDS::relocate(std::shared_ptr<GEDSFileHandle> handle, bool force) {
Expand Down Expand Up @@ -921,6 +920,81 @@ void GEDS::relocate(std::shared_ptr<GEDSFileHandle> handle, bool force) {
(void)handle->relocate();
}

/**
 * @brief Download the object `bucket`/`key` into a newly created file handle.
 *
 * The object is first opened through openAsFileHandle(); a second handle for
 * the same bucket/key is then created with overwrite enabled, the opened
 * handle's download() is invoked with the new handle as target, and the new
 * handle is sealed.
 *
 * @param bucket Bucket of the object.
 * @param key    Key of the object inside the bucket.
 * @return The status of the first step that fails, otherwise the result of
 *         sealing the new handle.
 */
absl::Status GEDS::downloadObject(const std::string &bucket, const std::string &key) {
  // Open the existing object before anything is created or overwritten.
  auto source = openAsFileHandle(bucket, key);
  if (!source.ok()) {
    return source.status();
  }

  auto destination = createAsFileHandle(bucket, key, true /* overwrite */);
  if (!destination.ok()) {
    return destination.status();
  }

  auto downloadStatus = (*source)->download(*destination);
  if (!downloadStatus.ok()) {
    return downloadStatus;
  }

  return (*destination)->seal();
}

/**
 * @brief Download a set of objects concurrently on the I/O thread pool.
 *
 * One task per object is posted to `_ioThreadPool`; each task calls
 * downloadObject() and records whether it failed (non-ok status or exception).
 * The call blocks until every task has finished.
 *
 * @param objects Objects (bucket/key pairs) to download.
 * @return absl::OkStatus() if every download succeeded, otherwise an
 *         absl::UnknownError carrying the number of failed downloads.
 */
absl::Status GEDS::downloadObjects(std::vector<geds::ObjectID> objects) {
  // Completion state shared between the posting thread and the pool tasks.
  struct PullHelper {
    std::mutex mutex;
    std::condition_variable cv;
    size_t nTasks{0};
    size_t nErrors{0};
  };
  auto h = std::make_shared<PullHelper>();
  {
    std::lock_guard lock(h->mutex);
    h->nTasks = objects.size();
  }

  auto self = shared_from_this();

  // Post every object directly. The previous chunked iteration used a step of
  // 3 * io_thread_pool_size, which never terminates for a pool size of 0 and did
  // not wait between chunks anyway.
  for (const auto &file : objects) {
    // `file` is captured by value: the task may run after this loop iteration.
    boost::asio::post(_ioThreadPool, [self, file, h]() {
      bool error = false;
      try {
        auto status = self->downloadObject(file.bucket, file.key);
        if (!status.ok()) {
          LOG_ERROR("Unable to download ", file.bucket, "/", file.key, ": ", status.message());
          error = true;
        }
      } catch (...) {
        LOG_ERROR("Encountered an exception when downloading ", file.bucket, "/", file.key);
        error = true;
      }
      {
        std::lock_guard lock(h->mutex);
        h->nTasks -= 1;
        if (error) {
          h->nErrors += 1;
        }
      }
      h->cv.notify_all();
    });
  }

  // Wait for all tasks; the predicate also covers the empty-input case.
  std::unique_lock lock(h->mutex);
  h->cv.wait(lock, [h]() { return h->nTasks == 0; });
  const size_t nErrors = h->nErrors;
  LOG_INFO("Downloaded ", objects.size(), " objects, errors: ", nErrors);
  if (nErrors) {
    return absl::UnknownError("Some objects were not downloaded: Observed " +
                              std::to_string(nErrors) + " errors!");
  }
  return absl::OkStatus();
}

void GEDS::startStorageMonitoringThread() {
_storageMonitoringThread = std::thread([&]() {
auto statsLocalStorageUsed = geds::Statistics::createGauge("GEDS: Local Storage used");
Expand All @@ -941,12 +1015,12 @@ void GEDS::startStorageMonitoringThread() {
allFileHandles.push_back(fh);
});
for (const auto &fh : allFileHandles) {
storageUsed += fh->localStorageSize();
memoryUsed += fh->localMemorySize();
if (fh->isRelocatable()) {
if (fh->openCount() == 0) {
relocatable.push_back(fh);
}
auto storageSize = fh->localStorageSize();
auto memSize = fh->localMemorySize();
storageUsed += storageSize;
memoryUsed += memSize;
if (fh->isRelocatable() && fh->openCount() == 0) {
relocatable.push_back(fh);
}
}
}
Expand All @@ -968,8 +1042,17 @@ void GEDS::startStorageMonitoringThread() {
*statsLocalMemoryFree = _memoryCounters.free;
}

auto targetStorage = (size_t)(0.7 * (double)_config.available_local_storage);
if (memoryUsed > targetStorage) {
{
// Send heartbeat.
auto status = _metadataService.heartBeat(uuid, _storageCounters, _memoryCounters);
if (!status.ok()) {
LOG_ERROR("Unable to send heartbeat to metadata service: ", status.message());
}
}

auto targetStorage =
(size_t)(_config.storage_spilling_fraction * (double)_config.available_local_storage);
if (storageUsed > targetStorage) {
std::sort(std::begin(relocatable), std::end(relocatable),
[](std::shared_ptr<GEDSFileHandle> a, std::shared_ptr<GEDSFileHandle> b) {
return a->lastReleased() < b->lastReleased();
Expand All @@ -978,18 +1061,42 @@ void GEDS::startStorageMonitoringThread() {
std::vector<std::shared_ptr<GEDSFileHandle>> tasks;
size_t relocateBytes = 0;
for (auto &f : relocatable) {
if (relocateBytes > targetStorage) {
if (relocateBytes > (storageUsed - targetStorage)) {
break;
}
relocateBytes += f->localStorageSize();
tasks.push_back(f);
}
if (tasks.size()) {
relocate(tasks);
} else {
LOG_WARNING("Unable to relocate files: No task found!");
}
}
relocatable.clear();
sleep(1);
}
});
}

/**
 * @brief Drop the file handle registered for `bucket`/`key` from `_fileHandles`.
 *
 * @param bucket Bucket of the object.
 * @param key    Key of the object inside the bucket.
 * @return absl::OkStatus() when a handle was removed; absl::NotFoundError (also
 *         logged as an error) when no handle is registered for the path.
 */
absl::Status GEDS::purgeLocalObject(const std::string &bucket, const std::string &key) {
  const auto objectPath = getPath(bucket, key);
  auto removed = _fileHandles.getAndRemove(objectPath);
  if (removed.has_value()) {
    return absl::OkStatus();
  }

  // Nothing was registered under this path.
  auto message = "The object with the path " + objectPath.name + " does not exist locally.";
  LOG_ERROR(message);
  return absl::NotFoundError(message);
}

/**
 * @brief Purge every listed object via purgeLocalObject().
 *
 * A failure for an individual object is logged and does not stop the loop.
 *
 * @param objects Objects (bucket/key pairs) to purge.
 * @return Always absl::OkStatus().
 */
absl::Status GEDS::purgeLocalObjects(std::vector<geds::ObjectID> objects) {
  LOG_DEBUG("Purging ", objects.size(), ".");
  for (const auto &object : objects) {
    auto purgeStatus = purgeLocalObject(object.bucket, object.key);
    if (purgeStatus.ok()) {
      continue;
    }
    LOG_ERROR(purgeStatus.message());
  }
  return absl::OkStatus();
}
17 changes: 17 additions & 0 deletions src/libgeds/GEDS.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <set>
#include <shared_mutex>
#include <string>
#include <tuple>
#include <utility>

#include <absl/status/status.h>
Expand Down Expand Up @@ -321,6 +322,22 @@ class GEDS : public std::enable_shared_from_this<GEDS>, utility::RWConcurrentObj
void relocate(bool force = false);
void relocate(std::vector<std::shared_ptr<GEDSFileHandle>> &relocatable, bool force = false);
void relocate(std::shared_ptr<GEDSFileHandle> handle, bool force = false);

/**
 * @brief Pull object to this GEDS instance.
 *
 * Opens the object identified by bucket/key, creates a file handle for the
 * same bucket/key with overwrite enabled, downloads into it and seals it.
 *
 * @param bucket Bucket of the object.
 * @param key    Key of the object inside the bucket.
 * @return Status of the first failing step, otherwise the result of sealing.
 */
absl::Status downloadObject(const std::string &bucket, const std::string &key);
/**
 * @brief Pull multiple objects to this GEDS instance.
 *
 * Every object is downloaded with downloadObject() on the I/O thread pool; the
 * call blocks until all downloads have completed.
 *
 * @param objects Objects (bucket/key pairs) to download.
 * @return OkStatus if all downloads succeeded, otherwise an UnknownError
 *         reporting the number of errors.
 */
absl::Status downloadObjects(std::vector<geds::ObjectID> objects);

/**
 * @brief Purge locally stored object without updating the Metadata server.
 *
 * @param bucket Bucket of the object.
 * @param key    Key of the object inside the bucket.
 * @return OkStatus if a file handle was removed, NotFoundError if the object
 *         does not exist locally.
 */
absl::Status purgeLocalObject(const std::string &bucket, const std::string &key);

/**
 * @brief Purge locally stored objects if they exist locally. Missing files will be logged.
 *
 * @param objects Objects (bucket/key pairs) to purge.
 * @return Always OkStatus; per-object failures are only logged.
 */
absl::Status purgeLocalObjects(std::vector<geds::ObjectID> objects);
};

#endif // GEDS_GEDS_H
6 changes: 3 additions & 3 deletions src/libgeds/GEDSAbstractFileHandle.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ template <class T> class GEDSAbstractFileHandle : public GEDSFileHandle {
if (!isValid()) {
return absl::UnavailableError("The file " + identifier + " is no longer valid!");
}
LOG_INFO("Relocating ", identifier);
LOG_INFO("Relocating ", identifier, " (size: ", _file.size(), ") ");
if (_openCount > 0) {
auto message = "Unable to relocate " + identifier + " reason: The file is still in use.";
LOG_ERROR(message);
Expand Down Expand Up @@ -205,13 +205,13 @@ template <class T> class GEDSAbstractFileHandle : public GEDSFileHandle {
}
auto fh = GEDSS3FileHandle::factory(_gedsService, bucket, key, metadata());
if (!fh.ok()) {
LOG_ERROR("Unable to reopen the relocateed file ", identifier,
LOG_ERROR("Unable to reopen the relocated file ", identifier,
" on s3:", fh.status().message());
return fh.status();
}
auto status = (*fh)->seal();
if (!status.ok()) {
LOG_ERROR("Unable to seal relocateed file!");
LOG_ERROR("Unable to seal relocated file!");
(void)(*s3Endpoint)->deleteObject(bucket, key);
return status;
}
Expand Down
Loading