Skip to content

Commit f104794

Browse files
Merge pull request #4 from williamchanrico/add_upstream_downstream_services
Add upstream downstream services
2 parents 91827e7 + 9efdc88 commit f104794

File tree

4 files changed

+311
-9
lines changed

4 files changed

+311
-9
lines changed

cmd/planet-federator/internal/internal.go

+67-1
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,15 @@ func (s Service) Run(ctx context.Context) error {
7272
cronScheduler := cron.New(cron.WithSeconds())
7373
_, err := cronScheduler.AddFunc(s.Config.CronJobSchedule, s.TrafficBandwidthJobFunc)
7474
if err != nil {
75-
return fmt.Errorf("Error adding function to Cron scheduler: %v", err)
75+
return fmt.Errorf("Error adding TrafficBandwidthJobFunc function to Cron scheduler: %v", err)
76+
}
77+
_, err = cronScheduler.AddFunc(s.Config.CronJobSchedule, s.UpstreamServicesJobFunc)
78+
if err != nil {
79+
return fmt.Errorf("Error adding UpstreamServicesJobFunc function to Cron scheduler: %v", err)
80+
}
81+
_, err = cronScheduler.AddFunc(s.Config.CronJobSchedule, s.DownstreamServicesJobFunc)
82+
if err != nil {
83+
return fmt.Errorf("Error adding DownstreamServicesJobFunc function to Cron scheduler: %v", err)
7684
}
7785
cronScheduler.Start()
7886

@@ -137,3 +145,61 @@ func (s Service) TrafficBandwidthJobFunc() {
137145

138146
log.Infof("Traffic Bandwidth Job took: %v", time.Since(jobStartTime))
139147
}
148+
149+
// UpstreamServicesJobFunc queries upstream services (planet-exporter) data from Prometheus and store
150+
// them in federator backend
151+
func (s Service) UpstreamServicesJobFunc() {
152+
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(s.Config.CronJobTimeoutSecond)*time.Second)
153+
defer cancel()
154+
155+
jobStartTime := time.Now()
156+
log.Debugf("A job started: %v", jobStartTime)
157+
158+
upstreamServices, err := s.PrometheusSvc.QueryPlanetExporterUpstreamServices(ctx, time.Now().Add(-15*time.Second), time.Now())
159+
if err != nil {
160+
log.Errorf("Error querying upstream services from prometheus: %v", err)
161+
}
162+
163+
for _, svc := range upstreamServices {
164+
_ = s.FederatorSvc.AddUpstreamService(ctx, federator.UpstreamService{
165+
LocalHostgroup: svc.LocalHostgroup,
166+
LocalAddress: svc.LocalAddress,
167+
LocalProcessName: svc.LocalProcessName,
168+
UpstreamPort: svc.UpstreamPort,
169+
UpstreamHostgroup: svc.UpstreamHostgroup,
170+
UpstreamAddress: svc.UpstreamAddress,
171+
Protocol: svc.Protocol,
172+
}, time.Now())
173+
}
174+
175+
log.Infof("Upstream Service Job took: %v", time.Since(jobStartTime))
176+
}
177+
178+
// DownstreamServicesJobFunc queries downstream services (planet-exporter) data from Prometheus and store
179+
// them in federator backend
180+
func (s Service) DownstreamServicesJobFunc() {
181+
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(s.Config.CronJobTimeoutSecond)*time.Second)
182+
defer cancel()
183+
184+
jobStartTime := time.Now()
185+
log.Debugf("A job started: %v", jobStartTime)
186+
187+
downstreamServices, err := s.PrometheusSvc.QueryPlanetExporterDownstreamServices(ctx, time.Now().Add(-15*time.Second), time.Now())
188+
if err != nil {
189+
log.Errorf("Error querying downstream services from prometheus: %v", err)
190+
}
191+
192+
for _, svc := range downstreamServices {
193+
_ = s.FederatorSvc.AddDownstreamService(ctx, federator.DownstreamService{
194+
LocalHostgroup: svc.LocalHostgroup,
195+
LocalAddress: svc.LocalAddress,
196+
LocalProcessName: svc.LocalProcessName,
197+
LocalPort: svc.LocalPort,
198+
DownstreamHostgroup: svc.DownstreamHostgroup,
199+
DownstreamAddress: svc.DownstreamAddress,
200+
Protocol: svc.Protocol,
201+
}, time.Now())
202+
}
203+
204+
log.Infof("Downstream Service Job took: %v", time.Since(jobStartTime))
205+
}

