Skip to content

RxBlocking does't work well with PublishSubject and a suggestion to improve it #2617

Open
@xrloong

Description

@xrloong

Short description of the issue:

I know there is already an issue, #1480, discussing this case, and @abdulowork had replied the reason and gave an alternative way to test PublishSubject.

But I don't think the limitation is reasonable since when we test some mechanism using Observable, we not know if this observable is derived from PublishSubject or not.

Moreover, it's possible that developers may change underlying implementation from ReplaySubject to PublishSubject for some reason, and then related test cases failed

Thus, I suggest to fix the behavior of for Observable.toBlocking(). Here is the possible implementation:

private var disposeBag: DisposeBag!
extension ObservableConvertibleType {
    public func toBlocking(timeout: TimeInterval? = nil) -> BlockingObservable<Element> {
         let replaySubject = ReplaySubject<Element>.createUnbounded()

         // forward all events to replaySubject
         asObservable().subscribe(replaySubject)
             .disposed(by: disposeBag)

         return BlockingObservable(timeout: timeout, source: replaySubject)
    }
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions