T-CREATOR

TypeScript で進化する非同期ストリーム処理:AsyncIterator と型安全なデータフロー設計

TypeScript で進化する非同期ストリーム処理:AsyncIterator と型安全なデータフロー設計

現代の Web アプリケーション開発において、大量のデータを効率的に処理することは避けて通れない課題となっています。特に、リアルタイムデータの処理や、メモリを効率的に使いながらの段階的なデータ変換が求められる場面が増えてきました。

TypeScript の強力な型システムと組み合わせることで、AsyncIterator を活用した非同期ストリーム処理は、従来の問題を解決する画期的なアプローチを提供します。本記事では、実践的な観点から AsyncIterator の活用方法を詳しく解説していきます。

背景

従来の非同期処理の限界

従来の Promise ベースの非同期処理では、データ全体をメモリに読み込んでから処理するアプローチが一般的でした。しかし、このアプローチには重大な制限があります。

typescript// 従来のPromiseベースの処理例
async function processLargeDataset(
  url: string
): Promise<ProcessedData[]> {
  const response = await fetch(url);
  const data = await response.json(); // 全データを一度にメモリに読み込み

  return data.map((item) => processItem(item)); // 一括処理
}

この従来のアプローチでは、以下のような問題が発生します。

#問題点影響
1メモリ使用量の急増大容量データで OutOfMemory エラー
2ブロッキング処理UI の応答性低下
3全データ読み込み待機初回表示の遅延

ストリーム処理が求められる理由

モダンな Web アプリケーションでは、ユーザー体験の向上とシステムの安定性確保のために、ストリーム処理が重要な役割を果たします。

ストリーム処理が特に有効なシナリオを図で確認してみましょう。

mermaidflowchart TD
  A[大容量データ] --> B{処理方式}
  B -->|従来方式| C[一括読み込み]
  B -->|ストリーム方式| D[段階的読み込み]

  C --> E[メモリ枯渇]
  C --> F[UI凍結]

  D --> G[効率的メモリ使用]
  D --> H[応答性維持]
  D --> I[段階的表示]

ストリーム処理により、データを小さなチャンクに分割して順次処理することで、メモリ効率とユーザー体験の両方を向上させることができます。

TypeScript における型安全性の課題

JavaScript の動的な性質により、非同期処理においては実行時まで型エラーが発見されない場合があります。特に、データストリームの処理では、以下の課題が顕在化します。

typescript// 型安全でない従来の処理
function processStream(stream: any) {
  return stream.map((item) => {
    // item の型が不明確
    return item.someProperty; // 実行時エラーの可能性
  });
}

TypeScript の型システムを活用することで、コンパイル時に型エラーを検出し、より安全なコードを記述できるようになります。

課題

大量データ処理でのメモリ効率

現代の Web アプリケーションでは、数百 MB から数 GB のデータを扱う場面が珍しくありません。従来の処理方式では、これらのデータを一度にメモリに読み込むため、システムリソースが枯渇してしまいます。

typescript// メモリ効率の悪い例
async function loadAllUsers(): Promise<User[]> {
  const response = await fetch('/api/users'); // 100万件のユーザーデータ
  const users = await response.json(); // 全データをメモリに展開

  return users.filter((user) => user.isActive); // メモリ上で一括処理
}

この問題により、以下のような深刻な影響が発生します。

#影響対策の必要性
1アプリケーションクラッシュ
2レスポンス時間の増加
3サーバーリソースの浪費

型推論の複雑さと実行時エラー

非同期ストリーム処理では、データの型が処理段階によって変化するため、TypeScript の型推論が複雑になります。

typescript// 型推論が困難な例
async function* transformData(
  source: AsyncIterable<RawData>
) {
  for await (const item of source) {
    // item の型は RawData だが、変換後の型は?
    const transformed = await processItem(item);
    yield transformed; // 型推論が曖昧
  }
}

適切な型注釈がない場合、以下のような問題が発生します。

typescript// 実行時エラーの例
const result = transformData(dataSource);
// result の型が不明確なため、プロパティアクセスでエラー
console.log(result.someProperty); // TypeError: Cannot read property 'someProperty'

既存ライブラリとの統合問題

多くの既存ライブラリは従来の Promise ベースの API を提供しており、AsyncIterator との統合に課題があります。

typescript// ライブラリ統合の課題例
import { existingLibrary } from 'some-library';

async function integrateWithLibrary(
  data: AsyncIterable<Data>
) {
  for await (const item of data) {
    // 既存ライブラリはPromiseベースのAPI
    const result = await existingLibrary.process(item);
    // 型の不整合が発生する可能性
  }
}

これらの統合問題を解決するには、適切なアダプターパターンの実装が必要となります。

解決策

AsyncIterator による段階的データ処理

AsyncIterator は、非同期データストリームを効率的に処理するための強力な仕組みです。データを必要な分だけ段階的に取得し、メモリ使用量を最適化できます。

AsyncIterator の基本的な動作を図で確認しましょう。

mermaidsequenceDiagram
    participant Client
    participant AsyncIterator
    participant DataSource

    Client->>AsyncIterator: next()
    AsyncIterator->>DataSource: データ要求
    DataSource->>AsyncIterator: チャンクデータ
    AsyncIterator->>Client: {value, done: false}

    Client->>AsyncIterator: next()
    AsyncIterator->>DataSource: 次のデータ要求
    DataSource->>AsyncIterator: 次のチャンク
    AsyncIterator->>Client: {value, done: false}

    Client->>AsyncIterator: next()
    AsyncIterator->>DataSource: データ要求
    DataSource->>AsyncIterator: 終了通知
    AsyncIterator->>Client: {done: true}

基本的な AsyncIterator の実装例を見てみましょう。

typescript// AsyncIteratorの基本実装
class DataStreamIterator
  implements AsyncIterator<ProcessedItem>
{
  private currentIndex = 0;

  constructor(private dataSource: DataSource) {}

  async next(): Promise<IteratorResult<ProcessedItem>> {
    if (this.currentIndex >= this.dataSource.length) {
      return { done: true, value: undefined };
    }

    const rawItem = await this.dataSource.getItem(
      this.currentIndex
    );
    const processedItem = await this.processItem(rawItem);
    this.currentIndex++;

    return { done: false, value: processedItem };
  }

  private async processItem(
    item: RawItem
  ): Promise<ProcessedItem> {
    // データ変換処理
    return {
      id: item.id,
      processedAt: new Date(),
      data: item.rawData.toUpperCase(),
    };
  }
}

より実用的な Generator 関数を使った実装も可能です。

typescript// Generator関数による実装
async function* createDataStream(
  source: DataSource
): AsyncGenerator<ProcessedItem> {
  let index = 0;

  while (index < source.length) {
    const item = await source.getItem(index);
    yield await processItem(item);
    index++;
  }
}

この段階的処理により、メモリ使用量を大幅に削減できます。

TypeScript 型システムを活用した安全な設計

TypeScript の強力な型システムを活用することで、AsyncIterator を使った処理でも高い型安全性を実現できます。

typescript// 型安全なAsyncIterator実装
interface DataItem {
  id: number;
  name: string;
  category: string;
}

interface ProcessedItem {
  id: number;
  displayName: string;
  categoryCode: string;
  processedAt: Date;
}

async function* processDataStream(
  source: AsyncIterable<DataItem>
): AsyncGenerator<ProcessedItem, void, unknown> {
  for await (const item of source) {
    // 型推論により item は DataItem 型
    yield {
      id: item.id,
      displayName: `${item.name} (${item.category})`,
      categoryCode: item.category.toLowerCase(),
      processedAt: new Date(),
    };
  }
}

型ガードを活用したより安全な実装も可能です。

typescript// 型ガードによる安全性向上
function isValidDataItem(item: unknown): item is DataItem {
  return (
    typeof item === 'object' &&
    item !== null &&
    typeof (item as DataItem).id === 'number' &&
    typeof (item as DataItem).name === 'string' &&
    typeof (item as DataItem).category === 'string'
  );
}

async function* safeProcessDataStream(
  source: AsyncIterable<unknown>
): AsyncGenerator<ProcessedItem, void, unknown> {
  for await (const item of source) {
    if (isValidDataItem(item)) {
      // 型安全な処理
      yield processValidItem(item);
    } else {
      console.warn('Invalid data item:', item);
      // エラーハンドリング
    }
  }
}

ジェネリクスを使った再利用可能な実装

ジェネリクスを活用することで、様々な型のデータに対応できる再利用可能な AsyncIterator を作成できます。

typescript// ジェネリクスを使った汎用的な実装
interface StreamProcessor<T, U> {
  process(item: T): Promise<U>;
  shouldInclude?(item: T): boolean;
}

async function* createProcessingStream<T, U>(
  source: AsyncIterable<T>,
  processor: StreamProcessor<T, U>
): AsyncGenerator<U, void, unknown> {
  for await (const item of source) {
    // オプショナルなフィルタリング
    if (
      processor.shouldInclude &&
      !processor.shouldInclude(item)
    ) {
      continue;
    }

    try {
      const processed = await processor.process(item);
      yield processed;
    } catch (error) {
      console.error('Processing error:', error);
      // エラー処理の継続
    }
  }
}

具体的な使用例を見てみましょう。

typescript// ユーザーデータ処理の例
interface User {
  id: number;
  email: string;
  lastLogin?: Date;
}

interface UserSummary {
  id: number;
  domain: string;
  isActive: boolean;
}

const userProcessor: StreamProcessor<User, UserSummary> = {
  async process(user: User): Promise<UserSummary> {
    return {
      id: user.id,
      domain: user.email.split('@')[1],
      isActive: user.lastLogin
        ? Date.now() - user.lastLogin.getTime() <
          30 * 24 * 60 * 60 * 1000
        : false,
    };
  },

  shouldInclude(user: User): boolean {
    return user.email.includes('@');
  },
};

// 使用例
async function processUsers(users: AsyncIterable<User>) {
  const processedStream = createProcessingStream(
    users,
    userProcessor
  );

  for await (const summary of processedStream) {
    console.log(
      `User ${summary.id}: ${summary.domain} (${
        summary.isActive ? 'active' : 'inactive'
      })`
    );
  }
}

具体例

シンプルな AsyncIterator の実装

最も基本的な AsyncIterator の実装から始めてみましょう。ファイルから行単位でデータを読み取る例を示します。

typescriptimport { createReadStream } from 'fs';
import { createInterface } from 'readline';

// ファイル行読み取り用のAsyncIterator
async function* readFileLines(
  filePath: string
): AsyncGenerator<string, void, unknown> {
  const fileStream = createReadStream(filePath);
  const rl = createInterface({
    input: fileStream,
    crlfDelay: Infinity,
  });

  try {
    for await (const line of rl) {
      yield line;
    }
  } finally {
    rl.close();
  }
}

使用例を見てみましょう。

typescript// ファイル処理の実行例
async function processLogFile(filePath: string) {
  const lineCount = { total: 0, errors: 0 };

  for await (const line of readFileLines(filePath)) {
    lineCount.total++;

    if (line.includes('ERROR')) {
      lineCount.errors++;
      console.log(`Error found: ${line}`);
    }

    // 大量ファイルでも少ないメモリで処理可能
    if (lineCount.total % 1000 === 0) {
      console.log(`Processed ${lineCount.total} lines...`);
    }
  }

  console.log(
    `Total: ${lineCount.total}, Errors: ${lineCount.errors}`
  );
}

データ変換パイプラインの構築

複数の変換処理を組み合わせた、効率的なデータパイプラインを構築してみましょう。

typescript// パイプライン用のユーティリティ型
type AsyncTransformer<T, U> = (
  source: AsyncIterable<T>
) => AsyncGenerator<U, void, unknown>;

// フィルタリング変換
function asyncFilter<T>(
  predicate: (item: T) => boolean | Promise<boolean>
): AsyncTransformer<T, T> {
  return async function* (source: AsyncIterable<T>) {
    for await (const item of source) {
      if (await predicate(item)) {
        yield item;
      }
    }
  };
}

マップ変換とバッチ処理の実装も追加します。

typescript// マップ変換
function asyncMap<T, U>(
  mapper: (item: T) => U | Promise<U>
): AsyncTransformer<T, U> {
  return async function* (source: AsyncIterable<T>) {
    for await (const item of source) {
      yield await mapper(item);
    }
  };
}

// バッチ処理
function asyncBatch<T>(
  batchSize: number
): AsyncTransformer<T, T[]> {
  return async function* (source: AsyncIterable<T>) {
    let batch: T[] = [];

    for await (const item of source) {
      batch.push(item);

      if (batch.length >= batchSize) {
        yield batch;
        batch = [];
      }
    }

    // 残りのアイテムを処理
    if (batch.length > 0) {
      yield batch;
    }
  };
}

パイプライン関数で処理を組み合わせます。

typescript// パイプライン関数
function pipe<T>(source: AsyncIterable<T>): {
  filter: (
    predicate: (item: T) => boolean | Promise<boolean>
  ) => any;
  map: <U>(mapper: (item: T) => U | Promise<U>) => any;
  batch: (size: number) => any;
  collect: () => Promise<T[]>;
} {
  return {
    filter: (predicate) =>
      pipe(asyncFilter(predicate)(source)),
    map: (mapper) => pipe(asyncMap(mapper)(source)),
    batch: (size) => pipe(asyncBatch(size)(source)),

    async collect(): Promise<T[]> {
      const result: T[] = [];
      for await (const item of source) {
        result.push(item);
      }
      return result;
    },
  };
}

