Skip to content

Commit 7906983

Browse files
committed
Initial commit
0 parents  commit 7906983

11 files changed

+364
-0
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
.idea/
2+
/vendor/
3+
.DS_Store

composer.json

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
{
2+
"name": "ilzrv/php-bull-queue",
3+
"description": "PHP Job Creator For Bull Queue",
4+
"require": {
5+
"php": "^7.4",
6+
"ramsey/uuid": "^4.1"
7+
},
8+
"license": "MIT",
9+
"authors": [
10+
{
11+
"name": "Ilia Lazarev",
12+
"email": "[email protected]"
13+
}
14+
],
15+
"autoload": {
16+
"psr-4": {
17+
"Ilzrv\\PhpBullQueue\\": "src"
18+
}
19+
}
20+
}
21+
22+

src/DTOs/DataTransferObject.php

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Ilzrv\PhpBullQueue\DTOs;
6+
7+
use ReflectionClass;
8+
use ReflectionNamedType;
9+
use ReflectionProperty;
10+
11+
abstract class DataTransferObject
12+
{
13+
public function __construct(array $parameters = [])
14+
{
15+
$class = new ReflectionClass(static::class);
16+
17+
foreach ($class->getProperties(ReflectionProperty::IS_PUBLIC) as $reflectionProperty) {
18+
$property = $reflectionProperty->getName();
19+
20+
if (isset($parameters[$property])) {
21+
$this->{$property} = $parameters[$property];
22+
} else {
23+
/** @var ReflectionNamedType $type */
24+
$type = $reflectionProperty->getType();
25+
$name = $type->getName();
26+
27+
$this->{$property} = !$type->isBuiltin() && class_exists($name)
28+
? new $name()
29+
: $this->{$property} ?? null;
30+
}
31+
32+
// $this->{$property} = !$type->isBuiltin() && class_exists($name)
33+
// ? $parameters[$property] ?? new $name()
34+
// : $parameters[$property] ?? $this->{$property} ?? null;
35+
}
36+
}
37+
38+
public function toJson(): string
39+
{
40+
return (string) json_encode($this);
41+
}
42+
}

src/DTOs/JobOpts.php

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Ilzrv\PhpBullQueue\DTOs;
6+
7+
class JobOpts extends DataTransferObject
8+
{
9+
public string $customJobId = '';
10+
public int $priority = 0;
11+
public bool $lifo = false;
12+
public int $attempts = 1;
13+
public int $timestamp;
14+
public int $delay = 0;
15+
}

src/DTOs/QueueOpts.php

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Ilzrv\PhpBullQueue\DTOs;
6+
7+
class QueueOpts extends DataTransferObject
8+
{
9+
public RedisConfig $redis;
10+
public string $prefix = 'bull';
11+
}

src/DTOs/RedisConfig.php

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Ilzrv\PhpBullQueue\DTOs;
6+
7+
class RedisConfig extends DataTransferObject
8+
{
9+
public string $driver = 'phpredis';
10+
public string $host = '127.0.0.1';
11+
public int $port = 6379;
12+
}

src/Drivers/PhpRedisQueue.php

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Ilzrv\PhpBullQueue\Drivers;
6+
7+
use Ilzrv\PhpBullQueue\DTOs\RedisConfig;
8+
use Redis;
9+
10+
class PhpRedisQueue implements RedisQueue
11+
{
12+
protected Redis $client;
13+
14+
public function __construct(RedisConfig $config, Redis $redis = null)
15+
{
16+
if (!is_null($redis)) {
17+
$this->client = $redis;
18+
} else {
19+
$this->client = new Redis();
20+
21+
$this->client->connect(
22+
$config->host,
23+
$config->port,
24+
);
25+
}
26+
}
27+
28+
public function add(string $script, array $args, int $numKeys)
29+
{
30+
return $this->client->eval($script, $args, $numKeys);
31+
}
32+
}

src/Drivers/PredisQueue.php

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Ilzrv\PhpBullQueue\Drivers;
6+
7+
use Ilzrv\PhpBullQueue\DTOs\RedisConfig;
8+
use Predis\Client as Redis;
9+
10+
class PredisQueue implements RedisQueue
11+
{
12+
protected Redis $client;
13+
14+
public function __construct(RedisConfig $config, Redis $redis = null)
15+
{
16+
if (!is_null($redis)) {
17+
$this->client = $redis;
18+
} else {
19+
$this->client = new Redis([
20+
'host' => $config->host,
21+
'port' => $config->port,
22+
]);
23+
}
24+
}
25+
26+
public function add(string $script, array $args, int $numKeys)
27+
{
28+
return $this->client->eval($script, $numKeys, ...$args);
29+
}
30+
}

src/Drivers/RedisQueue.php

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Ilzrv\PhpBullQueue\Drivers;
6+
7+
interface RedisQueue
8+
{
9+
public function add(string $script, array $args, int $numKeys);
10+
}

