Skip to content

Commit 371a08a

Browse files
authored
chore: remove mqtt dependency (#68)
1 parent 69e9446 commit 371a08a

File tree

10 files changed

+1
-133
lines changed

10 files changed

+1
-133
lines changed

agent/agent.go

-23
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"runtime"
88
"time"
99

10-
mqtt "github.com/eclipse/paho.mqtt.golang"
1110
"github.com/google/uuid"
1211
"github.com/mitchellh/mapstructure"
1312
"go.uber.org/zap"
@@ -32,8 +31,6 @@ type Agent interface {
3231
type orbAgent struct {
3332
logger *zap.Logger
3433
config config.Config
35-
client mqtt.Client
36-
agentID string
3734
backends map[string]backend.Backend
3835
backendState map[string]*backend.State
3936
backendsCommon config.BackendCommons
@@ -45,10 +42,6 @@ type orbAgent struct {
4542
heartbeatCtx context.Context
4643
heartbeatCancel context.CancelFunc
4744

48-
// Agent RPC channel, configured from command line
49-
baseTopic string
50-
rpcFromCoreTopic string
51-
5245
// Retry Mechanism to ensure the Request is received
5346
groupRequestSucceeded context.CancelFunc
5447
policyRequestSucceeded context.CancelFunc
@@ -149,13 +142,6 @@ func (a *orbAgent) Start(ctx context.Context, cancelFunc context.CancelFunc) err
149142
a.rpcFromCancelFunc = cancelAllAsync
150143
a.cancelFunction = cancelFunc
151144
a.logger.Info("agent started", zap.String("version", version.GetBuildVersion()), zap.Any("routine", agentCtx.Value(routineKey)))
152-
mqtt.CRITICAL = &agentLoggerCritical{a: a}
153-
mqtt.ERROR = &agentLoggerError{a: a}
154-
155-
if a.config.OrbAgent.Debug.Enable {
156-
a.logger.Info("debug logging enabled")
157-
mqtt.DEBUG = &agentLoggerDebug{a: a}
158-
}
159145

160146
if err := a.startBackends(ctx); err != nil {
161147
return err
@@ -180,11 +166,6 @@ func (a *orbAgent) logoffWithHeartbeat(ctx context.Context) {
180166
if a.heartbeatCtx != nil {
181167
a.heartbeatCancel()
182168
}
183-
if a.client != nil && a.client.IsConnected() {
184-
if token := a.client.Unsubscribe(a.rpcFromCoreTopic); token.Wait() && token.Error() != nil {
185-
a.logger.Warn("failed to unsubscribe to RPC channel", zap.Error(token.Error()))
186-
}
187-
}
188169
}
189170

190171
func (a *orbAgent) Stop(ctx context.Context) {
@@ -201,9 +182,6 @@ func (a *orbAgent) Stop(ctx context.Context) {
201182
}
202183
}
203184
a.logoffWithHeartbeat(ctx)
204-
if a.client != nil && a.client.IsConnected() {
205-
a.client.Disconnect(0)
206-
}
207185
a.logger.Debug("stopping agent with number of go routines and go calls", zap.Int("goroutines", runtime.NumGoroutine()), zap.Int64("gocalls", runtime.NumCgoCall()))
208186
if a.policyRequestSucceeded != nil {
209187
a.policyRequestSucceeded()
@@ -237,7 +215,6 @@ func (a *orbAgent) RestartBackend(ctx context.Context, name string, reason strin
237215
a.backendState[name].LastError = fmt.Sprintf("failed to reset backend: %v", err)
238216
a.logger.Error("failed to reset backend", zap.String("backend", name), zap.Error(err))
239217
}
240-
be.SetCommsClient(a.agentID, &a.client, fmt.Sprintf("%s/?/%s", a.baseTopic, name))
241218

242219
return nil
243220
}

agent/backend/backend.go

-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"time"
66

7-
mqtt "github.com/eclipse/paho.mqtt.golang"
87
"go.uber.org/zap"
98

109
"github.com/netboxlabs/orb-agent/agent/config"
@@ -49,7 +48,6 @@ func (s RunningStatus) String() string {
4948
// Backend is the interface that all backends must implement
5049
type Backend interface {
5150
Configure(*zap.Logger, policies.PolicyRepo, map[string]interface{}, config.BackendCommons) error
52-
SetCommsClient(string, *mqtt.Client, string)
5351
Version() (string, error)
5452
Start(ctx context.Context, cancelFunc context.CancelFunc) error
5553
Stop(ctx context.Context) error

agent/backend/devicediscovery/device_discovery.go

-7
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"net/http"
99
"time"
1010

11-
mqtt "github.com/eclipse/paho.mqtt.golang"
1211
"github.com/go-cmd/cmd"
1312
"go.uber.org/zap"
1413
"gopkg.in/yaml.v3"
@@ -50,8 +49,6 @@ type deviceDiscoveryBackend struct {
5049
statusChan <-chan cmd.Status
5150
cancelFunc context.CancelFunc
5251
ctx context.Context
53-
54-
mqttClient *mqtt.Client
5552
}
5653

5754
type info struct {
@@ -87,10 +84,6 @@ func (d *deviceDiscoveryBackend) Configure(logger *zap.Logger, repo policies.Pol
8784
return nil
8885
}
8986

90-
func (d *deviceDiscoveryBackend) SetCommsClient(_ string, client *mqtt.Client, _ string) {
91-
d.mqttClient = client
92-
}
93-
9487
func (d *deviceDiscoveryBackend) Version() (string, error) {
9588
var info info
9689
err := d.request("status", &info, http.MethodGet, http.NoBody, "application/json", versionTimeout)

agent/backend/networkdiscovery/network_discovery.go

-7
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"net/http"
99
"time"
1010

11-
mqtt "github.com/eclipse/paho.mqtt.golang"
1211
"github.com/go-cmd/cmd"
1312
"go.uber.org/zap"
1413
"gopkg.in/yaml.v3"
@@ -50,8 +49,6 @@ type networkDiscoveryBackend struct {
5049
statusChan <-chan cmd.Status
5150
cancelFunc context.CancelFunc
5251
ctx context.Context
53-
54-
mqttClient *mqtt.Client
5552
}
5653

5754
type info struct {
@@ -87,10 +84,6 @@ func (d *networkDiscoveryBackend) Configure(logger *zap.Logger, repo policies.Po
8784
return nil
8885
}
8986

90-
func (d *networkDiscoveryBackend) SetCommsClient(_ string, client *mqtt.Client, _ string) {
91-
d.mqttClient = client
92-
}
93-
9487
func (d *networkDiscoveryBackend) Version() (string, error) {
9588
var info info
9689
err := d.request("status", &info, http.MethodGet, http.NoBody, "application/json", versionTimeout)

agent/backend/otel/otel.go

-7
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"net/http"
99
"time"
1010

11-
mqtt "github.com/eclipse/paho.mqtt.golang"
1211
"github.com/go-cmd/cmd"
1312
"go.uber.org/zap"
1413
"gopkg.in/yaml.v3"
@@ -47,8 +46,6 @@ type openTelemetryBackend struct {
4746
statusChan <-chan cmd.Status
4847
cancelFunc context.CancelFunc
4948
ctx context.Context
50-
51-
mqttClient *mqtt.Client
5249
}
5350

5451
type info struct {
@@ -248,10 +245,6 @@ func (o *openTelemetryBackend) GetRunningStatus() (backend.RunningStatus, string
248245
return runningStatus, "", nil
249246
}
250247

251-
func (o *openTelemetryBackend) SetCommsClient(_ string, client *mqtt.Client, _ string) {
252-
o.mqttClient = client
253-
}
254-
255248
func (o *openTelemetryBackend) ApplyPolicy(data policies.PolicyData, updatePolicy bool) error {
256249
if updatePolicy {
257250
// To update a policy it's necessary first remove it and then apply a new version

agent/backend/pktvisor/pktvisor.go

+1-16
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,11 @@ package pktvisor
33
import (
44
"context"
55
"errors"
6-
"fmt"
76
"net/http"
87
"os/exec"
98
"strconv"
10-
"strings"
119
"time"
1210

13-
mqtt "github.com/eclipse/paho.mqtt.golang"
1411
"github.com/go-cmd/cmd"
1512
"go.uber.org/zap"
1613

@@ -53,11 +50,7 @@ type pktvisorBackend struct {
5350
startTime time.Time
5451
cancelFunc context.CancelFunc
5552
ctx context.Context
56-
57-
mqttClient *mqtt.Client
58-
metricsTopic string
59-
otlpMetricsTopic string
60-
policyRepo policies.PolicyRepo
53+
policyRepo policies.PolicyRepo
6154

6255
adminAPIHost string
6356
adminAPIPort string
@@ -79,14 +72,6 @@ func (p *pktvisorBackend) GetInitialState() backend.RunningStatus {
7972
return backend.Unknown
8073
}
8174

82-
func (p *pktvisorBackend) SetCommsClient(agentID string, client *mqtt.Client, baseTopic string) {
83-
p.mqttClient = client
84-
metricsTopic := strings.Replace(baseTopic, "?", "be", 1)
85-
otelMetricsTopic := strings.Replace(baseTopic, "?", "otlp", 1)
86-
p.metricsTopic = fmt.Sprintf("%s/m/%c", metricsTopic, agentID[0])
87-
p.otlpMetricsTopic = fmt.Sprintf("%s/m/%c", otelMetricsTopic, agentID[0])
88-
}
89-
9075
func (p *pktvisorBackend) GetRunningStatus() (backend.RunningStatus, string, error) {
9176
// first check process status
9277
runningStatus, errMsg, err := p.getProcRunningStatus()

agent/backend/worker/worker.go

-7
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"net/http"
99
"time"
1010

11-
mqtt "github.com/eclipse/paho.mqtt.golang"
1211
"github.com/go-cmd/cmd"
1312
"go.uber.org/zap"
1413
"gopkg.in/yaml.v3"
@@ -50,8 +49,6 @@ type workerBackend struct {
5049
statusChan <-chan cmd.Status
5150
cancelFunc context.CancelFunc
5251
ctx context.Context
53-
54-
mqttClient *mqtt.Client
5552
}
5653

5754
type info struct {
@@ -87,10 +84,6 @@ func (d *workerBackend) Configure(logger *zap.Logger, repo policies.PolicyRepo,
8784
return nil
8885
}
8986

90-
func (d *workerBackend) SetCommsClient(_ string, client *mqtt.Client, _ string) {
91-
d.mqttClient = client
92-
}
93-
9487
func (d *workerBackend) Version() (string, error) {
9588
var info info
9689
err := d.request("status", &info, http.MethodGet, http.NoBody, "application/json", versionTimeout)

agent/logging.go

-58
This file was deleted.

go.mod

-2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ module github.com/netboxlabs/orb-agent
33
go 1.23.2
44

55
require (
6-
github.com/eclipse/paho.mqtt.golang v1.5.0
76
github.com/go-cmd/cmd v1.4.3
87
github.com/go-co-op/gocron/v2 v2.15.0
98
github.com/go-git/go-git/v5 v5.13.2
@@ -34,7 +33,6 @@ require (
3433
github.com/go-gorp/gorp/v3 v3.1.0 // indirect
3534
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
3635
github.com/google/pprof v0.0.0-20230228050547-1710fef4ab10 // indirect
37-
github.com/gorilla/websocket v1.5.3 // indirect
3836
github.com/hashicorp/hcl v1.0.0 // indirect
3937
github.com/inconshreveable/mousetrap v1.1.0 // indirect
4038
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect

go.sum

-4
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
2323
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
2424
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
2525
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
26-
github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o=
27-
github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk=
2826
github.com/elazarl/goproxy v1.4.0 h1:4GyuSbFa+s26+3rmYNSuUVsx+HgPrV1bk1jXI0l9wjM=
2927
github.com/elazarl/goproxy v1.4.0/go.mod h1:X/5W/t+gzDyLfHW4DrMdpjqYjpXsURlBt9lpBDxZZZQ=
3028
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
@@ -64,8 +62,6 @@ github.com/google/pprof v0.0.0-20230228050547-1710fef4ab10 h1:CqYfpuYIjnlNxM3msd
6462
github.com/google/pprof v0.0.0-20230228050547-1710fef4ab10/go.mod h1:79YE0hCXdHag9sBkw2o+N/YnZtTkXi0UT9Nnixa5eYk=
6563
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
6664
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
67-
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
68-
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
6965
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
7066
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
7167
github.com/ianlancetaylor/demangle v0.0.0-20210905161508-09a460cdf81d/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w=

0 commit comments

Comments
 (0)