视频1 视频21 视频41 视频61 视频文章1 视频文章21 视频文章41 视频文章61 推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37 推荐39 推荐41 推荐43 推荐45 推荐47 推荐49 关键词1 关键词101 关键词201 关键词301 关键词401 关键词501 关键词601 关键词701 关键词801 关键词901 关键词1001 关键词1101 关键词1201 关键词1301 关键词1401 关键词1501 关键词1601 关键词1701 关键词1801 关键词1901 视频扩展1 视频扩展6 视频扩展11 视频扩展16 文章1 文章201 文章401 文章601 文章801 文章1001 资讯1 资讯501 资讯1001 资讯1501 标签1 标签501 标签1001 关键词1 关键词501 关键词1001 关键词1501 专题2001
详细介绍RxJS在Angular中的应用
2020-11-27 22:28:59 责编:小采
文档


RxJS是一种针对异步数据流编程工具,或者叫响应式扩展编程;可不管如何解释RxJS其目标就是异步编程,Angular引入RxJS为了就是让异步可控、更简单。

而今就是要探讨什么是Observable、observer、operator、Submit、EventEmmit,以及如何去使用它们。

什么是Observable?

Observable只是一个普通函数,要想让他有所作为,就需要跟observer一起使用;前者是受后者是攻。而这个observer(后面我们会介绍)只是一个带有 next、error、complete 的简单对象而已。最后,还需要通过 subscribe 订阅来启动Observable;否则它是不会有任何反应;可以理解为陌*为了他们能在一起而提供的环境,而订阅也会返回一个可用于取消操作(在RxJS里叫 unsubscribe)。

当Observable设置观察者后,而连接并获取原始数据的这个过程叫生产者,可能是DOM中的 click 事件、input 事件、或者更加复杂的HTTP通信。

为了更好理解,先从一个简单的示例开始:

import { Component } from '@angular/core';
import { Observable, Subscription } from 'rxjs';

@Component({
 selector: 'app-home',
 template: `<input type="text"> `
})
export class HomeComponent {
 ngOnInit() {
 const node = document.querySelector('input[type=text]');

 // 第二个参数 input 是事件名,对于input元素有一个 oninput 事件用于接受用户输入
 const input$ = Observable.fromEvent(node, 'input');
 input$.subscribe({
 next: (event: any) => console.log(`You just typed ${event.target.value}!`),
 error: (err) => console.log(`Oops... ${err}`),
 complete: () => console.log(`Complete!`)
 });
 }
}

示例中 Observable.fromEvent() 会返回一个Observable,并且监听 input 事件,当事件被触发后会发送一个 Event 给对应的observer观察者。

什么是observer?

observer非常简单,像上面示例中 subscribe 订阅就是接收一个 observer 方法。

一般在Angular我们 subscribe 会这么写:

input$.subscribe((event: any) => {

});

从语法角度来讲和 subscribe({ next, error, complete }) 是一样的。

当Observable产生一个新值时,会通知 observer 的 next(),而当捕获失败可以调用 error()。

当Observable被订阅后,除非调用observer的 complete() 或 unsubscribe() 取消订阅两情况以外;会一直将值传递给 observer。

Observable的生产的值允许经过一序列格式化或操作,最终得到一个有价值的数据给观察者,而这一切是由一序列链式operator来完成的,每一个operator都会产生一个新的Observable。而我们也称这一序列过程为:流。

什么是operator?

正如前面说到的,Observable可以链式写法,这意味着我们可以这样:

Observable.fromEvent(node, 'input')
 .map((event: any) => event.target.value)
 .filter(value => value.length >= 2)
 .subscribe(value => { console.log(value); });

