diff --git a/cmd/goodman/main.go b/cmd/goodman/main.go index b4e272c..80e8edd 100644 --- a/cmd/goodman/main.go +++ b/cmd/goodman/main.go @@ -1,77 +1,166 @@ +// The package that the dredd cli calls +// When you run `dredd` from the cli dredd creates a new process and runs this binary with +// a list of hooks to instrument. +// This (goodman) binary brings up an rpc server to recieve commands from dredd +// This (goodman) binary the relays this across the hooks specified on boot package main import ( + "flag" "fmt" "log" + "net" "os" "os/exec" "os/signal" + "strings" "syscall" "time" "github.com/snikch/goodman" ) -var ( - c chan os.Signal - cmds chan *exec.Cmd - runners []goodman.Runner - hookServerInitalPort = 61322 - hooksServerCount int +const ( + // The default port for communicating with the dredd cli + defaultServerPort = 61321 + // How long to wait for a hook binary to begin responding + hookServerWait = 100 * time.Millisecond + hookServerRetries = 5 ) +func closeHookRunners(runners []goodman.Runner) []error { + errs := []error{} + for _, runner := range runners { + if err := runner.Close(); err != nil { + errs = append(errs, err) + } + } + return errs +} + +// Determine if an err represents a refused connection +// https://github.com/snikch/goodman/pull/23/files +func isConnectionRefusedError(err error) bool { + if noerr, ok := err.(*net.OpError); ok { + if scerr, ok := noerr.Err.(*os.SyscallError); ok { + if scerr.Err == syscall.ECONNREFUSED { + return true + } + } + } + return false +} + +func createHookRunners(hookPaths []string, errChan chan error, startingPort int) ([]goodman.Runner, error) { + runners := make([]goodman.Runner, len(hookPaths)) + // For each hook specified by the user, call it and bring up a hook runner + // to make calls to it + for i, path := range hookPaths { + // Each hook should communicate on a different port, use a "block" of ports + port := startingPort + i + cmd := exec.Command(path, fmt.Sprintf("-port=%d", port)) + // Propogate messages from the cmd into the stdout/err + // H.C: is this threadsafe? + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + // don't block on the hook being called + go func() { + // Sniffed in logging output by tests to assert it actually happened + log.Printf("Starting hooks server in go routine") + if err := cmd.Run(); err != nil { + errChan <- fmt.Errorf("hook server on port `%s`: %s", port, err.Error()) + } + }() + // The server may not immediatly return, give it a few attempts + // TODO: investigate cmd.Wait() to shortcut this + for retries := hookServerRetries; retries > 0; retries-- { + // Must sleep so go routine running hooks server has chance to startup + time.Sleep(hookServerWait) + // Bring up the runner that will call out to the hook + runner, err := goodman.NewRunner("Hooks", port, cmd) + if err != nil { + // Connection refused errors can be retried + if isConnectionRefusedError(err) { + continue + } + // Any other error cannot + return runners, fmt.Errorf("creating runner: %s", err.Error()) + } + runners[i] = runner + break + } + } + + return runners, nil +} + func main() { - cmds = make(chan *exec.Cmd, 50) + port := flag.Int("port", defaultServerPort, "The port that the dredd callback server will run on") + flag.Parse() + if port == nil { + log.Fatal("must be provided with port") + } + args := os.Args + // Arguments are the hook binaries to run hookPaths := args[1:len(args)] - c = make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt, syscall.SIGTERM) - go func() { - <-c - closeHooksServers() - os.Exit(0) - }() - hooksServerCount = len(args) - 1 - if len(args) < 2 { - runners = append(runners, &goodman.DummyRunner{}) - } else { - for _, path := range hookPaths { - cmd := exec.Command(path, fmt.Sprintf("-port=%d", hookServerInitalPort)) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - fmt.Println("Sending to channel\n") - cmds <- cmd - fmt.Println("Completed") - go func() { - log.Printf("Starting hooks server in go routine") - err := cmd.Run() - if err != nil { - fmt.Println("Hooks client failed with " + err.Error()) - } - }() - // Must sleep so go routine running hooks server has chance to startup - time.Sleep(100 * time.Millisecond) - runners = append(runners, goodman.NewRunner("Hooks", hookServerInitalPort)) - hookServerInitalPort++ + // Each hook can report an error, as can the server itself (+1) + errChan := make(chan error, len(hookPaths)+1) + defer close(errChan) + + // Due to legacy reasons dummy run if not provided any real hooks + runners := []goodman.Runner{&goodman.DummyRunner{}} + var err error + if len(hookPaths) > 0 { + // Begin the ports to communicate with our runners 1 after the port used to communicate + // with dredd + startingPort := *port + 1 + runners, err = createHookRunners(hookPaths, errChan, startingPort) + } + + defer func() { + if errs := closeHookRunners(runners); len(errs) != 0 { + closingErrs := make([]string, len(errs)) + for i, err := range errs { + closingErrs[i] = err.Error() + } + log.Fatalf("closing hook runners: %s", strings.Join(closingErrs, "\n")) } + }() + + if err != nil { + log.Fatalf("creating hook runners: %s", err.Error()) } - close(cmds) - server := goodman.NewServer(runners) - err := server.Run() + + // Bring up a server that will communicate with the main dredd runner + // (on the port specified by dredd) + server, err := goodman.NewServer(runners, *port) if err != nil { - log.Fatal(err.Error()) + log.Fatalf("creating server: %s", err.Error()) } - closeHooksServers() -} -func closeHooksServers() { - log.Printf("Shutting down hooks servers\n") - count := 0 - for cmd := range cmds { - cmd.Process.Kill() - count++ - if hooksServerCount == count { - return + // Server blocks so run in goroutine + go func() { + if err := server.Run(); err != nil { + errChan <- fmt.Errorf("running server: %s", err.Error()) + } + }() + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + for { + select { + // If the user bails out, attempt to shut down gracefully + case <-c: + server.Close() + closeHookRunners(runners) + os.Exit(0) + // If something broke, let the user know and bail out + case err := <-errChan: + server.Close() + closeHookRunners(runners) + log.Fatalf("encountered an unrecoverable error: %s", err.Error()) + os.Exit(0) } } } diff --git a/hooks/server.go b/hooks/server.go index a3f16fa..4c7d3fa 100644 --- a/hooks/server.go +++ b/hooks/server.go @@ -1,6 +1,9 @@ +// Include this code in your bootstrap code +// it receives messages from dredd and triggers the callbacks you registered package hooks import ( + "errors" "flag" "fmt" "log" @@ -12,30 +15,77 @@ import ( ) type Server struct { + // TODO: stop exposing this, have the user only use server.Close() + // kept around to avoid breaking changes Listener net.Listener } -func NewServer(run RunnerRPC) *Server { +func NewServerWithPortAndError(run RunnerRPC, port int) (*Server, error) { + if port == 0 { + return nil, errors.New("hook server must be provided with non-0 port") + } + serv := rpc.NewServer() + // Publishes in the server the exported methods on our `run` interface serv.Register(run) + // register these handlers in the default mux handler + // calling http.Serve later expose these serv.HandleHTTP("/", "/debug") - if *port == 0 { - panic("-port flag was not given to hook server") + l, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) + if err != nil { + return nil, fmt.Errorf("listen error: %s", err.Error()) } - l, e := net.Listen("tcp", fmt.Sprintf(":%d", *port)) - if e != nil { - log.Fatal("listen error:", e) + + server := Server{ + Listener: l, + } + return &server, nil +} + +var port int + +// Use the globally scoped port (which is parsed in init) +func NewServerWithError(run RunnerRPC) (*Server, error) { + server, err := NewServerWithPortAndError(run, port) + if err != nil { + return nil, fmt.Errorf("Creating hook server: %s", err.Error()) + } + return server, nil +} + +func init() { + flag.IntVar(&port, "port", 0, "The port that the hooks server will run on") + flag.Parse() +} + +// Legacy, compliant with existing usage +func NewServer(run RunnerRPC) *Server { + server, err := NewServerWithError(run) + if err != nil { + log.Fatalf(err.Error()) } - server := &Server{} - server.Listener = l return server } +// Legacy, compliant with existing usage func (s *Server) Serve() { http.Serve(s.Listener, nil) } +func (s *Server) ServeWithError() error { + // Listen on the tcp connection we made on boot + // serve handlers from the _default mux handler_ + return http.Serve(s.Listener, nil) +} + +func (s *Server) Close() error { + if err := s.Listener.Close(); err != nil { + return fmt.Errorf("Closing listener: %s", err.Error()) + } + return nil +} + type RunnerRPC interface { RunBeforeAll(args []*trans.Transaction, reply *[]*trans.Transaction) error RunBeforeEach(args trans.Transaction, reply *trans.Transaction) error @@ -46,10 +96,3 @@ type RunnerRPC interface { RunAfterEach(args trans.Transaction, reply *trans.Transaction) error RunAfterAll(args []*trans.Transaction, reply *[]*trans.Transaction) error } - -var port *int - -func init() { - port = flag.Int("port", 0, "The port that the hooks server will run on") - flag.Parse() -} diff --git a/hooks/server_test.go b/hooks/server_test.go index 8d1ea63..6ce61ec 100644 --- a/hooks/server_test.go +++ b/hooks/server_test.go @@ -19,7 +19,10 @@ func TestServerRPC(t *testing.T) { hooksServerPort := 61322 var addr = fmt.Sprintf(":%d", hooksServerPort) if os.Getenv("RUN_HOOKS") == "1" { - server := NewServer(&run) + server, err := NewServerWithPortAndError(&run, hooksServerPort) + if err != nil { + t.Errorf("creating server: %s", err.Error()) + } fmt.Println("Running the server") server.Serve() defer server.Listener.Close() diff --git a/runner.go b/runner.go index d6a3f2b..6455c95 100644 --- a/runner.go +++ b/runner.go @@ -3,23 +3,27 @@ package goodman import ( "fmt" "net/rpc" + "os/exec" "github.com/snikch/goodman/transaction" ) -func NewRunner(rpcService string, port int) *Run { +func NewRunner(rpcService string, port int, cmd *exec.Cmd) (*Run, error) { client, err := rpc.DialHTTPPath("tcp", fmt.Sprintf(":%d", port), "/") - if err != nil { - panic(err.Error()) + return nil, fmt.Errorf("dialing tcp server: %s", err.Error()) } - return &Run{ + + runner := Run{ + cmd: cmd, client: client, rpcService: rpcService, } + return &runner, nil } type Run struct { + cmd *exec.Cmd client *rpc.Client rpcService string } @@ -104,10 +108,20 @@ func (r *Run) RunAfter(t *transaction.Transaction) { *t = reply } -func (r *Run) Close() { - if err := r.client.Close(); err != nil { - panic("RPC client threw error on Close() " + err.Error()) +func (r *Run) Close() (err error) { + // Kill the underlying hook binary + if cmdErr := r.cmd.Process.Kill(); cmdErr != nil { + // What is dead may never die + // TODO: this is a pretty bad idea, we should robustly detect if the binary is still running + if cmdErr.Error() != "os: process already finished" { + err = fmt.Errorf("Killing cmd: %s", cmdErr.Error()) + } } + // terminate our connection listening to it + if clientErr := r.client.Close(); clientErr != nil { + err = fmt.Errorf("RPC client on Close() " + clientErr.Error()) + } + return err } type Runner interface { @@ -119,7 +133,7 @@ type Runner interface { RunAfterAll(t *[]*transaction.Transaction) RunAfterEach(t *transaction.Transaction) RunAfter(t *transaction.Transaction) - Close() + Close() error } type DummyRunner struct{} @@ -140,4 +154,6 @@ func (r *DummyRunner) RunAfterEach(t *transaction.Transaction) {} func (r *DummyRunner) RunAfter(t *transaction.Transaction) {} -func (r *DummyRunner) Close() {} +func (r *DummyRunner) Close() error { + return nil +} diff --git a/server.go b/server.go index bc69de0..0cdf0ff 100644 --- a/server.go +++ b/server.go @@ -5,54 +5,55 @@ import ( "encoding/json" "fmt" "io" + "log" "net" t "github.com/snikch/goodman/transaction" ) const ( - defaultPort = "61321" defaultMessageDelimiter = "\n" ) // Server is responsible for starting a server and running lifecycle callbacks. type Server struct { Runner []Runner - Port string MessageDelimeter []byte + listener net.Listener conn net.Conn } // NewServer returns a new server instance with the supplied runner. If no // runner is supplied, a new one will be created. -func NewServer(runners []Runner) *Server { - return &Server{ +func NewServer(runners []Runner, port int) (*Server, error) { + log.Printf("trying to listen on %d", port) + ln, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) + + if err != nil { + return nil, fmt.Errorf("listening on tcp: %s", err.Error()) + } + + server := Server{ Runner: runners, - Port: defaultPort, MessageDelimeter: []byte(defaultMessageDelimiter), + listener: ln, } + return &server, nil } // Run starts the server listening for events from dredd. func (server *Server) Run() error { fmt.Println("Starting") - ln, err := net.Listen("tcp", ":"+server.Port) - if err != nil { - return err - } - fmt.Println("Accepting connection") - conn, err := ln.Accept() + conn, err := server.listener.Accept() if err != nil { - return err + return fmt.Errorf("accepting on tcp: %s", err.Error()) } - defer ln.Close() - defer conn.Close() server.conn = conn for { body, err := bufio. - NewReader(conn). + NewReader(server.conn). ReadString('\n') if err == io.EOF { return nil @@ -60,26 +61,33 @@ func (server *Server) Run() error { if err != nil { return err } - body = body[:len(body)-1] m := &message{} - err = json.Unmarshal([]byte(body), m) - if err != nil { - return err + if err := json.Unmarshal([]byte(body), m); err != nil { + return fmt.Errorf("unmarshaling body: %s", err.Error()) } - err = server.ProcessMessage(m) - if err != nil { - return err + if err := server.ProcessMessage(m); err != nil { + return fmt.Errorf("processing message: %s", err.Error()) } } } +func (s Server) Close() (err error) { + if listenerErr := s.listener.Close(); listenerErr != nil { + err = fmt.Errorf("closing listener: %s", listenerErr.Error()) + } + if s.conn != nil { + if connErr := s.conn.Close(); connErr != nil { + err = fmt.Errorf("closing server tcp connection: %s", connErr.Error()) + } + } + return err +} + // ProcessMessage handles a single event message. func (server *Server) ProcessMessage(m *message) error { switch m.Event { - case "beforeAll": - fallthrough - case "afterAll": + case "beforeAll", "afterAll": m.transactions = []*t.Transaction{} err := json.Unmarshal(m.Data, &m.transactions) if err != nil { @@ -121,9 +129,7 @@ func (server *Server) ProcessMessage(m *message) error { } switch m.Event { - case "beforeAll": - fallthrough - case "afterAll": + case "beforeAll", "afterAll": return server.sendResponse(m, m.transactions) default: return server.sendResponse(m, m.transaction) @@ -182,16 +188,20 @@ func (server *Server) RunAfterAll(trans *[]*t.Transaction) { func (server *Server) sendResponse(m *message, dataObj interface{}) error { data, err := json.Marshal(dataObj) if err != nil { - return err + return fmt.Errorf("marshaling data json: %s", err) } m.Data = json.RawMessage(data) response, err := json.Marshal(m) if err != nil { - return err + return fmt.Errorf("marshaling message json: %s", err) + } + if _, err := server.conn.Write(response); err != nil { + return fmt.Errorf("writing error response: %s", err.Error()) + } + if _, err := server.conn.Write(server.MessageDelimeter); err != nil { + return fmt.Errorf("writing message delimeter: %s", err.Error()) } - server.conn.Write(response) - server.conn.Write(server.MessageDelimeter) return nil } diff --git a/server_test.go b/server_test.go index bc3cfef..ef3eccc 100644 --- a/server_test.go +++ b/server_test.go @@ -8,7 +8,7 @@ import ( func TestSendingServerMessages(t *testing.T) { runner := DummyRunner{} - server := NewServer([]Runner{&runner}) + server, err := NewServer([]Runner{&runner}, 61321) go func() { err := server.Run() @@ -39,7 +39,6 @@ func TestSendingServerMessages(t *testing.T) { var ( conn net.Conn - err error ) for {