T-CREATOR

Node.js ストリーム逆引き:`pipeline`/`finished`/`stream/promises` 一枚まとめ

Node.js ストリーム逆引き:`pipeline`/`finished`/`stream/promises` 一枚まとめ

Node.js でのストリーム処理において、従来のイベントベースな書き方から脱却し、よりモダンで安全な方法をご存知でしょうか。

pipeline()finished()、そして stream​/​promises モジュール、これら 3 つの API を適切に使い分けることで、メモリリークやエラーハンドリングの課題を解決できるのです。特に大容量ファイルの処理やリアルタイムデータの変換において、これらの知識は必須といえますね。

この記事では、Node.js ストリーム API の核となる 3 つの機能を徹底的に解説し、実際の開発現場ですぐに活用できる実装パターンをお届けします。

逆引きリファレンス

やりたいことから適切な API を素早く見つけられる逆引き表です。

#やりたいこと推奨 API使用例
1複数のストリームを安全に連結したいpipeline()pipeline(readable, transform, writable, callback)
2ファイルを圧縮しながらコピーしたいpipeline() + zlibpipeline(fs.createReadStream(), zlib.createGzip(), fs.createWriteStream())
3ストリームの完了を確実に検知したいfinished()finished(stream, callback)
4書き込み完了まで待機したいfinished()await finished(writableStream)
5async/await でストリーム処理したいstream​/​promisesawait pipeline(readable, writable)
6複数ファイルを並列処理したいstream​/​promises + Promise.all()Promise.all([pipeline(...), pipeline(...)])
7エラー時の自動クリーンアップが欲しいpipeline()自動的にすべてのストリームを破棄
8TypeScript で型安全にストリーム処理したいstream​/​promisesimport した型付き API を使用
9レガシーコードと統合したいコールバック版 APIpipeline(), finished() のコールバック版
10カスタム変換を組み込みたいTransform + pipeline()new Transform() を pipeline に組み込み

背景

Node.js ストリーム処理の世界では、長らくイベントベースな実装が主流でした。しかし、この方法には重大な課題が潜んでいたのです。

従来のストリーム処理では、各ストリームのライフサイクルを手動で管理する必要がありました。読み込み完了の検知、エラーハンドリング、メモリリークの防止など、開発者が細心の注意を払って実装しなければならない部分が多数存在していました。

javascriptconst fs = require('fs');
const zlib = require('zlib');

// 従来の書き方(問題のあるパターン)
const readable = fs.createReadStream('input.txt');
const gzip = zlib.createGzip();
const writable = fs.createWriteStream('output.txt.gz');

readable.pipe(gzip).pipe(writable);

// エラーハンドリングが複雑
readable.on('error', (err) => {
  // ストリームのクリーンアップを手動で行う必要
  gzip.destroy();
  writable.destroy();
  console.error(err);
});

このような実装では、複数のストリームが関わる場合に特に問題が顕著になります。

次の図は、従来のストリーム処理における課題構造を示しています。

mermaidflowchart TD
    input["ファイル読み込み"] --> transform["データ変換"]
    transform --> output["ファイル書き込み"]

    error1["読み込みエラー"] -.-> input
    error2["変換エラー"] -.-> transform
    error3["書き込みエラー"] -.-> output

    input -.-> cleanup1["手動クリーンアップ"]
    transform -.-> cleanup2["手動クリーンアップ"]
    output -.-> cleanup3["手動クリーンアップ"]

    style error1 fill:#ffcccc
    style error2 fill:#ffcccc
    style error3 fill:#ffcccc
    style cleanup1 fill:#ffffcc
    style cleanup2 fill:#ffffcc
    style cleanup3 fill:#ffffcc

図で理解できる要点:

  • 各ストリーム段階でエラー処理が必要
  • 手動でのリソースクリーンアップが複雑
  • エラー発生時の後続処理停止が困難

課題

従来のストリーム処理方式では、以下のような具体的な課題が発生していました。

メモリリークの危険性

ストリームが正常に終了しない場合、ファイルハンドルやメモリが解放されずに残り続ける問題がありました。特に長時間稼働するサーバーアプリケーションでは、これが致命的な障害につながることも少なくありません。

javascript// 問題のあるコード例
function processFile(inputPath, outputPath) {
  const readable = fs.createReadStream(inputPath);
  const writable = fs.createWriteStream(outputPath);

  readable.pipe(writable);

  // エラー時のクリーンアップが不十分
  readable.on('error', console.error);
  writable.on('error', console.error);

  // ストリームが正常終了したかの確認方法が複雑
}

