From f493794e12b780e8eceaaa654206a677c53a3051 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Mon, 3 Feb 2025 17:59:20 +0100 Subject: [PATCH] Spawn Configuration::Concurrency process managers and not just one to increase checks/time. --- lib/base/process.cpp | 105 +++++++++++++++++++++++++++---------------- 1 file changed, 67 insertions(+), 38 deletions(-) diff --git a/lib/base/process.cpp b/lib/base/process.cpp index 23b7353529..c51fabe41e 100644 --- a/lib/base/process.cpp +++ b/lib/base/process.cpp @@ -3,6 +3,7 @@ #include "base/process.hpp" #include "base/exception.hpp" #include "base/convert.hpp" +#include "base/configuration.hpp" #include "base/array.hpp" #include "base/objectlock.hpp" #include "base/utility.hpp" @@ -11,8 +12,10 @@ #include "base/utility.hpp" #include "base/scriptglobal.hpp" #include "base/json.hpp" +#include #include #include +#include #include #include @@ -33,6 +36,21 @@ extern char **environ; using namespace icinga; #define IOTHREADS 4 +#define MySpawner l_ProcessControl.Spawners[std::hash()(this) % l_ProcessControl.Len] + +struct Spawner +{ + std::mutex Mutex; + int FD = -1; + pid_t PID = -1; + + void StartSpawnProcessHelper(); + void ProcessHandler(); + pid_t ProcessSpawn(const std::vector& arguments, const Dictionary::Ptr& extraEnvironment, bool adjustPriority, int fds[3]); + int ProcessKill(pid_t pid, int signum); + int ProcessWaitPID(pid_t pid, int *status); + Value ProcessSpawnImpl(struct msghdr *msgh, const Dictionary::Ptr& request); +}; static std::mutex l_ProcessMutex[IOTHREADS]; static std::map l_Processes[IOTHREADS]; @@ -42,9 +60,10 @@ static HANDLE l_Events[IOTHREADS]; static int l_EventFDs[IOTHREADS][2]; static std::map l_FDs[IOTHREADS]; -static std::mutex l_ProcessControlMutex; -static int l_ProcessControlFD = -1; -static pid_t l_ProcessControlPID; +static struct { + Spawner* Spawners = nullptr; + size_t Len = 0; +} l_ProcessControl; #endif /* _WIN32 */ static boost::once_flag l_ProcessOnceFlag = BOOST_ONCE_INIT; static boost::once_flag l_SpawnHelperOnceFlag = BOOST_ONCE_INIT; @@ -72,7 +91,7 @@ Process::~Process() } #ifndef _WIN32 -static Value ProcessSpawnImpl(struct msghdr *msgh, const Dictionary::Ptr& request) +Value Spawner::ProcessSpawnImpl(struct msghdr *msgh, const Dictionary::Ptr& request) { struct cmsghdr *cmsg = CMSG_FIRSTHDR(msgh); @@ -147,7 +166,7 @@ static Value ProcessSpawnImpl(struct msghdr *msgh, const Dictionary::Ptr& reques if (pid == 0) { // child process - (void)close(l_ProcessControlFD); + (void)close(FD); if (setsid() < 0) { perror("setsid() failed"); @@ -253,13 +272,13 @@ static Value ProcessWaitPIDImpl(struct msghdr *msgh, const Dictionary::Ptr& requ return response; } -static void ProcessHandler() +void Spawner::ProcessHandler() { sigset_t mask; sigfillset(&mask); sigprocmask(SIG_SETMASK, &mask, nullptr); - Utility::CloseAllFDs({0, 1, 2, l_ProcessControlFD}); + Utility::CloseAllFDs({0, 1, 2, FD}); for (;;) { size_t length; @@ -278,7 +297,7 @@ static void ProcessHandler() msg.msg_control = cbuf; msg.msg_controllen = sizeof(cbuf); - int rc = recvmsg(l_ProcessControlFD, &msg, 0); + int rc = recvmsg(FD, &msg, 0); if (rc <= 0) { if (rc < 0 && (errno == EINTR || errno == EAGAIN)) @@ -291,7 +310,7 @@ static void ProcessHandler() size_t count = 0; while (count < length) { - rc = recv(l_ProcessControlFD, mbuf + count, length - count, 0); + rc = recv(FD, mbuf + count, length - count, 0); if (rc <= 0) { if (rc < 0 && (errno == EINTR || errno == EAGAIN)) @@ -329,7 +348,7 @@ static void ProcessHandler() String jresponse = JsonEncode(response); - if (send(l_ProcessControlFD, jresponse.CStr(), jresponse.GetLength(), 0) < 0) { + if (send(FD, jresponse.CStr(), jresponse.GetLength(), 0) < 0) { BOOST_THROW_EXCEPTION(posix_error() << boost::errinfo_api_function("send") << boost::errinfo_errno(errno)); @@ -339,13 +358,13 @@ static void ProcessHandler() _exit(0); } -static void StartSpawnProcessHelper() +void Spawner::StartSpawnProcessHelper() { - if (l_ProcessControlFD != -1) { - (void)close(l_ProcessControlFD); + if (FD != -1) { + (void)close(FD); int status; - (void)waitpid(l_ProcessControlPID, &status, 0); + (void)waitpid(PID, &status, 0); } int controlFDs[2]; @@ -366,7 +385,7 @@ static void StartSpawnProcessHelper() if (pid == 0) { (void)close(controlFDs[1]); - l_ProcessControlFD = controlFDs[0]; + FD = controlFDs[0]; ProcessHandler(); @@ -375,11 +394,11 @@ static void StartSpawnProcessHelper() (void)close(controlFDs[0]); - l_ProcessControlFD = controlFDs[1]; - l_ProcessControlPID = pid; + FD = controlFDs[1]; + PID = pid; } -static pid_t ProcessSpawn(const std::vector& arguments, const Dictionary::Ptr& extraEnvironment, bool adjustPriority, int fds[3]) +pid_t Spawner::ProcessSpawn(const std::vector& arguments, const Dictionary::Ptr& extraEnvironment, bool adjustPriority, int fds[3]) { Dictionary::Ptr request = new Dictionary({ { "command", "spawn" }, @@ -391,7 +410,7 @@ static pid_t ProcessSpawn(const std::vector& arguments, const Dictionary String jrequest = JsonEncode(request); size_t length = jrequest.GetLength(); - std::unique_lock lock(l_ProcessControlMutex); + std::unique_lock lock(Mutex); struct msghdr msg; memset(&msg, 0, sizeof(msg)); @@ -417,14 +436,14 @@ static pid_t ProcessSpawn(const std::vector& arguments, const Dictionary msg.msg_controllen = cmsg->cmsg_len; do { - while (sendmsg(l_ProcessControlFD, &msg, 0) < 0) { + while (sendmsg(FD, &msg, 0) < 0) { StartSpawnProcessHelper(); } - } while (send(l_ProcessControlFD, jrequest.CStr(), jrequest.GetLength(), 0) < 0); + } while (send(FD, jrequest.CStr(), jrequest.GetLength(), 0) < 0); char buf[4096]; - ssize_t rc = recv(l_ProcessControlFD, buf, sizeof(buf), 0); + ssize_t rc = recv(FD, buf, sizeof(buf), 0); if (rc <= 0) return -1; @@ -439,7 +458,7 @@ static pid_t ProcessSpawn(const std::vector& arguments, const Dictionary return response->Get("rc"); } -static int ProcessKill(pid_t pid, int signum) +int Spawner::ProcessKill(pid_t pid, int signum) { Dictionary::Ptr request = new Dictionary({ { "command", "kill" }, @@ -450,17 +469,17 @@ static int ProcessKill(pid_t pid, int signum) String jrequest = JsonEncode(request); size_t length = jrequest.GetLength(); - std::unique_lock lock(l_ProcessControlMutex); + std::unique_lock lock(Mutex); do { - while (send(l_ProcessControlFD, &length, sizeof(length), 0) < 0) { + while (send(FD, &length, sizeof(length), 0) < 0) { StartSpawnProcessHelper(); } - } while (send(l_ProcessControlFD, jrequest.CStr(), jrequest.GetLength(), 0) < 0); + } while (send(FD, jrequest.CStr(), jrequest.GetLength(), 0) < 0); char buf[4096]; - ssize_t rc = recv(l_ProcessControlFD, buf, sizeof(buf), 0); + ssize_t rc = recv(FD, buf, sizeof(buf), 0); if (rc <= 0) return -1; @@ -471,7 +490,7 @@ static int ProcessKill(pid_t pid, int signum) return response->Get("errno"); } -static int ProcessWaitPID(pid_t pid, int *status) +int Spawner::ProcessWaitPID(pid_t pid, int *status) { Dictionary::Ptr request = new Dictionary({ { "command", "waitpid" }, @@ -481,17 +500,17 @@ static int ProcessWaitPID(pid_t pid, int *status) String jrequest = JsonEncode(request); size_t length = jrequest.GetLength(); - std::unique_lock lock(l_ProcessControlMutex); + std::unique_lock lock(Mutex); do { - while (send(l_ProcessControlFD, &length, sizeof(length), 0) < 0) { + while (send(FD, &length, sizeof(length), 0) < 0) { StartSpawnProcessHelper(); } - } while (send(l_ProcessControlFD, jrequest.CStr(), jrequest.GetLength(), 0) < 0); + } while (send(FD, jrequest.CStr(), jrequest.GetLength(), 0) < 0); char buf[4096]; - ssize_t rc = recv(l_ProcessControlFD, buf, sizeof(buf), 0); + ssize_t rc = recv(FD, buf, sizeof(buf), 0); if (rc <= 0) return -1; @@ -505,8 +524,18 @@ static int ProcessWaitPID(pid_t pid, int *status) void Process::InitializeSpawnHelper() { - if (l_ProcessControlFD == -1) - StartSpawnProcessHelper(); + if (!l_ProcessControl.Spawners) { + auto len (std::max(1, Configuration::Concurrency)); + + l_ProcessControl.Spawners = new Spawner[len]; + l_ProcessControl.Len = len; + } + + for (Spawner *current = l_ProcessControl.Spawners, *stop = l_ProcessControl.Spawners + l_ProcessControl.Len; current < stop; ++current) { + if (current->FD == -1) { + current->StartSpawnProcessHelper(); + } + } } #endif /* _WIN32 */ @@ -992,7 +1021,7 @@ void Process::Run(const std::function& callback) fds[1] = outfds[1]; fds[2] = outfds[1]; - m_Process = ProcessSpawn(m_Arguments, m_ExtraEnvironment, m_AdjustPriority, fds); + m_Process = MySpawner.ProcessSpawn(m_Arguments, m_ExtraEnvironment, m_AdjustPriority, fds); m_PID = m_Process; if (m_PID == -1) { @@ -1058,7 +1087,7 @@ bool Process::DoEvents() m_OutputStream << ""; - int error = ProcessKill(m_Process, SIGTERM); + int error = MySpawner.ProcessKill(m_Process, SIGTERM); if (error) { Log(LogWarning, "Process") << "Couldn't terminate the process " << m_PID << " (" << PrettyPrintArguments(m_Arguments) @@ -1082,7 +1111,7 @@ bool Process::DoEvents() m_OutputStream << ""; TerminateProcess(m_Process, 3); #else /* _WIN32 */ - int error = ProcessKill(-m_Process, SIGKILL); + int error = MySpawner.ProcessKill(-m_Process, SIGKILL); if (error) { Log(LogWarning, "Process") << "Couldn't kill the process group " << m_PID << " (" << PrettyPrintArguments(m_Arguments) @@ -1138,7 +1167,7 @@ bool Process::DoEvents() int status, exitcode; if (could_not_kill || m_PID == -1) { exitcode = 128; - } else if (ProcessWaitPID(m_Process, &status) != m_Process) { + } else if (MySpawner.ProcessWaitPID(m_Process, &status) != m_Process) { exitcode = 128; Log(LogWarning, "Process")