Skip to content

docommand data capture poc #4929

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions components/generic/collectors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package generic

import (
"context"
"errors"
"time"

"go.viam.com/rdk/data"
"go.viam.com/rdk/logging"
"go.viam.com/rdk/resource"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/structpb"
)

type method int64

const (
readings method = iota
)

func (m method) String() string {
if m == readings {
return "Readings"
}
return "Unknown"
}
func methodParamsFromProto(proto map[string]*anypb.Any) (map[string]interface{}, error) {
methodParameters := make(map[string]interface{})

// logger := logging.NewLogger("test")
for key, value := range proto {
if value == nil {
methodParameters[key] = nil
}
// structValue := &structpb.Value_StringValue{}
// if err := value.(structValue); err != nil {
// logger.Info(value.TypeUrl)
// return nil, err
// }
methodParameters[key] = string(value.GetValue())
}

return methodParameters, nil
}

// newCommandCollector returns a collector to register a sensor reading method. If one is already registered
// with the same MethodMetadata it will panic.
func newCommandCollector(resource interface{}, params data.CollectorParams) (data.Collector, error) {
reso, err := assertResource(resource)
if err != nil {
return nil, err
}

logger := logging.NewLogger("test")
cFunc := data.CaptureFunc(func(ctx context.Context, arg map[string]*anypb.Any) (data.CaptureResult, error) {
timeRequested := time.Now()
var res data.CaptureResult

var s structpb.Struct
if err := params.MethodParams["docommand_payload"].UnmarshalTo(&s); err != nil {
logger.Info(err)
return res, err
}

logger.Info(s.AsMap())

payload := s.AsMap()
if err != nil {
return res, err
}
logger.Infof("capturing docommand with payload %#v\n", payload)

values, err := reso.DoCommand(ctx, payload)

if err != nil {
// A modular filter component can be created to filter the readings from a component. The error ErrNoCaptureToStore
// is used in the datamanager to exclude readings from being captured and stored.
if errors.Is(err, data.ErrNoCaptureToStore) {
return res, err
}
return res, data.FailedToReadErr(params.ComponentName, readings.String(), err)
}
ts := data.Timestamps{TimeRequested: timeRequested, TimeReceived: time.Now()}
return data.NewTabularCaptureResultReadings(ts, values)
})
return data.NewCollector(cFunc, params)
}

type Resource interface {
resource.Resource
}

func assertResource(resource interface{}) (Resource, error) {
res, ok := resource.(Resource)
if !ok {
return nil, data.InvalidInterfaceErr(API)
}

return res, nil
}
55 changes: 55 additions & 0 deletions components/generic/collectors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package generic_test

import (
"context"
"testing"
"time"

"github.com/benbjohnson/clock"
datasyncpb "go.viam.com/api/app/datasync/v1"
"go.viam.com/test"

"go.viam.com/rdk/components/generic"
"go.viam.com/rdk/data"
"go.viam.com/rdk/logging"
tu "go.viam.com/rdk/testutils"
"go.viam.com/rdk/testutils/inject"
)

const (
captureInterval = time.Millisecond
)

var readingMap = map[string]any{"reading1": false, "reading2": "test"}

func TestCollectors(t *testing.T) {
start := time.Now()
buf := tu.NewMockBuffer(t)
params := data.CollectorParams{
DataType: data.CaptureTypeTabular,
ComponentName: "generic",
Interval: captureInterval,
Logger: logging.NewTestLogger(t),
Target: buf,
Clock: clock.New(),
}

gens := &inject.GenericComponent{}
col, err := generic.NewCommandCollector(gens, params)
test.That(t, err, test.ShouldBeNil)

defer col.Close()
col.Collect()

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
tu.CheckMockBufferWrites(t, ctx, start, buf.Writes, []*datasyncpb.SensorData{{
Metadata: &datasyncpb.SensorMetadata{},
Data: &datasyncpb.SensorData_Struct{Struct: tu.ToStructPBStruct(t, map[string]any{
"readings": map[string]any{
"hi": "bye",
},
})},
}})
buf.Close()
}
5 changes: 5 additions & 0 deletions components/generic/export_collectors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// export_collectors_test.go adds functionality to the package that we only want to use and expose during testing.
package generic

