Skip to content

【Rxjs一】Observable 基本用法 #44

@xinbaihui

Description

@xinbaihui

概念

  • Rxjs:Reactive Extention Js,一个JS事件库,使用Observable进行异步事件管理

  • Operator: 采用函数式编程风格的纯函数, 使用像map, filter, concat, flatMap等操作符处理集合

  • 基于观察者模式 + 迭代器模式的函数响应式编程
    观察者模式:生产者(Publisher)发布数据推送给 订阅者(Observer)
    image

    迭代器模式:迭代器提供通用的接口,让使用者不用关心数据集合的具体实现方式

  • 使用纯函数保证应用状态的隔离,保证数据的纯净性

Observable基本用法

const onSubscribe = subscriber => {
  // 这里的subscriber是对subscribe中subscriber的封装
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  // subscriber.error("Error");  // 函数内部会执行unsubscribe(),so 再调用complete()也不会再执行
  subscriber.complete()  // 函数内部会执行unsubscribe()
}

const source$ = new Observable(onSubscribe);

const subscriber = {
  next: val => console.log(val),
  error: err => console.log(err),
  complete: () => console.log("No more data")
}

source$.subscribe(subscriber);
  • 状态变化
    正常状态到出错状态或者完结状态,且不可逆(Like Promise)。subscriber.error("Error")执行后,即使调用subscriber.complete()也不会执行。因为onSubscribe 中的subscriber是subscribe(observer)中observer的封装, 会封装成类Subscriber。

  • 退订Observable
    unsubscribe 断开Observable 和 Observer的连接。一旦调用unsubscribe被调用, subscriber 就不会再收到数据。即使unsubscribe函数内什么都不做(clearInterval() 被注释后, 仍然有 “create data i: 4”, “create data i: 5”......, 但是observer的next不会执行)

    // defination
    const onSubscribe = subscriber => {
        let i = 1;
        const handle = setInterval(() => {
          console.log('create data i:' + i);
          observer.next(i++);
        }, 1000);
        return {
          unsubscribe: () => {
            clearInterval(handle); // 即使这里被注释掉,subscriber也接收不到数据了,因为Subscriber类中isStopped = true了
          }
        }
      }
      
      const source$ = new Observable(onSubscribe);
      
      const subscriber = {
        next: (val: any) => console.log(val),
        error: (err: any) => console.log(err),
        complete: () => console.log("No more data")
      }
      
      const subscription = source$.subscribe(subscriber);
    
      setTimeout(() => {
        subscription.unsubscribe();  // 封装的函数内部 this.isStopped = true
      }, 3500)
    

源码参考:
https://github.com/ReactiveX/rxjs/blob/master/src/internal/Observable.ts
https://juejin.cn/post/6985348097064828942

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions