From a011222a74030346a55c611abf32119b8a498e1e Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Wed, 29 Jan 2025 14:43:03 +0000 Subject: [PATCH 1/6] wait for create connection --- api/grpc/mpi/v1/command.pb.go | 14 ++-- api/grpc/mpi/v1/command.pb.validate.go | 76 +++++++++---------- api/grpc/mpi/v1/common.pb.go | 14 ++-- api/grpc/mpi/v1/common.pb.validate.go | 10 +-- api/grpc/mpi/v1/files.pb.go | 14 ++-- api/grpc/mpi/v1/files.pb.validate.go | 36 ++++----- internal/command/command_plugin.go | 2 + internal/command/command_service.go | 12 +-- internal/command/command_service_test.go | 5 +- .../commandfakes/fake_command_service.go | 30 ++++++++ 10 files changed, 125 insertions(+), 88 deletions(-) diff --git a/api/grpc/mpi/v1/command.pb.go b/api/grpc/mpi/v1/command.pb.go index b00c7d2cd..0e3edb591 100644 --- a/api/grpc/mpi/v1/command.pb.go +++ b/api/grpc/mpi/v1/command.pb.go @@ -8,7 +8,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.3 +// protoc-gen-go v1.36.4 // protoc (unknown) // source: mpi/v1/command.proto @@ -21,6 +21,7 @@ import ( structpb "google.golang.org/protobuf/types/known/structpb" reflect "reflect" sync "sync" + unsafe "unsafe" ) const ( @@ -2548,7 +2549,7 @@ func (*FileServer) Descriptor() ([]byte, []int) { var File_mpi_v1_command_proto protoreflect.FileDescriptor -var file_mpi_v1_command_proto_rawDesc = []byte{ +var file_mpi_v1_command_proto_rawDesc = string([]byte{ 0x0a, 0x14, 0x6d, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x6d, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x1a, 0x13, 0x6d, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x70, 0x72, @@ -2932,16 +2933,16 @@ var file_mpi_v1_command_proto_rawDesc = []byte{ 0x6e, 0x74, 0x50, 0x6c, 0x61, 0x6e, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x08, 0x5a, 0x06, 0x6d, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, -} +}) var ( file_mpi_v1_command_proto_rawDescOnce sync.Once - file_mpi_v1_command_proto_rawDescData = file_mpi_v1_command_proto_rawDesc + file_mpi_v1_command_proto_rawDescData []byte ) func file_mpi_v1_command_proto_rawDescGZIP() []byte { file_mpi_v1_command_proto_rawDescOnce.Do(func() { - file_mpi_v1_command_proto_rawDescData = protoimpl.X.CompressGZIP(file_mpi_v1_command_proto_rawDescData) + file_mpi_v1_command_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_mpi_v1_command_proto_rawDesc), len(file_mpi_v1_command_proto_rawDesc))) }) return file_mpi_v1_command_proto_rawDescData } @@ -3104,7 +3105,7 @@ func file_mpi_v1_command_proto_init() { out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_mpi_v1_command_proto_rawDesc, + RawDescriptor: unsafe.Slice(unsafe.StringData(file_mpi_v1_command_proto_rawDesc), len(file_mpi_v1_command_proto_rawDesc)), NumEnums: 2, NumMessages: 38, NumExtensions: 0, @@ -3116,7 +3117,6 @@ func file_mpi_v1_command_proto_init() { MessageInfos: file_mpi_v1_command_proto_msgTypes, }.Build() File_mpi_v1_command_proto = out.File - file_mpi_v1_command_proto_rawDesc = nil file_mpi_v1_command_proto_goTypes = nil file_mpi_v1_command_proto_depIdxs = nil } diff --git a/api/grpc/mpi/v1/command.pb.validate.go b/api/grpc/mpi/v1/command.pb.validate.go index 22c1ca542..08fcf259f 100644 --- a/api/grpc/mpi/v1/command.pb.validate.go +++ b/api/grpc/mpi/v1/command.pb.validate.go @@ -129,7 +129,7 @@ type CreateConnectionRequestMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m CreateConnectionRequestMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -353,7 +353,7 @@ type ResourceMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m ResourceMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -485,7 +485,7 @@ type HostInfoMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m HostInfoMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -594,7 +594,7 @@ type ReleaseInfoMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m ReleaseInfoMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -698,7 +698,7 @@ type ContainerInfoMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m ContainerInfoMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -856,7 +856,7 @@ type CreateConnectionResponseMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m CreateConnectionResponseMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -1016,7 +1016,7 @@ type UpdateDataPlaneStatusRequestMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m UpdateDataPlaneStatusRequestMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -1119,7 +1119,7 @@ type UpdateDataPlaneStatusResponseMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m UpdateDataPlaneStatusResponseMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -1228,7 +1228,7 @@ type InstanceHealthMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m InstanceHealthMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -1391,7 +1391,7 @@ type UpdateDataPlaneHealthRequestMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m UpdateDataPlaneHealthRequestMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -1494,7 +1494,7 @@ type UpdateDataPlaneHealthResponseMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m UpdateDataPlaneHealthResponseMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -1657,7 +1657,7 @@ type DataPlaneResponseMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m DataPlaneResponseMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -2039,7 +2039,7 @@ type ManagementPlaneRequestMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m ManagementPlaneRequestMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -2141,7 +2141,7 @@ type StatusRequestMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m StatusRequestMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -2241,7 +2241,7 @@ type HealthRequestMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m HealthRequestMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -2370,7 +2370,7 @@ type ConfigApplyRequestMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m ConfigApplyRequestMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -2501,7 +2501,7 @@ type ConfigUploadRequestMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m ConfigUploadRequestMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -2651,7 +2651,7 @@ type APIActionRequestMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m APIActionRequestMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -2961,7 +2961,7 @@ type NGINXPlusActionMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m NGINXPlusActionMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -3097,7 +3097,7 @@ type UpdateHTTPUpstreamServersMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m UpdateHTTPUpstreamServersMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -3201,7 +3201,7 @@ type GetHTTPUpstreamServersMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m GetHTTPUpstreamServersMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -3339,7 +3339,7 @@ type UpdateStreamServersMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m UpdateStreamServersMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -3440,7 +3440,7 @@ type GetUpstreamsMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m GetUpstreamsMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -3540,7 +3540,7 @@ type GetStreamUpstreamsMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m GetStreamUpstreamsMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -3642,7 +3642,7 @@ type CommandStatusRequestMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m CommandStatusRequestMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -3830,7 +3830,7 @@ type InstanceMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m InstanceMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -3935,7 +3935,7 @@ type InstanceMetaMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m InstanceMetaMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -4115,7 +4115,7 @@ type InstanceConfigMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m InstanceConfigMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -4342,7 +4342,7 @@ type InstanceRuntimeMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m InstanceRuntimeMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -4444,7 +4444,7 @@ type InstanceChildMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m InstanceChildMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -4573,7 +4573,7 @@ type NGINXRuntimeInfoMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m NGINXRuntimeInfoMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -4731,7 +4731,7 @@ type NGINXPlusRuntimeInfoMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m NGINXPlusRuntimeInfoMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -4836,7 +4836,7 @@ type APIDetailsMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m APIDetailsMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -4936,7 +4936,7 @@ type InstanceActionMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m InstanceActionMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -5158,7 +5158,7 @@ type AgentConfigMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m AgentConfigMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -5345,7 +5345,7 @@ type CommandServerMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m CommandServerMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -5445,7 +5445,7 @@ type MetricsServerMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m MetricsServerMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -5544,7 +5544,7 @@ type FileServerMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m FileServerMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } diff --git a/api/grpc/mpi/v1/common.pb.go b/api/grpc/mpi/v1/common.pb.go index d2cb5d35b..7451f04fc 100644 --- a/api/grpc/mpi/v1/common.pb.go +++ b/api/grpc/mpi/v1/common.pb.go @@ -5,7 +5,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.3 +// protoc-gen-go v1.36.4 // protoc (unknown) // source: mpi/v1/common.proto @@ -18,6 +18,7 @@ import ( timestamppb "google.golang.org/protobuf/types/known/timestamppb" reflect "reflect" sync "sync" + unsafe "unsafe" ) const ( @@ -453,7 +454,7 @@ func (x *TLSSettings) GetServerName() string { var File_mpi_v1_common_proto protoreflect.FileDescriptor -var file_mpi_v1_common_proto_rawDesc = []byte{ +var file_mpi_v1_common_proto_rawDesc = string([]byte{ 0x0a, 0x13, 0x6d, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x6d, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, @@ -513,16 +514,16 @@ var file_mpi_v1_common_proto_rawDesc = []byte{ 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x42, 0x08, 0x5a, 0x06, 0x6d, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, -} +}) var ( file_mpi_v1_common_proto_rawDescOnce sync.Once - file_mpi_v1_common_proto_rawDescData = file_mpi_v1_common_proto_rawDesc + file_mpi_v1_common_proto_rawDescData []byte ) func file_mpi_v1_common_proto_rawDescGZIP() []byte { file_mpi_v1_common_proto_rawDescOnce.Do(func() { - file_mpi_v1_common_proto_rawDescData = protoimpl.X.CompressGZIP(file_mpi_v1_common_proto_rawDescData) + file_mpi_v1_common_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_mpi_v1_common_proto_rawDesc), len(file_mpi_v1_common_proto_rawDesc))) }) return file_mpi_v1_common_proto_rawDescData } @@ -559,7 +560,7 @@ func file_mpi_v1_common_proto_init() { out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_mpi_v1_common_proto_rawDesc, + RawDescriptor: unsafe.Slice(unsafe.StringData(file_mpi_v1_common_proto_rawDesc), len(file_mpi_v1_common_proto_rawDesc)), NumEnums: 2, NumMessages: 5, NumExtensions: 0, @@ -571,7 +572,6 @@ func file_mpi_v1_common_proto_init() { MessageInfos: file_mpi_v1_common_proto_msgTypes, }.Build() File_mpi_v1_common_proto = out.File - file_mpi_v1_common_proto_rawDesc = nil file_mpi_v1_common_proto_goTypes = nil file_mpi_v1_common_proto_depIdxs = nil } diff --git a/api/grpc/mpi/v1/common.pb.validate.go b/api/grpc/mpi/v1/common.pb.validate.go index e462feaee..6cab6b520 100644 --- a/api/grpc/mpi/v1/common.pb.validate.go +++ b/api/grpc/mpi/v1/common.pb.validate.go @@ -103,7 +103,7 @@ type MessageMetaMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m MessageMetaMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -209,7 +209,7 @@ type CommandResponseMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m CommandResponseMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -315,7 +315,7 @@ type ServerSettingsMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m ServerSettingsMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -414,7 +414,7 @@ type AuthSettingsMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m AuthSettingsMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -523,7 +523,7 @@ type TLSSettingsMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m TLSSettingsMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } diff --git a/api/grpc/mpi/v1/files.pb.go b/api/grpc/mpi/v1/files.pb.go index b0bd43e2c..36792a589 100644 --- a/api/grpc/mpi/v1/files.pb.go +++ b/api/grpc/mpi/v1/files.pb.go @@ -5,7 +5,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.3 +// protoc-gen-go v1.36.4 // protoc (unknown) // source: mpi/v1/files.proto @@ -18,6 +18,7 @@ import ( timestamppb "google.golang.org/protobuf/types/known/timestamppb" reflect "reflect" sync "sync" + unsafe "unsafe" ) const ( @@ -1351,7 +1352,7 @@ func (x *AttributeTypeAndValue) GetValue() string { var File_mpi_v1_files_proto protoreflect.FileDescriptor -var file_mpi_v1_files_proto_rawDesc = []byte{ +var file_mpi_v1_files_proto_rawDesc = string([]byte{ 0x0a, 0x12, 0x6d, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x2f, 0x66, 0x69, 0x6c, 0x65, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x6d, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x1a, 0x13, 0x6d, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, @@ -1583,16 +1584,16 @@ var file_mpi_v1_files_proto_rawDesc = []byte{ 0x1a, 0x1a, 0x2e, 0x6d, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x46, 0x69, 0x6c, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x08, 0x5a, 0x06, 0x6d, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, -} +}) var ( file_mpi_v1_files_proto_rawDescOnce sync.Once - file_mpi_v1_files_proto_rawDescData = file_mpi_v1_files_proto_rawDesc + file_mpi_v1_files_proto_rawDescData []byte ) func file_mpi_v1_files_proto_rawDescGZIP() []byte { file_mpi_v1_files_proto_rawDescOnce.Do(func() { - file_mpi_v1_files_proto_rawDescData = protoimpl.X.CompressGZIP(file_mpi_v1_files_proto_rawDescData) + file_mpi_v1_files_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_mpi_v1_files_proto_rawDesc), len(file_mpi_v1_files_proto_rawDesc))) }) return file_mpi_v1_files_proto_rawDescData } @@ -1679,7 +1680,7 @@ func file_mpi_v1_files_proto_init() { out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_mpi_v1_files_proto_rawDesc, + RawDescriptor: unsafe.Slice(unsafe.StringData(file_mpi_v1_files_proto_rawDesc), len(file_mpi_v1_files_proto_rawDesc)), NumEnums: 2, NumMessages: 18, NumExtensions: 0, @@ -1691,7 +1692,6 @@ func file_mpi_v1_files_proto_init() { MessageInfos: file_mpi_v1_files_proto_msgTypes, }.Build() File_mpi_v1_files_proto = out.File - file_mpi_v1_files_proto_rawDesc = nil file_mpi_v1_files_proto_goTypes = nil file_mpi_v1_files_proto_depIdxs = nil } diff --git a/api/grpc/mpi/v1/files.pb.validate.go b/api/grpc/mpi/v1/files.pb.validate.go index e5d0d937c..876e89a87 100644 --- a/api/grpc/mpi/v1/files.pb.validate.go +++ b/api/grpc/mpi/v1/files.pb.validate.go @@ -129,7 +129,7 @@ type GetOverviewRequestMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m GetOverviewRequestMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -260,7 +260,7 @@ type GetOverviewResponseMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m GetOverviewResponseMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -420,7 +420,7 @@ type UpdateOverviewRequestMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m UpdateOverviewRequestMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -551,7 +551,7 @@ type UpdateOverviewResponseMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m UpdateOverviewResponseMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -657,7 +657,7 @@ type ConfigVersionMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m ConfigVersionMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -819,7 +819,7 @@ type FileOverviewMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m FileOverviewMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -952,7 +952,7 @@ type FileMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m FileMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -1110,7 +1110,7 @@ type GetFileRequestMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m GetFileRequestMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -1239,7 +1239,7 @@ type GetFileResponseMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m GetFileResponseMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -1340,7 +1340,7 @@ type FileContentsMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m FileContentsMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -1522,7 +1522,7 @@ type FileMetaMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m FileMetaMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -1709,7 +1709,7 @@ type UpdateFileRequestMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m UpdateFileRequestMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -1840,7 +1840,7 @@ type UpdateFileResponseMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m UpdateFileResponseMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -2064,7 +2064,7 @@ type CertificateMetaMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m CertificateMetaMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -2168,7 +2168,7 @@ type CertificateDatesMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m CertificateDatesMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -2268,7 +2268,7 @@ type SubjectAlternativeNamesMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m SubjectAlternativeNamesMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -2441,7 +2441,7 @@ type X509NameMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m X509NameMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } @@ -2545,7 +2545,7 @@ type AttributeTypeAndValueMultiError []error // Error returns a concatenation of all the error messages it wraps. func (m AttributeTypeAndValueMultiError) Error() string { - var msgs []string + msgs := make([]string, 0, len(m)) for _, err := range m { msgs = append(msgs, err.Error()) } diff --git a/internal/command/command_plugin.go b/internal/command/command_plugin.go index 304b74a5f..ad278d8cc 100644 --- a/internal/command/command_plugin.go +++ b/internal/command/command_plugin.go @@ -32,6 +32,7 @@ type ( SendDataPlaneResponse(ctx context.Context, response *mpi.DataPlaneResponse) error CancelSubscription(ctx context.Context) IsConnected() bool + StartSubscription() CreateConnection(ctx context.Context, resource *mpi.Resource) (*mpi.CreateConnectionResponse, error) } @@ -108,6 +109,7 @@ func (cp *CommandPlugin) createConnection(ctx context.Context, resource *mpi.Res slog.ErrorContext(ctx, "Unable to create connection", "error", err) } if createConnectionResponse != nil { + cp.commandService.StartSubscription() cp.messagePipe.Process(ctx, &bus.Message{ Topic: bus.ConnectionCreatedTopic, Data: createConnectionResponse, diff --git a/internal/command/command_service.go b/internal/command/command_service.go index 61127db9e..fe7e67f2a 100644 --- a/internal/command/command_service.go +++ b/internal/command/command_service.go @@ -45,6 +45,7 @@ type ( subscribeChannel chan *mpi.ManagementPlaneRequest configApplyRequestQueue map[string][]*mpi.ManagementPlaneRequest // key is the instance ID resource *mpi.Resource + subscribeContext context.Context subscribeMutex sync.Mutex subscribeClientMutex sync.Mutex configApplyRequestQueueMutex sync.Mutex @@ -70,17 +71,18 @@ func NewCommandService( resource: &mpi.Resource{}, } - var subscribeCtx context.Context - commandService.subscribeMutex.Lock() - subscribeCtx, commandService.subscribeCancel = context.WithCancel(ctx) + commandService.subscribeContext, commandService.subscribeCancel = context.WithCancel(ctx) commandService.subscribeMutex.Unlock() - go commandService.subscribe(subscribeCtx) - return commandService } +func (cs *CommandService) StartSubscription() { + slog.DebugContext(cs.subscribeContext, "Starting subscribe") + go cs.subscribe(cs.subscribeContext) +} + func (cs *CommandService) IsConnected() bool { return cs.isConnected.Load() } diff --git a/internal/command/command_service_test.go b/internal/command/command_service_test.go index 899653f8f..0901bb0ab 100644 --- a/internal/command/command_service_test.go +++ b/internal/command/command_service_test.go @@ -77,7 +77,7 @@ func (*FakeConfigApplySubscribeClient) Recv() (*mpi.ManagementPlaneRequest, erro }, nil } -func TestCommandService_NewCommandService(t *testing.T) { +func TestCommandService_StartSubscribe(t *testing.T) { ctx := context.Background() commandServiceClient := &v1fakes.FakeCommandServiceClient{} @@ -90,6 +90,8 @@ func TestCommandService_NewCommandService(t *testing.T) { defer commandService.CancelSubscription(ctx) + commandService.StartSubscription() + assert.Eventually( t, func() bool { return commandServiceClient.SubscribeCallCount() > 0 }, @@ -113,6 +115,7 @@ func TestCommandService_receiveCallback_configApplyRequest(t *testing.T) { types.AgentConfig(), subscribeChannel, ) + commandService.StartSubscription() nginxInstance := protos.GetNginxOssInstance([]string{}) commandService.resourceMutex.Lock() diff --git a/internal/command/commandfakes/fake_command_service.go b/internal/command/commandfakes/fake_command_service.go index 2df848b5e..45f8afe0b 100644 --- a/internal/command/commandfakes/fake_command_service.go +++ b/internal/command/commandfakes/fake_command_service.go @@ -50,6 +50,10 @@ type FakeCommandService struct { sendDataPlaneResponseReturnsOnCall map[int]struct { result1 error } + StartSubscriptionStub func() + startSubscriptionMutex sync.RWMutex + startSubscriptionArgsForCall []struct { + } UpdateDataPlaneHealthStub func(context.Context, []*v1.InstanceHealth) error updateDataPlaneHealthMutex sync.RWMutex updateDataPlaneHealthArgsForCall []struct { @@ -290,6 +294,30 @@ func (fake *FakeCommandService) SendDataPlaneResponseReturnsOnCall(i int, result }{result1} } +func (fake *FakeCommandService) StartSubscription() { + fake.startSubscriptionMutex.Lock() + fake.startSubscriptionArgsForCall = append(fake.startSubscriptionArgsForCall, struct { + }{}) + stub := fake.StartSubscriptionStub + fake.recordInvocation("StartSubscription", []interface{}{}) + fake.startSubscriptionMutex.Unlock() + if stub != nil { + fake.StartSubscriptionStub() + } +} + +func (fake *FakeCommandService) StartSubscriptionCallCount() int { + fake.startSubscriptionMutex.RLock() + defer fake.startSubscriptionMutex.RUnlock() + return len(fake.startSubscriptionArgsForCall) +} + +func (fake *FakeCommandService) StartSubscriptionCalls(stub func()) { + fake.startSubscriptionMutex.Lock() + defer fake.startSubscriptionMutex.Unlock() + fake.StartSubscriptionStub = stub +} + func (fake *FakeCommandService) UpdateDataPlaneHealth(arg1 context.Context, arg2 []*v1.InstanceHealth) error { var arg2Copy []*v1.InstanceHealth if arg2 != nil { @@ -430,6 +458,8 @@ func (fake *FakeCommandService) Invocations() map[string][][]interface{} { defer fake.isConnectedMutex.RUnlock() fake.sendDataPlaneResponseMutex.RLock() defer fake.sendDataPlaneResponseMutex.RUnlock() + fake.startSubscriptionMutex.RLock() + defer fake.startSubscriptionMutex.RUnlock() fake.updateDataPlaneHealthMutex.RLock() defer fake.updateDataPlaneHealthMutex.RUnlock() fake.updateDataPlaneStatusMutex.RLock() From 5783936fed9ef88d3c3ec020cd65b4868fb6e4bb Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Thu, 30 Jan 2025 13:54:32 +0000 Subject: [PATCH 2/6] PR feedback --- internal/command/command_plugin.go | 27 ++++-- internal/command/command_plugin_test.go | 34 ++++++- internal/command/command_service.go | 26 +----- internal/command/command_service_test.go | 44 +-------- .../commandfakes/fake_command_service.go | 90 +++++++------------ 5 files changed, 91 insertions(+), 130 deletions(-) diff --git a/internal/command/command_plugin.go b/internal/command/command_plugin.go index ad278d8cc..7768784bc 100644 --- a/internal/command/command_plugin.go +++ b/internal/command/command_plugin.go @@ -8,6 +8,7 @@ package command import ( "context" "log/slog" + "sync" "google.golang.org/protobuf/types/known/timestamppb" @@ -30,18 +31,19 @@ type ( UpdateDataPlaneStatus(ctx context.Context, resource *mpi.Resource) error UpdateDataPlaneHealth(ctx context.Context, instanceHealths []*mpi.InstanceHealth) error SendDataPlaneResponse(ctx context.Context, response *mpi.DataPlaneResponse) error - CancelSubscription(ctx context.Context) + Subscribe(ctx context.Context) IsConnected() bool - StartSubscription() CreateConnection(ctx context.Context, resource *mpi.Resource) (*mpi.CreateConnectionResponse, error) } CommandPlugin struct { messagePipe bus.MessagePipeInterface config *config.Config + subscribeCancel context.CancelFunc conn grpc.GrpcConnectionInterface commandService commandService subscribeChannel chan *mpi.ManagementPlaneRequest + subscribeMutex sync.Mutex } ) @@ -57,7 +59,7 @@ func (cp *CommandPlugin) Init(ctx context.Context, messagePipe bus.MessagePipeIn slog.DebugContext(ctx, "Starting command plugin") cp.messagePipe = messagePipe - cp.commandService = NewCommandService(ctx, cp.conn.CommandServiceClient(), cp.config, cp.subscribeChannel) + cp.commandService = NewCommandService(cp.conn.CommandServiceClient(), cp.config, cp.subscribeChannel) go cp.monitorSubscribeChannel(ctx) @@ -65,7 +67,14 @@ func (cp *CommandPlugin) Init(ctx context.Context, messagePipe bus.MessagePipeIn } func (cp *CommandPlugin) Close(ctx context.Context) error { - cp.commandService.CancelSubscription(ctx) + slog.InfoContext(ctx, "Canceling subscribe context") + + cp.subscribeMutex.Lock() + if cp.subscribeCancel != nil { + cp.subscribeCancel() + } + cp.subscribeMutex.Unlock() + return cp.conn.Close(ctx) } @@ -104,12 +113,20 @@ func (cp *CommandPlugin) processResourceUpdate(ctx context.Context, msg *bus.Mes } func (cp *CommandPlugin) createConnection(ctx context.Context, resource *mpi.Resource) { + var subscribeCtx context.Context + createConnectionResponse, err := cp.commandService.CreateConnection(ctx, resource) if err != nil { slog.ErrorContext(ctx, "Unable to create connection", "error", err) } + if createConnectionResponse != nil { - cp.commandService.StartSubscription() + cp.subscribeMutex.Lock() + subscribeCtx, cp.subscribeCancel = context.WithCancel(ctx) + cp.subscribeMutex.Unlock() + + go cp.commandService.Subscribe(subscribeCtx) + cp.messagePipe.Process(ctx, &bus.Message{ Topic: bus.ConnectionCreatedTopic, Data: createConnectionResponse, diff --git a/internal/command/command_plugin_test.go b/internal/command/command_plugin_test.go index ee36f5d73..ce4d89d86 100644 --- a/internal/command/command_plugin_test.go +++ b/internal/command/command_plugin_test.go @@ -70,7 +70,39 @@ func TestCommandPlugin_Init(t *testing.T) { closeError := commandPlugin.Close(ctx) require.NoError(t, closeError) - require.Equal(t, 1, fakeCommandService.CancelSubscriptionCallCount()) +} + +func TestCommandPlugin_createConnection(t *testing.T) { + ctx := context.Background() + commandService := &commandfakes.FakeCommandService{} + commandService.CreateConnectionReturns(&mpi.CreateConnectionResponse{}, nil) + messagePipe := busfakes.NewFakeMessagePipe() + + commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}) + err := commandPlugin.Init(ctx, messagePipe) + commandPlugin.commandService = commandService + require.NoError(t, err) + defer commandPlugin.Close(ctx) + + commandPlugin.createConnection(ctx, &mpi.Resource{}) + + assert.Eventually( + t, + func() bool { return commandService.SubscribeCallCount() > 0 }, + 2*time.Second, + 10*time.Millisecond, + ) + + assert.Eventually( + t, + func() bool { return len(messagePipe.GetMessages()) == 1 }, + 2*time.Second, + 10*time.Millisecond, + ) + + messages := messagePipe.GetMessages() + assert.Len(t, messages, 1) + assert.Equal(t, bus.ConnectionCreatedTopic, messages[0].Topic) } func TestCommandPlugin_Process(t *testing.T) { diff --git a/internal/command/command_service.go b/internal/command/command_service.go index fe7e67f2a..436cf1fa1 100644 --- a/internal/command/command_service.go +++ b/internal/command/command_service.go @@ -41,12 +41,9 @@ type ( subscribeClient mpi.CommandService_SubscribeClient agentConfig *config.Config isConnected *atomic.Bool - subscribeCancel context.CancelFunc subscribeChannel chan *mpi.ManagementPlaneRequest configApplyRequestQueue map[string][]*mpi.ManagementPlaneRequest // key is the instance ID resource *mpi.Resource - subscribeContext context.Context - subscribeMutex sync.Mutex subscribeClientMutex sync.Mutex configApplyRequestQueueMutex sync.Mutex resourceMutex sync.Mutex @@ -54,7 +51,6 @@ type ( ) func NewCommandService( - ctx context.Context, commandServiceClient mpi.CommandServiceClient, agentConfig *config.Config, subscribeChannel chan *mpi.ManagementPlaneRequest, @@ -71,18 +67,9 @@ func NewCommandService( resource: &mpi.Resource{}, } - commandService.subscribeMutex.Lock() - commandService.subscribeContext, commandService.subscribeCancel = context.WithCancel(ctx) - commandService.subscribeMutex.Unlock() - return commandService } -func (cs *CommandService) StartSubscription() { - slog.DebugContext(cs.subscribeContext, "Starting subscribe") - go cs.subscribe(cs.subscribeContext) -} - func (cs *CommandService) IsConnected() bool { return cs.isConnected.Load() } @@ -192,17 +179,7 @@ func (cs *CommandService) SendDataPlaneResponse(ctx context.Context, response *m ) } -func (cs *CommandService) CancelSubscription(ctx context.Context) { - slog.InfoContext(ctx, "Canceling subscribe context") - - cs.subscribeMutex.Lock() - if cs.subscribeCancel != nil { - cs.subscribeCancel() - } - cs.subscribeMutex.Unlock() -} - -func (cs *CommandService) subscribe(ctx context.Context) { +func (cs *CommandService) Subscribe(ctx context.Context) { commonSettings := &config.BackOff{ InitialInterval: cs.agentConfig.Client.Backoff.InitialInterval, MaxInterval: cs.agentConfig.Client.Backoff.MaxInterval, @@ -411,6 +388,7 @@ func (cs *CommandService) receiveCallback(ctx context.Context) func() error { var err error cs.subscribeClient, err = cs.commandServiceClient.Subscribe(ctx) if err != nil { + slog.Info("Error: ", "", err) subscribeErr := cs.handleSubscribeError(ctx, err, "create subscribe client") cs.subscribeClientMutex.Unlock() diff --git a/internal/command/command_service_test.go b/internal/command/command_service_test.go index 0901bb0ab..0dfb00e95 100644 --- a/internal/command/command_service_test.go +++ b/internal/command/command_service_test.go @@ -77,32 +77,10 @@ func (*FakeConfigApplySubscribeClient) Recv() (*mpi.ManagementPlaneRequest, erro }, nil } -func TestCommandService_StartSubscribe(t *testing.T) { - ctx := context.Background() - commandServiceClient := &v1fakes.FakeCommandServiceClient{} - - commandService := NewCommandService( - ctx, - commandServiceClient, - types.AgentConfig(), - make(chan *mpi.ManagementPlaneRequest), - ) - - defer commandService.CancelSubscription(ctx) - - commandService.StartSubscription() - - assert.Eventually( - t, - func() bool { return commandServiceClient.SubscribeCallCount() > 0 }, - 2*time.Second, - 10*time.Millisecond, - ) -} - func TestCommandService_receiveCallback_configApplyRequest(t *testing.T) { - ctx := context.Background() fakeSubscribeClient := &FakeConfigApplySubscribeClient{} + ctx := context.Background() + subscribeCtx, subscribeCancel := context.WithCancel(ctx) commandServiceClient := &v1fakes.FakeCommandServiceClient{} commandServiceClient.SubscribeReturns(fakeSubscribeClient, nil) @@ -110,20 +88,18 @@ func TestCommandService_receiveCallback_configApplyRequest(t *testing.T) { subscribeChannel := make(chan *mpi.ManagementPlaneRequest) commandService := NewCommandService( - ctx, commandServiceClient, types.AgentConfig(), subscribeChannel, ) - commandService.StartSubscription() + go commandService.Subscribe(subscribeCtx) + defer subscribeCancel() nginxInstance := protos.GetNginxOssInstance([]string{}) commandService.resourceMutex.Lock() commandService.resource.Instances = append(commandService.resource.Instances, nginxInstance) commandService.resourceMutex.Unlock() - defer commandService.CancelSubscription(ctx) - var wg sync.WaitGroup wg.Add(1) @@ -155,13 +131,10 @@ func TestCommandService_UpdateDataPlaneStatus(t *testing.T) { commandServiceClient.SubscribeReturns(fakeSubscribeClient, nil) commandService := NewCommandService( - ctx, commandServiceClient, types.AgentConfig(), make(chan *mpi.ManagementPlaneRequest), ) - defer commandService.CancelSubscription(ctx) - // Fail first time since there are no other instances besides the agent err := commandService.UpdateDataPlaneStatus(ctx, protos.GetHostResource()) require.Error(t, err) @@ -194,12 +167,10 @@ func TestCommandService_UpdateDataPlaneStatusSubscribeError(t *testing.T) { stub.StubLoggerWith(logBuf) commandService := NewCommandService( - ctx, commandServiceClient, types.AgentConfig(), make(chan *mpi.ManagementPlaneRequest), ) - defer commandService.CancelSubscription(ctx) commandService.isConnected.Store(true) @@ -216,7 +187,6 @@ func TestCommandService_CreateConnection(t *testing.T) { commandServiceClient := &v1fakes.FakeCommandServiceClient{} commandService := NewCommandService( - ctx, commandServiceClient, types.AgentConfig(), make(chan *mpi.ManagementPlaneRequest), @@ -233,7 +203,6 @@ func TestCommandService_UpdateDataPlaneHealth(t *testing.T) { commandServiceClient := &v1fakes.FakeCommandServiceClient{} commandService := NewCommandService( - ctx, commandServiceClient, types.AgentConfig(), make(chan *mpi.ManagementPlaneRequest), @@ -264,7 +233,6 @@ func TestCommandService_SendDataPlaneResponse(t *testing.T) { subscribeClient := &FakeSubscribeClient{} commandService := NewCommandService( - ctx, commandServiceClient, types.AgentConfig(), make(chan *mpi.ManagementPlaneRequest), @@ -286,14 +254,11 @@ func TestCommandService_SendDataPlaneResponse_configApplyRequest(t *testing.T) { subscribeChannel := make(chan *mpi.ManagementPlaneRequest) commandService := NewCommandService( - ctx, commandServiceClient, types.AgentConfig(), subscribeChannel, ) - defer commandService.CancelSubscription(ctx) - request1 := &mpi.ManagementPlaneRequest{ MessageMeta: &mpi.MessageMeta{ MessageId: "1", @@ -405,7 +370,6 @@ func TestCommandService_isValidRequest(t *testing.T) { subscribeClient := &FakeSubscribeClient{} commandService := NewCommandService( - ctx, commandServiceClient, types.AgentConfig(), make(chan *mpi.ManagementPlaneRequest), diff --git a/internal/command/commandfakes/fake_command_service.go b/internal/command/commandfakes/fake_command_service.go index 45f8afe0b..0748ce080 100644 --- a/internal/command/commandfakes/fake_command_service.go +++ b/internal/command/commandfakes/fake_command_service.go @@ -9,11 +9,6 @@ import ( ) type FakeCommandService struct { - CancelSubscriptionStub func(context.Context) - cancelSubscriptionMutex sync.RWMutex - cancelSubscriptionArgsForCall []struct { - arg1 context.Context - } CreateConnectionStub func(context.Context, *v1.Resource) (*v1.CreateConnectionResponse, error) createConnectionMutex sync.RWMutex createConnectionArgsForCall []struct { @@ -50,9 +45,10 @@ type FakeCommandService struct { sendDataPlaneResponseReturnsOnCall map[int]struct { result1 error } - StartSubscriptionStub func() - startSubscriptionMutex sync.RWMutex - startSubscriptionArgsForCall []struct { + SubscribeStub func(context.Context) + subscribeMutex sync.RWMutex + subscribeArgsForCall []struct { + arg1 context.Context } UpdateDataPlaneHealthStub func(context.Context, []*v1.InstanceHealth) error updateDataPlaneHealthMutex sync.RWMutex @@ -82,38 +78,6 @@ type FakeCommandService struct { invocationsMutex sync.RWMutex } -func (fake *FakeCommandService) CancelSubscription(arg1 context.Context) { - fake.cancelSubscriptionMutex.Lock() - fake.cancelSubscriptionArgsForCall = append(fake.cancelSubscriptionArgsForCall, struct { - arg1 context.Context - }{arg1}) - stub := fake.CancelSubscriptionStub - fake.recordInvocation("CancelSubscription", []interface{}{arg1}) - fake.cancelSubscriptionMutex.Unlock() - if stub != nil { - fake.CancelSubscriptionStub(arg1) - } -} - -func (fake *FakeCommandService) CancelSubscriptionCallCount() int { - fake.cancelSubscriptionMutex.RLock() - defer fake.cancelSubscriptionMutex.RUnlock() - return len(fake.cancelSubscriptionArgsForCall) -} - -func (fake *FakeCommandService) CancelSubscriptionCalls(stub func(context.Context)) { - fake.cancelSubscriptionMutex.Lock() - defer fake.cancelSubscriptionMutex.Unlock() - fake.CancelSubscriptionStub = stub -} - -func (fake *FakeCommandService) CancelSubscriptionArgsForCall(i int) context.Context { - fake.cancelSubscriptionMutex.RLock() - defer fake.cancelSubscriptionMutex.RUnlock() - argsForCall := fake.cancelSubscriptionArgsForCall[i] - return argsForCall.arg1 -} - func (fake *FakeCommandService) CreateConnection(arg1 context.Context, arg2 *v1.Resource) (*v1.CreateConnectionResponse, error) { fake.createConnectionMutex.Lock() ret, specificReturn := fake.createConnectionReturnsOnCall[len(fake.createConnectionArgsForCall)] @@ -294,28 +258,36 @@ func (fake *FakeCommandService) SendDataPlaneResponseReturnsOnCall(i int, result }{result1} } -func (fake *FakeCommandService) StartSubscription() { - fake.startSubscriptionMutex.Lock() - fake.startSubscriptionArgsForCall = append(fake.startSubscriptionArgsForCall, struct { - }{}) - stub := fake.StartSubscriptionStub - fake.recordInvocation("StartSubscription", []interface{}{}) - fake.startSubscriptionMutex.Unlock() +func (fake *FakeCommandService) Subscribe(arg1 context.Context) { + fake.subscribeMutex.Lock() + fake.subscribeArgsForCall = append(fake.subscribeArgsForCall, struct { + arg1 context.Context + }{arg1}) + stub := fake.SubscribeStub + fake.recordInvocation("Subscribe", []interface{}{arg1}) + fake.subscribeMutex.Unlock() if stub != nil { - fake.StartSubscriptionStub() + fake.SubscribeStub(arg1) } } -func (fake *FakeCommandService) StartSubscriptionCallCount() int { - fake.startSubscriptionMutex.RLock() - defer fake.startSubscriptionMutex.RUnlock() - return len(fake.startSubscriptionArgsForCall) +func (fake *FakeCommandService) SubscribeCallCount() int { + fake.subscribeMutex.RLock() + defer fake.subscribeMutex.RUnlock() + return len(fake.subscribeArgsForCall) } -func (fake *FakeCommandService) StartSubscriptionCalls(stub func()) { - fake.startSubscriptionMutex.Lock() - defer fake.startSubscriptionMutex.Unlock() - fake.StartSubscriptionStub = stub +func (fake *FakeCommandService) SubscribeCalls(stub func(context.Context)) { + fake.subscribeMutex.Lock() + defer fake.subscribeMutex.Unlock() + fake.SubscribeStub = stub +} + +func (fake *FakeCommandService) SubscribeArgsForCall(i int) context.Context { + fake.subscribeMutex.RLock() + defer fake.subscribeMutex.RUnlock() + argsForCall := fake.subscribeArgsForCall[i] + return argsForCall.arg1 } func (fake *FakeCommandService) UpdateDataPlaneHealth(arg1 context.Context, arg2 []*v1.InstanceHealth) error { @@ -450,16 +422,14 @@ func (fake *FakeCommandService) UpdateDataPlaneStatusReturnsOnCall(i int, result func (fake *FakeCommandService) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() - fake.cancelSubscriptionMutex.RLock() - defer fake.cancelSubscriptionMutex.RUnlock() fake.createConnectionMutex.RLock() defer fake.createConnectionMutex.RUnlock() fake.isConnectedMutex.RLock() defer fake.isConnectedMutex.RUnlock() fake.sendDataPlaneResponseMutex.RLock() defer fake.sendDataPlaneResponseMutex.RUnlock() - fake.startSubscriptionMutex.RLock() - defer fake.startSubscriptionMutex.RUnlock() + fake.subscribeMutex.RLock() + defer fake.subscribeMutex.RUnlock() fake.updateDataPlaneHealthMutex.RLock() defer fake.updateDataPlaneHealthMutex.RUnlock() fake.updateDataPlaneStatusMutex.RLock() From 082e85d237efa202943a034d55f48b3345bb89a8 Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Thu, 30 Jan 2025 14:25:07 +0000 Subject: [PATCH 3/6] fix race condition --- internal/command/command_plugin_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/command/command_plugin_test.go b/internal/command/command_plugin_test.go index ce4d89d86..f08601637 100644 --- a/internal/command/command_plugin_test.go +++ b/internal/command/command_plugin_test.go @@ -339,12 +339,12 @@ func TestCommandPlugin_FeatureDisabled(t *testing.T) { func TestMonitorSubscribeChannel(t *testing.T) { ctx, cncl := context.WithCancel(context.Background()) - defer cncl() logBuf := &bytes.Buffer{} stub.StubLoggerWith(logBuf) cp := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}) + cp.subscribeCancel = cncl message := protos.CreateManagementPlaneRequest() @@ -359,7 +359,7 @@ func TestMonitorSubscribeChannel(t *testing.T) { // Give some time to process the message time.Sleep(100 * time.Millisecond) - cncl() + cp.Close(ctx) time.Sleep(100 * time.Millisecond) From c50e3331f9048a1dbb29c56067dff0567ca180f9 Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Thu, 30 Jan 2025 17:38:53 +0000 Subject: [PATCH 4/6] clean up --- internal/command/command_service.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/command/command_service.go b/internal/command/command_service.go index 436cf1fa1..58cfdec48 100644 --- a/internal/command/command_service.go +++ b/internal/command/command_service.go @@ -388,7 +388,6 @@ func (cs *CommandService) receiveCallback(ctx context.Context) func() error { var err error cs.subscribeClient, err = cs.commandServiceClient.Subscribe(ctx) if err != nil { - slog.Info("Error: ", "", err) subscribeErr := cs.handleSubscribeError(ctx, err, "create subscribe client") cs.subscribeClientMutex.Unlock() From 45a97141fc3fe1fa86df8a444703c75ca28356d8 Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Fri, 31 Jan 2025 09:42:20 +0000 Subject: [PATCH 5/6] clean up --- internal/grpc/grpc.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/grpc/grpc.go b/internal/grpc/grpc.go index ebdf8693e..f08610ffa 100644 --- a/internal/grpc/grpc.go +++ b/internal/grpc/grpc.go @@ -68,6 +68,7 @@ var ( _ GrpcConnectionInterface = (*GrpcConnection)(nil) ) +// nolint: ireturn func NewGrpcConnection(ctx context.Context, agentConfig *config.Config) (*GrpcConnection, error) { if agentConfig == nil || agentConfig.Command.Server.Type != config.Grpc { return nil, errors.New("invalid command server settings") From 8d19a2f3d97cb9f7527a9bff997240c3e478a2b5 Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Tue, 4 Feb 2025 13:20:53 +0000 Subject: [PATCH 6/6] clean up --- internal/grpc/grpc.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/grpc/grpc.go b/internal/grpc/grpc.go index f08610ffa..ebdf8693e 100644 --- a/internal/grpc/grpc.go +++ b/internal/grpc/grpc.go @@ -68,7 +68,6 @@ var ( _ GrpcConnectionInterface = (*GrpcConnection)(nil) ) -// nolint: ireturn func NewGrpcConnection(ctx context.Context, agentConfig *config.Config) (*GrpcConnection, error) { if agentConfig == nil || agentConfig.Command.Server.Type != config.Grpc { return nil, errors.New("invalid command server settings")