Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 56 additions & 5 deletions internal/impl/redpanda/integration_chaos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@ import (
"errors"
"flag"
"fmt"
"net"
"os"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"

dockerclient "github.com/docker/docker/client"
"github.com/moby/moby/api/types/container"
"github.com/moby/moby/api/types/network"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
Expand Down Expand Up @@ -58,14 +62,54 @@ func killContainer(ctx context.Context, ctr testcontainers.Container) error {
return cli.ContainerKill(ctx, ctr.GetContainerID(), "SIGKILL")
}

// startChaosCluster starts a single Redpanda broker with a pinned Kafka port.
// Pinning the host port ensures that after container stop/start or kill/start,
// the broker comes back on the same address so Kafka clients can reconnect.
func startChaosCluster(t *testing.T) (redpandatest.Endpoints, testcontainers.Container, error) {
t.Helper()

l, err := net.Listen("tcp", "localhost:0")
if err != nil {
return redpandatest.Endpoints{}, nil, fmt.Errorf("find free port: %w", err)
}
kafkaPort := l.Addr().(*net.TCPAddr).Port
l.Close()

cfg := redpandatest.DefaultConfig
cfg.ExtraOpts = []testcontainers.ContainerCustomizer{
testcontainers.WithHostConfigModifier(func(hc *container.HostConfig) {
if hc.PortBindings == nil {
hc.PortBindings = network.PortMap{}
}
hc.PortBindings[network.MustParsePort("9092/tcp")] = []network.PortBinding{
{HostPort: strconv.Itoa(kafkaPort)},
}
}),
}
return redpandatest.StartSingleBrokerWithConfig(t, cfg)
}

// waitBrokerReady waits for the Redpanda broker to accept TCP connections after a restart.
func waitBrokerReady(t *testing.T, brokerAddr string) {
t.Helper()
require.Eventually(t, func() bool {
conn, err := net.DialTimeout("tcp", brokerAddr, time.Second)
if err != nil {
return false
}
conn.Close()
return true
}, 30*time.Second, 250*time.Millisecond, "broker did not become ready")
}

// TestIntegrationRedpandaChaosGracefulRestart tests client reconnection during
// graceful broker restarts. This simulates rolling upgrades where brokers are
// restarted one at a time.
func TestIntegrationRedpandaChaosGracefulRestart(t *testing.T) {
integration.CheckSkip(t)

t.Log("Given: single broker Redpanda cluster")
endpoints, ctr, err := redpandatest.StartSingleBroker(t)
endpoints, ctr, err := startChaosCluster(t)
require.NoError(t, err)
topic := "reconnect-test"

Expand All @@ -75,12 +119,15 @@ func TestIntegrationRedpandaChaosGracefulRestart(t *testing.T) {
consumeMessagesBackground(t, endpoints, topic, "test-cg", &consumedCount)

t.Log("When: broker is restarted gracefully")
time.Sleep(2 * time.Second)
require.Eventually(t, func() bool {
return producedCount.Load() > 0 && consumedCount.Load() > 0
}, 30*time.Second, 500*time.Millisecond, "messages did not start flowing")
initialProduced := producedCount.Load()
initialConsumed := consumedCount.Load()
t.Logf("Before restart - produced: %d, consumed: %d", initialProduced, initialConsumed)

require.NoError(t, restartContainer(t.Context(), ctr, 30*time.Second))
waitBrokerReady(t, endpoints.BrokerAddr)
t.Log("Broker restarted")

t.Log("Then: consumer reconnects and continues processing")
Expand All @@ -107,7 +154,7 @@ func TestIntegrationRedpandaChaosAbruptFailure(t *testing.T) {
integration.CheckSkip(t)

t.Log("Given: single broker Redpanda cluster")
endpoints, ctr, err := redpandatest.StartSingleBroker(t)
endpoints, ctr, err := startChaosCluster(t)
require.NoError(t, err)
topic := "partition-test"

Expand All @@ -117,7 +164,9 @@ func TestIntegrationRedpandaChaosAbruptFailure(t *testing.T) {
consumeMessagesBackground(t, endpoints, topic, "partition-cg", &consumedCount)

t.Log("When: broker is killed abruptly")
time.Sleep(2 * time.Second)
require.Eventually(t, func() bool {
return producedCount.Load() > 0 && consumedCount.Load() > 0
}, 30*time.Second, 500*time.Millisecond, "messages did not start flowing")
initialProduced := producedCount.Load()
initialConsumed := consumedCount.Load()
t.Logf("Before kill - produced: %d, consumed: %d", initialProduced, initialConsumed)
Expand All @@ -127,6 +176,7 @@ func TestIntegrationRedpandaChaosAbruptFailure(t *testing.T) {

t.Log("And: broker is restarted")
require.NoError(t, ctr.Start(t.Context()))
waitBrokerReady(t, endpoints.BrokerAddr)
t.Log("Broker started")

t.Log("Then: consumer detects failure and reconnects")
Expand Down Expand Up @@ -167,7 +217,7 @@ func TestIntegrationRedpandaChaosStability(t *testing.T) {
flag.Parse()

t.Logf("Given: single broker Redpanda cluster running for %v", duration)
endpoints, ctr, err := redpandatest.StartSingleBroker(t)
endpoints, ctr, err := startChaosCluster(t)
require.NoError(t, err)
topic := "stability-test"

Expand Down Expand Up @@ -196,6 +246,7 @@ func TestIntegrationRedpandaChaosStability(t *testing.T) {
t.Logf("Restart %d - before: produced=%d, consumed=%d", restartCount, beforeProduced, beforeConsumed)

require.NoError(t, restartContainer(t.Context(), ctr, 30*time.Second))
waitBrokerReady(t, endpoints.BrokerAddr)
t.Logf("Restart %d - broker restarted", restartCount)

time.Sleep(5 * time.Second)
Expand Down
7 changes: 5 additions & 2 deletions internal/impl/redpanda/migrator/migrator_schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -947,8 +947,9 @@ type importModeManager struct {
*schemaRegistryMigrator
active bool

mu sync.RWMutex
prevMode map[string]sr.Mode // destination subject -> previous mode (or noMode if not set)
mu sync.RWMutex
prevMode map[string]sr.Mode // destination subject -> previous mode (or noMode if not set)
callbackMu sync.Mutex // serializes TestingOnSetSubjectMode calls from concurrent goroutines
}

func (m *schemaRegistryMigrator) newImportModeManager(ctx context.Context) (*importModeManager, error) {
Expand Down Expand Up @@ -1125,7 +1126,9 @@ func (c *importModeManager) setSubjectMode(ctx context.Context, subject string,
}

if c.conf.TestingOnSetSubjectMode != nil {
c.callbackMu.Lock()
c.conf.TestingOnSetSubjectMode(subject, mode)
c.callbackMu.Unlock()
}

return nil
Expand Down
3 changes: 3 additions & 0 deletions internal/impl/redpanda/redpandatest/redpandatest.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type Config struct {
Nightly bool
// AutoCreateTopics enables automatic topic creation.
AutoCreateTopics bool
// ExtraOpts are additional testcontainers options applied after the default ones.
ExtraOpts []testcontainers.ContainerCustomizer
}

// DefaultConfig returns the default configuration for starting a Redpanda broker.
Expand Down Expand Up @@ -60,6 +62,7 @@ func StartSingleBrokerWithConfig(t *testing.T, cfg Config) (Endpoints, testconta
if cfg.AutoCreateTopics {
opts = append(opts, tcredpanda.WithAutoCreateTopics())
}
opts = append(opts, cfg.ExtraOpts...)

ctr, err := tcredpanda.Run(t.Context(), img, opts...)
testcontainers.CleanupContainer(t, ctr)
Expand Down
Loading