Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions internal/command/command_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package command
import (
"context"
"log/slog"
"sync"

"google.golang.org/protobuf/types/known/timestamppb"

Expand All @@ -30,17 +31,19 @@ type (
UpdateDataPlaneStatus(ctx context.Context, resource *mpi.Resource) error
UpdateDataPlaneHealth(ctx context.Context, instanceHealths []*mpi.InstanceHealth) error
SendDataPlaneResponse(ctx context.Context, response *mpi.DataPlaneResponse) error
CancelSubscription(ctx context.Context)
Subscribe(ctx context.Context)
IsConnected() bool
CreateConnection(ctx context.Context, resource *mpi.Resource) (*mpi.CreateConnectionResponse, error)
}

CommandPlugin struct {
messagePipe bus.MessagePipeInterface
config *config.Config
subscribeCancel context.CancelFunc
conn grpc.GrpcConnectionInterface
commandService commandService
subscribeChannel chan *mpi.ManagementPlaneRequest
subscribeMutex sync.Mutex
}
)

Expand All @@ -56,15 +59,22 @@ func (cp *CommandPlugin) Init(ctx context.Context, messagePipe bus.MessagePipeIn
slog.DebugContext(ctx, "Starting command plugin")

cp.messagePipe = messagePipe
cp.commandService = NewCommandService(ctx, cp.conn.CommandServiceClient(), cp.config, cp.subscribeChannel)
cp.commandService = NewCommandService(cp.conn.CommandServiceClient(), cp.config, cp.subscribeChannel)

go cp.monitorSubscribeChannel(ctx)

return nil
}

func (cp *CommandPlugin) Close(ctx context.Context) error {
cp.commandService.CancelSubscription(ctx)
slog.InfoContext(ctx, "Canceling subscribe context")

cp.subscribeMutex.Lock()
if cp.subscribeCancel != nil {
cp.subscribeCancel()
}
cp.subscribeMutex.Unlock()

return cp.conn.Close(ctx)
}

Expand Down Expand Up @@ -103,11 +113,20 @@ func (cp *CommandPlugin) processResourceUpdate(ctx context.Context, msg *bus.Mes
}

