Skip to content

Commit fed4564

Browse files
author
Tomáš Tatarko
committed
Initial commit
1 parent 31cfcf8 commit fed4564

9 files changed

Lines changed: 1042 additions & 0 deletions

File tree

composer.json

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
{
2+
"name": "websupport/celery-php-client",
3+
"description": "Simple celery client for php",
4+
"license": "MIT",
5+
"authors": [
6+
{
7+
"name": "Matej Capkovic",
8+
"email": "matej.capkovic@websupport.sk"
9+
},
10+
{
11+
"name": "Tomas Tatarko",
12+
"email": "tomas.tatarko@websupport.sk"
13+
}
14+
],
15+
"require": {
16+
"php": "~5.3"
17+
},
18+
"autoload": {
19+
"psr-4": {
20+
"Websupport\\Celery\\": "source/"
21+
}
22+
}
23+
}

source/AsyncResult.php

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
<?php
2+
3+
/**
4+
* For the full copyright and license information, please view the LICENSE
5+
* file that was distributed with this source code.
6+
*
7+
* PHP Version 5.3
8+
*
9+
* @category API
10+
* @package Celery
11+
* @author Matej Čapkovič <matej.capkovic@websupport.sk>
12+
* @author Tomáš Tatarko <tomas.tatarko@websupport.sk>
13+
* @license http://choosealicense.com/licenses/mit/ MIT
14+
* @link https://github.com/websupport-sk/celery-php-client Repository
15+
*/
16+
17+
namespace Websupport\Celery;
18+
19+
use Exception;
20+
21+
/**
22+
* Fetching and other jobs with task async results
23+
*
24+
* @category API
25+
* @package Celery
26+
* @author Matej Čapkovič <matej.capkovic@websupport.sk>
27+
* @author Tomáš Tatarko <tomas.tatarko@websupport.sk>
28+
* @license http://choosealicense.com/licenses/mit/ MIT
29+
* @link https://github.com/websupport-sk/celery-php-client Repository
30+
*/
31+
class AsyncResult
32+
{
33+
public $task;
34+
public $body;
35+
36+
/**
37+
* Creating result instance from given task
38+
* @param Task $task Instance of executed task
39+
*/
40+
public function __construct(Task $task)
41+
{
42+
$this->task = $task;
43+
}
44+
45+
/**
46+
* Checks if task is completed
47+
* @return boolean
48+
*/
49+
protected function getCompleteResult()
50+
{
51+
$r = $this->task->connection->getMessageBody($this->task->id);
52+
53+
if ($r !== false
54+
&& isset($r->status)
55+
&& !in_array($r->status, array('STARTED', 'RECEIVED', 'PENDING'))
56+
) {
57+
$this->body = $r;
58+
}
59+
60+
if ($r !== false) {
61+
return $r;
62+
}
63+
64+
return false;
65+
}
66+
67+
/**
68+
* Checks if task is ready
69+
* @return booelan
70+
*/
71+
public function isReady()
72+
{
73+
$r = $this->getCompleteResult();
74+
return $r !== false
75+
&& isset($r->status)
76+
&& !in_array($r->status, array('STARTED', 'RECEIVED', 'PENDING'));
77+
}
78+
79+
/**
80+
* Checks if task has completed successfully
81+
* @return booelan
82+
*/
83+
public function isSuccess()
84+
{
85+
return $this->getStatus() == 'SUCCESS';
86+
}
87+
88+
/**
89+
* Gets task result status
90+
* @return string
91+
*/
92+
public function getStatus()
93+
{
94+
if (!$this->body) {
95+
throw new Exception('Called getStatus before task was ready');
96+
}
97+
return $this->body->status;
98+
}
99+
100+
/**
101+
* Gets task result traceback
102+
* @return array
103+
*/
104+
public function getTraceback()
105+
{
106+
if (!$this->body) {
107+
throw new Exception('Called getTraceback before task was ready');
108+
}
109+
return $this->body->traceback;
110+
}
111+
112+
/**
113+
* Gets task's result
114+
* @return string
115+
*/
116+
public function getResult()
117+
{
118+
if (!$this->body) {
119+
throw new Exception('Called getResult before task was ready');
120+
}
121+
return $this->body->result;
122+
}
123+
124+
/**
125+
* Waiting for task to complete
126+
* @param integer $timeout Max number of seconds to wait
127+
* @param float $interval Time interval of repeated checks
128+
* @return string
129+
* @throws \Exception
130+
*/
131+
public function get($timeout = 10, $interval = 0.5)
132+
{
133+
$interval_us = (int) ($interval * 1000000);
134+
135+
$startTime = microtime(true);
136+
while (microtime(true) - $startTime < $timeout) {
137+
if ($this->isReady()) {
138+
break;
139+
}
140+
usleep($interval_us);
141+
}
142+
143+
if (!$this->isReady()) {
144+
throw new Exception(
145+
sprintf(
146+
'task %s(%s) did not return after %d seconds',
147+
$this->task->name,
148+
json_encode($this->task->args), $timeout
149+
)
150+
);
151+
}
152+
153+
return $this->getResult();
154+
}
155+
156+
/**
157+
* Accessing class methods return as properties
158+
* @param string $property Property name to get
159+
* @return string
160+
* @throws \Exception
161+
*/
162+
public function __get($property)
163+
{
164+
if ($property == 'result' || $property == 'traceback') {
165+
if ($this->isReady()) {
166+
$n = 'get'.$property;
167+
return $this->$n();
168+
} else {
169+
return null;
170+
}
171+
} elseif ($property == 'status') {
172+
if ($this->isReady()) {
173+
return $this->getStatus();
174+
} else {
175+
return 'PENDING';
176+
}
177+
}
178+
throw new Exception(
179+
'Property "'.get_class($this).'.'.$property.'" is not defined.'
180+
);
181+
}
182+
}

