Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ type OpAMPClient interface {
// May be also called from OnMessage handler.
RequestConnectionSettings(request *protobufs.ConnectionSettingsRequest) error

// SetConnectionSettingsStatus sets the current ConnectionSettingsStatus.
// LastConnectionSettingsHash field must be non-nil.
// May be called anytime after Start(), including from OnMessage handler.
// nil values are not allowed and will return an error.
// Must be explicitly used after OnOpampConnectionSettings/OnConnectionSettings
// has completed with APPLIED or FAILED.
SetConnectionSettingsStatus(status *protobufs.ConnectionSettingsStatus) error

// SetCustomCapabilities modifies the set of customCapabilities supported by the client.
// The new customCapabilities will be sent with the next message to the server. If
// custom capabilities are used SHOULD be called before Start(). If not called before
Expand Down
175 changes: 175 additions & 0 deletions client/clientimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2757,3 +2757,178 @@ func generateTestAvailableComponents() *protobufs.AvailableComponents {
},
}
}

func TestSetConnectionSettingsStatus(t *testing.T) {
testCases := []struct {
name string
capabilities protobufs.AgentCapabilities
needsServer bool
testFunc func(t *testing.T, client OpAMPClient, srv *internal.MockServer)
}{{
name: "no capability returns error",
capabilities: coreCapabilities,
testFunc: func(t *testing.T, client OpAMPClient, _ *internal.MockServer) {
err := client.SetConnectionSettingsStatus(&protobufs.ConnectionSettingsStatus{
LastConnectionSettingsHash: []byte{1, 2, 3},
Status: protobufs.ConnectionSettingsStatuses_ConnectionSettingsStatuses_APPLIED,
})
require.ErrorIs(t, err, internal.ErrReportsConnectionSettingsStatusNotSet)
},
}, {
name: "nil status returns error",
capabilities: coreCapabilities | protobufs.AgentCapabilities_AgentCapabilities_ReportsConnectionSettingsStatus,
testFunc: func(t *testing.T, client OpAMPClient, _ *internal.MockServer) {
err := client.SetConnectionSettingsStatus(nil)
require.Error(t, err)
},
}, {
name: "nil hash returns error",
capabilities: coreCapabilities | protobufs.AgentCapabilities_AgentCapabilities_ReportsConnectionSettingsStatus,
testFunc: func(t *testing.T, client OpAMPClient, _ *internal.MockServer) {
err := client.SetConnectionSettingsStatus(&protobufs.ConnectionSettingsStatus{
Status: protobufs.ConnectionSettingsStatuses_ConnectionSettingsStatuses_APPLIED,
})
require.Error(t, err)
},
}, {
name: "sends status to server",
capabilities: coreCapabilities | protobufs.AgentCapabilities_AgentCapabilities_ReportsConnectionSettingsStatus,
needsServer: true,
testFunc: func(t *testing.T, client OpAMPClient, srv *internal.MockServer) {
gotApplied := new(atomic.Bool)
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
if msg.ConnectionSettingsStatus != nil &&
msg.ConnectionSettingsStatus.Status == protobufs.ConnectionSettingsStatuses_ConnectionSettingsStatuses_APPLIED {
gotApplied.Store(true)
}
return &protobufs.ServerToAgent{InstanceUid: msg.InstanceUid}
}

err := client.SetConnectionSettingsStatus(&protobufs.ConnectionSettingsStatus{
LastConnectionSettingsHash: []byte{1, 2, 3},
Status: protobufs.ConnectionSettingsStatuses_ConnectionSettingsStatuses_APPLIED,
})
require.NoError(t, err)
eventually(t, func() bool { return gotApplied.Load() })
},
}, {
name: "duplicate status is no-op",
capabilities: coreCapabilities | protobufs.AgentCapabilities_AgentCapabilities_ReportsConnectionSettingsStatus,
needsServer: true,
testFunc: func(t *testing.T, client OpAMPClient, srv *internal.MockServer) {
var appliedCount atomic.Int64
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
if msg.ConnectionSettingsStatus != nil &&
msg.ConnectionSettingsStatus.Status == protobufs.ConnectionSettingsStatuses_ConnectionSettingsStatuses_APPLIED {
appliedCount.Add(1)
}
return &protobufs.ServerToAgent{InstanceUid: msg.InstanceUid}
}

status := &protobufs.ConnectionSettingsStatus{
LastConnectionSettingsHash: []byte{1, 2, 3},
Status: protobufs.ConnectionSettingsStatuses_ConnectionSettingsStatuses_APPLIED,
}

// First call should send to server.
err := client.SetConnectionSettingsStatus(status)
require.NoError(t, err)
eventually(t, func() bool { return appliedCount.Load() == 1 })

// Second call with identical status should be a no-op (updateStoredConnectionSettingsStatus returns false).
err = client.SetConnectionSettingsStatus(status)
require.NoError(t, err)

eventually(t, func() bool { return appliedCount.Load() == 1 })
},
}}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
testClients(t, func(t *testing.T, client OpAMPClient) {
var srv *internal.MockServer
var settings types.StartSettings

if tc.needsServer {
srv = internal.StartMockServer(t)
defer srv.Close()
settings.OpAMPServerURL = "ws://" + srv.Endpoint
} else {
settings = createNoServerSettings()
}
settings.Capabilities = tc.capabilities

startClient(t, settings, client)
tc.testFunc(t, client, srv)

err := client.Stop(t.Context())
require.NoError(t, err)
})
})
}
}

