RxJS6便利なよく使うOperatorsの使い方まとめ

RxJSPercalTypeScript
RxJS6便利なよく使うOperatorsの使い方まとめ
Article

RxJXがv6へアップデートされ オペレータ周りやインポートパスの変更が入ったため あらためてRxJS6の便利なオペレータ周りの使い方をまとめました。

とりあえず動かすための設定から
紹介させていただきたいと思います。

サンプルコード

Github
https://github.com/t-creator/demo-rxjs6-operators

環境

  • RxJS 6.2.0
  • typescript 2.9.1
  • parcel 1.8.1
  • Yarn 1.6.0

インストール

インストールはYarnで行いました。
RxJSをインストールします。

terminal
$ yarn add rxjs

typescriptRxJSの型ファイルと
動作環境用にparcelをインストールします。

terminal
$ yarn add -D typescript @types/rx parcel-bundler

package.jsonへ下記を追加します。

package.json
"scripts": { "dev": "parcel index.html" },

index.htmlを追加します。

index.html
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <meta http-equiv="X-UA-Compatible" content="ie=edge"> <title>Document</title> </head> <body> <ul id="itemLists"></ul> <script src="./src/app.ts"></script> </body> </html>

src/app.tsを追加します。

import { of, from, interval, fromEvent, timer, ObservableInput
} from 'rxjs';
import {
  map,
  filter,
  skip,
  startWith,
  takeUntil,
  mergeMap,
  concatMap,
  switchMap,
  debounceTime,
  throttleTime,
  withLatestFrom,
  delay,
} from 'rxjs/operators';
import { Observable } from 'rx';

これで動かす準備は整いました。 早速Observableの使い方を見ていきましょう。

Operators

map

map 新しい要素を返します。

/*
* map
* 新しい要素を返します。
* map<T, R>(project: (value: T, index: number) => R, thisArg?: any): OperatorFunction<T, R>
*/
of(0, 1, 2, 3, 4, 5)
  .pipe(
    map((x: number): number => x * 2)
  )
  .subscribe(
    (x: number): void => console.log(x),
  );
// 結果
// 0
// 2
// 4
// 6
// 8
// 10

filter

filter trueを返した要素を返します(絞り込まれる)。

/*
* filter
* trueを返した要素を返します(絞り込まれる)。
* filter<T>(predicate: (value: T, index: number) => boolean, thisArg?: any): MonoTypeOperatorFunction<T>
*/
of(0, 1, 2, 3, 4, 5)
  .pipe(
    filter((x: number): boolean => x%2 === 0)
  )
  .subscribe(
    (x: number): void => console.log(x),
  );
// 結果
// 0
// 2
// 4

skip

skip 引数で渡したデータの回数分を無視します。

/*
* skip
* 引数で渡したデータの回数分を無視します。
* skip<T>(count: number): MonoTypeOperatorFunction<T>
*/
of(0, 1, 2, 3, 4, 5)
  .pipe(
    skip(3)
  )
  .subscribe(
    (x: number): void => console.log(x),
  );
// 結果
// 3
// 4
// 5

startWith

startWith 引数で渡したデータを最初に返します。

/*
* startWith
* 引数で渡したデータを最初に返します。
* startWith<T>(...array: Array<T | SchedulerLike>): MonoTypeOperatorFunction<T>
*/
of(0, 1, 2, 3, 4, 5)
  .pipe(
    startWith(4)
  )
  .subscribe(
    (x: any): void => console.log(x),
  );
// 結果
// 4
// 0
// 2
// 4
// 6
// 8
// 10

takeUntil

takeUntil 値が流れたら処理を中断します。

/*
* takeUntil
* 値が流れたら処理を中断します。
* takeUntil<T>(notifier: Observable<any>): MonoTypeOperatorFunction<T>
*/
const clicks = fromEvent(document, 'click');
interval(1000)
  .pipe(
    takeUntil(clicks)
  )
  .subscribe(
    (x: any): void => console.log(x),
  );
