Skip to content

Commit cead526

Browse files
authored
Add CustomMessages demo examples (open-telemetry#496)
Added examples how to send and receive Custom Messages in example agent and server. This PR addresses open-telemetry#468. We improved examples while exploring custom messages. Changes: * in server UI there is new section where user see last xx received custom messages and can send custom message * agent listens for specific custom message and send back response
1 parent 433dc5c commit cead526

File tree

5 files changed

+176
-0
lines changed

5 files changed

+176
-0
lines changed

internal/examples/agent/agent/agent.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"crypto/x509"
1010
"crypto/x509/pkix"
1111
"encoding/pem"
12+
"errors"
1213
"fmt"
1314
"net/http"
1415
"os"
@@ -46,6 +47,8 @@ service:
4647
exporters: [otlp]
4748
`
4849

50+
const customCapability_Health = "io.opentelemetry.custom.health"
51+
4952
type Agent struct {
5053
logger types.Logger
5154

@@ -191,6 +194,16 @@ func (agent *Agent) connect(ops ...settingsOp) error {
191194
return err
192195
}
193196

197+
customCapabilities := &protobufs.CustomCapabilities{
198+
Capabilities: []string{
199+
customCapability_Health,
200+
},
201+
}
202+
err = agent.opampClient.SetCustomCapabilities(customCapabilities)
203+
if err != nil {
204+
return err
205+
}
206+
194207
// This sets the request to create a client certificate before the OpAMP client
195208
// is started, before the connection is established. However, this assumes the
196209
// server supports "AcceptsConnectionRequest" capability.
@@ -517,6 +530,10 @@ func (agent *Agent) onMessage(ctx context.Context, msg *types.MessageData) {
517530
agent.updateAgentIdentity(ctx, uid)
518531
}
519532

533+
if msg.CustomMessage != nil {
534+
agent.processCustomMessage(ctx, msg.CustomMessage)
535+
}
536+
520537
if configChanged {
521538
err := agent.opampClient.UpdateEffectiveConfig(ctx)
522539
if err != nil {
@@ -534,6 +551,53 @@ func (agent *Agent) onMessage(ctx context.Context, msg *types.MessageData) {
534551
agent.requestClientCertificate()
535552
}
536553

554+
func (agent *Agent) processCustomMessage(ctx context.Context, customMessage *protobufs.CustomMessage) {
555+
if customMessage == nil {
556+
return
557+
}
558+
559+
agent.logger.Debugf(ctx, "received custom message: capability=%s, type=%s, data=%s",
560+
customMessage.Capability,
561+
customMessage.Type,
562+
string(customMessage.Data),
563+
)
564+
565+
if customMessage.Capability == customCapability_Health {
566+
switch customMessage.Type {
567+
case "get_health":
568+
569+
responseMsg := &protobufs.CustomMessage{
570+
Capability: customCapability_Health,
571+
Type: "health_status",
572+
Data: []byte("OK"),
573+
}
574+
575+
if err := agent.sendCustomMessage(ctx, responseMsg); err != nil {
576+
agent.logger.Errorf(ctx, "Failed to send custom message: %v", err)
577+
}
578+
default:
579+
agent.logger.Errorf(ctx, "unknown custom message type: %s", customMessage.Type)
580+
}
581+
}
582+
}
583+
584+
func (agent *Agent) sendCustomMessage(ctx context.Context, message *protobufs.CustomMessage) error {
585+
for {
586+
sendingChan, err := agent.opampClient.SendCustomMessage(message)
587+
588+
switch {
589+
case err == nil:
590+
return nil
591+
case errors.Is(err, types.ErrCustomMessagePending):
592+
// Failed to send opamp message. Waiting for previous custom message to be sent
593+
<-sendingChan
594+
default:
595+
agent.logger.Errorf(ctx, "failed to send custom message: %w", err)
596+
return fmt.Errorf("failed to send custom message: %w", err)
597+
}
598+
}
599+
}
600+
537601
func (agent *Agent) tryChangeOpAMP(ctx context.Context, cert *tls.Certificate, tlsConfig *tls.Config, proxy *proxySettings) {
538602
agent.logger.Debugf(ctx, "Reconnecting to verify new OpAMP settings.\n")
539603
agent.disconnect(ctx)

internal/examples/html/html/agent.html

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,3 +147,34 @@ <h3>OpAMP Connection Settings</h3>
147147

148148
</body>
149149
</html>
150+
151+
<hr/>
152+
153+
<h3>Custom Messages</h3>
154+
155+
<h4>Send Custom Message</h4>
156+
<form action="/send_custom_message" method="post">
157+
<input type="hidden" name="instanceid" value="{{ .InstanceIdStr }}"/>
158+
159+
<label for="capability">Capability:</label>
160+
<input type="text" name="capability" value="io.opentelemetry.custom.health" size="30"/><br/>
161+
162+
<label for="type">Type:</label>
163+
<input type="text" name="type" value="get_health" size="30"/><br/>
164+
165+
<label for="data">Data:</label>
166+
<input type="text" name="data" value="" size="30"/><br/>
167+
168+
<input type="submit" value="Send Custom Message" />
169+
</form>
170+
171+
<h4>Received Messages</h4>
172+
173+
{{ if eq (len .CustomMessageHistory) 0 }}
174+
No Received Messages
175+
{{ else }}
176+
Last {{ len .CustomMessageHistory }} Received Messages
177+
<pre class="log-container"><code>{{ range .CustomMessageHistory }}{{ . }}
178+
{{ end }}
179+
</code></pre>
180+
{{ end }}

internal/examples/server/data/agent.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ import (
1919
"github.com/open-telemetry/opamp-go/server/types"
2020
)
2121

22+
const (
23+
CustomMessageHistorySize = 15
24+
)
25+
2226
// Agent represents a connected Agent.
2327
type Agent struct {
2428
// Some fields in this struct are exported so that we can render them in the UI.
@@ -51,6 +55,9 @@ type Agent struct {
5155
ClientCertSha256Fingerprint string
5256
ClientCertOfferError string
5357

58+
// Custom message history
59+
CustomMessageHistory []string
60+
5461
// Remote config that we will give to this Agent.
5562
remoteConfig *protobufs.AgentRemoteConfig
5663

@@ -95,6 +102,7 @@ func (agent *Agent) CloneReadonly() *Agent {
95102
ClientCert: agent.ClientCert,
96103
ClientCertOfferError: agent.ClientCertOfferError,
97104
ClientCertSha256Fingerprint: agent.ClientCertSha256Fingerprint,
105+
CustomMessageHistory: agent.CustomMessageHistory,
98106
}
99107
}
100108

@@ -113,6 +121,10 @@ func (agent *Agent) UpdateStatus(
113121
agent.processConnectionSettingsRequest(statusMsg.ConnectionSettingsRequest.Opamp, response)
114122
}
115123

124+
if statusMsg.CustomMessage != nil {
125+
agent.processCustomMessage(statusMsg)
126+
}
127+
116128
statusUpdateWatchers := agent.statusUpdateWatchers
117129
agent.statusUpdateWatchers = nil
118130

@@ -122,6 +134,28 @@ func (agent *Agent) UpdateStatus(
122134
notifyStatusWatchers(statusUpdateWatchers)
123135
}
124136

137+
func (agent *Agent) processCustomMessage(statusMsg *protobufs.AgentToServer) {
138+
formattedMessage := fmt.Sprintf("[%s] capability=%s, type=%s, data=%s",
139+
time.Now().Format(time.DateTime),
140+
statusMsg.CustomMessage.Capability,
141+
statusMsg.CustomMessage.Type,
142+
string(statusMsg.CustomMessage.Data),
143+
)
144+
145+
agent.CustomMessageHistory = append(agent.CustomMessageHistory, formattedMessage)
146+
147+
if len(agent.CustomMessageHistory) > CustomMessageHistorySize {
148+
agent.CustomMessageHistory = agent.CustomMessageHistory[len(agent.CustomMessageHistory)-CustomMessageHistorySize:]
149+
}
150+
}
151+
152+
func (agent *Agent) SendCustomMessage(customMsg *protobufs.ServerToAgent) {
153+
agent.mux.Lock()
154+
defer agent.mux.Unlock()
155+
156+
agent.SendToAgent(customMsg)
157+
}
158+
125159
func notifyStatusWatchers(statusUpdateWatchers []chan<- struct{}) {
126160
// Notify everyone who is waiting on this Agent's status updates.
127161
for _, ch := range statusUpdateWatchers {

internal/examples/server/data/agents.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,16 @@ func (agents *Agents) RemoveConnection(conn types.Connection) {
3131
delete(agents.connections, conn)
3232
}
3333

34+
func (agents *Agents) SendCustomMessageToAgent(
35+
agentId InstanceId,
36+
customMsg *protobufs.ServerToAgent,
37+
) {
38+
agent := agents.FindAgent(agentId)
39+
if agent != nil {
40+
agent.SendCustomMessage(customMsg)
41+
}
42+
}
43+
3444
func (agents *Agents) SetCustomConfigForAgent(
3545
agentId InstanceId,
3646
config *protobufs.AgentConfigMap,

internal/examples/server/uisrv/ui.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ func Start(rootDir string) {
3333
mux.HandleFunc("/save_config", saveCustomConfigForInstance)
3434
mux.HandleFunc("/rotate_client_cert", rotateInstanceClientCert)
3535
mux.HandleFunc("/opamp_connection_settings", opampConnectionSettings)
36+
mux.HandleFunc("/send_custom_message", sendCustomMessage)
3637
srv = &http.Server{
3738
Addr: "0.0.0.0:4321",
3839
Handler: mux,
@@ -80,6 +81,42 @@ func renderAgent(w http.ResponseWriter, r *http.Request) {
8081
renderTemplate(w, "agent.html", agent)
8182
}
8283

84+
func sendCustomMessage(w http.ResponseWriter, r *http.Request) {
85+
if err := r.ParseForm(); err != nil {
86+
w.WriteHeader(http.StatusInternalServerError)
87+
return
88+
}
89+
90+
uid, err := uuid.Parse(r.Form.Get("instanceid"))
91+
if err != nil {
92+
w.WriteHeader(http.StatusNotFound)
93+
return
94+
}
95+
96+
instanceId := data.InstanceId(uid)
97+
agent := data.AllAgents.GetAgentReadonlyClone(instanceId)
98+
if agent == nil {
99+
w.WriteHeader(http.StatusNotFound)
100+
return
101+
}
102+
103+
capability := r.PostForm.Get("capability")
104+
msgType := r.PostForm.Get("type")
105+
dataStr := r.PostForm.Get("data")
106+
107+
customMsg := &protobufs.ServerToAgent{
108+
CustomMessage: &protobufs.CustomMessage{
109+
Capability: capability,
110+
Type: msgType,
111+
Data: []byte(dataStr),
112+
},
113+
}
114+
115+
data.AllAgents.SendCustomMessageToAgent(instanceId, customMsg)
116+
117+
http.Redirect(w, r, "/agent?instanceid="+uid.String(), http.StatusSeeOther)
118+
}
119+
83120
func saveCustomConfigForInstance(w http.ResponseWriter, r *http.Request) {
84121
if err := r.ParseForm(); err != nil {
85122
w.WriteHeader(http.StatusInternalServerError)

0 commit comments

Comments
 (0)