// TestSetConnectionSettingsStatusAsync tests that when the server offers connection settings,
// the client sets APPLYING automatically, and then the agent can asynchronously set APPLIED via
// SetConnectionSettingsStatus.
func TestSetConnectionSettingsStatusAsync(t *testing.T) {
testClients(t, func(t *testing.T, client OpAMPClient) {
hash := []byte{1, 2, 3}
gotApplying := new(atomic.Bool)
gotApplied := new(atomic.Bool)
callbackCalled := new(atomic.Bool)

srv := internal.StartMockServer(t)
defer srv.Close()

firstMessage := true
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
if msg.ConnectionSettingsStatus != nil {
switch msg.ConnectionSettingsStatus.Status {
case protobufs.ConnectionSettingsStatuses_ConnectionSettingsStatuses_APPLYING:
gotApplying.Store(true)
case protobufs.ConnectionSettingsStatuses_ConnectionSettingsStatuses_APPLIED:
gotApplied.Store(true)
}
}
resp := &protobufs.ServerToAgent{InstanceUid: msg.InstanceUid}
if firstMessage {
firstMessage = false
resp.ConnectionSettings = &protobufs.ConnectionSettingsOffers{
Hash: hash,
Opamp: &protobufs.OpAMPConnectionSettings{DestinationEndpoint: "http://opamp.com"},
}
}
return resp
}

capabilities := coreCapabilities |
protobufs.AgentCapabilities_AgentCapabilities_AcceptsOpAMPConnectionSettings |
protobufs.AgentCapabilities_AgentCapabilities_ReportsConnectionSettingsStatus
settings := types.StartSettings{
Capabilities: capabilities,
OpAMPServerURL: "ws://" + srv.Endpoint,
Callbacks: types.Callbacks{
OnOpampConnectionSettings: func(ctx context.Context, settings *protobufs.OpAMPConnectionSettings) error {
callbackCalled.Store(true)
return nil
},
},
}
startClient(t, settings, client)

eventually(t, func() bool { return gotApplying.Load() })
eventually(t, func() bool { return callbackCalled.Load() })

err := client.SetConnectionSettingsStatus(&protobufs.ConnectionSettingsStatus{
LastConnectionSettingsHash: hash,
Status: protobufs.ConnectionSettingsStatuses_ConnectionSettingsStatuses_APPLIED,
})
require.NoError(t, err)

eventually(t, func() bool { return gotApplied.Load() })

err = client.Stop(t.Context())
require.NoError(t, err)
})
}
5 changes: 5 additions & 0 deletions client/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ func (c *httpClient) SetRemoteConfigStatus(status *protobufs.RemoteConfigStatus)
return c.common.SetRemoteConfigStatus(status)
}

