Skip to content

No messages from SUBSCRIBE __redis__:invalidate #927

@boekkooi-impossiblecloud

Description

Good day,

At the moment I'm busy migrating to rueidis and I'm learning a lot about Redis in the process.

Now I'm trying to move away from keyevents and use client tracking, but sadly I hit a little snag.

I was trying to subscribe to __redis__:invalidate using CoreClient.Receive but sadly this seems to not be supported as the pipe.handlePush is intercepting the messages.

Now I can work around this by creating another Client with OnInvalidations set and then using client.Receive(ctx, client.B().Subscribe().Channel("__redis__:invalidate").Build(), func(msg rueidis.PubSubMessage) {}), so it's not a major issue but more of a bummer.

Normally I would try to create a PR but I'm honestly not sure what the right solution would be and I'm not very familiar with this library.

Thanks for maintaining this library and checking out the issue!
Cheers,
Warnar

Example

// TestExample reproduces the reported problem: a dedicated connection enables
// broadcast client tracking for prefix and then subscribes to
// `__redis__:invalidate` through Receive. According to the report no messages
// reach the Receive callback, so the final count assertion is expected to fail
// until that is supported.
//
// The subscriber goroutine is stopped and joined before any assertion runs, so
// failures are always reported while the test is still active.
func TestExample(t *testing.T) {
	const prefix = "key"
	var redisClient rueidis.Client
	// The second return value (the client options) is not needed here.
	redisClient, _ = createRedisClient(t)

	expectedChangeCount := 10

	// Receive blocks until its context is done. A context that is cancelled
	// explicitly lets the test stop the subscriber and wait for it, instead of
	// relying on t.Context() being cancelled after the test function returned.
	ctx, cancel := context.WithCancel(t.Context())
	defer cancel()

	// changes and subscribeErr are written by the subscriber goroutine and are
	// read by the test goroutine only after done has been closed.
	var (
		changes      int
		subscribeErr error
	)
	done := make(chan struct{})
	go func() {
		defer close(done)
		subscribeErr = redisClient.Dedicated(func(client rueidis.DedicatedClient) error {
			clientTrackingCmd := client.B().ClientTracking().On().Prefix().Prefix(prefix).Bcast().Build()
			if err := client.Do(ctx, clientTrackingCmd).Error(); err != nil {
				return err
			}

			subscribeCmd := client.B().Subscribe().Channel(`__redis__:invalidate`).Build()
			return client.Receive(ctx,
				subscribeCmd,
				func(msg rueidis.PubSubMessage) {
					changes++
					t.Log(msg.Message)
				},
			)
		})
	}()
	// Give the subscriber time to enable tracking and subscribe.
	time.Sleep(time.Millisecond * 100)

	for i := 0; i < expectedChangeCount; i++ {
		err := redisClient.Do(ctx, redisClient.B().Set().Key(prefix+"1").Value("v"+strconv.Itoa(i)).Build()).Error()
		if err != nil {
			t.Error(err)
		}
	}
	// Give pending invalidation messages time to arrive.
	time.Sleep(time.Millisecond * 100)

	// Stop the subscriber and wait for it before asserting.
	cancel()
	<-done

	if subscribeErr != nil && !errors.Is(subscribeErr, context.Canceled) {
		t.Error(subscribeErr)
	}
	if changes != expectedChangeCount {
		t.Errorf("expected changes to be %d, got %d", expectedChangeCount, changes)
	}
}

Workaround

// TestWorkaround shows the workaround from the report: a second client is
// created with OnInvalidations set, and invalidations are counted in that
// callback rather than in the Receive callback, which stays empty.
//
// The second client is created on the test goroutine so that a construction
// failure aborts the test, and it is closed when the test ends. The subscriber
// goroutine is stopped and joined before any assertion runs.
func TestWorkaround(t *testing.T) {
	const prefix = "key"
	var (
		redisClient        rueidis.Client
		redisClientOptions rueidis.ClientOption
	)
	redisClient, redisClientOptions = createRedisClient(t)

	expectedChangeCount := 10

	// NOTE(review): OnInvalidations is presumably invoked from a goroutine owned
	// by the library. changes is only read after the subscriber has been joined,
	// but if the callback can still run concurrently with that read, the counter
	// should be guarded (e.g. with sync/atomic) — confirm against the library.
	changes := 0
	redisClientOptions.OnInvalidations = func(messages []rueidis.RedisMessage) {
		for _, message := range messages {
			changes++
			t.Log(message.ToString())
		}
	}

	workaroundClient, err := rueidis.NewClient(redisClientOptions)
	if err != nil {
		t.Fatal(err)
	}
	// Release the extra connection(s) when the test ends.
	defer workaroundClient.Close()

	// Receive blocks until its context is done. A context that is cancelled
	// explicitly lets the test stop the subscriber and wait for it, instead of
	// relying on t.Context() being cancelled after the test function returned.
	ctx, cancel := context.WithCancel(t.Context())
	defer cancel()

	// subscribeErr is written by the subscriber goroutine and read by the test
	// goroutine only after done has been closed.
	var subscribeErr error
	done := make(chan struct{})
	go func() {
		defer close(done)
		subscribeErr = workaroundClient.Dedicated(func(client rueidis.DedicatedClient) error {
			clientTrackingCmd := client.B().ClientTracking().On().Prefix().Prefix(prefix).Bcast().Build()
			if err := client.Do(ctx, clientTrackingCmd).Error(); err != nil {
				return err
			}

			subscribeCmd := client.B().Subscribe().Channel(`__redis__:invalidate`).Build()
			return client.Receive(ctx,
				subscribeCmd,
				func(msg rueidis.PubSubMessage) {
					// Using OnInvalidations
				},
			)
		})
	}()
	// Give the subscriber time to enable tracking and subscribe.
	time.Sleep(time.Millisecond * 100)

	for i := 0; i < expectedChangeCount; i++ {
		err := redisClient.Do(ctx, redisClient.B().Set().Key(prefix+"1").Value("v"+strconv.Itoa(i)).Build()).Error()
		if err != nil {
			t.Error(err)
		}
	}
	// Give pending invalidation messages time to arrive.
	time.Sleep(time.Millisecond * 100)

	// Stop the subscriber and wait for it before asserting.
	cancel()
	<-done

	if subscribeErr != nil && !errors.Is(subscribeErr, context.Canceled) {
		t.Error(subscribeErr)
	}
	if changes != expectedChangeCount {
		t.Errorf("expected changes to be %d, got %d", expectedChangeCount, changes)
	}
}

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions