3
3
#include " base/process.hpp"
4
4
#include " base/exception.hpp"
5
5
#include " base/convert.hpp"
6
+ #include " base/configuration.hpp"
6
7
#include " base/array.hpp"
7
8
#include " base/objectlock.hpp"
8
9
#include " base/utility.hpp"
11
12
#include " base/utility.hpp"
12
13
#include " base/scriptglobal.hpp"
13
14
#include " base/json.hpp"
15
+ #include < algorithm>
14
16
#include < boost/algorithm/string/join.hpp>
15
17
#include < boost/thread/once.hpp>
18
+ #include < cstddef>
16
19
#include < thread>
17
20
#include < iostream>
18
21
@@ -32,6 +35,21 @@ extern char **environ;
32
35
using namespace icinga ;
33
36
34
37
#define IOTHREADS 4
38
+ #define MySpawner l_ProcessControl.Spawners[decltype(l_ProcessControl.Len)(this ) / sizeof (void *) % l_ProcessControl.Len]
39
+
40
+ struct Spawner
41
+ {
42
+ std::mutex Mutex;
43
+ int FD = -1 ;
44
+ pid_t PID = -1 ;
45
+
46
+ void StartSpawnProcessHelper ();
47
+ void ProcessHandler ();
48
+ pid_t ProcessSpawn (const std::vector<String>& arguments, const Dictionary::Ptr & extraEnvironment, bool adjustPriority, int fds[3 ]);
49
+ int ProcessKill (pid_t pid, int signum);
50
+ int ProcessWaitPID (pid_t pid, int *status);
51
+ Value ProcessSpawnImpl (struct msghdr *msgh, const Dictionary::Ptr & request);
52
+ };
35
53
36
54
static std::mutex l_ProcessMutex[IOTHREADS];
37
55
static std::map<Process::ProcessHandle, Process::Ptr > l_Processes[IOTHREADS];
@@ -41,9 +59,10 @@ static HANDLE l_Events[IOTHREADS];
41
59
static int l_EventFDs[IOTHREADS][2 ];
42
60
static std::map<Process::ConsoleHandle, Process::ProcessHandle> l_FDs[IOTHREADS];
43
61
44
- static std::mutex l_ProcessControlMutex;
45
- static int l_ProcessControlFD = -1 ;
46
- static pid_t l_ProcessControlPID;
62
+ static struct {
63
+ Spawner* Spawners = nullptr ;
64
+ size_t Len = 0 ;
65
+ } l_ProcessControl;
47
66
#endif /* _WIN32 */
48
67
static boost::once_flag l_ProcessOnceFlag = BOOST_ONCE_INIT;
49
68
static boost::once_flag l_SpawnHelperOnceFlag = BOOST_ONCE_INIT;
@@ -71,7 +90,7 @@ Process::~Process()
71
90
}
72
91
73
92
#ifndef _WIN32
74
- static Value ProcessSpawnImpl (struct msghdr *msgh, const Dictionary::Ptr & request)
93
+ Value Spawner:: ProcessSpawnImpl (struct msghdr *msgh, const Dictionary::Ptr & request)
75
94
{
76
95
struct cmsghdr *cmsg = CMSG_FIRSTHDR (msgh);
77
96
@@ -146,7 +165,7 @@ static Value ProcessSpawnImpl(struct msghdr *msgh, const Dictionary::Ptr& reques
146
165
if (pid == 0 ) {
147
166
// child process
148
167
149
- (void )close (l_ProcessControlFD );
168
+ (void )close (FD );
150
169
151
170
if (setsid () < 0 ) {
152
171
perror (" setsid() failed" );
@@ -241,13 +260,13 @@ static Value ProcessWaitPIDImpl(struct msghdr *msgh, const Dictionary::Ptr& requ
241
260
return response;
242
261
}
243
262
244
- static void ProcessHandler ()
263
+ void Spawner:: ProcessHandler ()
245
264
{
246
265
sigset_t mask;
247
266
sigfillset (&mask);
248
267
sigprocmask (SIG_SETMASK, &mask, nullptr );
249
268
250
- Utility::CloseAllFDs ({0 , 1 , 2 , l_ProcessControlFD });
269
+ Utility::CloseAllFDs ({0 , 1 , 2 , FD });
251
270
252
271
for (;;) {
253
272
size_t length;
@@ -266,7 +285,7 @@ static void ProcessHandler()
266
285
msg.msg_control = cbuf;
267
286
msg.msg_controllen = sizeof (cbuf);
268
287
269
- int rc = recvmsg (l_ProcessControlFD , &msg, 0 );
288
+ int rc = recvmsg (FD , &msg, 0 );
270
289
271
290
if (rc <= 0 ) {
272
291
if (rc < 0 && (errno == EINTR || errno == EAGAIN))
@@ -279,7 +298,7 @@ static void ProcessHandler()
279
298
280
299
size_t count = 0 ;
281
300
while (count < length) {
282
- rc = recv (l_ProcessControlFD , mbuf + count, length - count, 0 );
301
+ rc = recv (FD , mbuf + count, length - count, 0 );
283
302
284
303
if (rc <= 0 ) {
285
304
if (rc < 0 && (errno == EINTR || errno == EAGAIN))
@@ -317,7 +336,7 @@ static void ProcessHandler()
317
336
318
337
String jresponse = JsonEncode (response);
319
338
320
- if (send (l_ProcessControlFD , jresponse.CStr (), jresponse.GetLength (), 0 ) < 0 ) {
339
+ if (send (FD , jresponse.CStr (), jresponse.GetLength (), 0 ) < 0 ) {
321
340
BOOST_THROW_EXCEPTION (posix_error ()
322
341
<< boost::errinfo_api_function (" send" )
323
342
<< boost::errinfo_errno (errno));
@@ -327,13 +346,13 @@ static void ProcessHandler()
327
346
_exit (0 );
328
347
}
329
348
330
- static void StartSpawnProcessHelper ()
349
+ void Spawner:: StartSpawnProcessHelper ()
331
350
{
332
- if (l_ProcessControlFD != -1 ) {
333
- (void )close (l_ProcessControlFD );
351
+ if (FD != -1 ) {
352
+ (void )close (FD );
334
353
335
354
int status;
336
- (void )waitpid (l_ProcessControlPID , &status, 0 );
355
+ (void )waitpid (PID , &status, 0 );
337
356
}
338
357
339
358
int controlFDs[2 ];
@@ -354,7 +373,7 @@ static void StartSpawnProcessHelper()
354
373
if (pid == 0 ) {
355
374
(void )close (controlFDs[1 ]);
356
375
357
- l_ProcessControlFD = controlFDs[0 ];
376
+ FD = controlFDs[0 ];
358
377
359
378
ProcessHandler ();
360
379
@@ -363,11 +382,11 @@ static void StartSpawnProcessHelper()
363
382
364
383
(void )close (controlFDs[0 ]);
365
384
366
- l_ProcessControlFD = controlFDs[1 ];
367
- l_ProcessControlPID = pid;
385
+ FD = controlFDs[1 ];
386
+ PID = pid;
368
387
}
369
388
370
- static pid_t ProcessSpawn (const std::vector<String>& arguments, const Dictionary::Ptr & extraEnvironment, bool adjustPriority, int fds[3 ])
389
+ pid_t Spawner:: ProcessSpawn (const std::vector<String>& arguments, const Dictionary::Ptr & extraEnvironment, bool adjustPriority, int fds[3 ])
371
390
{
372
391
Dictionary::Ptr request = new Dictionary ({
373
392
{ " command" , " spawn" },
@@ -379,7 +398,7 @@ static pid_t ProcessSpawn(const std::vector<String>& arguments, const Dictionary
379
398
String jrequest = JsonEncode (request);
380
399
size_t length = jrequest.GetLength ();
381
400
382
- std::unique_lock<std::mutex> lock (l_ProcessControlMutex );
401
+ std::unique_lock<std::mutex> lock (Mutex );
383
402
384
403
struct msghdr msg;
385
404
memset (&msg, 0 , sizeof (msg));
@@ -405,14 +424,14 @@ static pid_t ProcessSpawn(const std::vector<String>& arguments, const Dictionary
405
424
msg.msg_controllen = cmsg->cmsg_len ;
406
425
407
426
do {
408
- while (sendmsg (l_ProcessControlFD , &msg, 0 ) < 0 ) {
427
+ while (sendmsg (FD , &msg, 0 ) < 0 ) {
409
428
StartSpawnProcessHelper ();
410
429
}
411
- } while (send (l_ProcessControlFD , jrequest.CStr (), jrequest.GetLength (), 0 ) < 0 );
430
+ } while (send (FD , jrequest.CStr (), jrequest.GetLength (), 0 ) < 0 );
412
431
413
432
char buf[4096 ];
414
433
415
- ssize_t rc = recv (l_ProcessControlFD , buf, sizeof (buf), 0 );
434
+ ssize_t rc = recv (FD , buf, sizeof (buf), 0 );
416
435
417
436
if (rc <= 0 )
418
437
return -1 ;
@@ -427,7 +446,7 @@ static pid_t ProcessSpawn(const std::vector<String>& arguments, const Dictionary
427
446
return response->Get (" rc" );
428
447
}
429
448
430
- static int ProcessKill (pid_t pid, int signum)
449
+ int Spawner:: ProcessKill (pid_t pid, int signum)
431
450
{
432
451
Dictionary::Ptr request = new Dictionary ({
433
452
{ " command" , " kill" },
@@ -438,17 +457,17 @@ static int ProcessKill(pid_t pid, int signum)
438
457
String jrequest = JsonEncode (request);
439
458
size_t length = jrequest.GetLength ();
440
459
441
- std::unique_lock<std::mutex> lock (l_ProcessControlMutex );
460
+ std::unique_lock<std::mutex> lock (Mutex );
442
461
443
462
do {
444
- while (send (l_ProcessControlFD , &length, sizeof (length), 0 ) < 0 ) {
463
+ while (send (FD , &length, sizeof (length), 0 ) < 0 ) {
445
464
StartSpawnProcessHelper ();
446
465
}
447
- } while (send (l_ProcessControlFD , jrequest.CStr (), jrequest.GetLength (), 0 ) < 0 );
466
+ } while (send (FD , jrequest.CStr (), jrequest.GetLength (), 0 ) < 0 );
448
467
449
468
char buf[4096 ];
450
469
451
- ssize_t rc = recv (l_ProcessControlFD , buf, sizeof (buf), 0 );
470
+ ssize_t rc = recv (FD , buf, sizeof (buf), 0 );
452
471
453
472
if (rc <= 0 )
454
473
return -1 ;
@@ -459,7 +478,7 @@ static int ProcessKill(pid_t pid, int signum)
459
478
return response->Get (" errno" );
460
479
}
461
480
462
- static int ProcessWaitPID (pid_t pid, int *status)
481
+ int Spawner:: ProcessWaitPID (pid_t pid, int *status)
463
482
{
464
483
Dictionary::Ptr request = new Dictionary ({
465
484
{ " command" , " waitpid" },
@@ -469,17 +488,17 @@ static int ProcessWaitPID(pid_t pid, int *status)
469
488
String jrequest = JsonEncode (request);
470
489
size_t length = jrequest.GetLength ();
471
490
472
- std::unique_lock<std::mutex> lock (l_ProcessControlMutex );
491
+ std::unique_lock<std::mutex> lock (Mutex );
473
492
474
493
do {
475
- while (send (l_ProcessControlFD , &length, sizeof (length), 0 ) < 0 ) {
494
+ while (send (FD , &length, sizeof (length), 0 ) < 0 ) {
476
495
StartSpawnProcessHelper ();
477
496
}
478
- } while (send (l_ProcessControlFD , jrequest.CStr (), jrequest.GetLength (), 0 ) < 0 );
497
+ } while (send (FD , jrequest.CStr (), jrequest.GetLength (), 0 ) < 0 );
479
498
480
499
char buf[4096 ];
481
500
482
- ssize_t rc = recv (l_ProcessControlFD , buf, sizeof (buf), 0 );
501
+ ssize_t rc = recv (FD , buf, sizeof (buf), 0 );
483
502
484
503
if (rc <= 0 )
485
504
return -1 ;
@@ -493,8 +512,18 @@ static int ProcessWaitPID(pid_t pid, int *status)
493
512
494
513
void Process::InitializeSpawnHelper ()
495
514
{
496
- if (l_ProcessControlFD == -1 )
497
- StartSpawnProcessHelper ();
515
+ if (!l_ProcessControl.Spawners ) {
516
+ auto len (std::max (1 , Configuration::Concurrency));
517
+
518
+ l_ProcessControl.Spawners = new Spawner[len];
519
+ l_ProcessControl.Len = len;
520
+ }
521
+
522
+ for (Spawner *current = l_ProcessControl.Spawners , *stop = l_ProcessControl.Spawners + l_ProcessControl.Len ; current < stop; ++current) {
523
+ if (current->FD == -1 ) {
524
+ current->StartSpawnProcessHelper ();
525
+ }
526
+ }
498
527
}
499
528
#endif /* _WIN32 */
500
529
@@ -980,7 +1009,7 @@ void Process::Run(const std::function<void(const ProcessResult&)>& callback)
980
1009
fds[1 ] = outfds[1 ];
981
1010
fds[2 ] = outfds[1 ];
982
1011
983
- m_Process = ProcessSpawn (m_Arguments, m_ExtraEnvironment, m_AdjustPriority, fds);
1012
+ m_Process = MySpawner. ProcessSpawn (m_Arguments, m_ExtraEnvironment, m_AdjustPriority, fds);
984
1013
m_PID = m_Process;
985
1014
986
1015
if (m_PID == -1 ) {
@@ -1070,7 +1099,7 @@ bool Process::DoEvents()
1070
1099
m_OutputStream << " <Timeout exceeded.>" ;
1071
1100
TerminateProcess (m_Process, 3 );
1072
1101
#else /* _WIN32 */
1073
- int error = ProcessKill (-m_Process, SIGKILL);
1102
+ int error = MySpawner. ProcessKill (-m_Process, SIGKILL);
1074
1103
if (error) {
1075
1104
Log (LogWarning, " Process" )
1076
1105
<< " Couldn't kill the process group " << m_PID << " (" << PrettyPrintArguments (m_Arguments)
@@ -1124,7 +1153,7 @@ bool Process::DoEvents()
1124
1153
int status, exitcode;
1125
1154
if (could_not_kill || m_PID == -1 ) {
1126
1155
exitcode = 128 ;
1127
- } else if (ProcessWaitPID (m_Process, &status) != m_Process) {
1156
+ } else if (MySpawner. ProcessWaitPID (m_Process, &status) != m_Process) {
1128
1157
exitcode = 128 ;
1129
1158
1130
1159
Log (LogWarning, " Process" )
0 commit comments