// SetConnectionSettingsStatus implements OpAMPClient.SetConnectionSettingsStatus.
func (c *httpClient) SetConnectionSettingsStatus(status *protobufs.ConnectionSettingsStatus) error {
return c.common.SetConnectionSettingsStatus(status)
}

// SetPackageStatuses implements OpAMPClient.SetPackageStatuses.
func (c *httpClient) SetPackageStatuses(statuses *protobufs.PackageStatuses) error {
return c.common.SetPackageStatuses(statuses)
Expand Down
61 changes: 48 additions & 13 deletions client/internal/clientcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,21 @@ import (
)

var (
ErrAgentDescriptionMissing = errors.New("AgentDescription is nil")
ErrAgentDescriptionNoAttributes = errors.New("AgentDescription has no attributes defined")
ErrHealthMissing = errors.New("health is nil")
ErrReportsEffectiveConfigNotSet = errors.New("ReportsEffectiveConfig capability is not set")
ErrReportsRemoteConfigNotSet = errors.New("ReportsRemoteConfig capability is not set")
ErrPackagesStateProviderNotSet = errors.New("PackagesStateProvider must be set")
ErrCapabilitiesNotSet = errors.New("Capabilities is not set")
ErrAcceptsPackagesNotSet = errors.New("AcceptsPackages and ReportsPackageStatuses must be set")
ErrAvailableComponentsMissing = errors.New("AvailableComponents is nil")

errAlreadyStarted = errors.New("already started")
errCannotStopNotStarted = errors.New("cannot stop because not started")
errReportsPackageStatusesNotSet = errors.New("ReportsPackageStatuses capability is not set")
ErrAgentDescriptionMissing = errors.New("AgentDescription is nil")
ErrAgentDescriptionNoAttributes = errors.New("AgentDescription has no attributes defined")
ErrHealthMissing = errors.New("health is nil")
ErrReportsEffectiveConfigNotSet = errors.New("ReportsEffectiveConfig capability is not set")
ErrReportsRemoteConfigNotSet = errors.New("ReportsRemoteConfig capability is not set")
ErrPackagesStateProviderNotSet = errors.New("PackagesStateProvider must be set")
ErrCapabilitiesNotSet = errors.New("Capabilities is not set")
ErrAcceptsPackagesNotSet = errors.New("AcceptsPackages and ReportsPackageStatuses must be set")
ErrAvailableComponentsMissing = errors.New("AvailableComponents is nil")
ErrReportsConnectionSettingsStatusNotSet = errors.New("ReportsConnectionSettingsStatus capability is not set")

errAlreadyStarted = errors.New("already started")
errCannotStopNotStarted = errors.New("cannot stop because not started")
errReportsPackageStatusesNotSet = errors.New("ReportsPackageStatuses capability is not set")
errLastConnectionSettingsHashEmpty = errors.New("LastConnectionSettingsHash is empty")
)

// ClientCommon contains the OpAMP logic that is common between WebSocket and
Expand Down Expand Up @@ -321,6 +323,39 @@ func (c *ClientCommon) RequestConnectionSettings(request *protobufs.ConnectionSe
return nil
}

// SetConnectionSettingsStatus sends a status update to the Server with the new ConnectionSettingsStatus.
// It also remembers the new status in the client state so that it can be sent
// to the Server when the Server asks for it.
func (c *ClientCommon) SetConnectionSettingsStatus(status *protobufs.ConnectionSettingsStatus) error {
if !c.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_ReportsConnectionSettingsStatus) {
return ErrReportsConnectionSettingsStatusNotSet
}

if status == nil {
return errConnectionSettingsStatusMissing
}

if len(status.LastConnectionSettingsHash) == 0 {
return errLastConnectionSettingsHashEmpty
}

oldStatus := c.ClientSyncedState.ConnectionSettingsStatus()
if !updateStoredConnectionSettingsStatus(oldStatus, status) {
return nil
}

if err := c.ClientSyncedState.SetConnectionSettingsStatus(status); err != nil {
return err
}

c.sender.NextMessage().Update(func(msg *protobufs.AgentToServer) {
msg.ConnectionSettingsStatus = c.ClientSyncedState.ConnectionSettingsStatus()
})
c.sender.ScheduleSend()

return nil
}

