Skip to content

Commit c675818

Browse files
Merge pull request #5 from williamchanrico/support_time_offset
Add support for cron-job-time-offset flag
2 parents e69de6e + 007d842 commit c675818

File tree

4 files changed

+55
-18
lines changed

4 files changed

+55
-18
lines changed

Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
NAME := planet-exporter
44
BIN_DIRECTORY := ./bin
55
REVISION := $(shell git rev-parse --short HEAD 2>/dev/null)
6-
VERSION := v0.1.0
6+
VERSION := v0.1.1
77

88
ifndef REVISION
99
override REVISION = none

cmd/planet-federator/internal/internal.go

+37-15
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,21 @@ import (
3232
// Config contains main service config options
3333
type Config struct {
3434
// Main config
35+
// CronJobSchedule schedule using cron format used by the Quartz Scheduler
36+
// 1. Seconds
37+
// 2. Minutes
38+
// 3. Hours
39+
// 4. Day-of-Month
40+
// 5. Month
41+
// 6. Day-of-Week
42+
// 7. Year (optional field)
3543
CronJobSchedule string
3644
CronJobTimeoutSecond int
37-
LogLevel string
38-
LogDisableTimestamp bool
39-
LogDisableColors bool
45+
// CronJobTimeOffset all cron job start time (e.g. '-5m' will query data from 5 minutes ago)
46+
CronJobTimeOffset time.Duration
47+
LogLevel string
48+
LogDisableTimestamp bool
49+
LogDisableColors bool
4050

4151
InfluxdbAddr string
4252
InfluxdbToken string
@@ -118,16 +128,28 @@ func (s Service) Run(ctx context.Context) error {
118128
return nil
119129
}
120130

131+
// getCronJobStartTime returns the time for cron job starting point
132+
func (s Service) getCronJobStartTime() time.Time {
133+
// We want to offset the query time by the specified offset
134+
return time.Now().Add(s.Config.CronJobTimeOffset)
135+
}
136+
137+
// getCronJobDuration returns the duration since the cron job was started
138+
func (s Service) getCronJobDuration(startTime time.Time) time.Duration {
139+
// We want to offset the query time by the specified offset
140+
return time.Now().Add(s.Config.CronJobTimeOffset).Sub(startTime)
141+
}
142+
121143
// TrafficBandwidthJobFunc queries traffic bandwidth (planet-exporter) data from Prometheus and store
122144
// them in federator backend
123145
func (s Service) TrafficBandwidthJobFunc() {
124146
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(s.Config.CronJobTimeoutSecond)*time.Second)
125147
defer cancel()
126148

127-
jobStartTime := time.Now()
149+
jobStartTime := s.getCronJobStartTime()
128150
log.Debugf("A job started: %v", jobStartTime)
129151

130-
trafficPeers, err := s.PrometheusSvc.QueryPlanetExporterTrafficBandwidth(ctx, time.Now().Add(-15*time.Second), time.Now())
152+
trafficPeers, err := s.PrometheusSvc.QueryPlanetExporterTrafficBandwidth(ctx, jobStartTime.Add(-15*time.Second), jobStartTime)
131153
if err != nil {
132154
log.Errorf("Error querying traffic peers from prometheus: %v", err)
133155
}
@@ -140,10 +162,10 @@ func (s Service) TrafficBandwidthJobFunc() {
140162
RemoteDomain: trafficPeer.RemoteDomain,
141163
BitsPerSecond: trafficPeer.BandwidthBitsPerSecond,
142164
Direction: trafficPeer.Direction,
143-
}, time.Now())
165+
}, jobStartTime)
144166
}
145167

146-
log.Infof("Traffic Bandwidth Job took: %v", time.Since(jobStartTime))
168+
log.Infof("Traffic Bandwidth Job took: %v", s.getCronJobDuration(jobStartTime))
147169
}
148170

149171
// UpstreamServicesJobFunc queries upstream services (planet-exporter) data from Prometheus and store
@@ -152,10 +174,10 @@ func (s Service) UpstreamServicesJobFunc() {
152174
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(s.Config.CronJobTimeoutSecond)*time.Second)
153175
defer cancel()
154176

155-
jobStartTime := time.Now()
177+
jobStartTime := s.getCronJobStartTime()
156178
log.Debugf("A job started: %v", jobStartTime)
157179

158-
upstreamServices, err := s.PrometheusSvc.QueryPlanetExporterUpstreamServices(ctx, time.Now().Add(-15*time.Second), time.Now())
180+
upstreamServices, err := s.PrometheusSvc.QueryPlanetExporterUpstreamServices(ctx, jobStartTime.Add(-15*time.Second), jobStartTime)
159181
if err != nil {
160182
log.Errorf("Error querying upstream services from prometheus: %v", err)
161183
}
@@ -169,10 +191,10 @@ func (s Service) UpstreamServicesJobFunc() {
169191
UpstreamHostgroup: svc.UpstreamHostgroup,
170192
UpstreamAddress: svc.UpstreamAddress,
171193
Protocol: svc.Protocol,
172-
}, time.Now())
194+
}, jobStartTime)
173195
}
174196

175-
log.Infof("Upstream Service Job took: %v", time.Since(jobStartTime))
197+
log.Infof("Upstream Service Job took: %v", s.getCronJobDuration(jobStartTime))
176198
}
177199

178200
// DownstreamServicesJobFunc queries downstream services (planet-exporter) data from Prometheus and store
@@ -181,10 +203,10 @@ func (s Service) DownstreamServicesJobFunc() {
181203
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(s.Config.CronJobTimeoutSecond)*time.Second)
182204
defer cancel()
183205

184-
jobStartTime := time.Now()
206+
jobStartTime := s.getCronJobStartTime()
185207
log.Debugf("A job started: %v", jobStartTime)
186208

187-
downstreamServices, err := s.PrometheusSvc.QueryPlanetExporterDownstreamServices(ctx, time.Now().Add(-15*time.Second), time.Now())
209+
downstreamServices, err := s.PrometheusSvc.QueryPlanetExporterDownstreamServices(ctx, jobStartTime.Add(-15*time.Second), jobStartTime)
188210
if err != nil {
189211
log.Errorf("Error querying downstream services from prometheus: %v", err)
190212
}
@@ -198,8 +220,8 @@ func (s Service) DownstreamServicesJobFunc() {
198220
DownstreamHostgroup: svc.DownstreamHostgroup,
199221
DownstreamAddress: svc.DownstreamAddress,
200222
Protocol: svc.Protocol,
201-
}, time.Now())
223+
}, jobStartTime)
202224
}
203225

204-
log.Infof("Downstream Service Job took: %v", time.Since(jobStartTime))
226+
log.Infof("Downstream Service Job took: %v", s.getCronJobDuration(jobStartTime))
205227
}

cmd/planet-federator/main.go

+16-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"flag"
2020
"fmt"
2121
"os"
22+
"time"
2223

2324
"planet-exporter/cmd/planet-federator/internal"
2425
federator "planet-exporter/federator"
@@ -37,11 +38,20 @@ var (
3738
)
3839

3940
func main() {
41+
var err error
4042
var config internal.Config
4143

44+
// cronJobTimeOffsetDuration allows federator to go back in time. For example,
45+
// set '-10h30m' to tell federator to offset query time to 10 hours 30 minutes ago.
46+
//
47+
// This is useful when we want to integrate federator to existing Prometheus setup.
48+
// TODO: Allows running multiple jobs for federator to catch up faster.
49+
var cronJobTimeOffsetDuration string
50+
4251
// Main
43-
flag.StringVar(&config.CronJobSchedule, "cron-job-schedule", "*/30 * * * * *", "Cron jobs schedule to pre-process planet-exporter metrics into federator backend")
52+
flag.StringVar(&config.CronJobSchedule, "cron-job-schedule", "*/30 * * * * *", "Cron jobs schedule (Quartz Scheduler format: s m h dom mo dow y) to pre-process planet-exporter metrics into federator backend")
4453
flag.IntVar(&config.CronJobTimeoutSecond, "cron-job-timeout-second", 30, "Timeout per federator job in second")
54+
flag.StringVar(&cronJobTimeOffsetDuration, "cron-job-time-offset", "0s", "Cron jobs time offset. (e.g. '-1h5m' to query data from 1 hour 5 minutes ago)")
4555
flag.StringVar(&config.LogLevel, "log-level", "info", "Log level")
4656
flag.BoolVar(&config.LogDisableTimestamp, "log-disable-timestamp", false, "Disable timestamp on logger")
4757
flag.BoolVar(&config.LogDisableColors, "log-disable-colors", false, "Disable colors on logger")
@@ -64,6 +74,11 @@ func main() {
6474
os.Exit(0)
6575
}
6676

77+
config.CronJobTimeOffset, err = time.ParseDuration(cronJobTimeOffsetDuration)
78+
if err != nil {
79+
log.Fatalf("Error parsing cron-job-time-offset-minute: %v", err)
80+
}
81+
6782
log.SetFormatter(&log.TextFormatter{
6883
DisableColors: config.LogDisableColors,
6984
DisableTimestamp: config.LogDisableTimestamp,

federator/influxdb/influxdb.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ func (b Backend) addBytesMeasurement(ctx context.Context, measurement string, tr
118118
AddTag(remoteServiceHostgroupTag, trafficBandwidth.RemoteHostgroup).
119119
AddTag(remoteServiceAddressTag, trafficBandwidth.RemoteDomain).
120120
AddField(bandwidthBpsField, trafficBandwidth.BitsPerSecond).
121-
SetTime(time.Now())
121+
SetTime(t)
122122
b.writeAPI.WritePoint(dataPoint)
123123

124124
return nil

0 commit comments

Comments
 (0)