T-CREATOR

Node.js のストリーム入門:大量データ処理の基礎

Node.js のストリーム入門:大量データ処理の基礎

Node.js でアプリケーションを開発していると、大量のデータを効率的に処理する必要に迫られることがあります。従来のメモリベースの処理では、巨大なファイルやリアルタイムデータを扱う際にメモリ不足やパフォーマンスの問題が発生しがちです。

そんな課題を解決する強力な仕組みが「ストリーム」です。ストリームを理解することで、メモリ効率的で高速なデータ処理が可能になり、Node.js アプリケーションの品質が格段に向上します。

本記事では、Node.js ストリームの基本概念から実装方法まで、初心者の方にもわかりやすく解説していきます。

Node.js ストリームとは何か

Node.js におけるストリームとは、データを小さなチャンク(塊)に分割して、連続的に処理する仕組みのことです。水道管を流れる水のように、データが途切れることなく流れ続けるイメージで理解していただけるでしょう。

ストリームの基本的な概念

ストリームは、以下の特徴を持つデータ処理のパラダイムです。

javascript// 従来の方法(非効率)
const fs = require('fs');

// ファイル全体をメモリに読み込む
const data = fs.readFileSync('large-file.txt');
console.log(data.toString());
// 大きなファイルの場合、メモリ不足が発生する可能性

上記のコードは、ファイル全体をメモリに読み込んでから処理を行います。これに対して、ストリームを使用した場合は以下のようになります。

javascript// ストリームを使用した方法(効率的)
const fs = require('fs');

// ストリームでファイルを読み込む
const readStream = fs.createReadStream('large-file.txt');

readStream.on('data', (chunk) => {
  // データのチャンクごとに処理
  console.log(
    `受信したチャンクサイズ: ${chunk.length} bytes`
  );
  process.stdout.write(chunk);
});

readStream.on('end', () => {
  console.log('\nファイル読み込み完了');
});

ストリームの利点

利点説明具体例
メモリ効率データを小さなチャンクで処理するため、大量データでもメモリ使用量が一定10GB のログファイルを数 MB のメモリで処理
リアルタイム処理データが到着次第、即座に処理を開始チャットアプリケーションのメッセージ配信
パイプライン処理複数の処理を連鎖させて効率的なデータフロー構築読み込み → 変換 → 圧縮 → 書き込み
バックプレッシャー処理速度の違いを自動調整高速データ生成と低速データ処理の調和

ストリームが解決する問題

Node.js でデータ処理を行う際に直面する主要な問題と、ストリームによる解決策を見ていきましょう。

メモリ制限の問題

Node.js のデフォルトのメモリ制限は約 1.4GB です。大きなファイルを一度にメモリに読み込もうとすると、この制限に引っかかります。

javascript// 問題のあるコード例
const fs = require('fs');

try {
  // 2GB のファイルを読み込もうとする
  const largeData = fs.readFileSync('huge-dataset.json');
  const parsed = JSON.parse(largeData.toString());
  console.log(parsed);
} catch (error) {
  console.error('メモリ不足エラー:', error.message);
  // "JavaScript heap out of memory" エラーが発生
}

ストリームを使用することで、この問題を解決できます。

javascript// ストリームによる解決例
const fs = require('fs');
const { Transform } = require('stream');

// JSON Lines 形式のファイルを行ごとに処理
const lineProcessor = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    const lines = chunk.toString().split('\n');

    lines.forEach((line) => {
      if (line.trim()) {
        try {
          const jsonData = JSON.parse(line);
          // 各行を個別に処理
          this.push(jsonData);
        } catch (error) {
          console.error(
            'JSON パースエラー:',
            error.message
          );
        }
      }
    });

    callback();
  },
});

fs.createReadStream('huge-dataset.jsonl')
  .pipe(lineProcessor)
  .on('data', (data) => {
    // 各 JSON オブジェクトを処理
    console.log('処理中のデータ:', data.id);
  })
  .on('end', () => {
    console.log('全データの処理が完了しました');
  });

レスポンス時間の問題

従来の方法では、すべてのデータが準備できるまでユーザーは待機する必要があります。ストリームを使用することで、部分的なデータでも即座にレスポンスを開始できます。

javascript// Express.js でのストリーミングレスポンス例
const express = require('express');
const fs = require('fs');
const app = express();

app.get('/download/:filename', (req, res) => {
  const filename = req.params.filename;
  const filePath = `./files/${filename}`;

  // ファイルの存在確認
  if (!fs.existsSync(filePath)) {
    return res.status(404).send('ファイルが見つかりません');
  }

  // ストリーミングダウンロード
  const readStream = fs.createReadStream(filePath);

  // 適切なヘッダーを設定
  res.setHeader('Content-Type', 'application/octet-stream');
  res.setHeader(
    'Content-Disposition',
    `attachment; filename="${filename}"`
  );

  // ストリームをレスポンスにパイプ
  readStream.pipe(res);

  // エラーハンドリング
  readStream.on('error', (error) => {
    console.error('ファイル読み込みエラー:', error);
    res.status(500).send('ファイル読み込みに失敗しました');
  });
});

app.listen(3000, () => {
  console.log('サーバーがポート 3000 で起動しました');
});

リソース使用効率の問題

複数の処理を同時に実行する際、ストリームを使用することでシステムリソースを効率的に活用できます。

javascript// 複数ファイルの並列処理例
const fs = require('fs');
const path = require('path');
const { pipeline } = require('stream/promises');
const { Transform } = require('stream');

class FileProcessor {
  static async processMultipleFiles(inputDir, outputDir) {
    const files = await fs.promises.readdir(inputDir);
    const txtFiles = files.filter(
      (file) => path.extname(file) === '.txt'
    );

    // 各ファイルを並列で処理
    const promises = txtFiles.map(async (filename) => {
      const inputPath = path.join(inputDir, filename);
      const outputPath = path.join(
        outputDir,
        `processed-${filename}`
      );

      // テキスト変換ストリーム
      const textTransformer = new Transform({
        transform(chunk, encoding, callback) {
          // 文字列を大文字に変換
          const upperCaseText = chunk
            .toString()
            .toUpperCase();
          callback(null, upperCaseText);
        },
      });

      try {
        await pipeline(
          fs.createReadStream(inputPath),
          textTransformer,
          fs.createWriteStream(outputPath)
        );

        console.log(`処理完了: ${filename}`);
        return { success: true, filename };
      } catch (error) {
        console.error(
          `処理エラー ${filename}:`,
          error.message
        );
        return {
          success: false,
          filename,
          error: error.message,
        };
      }
    });

    const results = await Promise.all(promises);
    return results;
  }
}

// 使用例
async function main() {
  try {
    const results =
      await FileProcessor.processMultipleFiles(
        './input',
        './output'
      );

    const successful = results.filter((r) => r.success);
    const failed = results.filter((r) => !r.success);

    console.log(
      `成功: ${successful.length}件, 失敗: ${failed.length}件`
    );
  } catch (error) {
    console.error('処理中にエラーが発生:', error);
  }
}

main();

4 つのストリームタイプの詳細

Node.js では、用途に応じて 4 つの異なるタイプのストリームが提供されています。それぞれの特徴と使用場面を詳しく見ていきましょう。

Readable ストリーム

Readable ストリームは、データソースからデータを読み取るためのストリームです。ファイル読み込み、HTTP リクエストの受信、データベースからのデータ取得などに使用されます。

基本的な使用方法
javascriptconst fs = require('fs');
const { Readable } = require('stream');

// ファイルから読み取るReadableストリーム
const fileStream = fs.createReadStream('data.txt', {
  encoding: 'utf8', // 文字エンコーディング
  highWaterMark: 1024, // バッファサイズ(1KB)
  start: 0, // 読み取り開始位置
  end: 100, // 読み取り終了位置
});

// データイベントの監視
fileStream.on('data', (chunk) => {
  console.log(`受信データ: ${chunk.length} bytes`);
  console.log('内容:', chunk);
});

// 読み取り完了イベント
fileStream.on('end', () => {
  console.log('ファイル読み取り完了');
});

// エラーイベント
fileStream.on('error', (error) => {
  console.error('読み取りエラー:', error.message);
});
カスタム Readable ストリームの作成

独自のデータソースから読み取るストリームを作成することができます。

javascriptconst { Readable } = require('stream');

class NumberGenerator extends Readable {
  constructor(options = {}) {
    super(options);
    this.currentNumber = 1;
    this.maxNumber = options.max || 10;
  }

  _read() {
    if (this.currentNumber <= this.maxNumber) {
      // データをストリームにプッシュ
      const data = `数値: ${this.currentNumber}\n`;
      this.push(data);
      this.currentNumber++;
    } else {
      // データの終了を示す
      this.push(null);
    }
  }
}

// 使用例
const numberStream = new NumberGenerator({ max: 5 });

numberStream.on('data', (chunk) => {
  process.stdout.write(chunk);
});

numberStream.on('end', () => {
  console.log('数値生成完了');
});
実用的な Readable ストリーム例

API からのデータ取得を例に、実用的な使用方法を見てみましょう。

javascriptconst https = require('https');
const { Readable } = require('stream');

class ApiDataStream extends Readable {
  constructor(apiUrl, options = {}) {
    super(options);
    this.apiUrl = apiUrl;
    this.pageNumber = 1;
    this.maxPages = options.maxPages || 5;
    this.fetching = false;
  }

  _read() {
    if (this.fetching || this.pageNumber > this.maxPages) {
      return;
    }

    this.fetching = true;
    const url = `${this.apiUrl}?page=${this.pageNumber}`;

    https
      .get(url, (res) => {
        let data = '';

        res.on('data', (chunk) => {
          data += chunk;
        });

        res.on('end', () => {
          try {
            const jsonData = JSON.parse(data);

            // データをストリームにプッシュ
            this.push(JSON.stringify(jsonData) + '\n');

            this.pageNumber++;
            this.fetching = false;

            // 最後のページの場合は終了
            if (this.pageNumber > this.maxPages) {
              this.push(null);
            }
          } catch (error) {
            this.emit('error', error);
          }
        });
      })
      .on('error', (error) => {
        this.emit('error', error);
      });
  }
}

// 使用例
const apiStream = new ApiDataStream(
  'https://api.example.com/data',
  { maxPages: 3 }
);

apiStream.on('data', (chunk) => {
  console.log('取得したデータ:', chunk.toString());
});

apiStream.on('end', () => {
  console.log('全ページの取得完了');
});

apiStream.on('error', (error) => {
  console.error('API取得エラー:', error.message);
});

Writable ストリーム

Writable ストリームは、データの書き込み先となるストリームです。ファイルへの書き込み、HTTP レスポンスの送信、データベースへのデータ保存などに使用されます。

基本的な使用方法
javascriptconst fs = require('fs');
const { Writable } = require('stream');

// ファイルへ書き込むWritableストリーム
const fileWriteStream = fs.createWriteStream('output.txt', {
  encoding: 'utf8',
  flags: 'w', // 書き込みモード('a'は追記モード)
  highWaterMark: 1024, // バッファサイズ
});

// データの書き込み
fileWriteStream.write('最初の行\n');
fileWriteStream.write('2番目の行\n');
fileWriteStream.write('3番目の行\n');

// 書き込み完了
fileWriteStream.end('最後の行\n');

// イベント監視
fileWriteStream.on('finish', () => {
  console.log('ファイル書き込み完了');
});

fileWriteStream.on('error', (error) => {
  console.error('書き込みエラー:', error.message);
});
カスタム Writable ストリームの作成

データを特定の形式で処理・保存するストリームを作成できます。

javascriptconst { Writable } = require('stream');

class DataLogger extends Writable {
  constructor(options = {}) {
    super(options);
    this.logCount = 0;
    this.logFile = options.logFile || './application.log';
    this.fs = require('fs');

    // ログファイルの初期化
    if (options.clearLog) {
      this.fs.writeFileSync(this.logFile, '');
    }
  }

  _write(chunk, encoding, callback) {
    this.logCount++;

    // タイムスタンプ付きでログを記録
    const timestamp = new Date().toISOString();
    const logEntry = `[${timestamp}] #${
      this.logCount
    }: ${chunk.toString()}\n`;

    this.fs.appendFile(this.logFile, logEntry, (error) => {
      if (error) {
        callback(error);
      } else {
        console.log(
          `ログ記録完了: エントリ #${this.logCount}`
        );
        callback();
      }
    });
  }

  _final(callback) {
    // ストリーム終了時の処理
    const summary = `\n=== ログセッション終了 ===\n総エントリ数: ${
      this.logCount
    }\n終了時刻: ${new Date().toISOString()}\n\n`;

    this.fs.appendFile(this.logFile, summary, callback);
  }
}

// 使用例
const logger = new DataLogger({
  logFile: './app.log',
  clearLog: true,
});

logger.write('アプリケーション開始');
logger.write('ユーザーログイン: user123');
logger.write('データ処理開始');
logger.end('データ処理完了');

logger.on('finish', () => {
  console.log('ログセッション完了');
});
実用的な Writable ストリーム例

CSV ファイルを生成するストリームの例を見てみましょう。

javascriptconst { Writable } = require('stream');
const fs = require('fs');

class CsvWriter extends Writable {
  constructor(filename, headers, options = {}) {
    super({ objectMode: true, ...options });
    this.filename = filename;
    this.headers = headers;
    this.isFirstRow = true;
    this.writeStream = fs.createWriteStream(filename);

    // ヘッダー行を書き込み
    this.writeStream.write(headers.join(',') + '\n');
  }

  _write(data, encoding, callback) {
    try {
      // オブジェクトから CSV 行を生成
      const row = this.headers.map((header) => {
        const value = data[header] || '';
        // カンマやダブルクォートを含む値をエスケープ
        return this.escapeCsvValue(value.toString());
      });

      this.writeStream.write(row.join(',') + '\n');
      callback();
    } catch (error) {
      callback(error);
    }
  }

  _final(callback) {
    this.writeStream.end(callback);
  }

  escapeCsvValue(value) {
    if (
      value.includes(',') ||
      value.includes('"') ||
      value.includes('\n')
    ) {
      return `"${value.replace(/"/g, '""')}"`;
    }
    return value;
  }
}

// 使用例
const csvWriter = new CsvWriter('users.csv', [
  'id',
  'name',
  'email',
  'age',
]);

// ユーザーデータを書き込み
const users = [
  {
    id: 1,
    name: '田中太郎',
    email: 'tanaka@example.com',
    age: 30,
  },
  {
    id: 2,
    name: '佐藤花子',
    email: 'sato@example.com',
    age: 25,
  },
  {
    id: 3,
    name: '鈴木次郎',
    email: 'suzuki@example.com',
    age: 35,
  },
];

users.forEach((user) => {
  csvWriter.write(user);
});

csvWriter.end();

csvWriter.on('finish', () => {
  console.log('CSV ファイルの作成が完了しました');
});

Transform ストリーム

Transform ストリームは、データを読み取り、変換処理を行い、変換されたデータを出力するストリームです。Readable と Writable の機能を組み合わせた双方向ストリームです。

基本的な使用方法
javascriptconst { Transform } = require('stream');
const fs = require('fs');

// テキストを大文字に変換するTransformストリーム
const upperCaseTransform = new Transform({
  transform(chunk, encoding, callback) {
    // 入力データを大文字に変換
    const upperCaseData = chunk.toString().toUpperCase();
    // 変換されたデータを出力
    callback(null, upperCaseData);
  },
});

// ストリームのパイプライン
fs.createReadStream('input.txt')
  .pipe(upperCaseTransform) // データを変換
  .pipe(fs.createWriteStream('output.txt'))
  .on('finish', () => {
    console.log('変換処理が完了しました');
  });
カスタム Transform ストリームの作成

JSON データを CSV 形式に変換するストリームを作成してみましょう。

javascriptconst { Transform } = require('stream');

class JsonToCsvTransform extends Transform {
  constructor(options = {}) {
    super({ objectMode: true, ...options });
    this.headers = options.headers || [];
    this.isFirstRow = true;
  }

  _transform(chunk, encoding, callback) {
    try {
      const data = JSON.parse(chunk.toString());

      // 最初の行の場合、ヘッダーを生成
      if (this.isFirstRow) {
        if (this.headers.length === 0) {
          this.headers = Object.keys(data);
        }

        // ヘッダー行を出力
        this.push(this.headers.join(',') + '\n');
        this.isFirstRow = false;
      }

      // データ行を生成
      const row = this.headers.map((header) => {
        const value = data[header] || '';
        return this.escapeCsvValue(value.toString());
      });

      this.push(row.join(',') + '\n');
      callback();
    } catch (error) {
      callback(error);
    }
  }

  escapeCsvValue(value) {
    if (
      value.includes(',') ||
      value.includes('"') ||
      value.includes('\n')
    ) {
      return `"${value.replace(/"/g, '""')}"`;
    }
    return value;
  }
}

// 使用例
const fs = require('fs');
const jsonToCsv = new JsonToCsvTransform({
  headers: ['id', 'name', 'email', 'department'],
});

fs.createReadStream('employees.jsonl') // JSON Lines形式のファイル
  .pipe(jsonToCsv)
  .pipe(fs.createWriteStream('employees.csv'))
  .on('finish', () => {
    console.log('JSON から CSV への変換が完了しました');
  });
高度な Transform ストリーム例

ログファイルの解析と集計を行うストリームの例です。

javascriptconst { Transform } = require('stream');

class LogAnalyzer extends Transform {
  constructor(options = {}) {
    super(options);
    this.stats = {
      totalLines: 0,
      errorCount: 0,
      warningCount: 0,
      infoCount: 0,
      ipAddresses: new Map(),
      statusCodes: new Map(),
    };
    this.logPattern =
      /^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z) \[(\w+)\] (\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}) - (\d{3}) - (.+)$/;
  }

  _transform(chunk, encoding, callback) {
    const lines = chunk.toString().split('\n');

    lines.forEach((line) => {
      if (line.trim()) {
        this.analyzeLine(line);
      }
    });

    // 解析結果を出力
    this.push(
      JSON.stringify(this.getAnalysisResult()) + '\n'
    );
    callback();
  }

  analyzeLine(line) {
    this.stats.totalLines++;

    const match = line.match(this.logPattern);
    if (match) {
      const [, timestamp, level, ip, statusCode, message] =
        match;

      // ログレベル集計
      switch (level.toLowerCase()) {
        case 'error':
          this.stats.errorCount++;
          break;
        case 'warning':
        case 'warn':
          this.stats.warningCount++;
          break;
        case 'info':
          this.stats.infoCount++;
          break;
      }

      // IP アドレス集計
      const currentIpCount =
        this.stats.ipAddresses.get(ip) || 0;
      this.stats.ipAddresses.set(ip, currentIpCount + 1);

      // ステータスコード集計
      const currentStatusCount =
        this.stats.statusCodes.get(statusCode) || 0;
      this.stats.statusCodes.set(
        statusCode,
        currentStatusCount + 1
      );
    }
  }

  getAnalysisResult() {
    return {
      timestamp: new Date().toISOString(),
      totalLines: this.stats.totalLines,
      logLevels: {
        error: this.stats.errorCount,
        warning: this.stats.warningCount,
        info: this.stats.infoCount,
      },
      topIpAddresses: Array.from(
        this.stats.ipAddresses.entries()
      )
        .sort((a, b) => b[1] - a[1])
        .slice(0, 10),
      statusCodeDistribution: Object.fromEntries(
        this.stats.statusCodes
      ),
    };
  }

  _flush(callback) {
    // ストリーム終了時の最終分析結果
    const finalResult = {
      ...this.getAnalysisResult(),
      analysis: 'complete',
    };

    this.push('\n' + JSON.stringify(finalResult, null, 2));
    callback();
  }
}

// 使用例
const fs = require('fs');
const logAnalyzer = new LogAnalyzer();

fs.createReadStream('access.log')
  .pipe(logAnalyzer)
  .pipe(fs.createWriteStream('log-analysis.json'))
  .on('finish', () => {
    console.log('ログ解析が完了しました');
  });

Duplex ストリーム

Duplex ストリームは、読み取りと書き込みの両方の機能を持つ独立したストリームです。ネットワーク接続(TCP ソケット)やプロセス間通信などで使用されます。

基本的な概念
javascriptconst { Duplex } = require('stream');
const net = require('net');

// TCP サーバーの例
const server = net.createServer((socket) => {
  console.log('クライアントが接続しました');

  // socket は Duplex ストリーム
  socket.on('data', (data) => {
    console.log('受信:', data.toString());

    // エコーバック
    socket.write(`エコー: ${data.toString()}`);
  });

  socket.on('end', () => {
    console.log('クライアントが切断しました');
  });
});

server.listen(8080, () => {
  console.log('サーバーがポート 8080 で起動しました');
});
カスタム Duplex ストリームの作成

チャットルームのような双方向通信を実装してみましょう。

javascriptconst { Duplex } = require('stream');
const EventEmitter = require('events');

class ChatRoom extends EventEmitter {
  constructor() {
    super();
    this.clients = new Map();
    this.messageHistory = [];
  }

  addClient(clientId) {
    const client = new ChatClient(clientId, this);
    this.clients.set(clientId, client);

    // 過去のメッセージを送信
    this.messageHistory.forEach((message) => {
      client.push(JSON.stringify(message) + '\n');
    });

    this.broadcastMessage({
      type: 'system',
      message: `${clientId} がチャットルームに参加しました`,
      timestamp: new Date().toISOString(),
    });

    return client;
  }

  removeClient(clientId) {
    this.clients.delete(clientId);

    this.broadcastMessage({
      type: 'system',
      message: `${clientId} がチャットルームから退出しました`,
      timestamp: new Date().toISOString(),
    });
  }

  broadcastMessage(message) {
    this.messageHistory.push(message);

    // 履歴は最新100件まで保持
    if (this.messageHistory.length > 100) {
      this.messageHistory.shift();
    }

    const messageString = JSON.stringify(message) + '\n';

    this.clients.forEach((client, clientId) => {
      if (message.from !== clientId) {
        client.push(messageString);
      }
    });
  }
}

class ChatClient extends Duplex {
  constructor(clientId, chatRoom) {
    super({ objectMode: false });
    this.clientId = clientId;
    this.chatRoom = chatRoom;
  }

  _read() {
    // 読み取り要求(実際の実装では何もしない)
  }

  _write(chunk, encoding, callback) {
    try {
      const message = JSON.parse(chunk.toString());

      // メッセージにクライアント情報を追加
      const enrichedMessage = {
        ...message,
        from: this.clientId,
        timestamp: new Date().toISOString(),
      };

      // チャットルームにメッセージをブロードキャスト
      this.chatRoom.broadcastMessage(enrichedMessage);

      callback();
    } catch (error) {
      callback(error);
    }
  }

  _final(callback) {
    // クライアント切断時の処理
    this.chatRoom.removeClient(this.clientId);
    callback();
  }
}

// 使用例
const chatRoom = new ChatRoom();

// クライアント1
const client1 = chatRoom.addClient('ユーザー1');
client1.on('data', (data) => {
  console.log('クライアント1が受信:', data.toString());
});

// クライアント2
const client2 = chatRoom.addClient('ユーザー2');
client2.on('data', (data) => {
  console.log('クライアント2が受信:', data.toString());
});

// メッセージ送信テスト
setTimeout(() => {
  client1.write(
    JSON.stringify({
      type: 'message',
      message: 'こんにちは!',
    })
  );
}, 1000);

setTimeout(() => {
  client2.write(
    JSON.stringify({
      type: 'message',
      message: 'こんにちは!元気ですか?',
    })
  );
}, 2000);
実用的な Duplex ストリーム例

WebSocket のような双方向通信を模擬するストリームです。

javascriptconst { Duplex } = require('stream');
const crypto = require('crypto');

class MockWebSocket extends Duplex {
  constructor(options = {}) {
    super(options);
    this.connectionId = crypto.randomUUID();
    this.isConnected = false;
    this.heartbeatInterval = null;
    this.lastPing = null;

    // 接続を開始
    this.connect();
  }

  connect() {
    this.isConnected = true;
    console.log(`WebSocket 接続確立: ${this.connectionId}`);

    // ハートビート開始
    this.startHeartbeat();

    // 接続確立メッセージを送信
    this.push(
      JSON.stringify({
        type: 'connection',
        connectionId: this.connectionId,
        timestamp: new Date().toISOString(),
      }) + '\n'
    );
  }

  _read() {
    // クライアントからの読み取り要求
  }

  _write(chunk, encoding, callback) {
    if (!this.isConnected) {
      return callback(
        new Error('WebSocket が切断されています')
      );
    }

    try {
      const message = JSON.parse(chunk.toString());

      // メッセージタイプに応じた処理
      switch (message.type) {
        case 'ping':
          this.handlePing(message);
          break;
        case 'message':
          this.handleMessage(message);
          break;
        case 'subscribe':
          this.handleSubscribe(message);
          break;
        default:
          console.log(
            '未知のメッセージタイプ:',
            message.type
          );
      }

      callback();
    } catch (error) {
      callback(error);
    }
  }

  handlePing(message) {
    this.lastPing = Date.now();

    // Pong を返送
    this.push(
      JSON.stringify({
        type: 'pong',
        timestamp: new Date().toISOString(),
        latency: 0, // 実際の実装では計算する
      }) + '\n'
    );
  }

  handleMessage(message) {
    // エコーメッセージとして返送
    this.push(
      JSON.stringify({
        type: 'echo',
        originalMessage: message.data,
        timestamp: new Date().toISOString(),
      }) + '\n'
    );
  }

  handleSubscribe(message) {
    console.log(`チャンネル購読: ${message.channel}`);

    // 購読確認を送信
    this.push(
      JSON.stringify({
        type: 'subscribed',
        channel: message.channel,
        timestamp: new Date().toISOString(),
      }) + '\n'
    );

    // 定期的にデータを送信
    this.startDataStream(message.channel);
  }

  startHeartbeat() {
    this.heartbeatInterval = setInterval(() => {
      if (this.isConnected) {
        this.push(
          JSON.stringify({
            type: 'heartbeat',
            timestamp: new Date().toISOString(),
          }) + '\n'
        );
      }
    }, 30000); // 30秒間隔
  }

  startDataStream(channel) {
    // チャンネルに応じたデータストリーム
    const dataInterval = setInterval(() => {
      if (!this.isConnected) {
        clearInterval(dataInterval);
        return;
      }

      this.push(
        JSON.stringify({
          type: 'data',
          channel: channel,
          data: {
            value: Math.random() * 100,
            timestamp: new Date().toISOString(),
          },
        }) + '\n'
      );
    }, 5000); // 5秒間隔
  }

  _final(callback) {
    this.disconnect();
    callback();
  }

  disconnect() {
    this.isConnected = false;

    if (this.heartbeatInterval) {
      clearInterval(this.heartbeatInterval);
    }

    console.log(`WebSocket 切断: ${this.connectionId}`);
  }
}

// 使用例
const mockWs = new MockWebSocket();

mockWs.on('data', (data) => {
  console.log('受信:', data.toString());
});

// メッセージ送信テスト
setTimeout(() => {
  mockWs.write(
    JSON.stringify({
      type: 'subscribe',
      channel: 'sensor-data',
    })
  );
}, 1000);

setTimeout(() => {
  mockWs.write(
    JSON.stringify({
      type: 'message',
      data: 'Hello WebSocket!',
    })
  );
}, 3000);

setTimeout(() => {
  mockWs.write(
    JSON.stringify({
      type: 'ping',
    })
  );
}, 5000);

// 10秒後に切断
setTimeout(() => {
  mockWs.end();
}, 10000);

ストリームのライフサイクルとイベント

Node.js ストリームは、明確なライフサイクルを持ち、各段階で適切なイベントが発行されます。これらのイベントを理解することで、効果的なエラーハンドリングと制御が可能になります。

ストリームの基本イベント

イベント対象ストリーム説明発行タイミング
dataReadable, Transform, Duplexデータチャンクが利用可能になった時データ受信時
endReadable, Transform, Duplexすべてのデータが読み取られた時ストリーム終了時
finishWritable, Transform, Duplexすべてのデータが書き込まれた時書き込み完了時
closeすべてストリームが閉じられた時リソース解放時
errorすべてエラーが発生した時異常時

詳細なライフサイクル管理

javascriptconst fs = require('fs');
const { pipeline } = require('stream');
const { Transform } = require('stream');

class StreamLifecycleMonitor extends Transform {
  constructor(name, options = {}) {
    super(options);
    this.name = name;
    this.startTime = Date.now();
    this.bytesProcessed = 0;
    this.chunksProcessed = 0;

    // ライフサイクルイベントの監視
    this.setupEventMonitoring();
  }

  setupEventMonitoring() {
    // ストリーム開始
    this.on('pipe', (src) => {
      console.log(
        `[${this.name}] パイプ接続: ${src.constructor.name} から受信開始`
      );
    });

    // データ処理
    this.on('data', (chunk) => {
      this.bytesProcessed += chunk.length;
      console.log(
        `[${this.name}] データ出力: ${chunk.length} bytes`
      );
    });

    // 読み取り終了
    this.on('end', () => {
      const duration = Date.now() - this.startTime;
      console.log(
        `[${this.name}] 読み取り終了: ${this.chunksProcessed} chunks, ${this.bytesProcessed} bytes, ${duration}ms`
      );
    });

    // 書き込み終了
    this.on('finish', () => {
      const duration = Date.now() - this.startTime;
      console.log(
        `[${this.name}] 書き込み終了: 処理時間 ${duration}ms`
      );
    });

    // ストリーム閉鎖
    this.on('close', () => {
      console.log(
        `[${this.name}] ストリーム閉鎖: リソース解放完了`
      );
    });

    // エラー処理
    this.on('error', (error) => {
      console.error(
        `[${this.name}] エラー発生:`,
        error.message
      );
    });
  }

  _transform(chunk, encoding, callback) {
    this.chunksProcessed++;

    // 簡単なデータ変換(文字列の場合は行番号を追加)
    if (
      typeof chunk === 'string' ||
      chunk instanceof Buffer
    ) {
      const lines = chunk.toString().split('\n');

      lines.forEach((line) => {
        if (line.trim()) {
          return `${this.chunksProcessed}-${line}`;
        }
        return line;
      });

      callback(null, lines.join('\n'));
    } else {
      callback(null, chunk);
    }
  }
}

// 使用例
async function demonstrateLifecycle() {
  const monitor1 = new StreamLifecycleMonitor('変換処理1');
  const monitor2 = new StreamLifecycleMonitor('変換処理2');

  try {
    await pipeline(
      fs.createReadStream('input.txt'),
      monitor1,
      monitor2,
      fs.createWriteStream('output.txt')
    );

    console.log('パイプライン処理が正常に完了しました');
  } catch (error) {
    console.error(
      'パイプライン処理中にエラーが発生:',
      error.message
    );
  }
}

// demonstrateLifecycle();

エラーハンドリングのベストプラクティス

javascriptconst fs = require('fs');
const { pipeline } = require('stream/promises');
const { Transform } = require('stream');

class SafeJsonParser extends Transform {
  constructor(options = {}) {
    super({ objectMode: true, ...options });
    this.errorCount = 0;
    this.successCount = 0;
    this.maxErrors = options.maxErrors || 10;
  }

  _transform(chunk, encoding, callback) {
    try {
      const lines = chunk.toString().split('\n');

      lines.forEach((line) => {
        if (line.trim()) {
          try {
            const jsonData = JSON.parse(line);
            this.push(jsonData);
            this.successCount++;
          } catch (parseError) {
            this.errorCount++;

            // エラー情報を詳細に記録
            const errorInfo = {
              error: 'JSON_PARSE_ERROR',
              line: line.substring(0, 100), // 最初の100文字のみ
              message: parseError.message,
              lineNumber:
                this.successCount + this.errorCount,
            };

            // エラーイベントを発行(但し、致命的でない場合は継続)
            this.emit('parseError', errorInfo);

            // 最大エラー数を超えた場合は処理を停止
            if (this.errorCount > this.maxErrors) {
              return callback(
                new Error(
                  `最大エラー数(${this.maxErrors})を超えました`
                )
              );
            }
          }
        }
      });

      callback();
    } catch (error) {
      callback(error);
    }
  }

  _flush(callback) {
    // 最終統計を出力
    console.log(
      `解析完了: 成功 ${this.successCount}件, エラー ${this.errorCount}件`
    );
    callback();
  }
}

// エラーハンドリングの使用例
async function safeJsonProcessing() {
  const parser = new SafeJsonParser({ maxErrors: 5 });

  // 非致命的エラーの監視
  parser.on('parseError', (errorInfo) => {
    console.warn(
      `解析エラー (行 ${errorInfo.lineNumber}):`,
      errorInfo.message
    );
  });

  try {
    await pipeline(
      fs.createReadStream('data.jsonl'),
      parser,
      new Transform({
        objectMode: true,
        transform(data, encoding, callback) {
          // 有効なJSONオブジェクトのみ処理
          console.log('処理中:', data.id || 'ID不明');
          callback(null, JSON.stringify(data) + '\n');
        },
      }),
      fs.createWriteStream('cleaned-data.jsonl')
    );

    console.log('JSON処理が完了しました');
  } catch (error) {
    console.error(
      '処理中に致命的エラーが発生:',
      error.message
    );
  }
}

基本的な使用方法とコード例

ストリームの実用的な使用パターンを、具体的なコード例とともに解説します。

パイプライン処理の基本

javascriptconst fs = require('fs');
const { pipeline } = require('stream/promises');
const { Transform } = require('stream');
const zlib = require('zlib');

// データ処理パイプラインの例
async function processDataPipeline() {
  // ログファイルの処理と圧縮
  const logProcessor = new Transform({
    transform(chunk, encoding, callback) {
      const lines = chunk.toString().split('\n');

      // エラーログのみ抽出
      const errorLines = lines.filter(
        (line) =>
          line.includes('[ERROR]') ||
          line.includes('[FATAL]')
      );

      if (errorLines.length > 0) {
        callback(null, errorLines.join('\n') + '\n');
      } else {
        callback();
      }
    },
  });

  try {
    await pipeline(
      fs.createReadStream('application.log'), // 入力
      logProcessor, // 変換
      zlib.createGzip(), // 圧縮
      fs.createWriteStream('errors-only.log.gz') // 出力
    );

    console.log('ログ処理が完了しました');
  } catch (error) {
    console.error('パイプライン処理エラー:', error.message);
  }
}

バックプレッシャーの処理

javascriptconst { Writable, Readable } = require('stream');

class SlowProcessor extends Writable {
  constructor(options = {}) {
    super(options);
    this.processDelay = options.delay || 1000;
    this.processedCount = 0;
  }

  async _write(chunk, encoding, callback) {
    // 意図的に遅い処理をシミュレート
    await new Promise((resolve) =>
      setTimeout(resolve, this.processDelay)
    );

    this.processedCount++;
    console.log(
      `処理完了 #${this.processedCount}: ${chunk.length} bytes`
    );

    callback();
  }
}

class FastDataGenerator extends Readable {
  constructor(options = {}) {
    super(options);
    this.count = 0;
    this.maxCount = options.maxCount || 10;
  }

  _read() {
    if (this.count < this.maxCount) {
      const data = `高速データ生成 #${++this.count}\n`;
      console.log(`生成: ${data.trim()}`);
      this.push(data);
    } else {
      this.push(null);
    }
  }
}

// バックプレッシャーのデモンストレーション
function demonstrateBackpressure() {
  const generator = new FastDataGenerator({ maxCount: 5 });
  const processor = new SlowProcessor({ delay: 2000 });

  // バックプレッシャーの監視
  generator.on('pause', () => {
    console.log(
      '生成側が一時停止されました(バックプレッシャー)'
    );
  });

  generator.on('resume', () => {
    console.log('生成側が再開されました');
  });

  generator.pipe(processor);

  processor.on('finish', () => {
    console.log('すべての処理が完了しました');
  });
}

// demonstrateBackpressure();

並列ストリーム処理

javascriptconst fs = require('fs');
const path = require('path');
const { Transform } = require('stream');
const { pipeline } = require('stream/promises');

class ParallelFileProcessor {
  constructor(concurrency = 3) {
    this.concurrency = concurrency;
    this.activePromises = [];
  }

  async processDirectory(inputDir, outputDir, processor) {
    const files = await fs.promises.readdir(inputDir);
    const txtFiles = files.filter(
      (file) => path.extname(file) === '.txt'
    );

    console.log(
      `${txtFiles.length} ファイルを ${this.concurrency} 並列で処理開始`
    );

    // ファイルを並列処理用にグループ化
    const fileGroups = this.chunkArray(
      txtFiles,
      this.concurrency
    );

    for (const group of fileGroups) {
      const promises = group.map(async (filename) => {
        const inputPath = path.join(inputDir, filename);
        const outputPath = path.join(
          outputDir,
          `processed-${filename}`
        );

        try {
          await pipeline(
            fs.createReadStream(inputPath),
            processor(),
            fs.createWriteStream(outputPath)
          );

          console.log(`完了: ${filename}`);
          return { success: true, filename };
        } catch (error) {
          console.error(
            `エラー ${filename}:`,
            error.message
          );
          return {
            success: false,
            filename,
            error: error.message,
          };
        }
      });

      // 並列実行
      const results = await Promise.all(promises);
      console.log(`グループ処理完了:`, results);
    }
  }

  chunkArray(array, chunkSize) {
    const chunks = [];
    for (let i = 0; i < array.length; i += chunkSize) {
      chunks.push(array.slice(i, i + chunkSize));
    }
    return chunks;
  }
}

// 使用例
async function parallelProcessingExample() {
  const processor = new ParallelFileProcessor(2);

  // テキスト変換プロセッサーを定義
  const createProcessor = () =>
    new Transform({
      transform(chunk, encoding, callback) {
        // すべての単語を大文字に変換
        const processedText = chunk
          .toString()
          .split(' ')
          .map((word) => word.toUpperCase())
          .join(' ');

        callback(null, processedText);
      },
    });

  try {
    await processor.processDirectory(
      './input',
      './output',
      createProcessor
    );
    console.log('並列処理が完了しました');
  } catch (error) {
    console.error('並列処理エラー:', error.message);
  }
}

ストリームとバッファリングの関係

ストリームにおけるバッファリングは、パフォーマンスとメモリ使用量のバランスを取る重要な仕組みです。

バッファサイズの最適化

javascriptconst fs = require('fs');
const { Transform } = require('stream');

class BufferAnalyzer extends Transform {
  constructor(options = {}) {
    super(options);
    this.chunkSizes = [];
    this.totalBytes = 0;
    this.chunkCount = 0;
  }

  _transform(chunk, encoding, callback) {
    this.chunkCount++;
    this.totalBytes += chunk.length;
    this.chunkSizes.push(chunk.length);

    console.log(
      `チャンク #${this.chunkCount}: ${chunk.length} bytes`
    );

    // そのまま通す
    callback(null, chunk);
  }

  _flush(callback) {
    // 統計情報を出力
    const avgChunkSize = Math.round(
      this.totalBytes / this.chunkCount
    );
    const minChunkSize = Math.min(...this.chunkSizes);
    const maxChunkSize = Math.max(...this.chunkSizes);

    console.log('\n=== バッファ統計 ===');
    console.log(`総バイト数: ${this.totalBytes}`);
    console.log(`チャンク数: ${this.chunkCount}`);
    console.log(
      `平均チャンクサイズ: ${avgChunkSize} bytes`
    );
    console.log(
      `最小チャンクサイズ: ${minChunkSize} bytes`
    );
    console.log(
      `最大チャンクサイズ: ${maxChunkSize} bytes`
    );

    callback();
  }
}

// 異なるバッファサイズでの比較
async function compareBufferSizes() {
  const bufferSizes = [512, 1024, 4096, 16384, 65536]; // 0.5KB から 64KB

  for (const bufferSize of bufferSizes) {
    console.log(
      `\n--- バッファサイズ: ${bufferSize} bytes ---`
    );

    const analyzer = new BufferAnalyzer();
    const startTime = Date.now();

    try {
      await new Promise((resolve, reject) => {
        const readStream = fs.createReadStream(
          'large-file.txt',
          {
            highWaterMark: bufferSize,
          }
        );

        readStream
          .pipe(analyzer)
          .pipe(
            fs.createWriteStream(`output-${bufferSize}.txt`)
          )
          .on('finish', resolve)
          .on('error', reject);
      });

      const duration = Date.now() - startTime;
      console.log(`処理時間: ${duration}ms`);
    } catch (error) {
      console.error(
        `エラー (バッファサイズ ${bufferSize}):`,
        error.message
      );
    }
  }
}

メモリ使用量の監視

javascriptconst { Transform } = require('stream');
const fs = require('fs');

class MemoryMonitor extends Transform {
  constructor(options = {}) {
    super(options);
    this.monitorInterval = setInterval(() => {
      this.logMemoryUsage();
    }, 1000);

    this.processedBytes = 0;
    this.startTime = Date.now();
  }

  _transform(chunk, encoding, callback) {
    this.processedBytes += chunk.length;
    callback(null, chunk);
  }

  _flush(callback) {
    clearInterval(this.monitorInterval);

    const duration = Date.now() - this.startTime;
    const throughput = Math.round(
      this.processedBytes / (duration / 1000)
    );

    console.log('\n=== 最終統計 ===');
    console.log(`処理バイト数: ${this.processedBytes}`);
    console.log(`処理時間: ${duration}ms`);
    console.log(`スループット: ${throughput} bytes/sec`);

    this.logMemoryUsage();
    callback();
  }

  logMemoryUsage() {
    const memUsage = process.memoryUsage();
    const formatBytes = (bytes) =>
      Math.round((bytes / 1024 / 1024) * 100) / 100;

    console.log(
      `メモリ使用量 - RSS: ${formatBytes(
        memUsage.rss
      )}MB, ` +
        `Heap Used: ${formatBytes(memUsage.heapUsed)}MB, ` +
        `External: ${formatBytes(memUsage.external)}MB`
    );
  }
}

// 使用例
async function monitorMemoryUsage() {
  const monitor = new MemoryMonitor();

  const largeDataProcessor = new Transform({
    transform(chunk, encoding, callback) {
      // 意図的に重い処理をシミュレート
      const data = chunk.toString();
      const processedData = data
        .split('\n')
        .map((line) => line.toUpperCase())
        .join('\n');

      callback(null, processedData);
    },
  });

  try {
    fs.createReadStream('very-large-file.txt')
      .pipe(monitor)
      .pipe(largeDataProcessor)
      .pipe(fs.createWriteStream('processed-output.txt'))
      .on('finish', () => {
        console.log('メモリ監視付き処理が完了しました');
      });
  } catch (error) {
    console.error('処理エラー:', error.message);
  }
}

バックプレッシャーの詳細制御

javascriptconst { Writable, Readable } = require('stream');

class AdaptiveWriter extends Writable {
  constructor(options = {}) {
    super(options);
    this.writeDelay = options.initialDelay || 100;
    this.maxDelay = options.maxDelay || 2000;
    this.minDelay = options.minDelay || 50;
    this.adaptationFactor = options.adaptationFactor || 1.1;
    this.queueLength = 0;
  }

  _write(chunk, encoding, callback) {
    this.queueLength++;

    // キューの長さに応じて遅延を調整
    if (this.queueLength > 10) {
      this.writeDelay = Math.min(
        this.writeDelay * this.adaptationFactor,
        this.maxDelay
      );
      console.log(
        `書き込み速度を下げました: ${this.writeDelay}ms`
      );
    } else if (this.queueLength < 3) {
      this.writeDelay = Math.max(
        this.writeDelay / this.adaptationFactor,
        this.minDelay
      );
      console.log(
        `書き込み速度を上げました: ${this.writeDelay}ms`
      );
    }

    setTimeout(() => {
      this.queueLength--;
      console.log(
        `書き込み完了 (キュー: ${this.queueLength}, 遅延: ${this.writeDelay}ms)`
      );
      callback();
    }, this.writeDelay);
  }
}

class BurstDataGenerator extends Readable {
  constructor(options = {}) {
    super(options);
    this.burstSize = options.burstSize || 5;
    this.burstInterval = options.burstInterval || 3000;
    this.dataCount = 0;
    this.maxData = options.maxData || 50;

    this.startBurstGeneration();
  }

  startBurstGeneration() {
    this.burstTimer = setInterval(() => {
      for (
        let i = 0;
        i < this.burstSize && this.dataCount < this.maxData;
        i++
      ) {
        this.generateData();
      }

      if (this.dataCount >= this.maxData) {
        clearInterval(this.burstTimer);
        this.push(null);
      }
    }, this.burstInterval);
  }

  generateData() {
    this.dataCount++;
    const data = `バーストデータ #${
      this.dataCount
    } - ${new Date().toISOString()}\n`;
    console.log(`生成: ${data.trim()}`);
    this.push(data);
  }

  _read() {
    // 実際の読み取りはタイマーで制御
  }
}

// 適応的バックプレッシャーのデモ
function demonstrateAdaptiveBackpressure() {
  const generator = new BurstDataGenerator({
    burstSize: 3,
    burstInterval: 1000,
    maxData: 20,
  });

  const writer = new AdaptiveWriter({
    initialDelay: 500,
    maxDelay: 3000,
    minDelay: 100,
    adaptationFactor: 1.2,
  });

  generator.pipe(writer);

  writer.on('finish', () => {
    console.log(
      '適応的バックプレッシャー処理が完了しました'
    );
  });
}

// demonstrateAdaptiveBackpressure();

まとめ

Node.js ストリームは、大量データ処理において欠かせない強力な仕組みです。本記事で解説した内容を振り返ってみましょう。

ストリームの重要なポイント

Node.js ストリームを効果的に活用するために、以下の重要なポイントを押さえておきましょう。

項目重要なポイント実践のコツ
4 つのタイプ用途に応じた適切なストリーム選択Readable(読み取り)、Writable(書き込み)、Transform(変換)、Duplex(双方向)の特性を理解
メモリ効率大量データを一定メモリで処理highWaterMark でバッファサイズを最適化
エラーハンドリング堅牢なエラー処理の実装errorendfinishclose イベントの適切な監視
バックプレッシャー処理速度の自動調整遅い処理に対してデータ生成を一時停止する仕組み
パイプライン複数処理の効率的な連鎖pipeline() 関数を使用した安全な処理チェーン

ストリーム活用のベストプラクティス

実際のプロジェクトでストリームを活用する際は、以下のベストプラクティスを参考にしてください。

javascript// ベストプラクティスの統合例
const fs = require('fs');
const { pipeline } = require('stream/promises');
const { Transform } = require('stream');

class ProductionStreamProcessor {
  static async processFile(
    inputPath,
    outputPath,
    options = {}
  ) {
    const processor = new Transform({
      highWaterMark: options.bufferSize || 16384, // 16KB
      transform(chunk, encoding, callback) {
        try {
          // データ変換処理
          const processedData = chunk
            .toString()
            .split('\n')
            .filter((line) => line.trim())
            .map((line) => line.toUpperCase())
            .join('\n');

          callback(null, processedData + '\n');
        } catch (error) {
          callback(error);
        }
      },
    });

    // エラーハンドリングとモニタリング
    processor.on('error', (error) => {
      console.error('変換処理エラー:', error.message);
    });

    try {
      await pipeline(
        fs.createReadStream(inputPath, {
          highWaterMark: options.bufferSize || 16384,
        }),
        processor,
        fs.createWriteStream(outputPath)
      );

      console.log(
        `ファイル処理完了: ${inputPath}${outputPath}`
      );
      return { success: true };
    } catch (error) {
      console.error(
        'パイプライン処理エラー:',
        error.message
      );
      return { success: false, error: error.message };
    }
  }
}

// 使用例
// ProductionStreamProcessor.processFile('./input.txt', './output.txt', { bufferSize: 32768 });

パフォーマンス向上のコツ

ストリームのパフォーマンスを最大限に引き出すためのテクニックをまとめました。

  1. 適切なバッファサイズの選択: 処理するデータの特性に応じて highWaterMark を調整する
  2. 並列処理の活用: 複数ファイルの処理では並列ストリームを利用する
  3. メモリ監視: 長時間稼働するアプリケーションではメモリ使用量を定期的に監視する
  4. エラー処理の最適化: 致命的でないエラーは継続処理し、統計情報として記録する
  5. 適切なクリーンアップ: ストリーム終了時のリソース解放を確実に実行する

今後の学習ステップ

Node.js ストリームをさらに深く理解するために、以下の学習ステップをお勧めします。

  1. 実際のプロジェクトでの実践: 小さなファイル処理ツールから始めて経験を積む
  2. パフォーマンス測定: 様々なバッファサイズやストリーム構成でベンチマークを取る
  3. 高度なパターン: ストリームプールやカスタムストリームの応用例を学習
  4. 他のライブラリとの組み合わせ: Express.js や WebSocket との連携パターンを習得
  5. デバッグ技術: ストリーム処理のトラブルシューティング手法をマスター

Node.js ストリームをマスターすることで、効率的で拡張性の高いアプリケーションを構築できるようになります。メモリ効率とパフォーマンスを両立させながら、大量データを滑らかに処理する魅力を、ぜひ実際のプロジェクトで体験してみてください。

ストリームの世界は奥深く、学び続けることで新たな発見や最適化の機会が見つかるでしょう。今回学んだ基礎知識を土台として、さらなるスキルアップを目指していきましょう!

関連リンク