Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add static analysis #35

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,25 @@ on:
pull_request:

jobs:
lint:
runs-on: ubuntu-24.04
timeout-minutes: 5

steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version: "oldstable"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps use an explicit version here?

cache: false
- name: Create go.mod
run: |
ln -s vendor.mod go.mod
ln -s vendor.sum go.sum
- uses: golangci/golangci-lint-action@v6
with:
version: v1.60.1
skip-cache: true

test:
runs-on: ubuntu-24.04
timeout-minutes: 5
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Docker Events Package

[![GoDoc](https://godoc.org/github.com/docker/go-events?status.svg)](https://godoc.org/github.com/docker/go-events)
[![ci](https://github.com/docker/go-events/actions/workflows/ci.yml/badge.svg)](https://github.com/docker/go-events/actions/workflows/ci.yml)
[![Go Report Card](https://goreportcard.com/badge/github.com/docker/go-events)](https://goreportcard.com/report/github.com/docker/go-events)

The Docker `events` package implements a composable event distribution package
for Go.
Expand Down
41 changes: 33 additions & 8 deletions broadcast_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package events

import (
"fmt"
"sync"
"testing"
)
Expand All @@ -11,25 +12,46 @@ func TestBroadcaster(t *testing.T) {
b := NewBroadcaster()
for i := 0; i < 10; i++ {
sinks = append(sinks, newTestSink(t, nEvents))
b.Add(sinks[i])
b.Add(sinks[i]) // noop
err := b.Add(sinks[i])
if err != nil {
t.Errorf("expected nil error, got %v", err)
}

err = b.Add(sinks[i]) // noop
if err != nil {
t.Errorf("expected nil error, got %v", err)
}
}

var wg sync.WaitGroup
var (
wg sync.WaitGroup
asyncErr error
once sync.Once
)
for i := 1; i <= nEvents; i++ {
wg.Add(1)
go func(event Event) {
defer wg.Done()

if err := b.Write(event); err != nil {
t.Fatalf("error writing event %v: %v", event, err)
once.Do(func() {
asyncErr = fmt.Errorf("error writing event(%v): %v", event, err)
})
}
wg.Done()
}("event")
}(fmt.Sprintf("event-%d", i))
}

wg.Wait() // Wait until writes complete

if asyncErr != nil {
t.Fatalf("expected nil error, got %v", asyncErr)
}

for i := range sinks {
b.Remove(sinks[i])
err := b.Remove(sinks[i])
if err != nil {
t.Errorf("expected nil error, got %v", err)
}
}

// sending one more should trigger test failure if they weren't removed.
Expand All @@ -39,7 +61,10 @@ func TestBroadcaster(t *testing.T) {

// add them back to test closing.
for i := range sinks {
b.Add(sinks[i])
err := b.Add(sinks[i])
if err != nil {
t.Errorf("expected nil error, got %v", err)
}
}

checkClose(t, b)
Expand Down
45 changes: 38 additions & 7 deletions channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,55 @@ import (
func TestChannel(t *testing.T) {
const nevents = 100

errCh := make(chan error)
sink := NewChannel(0)

go func() {
var wg sync.WaitGroup
var (
wg sync.WaitGroup
asyncErr error
once sync.Once
)
for i := 1; i <= nevents; i++ {
event := "event-" + fmt.Sprint(i)
wg.Add(1)
go func(event Event) {
defer wg.Done()

if err := sink.Write(event); err != nil {
t.Fatalf("error writing event: %v", err)
once.Do(func() {
asyncErr = fmt.Errorf("error writing event(%v): %v", event, err)
})
}
}(event)
}(fmt.Sprintf("event-%d", i))
}

wg.Wait()

if asyncErr != nil {
errCh <- asyncErr
return
}

sink.Close()

// now send another bunch of events and ensure we stay closed
for i := 1; i <= nevents; i++ {
if err := sink.Write(i); err != ErrSinkClosed {
t.Fatalf("unexpected error: %v != %v", err, ErrSinkClosed)
}
wg.Add(1)
go func(event Event) {
defer wg.Done()

if err := sink.Write(event); err != ErrSinkClosed {
once.Do(func() {
asyncErr = fmt.Errorf("expected %v, got %v", ErrSinkClosed, err)
})
}
}(fmt.Sprintf("event-%d", i))
}

wg.Wait()

if asyncErr != nil {
errCh <- asyncErr
}
}()

Expand All @@ -40,11 +67,15 @@ loop:
select {
case <-sink.C:
received++
case err := <-errCh:
t.Fatal(err)
case <-sink.Done():
break loop
}
}

close(errCh)

sink.Close()
_, ok := <-sink.Done() // test will timeout if this hangs
if ok {
Expand Down
2 changes: 1 addition & 1 deletion common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,6 @@ func benchmarkSink(b *testing.B, sink Sink) {
defer sink.Close()
var event = "myevent"
for i := 0; i < b.N; i++ {
sink.Write(event)
_ = sink.Write(event)
}
}
20 changes: 16 additions & 4 deletions queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,30 @@ func TestQueue(t *testing.T) {
})
time.Sleep(10 * time.Millisecond) // let's queue settle to wait conidition.

var wg sync.WaitGroup
var (
wg sync.WaitGroup
asyncErr error
once sync.Once
)
for i := 1; i <= nevents; i++ {
wg.Add(1)
go func(event Event) {
defer wg.Done()

if err := eq.Write(event); err != nil {
t.Fatalf("error writing event: %v", err)
once.Do(func() {
asyncErr = fmt.Errorf("error writing event(%v): %v", event, err)
})
}
wg.Done()
}("event-" + fmt.Sprint(i))
}(fmt.Sprintf("event-%d", i))
}

wg.Wait()

if asyncErr != nil {
t.Fatalf("expected nil error, got %v", asyncErr)
}

checkClose(t, eq)

ts.mu.Lock()
Expand Down
2 changes: 1 addition & 1 deletion retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (b *Breaker) Proceed(event Event) time.Duration {
return 0
}

return b.last.Add(b.backoff).Sub(time.Now())
return time.Until(b.last.Add(b.backoff))
}

// Success resets the breaker.
Expand Down
21 changes: 16 additions & 5 deletions retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,13 @@ func testRetryingSink(t *testing.T, strategy RetryStrategy) {

s := NewRetryingSink(flaky, strategy)

var wg sync.WaitGroup
var (
wg sync.WaitGroup
asyncErr error
once sync.Once
)
for i := 1; i <= nevents; i++ {
event := "myevent-" + fmt.Sprint(i)
wg.Add(1)

// Above 50, set the failure rate lower
if i > 50 {
Expand All @@ -43,16 +47,23 @@ func testRetryingSink(t *testing.T, strategy RetryStrategy) {
flaky.mu.Unlock()
}

wg.Add(1)
go func(event Event) {
defer wg.Done()

if err := s.Write(event); err != nil {
t.Fatalf("error writing event: %v", err)
once.Do(func() {
asyncErr = fmt.Errorf("error writing event(%v): %v", event, err)
})
}
}(event)
}(fmt.Sprintf("event-%d", i))
}

wg.Wait()

if asyncErr != nil {
t.Fatalf("expected nil error, got %v", asyncErr)
}

checkClose(t, s)

ts.mu.Lock()
Expand Down