下面是整个顺序步骤:

  • 假设用户输入:a
  • Observable对触发 oninput 事件作出反应,将值以参数的形式传递给observer的 next()。
  • map() 根据 event.target.value 的内容返回一个新的 Observable,并调用 next() 传递给下一个observer。
  • filter() 如果值长度 >=2 的话,则返回一个新的 Observable,并调用 next() 传递给下一个observer。
  • 最后,将结果传递给 subscribe 订阅块。
  • 你只要记住每一次 operator 都会返回一个新的 Observable,不管 operator 有多少个,最终只有最后一个 Observable 会被订阅。

    不要忘记取消订阅

    为什么需要取消订阅

    Observable 当有数据产生时才会推送给订阅者,所以它可能会无限次向订阅者推送数据。正因为如此,在Angular里面创建组件的时候务必要取消订阅操作,以避免内存泄漏,要知道在SPA世界里懂得擦屁股是一件必须的事。

    unsubscribe

    前面示例讲过,调用 subscribe() 后,会返回一个 Subscription 可用于取消操作 unsubscribe()。最合理的方式在 ngOnDestroy 调用它。

    ngOnDestroy() {
     this.inputSubscription.unsubscribe();
    }

    takeWhile

    如果组件有很多订阅者的话,则需要将这些订阅者存储在数组中,并组件被销毁时再逐个取消订阅。但,我们有更好的办法:

    使用 takeWhile() operator,它会在你传递一个布尔值是调用 next() 还是 complete()。

    private alive: boolean = true;
    ngOnInit() {
     const node = document.querySelector('input[type=text]');
    
     this.s = Observable.fromEvent(node, 'input')
     .takeWhile(() => this.alive)
     .map((event: any) => event.target.value)
     .filter(value => value.length >= 2)
     .subscribe(value => { console.log(value) });
    }
    
    ngOnDestroy() {
     this.alive = false;
    }
    

    简单有效,而且优雅。

    Subject

    如果说 Observable 与 observer 是攻受结合体的话,那么 Subject 就是一个人即攻亦受。正因为如此,我们在写一个Service用于数据传递时,总是使用 new Subject。

    @Injectable()
    export class MessageService {
     private subject = new Subject<any>();
    
     send(message: any) {
     this.subject.next(message);
     }
    
     get(): Observable<any> {
     return this.subject.asObservable();
     }
    }
    

    当F组件需要向M组件传递数据时,我们可以在F组件中使用 send()。

    constructor(public srv: MessageService) { }
    
    ngOnInit() {
     this.srv.send('w s k f m?')
    }
    

    而M组件只需要订阅内容就行:

    constructor(private srv: MessageService) {}
    
    message: any;
    ngOnInit() {
     this.srv.get().subscribe((result) => {
     this.message = result;
     })
    }
    

    EventEmitter

    其实EventEmitter跟RxJS没有直接关系,因为他是Angular的产物,而非RxJS的东西。或者我们压根没必要去谈,因为EventEmitter就是Subject。

    EventEmitter的作用是使指令或组件能自定义事件。

    @Output() changed = new EventEmitter<string>();
    
    click() {
     this.changed.emit('hi~');
    }
    

    @Component({
     template: `<comp (changed)="subscribe($event)"></comp>`
    })
    export class HomeComponent {
     subscribe(message: string) {
     // 接收:hi~
     }
    }
    

    上面示例其实和上一个示例中 MessageService 如出一辙,只不过是将 next() 换成 emit() 仅此而已。

    结论

    RxJS最难我想就是各种operator的应用了,这需要一些经验的积累。

    RxJS很火很大原因我认还是提供了丰富的API,以下是摘抄:

    创建数据流:

  • 单值:of, empty, never
  • 多值:from
  • 定时:interval, timer
  • 从事件创建:fromEvent
  • 从Promise创建:fromPromise
  • 自定义创建:create
  • 转换操作:

  • 改变数据形态:map, mapTo, pluck
  • 过滤一些值:filter, skip, first, last, take
  • 时间轴上的操作:delay, timeout, throttle, debounce, audit, bufferTime
  • 累加:reduce, scan
  • 异常处理:throw, catch, retry, finally
  • 条件执行:takeUntil, delayWhen, retryWhen, subscribeOn, ObserveOn
  • 转接:switch
  • 组合数据流:

  • concat,保持原来的序列顺序连接两个数据流
  • merge,合并序列
  • race,预设条件为其中一个数据流完成
  • forkJoin,预设条件为所有数据流都完成
  • zip,取各来源数据流最后一个值合并为对象
  • combineLatest,取各来源数据流最后一个值合并为数组
  • 另,最好使用 $ 结尾的命名方式来表示Observable,例:input$。

    下载本文
    显示全文
    专题