T-CREATOR

JavaScript Streams API 活用ガイド:巨大データを分割して途切れず処理する

JavaScript Streams API 活用ガイド:巨大データを分割して途切れず処理する

巨大なデータファイルを処理する際、メモリ不足でブラウザがクラッシュしたり、処理が止まってしまった経験はありませんか?そんな課題を一気に解決する革新的な技術、それが JavaScript Streams API です。

従来のメモリ制約を突破する革新的なデータ処理手法として、Streams API は現代の Web アプリケーション開発において必須の技術となっています。この記事では、基礎概念から実践的な活用方法まで、段階的に学んでいただけるよう丁寧に解説いたします。

背景

巨大データ処理における従来の課題

現代の Web アプリケーションでは、ユーザーが扱うデータ量が飛躍的に増加しています。数十 MB〜数 GB のファイルを処理することも珍しくありません。

従来の JavaScript では、ファイル全体をメモリに読み込んでから処理する方式が一般的でした。この方式では、以下のような深刻な問題が発生していました。

javascript// 従来の問題のあるアプローチ
async function processLargeFile(file) {
  // ファイル全体をメモリに読み込み(危険)
  const content = await file.text();

  // 巨大ファイルの場合、ここでメモリ不足エラーが発生
  const processedData = processData(content);
  return processedData;
}

このコードは小さなファイルでは問題ありませんが、100MB を超えるファイルでは確実にメモリ不足エラーを引き起こします。

問題点従来の処理方式影響
1ファイル全体の一括読み込みメモリ使用量の急激な増加
2同期的な処理待機UI の完全フリーズ
3エラー時の全データ損失処理の完全やり直し

ストリーミング処理の必要性

ストリーミング処理とは、データを小さな塊(チャンク)に分割し、順次処理していく手法です。この方式により、メモリ使用量を一定に保ちながら、どれだけ大きなデータでも処理できるようになります。

ストリーミング処理の概念を図で理解しましょう。

mermaidflowchart LR
  large_data[巨大データ] -->|分割| chunk1[チャンク1]
  large_data -->|分割| chunk2[チャンク2]
  large_data -->|分割| chunk3[チャンク3]
  large_data -->|分割| chunk_n[チャンクN]

  chunk1 -->|処理| process1[処理1]
  chunk2 -->|処理| process2[処理2]
  chunk3 -->|処理| process3[処理3]
  chunk_n -->|処理| process_n[処理N]

  process1 -->|結合| result[最終結果]
  process2 -->|結合| result
  process3 -->|結合| result
  process_n -->|結合| result

図で理解できる要点:

  • データを小さなチャンクに分割することで、メモリ使用量を制御
  • 各チャンクは独立して処理されるため、エラー耐性が向上
  • 並列処理により、全体の処理速度も向上

Streams API の登場背景

Streams API は、HTML5 の仕様として策定され、ブラウザネイティブでストリーミング処理を実現する画期的な技術です。

2015 年頃から仕様の検討が始まり、現在では主要ブラウザで幅広くサポートされています。Node.js でも互換性のある Streams API が提供されており、フロントエンドとバックエンドで統一した開発が可能です。

javascript// Streams APIの基本的な使用例
const stream = new ReadableStream({
  start(controller) {
    // ストリームの初期化処理
  },
  pull(controller) {
    // データの読み込み処理
  },
  cancel() {
    // ストリームのキャンセル処理
  },
});

課題

メモリ不足エラーの実態

実際の開発現場では、メモリ不足エラーが深刻な問題となっています。特に以下のようなケースで頻発しています。

javascript// メモリ不足を引き起こすコード例
async function badFileProcessing(file) {
  try {
    // 500MBのファイルを一度にメモリに読み込もうとする
    const buffer = await file.arrayBuffer();

    // Uncaught RangeError: Invalid array length
    // または OutOfMemoryError が発生
    return new Uint8Array(buffer);
  } catch (error) {
    console.error('メモリ不足エラー:', error.message);
    // Error: Maximum call stack size exceeded
    // Error: out of memory
  }
}

このエラーは以下の条件で発生しやすくなります。

条件発生確率対策の緊急度
ファイルサイズ > 100MB緊急
同時処理ファイル数 > 5 個
低スペック端末での処理緊急

大容量ファイル処理の限界

