@@ -56,6 +56,7 @@ func (*FakeConfigApplySubscribeClient) Send(*mpi.DataPlaneResponse) error {
5656
5757// nolint: nilnil
5858func (* FakeConfigApplySubscribeClient ) Recv () (* mpi.ManagementPlaneRequest , error ) {
59+ protos .CreateManagementPlaneRequest ()
5960 return & mpi.ManagementPlaneRequest {
6061 MessageMeta : & mpi.MessageMeta {
6162 MessageId : "1" ,
@@ -65,7 +66,6 @@ func (*FakeConfigApplySubscribeClient) Recv() (*mpi.ManagementPlaneRequest, erro
6566 Request : & mpi.ManagementPlaneRequest_ConfigApplyRequest {
6667 ConfigApplyRequest : & mpi.ConfigApplyRequest {
6768 Overview : & mpi.FileOverview {
68- Files : []* mpi.File {},
6969 ConfigVersion : & mpi.ConfigVersion {
7070 InstanceId : "12314" ,
7171 Version : "4215432" ,
@@ -104,23 +104,37 @@ func TestCommandService_receiveCallback_configApplyRequest(t *testing.T) {
104104 commandServiceClient := & v1fakes.FakeCommandServiceClient {}
105105 commandServiceClient .SubscribeReturns (fakeSubscribeClient , nil )
106106
107+ subscribeChannel := make (chan * mpi.ManagementPlaneRequest )
108+
107109 commandService := NewCommandService (
108110 ctx ,
109111 commandServiceClient ,
110112 types .AgentConfig (),
111- make ( chan * mpi. ManagementPlaneRequest ) ,
113+ subscribeChannel ,
112114 )
113115
114116 defer commandService .CancelSubscription (ctx )
115117
118+ var wg sync.WaitGroup
119+
120+ wg .Add (1 )
121+ go func () {
122+ requestFromChannel := <- subscribeChannel
123+ assert .NotNil (t , requestFromChannel )
124+ wg .Done ()
125+ }()
126+
116127 assert .Eventually (
117128 t ,
118129 func () bool { return commandServiceClient .SubscribeCallCount () > 0 },
119130 2 * time .Second ,
120131 10 * time .Millisecond ,
121132 )
122133
134+ commandService .configApplyRequestQueueMutex .Lock ()
135+ defer commandService .configApplyRequestQueueMutex .Unlock ()
123136 assert .Len (t , commandService .configApplyRequestQueue , 1 )
137+ wg .Wait ()
124138}
125139
126140func TestCommandService_UpdateDataPlaneStatus (t * testing.T ) {
@@ -269,6 +283,8 @@ func TestCommandService_SendDataPlaneResponse_configApplyRequest(t *testing.T) {
269283 subscribeChannel ,
270284 )
271285
286+ defer commandService .CancelSubscription (ctx )
287+
272288 request1 := & mpi.ManagementPlaneRequest {
273289 MessageMeta : & mpi.MessageMeta {
274290 MessageId : "1" ,
@@ -367,6 +383,8 @@ func TestCommandService_SendDataPlaneResponse_configApplyRequest(t *testing.T) {
367383
368384 require .NoError (t , err )
369385
386+ commandService .configApplyRequestQueueMutex .Lock ()
387+ defer commandService .configApplyRequestQueueMutex .Unlock ()
370388 assert .Len (t , commandService .configApplyRequestQueue , 1 )
371389 assert .Equal (t , request3 , commandService .configApplyRequestQueue ["12314" ][0 ])
372390 wg .Wait ()
0 commit comments