src/LuaScripts.php

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Ilzrv\PhpBullQueue;
6+
7+
class LuaScripts
8+
{
9+
/**
10+
* Adds a job to the queue by doing the following:
11+
*
12+
* KEYS[1] 'wait',
13+
* KEYS[2] 'paused'
14+
* KEYS[3] 'meta-paused'
15+
* KEYS[4] 'id'
16+
* KEYS[5] 'delayed'
17+
* KEYS[6] 'priority'
18+
*
19+
* ARGV[1] key prefix
20+
* ARGV[2] custom id (will not generate one automatically)
21+
* ARGV[3] name
22+
* ARGV[4] data (json stringified job data)
23+
* ARGV[5] opts (json stringified job opts)
24+
* ARGV[6] timestamp
25+
* ARGV[7] delay
26+
* ARGV[8] delayedTimestamp
27+
* ARGV[9] priority
28+
* ARGV[10] LIFO
29+
* ARGV[11] token
30+
*
31+
* @return string
32+
*/
33+
public static function add()
34+
{
35+
return <<<'LUA'
36+
local jobId
37+
local jobIdKey
38+
local rcall = redis.call
39+
40+
local jobCounter = rcall("INCR", KEYS[4])
41+
42+
if ARGV[2] == "" then
43+
jobId = jobCounter
44+
jobIdKey = ARGV[1] .. jobId
45+
else
46+
jobId = ARGV[2]
47+
jobIdKey = ARGV[1] .. jobId
48+
if rcall("EXISTS", jobIdKey) == 1 then
49+
return jobId .. "" -- convert to string
50+
end
51+
end
52+
53+
-- Store the job.
54+
rcall("HMSET", jobIdKey, "name", ARGV[3], "data", ARGV[4], "opts", ARGV[5], "timestamp", ARGV[6], "delay", ARGV[7], "priority", ARGV[9])
55+
56+
-- Check if job is delayed
57+
local delayedTimestamp = tonumber(ARGV[8])
58+
if(delayedTimestamp ~= 0) then
59+
local timestamp = delayedTimestamp * 0x1000 + bit.band(jobCounter, 0xfff)
60+
rcall("ZADD", KEYS[5], timestamp, jobId)
61+
rcall("PUBLISH", KEYS[5], delayedTimestamp)
62+
else
63+
local target
64+
65+
-- Whe check for the meta-paused key to decide if we are paused or not
66+
-- (since an empty list and !EXISTS are not really the same)
67+
local paused
68+
if rcall("EXISTS", KEYS[3]) ~= 1 then
69+
target = KEYS[1]
70+
paused = false
71+
else
72+
target = KEYS[2]
73+
paused = true
74+
end
75+
76+
-- Standard or priority add
77+
local priority = tonumber(ARGV[9])
78+
if priority == 0 then
79+
-- LIFO or FIFO
80+
rcall(ARGV[10], target, jobId)
81+
else
82+
-- Priority add
83+
rcall("ZADD", KEYS[6], priority, jobId)
84+
local count = rcall("ZCOUNT", KEYS[6], 0, priority)
85+
86+
local len = rcall("LLEN", target)
87+
local id = rcall("LINDEX", target, len - (count-1))
88+
if id then
89+
rcall("LINSERT", target, "BEFORE", id, jobId)
90+
else
91+
rcall("RPUSH", target, jobId)
92+
end
93+
94+
end
95+
96+
-- Emit waiting event (wait..ing@token)
97+
rcall("PUBLISH", KEYS[1] .. "ing@" .. ARGV[11], jobId)
98+
end
99+
100+
return jobId .. "" -- convert to string
101+
LUA;
102+
}
103+
}

src/Queue.php

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Ilzrv\PhpBullQueue;
6+
7+
use Ilzrv\PhpBullQueue\Drivers\PhpRedisQueue;
8+
use Ilzrv\PhpBullQueue\Drivers\PredisQueue;
9+
use Ilzrv\PhpBullQueue\Drivers\RedisQueue;
10+
use Ilzrv\PhpBullQueue\DTOs\JobOpts;
11+
use Ilzrv\PhpBullQueue\DTOs\QueueOpts;
12+
use Ramsey\Uuid\Uuid;
13+
use RuntimeException;
14+
15+
class Queue
16+
{
17+
const DEFAULT_JOB_NAME = '__default__';
18+
19+
private string $name;
20+
21+
private QueueOpts $opts;
22+
23+
public function __construct(string $name, QueueOpts $opts = null)
24+
{
25+
$this->name = $name;
26+
$this->opts = $opts ?: new QueueOpts();
27+
}
28+
29+
/**
30+
* Adds a job to the queue.
31+
*
32+
* @param string $name
33+
* @param array $data
34+
* @param JobOpts|null $opts
35+
* @return mixed
36+
*/
37+
public function add(string $name, array $data, JobOpts $opts = null)
38+
{
39+
$opts = $opts ?: new JobOpts([
40+
'timestamp' => (int) str_replace('.', '', microtime(true)),
41+
]);
42+
43+
$prefix = sprintf('%s:%s:', $this->opts->prefix, $this->name);
44+
45+
return $this->queue()->add(
46+
LuaScripts::add(),
47+
[
48+
$prefix.'wait',
49+
$prefix.'paused',
50+
$prefix.'meta-paused',
51+
$prefix.'id',
52+
$prefix.'delayed',
53+
$prefix.'priority',
54+
$prefix,
55+
$opts->customJobId,
56+
$name,
57+
json_encode($data),
58+
$opts->toJson(),
59+
$opts->timestamp,
60+
$opts->delay,
61+
$opts->delay ? $opts->timestamp + $opts->delay : 0,
62+
$opts->priority,
63+
$opts->lifo ? 'RPUSH' : 'LPUSH',
64+
(string) Uuid::uuid4()
65+
],
66+
6
67+
);
68+
}
69+
70+
/**
71+
* @return RedisQueue
72+
*/
73+
protected function queue()
74+
{
75+
switch ($this->opts->redis->driver) {
76+
case 'predis':
77+
return new PredisQueue($this->opts->redis);
78+
case 'phpredis':
79+
return new PhpRedisQueue($this->opts->redis);
80+
default:
81+
throw new RuntimeException("{$this->opts->redis->driver} driver not supported.");
82+
}
83+
}
84+
}

0 commit comments

Comments
 (0)