diff --git a/elasticsearch.cmake b/elasticsearch.cmake index 80e01b6..69971f6 100644 --- a/elasticsearch.cmake +++ b/elasticsearch.cmake @@ -16,6 +16,9 @@ set( IRODS_PLUGIN_POLICY_COMPILE_DEFINITIONS RODS_SERVER ENABLE_RE + IRODS_ENABLE_SYSLOG + SPDLOG_FMT_EXTERNAL + SPDLOG_NO_TLS ) set( @@ -38,6 +41,7 @@ target_include_directories( ${IRODS_INCLUDE_DIRS} ${IRODS_EXTERNALS_FULLPATH_BOOST}/include ${IRODS_EXTERNALS_FULLPATH_FMT}/include + ${IRODS_EXTERNALS_FULLPATH_SPDLOG}/include ) target_link_libraries( diff --git a/indexing.cmake b/indexing.cmake index 6e2bf3c..a67bb0f 100644 --- a/indexing.cmake +++ b/indexing.cmake @@ -11,7 +11,10 @@ set( IRODS_PLUGIN_POLICY_COMPILE_DEFINITIONS RODS_SERVER ENABLE_RE - ) + IRODS_ENABLE_SYSLOG + SPDLOG_FMT_EXTERNAL + SPDLOG_NO_TLS +) set( IRODS_PLUGIN_POLICY_LINK_LIBRARIES @@ -34,8 +37,7 @@ target_include_directories( ${IRODS_INCLUDE_DIRS} ${IRODS_EXTERNALS_FULLPATH_BOOST}/include ${IRODS_EXTERNALS_FULLPATH_FMT}/include - ${IRODS_EXTERNALS_FULLPATH_JANSSON}/include - ${IRODS_EXTERNALS_FULLPATH_FMT}/include + ${IRODS_EXTERNALS_FULLPATH_SPDLOG}/include ${CMAKE_CURRENT_SOURCE_DIR}/include ) diff --git a/irods_consortium_continuous_integration_test_hook.py b/irods_consortium_continuous_integration_test_hook.py index eda28da..d91426e 100644 --- a/irods_consortium_continuous_integration_test_hook.py +++ b/irods_consortium_continuous_integration_test_hook.py @@ -103,7 +103,7 @@ def main(): if options.do_setup: install_build_prerequisites() - irods_python_ci_utilities.subprocess_get_output(['sudo', '-EH', 'python3', '-m', 'pip', 'install', 'unittest-xml-reporting==1.14.0']) + irods_python_ci_utilities.subprocess_get_output(['sudo', '-EH', 'python3', '-m', 'pip', 'install', 'unittest-xml-reporting==1.14.0', 'requests']) install_indexing_engine(options.indexing_engine) diff --git a/libirods_rule_engine_plugin-elasticsearch.cpp b/libirods_rule_engine_plugin-elasticsearch.cpp index b66f4d7..b3a3c2b 100644 --- 
a/libirods_rule_engine_plugin-elasticsearch.cpp +++ b/libirods_rule_engine_plugin-elasticsearch.cpp @@ -3,19 +3,29 @@ #include "utilities.hpp" #include +#include #include #include #include #include +#include #include #include +#include +#include #define IRODS_QUERY_ENABLE_SERVER_SIDE_API #include -#define IRODS_IO_TRANSPORT_ENABLE_SERVER_SIDE_API -#include -#include +#ifdef IRODS_HAS_FEATURE_ADMIN_MODE_FOR_DSTREAM_LIBRARIES +# define IRODS_IO_TRANSPORT_ENABLE_SERVER_SIDE_API +# include +# include +#else +# include +# include +# include +#endif // IRODS_HAS_FEATURE_ADMIN_MODE_FOR_DSTREAM_LIBRARIES #define IRODS_FILESYSTEM_ENABLE_SERVER_SIDE_API #include @@ -340,16 +350,61 @@ namespace { try { const std::string object_id = get_object_index_id(_rei, _object_path); - std::vector buffer(config->read_size); - irods::experimental::io::server::basic_transport xport(*_rei->rsComm); - irods::experimental::io::idstream in{xport, _object_path}; + std::vector buffer(config->read_size); int chunk_counter{0}; bool need_final_perform{false}; std::stringstream ss; +#ifdef IRODS_HAS_FEATURE_ADMIN_MODE_FOR_DSTREAM_LIBRARIES + irods::experimental::io::server::basic_transport xport(*_rei->rsComm); + irods::experimental::io::idstream in{xport, _object_path}; // TODO Add admin mode option when available. + while (in) { in.read(buffer.data(), buffer.size()); +#else + DataObjInp obj_input{}; + irods::at_scope_exit free_obj_input{[&obj_input] { clearKeyVal(&obj_input.condInput); }}; + + obj_input.dataSize = -1; + obj_input.openFlags = O_RDONLY; + + _object_path.copy(obj_input.objPath, sizeof(obj_input.objPath) - 1); + + // Add the admin keyword if the user invoking this is a rodsadmin. + // This primarily applies when the rule is invoked via irule by a rodsadmin and + // the rodsadmin does not have read permissions on the data object. 
+ if (irods::is_privileged_client(*_rei->rsComm)) { + addKeyVal(&obj_input.condInput, ADMIN_KW, ""); + } + + char* json_output{}; + irods::at_scope_exit free_json_output{[&json_output] { std::free(json_output); }}; + + // We use rs_replica_open to ease policy management for admins. + // If we chose to use rsDataObjOpen, that means the admin has to worry about + // the PEP changing after dstream grows support for admin mode. + const auto fd = rs_replica_open(_rei->rsComm, &obj_input, &json_output); + if (fd < 3) { + THROW(fd, fmt::format("Could not open data object [{}] for reading", _object_path)); + } + + OpenedDataObjInp read_input{}; + read_input.l1descInx = fd; + read_input.len = buffer.size(); + + BytesBuf bbuf_output{}; + bbuf_output.len = read_input.len; + bbuf_output.buf = buffer.data(); + + int bytes_read = 0; + + while (true) { + bytes_read = rsDataObjRead(_rei->rsComm, &read_input, &bbuf_output); + + // TODO Log an error? The dstream library doesn't expose error codes, yet. + // It's not clear it ever will either. +#endif // IRODS_HAS_FEATURE_ADMIN_MODE_FOR_DSTREAM_LIBRARIES // The indexing instruction. // clang-format off @@ -369,7 +424,14 @@ namespace // clang-format off ss << json{ {"absolutePath", _object_path}, +#ifdef IRODS_HAS_FEATURE_ADMIN_MODE_FOR_DSTREAM_LIBRARIES {"data", std::string_view(buffer.data(), in.gcount())} +#else + // Take the max to avoid passing an integer that's less than zero to the + // string_view constructor. This can occur when the read operation + // experiences an error. 
+ {"data", std::string_view(buffer.data(), std::max(0, bytes_read))} +#endif // IRODS_HAS_FEATURE_ADMIN_MODE_FOR_DSTREAM_LIBRARIES }.dump(indent, indent_char, ensure_ascii, json::error_handler_t::ignore) << '\n'; // clang-format on @@ -391,6 +453,16 @@ namespace ss.str(""); } + +#ifndef IRODS_HAS_FEATURE_ADMIN_MODE_FOR_DSTREAM_LIBRARIES + if (0 == bytes_read) { + break; + } + else if (bytes_read < 0) { + rodsLog(LOG_ERROR, "%s: Read error on data object [%s]", __func__, _object_path.c_str()); + break; + } +#endif // IRODS_HAS_FEATURE_ADMIN_MODE_FOR_DSTREAM_LIBRARIES } if (chunk_counter > 0) { diff --git a/libirods_rule_engine_plugin-indexing.cpp b/libirods_rule_engine_plugin-indexing.cpp index 7ad2fc9..60dee52 100644 --- a/libirods_rule_engine_plugin-indexing.cpp +++ b/libirods_rule_engine_plugin-indexing.cpp @@ -1,13 +1,17 @@ #include "indexing_utilities.hpp" #include "utilities.hpp" +#include #include #include #include #include #include #include +#include #include +#include +#include #define IRODS_FILESYSTEM_ENABLE_SERVER_SIDE_API #include @@ -32,6 +36,7 @@ #include #include #include +#include #include #include #include @@ -47,6 +52,8 @@ int _delayExec(const char* inActionCall, namespace { + using log_re = irods::experimental::log::rule_engine; + // For handling of atomic metadata ops using metadata_tuple = std::tuple; @@ -500,6 +507,148 @@ namespace } } } + else { + // + // This block allows indexing rules to be invoked from other REPs. + // For example, the NREP. + // + + // The set of rules that can be manually invoked by the rodsadmin user. + // The rules must be aliased to avoid infinite loops. 
+ const std::map invocable_rules{ + {"indexing_index_data_object", irods::indexing::policy::object::index}, + {"indexing_purge_data_object", irods::indexing::policy::object::purge}, + {"indexing_index_collection", irods::indexing::policy::collection::index}, + {"indexing_purge_collection", irods::indexing::policy::collection::purge}, + {"indexing_index_metadata", irods::indexing::policy::metadata::index}, + {"indexing_purge_metadata", irods::indexing::policy::metadata::purge}, + {"indexing_purge_by_regex_path", "irods_policy_recursive_rm_object_by_path"} + }; + + const auto end = std::end(invocable_rules); + const auto rule_iter = invocable_rules.find(_rn); + + if (rule_iter == end) { + // TODO Should we log here? + return; + } + + if (!irods::is_privileged_client(*_rei->rsComm)) { + const auto& user = _rei->rsComm->clientUser; + auto msg = fmt::format("{}: User [{}#{}] must be a rodsadmin to execute indexing operation [{}].", + __func__, + user.userName, + user.rodsZone, + _rn); + log_re::error(msg); + THROW(CAT_INSUFFICIENT_PRIVILEGE_LEVEL, std::move(msg)); + } + + rodsLog(LOG_NOTICE, "%s: indexing rule = [%s]", __func__, rule_iter->second.data()); + for (auto&& arg : _args) { + rodsLog(LOG_NOTICE, "arg = [%s]", boost::any_cast(arg)->c_str()); + } + + std::string policy_name; + std::list args; + + const auto get_string = [](const boost::any& _v) -> std::string& { + return *boost::any_cast(_v); + }; + + // Find the indexing technology input argument. 
+ if (rule_iter->first.ends_with("_data_object")) { + auto iter = std::end(_args); + const auto* indexing_technology = boost::any_cast(*std::prev(iter)); + rodsLog(LOG_NOTICE, "%s: indexing_technology = [%s]", __func__, indexing_technology->c_str()); + + policy_name = irods::indexing::policy::compose_policy_name(std::string{rule_iter->second}, *indexing_technology); + rodsLog(LOG_NOTICE, "%s: policy name = [%s]", __func__, policy_name.c_str()); + + iter = std::begin(_args); + args.push_back(get_string(*iter)); // logical path + args.push_back(get_string(*++iter)); // source resource + args.push_back(get_string(*++iter)); // index name + //args.push_back(get_string(*++iter)); // index type + + rodsLog(LOG_NOTICE, "%s: testing boost::any copy operation", __func__); + for (auto&& arg : args) { + rodsLog(LOG_NOTICE, "arg = [%s]", boost::any_cast(arg).c_str()); + } + } + else if (rule_iter->first.ends_with("_collection")) { + auto iter = std::begin(_args); + const auto collection = get_string(*iter); + const auto user_name = get_string(*++iter); + const auto indexer = get_string(*++iter); + const auto index_name = get_string(*++iter); + const auto index_type = get_string(*++iter); + + irods::indexing::indexer idx{_rei, config->instance_name}; + + // Add task to the delay queue so potentially large collections do not + // block clients from making progress. 
+ idx.schedule_policy_events_for_collection(irods::indexing::operation_type::index, + collection, + user_name, + indexer, + index_name, + index_type); + + return; + } + else if (rule_iter->first.ends_with("_metadata")) { + auto iter = std::end(_args); + const auto* indexing_technology = boost::any_cast(*std::prev(iter)); + rodsLog(LOG_NOTICE, "%s: indexing_technology = [%s]", __func__, indexing_technology->c_str()); + + policy_name = irods::indexing::policy::compose_policy_name(std::string{rule_iter->second}, *indexing_technology); + rodsLog(LOG_NOTICE, "%s: policy name = [%s]", __func__, policy_name.c_str()); + + iter = std::begin(_args); + args.push_back(get_string(*iter)); // logical path + args.push_back(get_string(*++iter)); // attribute (elasticsearch plugin: op branches based on emptiness) + args.push_back(get_string(*++iter)); // value + args.push_back(get_string(*++iter)); // units + args.push_back(get_string(*++iter)); // index name + //args.push_back(get_string(*++iter)); // fetch system metadata (boolean: 0 or 1) + //args.push_back(boost::any(get_system_metadata(_rei, _object_path).dump())); + //args.push_back(get_string(*++iter)); //boost::any(_index_type)); + + rodsLog(LOG_NOTICE, "%s: testing boost::any copy operation", __func__); + for (auto&& arg : args) { + rodsLog(LOG_NOTICE, "arg = [%s]", boost::any_cast(arg).c_str()); + } + } + else if (rule_iter->first.ends_with("_by_path")) { + auto iter = std::end(_args); + const auto* indexing_technology = boost::any_cast(*std::prev(iter)); + rodsLog(LOG_NOTICE, "%s: indexing_technology = [%s]", __func__, indexing_technology->c_str()); + + policy_name = "irods_policy_recursive_rm_object_by_path"; + rodsLog(LOG_NOTICE, "%s: policy name = [%s]", __func__, policy_name.c_str()); + + iter = std::begin(_args); + args.push_back(get_string(*iter)); // logical path + ++iter; // second arg is ignored + + args.push_back(get_string(*++iter)); // user expected to pass JSON string like {"is_collection": boolean, 
"indices": ["index_name", ...]} + //args.push_back(nlohmann::json{ + // {"is_collection", false}, + // {"indices", nlohmann::json::array({ + // "" + // })} + //}.dump()); + //args.push_back(get_string(*++iter)); //boost::any(_index_type)); + + rodsLog(LOG_NOTICE, "%s: testing boost::any copy operation", __func__); + for (auto&& arg : args) { + rodsLog(LOG_NOTICE, "arg = [%s]", boost::any_cast(arg).c_str()); + } + } + + irods::indexing::invoke_policy(_rei, policy_name, args); + } } catch (const boost::bad_any_cast& _e) { THROW(INVALID_ANY_CAST, boost::str(boost::format("function [%s] rule name [%s]") % __FUNCTION__ % _rn)); @@ -759,7 +908,7 @@ namespace irods::error rule_exists(irods::default_re_ctx&, const std::string& _rn, bool& _ret) { - const std::set rules{ + const std::set rules{ "pep_api_atomic_apply_metadata_operations_pre", "pep_api_atomic_apply_metadata_operations_post", "pep_api_data_obj_open_post", @@ -774,14 +923,45 @@ namespace "pep_api_phy_path_reg_post", "pep_api_rm_coll_pre", "pep_api_rm_coll_post", + "indexing_index_data_object", + "indexing_purge_data_object", + "indexing_index_collection", + "indexing_purge_collection", + "indexing_index_metadata", + "indexing_purge_metadata", + "indexing_remove_data_object_by_path" }; _ret = rules.find(_rn) != rules.end(); return SUCCESS(); } // rule_exists - irods::error list_rules(irods::default_re_ctx&, std::vector&) + irods::error list_rules(irods::default_re_ctx&, std::vector& _rules) { + _rules.push_back("pep_api_atomic_apply_metadata_operations_pre"); + _rules.push_back("pep_api_atomic_apply_metadata_operations_post"); + _rules.push_back("pep_api_data_obj_open_post"); + _rules.push_back("pep_api_data_obj_create_post"); + _rules.push_back("pep_api_data_obj_repl_post"); + _rules.push_back("pep_api_data_obj_unlink_pre"); + _rules.push_back("pep_api_data_obj_unlink_post"); + _rules.push_back("pep_api_mod_avu_metadata_pre"); + _rules.push_back("pep_api_mod_avu_metadata_post"); + 
_rules.push_back("pep_api_data_obj_close_post"); + _rules.push_back("pep_api_data_obj_put_post"); + _rules.push_back("pep_api_phy_path_reg_post"); + _rules.push_back("pep_api_rm_coll_pre"); + _rules.push_back("pep_api_rm_coll_post"); + + // Rules that can be manually invoked by a rodsadmin. + _rules.push_back("indexing_index_data_object"); + _rules.push_back("indexing_purge_data_object"); + _rules.push_back("indexing_index_collection"); + _rules.push_back("indexing_purge_collection"); + _rules.push_back("indexing_index_metadata"); + _rules.push_back("indexing_purge_metadata"); + _rules.push_back("indexing_remove_data_object_by_path"); + return SUCCESS(); } // list_rules @@ -824,55 +1004,192 @@ namespace using json = nlohmann::json; try { - // skip the first line: @external - std::string rule_text{_rule_text}; - if (_rule_text.find("@external") != std::string::npos) { - rule_text = _rule_text.substr(10); + log_re::debug("_rule_text => [{}]", _rule_text); + + std::string_view rule_text = _rule_text; + + // irule + if (rule_text.find("@external rule {") != std::string_view::npos) { + const auto start = rule_text.find_first_of('{') + 1; + const auto end = rule_text.rfind(" }"); + + if (end == std::string_view::npos) { + auto msg = fmt::format("Received malformed rule text. " + "Expected closing curly brace following rule text [{}].", + rule_text); + log_re::error(msg); + return ERROR(SYS_INVALID_INPUT_PARAM, std::move(msg)); + } + + rule_text = rule_text.substr(start, end - start); + } + // irule -F