従来のアプローチでは、ファイルサイズに比例してメモリ使用量が増加するため、処理できるファイルサイズに明確な上限が存在します。

javascript// ファイルサイズによる処理限界の例
function checkFileProcessability(fileSize) {
  const availableMemory =
    performance.memory?.usedJSHeapSize || 0;
  const freeMemory =
    availableMemory - performance.memory?.totalJSHeapSize;

  if (fileSize > freeMemory * 0.8) {
    throw new Error(
      `ファイルサイズが大きすぎます: ${fileSize}bytes`
    );
  }
}

実際のブラウザ環境での処理限界を示します。

ブラウザ推奨最大ファイルサイズ絶対上限
Chrome100MB2GB
Firefox80MB1.5GB
Safari50MB1GB
Edge100MB2GB

ユーザー体験への悪影響

大容量ファイルの処理は、ユーザー体験に深刻な悪影響を与えます。

処理中のユーザー体験の問題を図解します。

mermaidsequenceDiagram
  participant User as ユーザー
  participant App as アプリケーション
  participant Memory as メモリ

  User->>App: 大容量ファイル選択
  App->>Memory: ファイル全体読み込み開始

  Note over App,Memory: 長時間の無応答状態

  Memory-->>App: メモリ不足エラー
  App->>User: エラーメッセージ表示

  Note over User: 作業データの完全損失

図で理解できる要点:

  • ファイル処理中はアプリケーションが完全に無応答になる
  • エラー発生時は、それまでの作業がすべて失われる
  • ユーザーは進捗状況を把握できない
javascript// ユーザー体験を悪化させるコード例
function badUserExperience(file) {
  // 進捗表示なし、キャンセル不可
  document.body.style.cursor = 'wait';

  return file
    .text()
    .then((content) => {
      // 処理完了まで一切のフィードバックなし
      return processContent(content);
    })
    .catch((error) => {
      // エラー時は何も残らない
      alert('処理に失敗しました');
    });
}

解決策

Streams API の基本概念

Streams API は、データを連続的な流れとして扱い、チャンク単位で処理する仕組みです。まず、基本的な概念を理解しましょう。

javascript// Streams APIの基本構造
const readableStream = new ReadableStream({
  // ストリーム開始時の初期化
  start(controller) {
    console.log('ストリーム開始');
  },

  // データの読み込み要求時
  pull(controller) {
    // チャンクデータを生成
    const chunk = generateChunk();
    controller.enqueue(chunk);
  },
});

Streams API では、データの流れを 3 つの主要コンポーネントで制御します。これらの関係性を図で確認しましょう。

mermaidflowchart LR
  source[データソース] --> readable[ReadableStream]
  readable --> transform[TransformStream]
  transform --> writable[WritableStream]
  writable --> destination[出力先]

  subgraph "ストリーム処理"
    readable
    transform
    writable
  end

図で理解できる要点:

  • データソースから読み取り、変換し、出力先に書き込む一連の流れ
  • 各段階でチャンク単位の処理が行われる
  • メモリ使用量は一定に保たれる

ReadableStream、WritableStream、TransformStream の役割

それぞれのストリームには明確な役割分担があります。

ReadableStream(読み取り可能ストリーム)

データソースからチャンク単位でデータを読み取る役割を担います。

javascript// ReadableStreamの実装例
function createFileReadableStream(file) {
  let offset = 0;
  const chunkSize = 64 * 1024; // 64KB ずつ読み取り

  return new ReadableStream({
    async pull(controller) {
      if (offset >= file.size) {
        controller.close();
        return;
      }

      // チャンク単位でファイルを読み取り
      const chunk = file.slice(offset, offset + chunkSize);
      const buffer = await chunk.arrayBuffer();

      controller.enqueue(new Uint8Array(buffer));
      offset += chunkSize;
    },
  });
}

WritableStream(書き込み可能ストリーム)

処理済みのデータを出力先に書き込む役割です。

javascript// WritableStreamの実装例
function createFileWritableStream() {
  const chunks = [];

  return new WritableStream({
    write(chunk) {
      console.log(`チャンク受信: ${chunk.length}bytes`);
      chunks.push(chunk);
    },

    close() {
      console.log('書き込み完了');
      // 全チャンクを結合して最終ファイルを生成
      const blob = new Blob(chunks);
      return blob;
    },
  });
}

TransformStream(変換ストリーム)

データの変換処理を担当します。

javascript// TransformStreamの実装例
function createTextToUpperCaseTransform() {
  return new TransformStream({
    transform(chunk, controller) {
      // テキストデータを大文字に変換
      const text = new TextDecoder().decode(chunk);
      const upperText = text.toUpperCase();
      const upperChunk = new TextEncoder().encode(
        upperText
      );

      controller.enqueue(upperChunk);
    },
  });
}

パイプライン処理による効率化

Streams API の真価は、複数のストリームを連結(パイプライン)して効率的な処理を実現することです。

javascript// パイプライン処理の実装
async function efficientFileProcessing(file) {
  try {
    // 読み取り → 変換 → 書き込みのパイプライン
    await createFileReadableStream(file)
      .pipeThrough(createTextToUpperCaseTransform())
      .pipeTo(createFileWritableStream());

    console.log('処理完了');
  } catch (error) {
    console.error('処理エラー:', error);
  }
}

パイプライン処理の動作フローを図解します。

mermaidflowchart TD
  file[大容量ファイル] --> reader[ReadableStream]
  reader -->|64KB チャンク| transform1[文字変換]
  transform1 --> transform2[データ圧縮]
  transform2 --> writer[WritableStream]
  writer --> output[処理済みファイル]

  subgraph "メモリ使用量"
    memory[64KB × 3 = 192KB]
  end

図で理解できる要点:

  • ファイルサイズに関係なく、メモリ使用量は一定
  • 各段階で並行処理が可能
  • エラー発生時も部分的な復旧が可能

具体例

大容量 CSV ファイルの読み込み処理

実際の業務でよく遭遇する、大容量 CSV ファイルの処理を実装してみましょう。

javascript// CSVファイル読み込み用のReadableStream
function createCSVReadableStream(file) {
  let reader;
  let buffer = '';

  return new ReadableStream({
    async start() {
      reader = file.stream().getReader();
    },

    async pull(controller) {
      try {
        const { done, value } = await reader.read();

        if (done) {
          // 最後のレコードを処理
          if (buffer.trim()) {
            controller.enqueue(buffer.trim());
          }
          controller.close();
          return;
        }

        // テキストデータをバッファに追加
        const text = new TextDecoder().decode(value);
        buffer += text;

        // 改行でレコードを分割
        const records = buffer.split('\n');
        buffer = records.pop(); // 最後の不完全な行は保持

        // 完全なレコードを出力
        for (const record of records) {
          if (record.trim()) {
            controller.enqueue(record.trim());
          }
        }
      } catch (error) {
        controller.error(error);
      }
    },
  });
}

CSV データの変換処理を実装します。

javascript// CSV変換用のTransformStream
function createCSVTransformStream() {
  let isFirstRow = true;
  let headers = [];

  return new TransformStream({
    transform(chunk, controller) {
      try {
        const row = chunk.split(',');

        if (isFirstRow) {
          headers = row;
          isFirstRow = false;
          controller.enqueue({
            type: 'headers',
            data: headers,
          });
          return;
        }

        // オブジェクト形式に変換
        const record = {};
        headers.forEach((header, index) => {
          record[header] = row[index] || '';
        });

        controller.enqueue({
          type: 'record',
          data: record,
        });
      } catch (error) {
        controller.error(
          new Error(`CSV解析エラー: ${error.message}`)
        );
      }
    },
  });
}

データの蓄積と出力を管理します。

javascript// 処理結果の集約用WritableStream
function createCSVWritableStream(onProgress, onComplete) {
  const records = [];
  let recordCount = 0;

  return new WritableStream({
    write(chunk) {
      if (chunk.type === 'record') {
        records.push(chunk.data);
        recordCount++;

        // 進捗レポート(1000件ごと)
        if (recordCount % 1000 === 0) {
          onProgress?.(recordCount);
        }
      }
    },

    close() {
      onComplete?.(records);
      console.log(`CSV処理完了: ${recordCount}件`);
    },

    abort(reason) {
      console.error('CSV処理中断:', reason);
    },
  });
}

全体を統合した使用例です。