エラーハンドリングの複雑さ

複数のストリームを連結する際、どこでエラーが発生したかを特定し、適切にクリーンアップを行うロジックを書くのは非常に困難でした。

非同期処理の制御困難

ストリーム処理の完了タイミングを正確に把握するには、複数のイベントリスナーを組み合わせる必要があり、コードの可読性と保守性が著しく低下していました。

次の図は、エラー発生時の問題の連鎖を示しています。

mermaidstateDiagram-v2
    [*] --> Reading: ストリーム開始
    Reading --> Transform: データ変換
    Transform --> Writing: ファイル書き込み
    Writing --> [*]: 正常終了

    Reading --> Error1: 読み込みエラー
    Transform --> Error2: 変換エラー
    Writing --> Error3: 書き込みエラー

    Error1 --> Leak1: メモリリーク
    Error2 --> Leak2: ファイルハンドル残存
    Error3 --> Leak3: 不完全ファイル

    Leak1 --> [*]: 手動対応必要
    Leak2 --> [*]: 手動対応必要
    Leak3 --> [*]: 手動対応必要

図で理解できる要点:

  • エラー発生箇所によって対応方法が異なる
  • 各段階でリソースリークの可能性
  • 手動でのエラー回復処理が複雑

解決策

これらの課題を解決するために、Node.js では段階的に新しい API が導入されました。pipeline()finished()、そして stream​/​promises モジュールです。

これらの API は、従来の手動管理から自動管理への転換を可能にし、より安全で保守性の高いストリーム処理を実現します。

各 API の役割分担は以下のとおりです:

API主な役割解決する課題
pipeline()ストリーム連結と自動クリーンアップメモリリーク、エラー処理の複雑さ
finished()ストリーム終了検知非同期処理の制御困難
stream​/​promisesPromise ベースの API 提供async/await パターンでの利用

次の図は、モダンなストリーム処理アーキテクチャを示しています。

mermaidflowchart LR
    subgraph modern ["モダンなストリーム処理"]
        pipeline_func["pipeline()"] --> auto_cleanup["自動クリーンアップ"]
        finished_func["finished()"] --> completion["確実な終了検知"]
        promises_mod["stream/promises"] --> async_await["async/await対応"]
    end

    subgraph benefits ["得られる効果"]
        auto_cleanup --> no_leak["メモリリーク防止"]
        completion --> reliable["信頼性向上"]
        async_await --> readable["可読性向上"]
    end

    style pipeline_func fill:#e1f5fe
    style finished_func fill:#e8f5e8
    style promises_mod fill:#fff3e0

図で理解できる要点:

  • 3 つの API が相互補完して問題を解決
  • 各 API が特定の責務を担当
  • 組み合わせることで総合的な改善を実現

pipeline() 完全攻略

pipeline() は Node.js v10.0.0 で導入された、ストリーム連結のための革新的な API です。従来の .pipe() メソッドが抱えていた課題を根本的に解決します。

基本的な使い方

pipeline() の最大の特徴は、ストリームチェーン全体のライフサイクルを自動管理してくれることです。

javascriptconst { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

基本的な使用パターンを見てみましょう。

javascript// ファイル圧縮の例
pipeline(
  fs.createReadStream('input.txt'), // 読み込みストリーム
  zlib.createGzip(), // 変換ストリーム
  fs.createWriteStream('output.txt.gz'), // 書き込みストリーム
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);

この実装により、以下の処理が自動的に行われます:

  1. 自動的なエラー伝播: どのストリームでエラーが発生しても、コールバックに通知
  2. 自動クリーンアップ: エラー時に全ストリームの適切な破棄
  3. バックプレッシャー処理: ストリーム間の速度差を自動調整

エラーハンドリング

pipeline() のエラーハンドリングは非常にシンプルです。

javascriptfunction processFileWithGzip(inputPath, outputPath) {
  pipeline(
    fs.createReadStream(inputPath),
    zlib.createGzip(),
    fs.createWriteStream(outputPath),
    (err) => {
      if (err) {
        // エラーコード別の処理
        switch (err.code) {
          case 'ENOENT':
            console.error('File not found:', inputPath);
            break;
          case 'EACCES':
            console.error('Permission denied:', outputPath);
            break;
          default:
            console.error('Unexpected error:', err.message);
        }
      } else {
        console.log('File processed successfully');
      }
    }
  );
}

複数の Transform ストリームを連結する場合も同様にシンプルです。

javascriptconst { Transform } = require('stream');

// カスタム変換ストリーム
const upperCaseTransform = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  },
});

