diff --git a/internal/impl/redpanda/integration_chaos_test.go b/internal/impl/redpanda/integration_chaos_test.go
index db4cc9d341..e0ebcb1b09 100644
--- a/internal/impl/redpanda/integration_chaos_test.go
+++ b/internal/impl/redpanda/integration_chaos_test.go
@@ -19,13 +19,17 @@ import (
 	"errors"
 	"flag"
 	"fmt"
+	"net"
 	"os"
+	"strconv"
 	"sync"
 	"sync/atomic"
 	"testing"
 	"time"
 
 	dockerclient "github.com/docker/docker/client"
+	"github.com/moby/moby/api/types/container"
+	"github.com/moby/moby/api/types/network"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"github.com/testcontainers/testcontainers-go"
@@ -58,6 +62,46 @@ func killContainer(ctx context.Context, ctr testcontainers.Container) error {
 	return cli.ContainerKill(ctx, ctr.GetContainerID(), "SIGKILL")
 }
 
+// startChaosCluster starts a single Redpanda broker whose Kafka port (9092/tcp) is
+// pinned to a free host port, probed by briefly listening on localhost:0, so the
+// broker returns on the same address after stop/start or kill/start.
+func startChaosCluster(t *testing.T) (redpandatest.Endpoints, testcontainers.Container, error) {
+	t.Helper()
+
+	l, err := net.Listen("tcp", "localhost:0")
+	if err != nil {
+		return redpandatest.Endpoints{}, nil, fmt.Errorf("find free port: %w", err)
+	}
+	kafkaPort := l.Addr().(*net.TCPAddr).Port
+	if err = l.Close(); err != nil { // the port must be released before the container binds it
+		return redpandatest.Endpoints{}, nil, fmt.Errorf("release free port: %w", err)
+	}
+
+	cfg := redpandatest.DefaultConfig
+	cfg.ExtraOpts = []testcontainers.ContainerCustomizer{
+		testcontainers.WithHostConfigModifier(func(hc *container.HostConfig) {
+			if hc.PortBindings == nil {
+				hc.PortBindings = network.PortMap{}
+			}
+			hc.PortBindings[network.MustParsePort("9092/tcp")] = []network.PortBinding{{HostPort: strconv.Itoa(kafkaPort)}}
+		}),
+	}
+	return redpandatest.StartSingleBrokerWithConfig(t, cfg)
+}
+
+// waitBrokerReady waits for the Redpanda broker to accept TCP connections after a restart.
+func waitBrokerReady(t *testing.T, brokerAddr string) {
+	t.Helper()
+	reachable := func() bool { // one TCP dial attempt, capped at one second
+		c, dialErr := net.DialTimeout("tcp", brokerAddr, time.Second)
+		if dialErr == nil {
+			c.Close() // probe only; the connection is not used
+		}
+		return dialErr == nil
+	}
+	require.Eventually(t, reachable, 30*time.Second, 250*time.Millisecond, "broker did not become ready")
+}
+
 // TestIntegrationRedpandaChaosGracefulRestart tests client reconnection during
 // graceful broker restarts. This simulates rolling upgrades where brokers are
 // restarted one at a time.
@@ -65,7 +109,7 @@ func TestIntegrationRedpandaChaosGracefulRestart(t *testing.T) {
 	integration.CheckSkip(t)
 
 	t.Log("Given: single broker Redpanda cluster")
-	endpoints, ctr, err := redpandatest.StartSingleBroker(t)
+	endpoints, ctr, err := startChaosCluster(t)
 	require.NoError(t, err)
 
 	topic := "reconnect-test"
@@ -75,12 +119,15 @@ func TestIntegrationRedpandaChaosGracefulRestart(t *testing.T) {
 	consumeMessagesBackground(t, endpoints, topic, "test-cg", &consumedCount)
 
 	t.Log("When: broker is restarted gracefully")
-	time.Sleep(2 * time.Second)
+	require.Eventually(t, func() bool {
+		return producedCount.Load() > 0 && consumedCount.Load() > 0
+	}, 30*time.Second, 500*time.Millisecond, "messages did not start flowing")
 	initialProduced := producedCount.Load()
 	initialConsumed := consumedCount.Load()
 	t.Logf("Before restart - produced: %d, consumed: %d", initialProduced, initialConsumed)
 
 	require.NoError(t, restartContainer(t.Context(), ctr, 30*time.Second))
+	waitBrokerReady(t, endpoints.BrokerAddr)
 	t.Log("Broker restarted")
 
 	t.Log("Then: consumer reconnects and continues processing")
@@ -107,7 +154,7 @@ func TestIntegrationRedpandaChaosAbruptFailure(t *testing.T) {
 	integration.CheckSkip(t)
 
 	t.Log("Given: single broker Redpanda cluster")
-	endpoints, ctr, err := redpandatest.StartSingleBroker(t)
+	endpoints, ctr, err := startChaosCluster(t)
 	require.NoError(t, err)
 
 	topic := "partition-test"
@@ -117,7 +164,9 @@ func TestIntegrationRedpandaChaosAbruptFailure(t *testing.T) {
consumeMessagesBackground(t, endpoints, topic, "partition-cg", &consumedCount) t.Log("When: broker is killed abruptly") - time.Sleep(2 * time.Second) + require.Eventually(t, func() bool { + return producedCount.Load() > 0 && consumedCount.Load() > 0 + }, 30*time.Second, 500*time.Millisecond, "messages did not start flowing") initialProduced := producedCount.Load() initialConsumed := consumedCount.Load() t.Logf("Before kill - produced: %d, consumed: %d", initialProduced, initialConsumed) @@ -127,6 +176,7 @@ func TestIntegrationRedpandaChaosAbruptFailure(t *testing.T) { t.Log("And: broker is restarted") require.NoError(t, ctr.Start(t.Context())) + waitBrokerReady(t, endpoints.BrokerAddr) t.Log("Broker started") t.Log("Then: consumer detects failure and reconnects") @@ -167,7 +217,7 @@ func TestIntegrationRedpandaChaosStability(t *testing.T) { flag.Parse() t.Logf("Given: single broker Redpanda cluster running for %v", duration) - endpoints, ctr, err := redpandatest.StartSingleBroker(t) + endpoints, ctr, err := startChaosCluster(t) require.NoError(t, err) topic := "stability-test" @@ -196,6 +246,7 @@ func TestIntegrationRedpandaChaosStability(t *testing.T) { t.Logf("Restart %d - before: produced=%d, consumed=%d", restartCount, beforeProduced, beforeConsumed) require.NoError(t, restartContainer(t.Context(), ctr, 30*time.Second)) + waitBrokerReady(t, endpoints.BrokerAddr) t.Logf("Restart %d - broker restarted", restartCount) time.Sleep(5 * time.Second) diff --git a/internal/impl/redpanda/migrator/migrator_schema_registry.go b/internal/impl/redpanda/migrator/migrator_schema_registry.go index f1d0be485f..869f835131 100644 --- a/internal/impl/redpanda/migrator/migrator_schema_registry.go +++ b/internal/impl/redpanda/migrator/migrator_schema_registry.go @@ -947,8 +947,9 @@ type importModeManager struct { *schemaRegistryMigrator active bool - mu sync.RWMutex - prevMode map[string]sr.Mode // destination subject -> previous mode (or noMode if not set) + mu sync.RWMutex + 
prevMode map[string]sr.Mode // destination subject -> previous mode (or noMode if not set) + callbackMu sync.Mutex // serializes TestingOnSetSubjectMode calls from concurrent goroutines } func (m *schemaRegistryMigrator) newImportModeManager(ctx context.Context) (*importModeManager, error) { @@ -1125,7 +1126,9 @@ func (c *importModeManager) setSubjectMode(ctx context.Context, subject string, } if c.conf.TestingOnSetSubjectMode != nil { + c.callbackMu.Lock() c.conf.TestingOnSetSubjectMode(subject, mode) + c.callbackMu.Unlock() } return nil diff --git a/internal/impl/redpanda/redpandatest/redpandatest.go b/internal/impl/redpanda/redpandatest/redpandatest.go index b0cbf626c2..cbe1ee03c4 100644 --- a/internal/impl/redpanda/redpandatest/redpandatest.go +++ b/internal/impl/redpanda/redpandatest/redpandatest.go @@ -33,6 +33,8 @@ type Config struct { Nightly bool // AutoCreateTopics enables automatic topic creation. AutoCreateTopics bool + // ExtraOpts are additional testcontainers options applied after the default ones. + ExtraOpts []testcontainers.ContainerCustomizer } // DefaultConfig returns the default configuration for starting a Redpanda broker. @@ -60,6 +62,7 @@ func StartSingleBrokerWithConfig(t *testing.T, cfg Config) (Endpoints, testconta if cfg.AutoCreateTopics { opts = append(opts, tcredpanda.WithAutoCreateTopics()) } + opts = append(opts, cfg.ExtraOpts...) ctr, err := tcredpanda.Run(t.Context(), img, opts...) testcontainers.CleanupContainer(t, ctr)