T-CREATOR

Node.js マルチスレッド処理:worker_threads と cluster の使い分け

Node.js マルチスレッド処理:worker_threads と cluster の使い分け

Node.js は「シングルスレッド」で動作するという特徴があります。この特性は多くの場合、開発をシンプルにし、デバッグを容易にしてくれます。しかし、CPU 集約型の処理や大量のリクエストを扱う際には、この特性が性能のボトルネックとなることがあります。

実際の開発現場では、「この処理を並列化したい」「サーバーの性能を向上させたい」という課題に直面することが多いでしょう。そんな時に登場するのが、Node.js のマルチスレッド処理技術です。

本記事では、Node.js で並列処理を実現する 2 つの主要なアプローチであるworker_threadsclusterについて、実践的な観点から詳しく解説します。どちらを選ぶべきか、どのような場面で効果的なのかを、実際のコード例と共に理解していきましょう。

Node.js のスレッドモデル

イベントループの仕組み

Node.js の心臓部であるイベントループは、シングルスレッドで動作します。この仕組みにより、非同期処理を効率的に扱うことができます。

javascript// イベントループの基本的な動作例
console.log('1. 開始');

setTimeout(() => {
  console.log('3. 非同期処理完了');
}, 0);

console.log('2. 即座に実行');

// 出力順序:
// 1. 開始
// 2. 即座に実行
// 3. 非同期処理完了

このコードは、イベントループがどのように非同期処理を管理しているかを示しています。setTimeoutは非同期処理として扱われ、メインスレッドをブロックすることなく実行されます。

シングルスレッドの利点と課題

シングルスレッドモデルには、明確な利点と課題があります。

利点:

  • デバッグが容易(競合状態が発生しにくい)
  • メモリ使用量が少ない
  • コンテキストスイッチングのオーバーヘッドがない

課題:

  • CPU 集約型処理でメインスレッドがブロックされる
  • マルチコア CPU の性能を活かせない
  • 一つの処理が全体の性能に影響する

実際に問題となる場面を見てみましょう:

javascript// CPU集約型処理の例 - 問題のあるコード
function heavyCalculation(n) {
  let result = 0;
  for (let i = 0; i < n; i++) {
    result += Math.sqrt(i) * Math.sin(i);
  }
  return result;
}

// この処理はメインスレッドをブロックする
console.log('処理開始');
const result = heavyCalculation(10000000); // 数秒間ブロック
console.log('処理完了:', result);

このような処理が実行されると、Web サーバーであれば他のリクエストが処理できなくなってしまいます。

CPU 集約型処理での性能問題

CPU 集約型処理は、Node.js のシングルスレッド特性と最も相性が悪い処理です。画像処理、暗号化、データ分析などが該当します。

javascript// 画像処理の例(実際のエラーが発生しやすい場面)
const sharp = require('sharp');

app.post('/process-image', async (req, res) => {
  try {
    // 大量の画像処理を行うとメインスレッドがブロックされる
    const processedImage = await sharp(req.file.buffer)
      .resize(800, 600)
      .jpeg({ quality: 90 })
      .toBuffer();

    res.json({ success: true });
  } catch (error) {
    // エラー: ENOMEM (メモリ不足) や ETIMEDOUT (タイムアウト) が発生
    console.error('画像処理エラー:', error.message);
    res.status(500).json({ error: error.message });
  }
});

このような場面で、マルチスレッド処理の導入を検討することになります。

worker_threads の基本概念

ワーカースレッドとは

worker_threadsは、Node.js v10.5.0 から導入された、メインスレッドとは別のスレッドで JavaScript コードを実行できるモジュールです。

javascript// worker_threadsの基本的な使用例
const {
  Worker,
  isMainThread,
  parentPort,
  workerData,
} = require('worker_threads');

if (isMainThread) {
  // メインスレッド側のコード
  const worker = new Worker(__filename, {
    workerData: { number: 1000000 },
  });

  worker.on('message', (result) => {
    console.log('計算結果:', result);
  });

  worker.on('error', (error) => {
    console.error('ワーカーエラー:', error);
  });
} else {
  // ワーカースレッド側のコード
  const { number } = workerData;
  const result = heavyCalculation(number);
  parentPort.postMessage(result);
}

メインスレッドとの関係

ワーカースレッドは、メインスレッドと独立して動作しますが、メッセージングを通じて通信します。

javascript// メインスレッドとワーカースレッドの通信例
const { Worker } = require('worker_threads');

// メインスレッド側
const worker = new Worker('./worker.js');

// ワーカーにデータを送信
worker.postMessage({
  type: 'CALCULATE',
  data: [1, 2, 3, 4, 5],
});

// ワーカーからの結果を受信
worker.on('message', (result) => {
  console.log('受信した結果:', result);
});

// エラーハンドリング
worker.on('error', (error) => {
  console.error('ワーカーエラー:', error);
  // エラー: Worker terminated unexpectedly
});
javascript// worker.js - ワーカースレッド側
const { parentPort } = require('worker_threads');

parentPort.on('message', (message) => {
  if (message.type === 'CALCULATE') {
    const result = message.data.reduce(
      (sum, num) => sum + num,
      0
    );
    parentPort.postMessage({
      type: 'RESULT',
      data: result,
    });
  }
});

適用場面と特徴

worker_threadsは以下の場面で特に効果的です:

適用場面:

  • CPU 集約型の計算処理
  • 画像・動画処理
  • 暗号化処理
  • データ変換・加工

特徴:

  • メモリを共有できる(SharedArrayBuffer)
  • 軽量なスレッド切り替え
  • メインスレッドをブロックしない
