Skip to content
This repository was archived by the owner on Jan 15, 2022. It is now read-only.

Commit 81964bf

Browse files
committed
- added support for job metrics
- dependency bump
1 parent 5794659 commit 81964bf

File tree

8 files changed

+269
-32
lines changed

8 files changed

+269
-32
lines changed

.rr.yaml

+91
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
jobs:
2+
# worker pool configuration
3+
workers:
4+
command: "php consumer.php"
5+
pool:
6+
numWorkers: 4
7+
8+
# rabbitmq and similar servers
9+
amqp:
10+
addr: amqp://guest:guest@localhost:5672/
11+
12+
# beanstalk configuration
13+
beanstalk:
14+
addr: tcp://localhost:11300
15+
16+
# amazon sqs configuration
17+
sqs:
18+
key: api-key
19+
secret: api-secret
20+
region: us-west-1
21+
endpoint: http://localhost:9324
22+
23+
# job destinations and options
24+
dispatch:
25+
spiral-jobs-tests-amqp-*.pipeline: amqp
26+
spiral-jobs-tests-local-*.pipeline: local
27+
spiral-jobs-tests-beanstalk-*.pipeline: beanstalk
28+
spiral-jobs-tests-sqs-*.pipeline: sqs
29+
30+
# list of broker pipelines associated with endpoints
31+
pipelines:
32+
local:
33+
broker: ephemeral
34+
35+
amqp:
36+
broker: amqp
37+
queue: default
38+
39+
beanstalk:
40+
broker: beanstalk
41+
tube: default
42+
43+
sqs:
44+
broker: sqs
45+
queue: default
46+
declare:
47+
MessageRetentionPeriod: 86400
48+
49+
# list of pipelines to be consumed by the server, keep empty if you want to start consuming manually
50+
consume: ["local", "amqp", "beanstalk", "sqs"]
51+
52+
metrics:
53+
# prometheus client address (path /metrics added automatically)
54+
address: localhost:2112
55+
56+
# list of metrics to collect from application
57+
collect:
58+
# metric name
59+
app_metric:
60+
# type [gauge, counter, histogram, symnmary]
61+
type: histogram
62+
63+
# short description
64+
help: "Custom application metric"
65+
66+
# metric groups/tags
67+
labels: ["type"]
68+
69+
# for histogram only
70+
buckets: [0.1, 0.2, 0.3, 1.0]
71+
72+
# monitors rr server(s)
73+
limit:
74+
# check worker state each second
75+
interval: 1
76+
77+
# custom watch configuration for each service
78+
services:
79+
# monitor queue workers
80+
jobs:
81+
# maximum allowed memory consumption per worker (soft)
82+
maxMemory: 100
83+
84+
# maximum time to live for the worker (soft)
85+
TTL: 0
86+
87+
# maximum allowed amount of time worker can spend in idle before being removed (for weak db connections, soft)
88+
idleTTL: 0
89+
90+
# max_execution_time (brutal)
91+
execTTL: 60

cmd/rr-jobs/jobs/debug.go

-24
Original file line numberDiff line numberDiff line change
@@ -60,14 +60,6 @@ func (s *debugger) listener(event int, ctx interface{}) {
6060
e.ID,
6161
))
6262