func (cp *CommandPlugin) createConnection(ctx context.Context, resource *mpi.Resource) {
var subscribeCtx context.Context

createConnectionResponse, err := cp.commandService.CreateConnection(ctx, resource)
if err != nil {
slog.ErrorContext(ctx, "Unable to create connection", "error", err)
}

if createConnectionResponse != nil {
cp.subscribeMutex.Lock()
subscribeCtx, cp.subscribeCancel = context.WithCancel(ctx)
cp.subscribeMutex.Unlock()

go cp.commandService.Subscribe(subscribeCtx)

cp.messagePipe.Process(ctx, &bus.Message{
Topic: bus.ConnectionCreatedTopic,
Data: createConnectionResponse,
Expand Down
38 changes: 35 additions & 3 deletions internal/command/command_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,39 @@ func TestCommandPlugin_Init(t *testing.T) {

closeError := commandPlugin.Close(ctx)
require.NoError(t, closeError)
require.Equal(t, 1, fakeCommandService.CancelSubscriptionCallCount())
}

func TestCommandPlugin_createConnection(t *testing.T) {
ctx := context.Background()
commandService := &commandfakes.FakeCommandService{}
commandService.CreateConnectionReturns(&mpi.CreateConnectionResponse{}, nil)
messagePipe := busfakes.NewFakeMessagePipe()

commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{})
err := commandPlugin.Init(ctx, messagePipe)
commandPlugin.commandService = commandService
require.NoError(t, err)
defer commandPlugin.Close(ctx)

commandPlugin.createConnection(ctx, &mpi.Resource{})

assert.Eventually(
t,
func() bool { return commandService.SubscribeCallCount() > 0 },
2*time.Second,
10*time.Millisecond,
)

assert.Eventually(
t,
func() bool { return len(messagePipe.GetMessages()) == 1 },
2*time.Second,
10*time.Millisecond,
)

messages := messagePipe.GetMessages()
assert.Len(t, messages, 1)
assert.Equal(t, bus.ConnectionCreatedTopic, messages[0].Topic)
}

func TestCommandPlugin_Process(t *testing.T) {
Expand Down Expand Up @@ -307,12 +339,12 @@ func TestCommandPlugin_FeatureDisabled(t *testing.T) {

func TestMonitorSubscribeChannel(t *testing.T) {
ctx, cncl := context.WithCancel(context.Background())
defer cncl()

logBuf := &bytes.Buffer{}
stub.StubLoggerWith(logBuf)

cp := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{})
cp.subscribeCancel = cncl

message := protos.CreateManagementPlaneRequest()

Expand All @@ -327,7 +359,7 @@ func TestMonitorSubscribeChannel(t *testing.T) {
// Give some time to process the message
time.Sleep(100 * time.Millisecond)

cncl()
cp.Close(ctx)

time.Sleep(100 * time.Millisecond)

Expand Down
23 changes: 1 addition & 22 deletions internal/command/command_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,16 @@ type (
subscribeClient mpi.CommandService_SubscribeClient
agentConfig *config.Config
isConnected *atomic.Bool
subscribeCancel context.CancelFunc
subscribeChannel chan *mpi.ManagementPlaneRequest
configApplyRequestQueue map[string][]*mpi.ManagementPlaneRequest // key is the instance ID
resource *mpi.Resource
subscribeMutex sync.Mutex
subscribeClientMutex sync.Mutex
configApplyRequestQueueMutex sync.Mutex
resourceMutex sync.Mutex
}
)

func NewCommandService(
ctx context.Context,
commandServiceClient mpi.CommandServiceClient,
agentConfig *config.Config,
subscribeChannel chan *mpi.ManagementPlaneRequest,
Expand All @@ -70,14 +67,6 @@ func NewCommandService(
resource: &mpi.Resource{},
}

var subscribeCtx context.Context

commandService.subscribeMutex.Lock()
subscribeCtx, commandService.subscribeCancel = context.WithCancel(ctx)
commandService.subscribeMutex.Unlock()

go commandService.subscribe(subscribeCtx)

return commandService
}

Expand Down Expand Up @@ -190,17 +179,7 @@ func (cs *CommandService) SendDataPlaneResponse(ctx context.Context, response *m
)
}

func (cs *CommandService) CancelSubscription(ctx context.Context) {
slog.InfoContext(ctx, "Canceling subscribe context")

cs.subscribeMutex.Lock()
if cs.subscribeCancel != nil {
cs.subscribeCancel()
}
cs.subscribeMutex.Unlock()
}

func (cs *CommandService) subscribe(ctx context.Context) {
func (cs *CommandService) Subscribe(ctx context.Context) {
commonSettings := &config.BackOff{
InitialInterval: cs.agentConfig.Client.Backoff.InitialInterval,
MaxInterval: cs.agentConfig.Client.Backoff.MaxInterval,
Expand Down
41 changes: 4 additions & 37 deletions internal/command/command_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,50 +77,29 @@ func (*FakeConfigApplySubscribeClient) Recv() (*mpi.ManagementPlaneRequest, erro
}, nil
}

func TestCommandService_NewCommandService(t *testing.T) {
ctx := context.Background()
commandServiceClient := &v1fakes.FakeCommandServiceClient{}

commandService := NewCommandService(
ctx,
commandServiceClient,
types.AgentConfig(),
make(chan *mpi.ManagementPlaneRequest),
)

defer commandService.CancelSubscription(ctx)

assert.Eventually(
t,
func() bool { return commandServiceClient.SubscribeCallCount() > 0 },
2*time.Second,
10*time.Millisecond,
)
}

func TestCommandService_receiveCallback_configApplyRequest(t *testing.T) {
ctx := context.Background()
fakeSubscribeClient := &FakeConfigApplySubscribeClient{}
ctx := context.Background()
subscribeCtx, subscribeCancel := context.WithCancel(ctx)

commandServiceClient := &v1fakes.FakeCommandServiceClient{}
commandServiceClient.SubscribeReturns(fakeSubscribeClient, nil)

subscribeChannel := make(chan *mpi.ManagementPlaneRequest)

commandService := NewCommandService(
ctx,
commandServiceClient,
types.AgentConfig(),
subscribeChannel,
)
go commandService.Subscribe(subscribeCtx)
defer subscribeCancel()

nginxInstance := protos.GetNginxOssInstance([]string{})
commandService.resourceMutex.Lock()
commandService.resource.Instances = append(commandService.resource.Instances, nginxInstance)
commandService.resourceMutex.Unlock()

defer commandService.CancelSubscription(ctx)

var wg sync.WaitGroup

wg.Add(1)
Expand Down Expand Up @@ -152,13 +131,10 @@ func TestCommandService_UpdateDataPlaneStatus(t *testing.T) {
commandServiceClient.SubscribeReturns(fakeSubscribeClient, nil)

commandService := NewCommandService(
ctx,
commandServiceClient,
types.AgentConfig(),
make(chan *mpi.ManagementPlaneRequest),
)
defer commandService.CancelSubscription(ctx)

// Fail first time since there are no other instances besides the agent
err := commandService.UpdateDataPlaneStatus(ctx, protos.GetHostResource())
require.Error(t, err)
Expand Down Expand Up @@ -191,12 +167,10 @@ func TestCommandService_UpdateDataPlaneStatusSubscribeError(t *testing.T) {
stub.StubLoggerWith(logBuf)

commandService := NewCommandService(
ctx,
commandServiceClient,
types.AgentConfig(),
make(chan *mpi.ManagementPlaneRequest),
)
defer commandService.CancelSubscription(ctx)

commandService.isConnected.Store(true)

Expand All @@ -213,7 +187,6 @@ func TestCommandService_CreateConnection(t *testing.T) {
commandServiceClient := &v1fakes.FakeCommandServiceClient{}

commandService := NewCommandService(
ctx,
commandServiceClient,
types.AgentConfig(),
make(chan *mpi.ManagementPlaneRequest),
Expand All @@ -230,7 +203,6 @@ func TestCommandService_UpdateDataPlaneHealth(t *testing.T) {
commandServiceClient := &v1fakes.FakeCommandServiceClient{}

commandService := NewCommandService(
ctx,
commandServiceClient,
types.AgentConfig(),
make(chan *mpi.ManagementPlaneRequest),
Expand Down Expand Up @@ -261,7 +233,6 @@ func TestCommandService_SendDataPlaneResponse(t *testing.T) {
subscribeClient := &FakeSubscribeClient{}

commandService := NewCommandService(
ctx,
commandServiceClient,
types.AgentConfig(),
make(chan *mpi.ManagementPlaneRequest),
Expand All @@ -283,14 +254,11 @@ func TestCommandService_SendDataPlaneResponse_configApplyRequest(t *testing.T) {
subscribeChannel := make(chan *mpi.ManagementPlaneRequest)

commandService := NewCommandService(
ctx,
commandServiceClient,
types.AgentConfig(),
subscribeChannel,
)

defer commandService.CancelSubscription(ctx)

request1 := &mpi.ManagementPlaneRequest{
MessageMeta: &mpi.MessageMeta{
MessageId: "1",
Expand Down Expand Up @@ -402,7 +370,6 @@ func TestCommandService_isValidRequest(t *testing.T) {
subscribeClient := &FakeSubscribeClient{}

commandService := NewCommandService(
ctx,
commandServiceClient,
types.AgentConfig(),
make(chan *mpi.ManagementPlaneRequest),
Expand Down
Loading