视频1 视频21 视频41 视频61 视频文章1 视频文章21 视频文章41 视频文章61 推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37 推荐39 推荐41 推荐43 推荐45 推荐47 推荐49 关键词1 关键词101 关键词201 关键词301 关键词401 关键词501 关键词601 关键词701 关键词801 关键词901 关键词1001 关键词1101 关键词1201 关键词1301 关键词1401 关键词1501 关键词1601 关键词1701 关键词1801 关键词1901 视频扩展1 视频扩展6 视频扩展11 视频扩展16 文章1 文章201 文章401 文章601 文章801 文章1001 资讯1 资讯501 资讯1001 资讯1501 标签1 标签501 标签1001 关键词1 关键词501 关键词1001 关键词1501 专题2001
关于RxJS Subject的学习笔记
2020-11-27 22:02:48 责编:小采
文档


Observable subscribe

在介绍RxJS - Subject 之前,我们先来看个示例:

const interval$ = Rx.Observable.interval(1000).take(3);

interval$.subscribe({
 next: value => console.log('Observer A get value: ' + value);
});

setTimeout(() => {
 interval$.subscribe({
 next: value => console.log('Observer B get value: ' + value);
 });
}, 1000);

以上代码运行后,控制台的输出结果:

Observer A get value: 0
Observer A get value: 1
Observer B get value: 0
Observer A get value: 2
Observer B get value: 1
Observer B get value: 2