63-
// case jobs.EventJobStart:
64-
// e := ctx.(*jobs.JobEvent)
65-
// s.logger.Info(util.Sprintf(
66-
// "job.<cyan+h>RECV</reset> <white+hb>%s</reset> <gray+hb>%s</reset>",
67-
// e.Job.Job,
68-
// e.ID,
69-
// ))
70-
7163
case jobs.EventJobOK:
7264
e := ctx.(*jobs.JobEvent)
7365
s.logger.Info(util.Sprintf(
@@ -95,14 +87,6 @@ func (s *debugger) listener(event int, ctx interface{}) {
9587
e.Error(),
9688
))
9789

98-
// case jobs.EventPipeConsume:
99-
// e := ctx.(*jobs.Pipeline)
100-
// s.logger.Info(util.Sprintf(
101-
// "[%s]: resuming {<yellow+hb>%s</reset>}",
102-
// e.Broker(),
103-
// e.Name(),
104-
// ))
105-
10690
case jobs.EventPipeActive:
10791
e := ctx.(*jobs.Pipeline)
10892
s.logger.Info(util.Sprintf(
@@ -111,14 +95,6 @@ func (s *debugger) listener(event int, ctx interface{}) {
11195
e.Name(),
11296
))
11397

114-
// case jobs.EventPipeStop:
115-
// e := ctx.(*jobs.Pipeline)
116-
// s.logger.Info(util.Sprintf(
117-
// "[%s]: stopping {<yellow+hb>%s</reset>}",
118-
// e.Broker(),
119-
// e.Name(),
120-
// ))
121-
12298
case jobs.EventPipeStopped:
12399
e := ctx.(*jobs.Pipeline)
124100
s.logger.Debugf(util.Sprintf(

cmd/rr-jobs/jobs/metrics.go

+145
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
// Copyright (c) 2018 SpiralScout
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a copy
4+
// of this software and associated documentation files (the "Software"), to deal
5+
// in the Software without restriction, including without limitation the rights
6+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7+
// copies of the Software, and to permit persons to whom the Software is
8+
// furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in all
11+
// copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
19+
// SOFTWARE.
20+
21+
package jobs
22+
23+
import (
24+
"github.com/prometheus/client_golang/prometheus"
25+
"github.com/spf13/cobra"
26+
"github.com/spiral/jobs"
27+
rr "github.com/spiral/roadrunner/cmd/rr/cmd"
28+
"github.com/spiral/roadrunner/service/metrics"
29+
"github.com/spiral/roadrunner/util"
30+
"time"
31+
)
32+
33+
func init() {
34+
cobra.OnInitialize(func() {
35+
svc, _ := rr.Container.Get(metrics.ID)
36+
mtr, ok := svc.(*metrics.Service)
37+
if !ok || !mtr.Enabled() {
38+
return
39+
}
40+
41+
ht, _ := rr.Container.Get(jobs.ID)
42+
if jbs, ok := ht.(*jobs.Service); ok {
43+
collector := newCollector()
44+
45+
// register metrics
46+
mtr.MustRegister(collector.jobCounter)
47+
mtr.MustRegister(collector.jobDuration)
48+
mtr.MustRegister(collector.workersMemory)
49+
50+
// collect events
51+
jbs.AddListener(collector.listener)
52+
53+
// update memory usage every 10 seconds
54+
go collector.collectMemory(jbs, time.Second*10)
55+
}
56+
})
57+
}
58+
59+
// listener provide debug callback for system events. With colors!
60+
type metricCollector struct {
61+
jobCounter *prometheus.CounterVec
62+
jobDuration *prometheus.HistogramVec
63+
workersMemory prometheus.Gauge
64+
}
65+
66+
func newCollector() *metricCollector {
67+
return &metricCollector{
68+
jobCounter: prometheus.NewCounterVec(
69+
prometheus.CounterOpts{
70+
Name: "rr_job_total",
71+
Help: "Total number of handled jobs after server restart.",
72+
},
73+
[]string{"job", "ok"},
74+
),
75+
jobDuration: prometheus.NewHistogramVec(
76+
prometheus.HistogramOpts{
77+
Name: "rr_job_duration_seconds",
78+
Help: "Job execution duration.",
79+
},
80+
[]string{"job", "ok"},
81+
),
82+
workersMemory: prometheus.NewGauge(
83+
prometheus.GaugeOpts{
84+
Name: "rr_jobs_workers_memory_bytes",
85+
Help: "Memory usage by Jobs workers.",
86+
},
87+
),
88+
}
89+
}
90+
91+
// listener listens to http events and generates nice looking output.
92+
func (c *metricCollector) listener(event int, ctx interface{}) {
93+
switch event {
94+
case jobs.EventJobOK:
95+
e := ctx.(*jobs.JobEvent)
96+
97+
c.jobCounter.With(prometheus.Labels{
98+
"job": e.Job.Job,
99+
"ok": "true",
100+
}).Inc()
101+
102+
c.jobDuration.With(prometheus.Labels{
103+
"job": e.Job.Job,
104+
"ok": "true",
105+
}).Observe(e.Elapsed().Seconds())
106+
107+
case jobs.EventJobError:
108+
e := ctx.(*jobs.JobError)
109+
110+
c.jobCounter.With(prometheus.Labels{
111+
"job": e.Job.Job,
112+
"ok": "false",
113+
}).Inc()
114+
115+
c.jobDuration.With(prometheus.Labels{
116+
"job": e.Job.Job,
117+
"ok": "false",
118+
}).Observe(e.Elapsed().Seconds())
119+
}
120+
}
121+
122+
// collect memory usage by server workers
123+
func (c *metricCollector) collectMemory(service *jobs.Service, tick time.Duration) {
124+
started := false
125+
for {
126+
server := service.Server()
127+
if server == nil && started {
128+
// stopped
129+
return
130+
}
131+
132+
started = true
133+
134+
if workers, err := util.ServerState(server); err == nil {
135+
sum := 0.0
136+
for _, w := range workers {
137+
sum = sum + float64(w.MemoryUsage)
138+
}
139+
140+
c.workersMemory.Set(sum)
141+
}
142+
143+
time.Sleep(tick)
144+
}
145+
}

cmd/rr-jobs/jobs/stop.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,13 @@ import (
2828

2929
func init() {
3030
rr.CLI.AddCommand(&cobra.Command{
31-
Use: "jobs:stop",
32-
Short: "Stop job consuming for Job service brokers",
33-
RunE: stopHandler,
31+
Use: "jobs:pause",
32+
Short: "Pause job consuming for Job service brokers",
33+
RunE: pauseHandler,
3434
})
3535
}
3636

37-
func stopHandler(cmd *cobra.Command, args []string) error {
37+
func pauseHandler(cmd *cobra.Command, args []string) error {
3838
client, err := util.RPCClient(rr.Container)
3939
if err != nil {
4040
return err

cmd/rr-jobs/main.go

+6
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@ import (
66
"github.com/spiral/jobs/broker/beanstalk"
77
"github.com/spiral/jobs/broker/ephemeral"
88
"github.com/spiral/jobs/broker/sqs"
9+
910
rr "github.com/spiral/roadrunner/cmd/rr/cmd"
11+
"github.com/spiral/roadrunner/service/limit"
12+
"github.com/spiral/roadrunner/service/metrics"
1013
"github.com/spiral/roadrunner/service/rpc"
1114

1215
_ "github.com/spiral/jobs/cmd/rr-jobs/jobs"
@@ -23,5 +26,8 @@ func main() {
2326
},
2427
})
2528

29+
rr.Container.Register(metrics.ID, &metrics.Service{})
30+
rr.Container.Register(limit.ID, &limit.Service{})
31+
2632
rr.Execute()
2733
}

go.mod

+4-3
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,13 @@ require (
66
github.com/buger/goterm v0.0.0-20181115115552-c206103e1f37
77
github.com/dustin/go-humanize v1.0.0
88
github.com/gofrs/uuid v3.1.0+incompatible
9+
github.com/kr/beanstalk v0.0.0-20180818045031-cae1762e4858 // indirect
910
github.com/olekukonko/tablewriter v0.0.1
10-
github.com/pkg/errors v0.8.1 // indirect
11+
github.com/prometheus/client_golang v1.0.0
1112
github.com/sirupsen/logrus v1.3.0
1213
github.com/spf13/cobra v0.0.3
1314
github.com/spf13/viper v1.3.1
14-
github.com/spiral/roadrunner v1.4.1
15+
github.com/spiral/roadrunner v1.4.8
1516
github.com/streadway/amqp v0.0.0-20181205114330-a314942b2fd9
16-
github.com/stretchr/testify v1.2.2
17+
github.com/stretchr/testify v1.3.0
1718
)

service.go

+5
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,11 @@ func (svc *Service) Stop() {
177177
svc.brokers.Stop()
178178
}
179179

180+
// Server returns associated rr server (if any).
181+
func (svc *Service) Server() *roadrunner.Server {
182+
return svc.rr
183+
}
184+
180185
// Stat returns list of pipelines workers and their stats.
181186
func (svc *Service) Stat(pipe *Pipeline) (stat *Stat, err error) {
182187
b, ok := svc.Brokers[pipe.Broker()]

tests/.rr.yaml

+14-1
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,17 @@ jobs:
4747
MessageRetentionPeriod: 86400
4848

4949
# list of pipelines to be consumed by the server, keep empty if you want to start consuming manually
50-
consume: ["local", "amqp", "beanstalk", "sqs"]
50+
consume: ["local", "amqp", "beanstalk", "sqs"]
51+
52+
metrics:
53+
address: localhost:2112
54+
55+
# monitors rr server(s)
56+
limit:
57+
interval: 1
58+
services:
59+
jobs:
60+
maxMemory: 100
61+
TTL: 0
62+
idleTTL: 0
63+
execTTL: 60

0 commit comments

Comments
 (0)