javascript// 大容量CSV処理の実行
async function processLargeCSV(file) {
  console.log(
    `CSV処理開始: ${file.name} (${file.size}bytes)`
  );

  try {
    await createCSVReadableStream(file)
      .pipeThrough(createCSVTransformStream())
      .pipeTo(
        createCSVWritableStream(
          (count) => console.log(`処理中: ${count}件`),
          (records) =>
            console.log(`完了: ${records.length}件`)
        )
      );
  } catch (error) {
    console.error('CSV処理エラー:', error);
  }
}

リアルタイムデータ変換の実装

WebSocket などから受信するリアルタイムデータの変換処理を実装します。

javascript// リアルタイムデータ受信用のReadableStream
function createWebSocketReadableStream(websocketUrl) {
  let websocket;

  return new ReadableStream({
    start(controller) {
      websocket = new WebSocket(websocketUrl);

      websocket.onmessage = (event) => {
        try {
          const data = JSON.parse(event.data);
          controller.enqueue(data);
        } catch (error) {
          controller.error(error);
        }
      };

      websocket.onerror = (error) => {
        controller.error(error);
      };

      websocket.onclose = () => {
        controller.close();
      };
    },

    cancel() {
      websocket?.close();
    },
  });
}

受信データの集約とフィルタリング処理を実装します。

javascript// データ集約用のTransformStream
function createDataAggregatorTransform(windowSize = 10) {
  let buffer = [];

  return new TransformStream({
    transform(chunk, controller) {
      buffer.push({
        ...chunk,
        timestamp: Date.now(),
      });

      // ウィンドウサイズに達したら集約データを出力
      if (buffer.length >= windowSize) {
        const aggregated = {
          count: buffer.length,
          average:
            buffer.reduce(
              (sum, item) => sum + (item.value || 0),
              0
            ) / buffer.length,
          min: Math.min(
            ...buffer.map((item) => item.value || 0)
          ),
          max: Math.max(
            ...buffer.map((item) => item.value || 0)
          ),
          timestamp: Date.now(),
        };

        controller.enqueue(aggregated);
        buffer = [];
      }
    },

    flush(controller) {
      // 残りのデータも処理
      if (buffer.length > 0) {
        const aggregated = {
          count: buffer.length,
          average:
            buffer.reduce(
              (sum, item) => sum + (item.value || 0),
              0
            ) / buffer.length,
          timestamp: Date.now(),
        };
        controller.enqueue(aggregated);
      }
    },
  });
}

ファイルアップロード進捗表示の作成

大容量ファイルのアップロード処理で進捗表示を実装します。

javascript// アップロード進捗付きの処理
function createUploadProgressStream(file, uploadUrl) {
  let uploadedBytes = 0;
  const totalBytes = file.size;

  return new ReadableStream({
    async start(controller) {
      const chunkSize = 1024 * 1024; // 1MBずつアップロード
      let offset = 0;

      while (offset < totalBytes) {
        const chunk = file.slice(
          offset,
          offset + chunkSize
        );
        const chunkBuffer = await chunk.arrayBuffer();

        // アップロード実行
        await uploadChunk(chunkBuffer, offset, uploadUrl);

        uploadedBytes += chunkBuffer.byteLength;
        offset += chunkSize;

        // 進捗情報を出力
        const progress = {
          uploadedBytes,
          totalBytes,
          percentage: Math.round(
            (uploadedBytes / totalBytes) * 100
          ),
        };

        controller.enqueue(progress);
      }

      controller.close();
    },
  });
}

チャンクアップロードの実装です。

javascript// 個別チャンクのアップロード処理
async function uploadChunk(chunkBuffer, offset, uploadUrl) {
  const formData = new FormData();
  formData.append('chunk', new Blob([chunkBuffer]));
  formData.append('offset', offset.toString());

  const response = await fetch(uploadUrl, {
    method: 'POST',
    body: formData,
  });

  if (!response.ok) {
    throw new Error(
      `アップロードエラー: ${response.status}`
    );
  }

  return response.json();
}

進捗表示の利用例です。

