Counting items of a Connectable Observable #385

@matejpavlovic

Hello, I am trying to count the events produced by a Connectable Observable in two ways:

  • The total number of events and
  • The number of events that pass a filter.

Here is some example code:

package main

import (
	"context"
	"fmt"

	"github.com/reactivex/rxgo/v2"
)

func main() {
	events := rxgo.Create([]rxgo.Producer{func(_ context.Context, next chan<- rxgo.Item) {
		next <- rxgo.Of(expensiveReadFromDisk(0))
		next <- rxgo.Of(expensiveReadFromDisk(1))
		next <- rxgo.Of(expensiveReadFromDisk(2))
	}}, rxgo.WithPublishStrategy())

	total := events.Count()
	filtered := events.Filter(func(i interface{}) bool {
		return i.(int) > 0
	}).Count()

	events.Connect(context.Background())

	t, _ := total.Get()
	fmt.Printf("   Total: %d\n", t.V)
	f, _ := filtered.Get()
	fmt.Printf("Filtered: %d\n", f.V)
}

func expensiveReadFromDisk(e int) int {
	fmt.Printf("Reading event: %d\n", e)
	return e
}

I expected the code to output:

Reading event: 0
Reading event: 1
Reading event: 2
   Total: 3
Filtered: 2

Instead, it outputs only this:

Reading event: 0
Reading event: 1
Reading event: 2
   Total: 3

It then blocks forever on the following line:

	f, _ := filtered.Get()

Is this the intended behavior? If so, what is the correct way to obtain both counts while reading each event only once?
Thank you very much!

Labels: question (Question regarding how RxGo is working etc.)