const lineNumberTransform = new Transform({
  transform(chunk, encoding, callback) {
    const lines = chunk.toString().split('\n');
    const numberedLines = lines.map(
      (line, index) => `${index + 1}: ${line}`
    );
    callback(null, numberedLines.join('\n'));
  },
});

これらを組み合わせた処理パイプラインは以下のようになります。

javascriptpipeline(
  fs.createReadStream('input.txt'),
  upperCaseTransform, // 大文字変換
  lineNumberTransform, // 行番号付与
  fs.createWriteStream('output.txt'),
  (err) => {
    if (err) {
      console.error('Text processing failed:', err);
    } else {
      console.log('Text processing completed');
    }
  }
);

型安全な pipeline の実装

TypeScript を使用する場合、pipeline() の型安全性を確保できます。

typescriptimport { pipeline } from 'stream';
import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';
import { promisify } from 'util';

// pipeline を Promise 化
const pipelineAsync = promisify(pipeline);

型定義を活用した実装例です。

typescriptinterface ProcessingOptions {
  inputPath: string;
  outputPath: string;
  compression?: boolean;
}

async function processFileTypeSafe(
  options: ProcessingOptions
): Promise<void> {
  const {
    inputPath,
    outputPath,
    compression = false,
  } = options;

  const streams = [
    createReadStream(inputPath),
    ...(compression ? [createGzip()] : []),
    createWriteStream(outputPath),
  ];

  try {
    await pipelineAsync(...streams);
    console.log('Processing completed successfully');
  } catch (error) {
    console.error('Processing failed:', error);
    throw error;
  }
}

使用例は以下のとおりです。

typescript// 圧縮なし
await processFileTypeSafe({
  inputPath: 'input.txt',
  outputPath: 'output.txt',
});

// 圧縮あり
await processFileTypeSafe({
  inputPath: 'input.txt',
  outputPath: 'output.txt.gz',
  compression: true,
});

finished() 詳解

finished() は、ストリームが完全に終了したタイミングを正確に検知するための API です。特に書き込みストリームの完了検知において威力を発揮します。

ストリーム終了検知の仕組み

従来の方法では、ストリームの終了検知が複雑でした。

javascript// 従来の問題のある方法
const writable = fs.createWriteStream('output.txt');

writable.write('Hello ');
writable.write('World');
writable.end();

// この時点では書き込みが完了していない可能性
console.log('Writing finished?'); // 不正確

finished() を使用することで、確実な終了検知が可能になります。

javascriptconst { finished } = require('stream');

const writable = fs.createWriteStream('output.txt');

writable.write('Hello ');
writable.write('World');
writable.end();

finished(writable, (err) => {
  if (err) {
    console.error('Stream failed:', err);
  } else {
    console.log('Stream finished successfully');
  }
});

読み込みストリームの場合も同様です。

javascriptconst readable = fs.createReadStream('input.txt');

readable.on('data', (chunk) => {
  console.log('Received chunk:', chunk.length);
});

finished(readable, (err) => {
  if (err) {
    console.error('Reading failed:', err);
  } else {
    console.log('Reading completed');
  }
});

Promise 化によるモダンな書き方

finished() を Promise 化することで、async/await パターンでの使用が可能になります。

javascriptconst { promisify } = require('util');
const finishedAsync = promisify(finished);

Promise 版の使用例です。

javascriptasync function processStreamWithCompletion() {
  const readable = fs.createReadStream('input.txt');
  const writable = fs.createWriteStream('output.txt');

  // データ転送開始
  readable.pipe(writable);

  try {
    // 書き込み完了まで待機
    await finishedAsync(writable);
    console.log('File copy completed');
  } catch (error) {
    console.error('File copy failed:', error);
  }
}

複数ストリームの同期処理も簡潔に書けます。

javascriptasync function processMultipleStreams() {
  const readable1 = fs.createReadStream('input1.txt');
  const readable2 = fs.createReadStream('input2.txt');
  const writable1 = fs.createWriteStream('output1.txt');
  const writable2 = fs.createWriteStream('output2.txt');

  // 並列処理開始
  readable1.pipe(writable1);
  readable2.pipe(writable2);

  try {
    // 両方の完了を待機
    await Promise.all([
      finishedAsync(writable1),
      finishedAsync(writable2),
    ]);
    console.log('All streams completed');
  } catch (error) {
    console.error('At least one stream failed:', error);
  }
}