federator/federator.go

+44-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ import (
2222
// Federator package handles storing pre-processed planet-exporter data from Prometheus to
2323
// Influxdb and/or other time-series databases.
2424

25-
// TrafficBandwidth represents services involved in an ingress/egress traffic
25+
// TrafficBandwidth represents a pair of services that are involved in an ingress/egress traffic
26+
// e.g. LocalHostgroup testapp transmit 10Mbps to RemoteHostgroup abc
2627
type TrafficBandwidth struct {
2728
LocalHostgroup string
2829
LocalAddress string
@@ -32,9 +33,40 @@ type TrafficBandwidth struct {
3233
Direction string
3334
}
3435

35-
// Backend interface for a time-series DB handling pre-processed planet-exporter data
36+
// UpstreamService represents a target upstream service dependency of a local service process
37+
// e.g. LocalHostgroup testapp depends on UpstreamHostgroup abc, on abc's port 9000 via TCP protocol.
38+
// LocalHostgroup -> UpstreamHostgroup:UpstreamPort
39+
// testapp -> abc:9000 (upstream)
40+
type UpstreamService struct {
41+
LocalHostgroup string
42+
LocalAddress string
43+
LocalProcessName string
44+
UpstreamPort string
45+
UpstreamHostgroup string
46+
UpstreamAddress string
47+
Protocol string
48+
}
49+
50+
// DownstreamService represents a target downstream service that depends on local service process
51+
// e.g. LocalHostgroup testapp has a dependency DownstreamHostgroup abc, on testapp's port 80 via TCP protocol.
52+
// LocalHostgroup:LocalPort <- DownstreamHostgroup
53+
// testapp:80 <- abc (downstream)
54+
type DownstreamService struct {
55+
LocalHostgroup string
56+
LocalAddress string
57+
LocalProcessName string
58+
LocalPort string
59+
DownstreamHostgroup string
60+
DownstreamAddress string
61+
Protocol string
62+
}
63+
64+
// Backend interface for a time-series DB that is handling pre-processed planet-exporter data
65+
// Planet Expoter <- Prometheus -> Planet Federator (pre-process) -> Time-series DB
3666
type Backend interface {
3767
AddTrafficBandwidthData(context.Context, TrafficBandwidth, time.Time) error
68+
AddUpstreamService(context.Context, UpstreamService, time.Time) error
69+
AddDownstreamService(context.Context, DownstreamService, time.Time) error
3870
Flush()
3971
}
4072

@@ -55,6 +87,16 @@ func (s Service) AddTrafficBandwidthData(ctx context.Context, trafficBandwidth T
5587
return s.backend.AddTrafficBandwidthData(ctx, trafficBandwidth, t)
5688
}
5789

90+
// AddUpstreamService adds an upstream of a local service
91+
func (s Service) AddUpstreamService(ctx context.Context, upstreamService UpstreamService, t time.Time) error {
92+
return s.backend.AddUpstreamService(ctx, upstreamService, t)
93+
}
94+
95+
// AddDownstreamService adds a downstream of a local service
96+
func (s Service) AddDownstreamService(ctx context.Context, downstreamService DownstreamService, t time.Time) error {
97+
return s.backend.AddDownstreamService(ctx, downstreamService, t)
98+
}
99+
58100
// Flush any buffers related to backend
59101
func (s Service) Flush() {
60102
s.backend.Flush()

federator/influxdb/influxdb.go

+69-4
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222

2323
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
2424
influxdb2api "github.com/influxdata/influxdb-client-go/v2/api"
25+
log "github.com/sirupsen/logrus"
2526
)
2627

2728
// Backend interface for a time-series DB handling pre-processed planet-exporter data
@@ -34,26 +35,56 @@ type Backend struct {
3435

3536
// New returns new influxdb federator backend
3637
func New(c influxdb2.Client, org, bucket string) Backend {
38+
writeAPI := c.WriteAPI(org, bucket)
39+
40+
errChan := writeAPI.Errors()
41+
go func() {
42+
for err := range errChan {
43+
log.Errorf("Async error received on influxdb writes API: %v", err)
44+
}
45+
}()
46+
3747
return Backend{
3848
client: c,
39-
writeAPI: c.WriteAPI(org, bucket),
49+
writeAPI: writeAPI,
4050
org: org,
4151
bucket: bucket,
4252
}
4353
}
4454

4555
const (
56+
// Measurements
57+
58+
upstreamServiceMeasurement = "upstream"
59+
downstreamServiceMeasurement = "downstream"
60+
4661
ingressDirectionMeasurement = "ingress"
4762
egressDirectionMeasurement = "egress"
4863
unknownDirectionMeasurement = "unknown"
4964

50-
localServiceHostgroupTag = "service"
51-
localServiceAddressTag = "address"
65+
// Tags
66+
67+
localServiceHostgroupTag = "service"
68+
localServiceAddressTag = "address"
69+
localServicePortTag = "port"
70+
localServiceProcessNameTag = "process_name"
5271

5372
remoteServiceHostgroupTag = "remote_service"
5473
remoteServiceAddressTag = "remote_address"
5574

56-
bandwidthBpsField = "bandwidth_bps"
75+
upstreamServiceHostgroupTag = "upstream_service"
76+
upstreamServiceAddressTag = "upstream_address"
77+
upstreamServicePortTag = "upstream_port"
78+
79+
downstreamServiceHostgroupTag = "upstream_service"
80+
downstreamServiceAddressTag = "upstream_address"
81+
82+
protocolTag = "protocol"
83+
84+
// Fields
85+
86+
bandwidthBpsField = "bandwidth_bps"
87+
serviceDependencyField = "service_dependency"
5788
)
5889

5990
// AddTrafficBandwidthData adds a service's ingress bytes data point
@@ -93,6 +124,40 @@ func (b Backend) addBytesMeasurement(ctx context.Context, measurement string, tr
93124
return nil
94125
}
95126

127+
// AddUpstreamService adds an upstream service dependency of a service
128+
func (b Backend) AddUpstreamService(ctx context.Context, upstreamService federator.UpstreamService, t time.Time) error {
129+
dataPoint := influxdb2.NewPointWithMeasurement(upstreamServiceMeasurement).
130+
AddTag(localServiceHostgroupTag, upstreamService.LocalHostgroup).
131+
AddTag(localServiceAddressTag, upstreamService.LocalAddress).
132+
AddTag(upstreamServiceHostgroupTag, upstreamService.UpstreamHostgroup).
133+
AddTag(upstreamServiceAddressTag, upstreamService.UpstreamAddress).
134+
AddTag(upstreamServicePortTag, upstreamService.UpstreamPort).
135+
AddTag(localServiceProcessNameTag, upstreamService.LocalProcessName).
136+
AddTag(protocolTag, upstreamService.Protocol).
137+
AddField(serviceDependencyField, 1).
138+
SetTime(t)
139+
b.writeAPI.WritePoint(dataPoint)
140+
141+
return nil
142+
}
143+
144+
// AddDownstreamService adds a downstream service dependency of a service
145+
func (b Backend) AddDownstreamService(ctx context.Context, downstreamService federator.DownstreamService, t time.Time) error {
146+
dataPoint := influxdb2.NewPointWithMeasurement(downstreamServiceMeasurement).
147+
AddTag(localServiceHostgroupTag, downstreamService.LocalHostgroup).
148+
AddTag(localServiceAddressTag, downstreamService.LocalAddress).
149+
AddTag(localServicePortTag, downstreamService.LocalPort).
150+
AddTag(localServiceProcessNameTag, downstreamService.LocalProcessName).
151+
AddTag(downstreamServiceHostgroupTag, downstreamService.DownstreamHostgroup).
152+
AddTag(downstreamServiceAddressTag, downstreamService.DownstreamAddress).
153+
AddTag(protocolTag, downstreamService.Protocol).
154+
AddField(serviceDependencyField, 1).
155+
SetTime(t)
156+
b.writeAPI.WritePoint(dataPoint)
157+
158+
return nil
159+
}
160+
96161
// Flush all influxdb writes
97162
func (b Backend) Flush() {
98163
b.writeAPI.Flush()

0 commit comments

Comments
 (0)