
Data race in tester package #481

@ditrytus

Description

When I run the following test repeatedly with -race, some runs fail with a data race:

Test:

package main

import (
	"context"
	"testing"
	"time"

	"github.com/lovoo/goka"
	"github.com/lovoo/goka/codec"
	"github.com/lovoo/goka/tester"
)

func TestGokaRace(t *testing.T) {
	gokaTester := tester.New(t)

	ctx, cancel := context.WithCancel(context.Background())

	// Create a processor with slow processing
	proc, err := goka.NewProcessor(
		nil,
		goka.DefineGroup(
			goka.Group("test-group"),
			goka.Input(goka.Stream("test-stream"), new(codec.String),
				func(ctx goka.Context, msg interface{}) {
					// Simulate slow processing
					time.Sleep(50 * time.Millisecond)
				}),
		),
		goka.WithTester(gokaTester),
	)
	if err != nil {
		t.Fatalf("creating processor: %v", err)
	}

	// Start processor
	go proc.Run(ctx)
	proc.WaitForReadyContext(ctx)

	// Send message
	go gokaTester.Consume("test-stream", "key", "message")

	// Cancel context while processing
	// This triggers the race as the processor shuts down
	// while the tester is still active
	time.Sleep(10 * time.Millisecond)
	cancel()

	// Cleanup time
	time.Sleep(100 * time.Millisecond)
}

Run:

go test -count=10 -race .

Output:

2025/08/04 12:47:15 [Processor test-group] setup generation 0, claims=map[string][]int32{"test-stream":[]int32{0}}
==================
WARNING: DATA RACE
Write at 0x00c000122010 by goroutine 14:
  runtime.recvDirect()
      /opt/homebrew/Cellar/go/1.24.2/libexec/src/runtime/chan.go:405 +0x7c
  github.com/lovoo/goka/tester.(*cgClaim).close()
      /Users/jakub.gruszecki/go/pkg/mod/github.com/lovoo/goka@v1.1.14/tester/consumergroup.go:380 +0xdc
  github.com/lovoo/goka/tester.(*consumerGroup).Consume.func2()
      /Users/jakub.gruszecki/go/pkg/mod/github.com/lovoo/goka@v1.1.14/tester/consumergroup.go:85 +0xcc
  github.com/lovoo/goka/tester.(*consumerGroup).Consume.(*ErrGroup).Go.func4()
      /Users/jakub.gruszecki/go/pkg/mod/github.com/lovoo/goka@v1.1.14/multierr/errgroup.go:48 +0x40
  golang.org/x/sync/errgroup.(*Group).Go.func1()
      /Users/jakub.gruszecki/go/pkg/mod/golang.org/x/sync@v0.11.0/errgroup/errgroup.go:78 +0x7c

Previous read at 0x00c000122010 by goroutine 16:
  runtime.chansend1()
      /opt/homebrew/Cellar/go/1.24.2/libexec/src/runtime/chan.go:162 +0x2c
  github.com/lovoo/goka/tester.(*cgSession).pushMessageToClaim()
      /Users/jakub.gruszecki/go/pkg/mod/github.com/lovoo/goka@v1.1.14/tester/consumergroup.go:325 +0x49c
  github.com/lovoo/goka/tester.(*cgSession).catchupAndWait()
      /Users/jakub.gruszecki/go/pkg/mod/github.com/lovoo/goka@v1.1.14/tester/consumergroup.go:286 +0x394
  github.com/lovoo/goka/tester.(*consumerGroup).catchupAndWait()
      /Users/jakub.gruszecki/go/pkg/mod/github.com/lovoo/goka@v1.1.14/tester/consumergroup.go:53 +0xa8
  github.com/lovoo/goka/tester.(*client).catchup()
      /Users/jakub.gruszecki/go/pkg/mod/github.com/lovoo/goka@v1.1.14/tester/client.go:24 +0x5c
  github.com/lovoo/goka/tester.(*Tester).waitForClients()
      /Users/jakub.gruszecki/go/pkg/mod/github.com/lovoo/goka@v1.1.14/tester/tester.go:369 +0x174
  github.com/lovoo/goka/tester.(*Tester).Consume()
      /Users/jakub.gruszecki/go/pkg/mod/github.com/lovoo/goka@v1.1.14/tester/tester.go:419 +0x1ec
  github.com/ditrytus/goka-tester-race.TestGokaRace.gowrap2()
      /Users/jakub.gruszecki/Documents/goka-tester-race/goka_tester_race_test.go:44 +0x70

Goroutine 14 (running) created at:
  golang.org/x/sync/errgroup.(*Group).Go()
      /Users/jakub.gruszecki/go/pkg/mod/golang.org/x/sync@v0.11.0/errgroup/errgroup.go:75 +0x10c
  github.com/lovoo/goka/multierr.(*ErrGroup).Go()
      /Users/jakub.gruszecki/go/pkg/mod/github.com/lovoo/goka@v1.1.14/multierr/errgroup.go:47 +0x6d0
  github.com/lovoo/goka/tester.(*consumerGroup).Consume()
      /Users/jakub.gruszecki/go/pkg/mod/github.com/lovoo/goka@v1.1.14/tester/consumergroup.go:80 +0x548
  github.com/lovoo/goka.(*Processor).rebalanceLoop()
      /Users/jakub.gruszecki/go/pkg/mod/github.com/lovoo/goka@v1.1.14/processor.go:382 +0x7d8
  github.com/lovoo/goka.(*Processor).Run.func6()
      /Users/jakub.gruszecki/go/pkg/mod/github.com/lovoo/goka@v1.1.14/processor.go:333 +0x54
  github.com/lovoo/goka.(*Processor).Run.(*ErrGroup).Go.func8()
      /Users/jakub.gruszecki/go/pkg/mod/github.com/lovoo/goka@v1.1.14/multierr/errgroup.go:48 +0x40
  golang.org/x/sync/errgroup.(*Group).Go.func1()
      /Users/jakub.gruszecki/go/pkg/mod/golang.org/x/sync@v0.11.0/errgroup/errgroup.go:78 +0x7c

Goroutine 16 (finished) created at:
  github.com/ditrytus/goka-tester-race.TestGokaRace()
      /Users/jakub.gruszecki/Documents/goka-tester-race/goka_tester_race_test.go:44 +0x6a4
  testing.tRunner()
      /opt/homebrew/Cellar/go/1.24.2/libexec/src/testing/testing.go:1792 +0x180
  testing.(*T).Run.gowrap1()
      /opt/homebrew/Cellar/go/1.24.2/libexec/src/testing/testing.go:1851 +0x40
==================
2025/08/04 12:47:15 [Processor test-group] Consumer group cancelled. Stopping
--- FAIL: TestGokaRace (0.11s)
    testing.go:1490: race detected during execution of test
2025/08/04 12:47:15 [Processor test-group] setup generation 0, claims=map[string][]int32{"test-stream":[]int32{0}}
2025/08/04 12:47:15 [Processor test-group] Consumer group cancelled. Stopping
2025/08/04 12:47:15 [Processor test-group] setup generation 0, claims=map[string][]int32{"test-stream":[]int32{0}}
2025/08/04 12:47:15 [Processor test-group] Consumer group cancelled. Stopping
2025/08/04 12:47:15 [Processor test-group] setup generation 0, claims=map[string][]int32{"test-stream":[]int32{0}}
2025/08/04 12:47:15 [Processor test-group] Consumer group cancelled. Stopping
2025/08/04 12:47:15 [Processor test-group] setup generation 0, claims=map[string][]int32{"test-stream":[]int32{0}}
2025/08/04 12:47:15 [Processor test-group] Consumer group cancelled. Stopping
2025/08/04 12:47:15 [Processor test-group] setup generation 0, claims=map[string][]int32{"test-stream":[]int32{0}}
2025/08/04 12:47:15 [Processor test-group] Consumer group cancelled. Stopping
2025/08/04 12:47:15 [Processor test-group] setup generation 0, claims=map[string][]int32{"test-stream":[]int32{0}}
2025/08/04 12:47:15 [Processor test-group] Consumer group cancelled. Stopping
2025/08/04 12:47:15 [Processor test-group] setup generation 0, claims=map[string][]int32{"test-stream":[]int32{0}}
2025/08/04 12:47:15 [Processor test-group] Consumer group cancelled. Stopping
2025/08/04 12:47:16 [Processor test-group] setup generation 0, claims=map[string][]int32{"test-stream":[]int32{0}}
2025/08/04 12:47:16 [Processor test-group] Consumer group cancelled. Stopping
2025/08/04 12:47:16 [Processor test-group] setup generation 0, claims=map[string][]int32{"test-stream":[]int32{0}}
2025/08/04 12:47:16 [Processor test-group] Consumer group cancelled. Stopping
FAIL
FAIL	github.com/ditrytus/goka-tester-race	1.416s
FAIL

Judging by the stack traces reported by go test -race, I believe this is a different issue from the one reported in #202.
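For context, the two stacks show goroutine 16 (started by the test's go gokaTester.Consume(...) call) inside a channel send in (*cgSession).pushMessageToClaim, and goroutine 14 (started by the consumer group's Consume) inside a channel receive in (*cgClaim).close, both touching the same address. The sketch below illustrates one common way to coordinate a producer that pushes into a channel with a shutdown path that stops and closes that channel, so the two never overlap. It is a hypothetical, stdlib-only illustration of the general pattern: claim, push and close here are not goka's types, and this is not a confirmed fix for the tester.

```go
package main

import (
	"fmt"
	"sync"
)

// claim is a HYPOTHETICAL stand-in for a consumer-group claim (not goka's
// cgClaim). A producer pushes messages into msgs; a shutdown path calls
// close. The mutex, done channel and WaitGroup ensure msgs is only closed
// after every in-flight push has returned.
type claim struct {
	mu       sync.Mutex
	closed   bool
	inflight sync.WaitGroup
	msgs     chan string
	done     chan struct{}
}

func newClaim() *claim {
	return &claim{msgs: make(chan string), done: make(chan struct{})}
}

// push hands m to the consumer, or gives up once shutdown has started.
// It reports whether the message was delivered.
func (c *claim) push(m string) bool {
	c.mu.Lock()
	if c.closed {
		c.mu.Unlock()
		return false
	}
	c.inflight.Add(1) // registered under the lock, so close cannot miss it
	c.mu.Unlock()
	defer c.inflight.Done()

	select {
	case c.msgs <- m:
		return true
	case <-c.done: // shutdown started while this send was blocked
		return false
	}
}

// close stops new pushes, unblocks pending ones, waits for them to return,
// and only then closes msgs. Calling it more than once is safe.
func (c *claim) close() {
	c.mu.Lock()
	if c.closed {
		c.mu.Unlock()
		return
	}
	c.closed = true
	close(c.done)
	c.mu.Unlock()

	c.inflight.Wait() // no sender can still be inside push
	close(c.msgs)     // so closing msgs cannot overlap with a send
}

func main() {
	c := newClaim()
	var got []string
	drained := make(chan struct{})
	go func() {
		for m := range c.msgs { // ends when close() closes msgs
			got = append(got, m)
		}
		close(drained)
	}()

	fmt.Println(c.push("first")) // true: the consumer receives it
	c.close()
	fmt.Println(c.push("second")) // false: shutdown already started
	<-drained
	fmt.Println(got) // [first]
}
```

The key design choice is that a sender registers itself under the same mutex that the shutdown path uses to flip closed, so close can never start waiting while a send it does not know about is still pending.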

Full working example

https://github.com/ditrytus/goka-tester-race

Background

I often run multiple processors within a single process. When one processor stops because of a panic or ctx.Fail, I cancel the outer context to gracefully shut down the other processors and the HTTP servers hosted in the same process. An important path in that graceful shutdown code is cancelling a processor's context while the processor is in the middle of its callback function. This scenario appears to work correctly in production, but it can't be unit-tested because of the data race in tester.Tester. I always run my tests with the -race flag to detect data races in my code, and I don't want to make an exception here.
