3
3
#include " base/process.hpp"
4
4
#include " base/exception.hpp"
5
5
#include " base/convert.hpp"
6
+ #include " base/configuration.hpp"
6
7
#include " base/array.hpp"
7
8
#include " base/objectlock.hpp"
8
9
#include " base/utility.hpp"
11
12
#include " base/utility.hpp"
12
13
#include " base/scriptglobal.hpp"
13
14
#include " base/json.hpp"
15
+ #include < algorithm>
14
16
#include < boost/algorithm/string/join.hpp>
15
17
#include < boost/thread/once.hpp>
18
+ #include < cstddef>
16
19
#include < thread>
17
20
#include < iostream>
18
21
@@ -33,6 +36,21 @@ extern char **environ;
33
36
using namespace icinga ;
34
37
35
38
#define IOTHREADS 4
39
+ #define MySpawner l_ProcessControl.Spawners[std::hash<decltype(this )>()(this ) % l_ProcessControl.Len]
40
+
41
+ struct Spawner
42
+ {
43
+ std::mutex Mutex;
44
+ int FD = -1 ;
45
+ pid_t PID = -1 ;
46
+
47
+ void StartSpawnProcessHelper ();
48
+ void ProcessHandler ();
49
+ pid_t ProcessSpawn (const std::vector<String>& arguments, const Dictionary::Ptr & extraEnvironment, bool adjustPriority, int fds[3 ]);
50
+ int ProcessKill (pid_t pid, int signum);
51
+ int ProcessWaitPID (pid_t pid, int *status);
52
+ Value ProcessSpawnImpl (struct msghdr *msgh, const Dictionary::Ptr & request);
53
+ };
36
54
37
55
static std::mutex l_ProcessMutex[IOTHREADS];
38
56
static std::map<Process::ProcessHandle, Process::Ptr > l_Processes[IOTHREADS];
@@ -42,9 +60,10 @@ static HANDLE l_Events[IOTHREADS];
42
60
static int l_EventFDs[IOTHREADS][2 ];
43
61
static std::map<Process::ConsoleHandle, Process::ProcessHandle> l_FDs[IOTHREADS];
44
62
45
- static std::mutex l_ProcessControlMutex;
46
- static int l_ProcessControlFD = -1 ;
47
- static pid_t l_ProcessControlPID;
63
+ static struct {
64
+ Spawner* Spawners = nullptr ;
65
+ size_t Len = 0 ;
66
+ } l_ProcessControl;
48
67
#endif /* _WIN32 */
49
68
static boost::once_flag l_ProcessOnceFlag = BOOST_ONCE_INIT;
50
69
static boost::once_flag l_SpawnHelperOnceFlag = BOOST_ONCE_INIT;
@@ -72,7 +91,7 @@ Process::~Process()
72
91
}
73
92
74
93
#ifndef _WIN32
75
- static Value ProcessSpawnImpl (struct msghdr *msgh, const Dictionary::Ptr & request)
94
+ Value Spawner:: ProcessSpawnImpl (struct msghdr *msgh, const Dictionary::Ptr & request)
76
95
{
77
96
struct cmsghdr *cmsg = CMSG_FIRSTHDR (msgh);
78
97
@@ -147,7 +166,7 @@ static Value ProcessSpawnImpl(struct msghdr *msgh, const Dictionary::Ptr& reques
147
166
if (pid == 0 ) {
148
167
// child process
149
168
150
- (void )close (l_ProcessControlFD );
169
+ (void )close (FD );
151
170
152
171
if (setsid () < 0 ) {
153
172
perror (" setsid() failed" );
@@ -253,13 +272,13 @@ static Value ProcessWaitPIDImpl(struct msghdr *msgh, const Dictionary::Ptr& requ
253
272
return response;
254
273
}
255
274
256
- static void ProcessHandler ()
275
+ void Spawner:: ProcessHandler ()
257
276
{
258
277
sigset_t mask;
259
278
sigfillset (&mask);
260
279
sigprocmask (SIG_SETMASK, &mask, nullptr );
261
280
262
- Utility::CloseAllFDs ({0 , 1 , 2 , l_ProcessControlFD });
281
+ Utility::CloseAllFDs ({0 , 1 , 2 , FD });
263
282
264
283
for (;;) {
265
284
size_t length;
@@ -278,7 +297,7 @@ static void ProcessHandler()
278
297
msg.msg_control = cbuf;
279
298
msg.msg_controllen = sizeof (cbuf);
280
299
281
- int rc = recvmsg (l_ProcessControlFD , &msg, 0 );
300
+ int rc = recvmsg (FD , &msg, 0 );
282
301
283
302
if (rc <= 0 ) {
284
303
if (rc < 0 && (errno == EINTR || errno == EAGAIN))
@@ -291,7 +310,7 @@ static void ProcessHandler()
291
310
292
311
size_t count = 0 ;
293
312
while (count < length) {
294
- rc = recv (l_ProcessControlFD , mbuf + count, length - count, 0 );
313
+ rc = recv (FD , mbuf + count, length - count, 0 );
295
314
296
315
if (rc <= 0 ) {
297
316
if (rc < 0 && (errno == EINTR || errno == EAGAIN))
@@ -329,7 +348,7 @@ static void ProcessHandler()
329
348
330
349
String jresponse = JsonEncode (response);
331
350
332
- if (send (l_ProcessControlFD , jresponse.CStr (), jresponse.GetLength (), 0 ) < 0 ) {
351
+ if (send (FD , jresponse.CStr (), jresponse.GetLength (), 0 ) < 0 ) {
333
352
BOOST_THROW_EXCEPTION (posix_error ()
334
353
<< boost::errinfo_api_function (" send" )
335
354
<< boost::errinfo_errno (errno));
@@ -339,13 +358,13 @@ static void ProcessHandler()
339
358
_exit (0 );
340
359
}
341
360
342
- static void StartSpawnProcessHelper ()
361
+ void Spawner:: StartSpawnProcessHelper ()
343
362
{
344
- if (l_ProcessControlFD != -1 ) {
345
- (void )close (l_ProcessControlFD );
363
+ if (FD != -1 ) {
364
+ (void )close (FD );
346
365
347
366
int status;
348
- (void )waitpid (l_ProcessControlPID , &status, 0 );
367
+ (void )waitpid (PID , &status, 0 );
349
368
}
350
369
351
370
int controlFDs[2 ];
@@ -366,7 +385,7 @@ static void StartSpawnProcessHelper()
366
385
if (pid == 0 ) {
367
386
(void )close (controlFDs[1 ]);
368
387
369
- l_ProcessControlFD = controlFDs[0 ];
388
+ FD = controlFDs[0 ];
370
389
371
390
ProcessHandler ();
372
391
@@ -375,11 +394,11 @@ static void StartSpawnProcessHelper()
375
394
376
395
(void )close (controlFDs[0 ]);
377
396
378
- l_ProcessControlFD = controlFDs[1 ];
379
- l_ProcessControlPID = pid;
397
+ FD = controlFDs[1 ];
398
+ PID = pid;
380
399
}
381
400
382
- static pid_t ProcessSpawn (const std::vector<String>& arguments, const Dictionary::Ptr & extraEnvironment, bool adjustPriority, int fds[3 ])
401
+ pid_t Spawner:: ProcessSpawn (const std::vector<String>& arguments, const Dictionary::Ptr & extraEnvironment, bool adjustPriority, int fds[3 ])
383
402
{
384
403
Dictionary::Ptr request = new Dictionary ({
385
404
{ " command" , " spawn" },
@@ -391,7 +410,7 @@ static pid_t ProcessSpawn(const std::vector<String>& arguments, const Dictionary
391
410
String jrequest = JsonEncode (request);
392
411
size_t length = jrequest.GetLength ();
393
412
394
- std::unique_lock<std::mutex> lock (l_ProcessControlMutex );
413
+ std::unique_lock<std::mutex> lock (Mutex );
395
414
396
415
struct msghdr msg;
397
416
memset (&msg, 0 , sizeof (msg));
@@ -417,14 +436,14 @@ static pid_t ProcessSpawn(const std::vector<String>& arguments, const Dictionary
417
436
msg.msg_controllen = cmsg->cmsg_len ;
418
437
419
438
do {
420
- while (sendmsg (l_ProcessControlFD , &msg, 0 ) < 0 ) {
439
+ while (sendmsg (FD , &msg, 0 ) < 0 ) {
421
440
StartSpawnProcessHelper ();
422
441
}
423
- } while (send (l_ProcessControlFD , jrequest.CStr (), jrequest.GetLength (), 0 ) < 0 );
442
+ } while (send (FD , jrequest.CStr (), jrequest.GetLength (), 0 ) < 0 );
424
443
425
444
char buf[4096 ];
426
445
427
- ssize_t rc = recv (l_ProcessControlFD , buf, sizeof (buf), 0 );
446
+ ssize_t rc = recv (FD , buf, sizeof (buf), 0 );
428
447
429
448
if (rc <= 0 )
430
449
return -1 ;
@@ -439,7 +458,7 @@ static pid_t ProcessSpawn(const std::vector<String>& arguments, const Dictionary
439
458
return response->Get (" rc" );
440
459
}
441
460
442
- static int ProcessKill (pid_t pid, int signum)
461
+ int Spawner:: ProcessKill (pid_t pid, int signum)
443
462
{
444
463
Dictionary::Ptr request = new Dictionary ({
445
464
{ " command" , " kill" },
@@ -450,17 +469,17 @@ static int ProcessKill(pid_t pid, int signum)
450
469
String jrequest = JsonEncode (request);
451
470
size_t length = jrequest.GetLength ();
452
471
453
- std::unique_lock<std::mutex> lock (l_ProcessControlMutex );
472
+ std::unique_lock<std::mutex> lock (Mutex );
454
473
455
474
do {
456
- while (send (l_ProcessControlFD , &length, sizeof (length), 0 ) < 0 ) {
475
+ while (send (FD , &length, sizeof (length), 0 ) < 0 ) {
457
476
StartSpawnProcessHelper ();
458
477
}
459
- } while (send (l_ProcessControlFD , jrequest.CStr (), jrequest.GetLength (), 0 ) < 0 );
478
+ } while (send (FD , jrequest.CStr (), jrequest.GetLength (), 0 ) < 0 );
460
479
461
480
char buf[4096 ];
462
481
463
- ssize_t rc = recv (l_ProcessControlFD , buf, sizeof (buf), 0 );
482
+ ssize_t rc = recv (FD , buf, sizeof (buf), 0 );
464
483
465
484
if (rc <= 0 )
466
485
return -1 ;
@@ -471,7 +490,7 @@ static int ProcessKill(pid_t pid, int signum)
471
490
return response->Get (" errno" );
472
491
}
473
492
474
- static int ProcessWaitPID (pid_t pid, int *status)
493
+ int Spawner:: ProcessWaitPID (pid_t pid, int *status)
475
494
{
476
495
Dictionary::Ptr request = new Dictionary ({
477
496
{ " command" , " waitpid" },
@@ -481,17 +500,17 @@ static int ProcessWaitPID(pid_t pid, int *status)
481
500
String jrequest = JsonEncode (request);
482
501
size_t length = jrequest.GetLength ();
483
502
484
- std::unique_lock<std::mutex> lock (l_ProcessControlMutex );
503
+ std::unique_lock<std::mutex> lock (Mutex );
485
504
486
505
do {
487
- while (send (l_ProcessControlFD , &length, sizeof (length), 0 ) < 0 ) {
506
+ while (send (FD , &length, sizeof (length), 0 ) < 0 ) {
488
507
StartSpawnProcessHelper ();
489
508
}
490
- } while (send (l_ProcessControlFD , jrequest.CStr (), jrequest.GetLength (), 0 ) < 0 );
509
+ } while (send (FD , jrequest.CStr (), jrequest.GetLength (), 0 ) < 0 );
491
510
492
511
char buf[4096 ];
493
512
494
- ssize_t rc = recv (l_ProcessControlFD , buf, sizeof (buf), 0 );
513
+ ssize_t rc = recv (FD , buf, sizeof (buf), 0 );
495
514
496
515
if (rc <= 0 )
497
516
return -1 ;
@@ -505,8 +524,18 @@ static int ProcessWaitPID(pid_t pid, int *status)
505
524
506
525
void Process::InitializeSpawnHelper ()
507
526
{
508
- if (l_ProcessControlFD == -1 )
509
- StartSpawnProcessHelper ();
527
+ if (!l_ProcessControl.Spawners ) {
528
+ auto len (std::max (1 , Configuration::Concurrency));
529
+
530
+ l_ProcessControl.Spawners = new Spawner[len];
531
+ l_ProcessControl.Len = len;
532
+ }
533
+
534
+ for (Spawner *current = l_ProcessControl.Spawners , *stop = l_ProcessControl.Spawners + l_ProcessControl.Len ; current < stop; ++current) {
535
+ if (current->FD == -1 ) {
536
+ current->StartSpawnProcessHelper ();
537
+ }
538
+ }
510
539
}
511
540
#endif /* _WIN32 */
512
541
@@ -992,7 +1021,7 @@ void Process::Run(const std::function<void(const ProcessResult&)>& callback)
992
1021
fds[1 ] = outfds[1 ];
993
1022
fds[2 ] = outfds[1 ];
994
1023
995
- m_Process = ProcessSpawn (m_Arguments, m_ExtraEnvironment, m_AdjustPriority, fds);
1024
+ m_Process = MySpawner. ProcessSpawn (m_Arguments, m_ExtraEnvironment, m_AdjustPriority, fds);
996
1025
m_PID = m_Process;
997
1026
998
1027
if (m_PID == -1 ) {
@@ -1058,7 +1087,7 @@ bool Process::DoEvents()
1058
1087
1059
1088
m_OutputStream << " <Timeout exceeded.>" ;
1060
1089
1061
- int error = ProcessKill (m_Process, SIGTERM);
1090
+ int error = MySpawner. ProcessKill (m_Process, SIGTERM);
1062
1091
if (error) {
1063
1092
Log (LogWarning, " Process" )
1064
1093
<< " Couldn't terminate the process " << m_PID << " (" << PrettyPrintArguments (m_Arguments)
@@ -1082,7 +1111,7 @@ bool Process::DoEvents()
1082
1111
m_OutputStream << " <Timeout exceeded.>" ;
1083
1112
TerminateProcess (m_Process, 3 );
1084
1113
#else /* _WIN32 */
1085
- int error = ProcessKill (-m_Process, SIGKILL);
1114
+ int error = MySpawner. ProcessKill (-m_Process, SIGKILL);
1086
1115
if (error) {
1087
1116
Log (LogWarning, " Process" )
1088
1117
<< " Couldn't kill the process group " << m_PID << " (" << PrettyPrintArguments (m_Arguments)
@@ -1138,7 +1167,7 @@ bool Process::DoEvents()
1138
1167
int status, exitcode;
1139
1168
if (could_not_kill || m_PID == -1 ) {
1140
1169
exitcode = 128 ;
1141
- } else if (ProcessWaitPID (m_Process, &status) != m_Process) {
1170
+ } else if (MySpawner. ProcessWaitPID (m_Process, &status) != m_Process) {
1142
1171
exitcode = 128 ;
1143
1172
1144
1173
Log (LogWarning, " Process" )
0 commit comments