conpats contains several common concurrency patterns for convenient use.
`go get github.com/kiriyms/conpats`

conpats provides Worker Pool, Pipeline and Tee.
- Use `pool.Pool` when you need to run jobs concurrently with a goroutine limit.
- Use `pool.ErrorPool` when you need to run jobs that return errors concurrently with a goroutine limit.
- Use `pool.ContextPool` when you need to run jobs that return errors and receive a `context.Context` argument concurrently with a goroutine limit.
Every Pool must be created using `pool.New(...)`. To convert it, use:
- `.New(...).WithErrors()` to get a `pool.ErrorPool`.
- `.New(...).WithErrors().WithContext(ctx)` to get a `pool.ContextPool`, where the `ctx` parameter is the parent context that is passed to all your jobs.
- Use `pipe.PipeFromChan(...)` when you need to run all input values from a given channel through a function concurrently.
- Use `pipe.PipeFromSlice(...)` when you need to run all values of a given slice through a function concurrently.
Both Pipe functions return channels, which makes it easy to chain several pipes together or to consume the output channel in other ways, for example:
- Use `pipe.Collect(chan)` when you want to block and collect results from a channel into a slice until the channel is closed.
The Pipeline implementation uses `pool.Pool` by default, but this can be changed:
- Use the `pipe.WithPool(pool)` option parameter to specify the Worker Pool implementation that the Pipe will use.
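An option parameter of this kind is commonly built with the functional options pattern. The sketch below shows the general shape using only the standard library; `runner`, `withPool`, `mapAll` and both pool types are hypothetical names chosen for illustration, and nothing here is taken from the conpats source.

```go
package main

import (
	"fmt"
	"sync"
)

// runner is a hypothetical minimal pool interface: start a job, wait for all.
type runner interface {
	Go(func())
	Wait()
}

// unbounded starts one goroutine per job; it plays the role of the default pool.
type unbounded struct{ wg sync.WaitGroup }

func (u *unbounded) Go(f func()) {
	u.wg.Add(1)
	go func() {
		defer u.wg.Done()
		f()
	}()
}

func (u *unbounded) Wait() { u.wg.Wait() }

// serial runs each job immediately on the caller's goroutine and counts them.
type serial struct{ ran int }

func (s *serial) Go(f func()) { s.ran++; f() }
func (s *serial) Wait()       {}

type config struct{ pool runner }

// option mutates the config before work starts.
type option func(*config)

// withPool swaps the pool used to run the work.
func withPool(p runner) option { return func(c *config) { c.pool = p } }

// mapAll applies fn to every input on the configured pool and returns the
// results in input order.
func mapAll(fn func(int) int, in []int, opts ...option) []int {
	cfg := config{pool: &unbounded{}} // default when no option is given
	for _, opt := range opts {
		opt(&cfg)
	}
	out := make([]int, len(in))
	for i, v := range in {
		i, v := i, v // per-iteration copies, needed before Go 1.22
		cfg.pool.Go(func() { out[i] = fn(v) })
	}
	cfg.pool.Wait()
	return out
}

func main() {
	double := func(n int) int { return n * 2 }
	fmt.Println(mapAll(double, []int{1, 2, 3})) // default pool

	s := &serial{}
	fmt.Println(mapAll(double, []int{1, 2, 3}, withPool(s)), s.ran) // custom pool
}
```

The caller only pays for the option when it is needed: omitting it keeps the default, and any type that satisfies the small interface can be plugged in.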
- Use `tee.NewTee(chan)` to create several channels (buffered or unbuffered) that each receive a copy of every value from the provided `chan` channel.
Main goals of this package are:
- Make concurrency easier and reduce boilerplate
- Provide a variety of common concurrency patterns in one place
- Avoid any third-party dependencies
This section provides simple usage examples of Worker Pool, Pipeline and Tee, each compared to a manual implementation. More examples can be found in these patterns' respective READMEs: Pool, Pipe, Tee.
Manual:

```go
func main() {
	// doWork is any func() defined elsewhere.
	wg := sync.WaitGroup{}
	jobs := make(chan func())

	// Start 10 workers that pull jobs until the channel is closed.
	for i := 0; i < 10; i++ {
		wg.Go(func() { // sync.WaitGroup.Go requires Go 1.25+
			for job := range jobs {
				job()
			}
		})
	}

	for i := 0; i < 100; i++ {
		jobs <- doWork
	}
	close(jobs)
	wg.Wait()
}
```

Using `pool.Pool`:

```go
func main() {
	p := pool.New(10)
	for i := 0; i < 100; i++ {
		p.Go(doWork)
	}
	p.Wait()
}
```
Manual:

```go
func main() {
	nums := []int{1, 2, 3, 4, 5}

	// Feed the slice into a channel.
	in := make(chan int)
	go func() {
		defer close(in)
		for _, n := range nums {
			in <- n
		}
	}()

	// Stage 1: square roots, 5 workers.
	sqrtChan := make(chan float64)
	wgSqrt := sync.WaitGroup{}
	go func() {
		// Deferred calls run in reverse order: wait for workers, then close.
		defer close(sqrtChan)
		defer wgSqrt.Wait()
		for i := 0; i < 5; i++ {
			wgSqrt.Add(1)
			go func() {
				defer wgSqrt.Done()
				for n := range in {
					sqrtChan <- math.Sqrt(float64(n))
				}
			}()
		}
	}()

	// Stage 2: formatting, 3 workers.
	logChan := make(chan string)
	wgLog := sync.WaitGroup{}
	go func() {
		defer close(logChan)
		defer wgLog.Wait()
		for i := 0; i < 3; i++ {
			wgLog.Add(1)
			go func() {
				defer wgLog.Done()
				for sq := range sqrtChan {
					logChan <- fmt.Sprintf("Sqrt: %.2f", sq)
				}
			}()
		}
	}()

	// Collect until the last stage closes its channel.
	results := make([]string, 0)
	for line := range logChan {
		results = append(results, line)
	}
	fmt.Println(results) // use results so the example compiles
}
```

Using `pipe.PipeFromSlice()` and `pipe.PipeFromChan()`:

```go
func main() {
	nums := []int{1, 2, 3, 4, 5}

	sqrtChan := pipe.PipeFromSlice(func(n int) float64 {
		return math.Sqrt(float64(n))
	}, nums, 5)

	logChan := pipe.PipeFromChan(func(n float64) string {
		return fmt.Sprintf("Sqrt: %.2f", n)
	}, sqrtChan, 2)

	results := pipe.Collect(logChan)
	fmt.Println(results) // use results so the example compiles
}
```
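Manual:

```go
func main() {
	in := make(chan int)

	// Create 3 unbuffered output channels.
	outs := make([]chan int, 3)
	for i := range 3 {
		outs[i] = make(chan int)
	}

	go func() {
		// Close every output once the input is exhausted.
		defer func() {
			for _, out := range outs {
				close(out)
			}
		}()
		// Copy each input value to every output.
		for item := range in {
			for _, out := range outs {
				out <- item
			}
		}
	}()
}
```

Using `tee.NewTee()`:

```go
func main() {
	in := make(chan int)
	// 3 unbuffered output channels, matching the manual version.
	outs := tee.NewTee(in, 3, 0)
	_ = outs // hand each channel to its own consumer
}
```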
Note: if one of the output channels is blocked because nothing is reading from it, all other output channels will block too.
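The effect of the buffer size on this blocking behaviour can be seen in a small standard-library sketch. `teeN` is a hypothetical reimplementation written for illustration, not the conpats code; its parameter order simply mirrors the call shown above.

```go
package main

import "fmt"

// teeN is a hypothetical sketch, not the conpats implementation. It copies
// each value from in to n output channels, each with the given buffer size.
func teeN[T any](in <-chan T, n, buf int) []<-chan T {
	outs := make([]chan T, n)
	ro := make([]<-chan T, n) // receive-only views handed to the caller
	for i := range outs {
		outs[i] = make(chan T, buf)
		ro[i] = outs[i]
	}
	go func() {
		defer func() {
			for _, out := range outs {
				close(out)
			}
		}()
		for v := range in {
			for _, out := range outs {
				out <- v // blocks here when this output's buffer is full
			}
		}
	}()
	return ro
}

func main() {
	in := make(chan int)
	outs := teeN(in, 2, 3) // buffer of 3 per output

	// Nobody reads the outputs yet, but the buffers absorb all three values.
	for i := 1; i <= 3; i++ {
		in <- i
	}
	close(in)

	// Drain the second output completely before touching the first.
	for _, idx := range []int{1, 0} {
		sum := 0
		for v := range outs[idx] {
			sum += v
		}
		fmt.Println(idx, sum)
	}
}
```

With a buffer size of 0 the same program would deadlock: the copying goroutine would block on the first output while `main` is still trying to send the second value. A buffer lets slow readers lag behind by that many values before they stall the others.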
The concurrency pattern abstractions in conpats can be easily combined with each other.
To see usage examples that are more complex and closer to real-world problems, check out the Cookbook.
Making a small Go package has been an enlightening and interesting experience. As a result of this endeavor, I've jotted down some final thoughts.
v1 (core API settled).
Common concurrency patterns are implemented. Possible future improvements:
- Add more patterns & utility functions (like Fan-in/Fan-out, Pub-Sub, etc.)
- Add more cookbook examples