Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 139 additions & 50 deletions cmd/goodman/main.go
Original file line number Diff line number Diff line change
@@ -1,77 +1,166 @@
// The package that the dredd cli calls
// When you run `dredd` from the cli dredd creates a new process and runs this binary with
// a list of hooks to instrument.
// This (goodman) binary brings up an rpc server to recieve commands from dredd
// This (goodman) binary the relays this across the hooks specified on boot
package main

import (
"flag"
"fmt"
"log"
"net"
"os"
"os/exec"
"os/signal"
"strings"
"syscall"
"time"

"github.com/snikch/goodman"
)

var (
c chan os.Signal
cmds chan *exec.Cmd
runners []goodman.Runner
hookServerInitalPort = 61322
hooksServerCount int
const (
// The default port for communicating with the dredd cli
defaultServerPort = 61321
// How long to wait for a hook binary to begin responding
hookServerWait = 100 * time.Millisecond
hookServerRetries = 5
)

func closeHookRunners(runners []goodman.Runner) []error {
errs := []error{}
for _, runner := range runners {
if err := runner.Close(); err != nil {
errs = append(errs, err)
}
}
return errs
}

// Determine if an err represents a refused connection
// https://github.com/snikch/goodman/pull/23/files
func isConnectionRefusedError(err error) bool {
if noerr, ok := err.(*net.OpError); ok {
if scerr, ok := noerr.Err.(*os.SyscallError); ok {
if scerr.Err == syscall.ECONNREFUSED {
return true
}
}
}
return false
}

func createHookRunners(hookPaths []string, errChan chan error, startingPort int) ([]goodman.Runner, error) {
runners := make([]goodman.Runner, len(hookPaths))
// For each hook specified by the user, call it and bring up a hook runner
// to make calls to it
for i, path := range hookPaths {
// Each hook should communicate on a different port, use a "block" of ports
port := startingPort + i
cmd := exec.Command(path, fmt.Sprintf("-port=%d", port))
// Propogate messages from the cmd into the stdout/err
// H.C: is this threadsafe?
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
// don't block on the hook being called
go func() {
// Sniffed in logging output by tests to assert it actually happened
log.Printf("Starting hooks server in go routine")
if err := cmd.Run(); err != nil {
errChan <- fmt.Errorf("hook server on port `%s`: %s", port, err.Error())
}
}()
// The server may not immediatly return, give it a few attempts
// TODO: investigate cmd.Wait() to shortcut this
for retries := hookServerRetries; retries > 0; retries-- {
// Must sleep so go routine running hooks server has chance to startup
time.Sleep(hookServerWait)
// Bring up the runner that will call out to the hook
runner, err := goodman.NewRunner("Hooks", port, cmd)
if err != nil {
// Connection refused errors can be retried
if isConnectionRefusedError(err) {
continue
}
// Any other error cannot
return runners, fmt.Errorf("creating runner: %s", err.Error())
}
runners[i] = runner
break
}
}

return runners, nil
}

func main() {
cmds = make(chan *exec.Cmd, 50)
port := flag.Int("port", defaultServerPort, "The port that the dredd callback server will run on")
flag.Parse()
if port == nil {
log.Fatal("must be provided with port")
}

args := os.Args
// Arguments are the hook binaries to run
hookPaths := args[1:len(args)]
c = make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
closeHooksServers()
os.Exit(0)
}()
hooksServerCount = len(args) - 1
if len(args) < 2 {
runners = append(runners, &goodman.DummyRunner{})
} else {
for _, path := range hookPaths {
cmd := exec.Command(path, fmt.Sprintf("-port=%d", hookServerInitalPort))
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
fmt.Println("Sending to channel\n")
cmds <- cmd
fmt.Println("Completed")
go func() {
log.Printf("Starting hooks server in go routine")
err := cmd.Run()
if err != nil {
fmt.Println("Hooks client failed with " + err.Error())
}
}()
// Must sleep so go routine running hooks server has chance to startup
time.Sleep(100 * time.Millisecond)
runners = append(runners, goodman.NewRunner("Hooks", hookServerInitalPort))
hookServerInitalPort++
// Each hook can report an error, as can the server itself (+1)
errChan := make(chan error, len(hookPaths)+1)
defer close(errChan)

// Due to legacy reasons dummy run if not provided any real hooks
runners := []goodman.Runner{&goodman.DummyRunner{}}
var err error
if len(hookPaths) > 0 {
// Begin the ports to communicate with our runners 1 after the port used to communicate
// with dredd
startingPort := *port + 1
runners, err = createHookRunners(hookPaths, errChan, startingPort)
}

defer func() {
if errs := closeHookRunners(runners); len(errs) != 0 {
closingErrs := make([]string, len(errs))
for i, err := range errs {
closingErrs[i] = err.Error()
}
log.Fatalf("closing hook runners: %s", strings.Join(closingErrs, "\n"))
}
}()

if err != nil {
log.Fatalf("creating hook runners: %s", err.Error())
}
close(cmds)
server := goodman.NewServer(runners)
err := server.Run()

// Bring up a server that will communicate with the main dredd runner
// (on the port specified by dredd)
server, err := goodman.NewServer(runners, *port)
if err != nil {
log.Fatal(err.Error())
log.Fatalf("creating server: %s", err.Error())
}
closeHooksServers()
}

func closeHooksServers() {
log.Printf("Shutting down hooks servers\n")
count := 0
for cmd := range cmds {
cmd.Process.Kill()
count++
if hooksServerCount == count {
return
// Server blocks so run in goroutine
go func() {
if err := server.Run(); err != nil {
errChan <- fmt.Errorf("running server: %s", err.Error())
}
}()

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
for {
select {
// If the user bails out, attempt to shut down gracefully
case <-c:
server.Close()
closeHookRunners(runners)
os.Exit(0)
// If something broke, let the user know and bail out
case err := <-errChan:
server.Close()
closeHookRunners(runners)
log.Fatalf("encountered an unrecoverable error: %s", err.Error())
os.Exit(0)
}
}
}
73 changes: 58 additions & 15 deletions hooks/server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
// Include this code in your bootstrap code
// it receives messages from dredd and triggers the callbacks you registered
package hooks

import (
"errors"
"flag"
"fmt"
"log"
Expand All @@ -12,30 +15,77 @@ import (
)

type Server struct {
// TODO: stop exposing this, have the user only use server.Close()
// kept around to avoid breaking changes
Listener net.Listener
}

func NewServer(run RunnerRPC) *Server {
func NewServerWithPortAndError(run RunnerRPC, port int) (*Server, error) {
if port == 0 {
return nil, errors.New("hook server must be provided with non-0 port")
}

serv := rpc.NewServer()
// Publishes in the server the exported methods on our `run` interface
serv.Register(run)
// register these handlers in the default mux handler
// calling http.Serve later expose these
serv.HandleHTTP("/", "/debug")

if *port == 0 {
panic("-port flag was not given to hook server")
l, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
return nil, fmt.Errorf("listen error: %s", err.Error())
}
l, e := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if e != nil {
log.Fatal("listen error:", e)

server := Server{
Listener: l,
}
return &server, nil
}

var port int

// Use the globally scoped port (which is parsed in init)
func NewServerWithError(run RunnerRPC) (*Server, error) {
server, err := NewServerWithPortAndError(run, port)
if err != nil {
return nil, fmt.Errorf("Creating hook server: %s", err.Error())
}
return server, nil
}

func init() {
flag.IntVar(&port, "port", 0, "The port that the hooks server will run on")
flag.Parse()
}

// Legacy, compliant with existing usage
func NewServer(run RunnerRPC) *Server {
server, err := NewServerWithError(run)
if err != nil {
log.Fatalf(err.Error())
}
server := &Server{}
server.Listener = l
return server
}

// Legacy, compliant with existing usage
func (s *Server) Serve() {
http.Serve(s.Listener, nil)
}

func (s *Server) ServeWithError() error {
// Listen on the tcp connection we made on boot
// serve handlers from the _default mux handler_
return http.Serve(s.Listener, nil)
}

func (s *Server) Close() error {
if err := s.Listener.Close(); err != nil {
return fmt.Errorf("Closing listener: %s", err.Error())
}
return nil
}

type RunnerRPC interface {
RunBeforeAll(args []*trans.Transaction, reply *[]*trans.Transaction) error
RunBeforeEach(args trans.Transaction, reply *trans.Transaction) error
Expand All @@ -46,10 +96,3 @@ type RunnerRPC interface {
RunAfterEach(args trans.Transaction, reply *trans.Transaction) error
RunAfterAll(args []*trans.Transaction, reply *[]*trans.Transaction) error
}

var port *int

func init() {
port = flag.Int("port", 0, "The port that the hooks server will run on")
flag.Parse()
}
5 changes: 4 additions & 1 deletion hooks/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ func TestServerRPC(t *testing.T) {
hooksServerPort := 61322
var addr = fmt.Sprintf(":%d", hooksServerPort)
if os.Getenv("RUN_HOOKS") == "1" {
server := NewServer(&run)
server, err := NewServerWithPortAndError(&run, hooksServerPort)
if err != nil {
t.Errorf("creating server: %s", err.Error())
}
fmt.Println("Running the server")
server.Serve()
defer server.Listener.Close()
Expand Down
Loading