Skip to content

Commit

Permalink
MINIFICPP-2525 Handle errors when enabling controllers
Browse files Browse the repository at this point in the history
  • Loading branch information
lordgamez committed Feb 19, 2025
1 parent 7b6b5de commit a1c6e5c
Show file tree
Hide file tree
Showing 13 changed files with 480 additions and 146 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,11 @@ class CouchbaseClusterService : public core::controller::ControllerServiceImpl {
void initialize() override;

void yield() override {
};
}

bool isWorkAvailable() override {
return false;
};
}

bool isRunning() const override {
return getState() == core::controller::ControllerServiceState::ENABLED;
Expand Down
26 changes: 2 additions & 24 deletions libminifi/include/core/controller/StandardControllerServiceNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,39 +46,17 @@ class StandardControllerServiceNode : public ControllerServiceNodeImpl {
StandardControllerServiceNode(const StandardControllerServiceNode &other) = delete;
StandardControllerServiceNode &operator=(const StandardControllerServiceNode &parent) = delete;

/**
* Initializes the controller service node.
*/
void initialize() override {
ControllerServiceNodeImpl::initialize();
active = false;
}

bool canEnable() override {
if (!active.load()) {
for (auto linked_service : linked_controller_services_) {
if (!linked_service->canEnable()) {
return false;
}
}
return true;
} else {
return false;
}
}

bool canEnable() override;
bool enable() override;

bool disable() override {
controller_service_->setState(DISABLED);
active = false;
return true;
}
bool disable() override;

protected:
// controller service provider.
std::shared_ptr<ControllerServiceProvider> provider;

std::mutex mutex_;

private:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <string>
#include <utility>
#include <memory>
#include <vector>
#include "core/ProcessGroup.h"
#include "core/ClassLoader.h"
#include <memory>
#include <unordered_set>
#include <thread>
#include "core/controller/ControllerService.h"
#include "ControllerServiceNodeMap.h"
#include "ControllerServiceNode.h"
#include "StandardControllerServiceNode.h"
#include "ControllerServiceProvider.h"
#include "core/logging/LoggerFactory.h"

Expand All @@ -39,6 +35,7 @@ class StandardControllerServiceProvider : public ControllerServiceProviderImpl
: ControllerServiceProviderImpl(std::move(services)),
extension_loader_(loader),
configuration_(std::move(configuration)),
admin_yield_duration_(readAdministrativeYieldDuration()),
logger_(logging::LoggerFactory<StandardControllerServiceProvider>::getLogger()) {
}

Expand All @@ -47,64 +44,35 @@ class StandardControllerServiceProvider : public ControllerServiceProviderImpl

StandardControllerServiceProvider& operator=(const StandardControllerServiceProvider &other) = delete;
StandardControllerServiceProvider& operator=(StandardControllerServiceProvider &&other) = delete;

std::shared_ptr<ControllerServiceNode> createControllerService(const std::string& type, const std::string&, const std::string& id, bool) override {
std::shared_ptr<ControllerService> new_controller_service = extension_loader_.instantiate<ControllerService>(type, id);

if (!new_controller_service) {
return nullptr;
}

std::shared_ptr<ControllerServiceNode> new_service_node = std::make_shared<StandardControllerServiceNode>(new_controller_service,
sharedFromThis<ControllerServiceProvider>(), id,
configuration_);

controller_map_->put(id, new_service_node);
return new_service_node;
}

void enableAllControllerServices() override {
logger_->log_info("Enabling {} controller services", controller_map_->getAllControllerServices().size());
for (const auto& service : controller_map_->getAllControllerServices()) {
logger_->log_info("Enabling {}", service->getName());
if (!service->canEnable()) {
logger_->log_warn("Service {} cannot be enabled", service->getName());
continue;
}
if (!service->enable()) {
logger_->log_warn("Could not enable {}", service->getName());
}
}
}

void disableAllControllerServices() override {
logger_->log_info("Disabling {} controller services", controller_map_->getAllControllerServices().size());
for (const auto& service : controller_map_->getAllControllerServices()) {
logger_->log_info("Disabling {}", service->getName());
if (!service->enabled()) {
logger_->log_warn("Service {} is not enabled", service->getName());
continue;
}
if (!service->disable()) {
logger_->log_warn("Could not disable {}", service->getName());
}
}
~StandardControllerServiceProvider() override {
stopEnableRetryThread();
}

void clearControllerServices() override {
controller_map_->clear();
}
std::shared_ptr<ControllerServiceNode> createControllerService(const std::string& type, const std::string&, const std::string& id, bool) override;
void enableAllControllerServices() override;
void disableAllControllerServices() override;
void clearControllerServices() override;

protected:
void stopEnableRetryThread();
void startEnableRetryThread();

bool canEdit() override {
return false;
}

ClassLoader &extension_loader_;

std::shared_ptr<Configure> configuration_;

private:
std::chrono::milliseconds readAdministrativeYieldDuration() const;

std::thread controller_service_enable_retry_thread_;
std::atomic_bool enable_retry_thread_running_{false};
std::mutex enable_retry_mutex_;
std::condition_variable enable_retry_condition_;
std::unordered_set<std::shared_ptr<ControllerServiceNode>> controller_services_to_enable_;
std::chrono::milliseconds admin_yield_duration_;
std::shared_ptr<logging::Logger> logger_;
};

Expand Down
12 changes: 8 additions & 4 deletions libminifi/src/core/FlowConfiguration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,13 @@ std::unique_ptr<core::ProcessGroup> FlowConfiguration::updateFromPayload(const s
auto old_parameter_contexts = std::move(parameter_contexts_);
service_provider_ = std::make_shared<core::controller::StandardControllerServiceProvider>(std::make_unique<core::controller::ControllerServiceNodeMap>(), configuration_);
auto payload = getRootFromPayload(yamlConfigPayload);
if (!url.empty() && payload != nullptr) {
if (!payload) {
service_provider_ = old_provider;
parameter_contexts_ = std::move(old_parameter_contexts);
return nullptr;
}

if (!url.empty()) {
std::string payload_flow_id;
std::string bucket_id;
auto path_split = utils::string::split(url, "/");
Expand All @@ -110,10 +116,8 @@ std::unique_ptr<core::ProcessGroup> FlowConfiguration::updateFromPayload(const s
}
}
flow_version_->setFlowVersion(url, bucket_id, flow_id ? *flow_id : payload_flow_id);
} else {
service_provider_ = old_provider;
parameter_contexts_ = std::move(old_parameter_contexts);
}

return payload;
}

Expand Down
1 change: 1 addition & 0 deletions libminifi/src/core/controller/ControllerServiceNodeMap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ void ControllerServiceNodeMap::clear() {
node->disable();
}
controller_service_nodes_.clear();
process_groups_.clear();
}

std::vector<std::shared_ptr<ControllerServiceNode>> ControllerServiceNodeMap::getAllControllerServices() const {
Expand Down
33 changes: 30 additions & 3 deletions libminifi/src/core/controller/StandardControllerServiceNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,20 @@
#include "core/controller/StandardControllerServiceNode.h"
#include <memory>
#include <mutex>
#include <algorithm>

namespace org::apache::nifi::minifi::core::controller {

bool StandardControllerServiceNode::canEnable() {
if (active) {
return false;
}

return std::all_of(linked_controller_services_.begin(), linked_controller_services_.end(), [](auto linked_service) {
return linked_service->canEnable();
});
}

bool StandardControllerServiceNode::enable() {
logger_->log_trace("Enabling CSN {}", getName());
if (active) {
Expand All @@ -47,16 +58,32 @@ bool StandardControllerServiceNode::enable() {
for (const auto& service : linked_controller_services_) {
services.push_back(service->getControllerServiceImplementation());
if (!service->enable()) {
logger_->log_debug("Linked Service '{}' could not be enabled", service->getName());
logger_->log_warn("Linked Service '{}' could not be enabled", service->getName());
return false;
}
}
impl->setLinkedControllerServices(services);
impl->onEnable();
try {
impl->setLinkedControllerServices(services);
impl->onEnable();
} catch(const std::exception& e) {
logger_->log_warn("Service '{}' failed to enable: {}", getName(), e.what());
controller_service_->setState(ENABLING);
return false;
}
} else {
logger_->log_warn("Service '{}' service implementation could not be found", controller_service_->getName());
controller_service_->setState(ENABLING);
return false;
}
active = true;
controller_service_->setState(ENABLED);
return true;
}

bool StandardControllerServiceNode::disable() {
controller_service_->setState(DISABLED);
active = false;
return true;
}

} // namespace org::apache::nifi::minifi::core::controller
127 changes: 127 additions & 0 deletions libminifi/src/core/controller/StandardControllerServiceProvider.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "core/controller/StandardControllerServiceProvider.h"

#include "core/controller/StandardControllerServiceNode.h"

using namespace std::literals::chrono_literals;

namespace org::apache::nifi::minifi::core::controller {

std::shared_ptr<ControllerServiceNode> StandardControllerServiceProvider::createControllerService(const std::string& type, const std::string&, const std::string& id, bool) {
std::shared_ptr<ControllerService> new_controller_service = extension_loader_.instantiate<ControllerService>(type, id);

if (!new_controller_service) {
return nullptr;
}

std::shared_ptr<ControllerServiceNode> new_service_node = std::make_shared<StandardControllerServiceNode>(new_controller_service,
sharedFromThis<ControllerServiceProvider>(), id,
configuration_);

controller_map_->put(id, new_service_node);
return new_service_node;
}

void StandardControllerServiceProvider::enableAllControllerServices() {
gsl_Expects(!enable_retry_thread_running_);
{
std::lock_guard<std::mutex> lock(enable_retry_mutex_);
logger_->log_info("Enabling {} controller services", controller_map_->getAllControllerServices().size());
for (const auto& service : controller_map_->getAllControllerServices()) {
logger_->log_info("Enabling {}", service->getName());
if (!service->canEnable()) {
logger_->log_warn("Service {} cannot be enabled", service->getName());
continue;
}
if (!service->enable()) {
logger_->log_warn("Could not enable {}", service->getName());
controller_services_to_enable_.insert(service);
}
}
}
startEnableRetryThread();
}

void StandardControllerServiceProvider::disableAllControllerServices() {
stopEnableRetryThread();
logger_->log_info("Disabling {} controller services", controller_map_->getAllControllerServices().size());
for (const auto& service : controller_map_->getAllControllerServices()) {
logger_->log_info("Disabling {}", service->getName());
if (!service->disable()) {
logger_->log_warn("Could not disable {}", service->getName());
}
}
}

void StandardControllerServiceProvider::clearControllerServices() {
stopEnableRetryThread();
controller_map_->clear();
}

void StandardControllerServiceProvider::stopEnableRetryThread() {
enable_retry_thread_running_ = false;
enable_retry_condition_.notify_all();
if (controller_service_enable_retry_thread_.joinable()) {
controller_service_enable_retry_thread_.join();
}
}

void StandardControllerServiceProvider::startEnableRetryThread() {
enable_retry_thread_running_ = true;
controller_service_enable_retry_thread_ = std::thread([this]() {
if (controller_services_to_enable_.empty()) {
return;
}
std::unique_lock<std::mutex> lock(enable_retry_mutex_);
enable_retry_condition_.wait_for(lock, admin_yield_duration_, [this]() {
return !enable_retry_thread_running_;
});
while (enable_retry_thread_running_) {
for (auto it = controller_services_to_enable_.begin(); it != controller_services_to_enable_.end();) {
if ((*it)->enable()) {
it = controller_services_to_enable_.erase(it);
} else {
++it;
}
}
if (controller_services_to_enable_.empty()) {
break;
}
enable_retry_condition_.wait_for(lock, admin_yield_duration_, [this]() {
return !enable_retry_thread_running_;
});
}
controller_services_to_enable_.clear();
});
}

std::chrono::milliseconds StandardControllerServiceProvider::readAdministrativeYieldDuration() const {
std::chrono::milliseconds admin_yield_duration = 30s;
std::string yield_value_str;

if (configuration_->get(Configure::nifi_administrative_yield_duration, yield_value_str)) {
std::optional<core::TimePeriodValue> value = core::TimePeriodValue::fromString(yield_value_str);
if (value) {
admin_yield_duration = value->getMilliseconds();
}
}
return admin_yield_duration;
}

} // namespace org::apache::nifi::minifi::core::controller
Loading

0 comments on commit a1c6e5c

Please sign in to comment.