Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 58 additions & 44 deletions extension/alloyengine/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,53 @@ import (
"go.uber.org/zap"

"github.com/grafana/alloy/flowcmd"
"github.com/grafana/alloy/internal/readyctx"
"github.com/spf13/cobra"
)

var _ extension.Extension = (*alloyEngineExtension)(nil)

type state int

var (
stateNotStarted state = 0
stateRunning state = 1
stateShuttingDown state = 2
stateTerminated state = 3
const (
stateNotStarted state = iota
stateStarting
stateRunning
stateShuttingDown
stateTerminated
stateRunError
)

// setState stores newState as the extension's current state while holding
// stateMutex. A log line is emitted only when the stored value actually
// changes; re-setting the same state is silent.
func (e *alloyEngineExtension) setState(newState state) {
	e.stateMutex.Lock()
	defer e.stateMutex.Unlock()

	previous := e.state
	e.state = newState
	if previous == newState {
		// Nothing changed, so there is no transition to report.
		return
	}

	e.settings.Logger.Info(
		"alloyengine extension state changed",
		zap.String("from", previous.String()),
		zap.String("to", newState.String()),
	)
}

// getState returns a snapshot of the extension's current state, read while
// holding stateMutex. The value may be stale as soon as the call returns.
func (e *alloyEngineExtension) getState() state {
	e.stateMutex.Lock()
	current := e.state
	e.stateMutex.Unlock()
	return current
}

// String returns the snake_case name of the state. Values that do not match
// a known state are rendered as "unknown_state_<n>", where n is the numeric
// value of s.
func (s state) String() string {
	var name string
	switch s {
	case stateNotStarted:
		name = "not_started"
	case stateStarting:
		name = "starting"
	case stateRunning:
		name = "running"
	case stateShuttingDown:
		name = "shutting_down"
	case stateTerminated:
		name = "terminated"
	case stateRunError:
		name = "run_error"
	default:
		// %d formats the underlying integer and does not re-enter String.
		name = fmt.Sprintf("unknown_state_%d", s)
	}
	return name
}
Expand All @@ -62,48 +85,46 @@ func newAlloyEngineExtension(config *Config, settings component.TelemetrySetting
}
}