実際のユースケース

finished() の実用的な活用例をご紹介します。

javascriptclass FileProcessor {
  constructor(options = {}) {
    this.options = options;
    this.finishedAsync = promisify(finished);
  }

  async processFile(inputPath, outputPath, transformFn) {
    const readable = fs.createReadStream(inputPath);
    const writable = fs.createWriteStream(outputPath);

    // カスタム変換処理
    if (transformFn) {
      const transform = new Transform({
        transform(chunk, encoding, callback) {
          const transformed = transformFn(chunk);
          callback(null, transformed);
        },
      });

      readable.pipe(transform).pipe(writable);
    } else {
      readable.pipe(writable);
    }

    // 完了を確実に待機
    await this.finishedAsync(writable);

    return {
      inputPath,
      outputPath,
      timestamp: new Date().toISOString(),
    };
  }
}

使用例は以下のとおりです。

javascriptconst processor = new FileProcessor();

// テキストを大文字に変換
const result = await processor.processFile(
  'input.txt',
  'output.txt',
  (chunk) => chunk.toString().toUpperCase()
);

console.log('Processing result:', result);

stream/promises モジュール

Node.js v15.0.0 で安定化された stream​/​promises モジュールは、ストリーム API の Promise 版を提供します。これにより、モダンな JavaScript の async/await パターンで自然にストリーム処理を記述できるようになりました。

pipeline と finished の Promise 版

stream​/​promises モジュールから直接 Promise 版の API を使用できます。

javascriptconst { pipeline, finished } = require('stream/promises');
const fs = require('fs');
const zlib = require('zlib');

Promise 版 pipeline() の基本的な使用方法です。

javascriptasync function compressFile(inputPath, outputPath) {
  try {
    await pipeline(
      fs.createReadStream(inputPath),
      zlib.createGzip(),
      fs.createWriteStream(outputPath)
    );
    console.log('Compression completed');
  } catch (error) {
    console.error('Compression failed:', error);
  }
}

Promise 版 finished() の使用例です。

javascriptasync function waitForStreamCompletion(stream) {
  try {
    await finished(stream);
    console.log('Stream completed successfully');
  } catch (error) {
    console.error('Stream failed:', error);
  }
}

async/await との組み合わせ

stream​/​promises を使用することで、非常に読みやすいコードが書けるようになります。

javascriptasync function processMultipleFiles(filePaths) {
  const results = [];

  for (const filePath of filePaths) {
    try {
      const outputPath = `${filePath}.gz`;

      await pipeline(
        fs.createReadStream(filePath),
        zlib.createGzip(),
        fs.createWriteStream(outputPath)
      );

      results.push({
        input: filePath,
        output: outputPath,
        status: 'success',
      });
    } catch (error) {
      results.push({
        input: filePath,
        status: 'failed',
        error: error.message,
      });
    }
  }

  return results;
}

並列処理も簡潔に記述できます。

javascriptasync function processFilesInParallel(filePaths) {
  const promises = filePaths.map(async (filePath) => {
    const outputPath = `${filePath}.processed`;

    await pipeline(
      fs.createReadStream(filePath),
      new Transform({
        transform(chunk, encoding, callback) {
          // 何らかの変換処理
          const processed = chunk.toString().toUpperCase();
          callback(null, processed);
        },
      }),
      fs.createWriteStream(outputPath)
    );

    return { input: filePath, output: outputPath };
  });

  try {
    const results = await Promise.all(promises);
    console.log('All files processed:', results);
    return results;
  } catch (error) {
    console.error('Parallel processing failed:', error);
    throw error;
  }
}

TypeScript での型定義

TypeScript 環境では、より厳密な型安全性を確保できます。

typescriptimport { pipeline, finished } from 'stream/promises';
import { createReadStream, createWriteStream } from 'fs';
import { createGzip, createGunzip } from 'zlib';
import { Transform } from 'stream';

型定義を活用したクラス実装の例です。

typescriptinterface StreamProcessorOptions {
  compression?: 'gzip' | 'none';
  encoding?: BufferEncoding;
  bufferSize?: number;
}

