首頁 > 軟體

RxJS中的Observable和Observer範例詳解

2022-08-17 14:01:47

引言

最近在專案當中別的小夥伴使用到了Rxjs,我一眼看上去有點懵,感覺挺複雜,挺繞的。於是抓緊補補課,然後就可以和小夥伴們一起交流怎麼能優雅的使用Rxjs。由於內容比較多,會分為三篇來講解說明

  • 初識 RxJS中的 ObservableObserver
  • 細說 RxJS中的 Operators
  • 在談 RxJS中的 SubjectSchedulers

概念

RxJS是一個庫,可以使用可觀察佇列來編寫非同步和基於事件的程式的庫。

RxJS 中管理和解決非同步事件的幾個關鍵點:

  • Observable: 表示未來值或事件的可呼叫集合的概念。
  • Observer: 是一個回撥集合,它知道如何監聽 Observable 傳遞的值。
  • Subscription: 表示一個 Observable 的執行,主要用於取消執行。
  • Operators:** 是純函數,可以使用函數語言程式設計風格來處理具有mapfilterconcatreduce等操作的集合。
  • Subject: 相當於一個EventEmitter,也是將一個值或事件多播到多個Observers的唯一方式。
  • Schedulers 是控制並行的集中排程程式,允許我們在計算髮生在 eg setTimeoutor requestAnimationFrame或者其它上時進行協調。

牛刀小試

我們通過在dom上繫結事件的小案例,感受一下Rxjs的魅力。

  • 在dom繫結事件,我們通常這樣處理
document.addEventListener('click', () => console.log('Clicked!'));

用Rxjs建立一個observable,內容如下

import { fromEvent } from 'rxjs';
fromEvent(document, 'click').subscribe(() => console.log('Clicked!'));
  • 這時候我們簡單升級一下,需要記錄一下點選的數量
let count = 0;
document.addEventListener('click', () => console.log(`Clicked ${++count} times`));

用Rxjs可以隔離狀態,

import { fromEvent, scan } from 'rxjs';
fromEvent(document, 'click')
  .pipe(scan((count) => count + 1, 0))
  .subscribe((count) => console.log(`Clicked ${count} times`));

可以看到,我們用到了scan操作符,該操作符的工作方式和陣列的reduce類似,回撥函數接收一個值, 回撥的返回值作為下一次回撥執行暴露的一個值。

通過上面的案例可以看出,RxJS的強大之處在於它能夠使用純函數生成值。這意味著您的程式碼不太容易出錯。 通常你會建立一個不純的函數,你的程式碼的其他部分可能會弄亂你的狀態。

  • 這時候,需求又有變動了,要求我們一秒內只能有一次點選
let count = 0;
let rate = 1000;
let lastClick = Date.now() - rate;
document.addEventListener('click', () => {
  if (Date.now() - lastClick >= rate) {
    console.log(`Clicked ${++count} times`);
    lastClick = Date.now();
  }
});

使用Rxjs

fromEvent(document, 'click')
  .pipe(
    throttleTime(1000),
    scan((count) => count + 1, 0)
  )
  .subscribe((count) => console.log(`Clicked ${count} times`));

RxJS 有一系列的操作符,可以幫助你控制事件如何在你的 observables 中流動。

  • 這時候,我們要每次累計滑鼠x的值
let count = 0;
const rate = 1000;
let lastClick = Date.now() - rate;
document.addEventListener('click', (event) => {
  if (Date.now() - lastClick >= rate) {
    count += event.clientX;
    console.log(count);
    lastClick = Date.now();
  }
});

使用Rxjs

import { fromEvent, throttleTime, map, scan } from 'rxjs';
fromEvent(document, 'click')
  .pipe(
    throttleTime(1000),
    map((event) => event.clientX),
    scan((count, clientX) => count + clientX, 0)
  )
  .subscribe((count) => console.log(count));

從上面看可以通過map去轉換observables 的值。

Observable

我們先來寫一個案例程式碼,大家可以猜下它的執行順序

import { Observable } from 'rxjs';
const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  setTimeout(() => {
    subscriber.next(4);
    subscriber.complete();
  }, 1000);
});
console.log('just before subscribe');
observable.subscribe({
  next(x) { console.log('got value ' + x); },
  error(err) { console.error('something wrong occurred: ' + err); },
  complete() { console.log('done'); }
});
console.log('just after subscribe');

可以稍微想一下,正確的輸出結果

just before subscribe
got value 1
got value 2
got value 3
just after subscribe
got value 4
done

怎麼樣,和大家想的結果一樣嗎,我們來一下分析一下。

Observable 剖析

Observable 有兩種方式建立,一種是通過new Observable(),還有一種是通過Rx.Observable.create()的方式去建立。

Observable 核心的關注點:

  • 建立Observable
  • 訂閱Observable
  • 執行Observable
  • 取消Observable

建立Observable

const observable = new Observable(function subscribe(subscriber) {
  const id = setInterval(() => {
    subscriber.next('hi')
  }, 1000);
});

該程式碼是建立一個Observable,然後每隔1s向訂閱者傳送訊息。我們看到上邊的回撥函數是subscribe, 該函數是描述Observable最重要的部分。

  • 訂閱Observable
observable.subscribe(x => console.log(x));

observable中的subscribe中引數是一個回撥x => console.log(x),官方叫它Observer,其實Observer有多種形式,後邊我們會說到,在這裡就簡單理解,Observer 可以去消費資料,比如,在react中,我們這可以更新狀態資料等。

  • 執行Observable
 subscriber.next(1);   // Next 通知
 subscriber.complete(); // 完成 通知
 subscriber.error(err);  // Error 通知

其實就是執行一個惰性計算,可同步可非同步,

Observable Execution 可以傳遞三種型別的值:

  • Next:傳送數值、字串、物件等。
  • Error:傳送 JavaScript 錯誤或異常。
  • complete:不傳送值。

Next通知是最重要和最常見的型別:它們代表傳遞給訂閱者的實際資料。在 Observable 執行期間,Errorcomplete通知可能只發生一次,並且只能有其中之一。

  • 取消Observable
function subscribe(subscriber) {
  const intervalId = setInterval(() => {
    subscriber.next('hi');
  }, 1000);
  return function unsubscribe() {
    clearInterval(intervalId);
  };
}
const observable = new Observable(subscribe)
const unsubscribe = observable.subscribe({next: (x) => console.log(x)});
// Later:
unsubscribe(); // 取消執行

我們有看程式碼,建立了一個每秒輸出一個hi內容的Observable,但在我們的使用場景中,會有取消改行為,這時候就需要返回一個unsubscribe的方法,用於取消。

Observer

我們在上邊的場景中也提到了Observer, 但什麼是Observer呢,其實就是資料的消費者,先回顧一下上面的程式碼

observable.subscribe(x => console.log(x));

其實可以寫成

const observer = {
  next: x => console.log('Observer got a next value: ' + x),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};
observable.subscribe(observer);

這樣應就比較清晰了,observer只是具有三個回撥的物件,每一個用於Observable 可能傳遞不同型別的通知。注意,observer 物件中的型別可以不必要全都寫。

其實observer有許多變種,我們看下它的TS宣告就比較清楚了。

可以直接傳遞一個observer物件,或者只傳遞一個next回撥函數,在或者傳多個可選的回撥函數型別。

結束語

RxJS不建議大家盲目的去用,一定要有合適的場景,盲目的去用可能會造成專案的複雜度會大幅度的提升。

以上就是RxJS中的Observable和Observer範例詳解的詳細內容,更多關於RxJS Observable Observer的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com