Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions component.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package runnable

import (
"context"
"time"
)

var DefaultComponentShutdownTimeout = 30 * time.Second

type C interface {
Add(runner Runnable)
AddService(service Runnable)
}

type component struct {
processes *group
services *group
}

// Component manages two groups of Runnable, processes and services.
// Services are stopped after processes.
// The intended purposes is to manage the simplest form of runnable dependencies.
func Component(processes ...Runnable) *component {
return &component{
processes: &group{timeout: DefaultComponentShutdownTimeout, runnables: processes},
services: &group{timeout: DefaultComponentShutdownTimeout},
}
}

// WithProcessShutdownTimeout changes the shutdown timeout of the process runnables.
// See also DefaultComponentShutdownTimeout.
func (c *component) WithProcessShutdownTimeout(timeout time.Duration) *component {
c.processes.timeout = timeout
return c
}

// WithServiceShutdownTimeout changes the shutdown timeout of the service runnables.
// See also DefaultComponentShutdownTimeout.
func (c *component) WithServiceShutdownTimeout(timeout time.Duration) *component {
c.services.timeout = timeout
return c
}

// AddProcess registers runnables as process. Processes will be shutdown before services.
func (c *component) AddProcess(runner ...Runnable) *component {
c.processes.Add(runner...)
return c
}

// Add registers runnables as services. Services will be shutdown after processes.
func (c *component) AddService(service ...Runnable) *component {
c.services.Add(service...)
return c
}

func (c *component) Run(ctx context.Context) error {
ctxNoCancel := context.WithoutCancel(ctx)

// Starting

Log(c, "starting services")
c.services.Start(ctxNoCancel)

Log(c, "starting processes")
c.processes.Start(ctxNoCancel)

// Waiting for shutdown

select {
case r := <-c.processes.StoppedRunnables():
Log(c, "process stopped: %s", findName(r))
case r := <-c.services.StoppedRunnables():
Log(c, "service stopped: %s", findName(r))
case <-ctx.Done():
Log(c, "context cancelled")
}

// shutdown

Log(c, "shutting down processes")
c.processes.Stop()

Log(c, "shutting down services")
c.services.Stop()

Log(c, "shutdown complete")
return ctx.Err()
}
48 changes: 48 additions & 0 deletions component_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package runnable

import (
"context"
stdlog "log"
"os"
)

func ExampleComponent_cancelled() {
ctx, cancel := initializeForExample()
defer cancel()

c := Component()
c.AddProcess(newDummyRunnable())

_ = c.Run(ctx)

// Output:
// component: starting services
// component: starting processes
// dummyRunnable: started
// component: context cancelled
// component: shutting down processes
// dummyRunnable: stopped
// component: shutting down services
// component: shutdown complete
}

func ExampleComponent_failing() {
ctx := context.Background()

SetLogger(stdlog.New(os.Stdout, "", 0))

c := Component()
c.AddProcess(newDyingRunnable())

_ = c.Run(ctx)

// Output:
// component: starting services
// component: starting processes
// dyingRunnable: started
// dyingRunnable: stopped with error: dying
// component: a process stopped
// component: shutting down processes
// component: shutting down services
// component: shutdown complete
}
49 changes: 49 additions & 0 deletions examples/component/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package main

import (
"context"
"fmt"
"net/http"
"time"

"github.com/pior/runnable"
"github.com/pior/runnable/examples/jobqueue"
)

// Run it with:
// go run ./examples/component/main.go

// Test it with:
// curl "http://localhost:8000/?count=3"

func jobMonitor(jobs *jobqueue.JobQueue) runnable.RunnableFunc {
return func(ctx context.Context) error {
fmt.Printf("Task executed: %d\t\ttasks waiting: %d\n", jobs.Executed(), jobs.Waiting())
return nil
}
}

func main() {
jobs := jobqueue.New()

server := &http.Server{
Addr: "localhost:8000",
Handler: http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
jobs.Enqueue(r.URL.Path)
fmt.Fprintln(rw, "Job enqueued!")
}),
}

