Skip to content

Commit 7354b2d

Browse files
authored
Merge pull request #75 from vpg/worker_monitoring
Worker monitoring
2 parents 098ccca + ffcc9b8 commit 7354b2d

11 files changed

Lines changed: 288 additions & 10 deletions

File tree

Library/Core/AbstractWorker.php

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,21 @@
1818
*/
1919
abstract class AbstractWorker extends Task implements WorkerInterface
2020
{
21+
/**
22+
* Started worker status
23+
*
24+
* @const string STATUS_STARTED
25+
*/
26+
const STATUS_STARTED = 'started';
27+
28+
/**
29+
* Exited worker status
30+
*
31+
* @const string STATUS_EXITED
32+
*/
33+
const STATUS_EXITED = 'exited';
34+
35+
2136
protected $taskOptionBaseList = [
2237
'workflow:', // required step code config file
2338
'?force', // Optional force run even if lockfile exists
@@ -38,7 +53,6 @@ abstract class AbstractWorker extends Task implements WorkerInterface
3853

3954
protected $workflowConfig;
4055

41-
4256
/**
4357
* Inits the current worker according to the given workflow config
4458
* - Loads the config
@@ -111,6 +125,7 @@ public final function startAction(array $paramList)
111125
$this->lock();
112126
$this->initWorker();
113127

128+
114129
// xxx Factorize stdout/err support
115130
$this->getDI()->get('logr')->info(
116131
"Worker listening on \033[32m" .
@@ -147,7 +162,11 @@ public final function startAction(array $paramList)
147162
$this->processMonitoringMessage($msgDto);
148163
continue;
149164
}
150-
$this->processMessage($msgDto);
165+
try {
166+
$this->processMessage($msgDto);
167+
} catch (\Exception $e) {
168+
$this->getDI()->get('logger')->error($e->getMessage());
169+
}
151170
}
152171
}
153172

@@ -214,7 +233,6 @@ private function initMq()
214233

215234
// xxx put kafka\TopicConf in DI and config in a config file
216235
$this->kafkaTopicConf = new \RdKafka\TopicConf();
217-
$this->kafkaTopicConf->set('offset.store.method', 'file');
218236
$this->kafkaTopicConf->set('auto.commit.interval.ms', 100);
219237
$this->kafkaTopicConf->set('offset.store.sync.interval.ms', 100);
220238
$this->kafkaTopicConf->set('offset.store.method', 'broker');
@@ -263,13 +281,27 @@ private function getLockFilePath()
263281
{
264282
$this->getDI()->get('logr')->debug(json_encode(func_get_args()));
265283
$lockDirPath = '/var/run/';
284+
$workerName = self::getWorkerCode($this->paramHash);
285+
$lockFileName = $workerName;
286+
return $lockDirPath . $lockFileName . '.pid';
287+
}
288+
289+
/**
290+
* Returns the code of the current worker according to the worker given argv
291+
*
292+
* @param array $paramHash The argv list
293+
*
294+
* @return string worker code e.g. : disturb-step-computesomething-1
295+
*/
296+
public static function getWorkerCode(array $paramHash)
297+
{
266298
$taskFullName = get_called_class();
267299
// xxx We will probably have to deal w/ the BU
268300
if (strpos($taskFullName, 'Manager')) {
269-
$lockFileName = 'disturb-manager';
301+
$workerName = 'disturb-manager';
270302
} else {
271-
$lockFileName = 'disturb-step-' . $this->paramHash['step'] . '-' . $this->paramHash['workerId'];
303+
$workerName = 'disturb-step-' . $paramHash['step'] . '-' . $paramHash['workerId'];
272304
}
273-
return $lockDirPath . $lockFileName . '.pid';
305+
return $workerName;
274306
}
275307
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
<?php
2+
3+
namespace Vpg\Disturb\Monitoring;
4+
5+
use Vpg\Disturb\Core;
6+
7+
/**
8+
* Class InvalidWorkflowConfigException
9+
*
10+
* @package Disturb\Core\Config
11+
* @author Jérome BOURGEAIS <jbourgeais@voyageprive.com>
12+
* @license https://github.com/vpg/disturb/blob/master/LICENSE MIT Licence
13+
*/
14+
class MonitoringException extends Core\Exception
15+
{
16+
17+
}

Library/Monitoring/Service.php

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
<?php
2+
namespace Vpg\Disturb\Monitoring;
3+
4+
use \Phalcon\Config;
5+
use \Phalcon\Mvc\User\Component;
6+
use \Elasticsearch;
7+
8+
use Vpg\Disturb\Workflow;
9+
use Vpg\Disturb\Core;
10+
use Vpg\Disturb\Core\Storage;
11+
12+
/**
13+
* Class WorkerService monitoring
14+
*
15+
* @package Disturb\Monitoring
16+
* @author Jérôme Bourgeais <jbourgeais@voyageprive.com>
17+
* @license https://github.com/vpg/disturb/blob/master/LICENSE MIT Licence
18+
*/
19+
class Service extends Component
20+
{
21+
22+
/**
23+
* ContextStorage constructor
24+
*
25+
* @param Json $config config
26+
*
27+
* @throws ContextStorageException
28+
*/
29+
public function __construct(Config $config)
30+
{
31+
$this->di->get('logr')->debug(json_encode(func_get_args()));
32+
$this->config = new Workflow\WorkflowConfigDto($config);
33+
$this->initClient();
34+
}
35+
36+
/**
37+
* Initialization of Elasticsearch Client
38+
*
39+
* @throws ContextStorageException
40+
*
41+
* @return void
42+
*/
43+
private function initClient()
44+
{
45+
$this->di->get('logr')->debug(json_encode(func_get_args()));
46+
$storageHost = $this->config->getStorageHost();
47+
$this->storageClient = Storage\StorageAdapterFactory::get(
48+
$this->config,
49+
Storage\StorageAdapterFactory::USAGE_MONITORING
50+
);
51+
}
52+
53+
/**
54+
* Registers a worker into the monitoring sys
55+
*
56+
* @param string $workerCode the worker's code to register
57+
*
58+
* @return void
59+
*/
60+
public function logWorkerBeat(string $workerCode)
61+
{
62+
$this->di->get('logr')->debug(json_encode(func_get_args()));
63+
$workerHash = [
64+
'heartBeatAt' => date('Y-m-d H:i:s')
65+
];
66+
$this->storageClient->save($workerCode, $workerHash);
67+
}
68+
69+
/**
70+
* Registers a worker into the monitoring sys
71+
*
72+
* @param string $workerCode the worker's code to register
73+
* @param int $pid the worker's pid
74+
*
75+
* @return void
76+
*/
77+
public function logWorkerStarted(string $workerCode, int $pid)
78+
{
79+
$this->di->get('logr')->debug(json_encode(func_get_args()));
80+
$workerHash = [
81+
'status' => Core\AbstractWorker::STATUS_STARTED,
82+
'runingOn' => php_uname("n"),
83+
'pid' => $pid,
84+
'startedAt' => date('Y-m-d H:i:s'),
85+
'heartBeatAt' => date('Y-m-d H:i:s')
86+
];
87+
$this->storageClient->save($workerCode, $workerHash);
88+
}
89+
90+
/**
91+
* Registers a worker into the monitoring sys
92+
*
93+
* @param string $workerCode the worker's code to register
94+
* @param int $exitCode the worker's exit code
95+
*
96+
* @return void
97+
*/
98+
public function logWorkerExited(string $workerCode, int $exitCode = 0)
99+
{
100+
$this->di->get('logr')->debug(json_encode(func_get_args()));
101+
$workerHash = [
102+
'exitedAt' => date('Y-m-d H:i:s'),
103+
'runingOn' => php_uname("n"),
104+
'status' => Core\AbstractWorker::STATUS_EXITED,
105+
'exitCode' => $exitCode
106+
];
107+
$this->storageClient->save($workerCode, $workerHash);
108+
}
109+
}

Tests/Library/Core/Cli/ConsoleTest.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<?php
22

3-
namespace Tests\Library\Cli;
3+
namespace Tests\Library\Core\Cli;
44

55
use Vpg\Disturb\Core\Cli;
66

bin/disturb-manager

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,11 @@ DISTURB_DIR=$BASE_DIR
55
if [[ "$BASE_DIR" =~ vendor\/bin$ ]]; then
66
DISTURB_DIR="$BASE_DIR/../Vpg/Disturb/bin/"
77
fi
8+
9+
php $DISTURB_DIR/monitoring.php $(uname -n) manager 'start' --pid=$$ ${@}
10+
(php $DISTURB_DIR/monitoring.php $(uname -n) manager heartbeat ${@}) &
11+
pid="$!"
812
php $DISTURB_DIR/disturb.php "Vpg\\Disturb\\Workflow\\Manager" ${@}
13+
managerExitCode=$?
14+
kill -9 $pid
15+
php $DISTURB_DIR/monitoring.php $(uname -n) manager 'exit' --exitCode=$managerExitCode ${@}

bin/disturb-step

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,10 @@ DISTURB_DIR=$BASE_DIR
55
if [[ "$BASE_DIR" =~ vendor\/bin$ ]]; then
66
DISTURB_DIR="$BASE_DIR/../Vpg/Disturb/bin/"
77
fi
8+
php $DISTURB_DIR/monitoring.php $(uname -n) step 'start' --pid=$$ ${@}
9+
(php $DISTURB_DIR/monitoring.php $(uname -n) step heartbeat ${@}) &
10+
pid="$!"
811
php $DISTURB_DIR/disturb.php "Vpg\\Disturb\\Step\\Step" start ${@}
12+
stepExitCode=$?
13+
kill -9 $pid
14+
php $DISTURB_DIR/monitoring.php $(uname -n) step 'exit' --exitCode=$stepExitCode ${@}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
{
2+
"actions": [
3+
{
4+
"add": {
5+
"index": "disturb_monitoring-1.0.0-beta",
6+
"alias": "disturb_monitoring"
7+
}
8+
}
9+
]
10+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
{
2+
"mappings": {
3+
"_default_": {
4+
"dynamic": "false"
5+
},
6+
"worker" : {
7+
}
8+
}
9+
}

bin/elasticsearch/initialize.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@ HOST=$1
1414
INIT_FOLDER_PATH=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
1515

1616
# Elasticsearch indexes to create
17-
INDEX_LIST=('disturb_context-1.0.0-beta')
17+
declare -a INDEX_LIST=("disturb_context-1.0.0-beta" "disturb_monitoring-1.0.0-beta")
1818

19-
for index in $INDEX_LIST; do
19+
for index in "${INDEX_LIST[@]}"; do
2020

2121
INDEX_EXISTS_HTTP_CODE="$(curl -k -sL -w "%{http_code}\\n" $HOST/$index -o /dev/null)"
2222
if [[ $INDEX_EXISTS_HTTP_CODE == 200 ]]; then

bin/monitoring.php

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
<?php
2+
3+
use \Phalcon\Config\Adapter\Json;
4+
5+
use \Vpg\Disturb\Core\Cli\Console as ConsoleApp;
6+
use \Vpg\Disturb\Core;
7+
use \Vpg\Disturb\Workflow;
8+
use \Vpg\Disturb\Step;
9+
use \Vpg\Disturb\Monitoring;
10+
11+
define('DISTURB_DEBUG', getenv('DISTURB_DEBUG'));
12+
define('DISTURB_TOPIC_PREFIX', getenv('DISTURB_TOPIC_PREFIX'));
13+
14+
/**
15+
* Register the autoloader and tell it to register the tasks directory
16+
*/
17+
$loader = new \Phalcon\Loader();
18+
$loader->registerNamespaces(
19+
[
20+
'Vpg\Disturb' => realpath(__DIR__ . '/../Library/')
21+
],
22+
true
23+
);
24+
$loader->registerFiles([__DIR__ . '/../../vendor/autoload.php']);
25+
$loader->register();
26+
27+
require_once(__DIR__ . '/../Library/Core/DI.php');
28+
29+
$di->setShared('loader', $loader);
30+
31+
32+
// Create a console application
33+
$console = new ConsoleApp();
34+
$console->setDI($di);
35+
36+
37+
/**
38+
* Process the console arguments
39+
*/
40+
$arguments = [];
41+
42+
foreach ($argv as $k => $arg) {
43+
if ($k === 1) {
44+
$arguments['host'] = $arg;
45+
} elseif ($k === 2) {
46+
$arguments['worker'] = $arg;
47+
} elseif ($k === 3) {
48+
$arguments['action'] = $arg;
49+
} elseif ($k >= 4) {
50+
$arguments['params'][] = $arg;
51+
}
52+
}
53+
54+
55+
$paramHash = ConsoleApp::parseLongOpt(join($arguments['params'], ' '));
56+
$workflowConfig = new Json($paramHash['workflow']);
57+
58+
$projectBootstrapFilePath = $workflowConfig['projectBootstrap'] ?? '';
59+
if (is_readable($projectBootstrapFilePath)) {
60+
$di->get('logr')->info('Loading Bootstrap : ' . $projectBootstrapFilePath);
61+
require_once($projectBootstrapFilePath);
62+
}
63+
64+
switch ($arguments['worker']) {
65+
case 'manager':
66+
$workerCode = Workflow\ManagerWorker::getWorkerCode($paramHash);
67+
break;
68+
case 'step':
69+
$workerCode = Step\StepWorker::getWorkerCode($paramHash);
70+
break;
71+
}
72+
$monitoringService = new Monitoring\Service($workflowConfig);
73+
switch ($arguments['action']) {
74+
case 'start':
75+
$monitoringService->logWorkerStarted($workerCode, $paramHash['pid']);
76+
break;
77+
case 'exit':
78+
$monitoringService->logWorkerExited($workerCode, $paramHash['exitCode']);
79+
break;
80+
case 'heartbeat':
81+
while (true) {
82+
$monitoringService->logWorkerBeat($workerCode);
83+
// xxx put it in conf
84+
sleep(5);
85+
}
86+
break;
87+
}

0 commit comments

Comments
 (0)