Skip to content
This repository was archived by the owner on Feb 20, 2024. It is now read-only.

refactor: change (almost) everything #33

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 62 additions & 32 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,124 +1,154 @@
![go workers](https://raw.githubusercontent.com/catmullet/go-workers/assets/constworker_header_anim.gif)
# gorkers

<!-- [![Mentioned in Awesome Go](https://awesome.re/mentioned-badge-flat.svg)](https://github.com/avelino/awesome-go#goroutines) -->

[![Mentioned in Awesome Go](https://awesome.re/mentioned-badge-flat.svg)](https://github.com/avelino/awesome-go#goroutines)
[![Maintainability](https://api.codeclimate.com/v1/badges/402fee86fbd1e24defb2/maintainability)](https://codeclimate.com/github/catmullet/go-workers/maintainability)
[![GoCover](http://gocover.io/_badge/github.com/catmullet/go-workers)](http://gocover.io/github.com/catmullet/go-workers)
[![Go Reference](https://pkg.go.dev/badge/github.com/catmullet/go-workers.svg)](https://pkg.go.dev/github.com/catmullet/go-workers)

# Examples
* [Quickstart](https://github.com/catmullet/go-workers/blob/master/examples/quickstart/quickstart.go)
* [Multiple Go Workers](https://github.com/catmullet/go-workers/blob/master/examples/multiple_workers/multipleworkers.go)
* [Passing Fields](https://github.com/catmullet/go-workers/blob/master/examples/passing_fields/passingfields.go)

- [Quickstart](https://github.com/catmullet/go-workers/blob/master/examples/quickstart/quickstart.go)
- [Multiple Go Workers](https://github.com/catmullet/go-workers/blob/master/examples/multiple_workers/multipleworkers.go)
- [Passing Fields](https://github.com/catmullet/go-workers/blob/master/examples/passing_fields/passingfields.go)

# Getting Started

### Pull in the dependency
```zsh

```sh
go get github.com/catmullet/go-workers
```

### Add the import to your project
giving an alias helps since go-workers doesn't exactly follow conventions.

giving an alias helps since go-workers doesn't exactly follow conventions.
_(If you're using a JetBrains IDE it should automatically give it an alias)_

```go
import (
workers "github.com/catmullet/go-workers"
)
```

### Create a new worker <img src="https://raw.githubusercontent.com/catmullet/go-workers/assets/constworker.png" alt="worker" width="35"/>
The NewWorker factory method returns a new worker.

The NewWorker factory method returns a new worker.
_(Method chaining can be performed on this method like calling .Work() immediately after.)_

```go
type MyWorker struct {}

func NewMyWorker() Worker {
return &MyWorker{}
return &MyWorker{}
}

func (my *MyWorker) Work(in interface{}, out chan<- interface{}) error {
// work iteration here
// work iteration here
}

runner := workers.NewRunner(ctx, NewMyWorker(), numberOfWorkers)
runner := workers.NewRunner(ctx, NewMyWorker(), numberOfWorkers, numbersOfBuffers)
```

### Send work to worker
Send accepts an interface. So send it anything you want.

Send accepts an interface. So send it anything you want.

```go
runner.Send("Hello World")
```

### Wait for the worker to finish and handle errors

Any error that bubbles up from your worker functions will return here.

```go
if err := runner.Wait(); err != nil {
//Handle error
}
```

## Working With Multiple Workers
### Passing work form one worker to the next

### Passing work form one worker to the next

By using the InFrom method you can tell `workerTwo` to accept output from `workerOne`

```go
runnerOne := workers.NewRunner(ctx, NewMyWorker(), 100).Work()
runnerTwo := workers.NewRunner(ctx, NewMyWorkerTwo(), 100).InFrom(workerOne).Work()
runnerOne := workers.NewRunner(ctx, NewMyWorker(), 100, 100).Work()
runnerTwo := workers.NewRunner(ctx, NewMyWorkerTwo(), 100, 100).InFrom(workerOne).Work()
```

### Accepting output from multiple workers
It is possible to accept output from more than one worker but it is up to you to determine what is coming from which worker. (They will send on the same channel.)

It is possible to accept output from more than one worker but it is up to you to determine what is coming from which worker. (They will send on the same channel.)

```go
runnerOne := workers.NewRunner(ctx, NewMyWorker(), 100).Work()
runnerTwo := workers.NewRunner(ctx, NewMyWorkerTwo(), 100).Work()
runnerThree := workers.NewRunner(ctx, NewMyWorkerThree(), 100).InFrom(workerOne, workerTwo).Work()
runnerOne := workers.NewRunner(ctx, NewMyWorker(), 100, 100).Work()
runnerTwo := workers.NewRunner(ctx, NewMyWorkerTwo(), 100, 100).Work()
runnerThree := workers.NewRunner(ctx, NewMyWorkerThree(), 100, 100).InFrom(workerOne, workerTwo).Work()
```

## Passing Fields To Workers

### Adding Values
Fields can be passed via the workers object. Be sure as with any concurrency in Golang that your variables are concurrent safe. Most often the golang documentation will state the package or parts of it are concurrent safe. If it does not state so there is a good chance it isn't. Use the sync package to lock and unlock for writes on unsafe variables. (It is good practice NOT to defer in the work function.)

Fields can be passed via the workers object. Be sure as with any concurrency in Golang that your variables are concurrent safe. Most often the golang documentation will state the package or parts of it are concurrent safe. If it does not state so there is a good chance it isn't. Use the sync package to lock and unlock for writes on unsafe variables. (It is good practice NOT to defer in the work function.)

<img src="https://raw.githubusercontent.com/catmullet/go-workers/assets/constworker2.png" alt="worker" width="35"/> **ONLY** use the `Send()` method to get data into your worker. It is not shared memory unlike the worker objects values.

```go
type MyWorker struct {
message string
message string
}

func NewMyWorker(message string) Worker {
return &MyWorker{message}
return &MyWorker{message}
}

func (my *MyWorker) Work(in interface{}, out chan<- interface{}) error {
fmt.Println(my.message)
fmt.Println(my.message)
}

runner := workers.NewRunner(ctx, NewMyWorker(), 100).Work()
runner := workers.NewRunner(ctx, NewMyWorker(), 100, 100).Work()
```

### Setting Timeouts or Deadlines

If your workers needs to stop at a deadline or you just need to have a timeout use the SetTimeout or SetDeadline methods. (These must be in place before setting the workers off to work.)

```go
// Setting a timeout of 2 seconds
timeoutRunner.SetTimeout(2 * time.Second)
timeoutRunner.SetWorkerTimeout(2 * time.Second)

// Setting a deadline of 4 hours from now
deadlineRunner.SetDeadline(time.Now().Add(4 * time.Hour))

func workerFunction(in interface{}, out chan<- interface{} error {
fmt.Println(in)
time.Sleep(1 * time.Second)
fmt.Println(in)
time.Sleep(1 * time.Second)
}
```


## Performance Hints

### Buffered Writer
If you want to write out to a file or just stdout you can use SetWriterOut(writer io.Writer). The worker will have the following methods available

If you want to write out to a file or just stdout you can use SetWriterOut(writer io.Writer). The worker will have the following methods available

```go
runner.Println()
runner.Printf()
runner.Print()
```
The workers use a buffered writer for output and can be up to 3 times faster than the fmt package. Just be mindful it won't write out to the console as quickly as an unbuffered writer. It will sync and eventually flush everything at the end, making it ideal for writing out to a file.

The workers use a buffered writer for output and can be up to 3 times faster than the fmt package. Just be mindful it won't write out to the console as quickly as an unbuffered writer. It will sync and eventually flush everything at the end, making it ideal for writing out to a file.

### Using GOGC env variable

If your application is based solely around using workers, consider upping the percentage of when the scheduler will garbage collect. (ex. GOGC=200) 200% -> 300% is a good starting point. Make sure your machine has some good memory behind it.
By upping the percentage your application will interupt the workers less, meaning they get more work done. However, be aware of the rest of your applications needs when modifying this variable.
By upping the percentage your application will interupt the workers less, meaning they get more work done. However, be aware of the rest of your applications needs when modifying this variable.

### Using GOMAXPROCS env variable
For workers that run quick bursts of lots of simple data consider lowering the GOMAXPROCS. Be carfeful though, this can affect your entire applicaitons performance. Profile your application and benchmark it. See where your application runs best.

For workers that run quick bursts of lots of simple data consider lowering the GOMAXPROCS. Be carfeful though, this can affect your entire applicaitons performance. Profile your application and benchmark it. See where your application runs best.
21 changes: 12 additions & 9 deletions examples/deadline_worker/deadlineworker.go
Original file line number Diff line number Diff line change
@@ -1,39 +1,42 @@
//go:build ignore
// +build ignore

package main

import (
"context"
"fmt"
"github.com/catmullet/go-workers"
"time"

"github.com/guilhem/gorkers"
)

func main() {
ctx := context.Background()
t := time.Now()

deadlineWorker := workers.NewRunner(ctx, NewDeadlineWorker(), 100).
SetDeadline(t.Add(200 * time.Millisecond)).Start()
deadlineWorker := gorkers.NewRunner(ctx, NewDeadlineWorker().Work, 100, 100).
SetDeadline(t.Add(200 * time.Millisecond))
deadlineWorker.Start()
if err := deadlineWorker.Start(); err != nil {
fmt.Println(err)
}

for i := 0; i < 1000000; i++ {
deadlineWorker.Send("hello")
}

err := deadlineWorker.Wait()
if err != nil {
fmt.Println(err)
}
deadlineWorker.Wait().Stop()
fmt.Println("finished")
}

type DeadlineWorker struct{}

func NewDeadlineWorker() workers.Worker {
func NewDeadlineWorker() *DeadlineWorker {
return &DeadlineWorker{}
}

func (dlw *DeadlineWorker) Work(in interface{}, out chan<- interface{}) error {
func (dlw *DeadlineWorker) Work(_ context.Context, in interface{}, out chan<- interface{}) error {
fmt.Println(in)
time.Sleep(1 * time.Second)
return nil
Expand Down
26 changes: 17 additions & 9 deletions examples/multiple_workers/multipleworkers.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
//go:build ignore
// +build ignore

package main

import (
"context"
"fmt"
"github.com/catmullet/go-workers"
"math/rand"
"sync"

"github.com/guilhem/gorkers"
)

var (
Expand All @@ -18,19 +20,25 @@ var (
func main() {
ctx := context.Background()

workerOne := workers.NewRunner(ctx, NewWorkerOne(), 1000).Start()
workerTwo := workers.NewRunner(ctx, NewWorkerTwo(), 1000).InFrom(workerOne).Start()
workerOne := gorkers.NewRunner(ctx, NewWorkerOne().Work, 1000, 1000)
workerTwo := gorkers.NewRunner(ctx, NewWorkerTwo().Work, 1000, 1000).InFrom(workerOne)
if err := workerOne.Start(); err != nil {
fmt.Println(err)
}
if err := workerTwo.Start(); err != nil {
fmt.Println(err)
}

go func() {
for i := 0; i < 100000; i++ {
workerOne.Send(rand.Intn(100))
}
if err := workerOne.Wait(); err != nil {
if err := workerOne.Wait().Stop(); err != nil {
fmt.Println(err)
}
}()

if err := workerTwo.Wait(); err != nil {
if err := workerTwo.Wait().Stop(); err != nil {
fmt.Println(err)
}

Expand All @@ -44,15 +52,15 @@ type WorkerOne struct {
type WorkerTwo struct {
}

func NewWorkerOne() workers.Worker {
func NewWorkerOne() *WorkerOne {
return &WorkerOne{}
}

func NewWorkerTwo() workers.Worker {
func NewWorkerTwo() *WorkerTwo {
return &WorkerTwo{}
}

func (wo *WorkerOne) Work(in interface{}, out chan<- interface{}) error {
func (wo *WorkerOne) Work(_ context.Context, in interface{}, out chan<- interface{}) error {
var workerOne = "worker_one"
mut.Lock()
if val, ok := count[workerOne]; ok {
Expand All @@ -68,7 +76,7 @@ func (wo *WorkerOne) Work(in interface{}, out chan<- interface{}) error {
return nil
}

func (wt *WorkerTwo) Work(in interface{}, out chan<- interface{}) error {
func (wt *WorkerTwo) Work(_ context.Context, in interface{}, out chan<- interface{}) error {
var workerTwo = "worker_two"
mut.Lock()
if val, ok := count[workerTwo]; ok {
Expand Down
29 changes: 16 additions & 13 deletions examples/passing_fields/passingfields.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,33 @@
//go:build ignore
// +build ignore

package main

import (
"context"
"fmt"
"github.com/catmullet/go-workers"
"math/rand"

"github.com/guilhem/gorkers"
)

func main() {
ctx := context.Background()
workerOne := workers.NewRunner(ctx, NewWorkerOne(2), 100).Start()
workerTwo := workers.NewRunner(ctx, NewWorkerTwo(4), 100).InFrom(workerOne).Start()

for i := 0; i < 15; i++ {
workerOne.Send(rand.Intn(100))
workerOne := gorkers.NewRunner(ctx, NewWorkerOne(2).Work, 100, 100)
workerTwo := gorkers.NewRunner(ctx, NewWorkerTwo(4).Work, 100, 100).InFrom(workerOne)
if err := workerOne.Start(); err != nil {
fmt.Println(err)
}

if err := workerOne.Wait(); err != nil {
if err := workerTwo.Start(); err != nil {
fmt.Println(err)
}

if err := workerTwo.Wait(); err != nil {
fmt.Println(err)
for i := 0; i < 15; i++ {
workerOne.Send(rand.Intn(100))
}
workerOne.Wait().Stop()
workerTwo.Wait().Stop()
}

type WorkerOne struct {
Expand All @@ -34,26 +37,26 @@ type WorkerTwo struct {
amountToMultiply int
}

func NewWorkerOne(amountToMultiply int) workers.Worker {
func NewWorkerOne(amountToMultiply int) *WorkerOne {
return &WorkerOne{
amountToMultiply: amountToMultiply,
}
}

func NewWorkerTwo(amountToMultiply int) workers.Worker {
func NewWorkerTwo(amountToMultiply int) *WorkerTwo {
return &WorkerTwo{
amountToMultiply,
}
}

func (wo *WorkerOne) Work(in interface{}, out chan<- interface{}) error {
func (wo *WorkerOne) Work(ctx context.Context, in interface{}, out chan<- interface{}) error {
total := in.(int) * wo.amountToMultiply
fmt.Println("worker1", fmt.Sprintf("%d * %d = %d", in.(int), wo.amountToMultiply, total))
out <- total
return nil
}

func (wt *WorkerTwo) Work(in interface{}, out chan<- interface{}) error {
func (wt *WorkerTwo) Work(ctx context.Context, in interface{}, out chan<- interface{}) error {
totalFromWorkerOne := in.(int)
fmt.Println("worker2", fmt.Sprintf("%d * %d = %d", totalFromWorkerOne, wt.amountToMultiply, totalFromWorkerOne*wt.amountToMultiply))
return nil
Expand Down
Loading