c := runnable.Component().
WithProcessShutdownTimeout(5 * time.Second).
WithServiceShutdownTimeout(5 * time.Second).
AddProcess(runnable.HTTPServer(server)).
AddService(jobs).
AddProcess(
runnable.Every(runnable.Func(jobMonitor(jobs)), 2*time.Second),
)

fmt.Println("Enqueue jobs with: curl http://localhost:8000/?count=3")

runnable.Run(c)
}
5 changes: 3 additions & 2 deletions examples/example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@ import (
"time"

"github.com/pior/runnable"
"github.com/pior/runnable/examples/jobqueue"
)

func main() {
jobs := NewStupidJobQueue()
jobs := jobqueue.New()

server := &http.Server{
Addr: "localhost:8000",
Handler: http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
jobs.Perform(r.URL.Path)
jobs.Enqueue(r.URL.Path)
fmt.Fprintln(rw, "Job enqueued!")
}),
}
Expand Down
70 changes: 70 additions & 0 deletions group.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package runnable

import (
"context"
"time"
)

type group struct {
runnables []Runnable
timeout time.Duration

contextCancel func()
started []*StartedRunnable

publicStoppedCh chan *StartedRunnable
stoppedCh chan *StartedRunnable
}

func (g *group) Add(rs ...Runnable) {
g.runnables = append(g.runnables, rs...)
}

func (g *group) StoppedRunnables() chan *StartedRunnable {
return g.publicStoppedCh
}

func (g *group) Start(ctx context.Context) {
ctx, g.contextCancel = context.WithCancel(ctx)

g.started = make([]*StartedRunnable, 0, len(g.runnables))
g.publicStoppedCh = make(chan *StartedRunnable, len(g.runnables))
g.stoppedCh = make(chan *StartedRunnable, len(g.runnables))

for _, r := range g.runnables {
g.started = append(g.started, StartRunnable(ctx, r, g.stoppedCh))
}

}

func (g *group) Stop() (groupErrors []error) {
g.contextCancel() // stop all other runnings

stopTimeout := time.After(g.timeout)
logTicker := time.NewTicker(3 * time.Second)
defer logTicker.Stop()

running := len(g.started)
for running > 0 {
select {
case stopped := <-g.stoppedCh:
g.publicStoppedCh <- stopped
running--
if stopped.err != nil {
groupErrors = append(groupErrors, stopped.err)
}
case <-logTicker.C:
for _, sr := range g.started {
stopped, _ := sr.State()
if !stopped {
Log(g, "still running: %s", findName(sr.runnable))
}
}
case <-stopTimeout:
Log(g, "waited %s, %d still running", g.timeout, running)
return
}
}

return
}
54 changes: 54 additions & 0 deletions start.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package runnable

import (
"context"
"errors"
"sync"
)

type StartedRunnable struct {
baseWrapper

runnable Runnable
stoppedChs []chan *StartedRunnable

mu sync.Mutex
stopped bool
err error
}

func StartRunnable(ctx context.Context, r Runnable, stoppedChs ...chan *StartedRunnable) *StartedRunnable {
s := &StartedRunnable{
baseWrapper: baseWrapper{"", r},
runnable: r,
stoppedChs: stoppedChs,
}

go func() {
Log(r, "started")
err := Recover(r).Run(ctx)
if err == nil || errors.Is(err, context.Canceled) {
Log(r, "stopped")
} else {
Log(r, "stopped with error: %+v", err)
}

s.mu.Lock()
defer s.mu.Unlock()

s.stopped = false
s.err = err

for _, stoppedCh := range stoppedChs {
stoppedCh <- s
}
}()

return s
}

func (s *StartedRunnable) State() (stopped bool, err error) {
s.mu.Lock()
defer s.mu.Unlock()
return s.stopped, s.err
}
4 changes: 2 additions & 2 deletions testing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ func newDummyRunnable() *dummyRunnable {
type dummyRunnable struct{}

func (r *dummyRunnable) Run(ctx context.Context) error {
Log(r, "started")
// Log(r, "DEBUG:started")
<-ctx.Done()
Log(r, "stopped")
// Log(r, "DEBUG:stopped")
return ctx.Err()
}

Expand Down