Skip to content
This repository was archived by the owner on Jan 15, 2022. It is now read-only.

Commit db77b8d

Browse files
authored
Merge pull request #28 from spiral/update_package_versioning
Update package versioning
2 parents 68c2c06 + 6e18706 commit db77b8d

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

99 files changed

+11383
-0
lines changed

go.mod

+2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
module github.com/spiral/jobs
22

3+
go 1.14
4+
35
require (
46
github.com/aws/aws-sdk-go v1.16.14
57
github.com/beanstalkd/go-beanstalk v0.0.0-20180822062812-53ecdaa3bcfb

v2/broker.go

+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package jobs
2+
3+
// Broker manages set of pipelines and provides ability to push jobs into them.
4+
type Broker interface {
5+
// Register broker pipeline.
6+
Register(pipe *Pipeline) error
7+
8+
// Consume configures pipeline to be consumed. With execPool to nil to disable pipelines. Method can be called before
9+
// the service is started!
10+
Consume(pipe *Pipeline, execPool chan Handler, errHandler ErrorHandler) error
11+
12+
// Push job into the worker.
13+
Push(pipe *Pipeline, j *Job) (string, error)
14+
15+
// Stat must fetch statistics about given pipeline or return error.
16+
Stat(pipe *Pipeline) (stat *Stat, err error)
17+
}
18+
19+
// EventProvider defines the ability to throw events for the broker.
20+
type EventProvider interface {
21+
// Listen attaches the even listener.
22+
Listen(lsn func(event int, ctx interface{}))
23+
}
24+
25+
// Stat contains information about pipeline.
26+
type Stat struct {
27+
// Pipeline name.
28+
Pipeline string
29+
30+
// Broken is name of associated broker.
31+
Broker string
32+
33+
// InternalName defines internal broker specific pipeline name.
34+
InternalName string
35+
36+
// Consuming indicates that pipeline is pipelines jobs.
37+
Consuming bool
38+
39+
// testQueue defines number of pending jobs.
40+
Queue int64
41+
42+
// Active defines number of jobs which are currently being processed.
43+
Active int64
44+
45+
// Delayed defines number of jobs which are being processed.
46+
Delayed int64
47+
}

v2/broker/amqp/broker.go

+215
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
package amqp
2+
3+
import (
4+
"fmt"
5+
"github.com/gofrs/uuid"
6+
"github.com/spiral/jobs/v2"
7+
"sync"
8+
"sync/atomic"
9+
)
10+
11+
// Broker represents AMQP broker.
12+
type Broker struct {
13+
cfg *Config
14+
lsn func(event int, ctx interface{})
15+
publish *chanPool
16+
consume *chanPool
17+
mu sync.Mutex
18+
wait chan error
19+
stopped chan interface{}
20+
queues map[*jobs.Pipeline]*queue
21+
}
22+
23+
// Listen attaches server event watcher.
24+
func (b *Broker) Listen(lsn func(event int, ctx interface{})) {
25+
b.lsn = lsn
26+
}
27+
28+
// Init configures AMQP job broker (always 2 connections).
29+
func (b *Broker) Init(cfg *Config) (ok bool, err error) {
30+
b.cfg = cfg
31+
b.queues = make(map[*jobs.Pipeline]*queue)
32+
33+
return true, nil
34+
}
35+
36+
// Register broker pipeline.
37+
func (b *Broker) Register(pipe *jobs.Pipeline) error {
38+
b.mu.Lock()
39+
defer b.mu.Unlock()
40+
41+
if _, ok := b.queues[pipe]; ok {
42+
return fmt.Errorf("queue `%s` has already been registered", pipe.Name())
43+
}
44+
45+
q, err := newQueue(pipe, b.throw)
46+
if err != nil {
47+
return err
48+
}
49+
50+
b.queues[pipe] = q
51+
52+
return nil
53+
}
54+
55+
// Serve broker pipelines.
56+
func (b *Broker) Serve() (err error) {
57+
b.mu.Lock()
58+
59+
if b.publish, err = newConn(b.cfg.dial, b.cfg.TimeoutDuration()); err != nil {
60+
b.mu.Unlock()
61+
return err
62+
}
63+
defer b.publish.Close()
64+
65+
if b.consume, err = newConn(b.cfg.dial, b.cfg.TimeoutDuration()); err != nil {
66+
b.mu.Unlock()
67+
return err
68+
}
69+
defer b.consume.Close()
70+
71+
for _, q := range b.queues {
72+
err := q.declare(b.publish, q.name, q.name, nil)
73+
if err != nil {
74+
b.mu.Unlock()
75+
return err
76+
}
77+
}
78+
79+
for _, q := range b.queues {
80+
if q.execPool != nil {
81+
go q.serve(b.publish, b.consume)
82+
}
83+
}
84+
85+
b.wait = make(chan error)
86+
b.stopped = make(chan interface{})
87+
defer close(b.stopped)
88+
89+
b.mu.Unlock()
90+
91+
b.throw(jobs.EventBrokerReady, b)
92+
93+
return <-b.wait
94+
}
95+
96+
// Stop all pipelines.
97+
func (b *Broker) Stop() {
98+
b.mu.Lock()
99+
defer b.mu.Unlock()
100+
101+
if b.wait == nil {
102+
return
103+
}
104+
105+
for _, q := range b.queues {
106+
q.stop()
107+
}
108+
109+
close(b.wait)
110+
<-b.stopped
111+
}
112+
113+
// Consume configures pipeline to be consumed. With execPool to nil to disable consuming. Method can be called before
114+
// the service is started!
115+
func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error {
116+
b.mu.Lock()
117+
defer b.mu.Unlock()
118+
119+
q, ok := b.queues[pipe]
120+
if !ok {
121+
return fmt.Errorf("undefined queue `%s`", pipe.Name())
122+
}
123+
124+
q.stop()
125+
126+
q.execPool = execPool
127+
q.errHandler = errHandler
128+
129+
if b.publish != nil && q.execPool != nil {
130+
if q.execPool != nil {
131+
go q.serve(b.publish, b.consume)
132+
}
133+
}
134+
135+
return nil
136+
}
137+
138+
// Push job into the worker.
139+
func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) {
140+
if err := b.isServing(); err != nil {
141+
return "", err
142+
}
143+
144+
id, err := uuid.NewV4()
145+
if err != nil {
146+
return "", err
147+
}
148+
149+
q := b.queue(pipe)
150+
if q == nil {
151+
return "", fmt.Errorf("undefined queue `%s`", pipe.Name())
152+
}
153+
154+
if err := q.publish(b.publish, id.String(), 0, j, j.Options.DelayDuration()); err != nil {
155+
return "", err
156+
}
157+
158+
return id.String(), nil
159+
}
160+
161+
// Stat must fetch statistics about given pipeline or return error.
162+
func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) {
163+
if err := b.isServing(); err != nil {
164+
return nil, err
165+
}
166+
167+
q := b.queue(pipe)
168+
if q == nil {
169+
return nil, fmt.Errorf("undefined queue `%s`", pipe.Name())
170+
}
171+
172+
queue, err := q.inspect(b.publish)
173+
if err != nil {
174+
return nil, err
175+
}
176+
177+
// this the closest approximation we can get for now
178+
return &jobs.Stat{
179+
InternalName: queue.Name,
180+
Queue: int64(queue.Messages),
181+
Active: int64(atomic.LoadInt32(&q.running)),
182+
}, nil
183+
}
184+
185+
// check if broker is serving
186+
func (b *Broker) isServing() error {
187+
b.mu.Lock()
188+
defer b.mu.Unlock()
189+
190+
if b.wait == nil {
191+
return fmt.Errorf("broker is not running")
192+
}
193+
194+
return nil
195+
}
196+
197+
// queue returns queue associated with the pipeline.
198+
func (b *Broker) queue(pipe *jobs.Pipeline) *queue {
199+
b.mu.Lock()
200+
defer b.mu.Unlock()
201+
202+
q, ok := b.queues[pipe]
203+
if !ok {
204+
return nil
205+
}
206+
207+
return q
208+
}
209+
210+
// throw handles service, server and pool events.
211+
func (b *Broker) throw(event int, ctx interface{}) {
212+
if b.lsn != nil {
213+
b.lsn(event, ctx)
214+
}
215+
}

0 commit comments

Comments
 (0)