// 結果
// 0
// 1
// clickしたら終了します

mergeMap

mergeMap 新しいストリームを返します。

/*
* mergeMap
* 新しいストリームを返します。
* mergeMap<T, I, R>(project: (value: T, index: number) => ObservableInput<I>, resultSelector?: ((outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R) | number, concurrent: number = Number.POSITIVE_INFINITY): OperatorFunction<T, I | R>
*/
of(1, 2, 3, 4, 5)
  .pipe(
    mergeMap((x: number): ObservableInput<number> => from(of(x).pipe(delay((6-x) * 10))))
  )
  .subscribe(
    (x: any): void => console.log(x),
  );
// 結果
// 5
// 4
// 5
// 2
// 1

concatMap

concatMap 新しいストリームを返します。非同期だった場合発行順は無視され、非同期の解決順で並びます。

/*
* concatMap
* 新しいストリームを返します。非同期だった場合発行順は無視され、非同期の解決順で並びます。
*
*/
of(0, 1, 2, 3, 4, 5)
  .pipe(
    concatMap((x: number): ObservableInput<number> => from(of(x).pipe(delay((6-x) * 10))))
  )
  .subscribe(
    (x: any): void => console.log(x),
  );
// 結果
// 1
// 2
// 3
// 4
// 5

switchMap

switchMap 新しいストリームを返します。前の非同期処理が解決する前に次の処理が流れてくると前のものはキャンセルされてしまいます。

/*
* switchMap
* 新しいストリームを返します。前の非同期処理が解決する前に次の処理が流れてくると前のものはキャンセルされてしまいます。
*
*/
of(0, 1, 2, 3, 4, 5)
  .pipe(
    switchMap((x: number): ObservableInput<number> => from(of(x).pipe(delay(x * 10))))
  )
  .subscribe(
    (x: any): void => console.log(x),
  );
// 結果
// 5

debounceTime

debounceTime 発生するデータから指定した時間以内のデータを間引き連続されたデータの最後を返します。

/*
* debounceTime
* 発生するデータから指定した時間以内のデータを間引き連続されたデータの最後を返します。
* debounceTime<T>(dueTime: number, scheduler: SchedulerLike = async): MonoTypeOperatorFunction<T>
*/
fromEvent(document, 'click')
  .pipe(
    debounceTime(1000)
  )
  .subscribe(
    (x: any): void => console.log(x),
  );
// 結果
// click連打
// MouseEvent {isTrusted: true, screenX: 244, screenY: -746, clientX: 651, clientY: 571, …}

throttleTime

throttleTime 発生する最初のデータから指定した時間以内のデータを間引き連続されたデータの最後を返します。

/*
* throttleTime
* 発生する最初のデータから指定した時間以内のデータを間引き連続されたデータの最後を返します。
* throttleTime<T>(duration: number, scheduler: SchedulerLike = async, config: ThrottleConfig = defaultThrottleConfig): MonoTypeOperatorFunction<T>
*/
fromEvent(document, 'click')
  .pipe(
    throttleTime(1000)
  )
  .subscribe(
    (x: any): void => console.log(x),
  );
// 結果
// click連打
// 一秒ごとに発生
// MouseEvent {isTrusted: true, screenX: 244, screenY: -746, clientX: 651, clientY: 571, …}

withLatestFrom

withLatestFrom Observableにもう一方のObservabeの最新値を合成します。

/*
* withLatestFrom
* Observableにもう一方のObservabeの最新値を合成します。
* withLatestFrom<T, R>(...args: Array<ObservableInput<any> | ((...values: Array<any>) => R)>): OperatorFunction<T, R>
*/
fromEvent(document, 'click')
  .pipe(
    withLatestFrom(interval(1000))
  )
  .subscribe(
    (x: any): void => console.log(x),
  );
// 結果
// click
// クリックイベントとintervalの結果が帰ってきます。
//  [MouseEvent, 0]