diff --git a/components/generic/collectors.go b/components/generic/collectors.go new file mode 100644 index 00000000000..783c01c7499 --- /dev/null +++ b/components/generic/collectors.go @@ -0,0 +1,100 @@ +package generic + +import ( + "context" + "errors" + "time" + + "go.viam.com/rdk/data" + "go.viam.com/rdk/logging" + "go.viam.com/rdk/resource" + "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/structpb" +) + +type method int64 + +const ( + readings method = iota +) + +func (m method) String() string { + if m == readings { + return "Readings" + } + return "Unknown" +} +func methodParamsFromProto(proto map[string]*anypb.Any) (map[string]interface{}, error) { + methodParameters := make(map[string]interface{}) + + // logger := logging.NewLogger("test") + for key, value := range proto { + if value == nil { + methodParameters[key] = nil + } + // structValue := &structpb.Value_StringValue{} + // if err := value.(structValue); err != nil { + // logger.Info(value.TypeUrl) + // return nil, err + // } + methodParameters[key] = string(value.GetValue()) + } + + return methodParameters, nil +} + +// newCommandCollector returns a collector to register a sensor reading method. If one is already registered +// with the same MethodMetadata it will panic. +func newCommandCollector(resource interface{}, params data.CollectorParams) (data.Collector, error) { + reso, err := assertResource(resource) + if err != nil { + return nil, err + } + + logger := logging.NewLogger("test") + cFunc := data.CaptureFunc(func(ctx context.Context, arg map[string]*anypb.Any) (data.CaptureResult, error) { + timeRequested := time.Now() + var res data.CaptureResult + + var s structpb.Struct + if err := params.MethodParams["docommand_payload"].UnmarshalTo(&s); err != nil { + logger.Info(err) + return res, err + } + + logger.Info(s.AsMap()) + + payload := s.AsMap() + if err != nil { + return res, err + } + logger.Infof("capturing docommand with payload %#v\n", payload) + + values, err := reso.DoCommand(ctx, payload) + + if err != nil { + // A modular filter component can be created to filter the readings from a component. The error ErrNoCaptureToStore + // is used in the datamanager to exclude readings from being captured and stored. + if errors.Is(err, data.ErrNoCaptureToStore) { + return res, err + } + return res, data.FailedToReadErr(params.ComponentName, readings.String(), err) + } + ts := data.Timestamps{TimeRequested: timeRequested, TimeReceived: time.Now()} + return data.NewTabularCaptureResultReadings(ts, values) + }) + return data.NewCollector(cFunc, params) +} + +type Resource interface { + resource.Resource +} + +func assertResource(resource interface{}) (Resource, error) { + res, ok := resource.(Resource) + if !ok { + return nil, data.InvalidInterfaceErr(API) + } + + return res, nil +} diff --git a/components/generic/collectors_test.go b/components/generic/collectors_test.go new file mode 100644 index 00000000000..06a64526f8b --- /dev/null +++ b/components/generic/collectors_test.go @@ -0,0 +1,55 @@ +package generic_test + +import ( + "context" + "testing" + "time" + + "github.com/benbjohnson/clock" + datasyncpb "go.viam.com/api/app/datasync/v1" + "go.viam.com/test" + + "go.viam.com/rdk/components/generic" + "go.viam.com/rdk/data" + "go.viam.com/rdk/logging" + tu "go.viam.com/rdk/testutils" + "go.viam.com/rdk/testutils/inject" +) + +const ( + captureInterval = time.Millisecond +) + +var readingMap = map[string]any{"reading1": false, "reading2": "test"} + +func TestCollectors(t *testing.T) { + start := time.Now() + buf := tu.NewMockBuffer(t) + params := data.CollectorParams{ + DataType: data.CaptureTypeTabular, + ComponentName: "generic", + Interval: captureInterval, + Logger: logging.NewTestLogger(t), + Target: buf, + Clock: clock.New(), + } + + gens := &inject.GenericComponent{} + col, err := generic.NewCommandCollector(gens, params) + test.That(t, err, test.ShouldBeNil) + + defer col.Close() + col.Collect() + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + tu.CheckMockBufferWrites(t, ctx, start, buf.Writes, []*datasyncpb.SensorData{{ + Metadata: &datasyncpb.SensorMetadata{}, + Data: &datasyncpb.SensorData_Struct{Struct: tu.ToStructPBStruct(t, map[string]any{ + "readings": map[string]any{ + "hi": "bye", + }, + })}, + }}) + buf.Close() +} diff --git a/components/generic/export_collectors_test.go b/components/generic/export_collectors_test.go new file mode 100644 index 00000000000..3fa594ef887 --- /dev/null +++ b/components/generic/export_collectors_test.go @@ -0,0 +1,5 @@ +// export_collectors_test.go adds functionality to the package that we only want to use and expose during testing. +package generic + +// Exported variables for testing collectors, see unexported collectors for implementation details. +var NewCommandCollector = newCommandCollector diff --git a/components/generic/generic.go b/components/generic/generic.go index 99cb60acbac..a4b2aeb24c6 100644 --- a/components/generic/generic.go +++ b/components/generic/generic.go @@ -7,6 +7,7 @@ package generic import ( pb "go.viam.com/api/component/generic/v1" + "go.viam.com/rdk/data" "go.viam.com/rdk/resource" "go.viam.com/rdk/robot" ) @@ -18,6 +19,10 @@ func init() { RPCServiceDesc: &pb.GenericService_ServiceDesc, RPCClient: NewClientFromConn, }) + data.RegisterCollector(data.MethodMetadata{ + API: API, + MethodName: "DoCommand", + }, newCommandCollector) } // SubtypeName is a constant that identifies the component resource API string "Generic". diff --git a/data/capture_file.go b/data/capture_file.go index 3e4dc3be569..8f6c3788b2a 100644 --- a/data/capture_file.go +++ b/data/capture_file.go @@ -213,7 +213,7 @@ func BuildCaptureMetadata( api resource.API, name string, method string, - additionalParams map[string]string, + additionalParams map[string]interface{}, methodParams map[string]*anypb.Any, tags []string, ) (*v1.DataCaptureMetadata, CaptureType) { diff --git a/data/collector_types.go b/data/collector_types.go index 5d69f576cab..e4df2f9c5dd 100644 --- a/data/collector_types.go +++ b/data/collector_types.go @@ -377,7 +377,7 @@ const ( ) // getFileExt gets the file extension for a capture file. -func getFileExt(dataType CaptureType, methodName string, parameters map[string]string) string { +func getFileExt(dataType CaptureType, methodName string, parameters map[string]interface{}) string { switch dataType { case CaptureTypeTabular: return ExtDat diff --git a/services/datamanager/builtin/capture/capture.go b/services/datamanager/builtin/capture/capture.go index 47d10b700ea..4cad49a4b1f 100644 --- a/services/datamanager/builtin/capture/capture.go +++ b/services/datamanager/builtin/capture/capture.go @@ -13,10 +13,12 @@ import ( "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" goutils "go.viam.com/utils" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/structpb" "go.viam.com/rdk/data" "go.viam.com/rdk/logging" - "go.viam.com/rdk/protoutils" "go.viam.com/rdk/resource" "go.viam.com/rdk/services/datamanager" ) @@ -277,7 +279,7 @@ func (c *Capture) initializeOrUpdateCollector( collection *mongo.Collection, ) (*collectorAndConfig, error) { // TODO(DATA-451): validate method params - methodParams, err := protoutils.ConvertStringMapToAnyPBMap(collectorConfig.AdditionalParams) + methodParams, err := convertMapToAnyMap(collectorConfig.AdditionalParams) if err != nil { return nil, err } @@ -441,3 +443,39 @@ func defaultIfZeroVal[T comparable](val, defaultVal T) T { } return val } + +func convertMapToAnyMap(input map[string]interface{}) (map[string]*anypb.Any, error) { + output := make(map[string]*anypb.Any) + + for key, val := range input { + var msg proto.Message + + switch typed := val.(type) { + case map[string]interface{}: + // For nested maps, convert to structpb.Struct + structVal, err := structpb.NewStruct(typed) + if err != nil { + return nil, fmt.Errorf("error converting key %q to structpb.Struct: %w", key, err) + } + msg = structVal + + default: + // For primitive types, convert to structpb.Value + value, err := structpb.NewValue(val) + if err != nil { + return nil, fmt.Errorf("error converting key %q to structpb.Value: %w", key, err) + } + msg = value + } + + // Pack the actual message (Value or Struct) into an Any + anyVal, err := anypb.New(msg) + if err != nil { + return nil, fmt.Errorf("error packing key %q into Any: %w", key, err) + } + + output[key] = anyVal + } + + return output, nil +} diff --git a/services/datamanager/data_manager.go b/services/datamanager/data_manager.go index 1310f1ca0b8..e9206a64c39 100644 --- a/services/datamanager/data_manager.go +++ b/services/datamanager/data_manager.go @@ -141,15 +141,15 @@ func (ac *AssociatedConfig) Link(conf *resource.Config) { // DataCaptureConfig is used to initialize a collector for a component or remote. type DataCaptureConfig struct { - Name resource.Name `json:"name"` - Method string `json:"method"` - CaptureFrequencyHz float32 `json:"capture_frequency_hz"` - CaptureQueueSize int `json:"capture_queue_size"` - CaptureBufferSize int `json:"capture_buffer_size"` - AdditionalParams map[string]string `json:"additional_params"` - Disabled bool `json:"disabled"` - Tags []string `json:"tags,omitempty"` - CaptureDirectory string `json:"capture_directory"` + Name resource.Name `json:"name"` + Method string `json:"method"` + CaptureFrequencyHz float32 `json:"capture_frequency_hz"` + CaptureQueueSize int `json:"capture_queue_size"` + CaptureBufferSize int `json:"capture_buffer_size"` + AdditionalParams map[string]interface{} `json:"additional_params"` + Disabled bool `json:"disabled"` + Tags []string `json:"tags,omitempty"` + CaptureDirectory string `json:"capture_directory"` } // Equals checks if one capture config is equal to another.