source/Chain.php

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
<?php
2+
3+
/**
4+
* For the full copyright and license information, please view the LICENSE
5+
* file that was distributed with this source code.
6+
*
7+
* PHP Version 5.3
8+
*
9+
* @category API
10+
* @package Celery
11+
* @author Matej Čapkovič <matej.capkovic@websupport.sk>
12+
* @author Tomáš Tatarko <tomas.tatarko@websupport.sk>
13+
* @license http://choosealicense.com/licenses/mit/ MIT
14+
* @link https://github.com/websupport-sk/celery-php-client Repository
15+
*/
16+
17+
namespace Websupport\Celery;
18+
19+
/**
20+
* Execution tasks in specific order
21+
*
22+
* @category API
23+
* @package Celery
24+
* @author Matej Čapkovič <matej.capkovic@websupport.sk>
25+
* @author Tomáš Tatarko <tomas.tatarko@websupport.sk>
26+
* @license http://choosealicense.com/licenses/mit/ MIT
27+
* @link https://github.com/websupport-sk/celery-php-client Repository
28+
*/
29+
class Chain extends Task
30+
{
31+
/**
32+
* List of group's tasks to execute
33+
* @var array
34+
*/
35+
public $tasks;
36+
37+
/**
38+
* Instance of group result execution
39+
* @var GroupResult
40+
*/
41+
public $parents;
42+
43+
/**
44+
* Setting tasks to group on create
45+
* @param array $tasks Instance of Task class
46+
*/
47+
public function __construct($tasks)
48+
{
49+
$this->tasks = $tasks;
50+
$this->name = 'celery.chain';
51+
$t = array_pop($tasks); // $tasks was modified!
52+
$this->connection = $t->connection;
53+
$this->result = new AsyncResult($t);
54+
$this->parents = new GroupResult($tasks);
55+
}
56+
57+
/**
58+
* Wrapping and formatting tasks before putting into celery
59+
* @return array
60+
*/
61+
protected function format()
62+
{
63+
$callbacks = array_reverse($this->tasks);
64+
$tasks = array(array_pop($callbacks)); // pop first
65+
66+
$res = array();
67+
foreach ($tasks as $task) {
68+
$res[] = array_merge(
69+
$this->formatBase($task), array(
70+
'id' => $task->id,
71+
'callbacks' => $this->formatCallbacks($callbacks),
72+
)
73+
);
74+
}
75+
return $res;
76+
}
77+
}

source/Chord.php

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
<?php
2+
3+
/**
4+
* For the full copyright and license information, please view the LICENSE
5+
* file that was distributed with this source code.
6+
*
7+
* PHP Version 5.3
8+
*
9+
* @category API
10+
* @package Celery
11+
* @author Matej Čapkovič <matej.capkovic@websupport.sk>
12+
* @author Tomáš Tatarko <tomas.tatarko@websupport.sk>
13+
* @license http://choosealicense.com/licenses/mit/ MIT
14+
* @link https://github.com/websupport-sk/celery-php-client Repository
15+
*/
16+
17+
namespace Websupport\Celery;
18+
19+
/**
20+
* Executing groups of tasks at random with one final task to summarize them all
21+
*
22+
* @category API
23+
* @package Celery
24+
* @author Matej Čapkovič <matej.capkovic@websupport.sk>
25+
* @author Tomáš Tatarko <tomas.tatarko@websupport.sk>
26+
* @license http://choosealicense.com/licenses/mit/ MIT
27+
* @link https://github.com/websupport-sk/celery-php-client Repository
28+
*/
29+
class Chord extends Task
30+
{
31+
/**
32+
* List of group's tasks to execute
33+
* @var array
34+
*/
35+
public $tasks;
36+
37+
/**
38+
* Instance of final task
39+
* @var Task
40+
*/
41+
public $final;
42+
43+
/**
44+
* Instance of group result execution
45+
* @var GroupResult
46+
*/
47+
public $parents;
48+
49+
/**
50+
* Setting tasks to group on create
51+
* @param array $tasks Instance of Task class
52+
* @param Task $final Instance of final task to execute
53+
*/
54+
public function __construct($tasks, $final)
55+
{
56+
$this->tasks = $tasks;
57+
$this->final = $final;
58+
$this->name = 'celery.chord';
59+
$this->connection = $final->connection;
60+
$this->result = new AsyncResult($final);
61+
$this->parents = new GroupResult($tasks);
62+
}
63+
64+
/**
65+
* Wrapping and formatting tasks before putting into celery
66+
* @return array
67+
*/
68+
protected function format()
69+
{
70+
$this->taskset = self::genUuid();
71+
$k = sprintf("%s%s", 'celery-taskset-meta-', $this->taskset);
72+
$s = array();
73+
foreach ($this->tasks as $t) {
74+
$t->taskset = $this->taskset;
75+
$s[] = array(array($t->id, null), null);
76+
}
77+
$v = json_encode(array("result" => array($this->taskset, $s)));
78+
$this->final->connection->executeCommand('SETEX', array($k, 86400, $v));
79+
80+
$res = array();
81+
foreach ($this->tasks as $task) {
82+
$res[] = array_merge(
83+
$this->formatBase($task), array(
84+
'id'=>$task->id,
85+
'chord' => $this->formatChord($this->final, count($this->tasks)),
86+
)
87+
);
88+
}
89+
return $res;
90+
}
91+
92+
/**
93+
* Wrapping and formatting chord before putting into celery
94+
* @param Task $task Instance of task to format
95+
* @param type $size Chord's size
96+
* @return array
97+
*/
98+
protected function formatChord(Task $task, $size)
99+
{
100+
return array_merge(
101+
$this->formatBase($task), array(
102+
'options' => array("task_id" => $task->id),
103+
'chord_size' => $size,
104+
)
105+
);
106+
}
107+
}

0 commit comments

Comments
 (0)