実際の使用例です。

typescript// パイプライン使用例
async function processUserData(users: AsyncIterable<User>) {
  const activeUserEmails = await pipe(users)
    .filter(
      (user) =>
        user.lastLogin &&
        user.lastLogin > new Date('2024-01-01')
    )
    .map((user) => user.email.toLowerCase())
    .filter((email) => email.endsWith('.com'))
    .collect();

  console.log('Active .com users:', activeUserEmails);
}

エラーハンドリングとキャンセレーション

本格的なアプリケーションでは、エラーハンドリングとキャンセレーション機能が重要です。

typescript// エラーハンドリング付きのAsyncIterator
class RobustAsyncIterator<T> implements AsyncIterator<T> {
  private abortController = new AbortController();
  private currentIndex = 0;

  constructor(
    private dataSource: AsyncIterable<T>,
    private errorHandler?: (error: Error, item?: T) => void
  ) {}

  async next(): Promise<IteratorResult<T>> {
    try {
      // キャンセレーション確認
      if (this.abortController.signal.aborted) {
        return { done: true, value: undefined };
      }

      const iterator =
        this.dataSource[Symbol.asyncIterator]();
      const result = await iterator.next();

      return result;
    } catch (error) {
      const err = error as Error;

      if (this.errorHandler) {
        this.errorHandler(err);
      }

      // エラー後も処理を継続するかどうか
      throw err;
    }
  }

  // キャンセレーション用メソッド
  cancel(): void {
    this.abortController.abort();
  }

  // リソースクリーンアップ
  async return(): Promise<IteratorResult<T>> {
    this.cancel();
    return { done: true, value: undefined };
  }
}

タイムアウト機能付きの実装も追加しましょう。

typescript// タイムアウト付きAsyncIterator
async function* withTimeout<T>(
  source: AsyncIterable<T>,
  timeoutMs: number
): AsyncGenerator<T, void, unknown> {
  for await (const item of source) {
    const timeoutPromise = new Promise<never>(
      (_, reject) => {
        setTimeout(
          () => reject(new Error('Processing timeout')),
          timeoutMs
        );
      }
    );

    const valuePromise = Promise.resolve(item);

    try {
      yield await Promise.race([
        valuePromise,
        timeoutPromise,
      ]);
    } catch (error) {
      console.error('Timeout error:', error);
      break;
    }
  }
}

実際の API データ処理シナリオ

実用的な API データ取得と処理のシナリオを実装してみましょう。

mermaidflowchart LR
  A[API Request] --> B[Pagination]
  B --> C[Data Chunk]
  C --> D[Validation]
  D --> E[Transformation]
  E --> F[Output]

  D -->|Invalid| G[Error Handling]
  G --> H[Retry Logic]
  H --> B

API からのページネーションデータ取得を実装します。

typescript// APIレスポンス型定義
interface ApiResponse<T> {
  data: T[];
  pagination: {
    page: number;
    limit: number;
    total: number;
    hasNext: boolean;
  };
}

interface ApiUser {
  id: number;
  username: string;
  email: string;
  created_at: string;
}

// ページネーション対応のAsyncGenerator
async function* fetchAllUsers(
  baseUrl: string,
  limit: number = 100
): AsyncGenerator<ApiUser, void, unknown> {
  let page = 1;
  let hasNext = true;

  while (hasNext) {
    try {
      const response = await fetch(
        `${baseUrl}/users?page=${page}&limit=${limit}`
      );

      if (!response.ok) {
        throw new Error(
          `API Error: ${response.status} ${response.statusText}`
        );
      }

      const apiResponse: ApiResponse<ApiUser> =
        await response.json();

      // 各ユーザーを個別にyield
      for (const user of apiResponse.data) {
        yield user;
      }

      hasNext = apiResponse.pagination.hasNext;
      page++;

      // API負荷軽減のための待機時間
      if (hasNext) {
        await new Promise((resolve) =>
          setTimeout(resolve, 100)
        );
      }
    } catch (error) {
      console.error(`Failed to fetch page ${page}:`, error);
      throw error;
    }
  }
}

取得したデータの変換と処理を行います。

typescript// ユーザーデータの変換処理
interface ProcessedUser {
  id: number;
  displayName: string;
  email: string;
  accountAge: number;
  isRecentUser: boolean;
}

async function* processApiUsers(
  apiUsers: AsyncIterable<ApiUser>
): AsyncGenerator<ProcessedUser, void, unknown> {
  const now = new Date();

  for await (const apiUser of apiUsers) {
    try {
      const createdAt = new Date(apiUser.created_at);
      const accountAge = Math.floor(
        (now.getTime() - createdAt.getTime()) /
          (1000 * 60 * 60 * 24)
      );

      yield {
        id: apiUser.id,
        displayName: `@${apiUser.username}`,
        email: apiUser.email,
        accountAge,
        isRecentUser: accountAge < 30,
      };
    } catch (error) {
      console.warn(
        `Failed to process user ${apiUser.id}:`,
        error
      );
      // 個別エラーは処理をスキップして継続
    }
  }
}

最終的な統合処理の実装です。

typescript// 統合処理の実行
async function analyzeUserData(apiBaseUrl: string) {
  const stats = {
    totalUsers: 0,
    recentUsers: 0,
    averageAccountAge: 0,
    topDomains: new Map<string, number>(),
  };

  const allUsers = fetchAllUsers(apiBaseUrl);
  const processedUsers = processApiUsers(allUsers);

  let totalAge = 0;

  for await (const user of processedUsers) {
    stats.totalUsers++;
    totalAge += user.accountAge;

    if (user.isRecentUser) {
      stats.recentUsers++;
    }

    // ドメイン統計
    const domain = user.email.split('@')[1];
    stats.topDomains.set(
      domain,
      (stats.topDomains.get(domain) || 0) + 1
    );

    // 進捗表示
    if (stats.totalUsers % 100 === 0) {
      console.log(`Processed ${stats.totalUsers} users...`);
    }
  }

  stats.averageAccountAge = totalAge / stats.totalUsers;

  // 結果の表示
  console.log('=== ユーザー分析結果 ===');
  console.log(`総ユーザー数: ${stats.totalUsers}`);
  console.log(`新規ユーザー数: ${stats.recentUsers}`);
  console.log(
    `平均アカウント年数: ${stats.averageAccountAge.toFixed(
      1
    )}日`
  );

  // トップ5ドメインを表示
  const sortedDomains = Array.from(
    stats.topDomains.entries()
  )
    .sort(([, a], [, b]) => b - a)
    .slice(0, 5);

  console.log('トップ5ドメイン:');
  sortedDomains.forEach(([domain, count]) => {
    console.log(`  ${domain}: ${count}人`);
  });
}

使用例とエラーハンドリングの実装です。

typescript// 実行例
async function main() {
  try {
    await analyzeUserData('https://api.example.com');
  } catch (error) {
    console.error('Analysis failed:', error);
    process.exit(1);
  }
}

main();

これらの実装により、大量の API データを効率的に処理しながら、型安全性とエラーハンドリングを両立させることができます。

まとめ

TypeScript と AsyncIterator を組み合わせた非同期ストリーム処理は、現代の Web アプリケーション開発における重要な技術となっています。本記事で紹介した手法により、以下のメリットを享受できます。

主要な改善点

#改善項目従来方式AsyncIterator 方式
1メモリ効率全データをメモリに保持必要分のみメモリ使用
2応答性UI 凍結の可能性ノンブロッキング処理
3スケーラビリティデータ量に制限大容量データ対応
4型安全性実行時エラーリスクコンパイル時検証
5エラーハンドリング一括失敗段階的リカバリー

実装における重要なポイント

TypeScript 型システムを活用した安全な設計により、開発効率と保守性が大幅に向上します。特に、ジェネリクスを使った再利用可能な実装パターンは、様々なプロジェクトで応用できる強力な手法です。

エラーハンドリングとキャンセレーション機能の実装により、本格的なプロダクションレベルのアプリケーションにも対応可能です。これらの機能は、ユーザー体験の向上とシステムの安定性確保に不可欠となっています。

今後の発展

AsyncIterator を基盤とした非同期ストリーム処理は、リアルタイムデータ処理、マイクロサービス間通信、IoT データ処理など、様々な領域で活用の幅が広がっています。TypeScript の型システムと組み合わせることで、より複雑なデータフローも安全に実装できるでしょう。

皆さんも、ぜひ今回紹介した手法を活用して、効率的で安全な非同期データ処理を実現してください。

関連リンク