javascript// ファイルアップロードの実行
async function uploadLargeFile(file, uploadUrl) {
  const progressElement = document.getElementById(
    'upload-progress'
  );

  try {
    const reader = createUploadProgressStream(
      file,
      uploadUrl
    ).getReader();

    while (true) {
      const { done, value } = await reader.read();

      if (done) break;

      // 進捗バーの更新
      progressElement.style.width = `${value.percentage}%`;
      progressElement.textContent = `${value.percentage}% (${value.uploadedBytes}/${value.totalBytes}bytes)`;
    }

    console.log('アップロード完了');
  } catch (error) {
    console.error('アップロードエラー:', error);
  }
}

画像リサイズのストリーミング処理

複数の画像ファイルを効率的にリサイズする処理を実装します。

javascript// 画像リサイズ用のTransformStream
function createImageResizeTransform(
  targetWidth,
  targetHeight
) {
  return new TransformStream({
    async transform(imageFile, controller) {
      try {
        // 画像をCanvasでリサイズ
        const resizedBlob = await resizeImageFile(
          imageFile,
          targetWidth,
          targetHeight
        );

        controller.enqueue({
          originalFile: imageFile,
          resizedBlob: resizedBlob,
          compressionRatio:
            imageFile.size / resizedBlob.size,
        });
      } catch (error) {
        controller.error(
          new Error(`画像リサイズエラー: ${error.message}`)
        );
      }
    },
  });
}

Canvas を使った実際のリサイズ処理です。

javascript// 画像リサイズの実装
async function resizeImageFile(
  file,
  targetWidth,
  targetHeight
) {
  return new Promise((resolve, reject) => {
    const img = new Image();
    const canvas = document.createElement('canvas');
    const ctx = canvas.getContext('2d');

    img.onload = () => {
      // キャンバスサイズの設定
      canvas.width = targetWidth;
      canvas.height = targetHeight;

      // 画像を描画(自動的にリサイズされる)
      ctx.drawImage(img, 0, 0, targetWidth, targetHeight);

      // Blobとして出力
      canvas.toBlob(
        (blob) => {
          if (blob) {
            resolve(blob);
          } else {
            reject(new Error('画像変換に失敗しました'));
          }
        },
        'image/jpeg',
        0.8
      );
    };

    img.onerror = () =>
      reject(new Error('画像読み込みエラー'));
    img.src = URL.createObjectURL(file);
  });
}

複数画像の一括処理例です。

javascript// 複数画像の一括リサイズ処理
async function batchResizeImages(
  imageFiles,
  targetWidth,
  targetHeight
) {
  const results = [];

  // 画像ファイルのストリームを作成
  const imageStream = new ReadableStream({
    start(controller) {
      for (const file of imageFiles) {
        controller.enqueue(file);
      }
      controller.close();
    },
  });

  // 結果収集用のWritableStream
  const resultCollector = new WritableStream({
    write(chunk) {
      results.push(chunk);
      console.log(
        `リサイズ完了: ${chunk.originalFile.name}`
      );
    },

    close() {
      console.log(`全画像処理完了: ${results.length}件`);
    },
  });

  // ストリーミング処理の実行
  try {
    await imageStream
      .pipeThrough(
        createImageResizeTransform(
          targetWidth,
          targetHeight
        )
      )
      .pipeTo(resultCollector);

    return results;
  } catch (error) {
    console.error('画像処理エラー:', error);
    throw error;
  }
}

まとめ

JavaScript Streams API は、従来のメモリ制約を突破する革新的なデータ処理手法です。本記事では、基礎概念から実践的な活用方法まで詳しく解説いたしました。

主要なポイント

項目従来の方式Streams API
メモリ使用量ファイルサイズに比例一定(チャンクサイズ)
処理速度大容量で低下一定の性能
エラー耐性全データ損失部分的復旧可能
ユーザー体験無応答状態リアルタイム進捗

Streams API を活用することで、以下のメリットが得られます。

  • メモリ効率の向上: どれだけ大きなファイルでも一定のメモリ使用量で処理可能
  • レスポンシブな UI: チャンク単位の処理により、UI の応答性を維持
  • エラー処理の改善: 部分的な処理結果の保持と復旧が可能
  • スケーラビリティ: ユーザー数や데이터量の増加に対応

現代の Web アプリケーション開発において、Streams API は必須の技術となっています。ぜひ今回学んだ内容を実際のプロジェクトで活用し、より良いユーザー体験を提供してください。

大容量データの処理に悩んでいる開発者の皆様にとって、この記事が問題解決の一助となれば幸いです。

関連リンク