Fix producers reconnection deadlock #394


Merged

Conversation

yurahaid
Contributor

A potential deadlock was identified in the producer clean-up path when handling changes in stream metadata. The issue occurs when the stream members are updated (e.g., a replica is disabled or modified). The deadlock caused execution to hang indefinitely, leaving publishers disconnected.

How the Issue Was Discovered:

To reproduce the issue:

  • A new test, `should reconnect to the same partition after a close event`, was introduced.
  • The test successfully replicates the failure by simulating metadata updates and validating the ability of producers to reconnect to a specific partition after a clean-up operation.

Impact of the Issue:

  • When did it happen?

    • This issue was triggered when stream metadata was changed, such as updates to stream replicas.
  • How did the application behave?

    • The application kept running and reported errors during publishing, but the reconnection was never established.
    • In the RabbitMQ Management UI, the consumer was visibly disconnected but was not recreated by the application.
  • Severity Level:

    • The issue is critical, as it blocks RabbitMQ updates in production, where disabling a replica can cause a cascading failure of publisher connections. This poses a significant risk to high-availability implementations of RabbitMQ.

Fix Implemented:

  • Removed unnecessary mutex locking in the `maybeCleanProducers` method to prevent the deadlock scenario.

Step-by-step description of the problem

  1. Thread 1: maybeCleanConsumers locks the client mutex (pkg/stream/environment.go:513)
  2. Thread 1: c.coordinator.RemoveConsumerById triggers a signal to the producer's closeHandler function (pkg/stream/producer.go:641)
  3. Thread 1: closeHandler triggers a partitionCloseEvent that the client can handle to try to reconnect (as in the test)
  4. Thread 2: the client's reconnection call (from the test), event.Context.ConnectPartition, locks the environmentCoordinator mutex
  5. Thread 2: clientResult.connect() tries to lock the client mutex, but it is already locked in step 1
  6. Thread 1: calls coordinator.maybeCleanClients() in the producer's cleanUp function and tries to lock the producersEnvironment mutex, but it is already held since step 4 by Thread 2
  7. Thread 1 is waiting for the producersEnvironment mutex held by Thread 2, and Thread 2 is waiting for the client mutex held by Thread 1: a deadlock (see the sketch below)
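
A minimal, self-contained Go sketch of this lock ordering; the mutex names (clientMu, envMu) and the timings are illustrative stand-ins, not the library's actual fields. Run it and the program reports the deadlock instead of hanging:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var clientMu sync.Mutex // stands in for the client mutex locked in step 1
	var envMu sync.Mutex    // stands in for the producersEnvironment mutex locked in step 4

	done := make(chan struct{}, 2)

	// "Thread 1": the metadata-update clean-up path.
	go func() {
		clientMu.Lock() // step 1
		defer clientMu.Unlock()
		time.Sleep(10 * time.Millisecond) // let "Thread 2" grab envMu first
		envMu.Lock()                      // step 6: blocks, envMu is held by Thread 2
		envMu.Unlock()
		done <- struct{}{}
	}()

	// "Thread 2": the reconnection triggered by the partition close event.
	go func() {
		time.Sleep(5 * time.Millisecond)
		envMu.Lock() // step 4
		defer envMu.Unlock()
		clientMu.Lock() // step 5: blocks, clientMu is held by Thread 1
		clientMu.Unlock()
		done <- struct{}{}
	}()

	for i := 0; i < 2; i++ {
		select {
		case <-done:
		case <-time.After(time.Second):
			fmt.Println("deadlock: each goroutine is waiting for the mutex the other holds")
			return
		}
	}
	fmt.Println("no deadlock")
}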

yurahaid added 2 commits May 12, 2025 18:37
- Added a new test to validate reconnections on partition close events and catch the deadlock
- Updated `maybeCleanProducers` to remove unnecessary mutex locks and prevent the deadlock
@Gsantomaggio
Member

Thank you @yurahaid
We will check it soon

@Gsantomaggio
Member

I am doing some testing, and the patch makes sense to me.
@yurahaid Thank you for digging into this issue. The coordinator part has always been too complex to handle. I intended to rewrite that part at some point and make everything easier.

At the moment, I don't have time. Hope to do it in the future.

I will continue with some tests.
Will also wait for some feedback from @hiimjako

@Gsantomaggio
Member

@yurahaid @hiimjako I am still convinced about the PR, but:

executed via make test (it uses --race)

 WARNING: DATA RACE
  Write at 0x00c00067fa40 by goroutine 736:
    runtime.mapaccess2()
        /usr/local/go/src/internal/runtime/maps/runtime_swiss.go:117 +0x2dc
    github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream.(*Coordinator).ExtractProducerById()
        /home/rabbitmq-stream-go-client/pkg/stream/coordinator.go:252 +0x1ac
    github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream.(*Producer).close()
        /home/rabbitmq-stream-go-client/pkg/stream/producer.go:664 +0x4b4
    github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream.(*Producer).Close()
        /home/rabbitmq-stream-go-client/pkg/stream/producer.go:620 +0x2c0
    github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream.(*SuperStreamProducer).Close()
        /home/rabbitmq-stream-go-client/pkg/stream/super_stream_producer.go:393 +0x1b4
    github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream.init.func12.12()
        /home/rabbitmq-stream-go-client/pkg/stream/super_stream_producer_test.go:531 +0xa68
    github.com/onsi/ginkgo/v2/internal.extractBodyFunction.func3()
        /home/go/pkg/mod/github.com/onsi/ginkgo/[email protected]/internal/node.go:475 +0x30
    github.com/onsi/ginkgo/v2/internal.(*Suite).runNode.func3()
        /home/go/pkg/mod/github.com/onsi/ginkgo/[email protected]/internal/suite.go:894 +0xf0

  Previous read at 0x00c00067fa40 by goroutine 776:
    runtime.mapIterStart()
        /usr/local/go/src/runtime/map_swiss.go:165 +0x9c
    github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream.(*Client).maybeCleanProducers()
        /home/rabbitmq-stream-go-client/pkg/stream/environment.go:513 +0xd8
    github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream.init.func12.12.3()
        /home/rabbitmq-stream-go-client/pkg/stream/super_stream_producer_test.go:519 +0x150

  Goroutine 736 (running) created at:
    github.com/onsi/ginkgo/v2/internal.(*Suite).runNode()
        /home/go/pkg/mod/github.com/onsi/ginkgo/[email protected]/internal/suite.go:881 +0xdec
    github.com/onsi/ginkgo/v2/internal.(*group).attemptSpec()
        /home/go/pkg/mod/github.com/onsi/ginkgo/[email protected]/internal/group.go:199 +0xb60
    github.com/onsi/ginkgo/v2/internal.(*group).run()
        /home/go/pkg/mod/github.com/onsi/ginkgo/[email protected]/internal/group.go:349 +0xc78
    github.com/onsi/ginkgo/v2/internal.(*Suite).runSpecs()
        /home/go/pkg/mod/github.com/onsi/ginkgo/[email protected]/internal/suite.go:489 +0xc30
    github.com/onsi/ginkgo/v2/internal.(*Suite).Run()
        /home/go/pkg/mod/github.com/onsi/ginkgo/[email protected]/internal/suite.go:130 +0x420
    github.com/onsi/ginkgo/v2.RunSpecs()
        /home/go/pkg/mod/github.com/onsi/ginkgo/[email protected]/core_dsl.go:300 +0xcf4
    github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream_test.TestStream()
        /home/rabbitmq-stream-go-client/pkg/stream/stream_suite_test.go:16 +0x70
    testing.tRunner()
        /usr/local/go/src/testing/testing.go:1792 +0x180
    testing.(*T).Run.gowrap1()
        /usr/local/go/src/testing/testing.go:1851 +0x40

  Goroutine 776 (finished) created at:
    github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream.init.func12.12()
        /home/rabbitmq-stream-go-client/pkg/stream/super_stream_producer_test.go:516 +0x8b4
    github.com/onsi/ginkgo/v2/internal.extractBodyFunction.func3()
        /home/go/pkg/mod/github.com/onsi/ginkgo/[email protected]/internal/node.go:475 +0x30
    github.com/onsi/ginkgo/v2/internal.(*Suite).runNode.func3()
        /home/go/pkg/mod/github.com/onsi/ginkgo/[email protected]/internal/suite.go:894 +0xf0
  ==================

@hiimjako
Collaborator

> @yurahaid @hiimjako I am still convinced about the PR, but:
> executed via make test (it uses --race)

I encountered the same problem and was investigating the race condition.
I also suspect that there is unnecessary use of locks in the client, although removing them seems like a significant effort.

@yurahaid
Copy link
Contributor Author

@hiimjako @Gsantomaggio
Yes, the problem is indeed there.
The mutex is not held while iterating over the map, so my fix is not entirely correct.
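
For reference, here is a minimal sketch of that pattern, assuming a getter that locks only while handing out the map (all names are illustrative). Running it with go run -race typically reports a data race between the range and the delete:

package main

import (
	"sync"
	"time"
)

func main() {
	var mu sync.Mutex
	producers := map[int]string{1: "p1", 2: "p2"}

	// The lock protects only the act of returning the map,
	// not the caller's iteration over it.
	getProducers := func() map[int]string {
		mu.Lock()
		defer mu.Unlock()
		return producers
	}

	go func() {
		// Writer: removes an entry under the lock.
		mu.Lock()
		delete(producers, 1)
		mu.Unlock()
	}()

	// Reader: iterates outside the lock; the race detector flags this
	// as racing with the delete above.
	for range getProducers() {
	}

	time.Sleep(100 * time.Millisecond)
}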

@yurahaid
Contributor Author

I see two possible solutions for this

  1. Getter should return only a copy of the map
func (coordinator *Coordinator) Producers() map[interface{}]interface{} {
	coordinator.mutex.Lock()
	defer coordinator.mutex.Unlock()

	// Copy the map while the lock is held so callers can iterate
	// over the snapshot without racing with concurrent writes.
	producersCopy := make(map[interface{}]interface{}, len(coordinator.producers))
	for k, v := range coordinator.producers {
		producersCopy[k] = v
	}

	return producersCopy
}
  2. Use sync.Map

Considering the current code in the library, it seems that the simpler and more reliable solution is to use the first option with copying.
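
For comparison, a rough sketch of what the sync.Map approach (option 2) could look like; the types and method names here are assumptions for illustration, not the code that would actually be merged:

package stream

import "sync"

// Sketch only: a coordinator whose producers map is a sync.Map.
type Coordinator struct {
	producers sync.Map // key: producer id, value: *Producer
}

type Producer struct{ ID uint8 }

func (c *Coordinator) AddProducer(p *Producer) {
	c.producers.Store(p.ID, p)
}

func (c *Coordinator) RemoveProducerById(id uint8) {
	c.producers.Delete(id)
}

// Range is safe against concurrent Store/Delete calls, so callers
// such as maybeCleanProducers can iterate without an extra mutex.
func (c *Coordinator) ProducersCount() int {
	count := 0
	c.producers.Range(func(_, _ any) bool {
		count++
		return true
	})
	return count
}

sync.Map gives safe concurrent access out of the box, at the cost of interface{}-typed keys and values.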

@hiimjako @Gsantomaggio What do you think?

@Gsantomaggio
Member

Thank you @yurahaid

I would vote for sync.Map

@yurahaid
Contributor Author

I changed it to sync.Map, and everything works.

Also, I changed the signatures of the Coordinator.Consumers and Coordinator.Producers functions. These are public functions, but Coordinator looks like a structure that should be used only inside the library, and no client should use it directly, so these changes should be backward-compatible, right?

@Gsantomaggio
Member

Gsantomaggio commented May 14, 2025

Thanks @yurahaid

> but Coordinator looks like a structure that should be used only inside the library, and no client should use it directly

yes, Coordinator should be only internal.

> so these changes should be backward-compatible, right?

Not necessarily, but I doubt that anyone uses the Coordinator directly.

Collaborator

@hiimjako left a comment


It's mergeable for me now that it runs locally without issues :)

@Gsantomaggio Gsantomaggio merged commit 073a047 into rabbitmq:main May 14, 2025
2 checks passed
@Gsantomaggio
Member

Thanks @yurahaid @hiimjako

@Gsantomaggio Gsantomaggio added this to the 1.5.5 milestone May 14, 2025