javascript// CPU集約型処理の並列化例
const { Worker } = require('worker_threads');

function runParallelCalculations(numbers) {
  const workers = [];
  const results = [];

  // 各数値に対してワーカーを作成
  numbers.forEach((number, index) => {
    const worker = new Worker('./calculation-worker.js', {
      workerData: { number, index },
    });

    worker.on('message', (result) => {
      results[result.index] = result.value;

      // 全ての計算が完了したかチェック
      if (
        results.filter((r) => r !== undefined).length ===
        numbers.length
      ) {
        console.log('全計算完了:', results);
      }
    });

    workers.push(worker);
  });
}

cluster の基本概念

クラスターモジュールとは

clusterモジュールは、Node.js アプリケーションを複数のプロセスで実行するための機能です。各プロセスは独立した V8 インスタンスで動作します。

javascript// clusterの基本的な使用例
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  console.log(
    `マスタープロセス ${process.pid} が起動しました`
  );

  // ワーカープロセスをフォーク
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('exit', (worker, code, signal) => {
    console.log(
      `ワーカー ${worker.process.pid} が終了しました`
    );
    // 新しいワーカーを起動(フォールトトレランス)
    cluster.fork();
  });
} else {
  // ワーカープロセス
  http
    .createServer((req, res) => {
      res.writeHead(200);
      res.end(`ワーカー ${process.pid} が処理しました`);
    })
    .listen(8000);

  console.log(
    `ワーカープロセス ${process.pid} が起動しました`
  );
}

プロセスベースの並列処理

clusterはプロセスレベルでの並列処理を提供します。各プロセスは独立したメモリ空間を持ちます。

javascript// プロセス間の独立性を示す例
const cluster = require('cluster');

if (cluster.isMaster) {
  // マスタープロセス
  const worker1 = cluster.fork();
  const worker2 = cluster.fork();

  // 各ワーカーに異なるデータを送信
  worker1.send({ type: 'SET_DATA', data: 'Worker 1 Data' });
  worker2.send({ type: 'SET_DATA', data: 'Worker 2 Data' });

  // 結果を受信
  worker1.on('message', (msg) => {
    console.log('Worker 1 結果:', msg);
  });

  worker2.on('message', (msg) => {
    console.log('Worker 2 結果:', msg);
  });
} else {
  // ワーカープロセス
  let workerData = null;

  process.on('message', (msg) => {
    if (msg.type === 'SET_DATA') {
      workerData = msg.data;
      process.send({
        pid: process.pid,
        data: workerData,
        memory: process.memoryUsage(),
      });
    }
  });
}

適用場面と特徴

clusterは以下の場面で特に効果的です:

適用場面:

  • Web サーバーの負荷分散
  • マイクロサービスアーキテクチャ
  • 長時間実行されるバッチ処理
  • 高可用性が求められるアプリケーション

特徴:

  • プロセスレベルの分離(メモリ、CPU、ファイルディスクリプタ)
  • フォールトトレランス(一つのプロセスが落ちても他は継続)
  • ロードバランシング機能
javascript// Webサーバーの負荷分散例
const cluster = require('cluster');
const express = require('express');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  console.log(`マスタープロセス ${process.pid} が起動`);

  // CPUコア数分のワーカーを起動
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  // ワーカーの死活監視
  cluster.on('exit', (worker, code, signal) => {
    console.log(
      `ワーカー ${worker.process.pid} が終了 (${
        signal || code
      })`
    );
    // 自動的に新しいワーカーを起動
    cluster.fork();
  });
} else {
  // ワーカープロセス
  const app = express();

  app.get('/', (req, res) => {
    // 重い処理をシミュレート
    const start = Date.now();
    while (Date.now() - start < 100) {
      // 100msの処理
    }

    res.json({
      message: 'Hello from cluster!',
      worker: process.pid,
      uptime: process.uptime(),
    });
  });

  app.listen(3000, () => {
    console.log(
      `ワーカー ${process.pid} がポート 3000 で起動`
    );
  });
}

worker_threads vs cluster:詳細比較

アーキテクチャの違い

worker_threadsclusterは、根本的に異なるアーキテクチャを採用しています。

worker_threads(スレッドベース):

  • 同一プロセス内で複数のスレッド
  • メモリ空間を共有可能
  • 軽量なコンテキストスイッチ

cluster(プロセスベース):

  • 複数の独立したプロセス
  • 完全に分離されたメモリ空間
  • 重いプロセス切り替え
javascript// アーキテクチャの違いを実感できる例
const { Worker } = require('worker_threads');
const cluster = require('cluster');

if (cluster.isMaster) {
  console.log('=== Cluster モード ===');
  console.log('マスタープロセス PID:', process.pid);

  // ワーカープロセスを起動
  const worker = cluster.fork();

  worker.on('message', (msg) => {
    console.log('ワーカーからのメッセージ:', msg);
    console.log('メモリ使用量:', process.memoryUsage());
  });
} else {
  // ワーカープロセス
  console.log('ワーカープロセス PID:', process.pid);
  process.send({
    type: 'INFO',
    pid: process.pid,
    memory: process.memoryUsage(),
  });
}

メモリ使用量の比較

メモリ使用量は、アプリケーションの設計において重要な考慮事項です。

javascript// メモリ使用量を比較するベンチマーク
const { Worker } = require('worker_threads');
const cluster = require('cluster');

function measureMemoryUsage() {
  const usage = process.memoryUsage();
  return {
    rss: Math.round(usage.rss / 1024 / 1024), // MB
    heapUsed: Math.round(usage.heapUsed / 1024 / 1024), // MB
    heapTotal: Math.round(usage.heapTotal / 1024 / 1024), // MB
  };
}

// worker_threads のメモリ使用量測定
function testWorkerThreads() {
  const workers = [];
  const startMemory = measureMemoryUsage();

  for (let i = 0; i < 4; i++) {
    const worker = new Worker(
      `
      const { parentPort } = require('worker_threads');
      parentPort.on('message', (data) => {
        parentPort.postMessage({ result: data * 2 });
      });
    `,
      { eval: true }
    );

    workers.push(worker);
  }

  const endMemory = measureMemoryUsage();
  console.log('Worker Threads メモリ使用量:', {
    start: startMemory,
    end: endMemory,
    difference: {
      rss: endMemory.rss - startMemory.rss,
      heapUsed: endMemory.heapUsed - startMemory.heapUsed,
    },
  });

  workers.forEach((w) => w.terminate());
}

通信オーバーヘッド

通信方式の違いは、パフォーマンスに大きな影響を与えます。

javascript// 通信オーバーヘッドを測定する例
const { Worker } = require('worker_threads');

// worker_threads の通信テスト
function testWorkerCommunication() {
  const worker = new Worker(
    `
    const { parentPort } = require('worker_threads');
    parentPort.on('message', (data) => {
      parentPort.postMessage({ 
        received: data,
        timestamp: Date.now()
      });
    });
  `,
    { eval: true }
  );

  const startTime = Date.now();
  let messageCount = 0;

  worker.on('message', (data) => {
    messageCount++;
    if (messageCount >= 1000) {
      const endTime = Date.now();
      const duration = endTime - startTime;
      console.log(
        `Worker Threads 通信: ${messageCount} メッセージを ${duration}ms で処理`
      );
      worker.terminate();
    } else {
      worker.postMessage({ count: messageCount });
    }
  });

  worker.postMessage({ count: 0 });
}

エラーハンドリング

エラーハンドリングの仕組みも、両者で大きく異なります。

javascript// worker_threads のエラーハンドリング
const { Worker } = require('worker_threads');

const worker = new Worker(
  `
  const { parentPort } = require('worker_threads');
  
  // 意図的にエラーを発生させる
  setTimeout(() => {
    throw new Error('Worker thread error');
  }, 1000);
  
  parentPort.on('message', (data) => {
    parentPort.postMessage({ result: data * 2 });
  });
`,
  { eval: true }
);

worker.on('error', (error) => {
  console.error('Worker Threads エラー:', error.message);
  // エラー: Worker terminated unexpectedly
});

worker.on('exit', (code) => {
  if (code !== 0) {
    console.error(`ワーカーが異常終了しました: ${code}`);
  }
});
javascript// cluster のエラーハンドリング
const cluster = require('cluster');

if (cluster.isMaster) {
  const worker = cluster.fork();

  worker.on('exit', (code, signal) => {
    if (code !== 0) {
      console.error(
        `ワーカープロセスが異常終了: ${code} (${signal})`
      );
      // 新しいワーカーを自動起動
      cluster.fork();
    }
  });

  worker.on('disconnect', () => {
    console.log('ワーカーが切断されました');
  });
} else {
  // ワーカープロセスでエラーを発生
  setTimeout(() => {
    process.exit(1); // プロセスを強制終了
  }, 1000);
}

使い分けの判断基準

CPU 集約型処理の場合

CPU 集約型処理では、worker_threadsが適しています。メモリ共有が可能で、通信オーバーヘッドが少ないためです。

javascript// CPU集約型処理の実装例
const { Worker } = require('worker_threads');

function processCPUIntensiveTask(data) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(
      `
      const { parentPort } = require('worker_threads');
      
      function fibonacci(n) {
        if (n <= 1) return n;
        return fibonacci(n - 1) + fibonacci(n - 2);
      }
      
      parentPort.on('message', (data) => {
        const result = fibonacci(data.n);
        parentPort.postMessage({ result });
      });
    `,
      { eval: true }
    );

    worker.on('message', (result) => {
      resolve(result.result);
      worker.terminate();
    });

    worker.on('error', reject);
    worker.postMessage(data);
  });
}

// 使用例
async function example() {
  try {
    const result = await processCPUIntensiveTask({ n: 40 });
    console.log('フィボナッチ結果:', result);
  } catch (error) {
    console.error('計算エラー:', error);
  }
}

I/O 集約型処理の場合

I/O 集約型処理では、clusterが適しています。各プロセスが独立して I/O 処理を行えるためです。

javascript// I/O集約型処理の実装例
const cluster = require('cluster');
const fs = require('fs').promises;
const path = require('path');

if (cluster.isMaster) {
  console.log('マスタープロセスが起動');

  // 複数のワーカーでファイル処理を並列実行
  const files = [
    'file1.txt',
    'file2.txt',
    'file3.txt',
    'file4.txt',
  ];

  files.forEach((file, index) => {
    const worker = cluster.fork();
    worker.send({ file, index });

    worker.on('message', (result) => {
      console.log(
        `ファイル ${result.file} の処理完了:`,
        result.size
      );
    });
  });
} else {
  process.on('message', async (data) => {
    try {
      const filePath = path.join(__dirname, data.file);
      const content = await fs.readFile(filePath, 'utf8');
      const size = content.length;

      process.send({
        file: data.file,
        size,
        worker: process.pid,
      });
    } catch (error) {
      console.error(`ファイル処理エラー: ${error.message}`);
      process.exit(1);
    }
  });
}

メモリ使用量を重視する場合

メモリ使用量を重視する場合は、worker_threadsが有利です。メモリ共有により、全体のメモリ使用量を削減できます。

javascript// メモリ効率的な実装例
const {
  Worker,
  SharedArrayBuffer,
} = require('worker_threads');

function processWithSharedMemory(dataSize) {
  // 共有メモリを作成
  const sharedBuffer = new SharedArrayBuffer(dataSize * 4); // 4バイト整数
  const sharedArray = new Int32Array(sharedBuffer);

  // データを初期化
  for (let i = 0; i < dataSize; i++) {
    sharedArray[i] = i;
  }

  const worker = new Worker(
    `
    const { parentPort, workerData } = require('worker_threads');
    const sharedArray = new Int32Array(workerData.sharedBuffer);
    
    // 共有メモリ上のデータを処理
    let sum = 0;
    for (let i = 0; i < sharedArray.length; i++) {
      sum += sharedArray[i];
    }
    
    parentPort.postMessage({ sum });
  `,
    {
      workerData: { sharedBuffer },
    }
  );

  return new Promise((resolve) => {
    worker.on('message', (result) => {
      resolve(result.sum);
      worker.terminate();
    });
  });
}

開発・デバッグの容易さ

開発・デバッグの観点では、worker_threadsの方が有利です。同一プロセス内で動作するため、デバッガーが使いやすくなります。

javascript// デバッグしやすい worker_threads の実装
const { Worker } = require('worker_threads');

function debugFriendlyWorker() {
  const worker = new Worker(
    `
    const { parentPort } = require('worker_threads');
    
    // デバッグ用のログ出力
    console.log('ワーカースレッドが起動しました');
    
    parentPort.on('message', (data) => {
      console.log('ワーカーがメッセージを受信:', data);
      
      try {
        const result = processData(data);
        console.log('処理結果:', result);
        parentPort.postMessage({ success: true, result });
      } catch (error) {
        console.error('ワーカーでエラーが発生:', error);
        parentPort.postMessage({ success: false, error: error.message });
      }
    });
    
    function processData(data) {
      // 実際の処理ロジック
      return data.map(x => x * 2);
    }
  `,
    { eval: true }
  );

  worker.on('message', (result) => {
    if (result.success) {
      console.log('処理成功:', result.result);
    } else {
      console.error('処理失敗:', result.error);
    }
  });

  worker.on('error', (error) => {
    console.error('ワーカーエラー:', error);
  });

  return worker;
}

実装例:worker_threads

基本的な実装パターン

worker_threadsの基本的な実装パターンを理解しましょう。

javascript// 基本的なワーカーの実装
const {
  Worker,
  isMainThread,
  parentPort,
  workerData,
} = require('worker_threads');

if (isMainThread) {
  // メインスレッド側
  function createWorker(data) {
    return new Promise((resolve, reject) => {
      const worker = new Worker(__filename, {
        workerData: data,
      });

      worker.on('message', resolve);
      worker.on('error', reject);
      worker.on('exit', (code) => {
        if (code !== 0) {
          reject(new Error(`ワーカーが異常終了: ${code}`));
        }
      });
    });
  }

  // 使用例
  async function main() {
    try {
      const result = await createWorker({
        numbers: [1, 2, 3, 4, 5],
      });
      console.log('計算結果:', result);
    } catch (error) {
      console.error('エラー:', error);
    }
  }

  main();
} else {
  // ワーカースレッド側
  const { numbers } = workerData;
  const sum = numbers.reduce((a, b) => a + b, 0);
  parentPort.postMessage({ sum, count: numbers.length });
}

データの受け渡し方法

ワーカースレッドとのデータの受け渡しには、複数の方法があります。

javascript// メッセージングによるデータ受け渡し
const { Worker } = require('worker_threads');

const worker = new Worker(
  `
  const { parentPort } = require('worker_threads');
  
  parentPort.on('message', (data) => {
    switch (data.type) {
      case 'CALCULATE':
        const result = data.numbers.reduce((sum, num) => sum + num, 0);
        parentPort.postMessage({ type: 'RESULT', data: result });
        break;
      
      case 'PROCESS_STRING':
        const processed = data.text.toUpperCase();
        parentPort.postMessage({ type: 'PROCESSED', data: processed });
        break;
      
      case 'EXIT':
        parentPort.close();
        break;
    }
  });
`,
  { eval: true }
);

// メインスレッドからデータを送信
worker.postMessage({
  type: 'CALCULATE',
  numbers: [1, 2, 3, 4, 5],
});

worker.on('message', (result) => {
  console.log('受信:', result);
});

エラーハンドリング

ワーカースレッドでのエラーハンドリングは、アプリケーションの安定性に重要です。

javascript// 包括的なエラーハンドリング
const { Worker } = require('worker_threads');

class WorkerManager {
  constructor() {
    this.workers = new Map();
    this.retryCount = new Map();
  }

  createWorker(taskId, workerScript) {
    const worker = new Worker(workerScript, { eval: true });

    // エラーハンドリング
    worker.on('error', (error) => {
      console.error(`ワーカー ${taskId} でエラー:`, error);
      this.handleWorkerError(taskId, worker, error);
    });

    worker.on('exit', (code, signal) => {
      if (code !== 0) {
        console.error(
          `ワーカー ${taskId} が異常終了: ${code} (${signal})`
        );
        this.handleWorkerExit(taskId, worker, code, signal);
      }
    });

    worker.on('message', (message) => {
      this.handleWorkerMessage(taskId, message);
    });

    this.workers.set(taskId, worker);
    this.retryCount.set(taskId, 0);

    return worker;
  }

  handleWorkerError(taskId, worker, error) {
    const retries = this.retryCount.get(taskId) || 0;

    if (retries < 3) {
      console.log(
        `ワーカー ${taskId} を再起動します (${
          retries + 1
        }/3)`
      );
      this.retryCount.set(taskId, retries + 1);
      worker.terminate();
      // 新しいワーカーを作成
      this.createWorker(taskId, worker.workerData);
    } else {
      console.error(
        `ワーカー ${taskId} の再試行回数が上限に達しました`
      );
      this.workers.delete(taskId);
    }
  }

  handleWorkerExit(taskId, worker, code, signal) {
    // クリーンアップ処理
    this.workers.delete(taskId);
    this.retryCount.delete(taskId);
  }

  handleWorkerMessage(taskId, message) {
    console.log(
      `ワーカー ${taskId} からのメッセージ:`,
      message
    );
  }

  terminateAll() {
    for (const [taskId, worker] of this.workers) {
      worker.terminate();
    }
    this.workers.clear();
    this.retryCount.clear();
  }
}

実装例:cluster

基本的な実装パターン

clusterの基本的な実装パターンを理解しましょう。

javascript// 基本的なクラスターの実装
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  console.log(`マスタープロセス ${process.pid} が起動`);

  // ワーカープロセスを起動
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  // ワーカーの死活監視
  cluster.on('exit', (worker, code, signal) => {
    console.log(
      `ワーカー ${worker.process.pid} が終了 (${
        signal || code
      })`
    );

    // 新しいワーカーを起動(フォールトトレランス)
    if (code !== 0 && !worker.exitedAfterDisconnect) {
      console.log('新しいワーカーを起動します');
      cluster.fork();
    }
  });

  // グレースフルシャットダウン
  process.on('SIGTERM', () => {
    console.log(
      'マスタープロセスがシャットダウンシグナルを受信'
    );
    for (const id in cluster.workers) {
      cluster.workers[id].kill();
    }
  });
} else {
  // ワーカープロセス
  const server = http.createServer((req, res) => {
    res.writeHead(200, {
      'Content-Type': 'application/json',
    });
    res.end(
      JSON.stringify({
        message: 'Hello from cluster!',
        worker: process.pid,
        uptime: process.uptime(),
        memory: process.memoryUsage(),
      })
    );
  });

  server.listen(3000, () => {
    console.log(
      `ワーカー ${process.pid} がポート 3000 で起動`
    );
  });

  // ワーカープロセスのグレースフルシャットダウン
  process.on('SIGTERM', () => {
    console.log(
      `ワーカー ${process.pid} がシャットダウンシグナルを受信`
    );
    server.close(() => {
      console.log(`ワーカー ${process.pid} が正常終了`);
      process.exit(0);
    });
  });
}

ロードバランシング

clusterモジュールは、自動的にロードバランシングを行います。

javascript// カスタムロードバランシングの実装
const cluster = require('cluster');
const http = require('http');
const net = require('net');

if (cluster.isMaster) {
  // マスタープロセスでロードバランサーを作成
  const workers = [];
  const numCPUs = require('os').cpus().length;

  // ワーカープロセスを起動
  for (let i = 0; i < numCPUs; i++) {
    const worker = cluster.fork();
    workers.push(worker);
  }

  // ラウンドロビン方式のロードバランシング
  let currentWorker = 0;

  const server = net.createServer((connection) => {
    const worker = workers[currentWorker];

    // 接続をワーカーに転送
    worker.send('sticky-session:connection', connection);

    // 次のワーカーを選択
    currentWorker = (currentWorker + 1) % workers.length;
  });

  server.listen(3000, () => {
    console.log('ロードバランサーがポート 3000 で起動');
  });

  // ワーカーの死活監視
  cluster.on('exit', (worker, code, signal) => {
    console.log(`ワーカー ${worker.process.pid} が終了`);

    // 終了したワーカーを配列から削除
    const index = workers.indexOf(worker);
    if (index > -1) {
      workers.splice(index, 1);
    }

    // 新しいワーカーを起動
    const newWorker = cluster.fork();
    workers.push(newWorker);

    // currentWorkerを調整
    if (index <= currentWorker) {
      currentWorker = Math.max(0, currentWorker - 1);
    }
  });
} else {
  // ワーカープロセス
  const http = require('http');

  const server = http.createServer((req, res) => {
    // 重い処理をシミュレート
    const start = Date.now();
    while (Date.now() - start < Math.random() * 100) {
      // ランダムな処理時間
    }

    res.writeHead(200, {
      'Content-Type': 'application/json',
    });
    res.end(
      JSON.stringify({
        worker: process.pid,
        requestTime: Date.now() - start,
        memory: process.memoryUsage(),
      })
    );
  });

  // マスターからの接続を受信
  process.on('message', (message, connection) => {
    if (message === 'sticky-session:connection') {
      server.emit('connection', connection);
    }
  });

  console.log(`ワーカー ${process.pid} が起動`);
}

プロセス管理

プロセス管理は、クラスターアプリケーションの重要な要素です。

javascript// 高度なプロセス管理の実装
const cluster = require('cluster');
const http = require('http');

class ClusterManager {
  constructor(options = {}) {
    this.numWorkers =
      options.numWorkers || require('os').cpus().length;
    this.workers = new Map();
    this.restartCount = new Map();
    this.maxRestarts = options.maxRestarts || 5;
    this.restartDelay = options.restartDelay || 1000;
  }

  start() {
    if (cluster.isMaster) {
      console.log(`マスタープロセス ${process.pid} が起動`);

      // 初期ワーカーを起動
      for (let i = 0; i < this.numWorkers; i++) {
        this.forkWorker();
      }

      // イベントハンドラーを設定
      this.setupEventHandlers();

      // ヘルスチェックを開始
      this.startHealthCheck();
    } else {
      this.startWorker();
    }
  }

  forkWorker() {
    const worker = cluster.fork();
    const workerId = worker.id;

    this.workers.set(workerId, worker);
    this.restartCount.set(workerId, 0);

    console.log(
      `ワーカー ${workerId} (PID: ${worker.process.pid}) を起動`
    );

    return worker;
  }

  setupEventHandlers() {
    cluster.on('exit', (worker, code, signal) => {
      const workerId = worker.id;
      const restartCount =
        this.restartCount.get(workerId) || 0;

      console.log(
        `ワーカー ${workerId} (PID: ${
          worker.process.pid
        }) が終了: ${signal || code}`
      );

      this.workers.delete(workerId);

      if (
        restartCount < this.maxRestarts &&
        !worker.exitedAfterDisconnect
      ) {
        console.log(
          `${this.restartDelay}ms 後にワーカー ${workerId} を再起動します`
        );

        setTimeout(() => {
          this.restartCount.set(workerId, restartCount + 1);
          this.forkWorker();
        }, this.restartDelay);
      } else {
        console.error(
          `ワーカー ${workerId} の再起動回数が上限に達しました`
        );
        this.restartCount.delete(workerId);
      }
    });

    cluster.on('disconnect', (worker) => {
      console.log(`ワーカー ${worker.id} が切断されました`);
    });
  }

  startHealthCheck() {
    setInterval(() => {
      for (const [workerId, worker] of this.workers) {
        // ワーカーの応答をチェック
        worker.send('health-check');
      }
    }, 30000); // 30秒ごと
  }

  startWorker() {
    const server = http.createServer((req, res) => {
      res.writeHead(200, {
        'Content-Type': 'application/json',
      });
      res.end(
        JSON.stringify({
          worker: process.pid,
          uptime: process.uptime(),
          memory: process.memoryUsage(),
        })
      );
    });

    server.listen(3000, () => {
      console.log(
        `ワーカー ${process.pid} がポート 3000 で起動`
      );
    });

    // ヘルスチェックに応答
    process.on('message', (message) => {
      if (message === 'health-check') {
        process.send('health-ok');
      }
    });

    // グレースフルシャットダウン
    process.on('SIGTERM', () => {
      console.log(
        `ワーカー ${process.pid} がシャットダウンシグナルを受信`
      );
      server.close(() => {
        console.log(`ワーカー ${process.pid} が正常終了`);
        process.exit(0);
      });
    });
  }

  gracefulShutdown() {
    console.log('グレースフルシャットダウンを開始');

    for (const [workerId, worker] of this.workers) {
      worker.disconnect();
    }

    setTimeout(() => {
      console.log('強制終了を実行');
      for (const [workerId, worker] of this.workers) {
        worker.kill();
      }
      process.exit(0);
    }, 10000); // 10秒後に強制終了
  }
}

// 使用例
if (cluster.isMaster) {
  const manager = new ClusterManager({
    numWorkers: 4,
    maxRestarts: 3,
    restartDelay: 2000,
  });

  manager.start();

  // シャットダウンシグナルの処理
  process.on('SIGTERM', () => {
    manager.gracefulShutdown();
  });
}

パフォーマンステストとベンチマーク

実際の性能比較

実際のアプリケーションでの性能を比較してみましょう。

javascript// 性能比較テスト
const { Worker } = require('worker_threads');
const cluster = require('cluster');
const http = require('http');
const { performance } = require('perf_hooks');

// テスト用の重い処理
function heavyTask(iterations) {
  let result = 0;
  for (let i = 0; i < iterations; i++) {
    result += Math.sqrt(i) * Math.sin(i);
  }
  return result;
}

// worker_threads の性能テスト
async function testWorkerThreads() {
  const startTime = performance.now();
  const promises = [];

  for (let i = 0; i < 4; i++) {
    const promise = new Promise((resolve) => {
      const worker = new Worker(
        `
        const { parentPort, workerData } = require('worker_threads');
        
        function heavyTask(iterations) {
          let result = 0;
          for (let i = 0; i < iterations; i++) {
            result += Math.sqrt(i) * Math.sin(i);
          }
          return result;
        }
        
        const result = heavyTask(workerData.iterations);
        parentPort.postMessage(result);
      `,
        {
          workerData: { iterations: 1000000 },
          eval: true,
        }
      );

      worker.on('message', (result) => {
        resolve(result);
        worker.terminate();
      });
    });

    promises.push(promise);
  }

  const results = await Promise.all(promises);
  const endTime = performance.now();

  return {
    method: 'worker_threads',
    duration: endTime - startTime,
    results: results.length,
  };
}

// cluster の性能テスト
function testCluster() {
  return new Promise((resolve) => {
    if (cluster.isMaster) {
      const startTime = performance.now();
      const workers = [];
      let completedWorkers = 0;

      for (let i = 0; i < 4; i++) {
        const worker = cluster.fork();
        workers.push(worker);

        worker.on('message', (result) => {
          completedWorkers++;
          if (completedWorkers === 4) {
            const endTime = performance.now();

            // 全ワーカーを終了
            workers.forEach((w) => w.kill());

            resolve({
              method: 'cluster',
              duration: endTime - startTime,
              results: completedWorkers,
            });
          }
        });
      }
    } else {
      // ワーカープロセス
      const result = heavyTask(1000000);
      process.send(result);
    }
  });
}

// メモリ使用量の測定
function measureMemoryUsage() {
  const usage = process.memoryUsage();
  return {
    rss: Math.round(usage.rss / 1024 / 1024), // MB
    heapUsed: Math.round(usage.heapUsed / 1024 / 1024), // MB
    heapTotal: Math.round(usage.heapTotal / 1024 / 1024), // MB
  };
}

// 統合テスト
async function runPerformanceTest() {
  console.log('=== 性能比較テスト開始 ===');

  // メモリ使用量の初期測定
  const initialMemory = measureMemoryUsage();
  console.log('初期メモリ使用量:', initialMemory);

  // worker_threads テスト
  const workerThreadsResult = await testWorkerThreads();
  console.log('Worker Threads 結果:', workerThreadsResult);

  // 少し待機
  await new Promise((resolve) => setTimeout(resolve, 1000));

  // cluster テスト
  const clusterResult = await testCluster();
  console.log('Cluster 結果:', clusterResult);

  // 最終メモリ使用量
  const finalMemory = measureMemoryUsage();
  console.log('最終メモリ使用量:', finalMemory);

  // 結果の比較
  console.log('\n=== 結果比較 ===');
  console.log(
    `Worker Threads: ${workerThreadsResult.duration.toFixed(
      2
    )}ms`
  );
  console.log(
    `Cluster: ${clusterResult.duration.toFixed(2)}ms`
  );
  console.log(
    `メモリ増加: ${finalMemory.rss - initialMemory.rss}MB`
  );
}

メモリ使用量の測定

メモリ使用量は、アプリケーションの設計において重要な指標です。

javascript// メモリ使用量の詳細測定
const { Worker } = require('worker_threads');
const cluster = require('cluster');

class MemoryProfiler {
  constructor() {
    this.measurements = [];
  }

  measure(label) {
    const usage = process.memoryUsage();
    const measurement = {
      label,
      timestamp: Date.now(),
      rss: Math.round(usage.rss / 1024 / 1024),
      heapUsed: Math.round(usage.heapUsed / 1024 / 1024),
      heapTotal: Math.round(usage.heapTotal / 1024 / 1024),
      external: Math.round(usage.external / 1024 / 1024),
    };

    this.measurements.push(measurement);
    console.log(`${label}:`, measurement);

    return measurement;
  }

  compare(label1, label2) {
    const m1 = this.measurements.find(
      (m) => m.label === label1
    );
    const m2 = this.measurements.find(
      (m) => m.label === label2
    );

    if (m1 && m2) {
      const diff = {
        rss: m2.rss - m1.rss,
        heapUsed: m2.heapUsed - m1.heapUsed,
        heapTotal: m2.heapTotal - m1.heapTotal,
        external: m2.external - m1.external,
      };

      console.log(`\n${label1}${label2} の差分:`, diff);
      return diff;
    }
  }

  generateReport() {
    console.log('\n=== メモリ使用量レポート ===');
    this.measurements.forEach((m, index) => {
      if (index > 0) {
        const prev = this.measurements[index - 1];
        const diff = {
          rss: m.rss - prev.rss,
          heapUsed: m.heapUsed - prev.heapUsed,
        };
        console.log(
          `${prev.label}${m.label}: +${diff.rss}MB RSS, +${diff.heapUsed}MB Heap`
        );
      }
    });
  }
}

// メモリプロファイリングの実行
async function profileMemoryUsage() {
  const profiler = new MemoryProfiler();

  profiler.measure('初期状態');

  // worker_threads のメモリ測定
  const workers = [];
  for (let i = 0; i < 4; i++) {
    const worker = new Worker(
      `
      const { parentPort } = require('worker_threads');
      parentPort.on('message', (data) => {
        parentPort.postMessage({ processed: data.length });
      });
    `,
      { eval: true }
    );

    workers.push(worker);
  }

  profiler.measure('Worker Threads 作成後');

  // ワーカーを終了
  workers.forEach((w) => w.terminate());
  await new Promise((resolve) => setTimeout(resolve, 1000));

  profiler.measure('Worker Threads 終了後');

  // cluster のメモリ測定
  if (cluster.isMaster) {
    const clusterWorkers = [];
    for (let i = 0; i < 4; i++) {
      const worker = cluster.fork();
      clusterWorkers.push(worker);
    }

    profiler.measure('Cluster 作成後');

    // ワーカーを終了
    clusterWorkers.forEach((w) => w.kill());
    await new Promise((resolve) =>
      setTimeout(resolve, 1000)
    );

    profiler.measure('Cluster 終了後');

    profiler.generateReport();
  }
}

スループットの比較

スループットは、実際のアプリケーションでの重要な性能指標です。

javascript// スループット比較テスト
const { Worker } = require('worker_threads');
const cluster = require('cluster');
const http = require('http');

// HTTP リクエストのスループットテスト
function createHttpServer(port, responseTime = 0) {
  return http
    .createServer((req, res) => {
      // 応答時間をシミュレート
      setTimeout(() => {
        res.writeHead(200, {
          'Content-Type': 'application/json',
        });
        res.end(
          JSON.stringify({
            worker: process.pid,
            timestamp: Date.now(),
            memory: process.memoryUsage(),
          })
        );
      }, responseTime);
    })
    .listen(port);
}

// worker_threads でのスループットテスト
async function testWorkerThreadsThroughput() {
  const workers = [];
  const results = [];

  // 4つのワーカーを作成
  for (let i = 0; i < 4; i++) {
    const worker = new Worker(
      `
      const http = require('http');
      
      const server = http.createServer((req, res) => {
        setTimeout(() => {
          res.writeHead(200, { 'Content-Type': 'application/json' });
          res.end(JSON.stringify({
            worker: process.pid,
            timestamp: Date.now()
          }));
        }, 10); // 10ms の処理時間
      });
      
      server.listen(0, () => {
        const port = server.address().port;
        process.send({ type: 'READY', port });
      });
    `,
      { eval: true }
    );

    workers.push(worker);
  }

  // 全ワーカーが準備完了するまで待機
  const ports = await Promise.all(
    workers.map((worker) => {
      return new Promise((resolve) => {
        worker.on('message', (message) => {
          if (message.type === 'READY') {
            resolve(message.port);
          }
        });
      });
    })
  );

  // スループットテスト実行
  const startTime = Date.now();
  const requests = 1000;
  const promises = [];

  for (let i = 0; i < requests; i++) {
    const port = ports[i % ports.length];
    const promise = fetch(`http://localhost:${port}`)
      .then((res) => res.json())
      .then((data) => ({
        success: true,
        worker: data.worker,
      }))
      .catch((err) => ({
        success: false,
        error: err.message,
      }));

    promises.push(promise);
  }

  const results = await Promise.all(promises);
  const endTime = Date.now();

  // ワーカーを終了
  workers.forEach((w) => w.terminate());

  const successful = results.filter(
    (r) => r.success
  ).length;
  const duration = endTime - startTime;
  const rps = (successful / duration) * 1000; // リクエスト/

  return {
    method: 'worker_threads',
    requests,
    successful,
    duration,
    rps: Math.round(rps),
  };
}

// cluster でのスループットテスト
function testClusterThroughput() {
  return new Promise((resolve) => {
    if (cluster.isMaster) {
      const workers = [];

      // 4つのワーカープロセスを起動
      for (let i = 0; i < 4; i++) {
        const worker = cluster.fork();
        workers.push(worker);
      }

      // 全ワーカーが準備完了するまで待機
      let readyWorkers = 0;
      cluster.on('message', (worker, message) => {
        if (message.type === 'READY') {
          readyWorkers++;
          if (readyWorkers === 4) {
            // スループットテスト開始
            runThroughputTest().then(resolve);
          }
        }
      });
    } else {
      // ワーカープロセス
      const server = http.createServer((req, res) => {
        setTimeout(() => {
          res.writeHead(200, {
            'Content-Type': 'application/json',
          });
          res.end(
            JSON.stringify({
              worker: process.pid,
              timestamp: Date.now(),
            })
          );
        }, 10);
      });

      server.listen(3000, () => {
        process.send({ type: 'READY' });
      });
    }
  });
}

async function runThroughputTest() {
  const startTime = Date.now();
  const requests = 1000;
  const promises = [];

  for (let i = 0; i < requests; i++) {
    const promise = fetch('http://localhost:3000')
      .then((res) => res.json())
      .then((data) => ({
        success: true,
        worker: data.worker,
      }))
      .catch((err) => ({
        success: false,
        error: err.message,
      }));

    promises.push(promise);
  }

  const results = await Promise.all(promises);
  const endTime = Date.now();

  const successful = results.filter(
    (r) => r.success
  ).length;
  const duration = endTime - startTime;
  const rps = (successful / duration) * 1000;

  return {
    method: 'cluster',
    requests,
    successful,
    duration,
    rps: Math.round(rps),
  };
}

// 統合スループットテスト
async function runThroughputComparison() {
  console.log('=== スループット比較テスト ===');

  console.log('\nWorker Threads テスト中...');
  const workerThreadsResult =
    await testWorkerThreadsThroughput();
  console.log('Worker Threads 結果:', workerThreadsResult);

  console.log('\nCluster テスト中...');
  const clusterResult = await testClusterThroughput();
  console.log('Cluster 結果:', clusterResult);

  console.log('\n=== 比較結果 ===');
  console.log(
    `Worker Threads: ${workerThreadsResult.rps} RPS`
  );
  console.log(`Cluster: ${clusterResult.rps} RPS`);
  console.log(
    `差: ${Math.abs(
      workerThreadsResult.rps - clusterResult.rps
    )} RPS`
  );
}

まとめ

Node.js のマルチスレッド処理技術であるworker_threadsclusterは、それぞれ異なる特性と適用場面を持っています。

適切な選択の重要性

技術選択は、アプリケーションの成功に直結します。間違った選択は、性能問題やメンテナンス性の低下を招く可能性があります。

worker_threads を選ぶべき場面:

  • CPU 集約型の計算処理
  • メモリ使用量を最小限に抑えたい場合
  • 開発・デバッグの容易さを重視する場合
  • 軽量な並列処理が必要な場合

cluster を選ぶべき場面:

  • Web サーバーの負荷分散
  • 高可用性が求められるアプリケーション
  • プロセスレベルの分離が必要な場合
  • 長時間実行されるバッチ処理

実際の開発では、これらの技術を組み合わせることで、より効果的なソリューションを構築できます。例えば、Web サーバーはclusterで負荷分散し、その中で CPU 集約型処理はworker_threadsで並列化するといったアプローチです。

今後の発展方向性

Node.js の並列処理技術は、今後も進化し続けるでしょう。worker_threadsの機能拡張や、新しい並列処理 API の登場が期待されます。

重要なのは、これらの技術を理解し、適切な場面で使い分けることです。技術の特性を理解することで、より良いアプリケーションを構築できるようになります。

実際のプロジェクトでは、まずはシンプルな実装から始めて、必要に応じて段階的に最適化していくことをお勧めします。パフォーマンステストを定期的に実行し、アプリケーションの特性に合わせて技術選択を見直すことが重要です。

関連リンク