class StreamProcessor {
  private options: Required<StreamProcessorOptions>;

  constructor(options: StreamProcessorOptions = {}) {
    this.options = {
      compression: 'none',
      encoding: 'utf8',
      bufferSize: 64 * 1024,
      ...options,
    };
  }

  async processText(
    inputPath: string,
    outputPath: string,
    transformer: (text: string) => string
  ): Promise<void> {
    const textTransform = new Transform({
      transform(
        chunk: Buffer,
        encoding: BufferEncoding,
        callback
      ) {
        const text = chunk.toString(encoding);
        const transformed = transformer(text);
        callback(null, Buffer.from(transformed, encoding));
      },
    });

    const streams = [
      createReadStream(inputPath, {
        encoding: this.options.encoding,
        highWaterMark: this.options.bufferSize,
      }),
      textTransform,
      ...(this.options.compression === 'gzip'
        ? [createGzip()]
        : []),
      createWriteStream(outputPath),
    ];

    await pipeline(...streams);
  }

  async decompressFile(
    inputPath: string,
    outputPath: string
  ): Promise<void> {
    await pipeline(
      createReadStream(inputPath),
      createGunzip(),
      createWriteStream(outputPath)
    );
  }
}

使用例は以下のとおりです。

typescriptconst processor = new StreamProcessor({
  compression: 'gzip',
  encoding: 'utf8',
});

// テキスト変換と圧縮
await processor.processText(
  'input.txt',
  'output.txt.gz',
  (text) => text.replace(/\s+/g, ' ').trim()
);

// ファイル展開
await processor.decompressFile(
  'compressed.gz',
  'decompressed.txt'
);

実践的な使い分け

3 つの API を適切に使い分けることで、最適なストリーム処理を実現できます。それぞれの特徴と使用場面を理解することが重要です。

どの API をいつ使うべきか

各 API の適切な使用場面をまとめると以下のようになります。

場面推奨 API理由
複数ストリームの連結pipeline()自動クリーンアップとエラー処理
ストリーム完了の検知finished()確実な終了タイミングの把握
async/await での処理stream​/​promisesモダンな非同期パターン
レガシーコードとの統合コールバック版既存コードとの互換性
TypeScript プロジェクトstream​/​promises型安全性の確保

具体的な判断基準を示します。

javascript// ケース1: シンプルなファイルコピー
// → pipeline() が最適
async function copyFile(src, dest) {
  const { pipeline } = require('stream/promises');
  await pipeline(
    fs.createReadStream(src),
    fs.createWriteStream(dest)
  );
}

// ケース2: ストリーム作成後の完了待機
// → finished() が最適
async function waitForWrite(data, outputPath) {
  const { finished } = require('stream/promises');
  const writable = fs.createWriteStream(outputPath);

  writable.write(data);
  writable.end();

  await finished(writable);
}

// ケース3: 複雑な非同期フロー
// → stream/promises の組み合わせが最適
async function complexProcessing(inputPaths, outputDir) {
  const { pipeline } = require('stream/promises');

  const results = await Promise.allSettled(
    inputPaths.map(async (inputPath) => {
      const outputPath = path.join(
        outputDir,
        path.basename(inputPath)
      );
      await pipeline(
        fs.createReadStream(inputPath),
        new SomeTransform(),
        fs.createWriteStream(outputPath)
      );
      return { inputPath, outputPath };
    })
  );

  return results;
}

パフォーマンスの比較

各 API のパフォーマンス特性を理解することで、適切な選択ができます。

javascript// パフォーマンステスト用のヘルパー関数
async function measurePerformance(name, fn) {
  const start = process.hrtime.bigint();
  await fn();
  const end = process.hrtime.bigint();
  const duration = Number(end - start) / 1000000; // ナノ秒をミリ秒に変換
  console.log(`${name}: ${duration.toFixed(2)}ms`);
}

実際の比較テストです。

javascriptasync function performanceComparison() {
  const testFile = 'large-test-file.txt'; // 大容量テストファイル

  // pipeline() (stream/promises)
  await measurePerformance(
    'pipeline (promises)',
    async () => {
      const { pipeline } = require('stream/promises');
      await pipeline(
        fs.createReadStream(testFile),
        fs.createWriteStream('output1.txt')
      );
    }
  );

  // pipeline() (コールバック版)
  await measurePerformance(
    'pipeline (callback)',
    async () => {
      const { pipeline } = require('stream');
      const { promisify } = require('util');
      const pipelineAsync = promisify(pipeline);

      await pipelineAsync(
        fs.createReadStream(testFile),
        fs.createWriteStream('output2.txt')
      );
    }
  );

  // 従来の pipe() メソッド
  await measurePerformance('traditional pipe', async () => {
    const { finished } = require('stream/promises');
    const readable = fs.createReadStream(testFile);
    const writable = fs.createWriteStream('output3.txt');

    readable.pipe(writable);
    await finished(writable);
  });
}

一般的な傾向として以下が観察されます:

  • pipeline(): 最も安全で、パフォーマンスも良好
  • stream​/​promises: 若干のオーバーヘッドがあるが、開発効率が向上
  • 従来の .pipe(): 最速だが、エラー処理とクリーンアップが不完全

エラー処理の違い

各 API におけるエラー処理の特徴を比較します。

javascript// pipeline() のエラー処理
async function pipelineErrorHandling() {
  try {
    await pipeline(
      fs.createReadStream('nonexistent.txt'), // ファイルが存在しない
      fs.createWriteStream('output.txt')
    );
  } catch (error) {
    // 全てのストリームが自動的にクリーンアップされる
    console.error('Pipeline error:', error.code); // ENOENT
  }
}

// finished() のエラー処理
async function finishedErrorHandling() {
  const readable = fs.createReadStream('nonexistent.txt');

  try {
    await finished(readable);
  } catch (error) {
    // ストリーム自体のエラーを検知
    console.error('Stream error:', error.code); // ENOENT
  }
}

// 従来の方法のエラー処理
function traditionalErrorHandling() {
  const readable = fs.createReadStream('nonexistent.txt');
  const writable = fs.createWriteStream('output.txt');

  readable.on('error', (err) => {
    // 手動でクリーンアップが必要
    writable.destroy();
    console.error('Read error:', err);
  });

  writable.on('error', (err) => {
    // 手動でクリーンアップが必要
    readable.destroy();
    console.error('Write error:', err);
  });

  readable.pipe(writable);
}

エラー処理の比較表は以下のとおりです。

API自動クリーンアップエラー検知範囲実装の複雑さ
pipeline()✅ 完全全ストリーム🔴 簡単
finished()❌ 手動単一ストリーム🟡 中程度
従来の方法❌ 手動ストリーム毎🔴 複雑

次の図は、各 API のエラー処理フローを示しています。

mermaidflowchart TD
    subgraph pipeline_flow ["pipeline() エラー処理"]
        p_error["エラー発生"] --> p_auto["自動クリーンアップ"]
        p_auto --> p_callback["コールバック通知"]
    end

    subgraph finished_flow ["finished() エラー処理"]
        f_error["エラー発生"] --> f_detect["エラー検知"]
        f_detect --> f_manual["手動クリーンアップ"]
    end

    subgraph traditional_flow ["従来方法 エラー処理"]
        t_error["エラー発生"] --> t_listen["イベントリスナー"]
        t_listen --> t_manual["手動クリーンアップ"]
        t_manual --> t_complex["複雑な状態管理"]
    end

    style p_auto fill:#e8f5e8
    style f_manual fill:#fff3e0
    style t_complex fill:#ffebee

図で理解できる要点:

  • pipeline() は最も自動化されたエラー処理
  • finished() は検知は確実だが手動対応が必要
  • 従来方法は全て手動で複雑な実装が必要

まとめ

Node.js ストリーム API の pipeline()finished()stream​/​promises について詳しく解説してきました。

これらの API を適切に使い分けることで、従来のストリーム処理で発生していたメモリリーク、エラーハンドリングの複雑さ、非同期処理の制御困難といった課題を根本的に解決できるのです。

重要なポイントをまとめると以下のようになります:

  • pipeline() は複数ストリームの連結において最も安全で効率的な選択肢
  • finished() はストリーム完了の確実な検知に特化した API
  • stream​/​promises は async/await パターンでモダンな非同期処理を実現

特に大容量データの処理やリアルタイムストリーミングにおいて、これらの API の恩恵は計り知れません。エラー時の自動クリーンアップ機能により、長時間稼働するアプリケーションでも安心してストリーム処理を行えるでしょう。

今後 Node.js でストリーム処理を実装する際は、ぜひこれらのモダンな API を積極的に活用してください。コードの安全性、保守性、そして開発効率が大幅に向上することをお約束します。

関連リンク