Skip to content

worker config [WIP] #24

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
module github.com/crowdsecurity/crowdsec-spoa

go 1.22
go 1.23.0

toolchain go1.23.5

require (
github.com/crowdsecurity/crowdsec v1.6.3
github.com/crowdsecurity/go-cs-bouncer v0.0.14
github.com/crowdsecurity/go-cs-lib v0.0.15
github.com/crowdsecurity/go-cs-lib v0.0.18
github.com/davecgh/go-spew v1.1.1
github.com/google/uuid v1.6.0
github.com/negasus/haproxy-spoe-go v1.0.5
github.com/negasus/haproxy-spoe-go v1.0.6
github.com/oschwald/geoip2-golang v1.9.0
github.com/prometheus/client_golang v1.20.4
github.com/prometheus/client_model v0.6.1
Expand All @@ -24,7 +27,6 @@ require (
github.com/blackfireio/osinfo v1.0.5 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/expr-lang/expr v1.16.9 // indirect
github.com/fatih/color v1.17.0 // indirect
github.com/go-openapi/analysis v0.23.0 // indirect
Expand Down Expand Up @@ -53,7 +55,7 @@ require (
github.com/prometheus/procfs v0.15.1 // indirect
go.mongodb.org/mongo-driver v1.17.0 // indirect
golang.org/x/net v0.29.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/sys v0.32.0 // indirect
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ github.com/crowdsecurity/go-cs-bouncer v0.0.14 h1:0hxOaa59pMT274qDzJXNxps4QfMnhS
github.com/crowdsecurity/go-cs-bouncer v0.0.14/go.mod h1:4nSF37v7i98idHM6cw1o0V0XgiY25EjTLfFFXvqg6OA=
github.com/crowdsecurity/go-cs-lib v0.0.15 h1:zNWqOPVLHgKUstlr6clom9d66S0eIIW66jQG3Y7FEvo=
github.com/crowdsecurity/go-cs-lib v0.0.15/go.mod h1:ePyQyJBxp1W/1bq4YpVAilnLSz7HkzmtI7TRhX187EU=
github.com/crowdsecurity/go-cs-lib v0.0.18 h1:GNyvaag5MXfuapIy4E30pIOvIE5AyHoanJBNSMA1cmE=
github.com/crowdsecurity/go-cs-lib v0.0.18/go.mod h1:XwGcvTt4lMq4Tm1IRMSKMDf0CVrnytTU8Uoofa7AR+g=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -86,6 +88,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/negasus/haproxy-spoe-go v1.0.5 h1:iMUOg/WTdwh4qOD5VUWqXElIG6YefqdOZbTzbVXN8ZU=
github.com/negasus/haproxy-spoe-go v1.0.5/go.mod h1:ZrBizxtx2EeLN37Jkg9w9g32a1AFCJizA8vg46PaAp4=
github.com/negasus/haproxy-spoe-go v1.0.6 h1:uJ5coC6n0p4tI0MbVPna4ztFTpW1P3pzswvjuKAF8X4=
github.com/negasus/haproxy-spoe-go v1.0.6/go.mod h1:ZrBizxtx2EeLN37Jkg9w9g32a1AFCJizA8vg46PaAp4=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/oschwald/geoip2-golang v1.9.0 h1:uvD3O6fXAXs+usU+UGExshpdP13GAqp4GBrzN7IgKZc=
Expand Down Expand Up @@ -123,6 +127,8 @@ golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224=
golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
44 changes: 28 additions & 16 deletions internal/worker/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package worker

import (
"context"
"encoding/json"
"fmt"
"os"
"os/exec"
"syscall"
Expand All @@ -11,28 +13,31 @@ import (
)

type Worker struct {
Name string `yaml:"name"`
ListenAddr string `yaml:"listen_addr"`
ListenSocket string `yaml:"listen_socket"`
LogLevel *log.Level `yaml:"log_level"`
Uid int `yaml:"-"` // Set by the worker manager
Gid int `yaml:"-"` // Set by the worker manager
Command *exec.Cmd `yaml:"-"`
SocketPath string `yaml:"-"` // Set by combining the socket dir and the worker name
Name string `yaml:"name"`
Config string `yaml:"config"`
LogLevel *log.Level `yaml:"log_level"`
Uid int `yaml:"-"` // Set by the worker manager
Gid int `yaml:"-"` // Set by the worker manager
Command *exec.Cmd `yaml:"-"`
SocketPath string `yaml:"-"` // Set by combining the socket dir and the worker name
}

func (w *Worker) Run(socket string) {
type WorkerConfig struct {
ListenAddr string `yaml:"listen_addr"`
ListenSocket string `yaml:"listen_socket"`
}

func (w *Worker) Run(socket string) error {
args := []string{
"-worker",
}

if w.ListenAddr != "" {
args = append(args, "-tcp", w.ListenAddr)
}
if w.ListenSocket != "" {
args = append(args, "-unix", w.ListenSocket)
config, err := json.Marshal(*w)
if err != nil {
return fmt.Errorf("failed to marshal appsec config: %w", err)
}

args = append(args, "-config", string(config))
command := exec.Command(os.Args[0], args...)

command.Env = []string{
Expand All @@ -43,7 +48,6 @@ func (w *Worker) Run(socket string) {
if w.LogLevel != nil {
command.Env = append(command.Env, "LOG_LEVEL="+w.LogLevel.String())
}

log.Infof("Starting worker %s with cmd %s %v", w.Name, os.Args[0], args)

command.SysProcAttr = &syscall.SysProcAttr{}
Expand All @@ -63,6 +67,7 @@ func (w *Worker) Run(socket string) {
}

log.Infof("Worker %s exited", w.Name)
return nil
}

type Manager struct {
Expand Down Expand Up @@ -105,7 +110,14 @@ func (m *Manager) AddWorker(w *Worker) {
log.Errorf("failed to create worker listener: %s", err)
return
}
go w.Run(socketString)

go func() {
err := w.Run(socketString)
if err != nil {
m.Stop()
}
}()

m.Workers = append(m.Workers, w)
}

Expand Down
185 changes: 185 additions & 0 deletions internal/worker/worker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
// worker_test.go
package worker

import (
"context"
"encoding/json"
"flag"
"fmt"
"os"
"os/signal"
"strings"
"syscall"
"testing"
"time"

"github.com/crowdsecurity/crowdsec-spoa/pkg/server"
"github.com/stretchr/testify/assert"
)

// func TestMain implements the helper process trick. When Worker.Run spawns a new process,
// the test binary is re-invoked. In that case we check for the "-worker" flag and exit immediately.
func TestMain(m *testing.M) {
configFlag := flag.String("config", "", "Configuration JSON")
workerFlag := flag.Bool("worker", false, "Worker flag")
flag.Parse()

if *workerFlag {
var config Worker
err := json.Unmarshal([]byte(*configFlag), &config)
if err != nil {
os.Exit(1)
}

switch config.Name {
case "test-worker-1":
os.Exit(0)
case "test-worker-2":
os.Exit(1)
case "test-worker-3":
// Create a channel to receive OS signals.
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

// Block until a signal is received.
sig := <-sigChan
fmt.Printf("Received signal: %s, exiting...\n", sig)
os.Exit(0)
}
}

os.Exit(m.Run())
}

// TestManagerAddWorkerWithSuccess tests the AddWorker method when NewWorkerListener succeeds.
func TestManagerAddWorkerWithSuccess(t *testing.T) {
// Create a fake server that returns a dummy socket string.
s := &server.Server{}

// Create a Manager with a cancellable context.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mgr := NewManager(ctx, s, 1000, 1000)

// Create a worker.
w := &Worker{
Name: "test-worker-1",
}

// Call AddWorker.
mgr.AddWorker(w)

// Allow some time for the goroutine spawned inside AddWorker to run.
time.Sleep(100 * time.Millisecond)

// Verify that the worker is appended and its fields are set.
if len(mgr.Workers) != 1 {
t.Fatalf("expected 1 worker, got %d", len(mgr.Workers))
}
if w.Uid != 1000 || w.Gid != 1000 {
t.Errorf("expected worker Uid and Gid to be 1000, got %d and %d", w.Uid, w.Gid)
}

// Verify that the command was created and its environment includes expected variables.
if w.Command == nil {
t.Errorf("expected worker command to be set")
} else {
foundWorkerName := false
foundWorkerSocket := false
for _, env := range w.Command.Env {
if env == "WORKERNAME="+w.Name {
foundWorkerName = true
}
if env == "WORKERSOCKET=crowdsec-spoa-worker-test-worker-1.sock" {
foundWorkerSocket = true
}
}
if !foundWorkerName {
t.Errorf("expected WORKERNAME in command env")
}
if !foundWorkerSocket {
t.Errorf("expected WORKERSOCKET in command env")
}
}

assert.NotNil(t, w.Command, "expected worker command to be set")
expectedCommandPrefix := "/tmp/go-build"
expectedCommandSuffix := `worker.test -worker -config {"Name":"test-worker-1","Config":"","LogLevel":null,"Uid":1000,"Gid":1000,"Command":null,"SocketPath":""}`
commandString := w.Command.String()
assert.True(t, strings.HasPrefix(commandString, expectedCommandPrefix), "expected worker command to start with %s", expectedCommandPrefix)
assert.True(t, strings.HasSuffix(commandString, expectedCommandSuffix), "expected worker command to end with %s", expectedCommandSuffix)
mgr.Stop()

s.Close()
}

// TestManagerAddWorker_NewWorkerListenerError tests that when NewWorkerListener fails,
// the worker is not added to the Manager.
func TestManagerAddWorkerNewWorkerListenerError(t *testing.T) {
s := &server.Server{}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mgr := NewManager(ctx, s, 1000, 1000)

w := &Worker{
Name: "test-worker-2",
}

// Call AddWorker. Since the fake server returns an error, AddWorker should return early.
mgr.AddWorker(w)
time.Sleep(100 * time.Millisecond)

// Wait briefly.

assert.Equal(t, 1, len(mgr.Workers), "expected 1 worker even it failed to start")
if len(mgr.Workers) == 0 {
t.Fatalf("expected 1 worker, got %d", len(mgr.Workers))
}
assert.NotNil(t, mgr.Workers, "expected worker command to be set")
assert.Nil(t, w.Command, "expected worker command to be nil")
assert.Nil(t, mgr.Workers[0].Command, "expected worker command to be nil")

mgr.Stop()
s.Close()
t.Logf("Don't care any consideration of the error")
}

// TestManagerAddWorker_NewWorkerListenerError tests that when NewWorkerListener fails,
// the worker is not added to the Manager.
func TestManagerAddWorkersNewWorkerListenerError(t *testing.T) {
fmt.Println("TestManagerAddWorkersNewWorkerListenerError")
s := &server.Server{}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mgr := NewManager(ctx, s, 1000, 1000)

w3 := &Worker{
Name: "test-worker-3",
}
mgr.AddWorker(w3)

w2 := &Worker{
Name: "test-worker-2",
}
// Call AddWorker. Since the fake server returns an error, AddWorker should return early.
mgr.AddWorker(w2)
time.Sleep(100 * time.Millisecond)
fmt.Printf("workers: %v\n", mgr.Workers[0].Command)
// Wait briefly.
mgr.Stop()

assert.Equal(t, 2, len(mgr.Workers), "expected 2 workers due to NewWorkerListener error")
assert.NotNil(t, mgr.Workers, "expected workers to be set")
assert.Nil(t, w2.Command, "expected worker command to be nil")
assert.Nil(t, mgr.Workers[1].Command, "expected worker command to be nil")
expectedCommandPrefix := "/tmp/go-build"
expectedCommandSuffix := `worker.test -worker -config {"Name":"test-worker-3","Config":"","LogLevel":null,"Uid":1000,"Gid":1000,"Command":null,"SocketPath":""}`
commandString := w3.Command.String()
assert.True(t, strings.HasPrefix(commandString, expectedCommandPrefix), "expected worker command to start with %s", expectedCommandPrefix)
assert.True(t, strings.HasSuffix(commandString, expectedCommandSuffix), "expected worker command to end with %s", expectedCommandSuffix)

s.Close()

}
16 changes: 13 additions & 3 deletions pkg/server/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,20 @@ func (s *Server) NewAdminListener(path string) error {

s.listeners = append(s.listeners, &l)

go s.Run(&l)

go func() {
err := s.Run(&l)
if err != nil {
s.logger.Infof("admin listener got an error: %s", err)
}
}()
return nil
}

func (s *Server) NewWorkerListener(name string, gid int) (string, error) {
socketString := fmt.Sprintf("%s%s%s.sock", s.workerSocketDir, WORKER_SOCKET_PREFIX, name)

l, err := newUnixSocket(socketString)
s.logger.Infof("Creating worker socket %s", socketString)

if err != nil {
return "", err
Expand All @@ -97,7 +102,12 @@ func (s *Server) NewWorkerListener(name string, gid int) (string, error) {

s.listeners = append(s.listeners, &l)

go s.Run(&l)
go func() {
err := s.Run(&l)
if err != nil {
s.logger.Infof("worker listener %s got an error: %s", name, err)
}
}()

return socketString, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/spoa/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (s *Spoa) ServeUnix(ctx context.Context) error {

go func() {
defer close(errorChan)
if err := s.Server.Serve(s.ListenAddr); err != nil {
if err := s.Server.Serve(s.ListenSocket); err != nil {
errorChan <- err
}
}()
Expand Down
Loading