RxJS6便利なよく使うOperatorsの使い方まとめ

RxJSPercalTypeScript
RxJS6便利なよく使うOperatorsの使い方まとめ
Article
この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

RxJXがv6へアップデートされ オペレータ周りやインポートパスの変更が入ったため あらためてRxJS6の便利なオペレータ周りの使い方をまとめました。

とりあえず動かすための設定から
紹介させていただきたいと思います。

環境

  • RxJS 6.2.0
  • typescript 2.9.1
  • parcel 1.8.1
  • Yarn 1.6.0

インストール

インストールはYarnで行いました。
RxJSをインストールします。

terminal
$ yarn add rxjs

typescriptRxJSの型ファイルと
動作環境用にparcelをインストールします。

terminal
$ yarn add -D typescript @types/rx parcel-bundler

package.jsonへ下記を追加します。

package.json
"scripts": { "dev": "parcel index.html" },

index.htmlを追加します。

index.html
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <meta http-equiv="X-UA-Compatible" content="ie=edge"> <title>Document</title> </head> <body> <ul id="itemLists"></ul> <script src="./src/app.ts"></script> </body> </html>

src/app.tsを追加します。

import { of, from, interval, fromEvent, timer, ObservableInput
} from 'rxjs';
import {
  map,
  filter,
  skip,
  startWith,
  takeUntil,
  mergeMap,
  concatMap,
  switchMap,
  debounceTime,
  throttleTime,
  withLatestFrom,
  delay,
} from 'rxjs/operators';
import { Observable } from 'rx';

これで動かす準備は整いました。 早速Observableの使い方を見ていきましょう。

Operators

map

VirtualBox
新しい要素を返します。
/*
* map
* 新しい要素を返します。
* map<T, R>(project: (value: T, index: number) => R, thisArg?: any): OperatorFunction<T, R>
*/
of(0, 1, 2, 3, 4, 5)
  .pipe(
    map((x: number): number => x * 2)
  )
  .subscribe(
    (x: number): void => console.log(x),
  );
// 結果
// 0
// 2
// 4
// 6
// 8
// 10

filter

VirtualBox
trueを返した要素を返します(絞り込まれる)。
/*
* filter
* trueを返した要素を返します(絞り込まれる)。
* filter<T>(predicate: (value: T, index: number) => boolean, thisArg?: any): MonoTypeOperatorFunction<T>
*/
of(0, 1, 2, 3, 4, 5)
  .pipe(
    filter((x: number): boolean => x%2 === 0)
  )
  .subscribe(
    (x: number): void => console.log(x),
  );
// 結果
// 0
// 2
// 4

skip

VirtualBox
引数で渡したデータの回数分を無視します。
/*
* skip
* 引数で渡したデータの回数分を無視します。
* skip<T>(count: number): MonoTypeOperatorFunction<T>
*/
of(0, 1, 2, 3, 4, 5)
  .pipe(
    skip(3)
  )
  .subscribe(
    (x: number): void => console.log(x),
  );
// 結果
// 3
// 4
// 5

startWith

VirtualBox
引数で渡したデータを最初に返します。
/*
* startWith
* 引数で渡したデータを最初に返します。
* startWith<T>(...array: Array<T | SchedulerLike>): MonoTypeOperatorFunction<T>
*/
of(0, 1, 2, 3, 4, 5)
  .pipe(
    startWith(4)
  )
  .subscribe(
    (x: any): void => console.log(x),
  );
// 結果
// 4
// 0
// 2
// 4
// 6
// 8
// 10

takeUntil

VirtualBox
値が流れたら処理を中断します。
/*
* takeUntil
* 値が流れたら処理を中断します。
* takeUntil<T>(notifier: Observable<any>): MonoTypeOperatorFunction<T>
*/
const clicks = fromEvent(document, 'click');
interval(1000)
  .pipe(
    takeUntil(clicks)
  )
  .subscribe(
    (x: any): void => console.log(x),
  );
// 結果
// 0
// 1
// clickしたら終了します

mergeMap

