Skip to content

Commit bbe282d

Browse files
committed
Add gNMI Extension field parsing support
gNMI allows for the use of an `extension` field in each top-level message of the gNMI RPCs: https://github.com/openconfig/reference/blob/master/rpc/gnmi/gnmi-extensions.md Given this is an arbitrary Protobyte payload, the default gNMI protobufs can't decode the payload contained within the field. This PR adds the necessary configuration options to load in an arbitrary Protobuf file per `extension` identifier, with a message-name to lookup the message type. In this change: 1. A new function `DecodeExtension` is added. This function uses `protoreflect` to dynamically marshal arbitrary protoBytes into JSON. The loaded JSON is then put back in the Extension message as bytes (mainting type) 2. The `Target` type has an `ExtensionProtoMap` added, allowing for the lookup of Extension IDs to loaded-in protobufs 3. The required changes to the `TargetConfig` type to support loading in the new configuration 4. Modified `collector.go` to output the gNMI message _after_ inlining the decoded protoBytes 5. Loading in the protobufs was added to `app/target.go`: `parseExtensionProtos`. This uses the `Parser` provided by `protoreflect/desc/protoparse` 6. Added functionality to `event.go` to insert the K/Vs provided by the Extension as Tags. Given we come from JSON, all numbers are float64, so the only 2 types supported currently are `string` and `float64` 7. Minor helper function to turn the arbitrary JSON into an arbitrary map. This has been tested with a device emiting an `extension` field: ``` [gnmic] target "edge01_test01": gNMI Subscribe Response: &{ SubscriptionName:port_stats SubscriptionConfig:{"name":"port_stats","paths":["/interfaces/interface/"],"mode":"STREAM","stream-mode":"SAMPLE","encoding":"JSON","sample-interval":15000000000,"heartbeat-interval":15000000000,"outputs":["prom-scrape-output"]} Response: update:{timestamp:1723653363502452302 prefix:{elem:{name:"interfaces"} elem:{name:"interface" key:{key:"name" value:"et-1/0/3"}}} update:{path:{elem:{name:"state"} elem:{name:"hardware-port"}} val:{json_val:"\"FPC1:PIC0:PORT3\""}} update:{path:{elem:{name:"state"} elem:{name:"transceiver"}} val:{json_val:"\"FPC1:PIC0:PORT3:Xcvr0\""}}} extension:{registered_ext:{id:1 msg:"{\"systemId\":\"edge01_test01\",\"componentId\":65535,\"sensorName\":\"sensor_1005_2_1\",\"subscribedPath\":\"/interfaces/interface/\",\"streamedPath\":\"/interfaces/interface/\",\"component\":\"chassisd\",\"sequenceNumber\":\"770002\",\"payloadGetTimestamp\":\"1723653363502\",\"streamCreationTimestamp\":\"1723653361858\",\"exportTimestamp\":\"1723653363504\",\"streamId\":\"PERIODIC\"}"}}} ``` Which is then properly rendered to a Prometheus metric: ``` gnmi_interfaces_interface_state_hardware_port{component="chassisd",componentId="65535",hardware_port="FPC1:PIC0:PORT3",interface_name="et-1/0/3",metric_source="edge01_test01",subscription_name="port_stats",systemId="edge01_test01"} 1 ``` Note that some label-drop rules have been added to remove the spurious labels to avoid a cardinality explosion.
1 parent fb7904e commit bbe282d

File tree

6 files changed

+112
-5
lines changed

6 files changed

+112
-5
lines changed

pkg/api/target/subscribe.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,25 @@ func (t *Target) DecodeProtoBytes(resp *gnmi.SubscribeResponse) error {
303303
return nil
304304
}
305305

306+
func (t *Target) DecodeExtension(resp *gnmi.SubscribeResponse) error {
307+
if t.ExtensionProtoMap == nil {
308+
return nil
309+
}
310+
for _, extension := range resp.Extension {
311+
m := dynamic.NewMessage(t.ExtensionProtoMap[int(extension.GetRegisteredExt().GetId().Number())])
312+
err := m.Unmarshal(extension.GetRegisteredExt().GetMsg())
313+
if err != nil {
314+
return err
315+
}
316+
jsondata, err := m.MarshalJSON()
317+
if err != nil {
318+
return err
319+
}
320+
extension.GetRegisteredExt().Msg = jsondata
321+
}
322+
return nil
323+
}
324+
306325
func (t *Target) DeleteSubscription(name string) {
307326
t.m.Lock()
308327
defer t.m.Unlock()

pkg/api/target/target.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,10 @@ type Target struct {
5353
subscribeResponses chan *SubscribeResponse
5454
errors chan *TargetError
5555
stopped bool
56-
StopChan chan struct{} `json:"-"`
57-
Cfn context.CancelFunc `json:"-"`
58-
RootDesc desc.Descriptor `json:"-"`
56+
StopChan chan struct{} `json:"-"`
57+
Cfn context.CancelFunc `json:"-"`
58+
RootDesc desc.Descriptor `json:"-"`
59+
ExtensionProtoMap map[int]*desc.MessageDescriptor `json:"-"`
5960
}
6061

6162
// NewTarget //

pkg/api/types/target.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,14 @@ type TargetConfig struct {
154154
GRPCKeepalive *clientKeepalive `mapstructure:"grpc-keepalive,omitempty" yaml:"grpc-keepalive,omitempty" json:"grpc-keepalive,omitempty"`
155155

156156
tlsConfig *tls.Config
157+
158+
RegisteredExtensions []*RegisteredExtension `mapstructure:"registered-extensions,omitempty" yaml:"registered-extensions,omitempty" json:"registered-extensions,omitempty"`
159+
}
160+
161+
type RegisteredExtension struct {
162+
Id int `mapstructure:"id,omitempty" yaml:"id,omitempty" json:"id,omitempty"`
163+
MessageName string `mapstructure:"message-name,omitempty" yaml:"message-name,omitempty" json:"message-name,omitempty"`
164+
ProtoFile string `mapstructure:"proto-file,omitempty" yaml:"proto-file,omitempty" json:"proto-file,omitempty"`
157165
}
158166

159167
type clientKeepalive struct {

pkg/app/collector.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,14 +67,22 @@ func (a *App) StartCollector(ctx context.Context) {
6767
select {
6868
case rsp := <-rspChan:
6969
subscribeResponseReceivedCounter.WithLabelValues(t.Config.Name, rsp.SubscriptionConfig.Name).Add(1)
70-
if a.Config.Debug {
71-
a.Logger.Printf("target %q: gNMI Subscribe Response: %+v", t.Config.Name, rsp)
70+
// decode gNMI extensions
71+
if extensions := rsp.Response.Extension; len(extensions) > 0 {
72+
err := t.DecodeExtension(rsp.Response)
73+
if err != nil {
74+
a.Logger.Printf("target %q: failed to decode extension field: %v", t.Config.Name, err)
75+
continue
76+
}
7277
}
7378
err := t.DecodeProtoBytes(rsp.Response)
7479
if err != nil {
7580
a.Logger.Printf("target %q: failed to decode proto bytes: %v", t.Config.Name, err)
7681
continue
7782
}
83+
if a.Config.Debug {
84+
a.Logger.Printf("target %q: gNMI Subscribe Response: %+v", t.Config.Name, rsp)
85+
}
7886
m := outputs.Meta{
7987
"source": t.Config.Name,
8088
"format": a.Config.Format,

pkg/app/target.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ import (
1414

1515
"github.com/fullstorydev/grpcurl"
1616

17+
"github.com/jhump/protoreflect/desc"
18+
"github.com/jhump/protoreflect/desc/protoparse"
19+
1720
"github.com/openconfig/gnmic/pkg/api/target"
1821
"github.com/openconfig/gnmic/pkg/api/types"
1922
)
@@ -39,6 +42,10 @@ func (a *App) initTarget(tc *types.TargetConfig) (*target.Target, error) {
3942
if err != nil {
4043
return nil, err
4144
}
45+
err = a.parseExtensionProtos(t)
46+
if err != nil {
47+
return nil, err
48+
}
4249
a.Targets[t.Config.Name] = t
4350
return t, nil
4451
}
@@ -155,6 +162,34 @@ func (a *App) parseProtoFiles(t *target.Target) error {
155162
return nil
156163
}
157164

165+
// Dynamically parse (and load) protobuf files defined in config for specific extension IDs
166+
func (a *App) parseExtensionProtos(t *target.Target) error {
167+
parser := protoparse.Parser{}
168+
extensionProtoMap := make(map[int]*desc.MessageDescriptor)
169+
a.Logger.Printf("Target %q loading protofiles for gNMI extensions", t.Config.Name)
170+
if len(t.Config.RegisteredExtensions) == 0 {
171+
return nil
172+
}
173+
for _, extension := range t.Config.RegisteredExtensions {
174+
descSources, err := parser.ParseFiles(extension.ProtoFile)
175+
if err != nil {
176+
a.Logger.Printf("target %q could not load protofile: %s: %v", t.Config.Name, extension.ProtoFile, err)
177+
return err
178+
}
179+
// Only a single file is ever provided to ParseFiles, so we can just grab offset 0 from the returned slice
180+
// Verify if the provided message exists in the proto and assign
181+
if desc := descSources[0].FindMessage(extension.MessageName); desc != nil {
182+
extensionProtoMap[extension.Id] = desc
183+
} else {
184+
a.Logger.Printf("target %q could not find message %q", t.Config.Name, extension.MessageName)
185+
return fmt.Errorf("target %q could not find message %q", t.Config.Name, extension.MessageName)
186+
}
187+
}
188+
t.ExtensionProtoMap = extensionProtoMap
189+
a.Logger.Printf("target %q loaded proto files for gNMI extensions", t.Config.Name)
190+
return nil
191+
}
192+
158193
func (a *App) targetConfigExists(name string) bool {
159194
a.configLock.RLock()
160195
_, ok := a.Config.Targets[name]

pkg/formatters/event.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@ import (
1212
"encoding/json"
1313
"fmt"
1414
"math"
15+
"strconv"
1516
"strings"
1617

1718
flattener "github.com/karimra/go-map-flattener"
1819
"github.com/openconfig/gnmi/proto/gnmi"
20+
"github.com/openconfig/gnmi/proto/gnmi_ext"
1921
)
2022

2123
// EventMsg represents a gNMI update message,
@@ -40,9 +42,28 @@ func ResponseToEventMsgs(name string, rsp *gnmi.SubscribeResponse, meta map[stri
4042
return nil, nil
4143
}
4244
evs := make([]*EventMsg, 0, len(rsp.GetUpdate().GetUpdate())+len(rsp.GetUpdate().GetDelete()))
45+
response := rsp
4346
switch rsp := rsp.Response.(type) {
4447
case *gnmi.SubscribeResponse_Update:
4548
namePrefix, prefixTags := tagsFromGNMIPath(rsp.Update.GetPrefix())
49+
// Extension message to tags
50+
if prefixTags == nil {
51+
prefixTags = make(map[string]string)
52+
}
53+
for _, ext := range response.Extension {
54+
extensionValues, err := extensionToMap(ext)
55+
if err != nil {
56+
return nil, err
57+
}
58+
for k, v := range extensionValues {
59+
switch v.(type) {
60+
case string:
61+
prefixTags[k] = string(v.(string))
62+
case float64:
63+
prefixTags[k] = strconv.FormatFloat(v.(float64), 'G', -1, 64)
64+
}
65+
}
66+
}
4667
// notification updates
4768
uevs, err := updatesToEvent(name, namePrefix, rsp.Update.GetTimestamp(), rsp.Update.GetUpdate(), prefixTags, meta)
4869
if err != nil {
@@ -200,6 +221,21 @@ func tagsFromGNMIPath(p *gnmi.Path) (string, map[string]string) {
200221
return sb.String(), tags
201222
}
202223

224+
func extensionToMap(ext *gnmi_ext.Extension) (map[string]interface{}, error) {
225+
var jsondata []byte
226+
jsondata = ext.GetRegisteredExt().GetMsg()
227+
228+
var anyJson map[string]interface{}
229+
if len(jsondata) != 0 {
230+
err := json.Unmarshal(jsondata, &anyJson)
231+
if err != nil {
232+
return nil, err
233+
}
234+
return anyJson, nil
235+
}
236+
return nil, fmt.Errorf("0 length JSON decoded")
237+
}
238+
203239
func getValueFlat(prefix string, updValue *gnmi.TypedValue) (map[string]interface{}, error) {
204240
if updValue == nil {
205241
return nil, nil

0 commit comments

Comments
 (0)