Skip to content

Commit a8ae4ee

Browse files
committed
队列支持
1 parent 0e161ff commit a8ae4ee

File tree

10 files changed

+147
-56
lines changed

10 files changed

+147
-56
lines changed

README.md

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,38 @@ Route::get('static/:path', function (string $path) {
4141

4242
2. 访问路由 `http://localhost/static/文件路径`
4343

44+
## 队列支持
45+
46+
使用方法见 [think-queue](https://github.com/top-think/think-queue)
47+
48+
以下配置代替think-queue里的最后一步:`监听任务并执行`,无需另外起进程执行队列
49+
50+
```php
51+
return [
52+
// ...
53+
'queue' => [
54+
'enable' => true,
55+
//键名是队列名称
56+
'workers' => [
57+
//下面参数是不设置时的默认配置
58+
'default' => [
59+
'delay' => 0,
60+
'sleep' => 3,
61+
'tries' => 0,
62+
'timeout' => 60,
63+
'worker_num' => 1,
64+
],
65+
//使用@符号后面可指定队列使用驱动
66+
'default@connection' => [
67+
//此处可不设置任何参数,使用上面的默认配置
68+
],
69+
],
70+
],
71+
// ...
72+
];
73+
74+
```
75+
4476
## 自定义worker
4577
监听`worker.init`事件 注入`Manager`对象,调用addWorker方法添加
4678
~~~php

composer.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
"require-dev": {
3232
"pestphp/pest": "^3.7",
3333
"guzzlehttp/guzzle": "^7.0",
34+
"topthink/think-queue": "^3.0",
3435
"phpstan/phpstan": "^2.0"
3536
},
3637
"autoload-dev": {

phpstan.neon.dist renamed to phpstan.neon

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,5 @@ parameters:
1212
- '#Function config_path not found.#'
1313
- '#Function public_path not found.#'
1414
- '#Function json not found.#'
15+
- '#While loop condition is always true#'
16+
- '#Constant STUB_DIR not found.#'
File renamed without changes.

src/Manager.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace think\worker;
44

55
use think\worker\concerns\InteractsWithHttp;
6+
use think\worker\concerns\InteractsWithQueue;
67
use think\worker\concerns\InteractsWithServer;
78
use think\worker\concerns\WithApplication;
89
use think\worker\concerns\WithContainer;
@@ -11,11 +12,13 @@ class Manager
1112
{
1213
use InteractsWithServer,
1314
InteractsWithHttp,
15+
InteractsWithQueue,
1416
WithApplication,
1517
WithContainer;
1618

1719
protected function initialize(): void
1820
{
1921
$this->prepareHttp();
22+
$this->prepareQueue();
2023
}
2124
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
<?php
2+
3+
namespace think\worker\concerns;
4+
5+
use think\helper\Arr;
6+
use think\queue\event\JobFailed;
7+
use think\queue\Worker;
8+
use Workerman\Timer;
9+
10+
trait InteractsWithQueue
11+
{
12+
protected function createQueueWorkers()
13+
{
14+
$workers = $this->getConfig('queue.workers', []);
15+
16+
foreach ($workers as $queue => $options) {
17+
18+
if (strpos($queue, '@') !== false) {
19+
[$queue, $connection] = explode('@', $queue);
20+
} else {
21+
$connection = null;
22+
}
23+
24+
$workerNum = Arr::get($options, 'worker_num', 1);
25+
26+
$this->addWorker(function (\think\worker\Worker $worker) use ($options, $connection, $queue) {
27+
$delay = Arr::get($options, 'delay', 0);
28+
$sleep = Arr::get($options, 'sleep', 3);
29+
$tries = Arr::get($options, 'tries', 0);
30+
$timeout = Arr::get($options, 'timeout', 60);
31+
32+
$qWorker = $this->app->make(Worker::class);
33+
34+
while (true) {
35+
$timer = Timer::add($timeout, function () use ($worker) {
36+
$worker->stop();
37+
}, [], false);
38+
39+
$this->runInSandbox(function () use ($connection, $queue, $delay, $sleep, $tries, $qWorker) {
40+
$qWorker->runNextJob($connection, $queue, $delay, $sleep, $tries);
41+
});
42+
43+
Timer::del($timer);
44+
}
45+
}, "queue [$queue]", $workerNum);
46+
}
47+
}
48+
49+
public function prepareQueue()
50+
{
51+
if ($this->getConfig('queue.enable', false)) {
52+
$this->listenForEvents();
53+
$this->createQueueWorkers();
54+
}
55+
}
56+
57+
/**
58+
* 注册事件
59+
*/
60+
protected function listenForEvents()
61+
{
62+
$this->container->event->listen(JobFailed::class, function (JobFailed $event) {
63+
$this->logFailedJob($event);
64+
});
65+
}
66+
67+
/**
68+
* 记录失败任务
69+
* @param JobFailed $event
70+
*/
71+
protected function logFailedJob(JobFailed $event)
72+
{
73+
$this->container['queue.failer']->log(
74+
$event->connection,
75+
$event->job->getQueue(),
76+
$event->job->getRawBody(),
77+
$event->exception
78+
);
79+
}
80+
}

src/config/worker.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@
1717
'worker_num' => 2,
1818
'options' => [],
1919
],
20+
//队列
21+
'queue' => [
22+
'enable' => false,
23+
'workers' => [],
24+
],
2025
'hot_update' => [
2126
'enable' => env('APP_DEBUG', false),
2227
'name' => ['*.php'],

tests/Pest.php

Lines changed: 1 addition & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,2 @@
11
<?php
2-
3-
/*
4-
|--------------------------------------------------------------------------
5-
| Test Case
6-
|--------------------------------------------------------------------------
7-
|
8-
| The closure you provide to your test functions is always bound to a specific PHPUnit test
9-
| case class. By default, that class is "PHPUnit\Framework\TestCase". Of course, you may
10-
| need to change it using the "uses()" function to bind a different classes or traits.
11-
|
12-
*/
13-
14-
// uses(Tests\TestCase::class)->in('Feature');
15-
16-
/*
17-
|--------------------------------------------------------------------------
18-
| Expectations
19-
|--------------------------------------------------------------------------
20-
|
21-
| When you're writing tests, you often need to check that values meet certain conditions. The
22-
| "expect()" function gives you access to a set of "expectations" methods that you can use
23-
| to assert different things. Of course, you may extend the Expectation API at any time.
24-
|
25-
*/
26-
27-
28-
/*
29-
|--------------------------------------------------------------------------
30-
| Functions
31-
|--------------------------------------------------------------------------
32-
|
33-
| While Pest is very powerful out-of-the-box, you may have some testing code specific to your
34-
| project that you don't want to repeat in every file. Here you can also expose helpers as
35-
| global functions to help you to reduce the number of lines of code in your test files.
36-
|
37-
*/
38-
39-
function something()
40-
{
41-
// ..
42-
}
2+
define('STUB_DIR', realpath(__DIR__ . '/stub'));

tests/HttpTest.php renamed to tests/feature/HttpTest.php

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
<?php
22

3+
use GuzzleHttp\Client;
34
use GuzzleHttp\Cookie\CookieJar;
45
use Symfony\Component\Process\Process;
5-
use GuzzleHttp\Client;
66

77
$process = null;
88
beforeAll(function () use (&$process) {
9-
$process = new Process(['php', 'think', 'worker'], __DIR__ . '/stub/');
9+
$process = new Process(['php', 'think', 'worker'], STUB_DIR);
1010
$process->start();
1111
$wait = 0;
1212

@@ -25,15 +25,16 @@
2525
});
2626

2727
beforeEach(function () {
28-
$this->client = new Client([
28+
$this->httpClient = new Client([
2929
'base_uri' => 'http://127.0.0.1:8080',
3030
'cookies' => true,
3131
'http_errors' => false,
32+
'timeout' => 1,
3233
]);
3334
});
3435

3536
it('callback route', function () {
36-
$response = $this->client->get('/');
37+
$response = $this->httpClient->get('/');
3738

3839
expect($response->getStatusCode())
3940
->toBe(200)
@@ -44,7 +45,7 @@
4445
it('controller route', function () {
4546
$jar = new CookieJar();
4647

47-
$response = $this->client->get('/test', ['cookies' => $jar]);
48+
$response = $this->httpClient->get('/test', ['cookies' => $jar]);
4849

4950
expect($response->getStatusCode())
5051
->toBe(200)
@@ -59,7 +60,7 @@
5960
$data = [
6061
'name' => 'think',
6162
];
62-
$response = $this->client->post('/json', [
63+
$response = $this->httpClient->post('/json', [
6364
'json' => $data,
6465
]);
6566

@@ -70,14 +71,14 @@
7071
});
7172

7273
it('put and delete request', function () {
73-
$response = $this->client->put('/');
74+
$response = $this->httpClient->put('/');
7475

7576
expect($response->getStatusCode())
7677
->toBe(200)
7778
->and($response->getBody()->getContents())
7879
->toBe('put');
7980

80-
$response = $this->client->delete('/');
81+
$response = $this->httpClient->delete('/');
8182

8283
expect($response->getStatusCode())
8384
->toBe(200)
@@ -86,16 +87,16 @@
8687
});
8788

8889
it('file response', function () {
89-
$response = $this->client->get('/static/asset.txt');
90+
$response = $this->httpClient->get('/static/asset.txt');
9091

9192
expect($response->getStatusCode())
9293
->toBe(200)
9394
->and($response->getBody()->getContents())
94-
->toBe(file_get_contents(__DIR__ . '/stub/public/asset.txt'));
95+
->toBe(file_get_contents(STUB_DIR . '/public/asset.txt'));
9596
});
9697

9798
it('hot update', function () {
98-
$response = $this->client->get('/hot');
99+
$response = $this->httpClient->get('/hot');
99100

100101
expect($response->getStatusCode())
101102
->toBe(404);
@@ -110,16 +111,16 @@
110111
});
111112
PHP;
112113

113-
file_put_contents(__DIR__ . '/stub/route/hot.php', $route);
114+
file_put_contents(STUB_DIR . '/route/hot.php', $route);
114115

115116
sleep(2);
116117

117-
$response = $this->client->get('/hot');
118+
$response = $this->httpClient->get('/hot');
118119

119120
expect($response->getStatusCode())
120121
->toBe(200)
121122
->and($response->getBody()->getContents())
122123
->toBe('hot');
123124
})->after(function () {
124-
@unlink(__DIR__ . '/stub/route/hot.php');
125+
@unlink(STUB_DIR . '/route/hot.php');
125126
})->skipOnWindows();

tests/stub/config/worker.php

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,19 @@
1111

1212
return [
1313
'http' => [
14-
'enable' => true,
14+
'enable' => env('HTTP_ENABLE', true),
1515
'host' => '0.0.0.0',
1616
'port' => 8080,
1717
'worker_num' => 2,
1818
'options' => [],
1919
],
20+
//队列
21+
'queue' => [
22+
'enable' => env('QUEUE_ENABLE', false),
23+
'workers' => [
24+
'default' => [],
25+
],
26+
],
2027
'hot_update' => [
2128
'enable' => env('APP_DEBUG', false),
2229
'name' => ['*.php'],

0 commit comments

Comments
 (0)