3
3
#include " base/process.hpp"
4
4
#include " base/exception.hpp"
5
5
#include " base/convert.hpp"
6
+ #include " base/configuration.hpp"
6
7
#include " base/array.hpp"
7
8
#include " base/objectlock.hpp"
8
9
#include " base/utility.hpp"
11
12
#include " base/utility.hpp"
12
13
#include " base/scriptglobal.hpp"
13
14
#include " base/json.hpp"
15
+ #include < algorithm>
14
16
#include < boost/algorithm/string/join.hpp>
15
17
#include < boost/thread/once.hpp>
18
+ #include < cstddef>
16
19
#include < thread>
17
20
#include < iostream>
18
21
@@ -32,6 +35,21 @@ extern char **environ;
32
35
using namespace icinga ;
33
36
34
37
#define IOTHREADS 4
38
+ #define MySpawner l_ProcessControl.Spawners[decltype(l_ProcessControl.Len)(this ) / sizeof (void *) % l_ProcessControl.Len]
39
+
40
+ struct Spawner
41
+ {
42
+ boost::mutex Mutex;
43
+ int FD = -1 ;
44
+ pid_t PID = -1 ;
45
+
46
+ void StartSpawnProcessHelper ();
47
+ void ProcessHandler ();
48
+ pid_t ProcessSpawn (const std::vector<String>& arguments, const Dictionary::Ptr & extraEnvironment, bool adjustPriority, int fds[3 ]);
49
+ int ProcessKill (pid_t pid, int signum);
50
+ int ProcessWaitPID (pid_t pid, int *status);
51
+ Value ProcessSpawnImpl (struct msghdr *msgh, const Dictionary::Ptr & request);
52
+ };
35
53
36
54
static boost::mutex l_ProcessMutex[IOTHREADS];
37
55
static std::map<Process::ProcessHandle, Process::Ptr > l_Processes[IOTHREADS];
@@ -41,9 +59,10 @@ static HANDLE l_Events[IOTHREADS];
41
59
static int l_EventFDs[IOTHREADS][2 ];
42
60
static std::map<Process::ConsoleHandle, Process::ProcessHandle> l_FDs[IOTHREADS];
43
61
44
- static boost::mutex l_ProcessControlMutex;
45
- static int l_ProcessControlFD = -1 ;
46
- static pid_t l_ProcessControlPID;
62
+ static struct {
63
+ Spawner* Spawners = nullptr ;
64
+ size_t Len = 0 ;
65
+ } l_ProcessControl;
47
66
#endif /* _WIN32 */
48
67
static boost::once_flag l_ProcessOnceFlag = BOOST_ONCE_INIT;
49
68
static boost::once_flag l_SpawnHelperOnceFlag = BOOST_ONCE_INIT;
@@ -68,7 +87,7 @@ Process::~Process()
68
87
}
69
88
70
89
#ifndef _WIN32
71
- static Value ProcessSpawnImpl (struct msghdr *msgh, const Dictionary::Ptr & request)
90
+ Value Spawner:: ProcessSpawnImpl (struct msghdr *msgh, const Dictionary::Ptr & request)
72
91
{
73
92
struct cmsghdr *cmsg = CMSG_FIRSTHDR (msgh);
74
93
@@ -138,7 +157,7 @@ static Value ProcessSpawnImpl(struct msghdr *msgh, const Dictionary::Ptr& reques
138
157
if (pid == 0 ) {
139
158
// child process
140
159
141
- (void )close (l_ProcessControlFD );
160
+ (void )close (FD );
142
161
143
162
if (setsid () < 0 ) {
144
163
perror (" setsid() failed" );
@@ -234,7 +253,7 @@ static Value ProcessWaitPIDImpl(struct msghdr *msgh, const Dictionary::Ptr& requ
234
253
return response;
235
254
}
236
255
237
- static void ProcessHandler ()
256
+ void Spawner:: ProcessHandler ()
238
257
{
239
258
sigset_t mask;
240
259
sigfillset (&mask);
@@ -248,7 +267,7 @@ static void ProcessHandler()
248
267
maxfds = 65536 ;
249
268
250
269
for (rlim_t i = 3 ; i < maxfds; i++)
251
- if (i != static_cast <rlim_t >(l_ProcessControlFD ))
270
+ if (i != static_cast <rlim_t >(FD ))
252
271
(void )close (i);
253
272
}
254
273
@@ -269,7 +288,7 @@ static void ProcessHandler()
269
288
msg.msg_control = cbuf;
270
289
msg.msg_controllen = sizeof (cbuf);
271
290
272
- int rc = recvmsg (l_ProcessControlFD , &msg, 0 );
291
+ int rc = recvmsg (FD , &msg, 0 );
273
292
274
293
if (rc <= 0 ) {
275
294
if (rc < 0 && (errno == EINTR || errno == EAGAIN))
@@ -282,7 +301,7 @@ static void ProcessHandler()
282
301
283
302
size_t count = 0 ;
284
303
while (count < length) {
285
- rc = recv (l_ProcessControlFD , mbuf + count, length - count, 0 );
304
+ rc = recv (FD , mbuf + count, length - count, 0 );
286
305
287
306
if (rc <= 0 ) {
288
307
if (rc < 0 && (errno == EINTR || errno == EAGAIN))
@@ -320,7 +339,7 @@ static void ProcessHandler()
320
339
321
340
String jresponse = JsonEncode (response);
322
341
323
- if (send (l_ProcessControlFD , jresponse.CStr (), jresponse.GetLength (), 0 ) < 0 ) {
342
+ if (send (FD , jresponse.CStr (), jresponse.GetLength (), 0 ) < 0 ) {
324
343
BOOST_THROW_EXCEPTION (posix_error ()
325
344
<< boost::errinfo_api_function (" send" )
326
345
<< boost::errinfo_errno (errno));
@@ -330,13 +349,13 @@ static void ProcessHandler()
330
349
_exit (0 );
331
350
}
332
351
333
- static void StartSpawnProcessHelper ()
352
+ void Spawner:: StartSpawnProcessHelper ()
334
353
{
335
- if (l_ProcessControlFD != -1 ) {
336
- (void )close (l_ProcessControlFD );
354
+ if (FD != -1 ) {
355
+ (void )close (FD );
337
356
338
357
int status;
339
- (void )waitpid (l_ProcessControlPID , &status, 0 );
358
+ (void )waitpid (PID , &status, 0 );
340
359
}
341
360
342
361
int controlFDs[2 ];
@@ -357,7 +376,7 @@ static void StartSpawnProcessHelper()
357
376
if (pid == 0 ) {
358
377
(void )close (controlFDs[1 ]);
359
378
360
- l_ProcessControlFD = controlFDs[0 ];
379
+ FD = controlFDs[0 ];
361
380
362
381
ProcessHandler ();
363
382
@@ -366,11 +385,11 @@ static void StartSpawnProcessHelper()
366
385
367
386
(void )close (controlFDs[0 ]);
368
387
369
- l_ProcessControlFD = controlFDs[1 ];
370
- l_ProcessControlPID = pid;
388
+ FD = controlFDs[1 ];
389
+ PID = pid;
371
390
}
372
391
373
- static pid_t ProcessSpawn (const std::vector<String>& arguments, const Dictionary::Ptr & extraEnvironment, bool adjustPriority, int fds[3 ])
392
+ pid_t Spawner:: ProcessSpawn (const std::vector<String>& arguments, const Dictionary::Ptr & extraEnvironment, bool adjustPriority, int fds[3 ])
374
393
{
375
394
Dictionary::Ptr request = new Dictionary ({
376
395
{ " command" , " spawn" },
@@ -382,7 +401,7 @@ static pid_t ProcessSpawn(const std::vector<String>& arguments, const Dictionary
382
401
String jrequest = JsonEncode (request);
383
402
size_t length = jrequest.GetLength ();
384
403
385
- boost::mutex::scoped_lock lock (l_ProcessControlMutex );
404
+ boost::mutex::scoped_lock lock (Mutex );
386
405
387
406
struct msghdr msg;
388
407
memset (&msg, 0 , sizeof (msg));
@@ -408,14 +427,14 @@ static pid_t ProcessSpawn(const std::vector<String>& arguments, const Dictionary
408
427
msg.msg_controllen = cmsg->cmsg_len ;
409
428
410
429
do {
411
- while (sendmsg (l_ProcessControlFD , &msg, 0 ) < 0 ) {
430
+ while (sendmsg (FD , &msg, 0 ) < 0 ) {
412
431
StartSpawnProcessHelper ();
413
432
}
414
- } while (send (l_ProcessControlFD , jrequest.CStr (), jrequest.GetLength (), 0 ) < 0 );
433
+ } while (send (FD , jrequest.CStr (), jrequest.GetLength (), 0 ) < 0 );
415
434
416
435
char buf[4096 ];
417
436
418
- ssize_t rc = recv (l_ProcessControlFD , buf, sizeof (buf), 0 );
437
+ ssize_t rc = recv (FD , buf, sizeof (buf), 0 );
419
438
420
439
if (rc <= 0 )
421
440
return -1 ;
@@ -430,7 +449,7 @@ static pid_t ProcessSpawn(const std::vector<String>& arguments, const Dictionary
430
449
return response->Get (" rc" );
431
450
}
432
451
433
- static int ProcessKill (pid_t pid, int signum)
452
+ int Spawner:: ProcessKill (pid_t pid, int signum)
434
453
{
435
454
Dictionary::Ptr request = new Dictionary ({
436
455
{ " command" , " kill" },
@@ -441,17 +460,17 @@ static int ProcessKill(pid_t pid, int signum)
441
460
String jrequest = JsonEncode (request);
442
461
size_t length = jrequest.GetLength ();
443
462
444
- boost::mutex::scoped_lock lock (l_ProcessControlMutex );
463
+ boost::mutex::scoped_lock lock (Mutex );
445
464
446
465
do {
447
- while (send (l_ProcessControlFD , &length, sizeof (length), 0 ) < 0 ) {
466
+ while (send (FD , &length, sizeof (length), 0 ) < 0 ) {
448
467
StartSpawnProcessHelper ();
449
468
}
450
- } while (send (l_ProcessControlFD , jrequest.CStr (), jrequest.GetLength (), 0 ) < 0 );
469
+ } while (send (FD , jrequest.CStr (), jrequest.GetLength (), 0 ) < 0 );
451
470
452
471
char buf[4096 ];
453
472
454
- ssize_t rc = recv (l_ProcessControlFD , buf, sizeof (buf), 0 );
473
+ ssize_t rc = recv (FD , buf, sizeof (buf), 0 );
455
474
456
475
if (rc <= 0 )
457
476
return -1 ;
@@ -462,7 +481,7 @@ static int ProcessKill(pid_t pid, int signum)
462
481
return response->Get (" errno" );
463
482
}
464
483
465
- static int ProcessWaitPID (pid_t pid, int *status)
484
+ int Spawner:: ProcessWaitPID (pid_t pid, int *status)
466
485
{
467
486
Dictionary::Ptr request = new Dictionary ({
468
487
{ " command" , " waitpid" },
@@ -472,17 +491,17 @@ static int ProcessWaitPID(pid_t pid, int *status)
472
491
String jrequest = JsonEncode (request);
473
492
size_t length = jrequest.GetLength ();
474
493
475
- boost::mutex::scoped_lock lock (l_ProcessControlMutex );
494
+ boost::mutex::scoped_lock lock (Mutex );
476
495
477
496
do {
478
- while (send (l_ProcessControlFD , &length, sizeof (length), 0 ) < 0 ) {
497
+ while (send (FD , &length, sizeof (length), 0 ) < 0 ) {
479
498
StartSpawnProcessHelper ();
480
499
}
481
- } while (send (l_ProcessControlFD , jrequest.CStr (), jrequest.GetLength (), 0 ) < 0 );
500
+ } while (send (FD , jrequest.CStr (), jrequest.GetLength (), 0 ) < 0 );
482
501
483
502
char buf[4096 ];
484
503
485
- ssize_t rc = recv (l_ProcessControlFD , buf, sizeof (buf), 0 );
504
+ ssize_t rc = recv (FD , buf, sizeof (buf), 0 );
486
505
487
506
if (rc <= 0 )
488
507
return -1 ;
@@ -496,8 +515,18 @@ static int ProcessWaitPID(pid_t pid, int *status)
496
515
497
516
void Process::InitializeSpawnHelper ()
498
517
{
499
- if (l_ProcessControlFD == -1 )
500
- StartSpawnProcessHelper ();
518
+ if (!l_ProcessControl.Spawners ) {
519
+ auto len (std::max (1 , Configuration::Concurrency));
520
+
521
+ l_ProcessControl.Spawners = new Spawner[len];
522
+ l_ProcessControl.Len = len;
523
+ }
524
+
525
+ for (Spawner *current = l_ProcessControl.Spawners , *stop = l_ProcessControl.Spawners + l_ProcessControl.Len ; current < stop; ++current) {
526
+ if (current->FD == -1 ) {
527
+ current->StartSpawnProcessHelper ();
528
+ }
529
+ }
501
530
}
502
531
#endif /* _WIN32 */
503
532
@@ -977,7 +1006,7 @@ void Process::Run(const std::function<void(const ProcessResult&)>& callback)
977
1006
fds[1 ] = outfds[1 ];
978
1007
fds[2 ] = outfds[1 ];
979
1008
980
- m_Process = ProcessSpawn (m_Arguments, m_ExtraEnvironment, m_AdjustPriority, fds);
1009
+ m_Process = MySpawner. ProcessSpawn (m_Arguments, m_ExtraEnvironment, m_AdjustPriority, fds);
981
1010
m_PID = m_Process;
982
1011
983
1012
if (m_PID == -1 ) {
@@ -1040,7 +1069,7 @@ bool Process::DoEvents()
1040
1069
#ifdef _WIN32
1041
1070
TerminateProcess (m_Process, 3 );
1042
1071
#else /* _WIN32 */
1043
- int error = ProcessKill (-m_Process, SIGKILL);
1072
+ int error = MySpawner. ProcessKill (-m_Process, SIGKILL);
1044
1073
if (error) {
1045
1074
Log (LogWarning, " Process" )
1046
1075
<< " Couldn't kill the process group " << m_PID << " (" << PrettyPrintArguments (m_Arguments)
@@ -1094,7 +1123,7 @@ bool Process::DoEvents()
1094
1123
int status, exitcode;
1095
1124
if (could_not_kill || m_PID == -1 ) {
1096
1125
exitcode = 128 ;
1097
- } else if (ProcessWaitPID (m_Process, &status) != m_Process) {
1126
+ } else if (MySpawner. ProcessWaitPID (m_Process, &status) != m_Process) {
1098
1127
exitcode = 128 ;
1099
1128
1100
1129
Log (LogWarning, " Process" )
0 commit comments