// SetHealth sends a status update to the Server with the new agent health
// and remembers the health in the client state so that it can be sent
// to the Server when the Server asks for it.
Expand Down
56 changes: 0 additions & 56 deletions client/internal/receivedprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,34 +275,6 @@ func (r *receivedProcessor) rcvOpampConnectionSettings(ctx context.Context, sett
if err != nil {
r.logger.Errorf(ctx, "Failed to process OpAMPConnectionSettings: %v", err)
}
if r.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_ReportsConnectionSettingsStatus) {
status := protobufs.ConnectionSettingsStatuses_ConnectionSettingsStatuses_APPLIED
errMsg := ""
if err != nil {
status = protobufs.ConnectionSettingsStatuses_ConnectionSettingsStatuses_FAILED
errMsg = err.Error()
}

connectionStatus := &protobufs.ConnectionSettingsStatus{
LastConnectionSettingsHash: settings.Hash,
Status: status,
ErrorMessage: errMsg,
}
oldStatus := r.clientSyncedState.ConnectionSettingsStatus()

if !updateStoredConnectionSettingsStatus(oldStatus, connectionStatus) {
r.logger.Debugf(ctx, "Client skipping connection status state update from %v to %v", oldStatus.GetStatus(), connectionStatus.GetStatus())
return
}

if err := r.clientSyncedState.SetConnectionSettingsStatus(connectionStatus); err != nil {
r.logger.Errorf(ctx, "Unable to persist connection settings status %s state: %v", status.String(), err)
}
r.sender.NextMessage().Update(func(sendMsg *protobufs.AgentToServer) {
sendMsg.ConnectionSettingsStatus = connectionStatus
})
r.sender.ScheduleSend()
}
} else {
r.logger.Debugf(ctx, "Ignoring Opamp, agent does not have AcceptsOpAMPConnectionSettings capability")
}
Expand Down Expand Up @@ -339,34 +311,6 @@ func (r *receivedProcessor) rcvConnectionSettings(ctx context.Context, settings
if err != nil {
r.logger.Errorf(ctx, "Failed to process ConnectionSettings: %v", err)
}
if r.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_ReportsConnectionSettingsStatus) {
status := protobufs.ConnectionSettingsStatuses_ConnectionSettingsStatuses_APPLIED
errMsg := ""
if err != nil {
status = protobufs.ConnectionSettingsStatuses_ConnectionSettingsStatuses_FAILED
errMsg = err.Error()
}

connectionStatus := &protobufs.ConnectionSettingsStatus{
LastConnectionSettingsHash: settings.Hash,
Status: status,
ErrorMessage: errMsg,
}
oldStatus := r.clientSyncedState.ConnectionSettingsStatus()

if !updateStoredConnectionSettingsStatus(oldStatus, connectionStatus) {
r.logger.Debugf(ctx, "Client skipping connection status state update from %v to %v", oldStatus.GetStatus(), connectionStatus.GetStatus())
return
}

if err := r.clientSyncedState.SetConnectionSettingsStatus(connectionStatus); err != nil {
r.logger.Errorf(ctx, "Unable to persist connection settings status %s state: %v", status.String(), err)
}
r.sender.NextMessage().Update(func(sendMsg *protobufs.AgentToServer) {
sendMsg.ConnectionSettingsStatus = connectionStatus
})
r.sender.ScheduleSend()
}
} else {
r.logger.Debugf(ctx, "Ignoring ConnectionSettings, agent does not have corresponding capability")
}
Expand Down
4 changes: 4 additions & 0 deletions client/wsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ func (c *wsClient) SetRemoteConfigStatus(status *protobufs.RemoteConfigStatus) e
return c.common.SetRemoteConfigStatus(status)
}

func (c *wsClient) SetConnectionSettingsStatus(status *protobufs.ConnectionSettingsStatus) error {
return c.common.SetConnectionSettingsStatus(status)
}

func (c *wsClient) SetPackageStatuses(statuses *protobufs.PackageStatuses) error {
return c.common.SetPackageStatuses(statuses)
}
Expand Down
Loading
Loading