// Start is called when the extension is started.
func (e *alloyEngineExtension) Start(ctx context.Context, host component.Host) error {
e.stateMutex.Lock()
defer e.stateMutex.Unlock()

switch e.state {
currentState := e.getState()
switch currentState {
case stateNotStarted:
break
default:
return fmt.Errorf("cannot start alloyengine extension in current state: %s", e.state.String())
return fmt.Errorf("cannot start alloyengine extension in current state: %s", currentState)
}

runCtx, runCancel := context.WithCancel(context.Background())
e.runCancel = runCancel
e.runExited = make(chan struct{})

runCommand := e.runCommandFactory()
runCommand.SetArgs([]string{e.config.AlloyConfig.File})
err := runCommand.ParseFlags(e.config.flagsAsSlice())
if err != nil {
return fmt.Errorf("failed to parse flags: %w", err)
}

runCtx, runCancel := context.WithCancel(context.Background())
e.runCancel = runCancel
e.runExited = make(chan struct{})

runCtx = readyctx.WithOnReady(runCtx, func() {
e.setState(stateRunning)
})

e.setState(stateStarting)

go func() {
defer close(e.runExited)

err := e.runWithBackoffRetry(runCommand, runCtx)

e.stateMutex.Lock()
previousState := e.state
e.state = stateTerminated
e.stateMutex.Unlock()
previousState := e.getState()
e.setState(stateTerminated)

if err == nil {
e.settings.Logger.Debug("run command exited successfully without error")
} else if previousState == stateShuttingDown {
e.settings.Logger.Warn("run command exited with an error during shutdown", zap.Error(err))
}
}()

e.state = stateRunning
e.settings.Logger.Info("alloyengine extension started successfully")
return nil
}

Expand All @@ -120,6 +141,7 @@ func (e *alloyEngineExtension) runWithBackoffRetry(runCommand *cobra.Command, ct
}

lastError = err
e.setState(stateRunError)

// exponential backoff until 15s
delay := 15 * time.Second
Expand All @@ -142,15 +164,13 @@ func (e *alloyEngineExtension) runWithBackoffRetry(runCommand *cobra.Command, ct

// Shutdown is called when the extension is being stopped.
func (e *alloyEngineExtension) Shutdown(ctx context.Context) error {
e.stateMutex.Lock()
switch e.state {
case stateRunning:
currentState := e.getState()
switch currentState {
case stateStarting, stateRunning, stateRunError:
e.settings.Logger.Info("alloyengine extension shutting down")
e.state = stateShuttingDown
// guaranteed to be non-nil since we are in stateRunning
e.setState(stateShuttingDown)
// guaranteed to be non-nil since runCancel is set in Start()
e.runCancel()
// unlock so that the run goroutine can transition to terminated
e.stateMutex.Unlock()

select {
case <-e.runExited:
Expand All @@ -161,37 +181,31 @@ func (e *alloyEngineExtension) Shutdown(ctx context.Context) error {
return nil
case stateNotStarted:
e.settings.Logger.Info("alloyengine extension shutdown completed (not started)")
e.stateMutex.Unlock()
return nil
default:
e.settings.Logger.Warn("alloyengine extension shutdown in current state is a no-op", zap.String("state", e.state.String()))
e.stateMutex.Unlock()
e.settings.Logger.Warn("alloyengine extension shutdown in current state is a no-op", zap.String("state", e.getState().String()))
return nil
}
Comment thread
blewis12 marked this conversation as resolved.
}

// Ready returns nil when the extension is ready to process data.
func (e *alloyEngineExtension) Ready() error {
e.stateMutex.Lock()
defer e.stateMutex.Unlock()

switch e.state {
case stateRunning:
currentState := e.getState()
switch currentState {
case stateStarting, stateRunning, stateRunError:
return nil
default:
return fmt.Errorf("alloyengine extension not ready in current state: %s", e.state.String())
return fmt.Errorf("alloyengine extension not ready in current state: %s", currentState.String())
}
}

// NotReady returns an error when the extension is not ready to process data.
func (e *alloyEngineExtension) NotReady() error {
e.stateMutex.Lock()
defer e.stateMutex.Unlock()

switch e.state {
case stateRunning:
currentState := e.getState()
switch currentState {
case stateStarting, stateRunning, stateRunError:
return nil
default:
return fmt.Errorf("alloyengine extension not ready in current state: %s", e.state.String())
return fmt.Errorf("alloyengine extension not ready in current state: %s", currentState.String())
}
}
49 changes: 41 additions & 8 deletions extension/alloyengine/extension_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"
"time"

"github.com/grafana/alloy/internal/readyctx"
"github.com/spf13/cobra"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -33,6 +34,19 @@ func newTestExtension(t *testing.T, factory func() *cobra.Command, config *Confi

// blockingCommand builds a cobra command that invokes the ready callback
// carried by its context (when one is present), then blocks until that
// context is cancelled and returns nil.
func blockingCommand() *cobra.Command {
	run := func(cmd *cobra.Command, args []string) error {
		ctx := cmd.Context()
		if onReady, found := readyctx.OnReadyFromContext(ctx); found && onReady != nil {
			onReady()
		}
		<-ctx.Done()
		return nil
	}
	return &cobra.Command{RunE: run}
}

// blockingCommandWithoutReady blocks until context cancellation but never calls the ready callback.
func blockingCommandWithoutReady() *cobra.Command {
return &cobra.Command{
RunE: func(cmd *cobra.Command, args []string) error {
<-cmd.Context().Done()
Expand All @@ -45,6 +59,9 @@ func blockingCommand() *cobra.Command {
func shutdownErrorCommand(err error) *cobra.Command {
return &cobra.Command{
RunE: func(cmd *cobra.Command, args []string) error {
if fn, ok := readyctx.OnReadyFromContext(cmd.Context()); ok && fn != nil {
fn()
}
<-cmd.Context().Done()
return err
},
Expand Down Expand Up @@ -98,7 +115,7 @@ func TestLifecycle_SuccessfulStartAndShutdown(t *testing.T) {
host := componenttest.NewNopHost()

require.NoError(t, e.Start(ctx, host))
require.Equal(t, stateRunning, e.state)
require.Eventually(t, func() bool { return e.getState() == stateRunning }, 1*time.Second, 25*time.Millisecond, "extension did not reach stateRunning")
require.NoError(t, e.Ready())
require.NoError(t, e.NotReady())

Expand All @@ -116,27 +133,43 @@ func TestLifecycle_SuccessfulStartAndShutdown(t *testing.T) {
return false
}
}, 1*time.Second, 25*time.Millisecond, "run command did not exit in time")
require.Equal(t, stateTerminated, e.state)
require.Equal(t, stateTerminated, e.getState())
}

func TestStartTwiceFails(t *testing.T) {
// TestLifecycle_StartTwiceFails verifies that a second Start call on the same
// extension returns an error after the first one succeeded.
func TestLifecycle_StartTwiceFails(t *testing.T) {
	ext := newTestExtension(t, blockingCommand, defaultTestConfig())

	firstErr := ext.Start(context.Background(), componenttest.NewNopHost())
	require.NoError(t, firstErr)

	secondErr := ext.Start(context.Background(), componenttest.NewNopHost())
	require.Error(t, secondErr)
}

func TestReadyWhenNotStarted(t *testing.T) {
// TestLifecycle_NotReadyWhenNotStarted verifies that both readiness probes
// return an error on an extension that has never been started.
func TestLifecycle_NotReadyWhenNotStarted(t *testing.T) {
	ext := newTestExtension(t, blockingCommand, defaultTestConfig())

	readyErr := ext.Ready()
	require.Error(t, readyErr)

	notReadyErr := ext.NotReady()
	require.Error(t, notReadyErr)
}

func TestShutdownWithRunCommandError(t *testing.T) {
// TestLifecycle_StayInStartingWhenReadyNotCalled verifies that the extension
// is still in stateStarting after Start when the run command never invokes
// the ready callback, and that Shutdown succeeds from that state.
func TestLifecycle_StayInStartingWhenReadyNotCalled(t *testing.T) {
	ext := newTestExtension(t, blockingCommandWithoutReady, defaultTestConfig())
	startErr := ext.Start(context.Background(), componenttest.NewNopHost())
	require.NoError(t, startErr)

	// Let the run goroutine get scheduled and block; it never signals ready.
	time.Sleep(50 * time.Millisecond)

	// With no ready callback invoked, the state must not have advanced.
	require.Equal(t, stateStarting, ext.getState())

	stopCtx, stopCancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	t.Cleanup(stopCancel)
	shutdownErr := ext.Shutdown(stopCtx)
	require.NoError(t, shutdownErr)
}

func TestLifecycle_ShutdownWithRunCommandError(t *testing.T) {
expected := errors.New("shutdown error")
e := newTestExtension(t, func() *cobra.Command { return shutdownErrorCommand(expected) }, defaultTestConfig())

require.NoError(t, e.Start(context.Background(), componenttest.NewNopHost()))
require.Eventually(t, func() bool { return e.getState() == stateRunning }, 1*time.Second, 25*time.Millisecond, "extension did not reach stateRunning")

shutdownCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
t.Cleanup(cancel)
Expand All @@ -151,10 +184,10 @@ func TestShutdownWithRunCommandError(t *testing.T) {
return false
}
}, 1*time.Second, 25*time.Millisecond, "run command did not exit in time")
require.Equal(t, stateTerminated, e.state)
require.Equal(t, stateTerminated, e.getState())
}

func Test_RunSucceedsAfterRetries(t *testing.T) {
func TestLifecycle_RunSucceedsAfterRetries(t *testing.T) {
testErr := errors.New("temporary failure")
factory, state := newRetryTrackingCommand(2, testErr) // Fail 2 times, succeed on 3rd attempt
cfg := defaultTestConfig()
Expand All @@ -175,5 +208,5 @@ func Test_RunSucceedsAfterRetries(t *testing.T) {
// Verify it succeeded after 3 attempts (2 failures + 1 success)
require.Equal(t, 3, state.attempts)
require.Equal(t, 3, state.succeededAt)
require.Equal(t, stateTerminated, e.state)
require.Equal(t, stateTerminated, e.getState())
}
6 changes: 6 additions & 0 deletions internal/alloycli/cmd_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/grafana/alloy/internal/converter"
convert_diag "github.com/grafana/alloy/internal/converter/diag"
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/readyctx"
alloy_runtime "github.com/grafana/alloy/internal/runtime"
"github.com/grafana/alloy/internal/runtime/logging"
"github.com/grafana/alloy/internal/runtime/logging/level"
Expand Down Expand Up @@ -462,6 +463,11 @@ func (fr *alloyRun) Run(cmd *cobra.Command, configPath string) error {
return err
}

// Signal to the caller (e.g. alloyengine extension) that the default engine is running
if fn, ok := readyctx.OnReadyFromContext(ctx); ok && fn != nil {
fn()
}

// By now, have either joined or started a new cluster.
// Nodes initially join in the Viewer state. After the graph has been
// loaded successfully, we can move to the Participant state to signal that
Expand Down
21 changes: 21 additions & 0 deletions internal/readyctx/readyctx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package readyctx

import "context"

// ctxKey is the unexported context key type under which the ready callback
// is stored, so no other package can read or overwrite the value.
type ctxKey struct{}

// WithOnReady returns a context derived from ctx that carries fn as its
// ready callback. If fn is nil, ctx is returned unchanged.
func WithOnReady(ctx context.Context, fn func()) context.Context {
	if fn != nil {
		return context.WithValue(ctx, ctxKey{}, fn)
	}
	return ctx
}

// OnReadyFromContext returns the ready callback stored in ctx by WithOnReady.
// ok is false, and fn is nil, when ctx carries no such callback.
func OnReadyFromContext(ctx context.Context) (fn func(), ok bool) {
	// Asserting on a nil interface value yields (nil, false), which also
	// covers the case where the key is absent.
	fn, ok = ctx.Value(ctxKey{}).(func())
	return fn, ok
}
Loading