diff --git a/internal/impl/redpanda/migrator/migrator.go b/internal/impl/redpanda/migrator/migrator.go index 05042c02a5..6bfb1f281e 100644 --- a/internal/impl/redpanda/migrator/migrator.go +++ b/internal/impl/redpanda/migrator/migrator.go @@ -522,8 +522,7 @@ func (m *Migrator) onInputConnected(ctx context.Context, fr *kafka.FranzReaderOr m.src = fr.Client m.srcAdm = kadm.NewClient(fr.Client) m.srcClusterID = []byte(metadata.Cluster) - m.groups.src = fr.Client - m.groups.srcAdm = m.srcAdm + m.groups.setSource(fr.Client, m.srcAdm) m.mu.Unlock() return nil @@ -553,11 +552,9 @@ func (m *Migrator) onOutputConnected(_ context.Context, fw franzWriter) error { } m.mu.Lock() - m.groups.offsetHeader = m.offsetHeader m.dstAdm = dstAdm m.dstClusterID = []byte(metadata.Cluster) - m.groups.dst = clientInfo.Client - m.groups.dstAdm = dstAdm + m.groups.setDestination(clientInfo.Client, dstAdm, m.offsetHeader) m.mu.Unlock() // Start a periodic topic sync loop to handle empty topics that would diff --git a/internal/impl/redpanda/migrator/migrator_groups.go b/internal/impl/redpanda/migrator/migrator_groups.go index 2a2e562925..d51d4c539b 100644 --- a/internal/impl/redpanda/migrator/migrator_groups.go +++ b/internal/impl/redpanda/migrator/migrator_groups.go @@ -192,14 +192,18 @@ type GroupOffset struct { // - Runs in one-shot or continuous sync modes // - Provides metrics and caching for performance type groupsMigrator struct { - conf GroupsMigratorConfig + conf GroupsMigratorConfig + metrics *groupsMetrics + log *service.Logger + + // mu protects the fields below which are written during connection + // (onInputConnected / onOutputConnected) and read from SyncLoop. + mu sync.RWMutex offsetHeader string src *kgo.Client srcAdm *kadm.Client dst *kgo.Client dstAdm *kadm.Client - metrics *groupsMetrics - log *service.Logger topicIDs map[string]kadm.TopicID dstTopicIDs map[string]kadm.TopicID @@ -636,7 +640,24 @@ func (m *groupsMigrator) Sync(ctx context.Context, getTopics func() []TopicMappi return nil } +func (m *groupsMigrator) setSource(src *kgo.Client, srcAdm *kadm.Client) { + m.mu.Lock() + defer m.mu.Unlock() + m.src = src + m.srcAdm = srcAdm +} + +func (m *groupsMigrator) setDestination(dst *kgo.Client, dstAdm *kadm.Client, offsetHeader string) { + m.mu.Lock() + defer m.mu.Unlock() + m.dst = dst + m.dstAdm = dstAdm + m.offsetHeader = offsetHeader +} + func (m *groupsMigrator) enabled() bool { + m.mu.RLock() + defer m.mu.RUnlock() return m.conf.Enabled && (m.srcAdm != nil || m.dstAdm != nil) }