通过以上示例,我们可以得出以下结论:

  • Observable 对象可以被重复订阅
  • Observable 对象每次被订阅后,都会重新执行
  • 上面的示例,我们可以简单地认为两次调用普通的函数,具体参考以下代码:

    function interval() {
     setInterval(() => console.log('..'), 1000);
    }
    
    interval();
    
    setTimeout(() => {
     interval();
    }, 1000);

    Observable 对象的默认行为,适用于大部分场景。但有些时候,我们会希望在第二次订阅的时候,不会从头开始接收 Observable 发出的值,而是从第一次订阅当前正在处理的值开始发送,我们把这种处理方式成为组播 (multicast),那我们要怎么实现呢 ?回想一下我们刚才介绍过观察者模式,你脑海中是不是已经想到方案了。没错,我们可以通过自定义 Subject 来实现上述功能。

    自定义 Subject

    Subject 类定义

    class Subject { 
     constructor() {
     this.observers = [];
     }
     
     addObserver(observer) { 
     this.observers.push(observer);
     }
     
     next(value) { 
     this.observers.forEach(o => o.next(value)); 
     }
     
     error(error){ 
     this.observers.forEach(o => o.error(error));
     }
     
     complete() {
     this.observers.forEach(o => o.complete());
     }
    }

    使用示例

    const interval$ = Rx.Observable.interval(1000).take(3);
    let subject = new Subject();
    
    let observerA = {
     next: value => console.log('Observer A get value: ' + value),
     error: error => console.log('Observer A error: ' + error),
     complete: () => console.log('Observer A complete!')
    };
    
    var observerB = {
     next: value => console.log('Observer B get value: ' + value),
     error: error => console.log('Observer B error: ' + error),
     complete: () => console.log('Observer B complete!')
    };
    
    subject.addObserver(observerA); // 添加观察者A
    interval$.subscribe(subject); // 订阅interval$对象
    setTimeout(() => {
     subject.addObserver(observerB); // 添加观察者B
    }, 1000);

    以上代码运行后,控制台的输出结果:

    Observer A get value: 0
    Observer A get value: 1
    Observer B get value: 1
    Observer A get value: 2
    Observer B get value: 2
    Observer A complete!
    Observer B complete!

    通过自定义 Subject,我们实现了前面提到的功能。接下来我们进入正题 - RxJS Subject。

    RxJS Subject

    首先我们通过 RxJS Subject 来重写一下上面的示例:

    const interval$ = Rx.Observable.interval(1000).take(3);
    let subject = new Rx.Subject();
    
    let observerA = {
     next: value => console.log('Observer A get value: ' + value),
     error: error => console.log('Observer A error: ' + error),
     complete: () => console.log('Observer A complete!')
    };
    
    var observerB = {
     next: value => console.log('Observer B get value: ' + value),
     error: error => console.log('Observer B error: ' + error),
     complete: () => console.log('Observer B complete!')
    };
    
    subject.subscribe(observerA); // 添加观察者A
    interval$.subscribe(subject); // 订阅interval$对象
    setTimeout(() => {
     subject.subscribe(observerB); // 添加观察者B
    }, 1000);

    RxJS Subject 源码片段

    /**
     * Suject继承于Observable 
     */
    export class Subject extends Observable {
     constructor() {
     super();
     this.observers = []; // 观察者列表
     this.closed = false;
     this.isStopped = false;
     this.hasError = false;
     this.thrownError = null;
     }
     
     next(value) {
     if (this.closed) {
     throw new ObjectUnsubscribedError();
     }
     if (!this.isStopped) {
     const { observers } = this;
     const len = observers.length;
     const copy = observers.slice();
     for (let i = 0; i < len; i++) { // 循环调用观察者next方法,通知观察者
     copy[i].next(value);
     }
     }
     }
     
     error(err) {
     if (this.closed) {
     throw new ObjectUnsubscribedError();
     }
     this.hasError = true;
     this.thrownError = err;
     this.isStopped = true;
     const { observers } = this;
     const len = observers.length;
     const copy = observers.slice();
     for (let i = 0; i < len; i++) { // 循环调用观察者error方法
     copy[i].error(err);
     }
     this.observers.length = 0;
     }
     
     complete() {
     if (this.closed) {
     throw new ObjectUnsubscribedError();
     }
     this.isStopped = true;
     const { observers } = this;
     const len = observers.length;
     const copy = observers.slice();
     for (let i = 0; i < len; i++) { // 循环调用观察者complete方法
     copy[i].complete();
     }
     this.observers.length = 0; // 清空内部观察者列表
     }
    }

    通过 RxJS Subject 示例和源码片段,对于 Subject 我们可以得出以下结论:

  • Subject 既是 Observable 对象,又是 Observer 对象
  • 当有新消息时,Subject 会对内部的 observers 列表进行组播 (multicast)
  • Angular 2 RxJS Subject 应用

    在 Angular 2 中,我们可以利用 RxJS Subject 来实现组件通信,具体示例如下:

    message.service.ts

    import { Injectable } from '@angular/core';
    import {Observable} from 'rxjs/Observable';
    import { Subject } from 'rxjs/Subject';
    
    @Injectable()
    export class MessageService {
     private subject = new Subject<any>();
    
     sendMessage(message: string) {
     this.subject.next({ text: message });
     }
    
     clearMessage() {
     this.subject.next();
     }
    
     getMessage(): Observable<any> {
     return this.subject.asObservable();
     }
    }

    home.component.ts

    import { Component } from '@angular/core';
    
    import { MessageService } from '../_services/index';
    
    @Component({
     moduleId: module.id,
     templateUrl: 'home.component.html'
    })
    
    export class HomeComponent {
     constructor(private messageService: MessageService) {}
     
     sendMessage(): void { // 发送消息
     this.messageService.sendMessage('Message from Home Component to App Component!');
     }
    
     clearMessage(): void { // 清除消息
     this.messageService.clearMessage();
     }
    }

    app.component.ts

    import { Component, OnDestroy } from '@angular/core';
    import { Subscription } from 'rxjs/Subscription';
    
    import { MessageService } from './_services/index';
    
    @Component({
     moduleId: module.id,
     selector: 'app',
     templateUrl: 'app.component.html'
    })
    
    export class AppComponent implements OnDestroy {
     message: any;
     subscription: Subscription;
    
     constructor(private messageService: MessageService) {
     this.subscription = this.messageService.getMessage()
     .subscribe(message => { this.message = message; });
     }
    
     ngOnDestroy() {
     this.subscription.unsubscribe();
     }
    }

    以上示例实现的功能是组件之间消息通信,即 HomeComponent 子组件,向 AppComponent 父组件发送消息。代码运行后,浏览器的显示结果如下:

    Subject 存在的问题

    因为 Subject 在订阅时,是把 observer 存放到观察者列表中,并在接收到新值的时候,遍历观察者列表并调用观察者上的 next 方法,具体如下:

    next(value) {
     if (this.closed) {
     throw new ObjectUnsubscribedError();
     }
     if (!this.isStopped) {
     const { observers } = this;
     const len = observers.length;
     const copy = observers.slice();
     for (let i = 0; i < len; i++) { // 循环调用观察者next方法,通知观察者
     copy[i].next(value);
     }
     }
    }

    这样会有一个大问题,如果某个 observer 在执行时出现异常,却没进行异常处理,就会影响到其它的订阅者,具体示例如下:

    const source = Rx.Observable.interval(1000);
    const subject = new Rx.Subject();
    
    const example = subject.map(x => {
     if (x === 1) {
     throw new Error('oops');
     }
     return x;
    });
    subject.subscribe(x => console.log('A', x));
    example.subscribe(x => console.log('B', x));
    subject.subscribe(x => console.log('C', x));
    
    source.subscribe(subject);

    以上代码运行后,控制台的输出结果:

    A 0
    B 0
    C 0
    A 1
    Rx.min.js:74 Uncaught Error: oops

    JSBin - Subject Problem Demo

    在代码运行前,大家会认为观察者B 会在接收到 1 值时抛出异常,观察者 A 和 C 仍会正常运行。但实际上,在当前的 RxJS 版本中若观察者 B 报错,观察者 A 和 C 也会停止运行。那么应该如何解决这个问题呢?目前最简单的方式就是为所有的观察者添加异常处理,更新后的代码如下:

    const source = Rx.Observable.interval(1000);
    const subject = new Rx.Subject();
    
    const example = subject.map(x => {
     if (x === 1) {
     throw new Error('oops');
     }
     return x;
    });
    
    subject.subscribe(
     x => console.log('A', x),
     error => console.log('A Error:' + error)
    );
     
    example.subscribe(
     x => console.log('B', x),
     error => console.log('B Error:' + error)
    );
    
    subject.subscribe(
     x => console.log('C', x),
     error => console.log('C Error:' + error)
    );
    
    source.subscribe(subject);

    JSBin - RxJS Subject Problem Solved Demo

    RxJS Subject & Observable

    Subject 其实是观察者模式的实现,所以当观察者订阅 Subject 对象时,Subject 对象会把订阅者添加到观察者列表中,每当有 subject 对象接收到新值时,它就会遍历观察者列表,依次调用观察者内部的 next() 方法,把值一一送出。

    Subject 之所以具有 Observable 中的所有方法,是因为 Subject 类继承了 Observable 类,在 Subject 类中有五个重要的方法:

  • next - 每当 Subject 对象接收到新值的时候,next 方法会被调用
  • error - 运行中出现异常,error 方法会被调用
  • complete - Subject 订阅的 Observable 对象结束后,complete 方法会被调用
  • subscribe - 添加观察者
  • unsubscribe - 取消订阅 (设置终止标识符、清空观察者列表)
  • BehaviorSubject

    BehaviorSubject 定义

    BehaviorSubject 源码片段

    export class BehaviorSubject extends Subject {
     constructor(_value) { // 设置初始值
     super();
     this._value = _value;
     }
     get value() { // 获取当前值
     return this.getValue();
     }
     _subscribe(subscriber) {
     const subscription = super._subscribe(subscriber);
     if (subscription && !subscription.closed) {
     subscriber.next(this._value); // 为新的订阅者发送当前最新的值
     }
     return subscription;
     }
     getValue() {
     if (this.hasError) {
     throw this.thrownError;
     }
     else if (this.closed) {
     throw new ObjectUnsubscribedError();
     }
     else {
     return this._value;
     }
     }
     next(value) { // 调用父类Subject的next方法,同时更新当前值
     super.next(this._value = value);
     }
    }

    BehaviorSubject 应用

    有些时候我们会希望 Subject 能保存当前的最新状态,而不是单纯的进行事件发送,也就是说每当新增一个观察者的时候,我们希望 Subject 能够立即发出当前最新的值,而不是没有任何响应。具体我们先看一下示例:

    var subject = new Rx.Subject();
    
    var observerA = {
     next: value => console.log('Observer A get value: ' + value),
     error: error => console.log('Observer A error: ' + error),
     complete: () => console.log('Observer A complete!')
    };
    
    var observerB = {
     next: value => console.log('Observer B get value: ' + value),
     error: error => console.log('Observer B error: ' + error),
     complete: () => console.log('Observer B complete!')
    };
    
    subject.subscribe(observerA);
    
    subject.next(1);
    subject.next(2);
    subject.next(3);
    
    setTimeout(() => {
     subject.subscribe(observerB); // 1秒后订阅
    }, 1000);

    以上代码运行后,控制台的输出结果:

    Observer A get value: 1
    Observer A get value: 2
    Observer A get value: 3

    通过输出结果,我们发现在 observerB 订阅 Subject 对象后,它再也没有收到任何值了。因为 Subject 对象没有再调用 next() 方法。但很多时候我们会希望 Subject 对象能够保存当前的状态,当新增订阅者的时候,自动把当前最新的值发送给订阅者。要实现这个功能,我们就需要使用 BehaviorSubject。

    BehaviorSubject 跟 Subject 最大的不同就是 BehaviorSubject 是用来保存当前最新的值,而不是单纯的发送事件。BehaviorSubject 会记住最近一次发送的值,并把该值作为当前值保存在内部的属性中。接下来我们来使用 BehaviorSubject 重新一下上面的示例:

    var subject = new Rx.BehaviorSubject(0); // 设定初始值
    
    var observerA = {
     next: value => console.log('Observer A get value: ' + value),
     error: error => console.log('Observer A error: ' + error),
     complete: () => console.log('Observer A complete!')
    };
    
    var observerB = {
     next: value => console.log('Observer B get value: ' + value),
     error: error => console.log('Observer B error: ' + error),
     complete: () => console.log('Observer B complete!')
    };
    
    subject.subscribe(observerA);
    
    subject.next(1);
    subject.next(2);
    subject.next(3);
    
    setTimeout(() => {
     subject.subscribe(observerB); // 1秒后订阅
    }, 1000);

    以上代码运行后,控制台的输出结果:

    Observer A get value: 0
    Observer A get value: 1
    Observer A get value: 2
    Observer A get value: 3
    Observer B get value: 3

    JSBin - BehaviorSubject

    ReplaySubject

    ReplaySubject 定义

    ReplaySubject 源码片段

    export class ReplaySubject extends Subject {
     constructor(bufferSize = Number.POSITIVE_INFINITY, 
     windowTime = Number.POSITIVE_INFINITY, 
     scheduler) {
     super();
     this.scheduler = scheduler;
     this._events = []; // ReplayEvent对象列表
     this._bufferSize = bufferSize < 1 ? 1 : bufferSize; // 设置缓冲区大小
     this._windowTime = windowTime < 1 ? 1 : windowTime;
     }
     
     next(value) {
     const now = this._getNow();
     this._events.push(new ReplayEvent(now, value));
     this._trimBufferThenGetEvents();
     super.next(value);
     }
     
     _subscribe(subscriber) {
     const _events = this._trimBufferThenGetEvents(); // 过滤ReplayEvent对象列表
     let subscription;
     if (this.closed) {
     throw new ObjectUnsubscribedError();
     }
     ...
     else {
     this.observers.push(subscriber);
     subscription = new SubjectSubscription(this, subscriber);
     }
     ...
     const len = _events.length;
     // 重新发送设定的最后bufferSize个值
     for (let i = 0; i < len && !subscriber.closed; i++) {
     subscriber.next(_events[i].value);
     }
     ...
     return subscription;
     }
    }
    
    class ReplayEvent {
     constructor(time, value) {
     this.time = time;
     this.value = value;
     }
    }

    ReplaySubject 应用

    有些时候我们希望在 Subject 新增订阅者后,能向新增的订阅者重新发送最后几个值,这时我们就可以使用 ReplaySubject ,具体示例如下:

    var subject = new Rx.ReplaySubject(2); // 重新发送最后2个值
    
    var observerA = {
     next: value => console.log('Observer A get value: ' + value),
     error: error => console.log('Observer A error: ' + error),
     complete: () => console.log('Observer A complete!')
    };
    
    var observerB = {
     next: value => console.log('Observer B get value: ' + value),
     error: error => console.log('Observer B error: ' + error),
     complete: () => console.log('Observer B complete!')
    };
    
    subject.subscribe(observerA);
    
    subject.next(1);
    subject.next(2);
    subject.next(3);
    
    setTimeout(() => {
     subject.subscribe(observerB); // 1秒后订阅
    }, 1000);

    以上代码运行后,控制台的输出结果:

    Observer A get value: 1
    Observer A get value: 2
    Observer A get value: 3
    Observer B get value: 2
    Observer B get value: 3

    可能会有人认为 ReplaySubject(1) 是不是等同于 BehaviorSubject,其实它们是不一样的。在创建BehaviorSubject 对象时,是设置初始值,它用于表示 Subject 对象当前的状态,而 ReplaySubject 只是事件的重放。

    JSBin - ReplaySubject

    AsyncSubject

    AsyncSubject 定义

    AsyncSubject 源码片段

    export class AsyncSubject extends Subject {
     constructor() {
     super(...arguments);
     this.value = null;
     this.hasNext = false;
     this.hasCompleted = false; // 标识是否已完成
     }
     _subscribe(subscriber) {
     if (this.hasError) {
     subscriber.error(this.thrownError);
     return Subscription.EMPTY;
     }
     else if (this.hasCompleted && this.hasNext) { // 等到完成后,才发出最后的值
     subscriber.next(this.value);
     subscriber.complete();
     return Subscription.EMPTY;
     }
     return super._subscribe(subscriber);
     }
     next(value) {
     if (!this.hasCompleted) { // 若未完成,保存当前的值
     this.value = value;
     this.hasNext = true;
     }
     }
    }

    AsyncSubject 应用

    AsyncSubject 类似于 last 操作符,它会在 Subject 结束后发出最后一个值,具体示例如下:

    var subject = new Rx.AsyncSubject();
    
     var observerA = {
     next: value => console.log('Observer A get value: ' + value),
     error: error => console.log('Observer A error: ' + error),
     complete: () => console.log('Observer A complete!')
     };
    
     var observerB = {
     next: value => console.log('Observer B get value: ' + value),
     error: error => console.log('Observer B error: ' + error),
     complete: () => console.log('Observer B complete!')
     };
    
     subject.subscribe(observerA);
    
     subject.next(1);
     subject.next(2);
     subject.next(3);
    
     subject.complete();
    
     setTimeout(() => {
     subject.subscribe(observerB); // 1秒后订阅
     }, 1000);

    以上代码运行后,控制台的输出结果:

    Observer A get value: 3
    Observer A complete!
    Observer B get value: 3
    Observer B complete!

    JSBin - AsyncSubject

    下载本文
    显示全文
    专题