identity_current_thread is bad "default" scheduler for operators #593

Open
@victimsnino


Hi everyone and hi @kirkshoop!

Most of the operators in rxcpp that require a scheduler document the same fallback:

If scheduler is omitted, identity_current_thread is used.

For example, the merge operator also uses it as its default. BUT identity_current_thread doesn't provide ANY synchronization/serialization in a multithreaded application.

Example:

    #include <chrono>
    #include <iostream>
    #include <thread>
    #include <rxcpp/rx.hpp>

    int main() {
        // Two endless sources, each emitting on its own new thread.
        rxcpp::observable<>::just(1, rxcpp::observe_on_new_thread())
            .repeat()
            .merge(rxcpp::observable<>::just(2, rxcpp::observe_on_new_thread())
                       .repeat())
            .take(10)
            .as_blocking()
            .subscribe([](int v) {
                std::cout << "==================\n" << std::this_thread::get_id() << " START " << std::endl;
                std::this_thread::sleep_for(std::chrono::seconds{1});  // widen the window for overlap
                std::cout << v << std::endl;
                std::cout << std::this_thread::get_id() << " END " << std::endl << "==================\n\n";
            });
    }

Possible output:

==================
0x70000ee59000 START 
==================
0x70000eedc000 START 
1
0x70000ee59000 END 
==================

2
0x70000eedc000 END 
==================

==================
0x70000ef5f000 START 
==================
0x70000efe2000 START 
1
0x70000efe2000 END 
==================

2
0x70000ef5f000 END 
==================

==================
0x70000ee59000 START 
==================
0x70000eedc000 START 
1
0x70000ee59000 END 
==================

2
0x70000eedc000 END 
==================

==================
0x70000ef5f000 START 
==================
0x70000efe2000 START 
1
0x70000ef5f000 END 
==================

2
0x70000efe2000 END 
==================

==================
0x70000ee59000 START 
==================
0x70000eedc000 START 
1
0x70000ee59000 END 
==================

2
0x70000eedc000 END 

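As you can see, the START/END blocks from different threads are interleaved, which means the subscriber is called concurrently. ReactiveX, however, requires that the emissions of any observable be serialized.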
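To check this without reading logs by eye, one option is to count how many callbacks are in flight at once. Below is a minimal sketch using only the standard library; `overlap_probe` is a hypothetical helper name, not something rxcpp provides. The idea would be to call `enter()` at the top of the subscriber lambda and `leave()` at the bottom.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical helper (not part of rxcpp): tracks how many callbacks are
// executing at the same moment and remembers the highest value seen.
class overlap_probe {
public:
    // Call at the start of a callback.
    void enter() {
        const int now = in_flight_.fetch_add(1) + 1;
        int seen = max_seen_.load();
        // Raise max_seen_ to `now` unless another thread already stored more.
        while (seen < now && !max_seen_.compare_exchange_weak(seen, now)) {
        }
    }

    // Call at the end of a callback.
    void leave() {
        const int before = in_flight_.fetch_sub(1);
        assert(before > 0 && "leave() called without a matching enter()");
        (void)before;  // silence unused-variable warnings when NDEBUG is set
    }

    // 1 means every callback ran alone; 2 or more means callbacks overlapped.
    int max_concurrency() const { return max_seen_.load(); }

private:
    std::atomic<int> in_flight_{0};
    std::atomic<int> max_seen_{0};
};
```

With a serialized observable, `max_concurrency()` should stay at 1 after the run. A value of 2 or more corresponds to the mixed output shown above.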

If merge is given any real scheduler explicitly, for example observe_on_new_thread, the output is valid:

==================
0x700001ce1000 START 
1
0x700001ce1000 END 
==================

==================
0x700001ce1000 START 
2
0x700001ce1000 END 
==================

==================
0x700001ce1000 START 
1
0x700001ce1000 END 
==================

==================
0x700001ce1000 START 
2
0x700001ce1000 END 
==================

==================
0x700001ce1000 START 
1
0x700001ce1000 END 
==================

==================
0x700001ce1000 START 
2
0x700001ce1000 END 
==================

==================
0x700001ce1000 START 
1
0x700001ce1000 END 
==================

==================
0x700001ce1000 START 
2
0x700001ce1000 END 
==================

==================
0x700001ce1000 START 
1
0x700001ce1000 END 
==================

==================
0x700001ce1000 START 
2
0x700001ce1000 END 
==================

Expected: the default behavior of every operator should at least be thread-safe.

In my understanding, the best default scheduler for such "multithreaded" operators would be something like "serialize_immediate". It does not exist yet, but it would simply emit under a mutex, giving exclusive access to the subscriber and guaranteeing that only one observable pushes an item at a time. I tested this locally and it also produces valid output.
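Here is a rough sketch of the "emit under a mutex" idea using only the standard library. It is not the rxcpp coordination interface and not the actual ReactivePlusPlus code; `serialized_observer` and `run_serialized_merge` are hypothetical names, and `on_error`/`on_completed` are left out for brevity.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical wrapper (not an rxcpp type): forwards each value to the wrapped
// callback while holding a mutex, so at most one caller is inside it at a time.
template <class T>
class serialized_observer {
public:
    explicit serialized_observer(std::function<void(const T&)> on_next)
        : on_next_(std::move(on_next)) {
        assert(on_next_ && "a callback is required");
    }

    void on_next(const T& value) {
        std::lock_guard<std::mutex> lock(mutex_);
        on_next_(value);
    }

private:
    std::mutex mutex_;
    std::function<void(const T&)> on_next_;
};

// Two producer threads push into one serialized_observer. The log vector is
// deliberately unsynchronized: only the wrapper's mutex protects it.
inline std::vector<std::string> run_serialized_merge(int items_per_source) {
    std::vector<std::string> log;
    serialized_observer<int> observer([&log](const int& v) {
        log.push_back("START " + std::to_string(v));
        std::this_thread::yield();  // give the other producer a chance to run
        log.push_back("END " + std::to_string(v));
    });

    auto produce = [&observer, items_per_source](int value) {
        for (int i = 0; i < items_per_source; ++i) observer.on_next(value);
    };
    std::thread first(produce, 1);
    std::thread second(produce, 2);
    first.join();
    second.join();
    return log;
}
```

Because the whole callback runs under the lock, every START entry in the returned log is immediately followed by its own END, regardless of how the two threads are scheduled. The trade-off is that a slow subscriber blocks the other producers while it runs.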

BTW, this is how I implemented merge in ReactivePlusPlus: every callback to merge's subscriber is invoked under a mutex, so there is no way to get a "mixed" log. @kirkshoop, what do you think?