// Exported variables for testing collectors, see unexported collectors for implementation details.
var NewCommandCollector = newCommandCollector
5 changes: 5 additions & 0 deletions components/generic/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package generic
import (
pb "go.viam.com/api/component/generic/v1"

"go.viam.com/rdk/data"
"go.viam.com/rdk/resource"
"go.viam.com/rdk/robot"
)
Expand All @@ -18,6 +19,10 @@ func init() {
RPCServiceDesc: &pb.GenericService_ServiceDesc,
RPCClient: NewClientFromConn,
})
data.RegisterCollector(data.MethodMetadata{
API: API,
MethodName: "DoCommand",
}, newCommandCollector)
}

// SubtypeName is a constant that identifies the component resource API string "Generic".
Expand Down
2 changes: 1 addition & 1 deletion data/capture_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func BuildCaptureMetadata(
api resource.API,
name string,
method string,
additionalParams map[string]string,
additionalParams map[string]interface{},
methodParams map[string]*anypb.Any,
tags []string,
) (*v1.DataCaptureMetadata, CaptureType) {
Expand Down
2 changes: 1 addition & 1 deletion data/collector_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ const (
)

// getFileExt gets the file extension for a capture file.
func getFileExt(dataType CaptureType, methodName string, parameters map[string]string) string {
func getFileExt(dataType CaptureType, methodName string, parameters map[string]interface{}) string {
switch dataType {
case CaptureTypeTabular:
return ExtDat
Expand Down
42 changes: 40 additions & 2 deletions services/datamanager/builtin/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ import (
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
goutils "go.viam.com/utils"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/structpb"

"go.viam.com/rdk/data"
"go.viam.com/rdk/logging"
"go.viam.com/rdk/protoutils"
"go.viam.com/rdk/resource"
"go.viam.com/rdk/services/datamanager"
)
Expand Down Expand Up @@ -277,7 +279,7 @@ func (c *Capture) initializeOrUpdateCollector(
collection *mongo.Collection,
) (*collectorAndConfig, error) {
// TODO(DATA-451): validate method params
methodParams, err := protoutils.ConvertStringMapToAnyPBMap(collectorConfig.AdditionalParams)
methodParams, err := convertMapToAnyMap(collectorConfig.AdditionalParams)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -441,3 +443,39 @@ func defaultIfZeroVal[T comparable](val, defaultVal T) T {
}
return val
}

func convertMapToAnyMap(input map[string]interface{}) (map[string]*anypb.Any, error) {
output := make(map[string]*anypb.Any)

for key, val := range input {
var msg proto.Message

switch typed := val.(type) {
case map[string]interface{}:
// For nested maps, convert to structpb.Struct
structVal, err := structpb.NewStruct(typed)
if err != nil {
return nil, fmt.Errorf("error converting key %q to structpb.Struct: %w", key, err)
}
msg = structVal

default:
// For primitive types, convert to structpb.Value
value, err := structpb.NewValue(val)
if err != nil {
return nil, fmt.Errorf("error converting key %q to structpb.Value: %w", key, err)
}
msg = value
}

// Pack the actual message (Value or Struct) into an Any
anyVal, err := anypb.New(msg)
if err != nil {
return nil, fmt.Errorf("error packing key %q into Any: %w", key, err)
}

output[key] = anyVal
}

return output, nil
}
18 changes: 9 additions & 9 deletions services/datamanager/data_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,15 +141,15 @@ func (ac *AssociatedConfig) Link(conf *resource.Config) {

// DataCaptureConfig is used to initialize a collector for a component or remote.
type DataCaptureConfig struct {
Name resource.Name `json:"name"`
Method string `json:"method"`
CaptureFrequencyHz float32 `json:"capture_frequency_hz"`
CaptureQueueSize int `json:"capture_queue_size"`
CaptureBufferSize int `json:"capture_buffer_size"`
AdditionalParams map[string]string `json:"additional_params"`
Disabled bool `json:"disabled"`
Tags []string `json:"tags,omitempty"`
CaptureDirectory string `json:"capture_directory"`
Name resource.Name `json:"name"`
Method string `json:"method"`
CaptureFrequencyHz float32 `json:"capture_frequency_hz"`
CaptureQueueSize int `json:"capture_queue_size"`
CaptureBufferSize int `json:"capture_buffer_size"`
AdditionalParams map[string]interface{} `json:"additional_params"`
Disabled bool `json:"disabled"`
Tags []string `json:"tags,omitempty"`
CaptureDirectory string `json:"capture_directory"`
}

// Equals checks if one capture config is equal to another.
Expand Down
Loading