Skip to content

WIP: Experiment Setup #84

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 28 additions & 9 deletions src/libgeds/FileTransferService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,12 @@ FileTransferService::~FileTransferService() {
}

absl::Status FileTransferService::connect() {
if (_connectionState == ConnectionState::Connected) {
return absl::OkStatus();
}
if (_connectionState != ConnectionState::Disconnected) {
return absl::FailedPreconditionError("Cannot reinitialize service.");
}
auto lock = getWriteLock();
try {
assert(_channel.get() == nullptr);
_channel = grpc::CreateChannel(nodeAddress, grpc::InsecureChannelCredentials());
Expand All @@ -78,7 +80,7 @@ absl::Status FileTransferService::connect() {
for (auto &addr : *endpoints) {
if (std::get<1>(addr) == FileTransferProtocol::Socket) {
struct sockaddr saddr = std::get<0>(addr);
auto peer = _tcp->getPeer(&saddr);
auto peer = _tcp->getPeer(&saddr, true);

if (peer) {
_tcpPeer = peer;
Expand All @@ -87,17 +89,24 @@ absl::Status FileTransferService::connect() {
}
}
}
if (_tcpPeer == nullptr) {
_channel = nullptr;
_stub = nullptr;
auto message = "Unable to establish a connection to " + nodeAddress;
return absl::UnknownError(message);
}
_connectionState = ConnectionState::Connected;
LOG_INFO("Connected to ", nodeAddress);
return absl::OkStatus();
}

absl::Status FileTransferService::disconnect() {
if (_connectionState != ConnectionState::Connected) {
return absl::UnknownError("The service is in the wrong state!");
}
auto lock = getWriteLock();
_tcpPeer.reset();
_tcpPeer = nullptr;
_channel = nullptr;
_connectionState = ConnectionState::Disconnected;
return absl::OkStatus();
}

Expand Down Expand Up @@ -137,18 +146,19 @@ FileTransferService::availTransportEndpoints() {
absl::StatusOr<size_t> FileTransferService::readBytes(const std::string &bucket,
const std::string &key, uint8_t *buffer,
size_t position, size_t length) {
CHECK_CONNECTED

std::shared_ptr<TcpPeer> peer;
std::future<absl::StatusOr<size_t>> fut;
// Create a scope for the std::shared_ptr<TcpPeer> so that the peer is automatically cleaned up.
{
auto lock = getReadLock();
if (_tcpPeer.expired()) {
CHECK_CONNECTED

if (!_tcpPeer) {
return absl::UnavailableError("No TCP for " + nodeAddress);
}

LOG_DEBUG("Found TCP peer for ", nodeAddress);
auto peer = _tcpPeer.lock();
peer = _tcpPeer;
lock.unlock();
auto prom = peer->sendRpcRequest((uint64_t)buffer, bucket + "/" + key, position, length);
fut = prom->get_future();
Expand All @@ -159,8 +169,17 @@ absl::StatusOr<size_t> FileTransferService::readBytes(const std::string &bucket,
}
// Close the FileTransferService on error.
if (status.status().code() == absl::StatusCode::kAborted) {
LOG_ERROR("FileTransfer was aborted: ", status.status().message());
auto lock = getWriteLock();
_tcpPeer.reset();
if (peer == _tcpPeer) {
LOG_ERROR("Encountered an error on the TCP Transport. Trying to reconnect! Node: ",
nodeAddress);
disconnect().IgnoreError();
auto s = connect();
if (!s.ok()) {
LOG_ERROR("Unable to reconnect: ", s.message());
}
}
}
return status.status();
}
Expand Down
2 changes: 1 addition & 1 deletion src/libgeds/FileTransferService.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class FileTransferService : public utility::RWConcurrentObjectAdaptor {
std::unique_ptr<geds::rpc::GEDSService::Stub> _stub;
std::shared_ptr<GEDS> _geds;
std::shared_ptr<TcpTransport> _tcp;
std::weak_ptr<TcpPeer> _tcpPeer;
std::shared_ptr<TcpPeer> _tcpPeer;

absl::StatusOr<std::vector<std::tuple<sockaddr, geds::FileTransferProtocol>>>
availTransportEndpoints();
Expand Down
179 changes: 155 additions & 24 deletions src/libgeds/GEDS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ GEDS::~GEDS() {
}

absl::Status GEDS::start() {
if (_state == ServiceState::Running) {
LOG_INFO("GEDS is already running.");
return absl::OkStatus();
}
std::cout << "Starting GEDS (" << utility::GEDSVersion() << ")\n"
<< "- prefix: " << _pathPrefix << "\n"
<< "- metadata service: " << _metadataService.serverAddress << std::endl;
Expand Down Expand Up @@ -159,6 +163,9 @@ absl::Status GEDS::start() {
// Update state.
_state = ServiceState::Running;

(void)_metadataService.configureNode(uuid, _hostname, _server.port(),
geds::rpc::NodeState::Register);

startStorageMonitoringThread();
startPubSubStreamThread();

Expand All @@ -175,11 +182,25 @@ absl::Status GEDS::stop() {
LOG_INFO("Stopping");
LOG_INFO("Printing statistics");

geds::Statistics::print();
// Relocate to S3 if available.
relocate(true);
if (_config.force_relocation_when_stopping) {
relocate(true);
return absl::OkStatus();
}

// Update state
_state = ServiceState::Stopped;

auto result = _metadataService.disconnect();
absl::Status result;

// // Decomission node.
// auto result = _metadataService.configureNode(uuid, _hostname, _server.port(),
// geds::rpc::NodeState::Unregister);
// if (!result.ok()) {
// LOG_ERROR("Unable to unregister: ", result.message());
// }

result = _metadataService.disconnect();
if (!result.ok()) {
LOG_ERROR("cannot disconnect metadata service: ", result.message());
}
Expand All @@ -195,14 +216,14 @@ absl::Status GEDS::stop() {
_fileTransfers.clear();
_tcpTransport->stop();

_state = ServiceState::Stopped;

_storageMonitoringThread.join();

if (_pubSubStreamThread.joinable()) {
_pubSubStreamThread.join();
}

geds::Statistics::print();

return result;
}

Expand Down Expand Up @@ -496,7 +517,8 @@ GEDS::reopenFileHandle(const std::string &bucket, const std::string &key, bool i
if (location.compare(0, gedsPrefix.size(), gedsPrefix) == 0) {
fileHandle = GEDSRemoteFileHandle::factory(shared_from_this(), object);
} else if (location.compare(0, s3Prefix.size(), s3Prefix) == 0) {
fileHandle = GEDSCachedFileHandle::factory<GEDSS3FileHandle>(shared_from_this(), object);
// fileHandle = GEDSCachedFileHandle::factory<GEDSS3FileHandle>(shared_from_this(), object);
fileHandle = GEDSS3FileHandle::factory(shared_from_this(), object);
} else {
return absl::UnknownError("The remote location format " + location + " is not known.");
}
Expand Down Expand Up @@ -892,6 +914,10 @@ void GEDS::relocate(std::vector<std::shared_ptr<GEDSFileHandle>> &relocatable, b
}
h->cv.notify_one();
});

const auto tp_size = _config.io_thread_pool_size;
std::unique_lock lock(h->mutex);
h->cv.wait(lock, [h, tp_size]() { return h->nTasks <= (tp_size + 1); });
}
std::unique_lock lock(h->mutex);
h->cv.wait(lock, [h]() { return h->nTasks == 0; });
Expand All @@ -908,19 +934,102 @@ void GEDS::relocate(std::shared_ptr<GEDSFileHandle> handle, bool force) {
}

static auto stats = geds::Statistics::createCounter("GEDS: Storage Relocated");
*stats += handle->localStorageSize();
auto fsize = handle->localStorageSize();

// Remove cached files.
const auto path = getPath(handle->bucket, handle->key);
if (handle->key.starts_with(GEDSCachedFileHandle::CacheBlockMarker)) {
_fileHandles.removeIf(path, [handle](const std::shared_ptr<GEDSFileHandle> &existing) {
return handle.get() == existing.get();
});
auto status =
_fileHandles.removeIf(path, [handle](const std::shared_ptr<GEDSFileHandle> &existing) {
return handle.get() == existing.get();
});
if (status) {
*stats += fsize;
}
return;
}

// Relocate all other files.
(void)handle->relocate();
auto status = handle->relocate();
if (status.ok()) {
*stats += fsize;
}
}

absl::Status GEDS::downloadObject(const std::string &bucket, const std::string &key) {
auto oldFile = openAsFileHandle(bucket, key);
if (!oldFile.ok()) {
return oldFile.status();
}
auto newFile = createAsFileHandle(bucket, key, true /* overwrite */);
if (!newFile.ok()) {
return newFile.status();
}
auto status = (*oldFile)->download(*newFile);
if (!status.ok()) {
return status;
}
return (*newFile)->seal();
}

absl::Status GEDS::downloadObjects(std::vector<geds::ObjectID> objects) {
struct PullHelper {
std::mutex mutex;
std::condition_variable cv;
size_t nTasks;
size_t nErrors;
auto lock() { return std::unique_lock<std::mutex>(mutex); }
};
auto h = std::make_shared<PullHelper>();
{
auto lock = h->lock();
h->nTasks = objects.size();
h->nErrors = 0;
}

auto self = shared_from_this();
size_t off = 3 * _config.io_thread_pool_size;

for (size_t offset = 0; offset < objects.size(); offset += off) {
auto rbegin = offset;
auto rend = rbegin + off;
if (rend > objects.size()) {
rend = objects.size();
}
for (auto i = rbegin; i < rend; i++) {
const auto &file = objects[i];
boost::asio::post(_ioThreadPool, [self, file, h]() {
bool error = false;
try {
auto status = self->downloadObject(file.bucket, file.key);
if (!status.ok()) {
LOG_ERROR("Unable to download ", file.bucket, "/", file.key);
error = true;
}
} catch (...) {
LOG_ERROR("Encountered an exception when downloading ", file.bucket, "/", file.key);
error = true;
}
{
auto lock = h->lock();
h->nTasks -= 1;
if (error) {
h->nErrors += 1;
}
}
h->cv.notify_all();
});
}
}

auto relocateLock = h->lock();
h->cv.wait(relocateLock, [h]() { return h->nTasks == 0; });
LOG_INFO("Downloaded ", objects.size(), " objects, errors: ", h->nErrors);
if (h->nErrors) {
return absl::UnknownError("Some objects were not downloaded: Observed " +
std::to_string(h->nErrors) + " errors!");
}
return absl::OkStatus();
}

void GEDS::startStorageMonitoringThread() {
Expand All @@ -947,7 +1056,7 @@ void GEDS::startStorageMonitoringThread() {
auto memSize = fh->localMemorySize();
storageUsed += storageSize;
memoryUsed += memSize;
if (fh->isRelocatable() && memoryUsed == 0) {
if (fh->isRelocatable() && fh->openCount() == 0) {
relocatable.push_back(fh);
}
}
Expand All @@ -970,26 +1079,26 @@ void GEDS::startStorageMonitoringThread() {
*statsLocalMemoryFree = _memoryCounters.free;
}

auto targetStorage = (size_t)(0.5 * (double)_config.available_local_storage);
{
// Send heartbeat.
auto status = _metadataService.heartBeat(uuid, _storageCounters, _memoryCounters);
if (!status.ok()) {
LOG_ERROR("Unable to send heartbeat to metadata service: ", status.message());
}
}

auto targetStorage =
(size_t)(_config.storage_spilling_fraction * (double)_config.available_local_storage);
if (storageUsed > targetStorage) {
std::sort(std::begin(relocatable), std::end(relocatable),
[](std::shared_ptr<GEDSFileHandle> a, std::shared_ptr<GEDSFileHandle> b) {
if (a->openCount() == 0 && b->openCount() == 0) {
return a->lastReleased() < b->lastReleased();
}
if (a->openCount() == 0) {
return true;
}
if (b->openCount() == 0) {
return false;
}
return a->lastOpened() < b->lastOpened();
return a->lastReleased() < b->lastReleased();
});

std::vector<std::shared_ptr<GEDSFileHandle>> tasks;
size_t relocateBytes = 0;
for (auto &f : relocatable) {
if (relocateBytes > targetStorage) {
if (relocateBytes > (storageUsed - targetStorage)) {
break;
}
relocateBytes += f->localStorageSize();
Expand Down Expand Up @@ -1035,3 +1144,25 @@ absl::Status GEDS::unsubscribe(const geds::SubscriptionEvent &event) {
}
return _metadataService.unsubscribe(event);
}

absl::Status GEDS::purgeLocalObject(const std::string &bucket, const std::string &key) {
const auto path = getPath(bucket, key);
auto result = _fileHandles.getAndRemove(path);
if (!result.has_value()) {
auto message = "The object with the path " + path.name + " does not exist locally.";
LOG_ERROR(message);
return absl::NotFoundError(message);
}
return absl::OkStatus();
}

absl::Status GEDS::purgeLocalObjects(std::vector<geds::ObjectID> objects) {
LOG_DEBUG("Purging ", objects.size(), ".");
for (const auto &obj : objects) {
auto status = purgeLocalObject(obj.bucket, obj.key);
if (!status.ok()) {
LOG_ERROR(status.message());
}
}
return absl::OkStatus();
}
Loading