VirtualBox
新しいストリームを返します。
/*
* mergeMap
* 新しいストリームを返します。
* mergeMap<T, I, R>(project: (value: T, index: number) => ObservableInput<I>, resultSelector?: ((outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R) | number, concurrent: number = Number.POSITIVE_INFINITY): OperatorFunction<T, I | R>
*/
of(1, 2, 3, 4, 5)
  .pipe(
    mergeMap((x: number): ObservableInput<number> => from(of(x).pipe(delay((6-x) * 10))))
  )
  .subscribe(
    (x: any): void => console.log(x),
  );
// 結果
// 5
// 4
// 5
// 2
// 1

concatMap

VirtualBox
新しいストリームを返します。非同期だった場合発行順は無視され、非同期の解決順で並びます。
/*
* concatMap
* 新しいストリームを返します。非同期だった場合発行順は無視され、非同期の解決順で並びます。
*
*/
of(0, 1, 2, 3, 4, 5)
  .pipe(
    concatMap((x: number): ObservableInput<number> => from(of(x).pipe(delay((6-x) * 10))))
  )
  .subscribe(
    (x: any): void => console.log(x),
  );
// 結果
// 1
// 2
// 3
// 4
// 5

switchMap

VirtualBox
新しいストリームを返します。前の非同期処理が解決する前に次の処理が流れてくると前のものはキャンセルされてしまいます。
/*
* switchMap
* 新しいストリームを返します。前の非同期処理が解決する前に次の処理が流れてくると前のものはキャンセルされてしまいます。
*
*/
of(0, 1, 2, 3, 4, 5)
  .pipe(
    switchMap((x: number): ObservableInput<number> => from(of(x).pipe(delay(x * 10))))
  )
  .subscribe(
    (x: any): void => console.log(x),
  );
// 結果
// 5

debounceTime

VirtualBox
発生するデータから指定した時間以内のデータを間引き連続されたデータの最後を返します。
/*
* debounceTime
* 発生するデータから指定した時間以内のデータを間引き連続されたデータの最後を返します。
* debounceTime<T>(dueTime: number, scheduler: SchedulerLike = async): MonoTypeOperatorFunction<T>
*/
fromEvent(document, 'click')
  .pipe(
    debounceTime(1000)
  )
  .subscribe(
    (x: any): void => console.log(x),
  );
// 結果
// click連打
// MouseEvent {isTrusted: true, screenX: 244, screenY: -746, clientX: 651, clientY: 571, …}

throttleTime

VirtualBox
発生する最初のデータから指定した時間以内のデータを間引き連続されたデータの最後を返します。
/*
* throttleTime
* 発生する最初のデータから指定した時間以内のデータを間引き連続されたデータの最後を返します。
* throttleTime<T>(duration: number, scheduler: SchedulerLike = async, config: ThrottleConfig = defaultThrottleConfig): MonoTypeOperatorFunction<T>
*/
fromEvent(document, 'click')
  .pipe(
    throttleTime(1000)
  )
  .subscribe(
    (x: any): void => console.log(x),
  );
// 結果
// click連打
// 一秒ごとに発生
// MouseEvent {isTrusted: true, screenX: 244, screenY: -746, clientX: 651, clientY: 571, …}

withLatestFrom

VirtualBox
Observableにもう一方のObservabeの最新値を合成します。
/*
* withLatestFrom
* Observableにもう一方のObservabeの最新値を合成します。
* withLatestFrom<T, R>(...args: Array<ObservableInput<any> | ((...values: Array<any>) => R)>): OperatorFunction<T, R>
*/
fromEvent(document, 'click')
  .pipe(
    withLatestFrom(interval(1000))
  )
  .subscribe(
    (x: any): void => console.log(x),
  );
// 結果
// click
// クリックイベントとintervalの結果が帰ってきます。
//  [MouseEvent, 0]

終わりに

最後までご覧いただきありがとうございます。
この記事ではRxJS6便利なよく使うOperatorsの使い方まとめについて紹介させていただきました。

これからも皆様の開発に役立つ情報を提供していきたいと考えています。
今後ともよろしくお願いいたします。