Node.js マルチスレッド処理:worker_threads と cluster の使い分け

Node.js は「シングルスレッド」で動作するという特徴があります。この特性は多くの場合、開発をシンプルにし、デバッグを容易にしてくれます。しかし、CPU 集約型の処理や大量のリクエストを扱う際には、この特性が性能のボトルネックとなることがあります。
実際の開発現場では、「この処理を並列化したい」「サーバーの性能を向上させたい」という課題に直面することが多いでしょう。そんな時に登場するのが、Node.js のマルチスレッド処理技術です。
本記事では、Node.js で並列処理を実現する 2 つの主要なアプローチであるworker_threads
とcluster
について、実践的な観点から詳しく解説します。どちらを選ぶべきか、どのような場面で効果的なのかを、実際のコード例と共に理解していきましょう。
Node.js のスレッドモデル
イベントループの仕組み
Node.js の心臓部であるイベントループは、シングルスレッドで動作します。この仕組みにより、非同期処理を効率的に扱うことができます。
javascript// イベントループの基本的な動作例
console.log('1. 開始');
setTimeout(() => {
console.log('3. 非同期処理完了');
}, 0);
console.log('2. 即座に実行');
// 出力順序:
// 1. 開始
// 2. 即座に実行
// 3. 非同期処理完了
このコードは、イベントループがどのように非同期処理を管理しているかを示しています。setTimeout
は非同期処理として扱われ、メインスレッドをブロックすることなく実行されます。
シングルスレッドの利点と課題
シングルスレッドモデルには、明確な利点と課題があります。
利点:
- デバッグが容易(競合状態が発生しにくい)
- メモリ使用量が少ない
- コンテキストスイッチングのオーバーヘッドがない
課題:
- CPU 集約型処理でメインスレッドがブロックされる
- マルチコア CPU の性能を活かせない
- 一つの処理が全体の性能に影響する
実際に問題となる場面を見てみましょう:
javascript// CPU集約型処理の例 - 問題のあるコード
function heavyCalculation(n) {
let result = 0;
for (let i = 0; i < n; i++) {
result += Math.sqrt(i) * Math.sin(i);
}
return result;
}
// この処理はメインスレッドをブロックする
console.log('処理開始');
const result = heavyCalculation(10000000); // 数秒間ブロック
console.log('処理完了:', result);
このような処理が実行されると、Web サーバーであれば他のリクエストが処理できなくなってしまいます。
CPU 集約型処理での性能問題
CPU 集約型処理は、Node.js のシングルスレッド特性と最も相性が悪い処理です。画像処理、暗号化、データ分析などが該当します。
javascript// 画像処理の例(実際のエラーが発生しやすい場面)
const sharp = require('sharp');
app.post('/process-image', async (req, res) => {
try {
// 大量の画像処理を行うとメインスレッドがブロックされる
const processedImage = await sharp(req.file.buffer)
.resize(800, 600)
.jpeg({ quality: 90 })
.toBuffer();
res.json({ success: true });
} catch (error) {
// エラー: ENOMEM (メモリ不足) や ETIMEDOUT (タイムアウト) が発生
console.error('画像処理エラー:', error.message);
res.status(500).json({ error: error.message });
}
});
このような場面で、マルチスレッド処理の導入を検討することになります。
worker_threads の基本概念
ワーカースレッドとは
worker_threads
は、Node.js v10.5.0 から導入された、メインスレッドとは別のスレッドで JavaScript コードを実行できるモジュールです。
javascript// worker_threadsの基本的な使用例
const {
Worker,
isMainThread,
parentPort,
workerData,
} = require('worker_threads');
if (isMainThread) {
// メインスレッド側のコード
const worker = new Worker(__filename, {
workerData: { number: 1000000 },
});
worker.on('message', (result) => {
console.log('計算結果:', result);
});
worker.on('error', (error) => {
console.error('ワーカーエラー:', error);
});
} else {
// ワーカースレッド側のコード
const { number } = workerData;
const result = heavyCalculation(number);
parentPort.postMessage(result);
}
メインスレッドとの関係
ワーカースレッドは、メインスレッドと独立して動作しますが、メッセージングを通じて通信します。
javascript// メインスレッドとワーカースレッドの通信例
const { Worker } = require('worker_threads');
// メインスレッド側
const worker = new Worker('./worker.js');
// ワーカーにデータを送信
worker.postMessage({
type: 'CALCULATE',
data: [1, 2, 3, 4, 5],
});
// ワーカーからの結果を受信
worker.on('message', (result) => {
console.log('受信した結果:', result);
});
// エラーハンドリング
worker.on('error', (error) => {
console.error('ワーカーエラー:', error);
// エラー: Worker terminated unexpectedly
});
javascript// worker.js - ワーカースレッド側
const { parentPort } = require('worker_threads');
parentPort.on('message', (message) => {
if (message.type === 'CALCULATE') {
const result = message.data.reduce(
(sum, num) => sum + num,
0
);
parentPort.postMessage({
type: 'RESULT',
data: result,
});
}
});
適用場面と特徴
worker_threads
は以下の場面で特に効果的です:
適用場面:
- CPU 集約型の計算処理
- 画像・動画処理
- 暗号化処理
- データ変換・加工
特徴:
- メモリを共有できる(SharedArrayBuffer)
- 軽量なスレッド切り替え
- メインスレッドをブロックしない
javascript// CPU集約型処理の並列化例
const { Worker } = require('worker_threads');
function runParallelCalculations(numbers) {
const workers = [];
const results = [];
// 各数値に対してワーカーを作成
numbers.forEach((number, index) => {
const worker = new Worker('./calculation-worker.js', {
workerData: { number, index },
});
worker.on('message', (result) => {
results[result.index] = result.value;
// 全ての計算が完了したかチェック
if (
results.filter((r) => r !== undefined).length ===
numbers.length
) {
console.log('全計算完了:', results);
}
});
workers.push(worker);
});
}
cluster の基本概念
クラスターモジュールとは
cluster
モジュールは、Node.js アプリケーションを複数のプロセスで実行するための機能です。各プロセスは独立した V8 インスタンスで動作します。
javascript// clusterの基本的な使用例
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;
if (cluster.isMaster) {
console.log(
`マスタープロセス ${process.pid} が起動しました`
);
// ワーカープロセスをフォーク
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
cluster.on('exit', (worker, code, signal) => {
console.log(
`ワーカー ${worker.process.pid} が終了しました`
);
// 新しいワーカーを起動(フォールトトレランス)
cluster.fork();
});
} else {
// ワーカープロセス
http
.createServer((req, res) => {
res.writeHead(200);
res.end(`ワーカー ${process.pid} が処理しました`);
})
.listen(8000);
console.log(
`ワーカープロセス ${process.pid} が起動しました`
);
}
プロセスベースの並列処理
cluster
はプロセスレベルでの並列処理を提供します。各プロセスは独立したメモリ空間を持ちます。
javascript// プロセス間の独立性を示す例
const cluster = require('cluster');
if (cluster.isMaster) {
// マスタープロセス
const worker1 = cluster.fork();
const worker2 = cluster.fork();
// 各ワーカーに異なるデータを送信
worker1.send({ type: 'SET_DATA', data: 'Worker 1 Data' });
worker2.send({ type: 'SET_DATA', data: 'Worker 2 Data' });
// 結果を受信
worker1.on('message', (msg) => {
console.log('Worker 1 結果:', msg);
});
worker2.on('message', (msg) => {
console.log('Worker 2 結果:', msg);
});
} else {
// ワーカープロセス
let workerData = null;
process.on('message', (msg) => {
if (msg.type === 'SET_DATA') {
workerData = msg.data;
process.send({
pid: process.pid,
data: workerData,
memory: process.memoryUsage(),
});
}
});
}
適用場面と特徴
cluster
は以下の場面で特に効果的です:
適用場面:
- Web サーバーの負荷分散
- マイクロサービスアーキテクチャ
- 長時間実行されるバッチ処理
- 高可用性が求められるアプリケーション
特徴:
- プロセスレベルの分離(メモリ、CPU、ファイルディスクリプタ)
- フォールトトレランス(一つのプロセスが落ちても他は継続)
- ロードバランシング機能
javascript// Webサーバーの負荷分散例
const cluster = require('cluster');
const express = require('express');
const numCPUs = require('os').cpus().length;
if (cluster.isMaster) {
console.log(`マスタープロセス ${process.pid} が起動`);
// CPUコア数分のワーカーを起動
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
// ワーカーの死活監視
cluster.on('exit', (worker, code, signal) => {
console.log(
`ワーカー ${worker.process.pid} が終了 (${
signal || code
})`
);
// 自動的に新しいワーカーを起動
cluster.fork();
});
} else {
// ワーカープロセス
const app = express();
app.get('/', (req, res) => {
// 重い処理をシミュレート
const start = Date.now();
while (Date.now() - start < 100) {
// 100msの処理
}
res.json({
message: 'Hello from cluster!',
worker: process.pid,
uptime: process.uptime(),
});
});
app.listen(3000, () => {
console.log(
`ワーカー ${process.pid} がポート 3000 で起動`
);
});
}
worker_threads vs cluster:詳細比較
アーキテクチャの違い
worker_threads
とcluster
は、根本的に異なるアーキテクチャを採用しています。
worker_threads(スレッドベース):
- 同一プロセス内で複数のスレッド
- メモリ空間を共有可能
- 軽量なコンテキストスイッチ
cluster(プロセスベース):
- 複数の独立したプロセス
- 完全に分離されたメモリ空間
- 重いプロセス切り替え
javascript// アーキテクチャの違いを実感できる例
const { Worker } = require('worker_threads');
const cluster = require('cluster');
if (cluster.isMaster) {
console.log('=== Cluster モード ===');
console.log('マスタープロセス PID:', process.pid);
// ワーカープロセスを起動
const worker = cluster.fork();
worker.on('message', (msg) => {
console.log('ワーカーからのメッセージ:', msg);
console.log('メモリ使用量:', process.memoryUsage());
});
} else {
// ワーカープロセス
console.log('ワーカープロセス PID:', process.pid);
process.send({
type: 'INFO',
pid: process.pid,
memory: process.memoryUsage(),
});
}
メモリ使用量の比較
メモリ使用量は、アプリケーションの設計において重要な考慮事項です。
javascript// メモリ使用量を比較するベンチマーク
const { Worker } = require('worker_threads');
const cluster = require('cluster');
function measureMemoryUsage() {
const usage = process.memoryUsage();
return {
rss: Math.round(usage.rss / 1024 / 1024), // MB
heapUsed: Math.round(usage.heapUsed / 1024 / 1024), // MB
heapTotal: Math.round(usage.heapTotal / 1024 / 1024), // MB
};
}
// worker_threads のメモリ使用量測定
function testWorkerThreads() {
const workers = [];
const startMemory = measureMemoryUsage();
for (let i = 0; i < 4; i++) {
const worker = new Worker(
`
const { parentPort } = require('worker_threads');
parentPort.on('message', (data) => {
parentPort.postMessage({ result: data * 2 });
});
`,
{ eval: true }
);
workers.push(worker);
}
const endMemory = measureMemoryUsage();
console.log('Worker Threads メモリ使用量:', {
start: startMemory,
end: endMemory,
difference: {
rss: endMemory.rss - startMemory.rss,
heapUsed: endMemory.heapUsed - startMemory.heapUsed,
},
});
workers.forEach((w) => w.terminate());
}
通信オーバーヘッド
通信方式の違いは、パフォーマンスに大きな影響を与えます。
javascript// 通信オーバーヘッドを測定する例
const { Worker } = require('worker_threads');
// worker_threads の通信テスト
function testWorkerCommunication() {
const worker = new Worker(
`
const { parentPort } = require('worker_threads');
parentPort.on('message', (data) => {
parentPort.postMessage({
received: data,
timestamp: Date.now()
});
});
`,
{ eval: true }
);
const startTime = Date.now();
let messageCount = 0;
worker.on('message', (data) => {
messageCount++;
if (messageCount >= 1000) {
const endTime = Date.now();
const duration = endTime - startTime;
console.log(
`Worker Threads 通信: ${messageCount} メッセージを ${duration}ms で処理`
);
worker.terminate();
} else {
worker.postMessage({ count: messageCount });
}
});
worker.postMessage({ count: 0 });
}
エラーハンドリング
エラーハンドリングの仕組みも、両者で大きく異なります。
javascript// worker_threads のエラーハンドリング
const { Worker } = require('worker_threads');
const worker = new Worker(
`
const { parentPort } = require('worker_threads');
// 意図的にエラーを発生させる
setTimeout(() => {
throw new Error('Worker thread error');
}, 1000);
parentPort.on('message', (data) => {
parentPort.postMessage({ result: data * 2 });
});
`,
{ eval: true }
);
worker.on('error', (error) => {
console.error('Worker Threads エラー:', error.message);
// エラー: Worker terminated unexpectedly
});
worker.on('exit', (code) => {
if (code !== 0) {
console.error(`ワーカーが異常終了しました: ${code}`);
}
});
javascript// cluster のエラーハンドリング
const cluster = require('cluster');
if (cluster.isMaster) {
const worker = cluster.fork();
worker.on('exit', (code, signal) => {
if (code !== 0) {
console.error(
`ワーカープロセスが異常終了: ${code} (${signal})`
);
// 新しいワーカーを自動起動
cluster.fork();
}
});
worker.on('disconnect', () => {
console.log('ワーカーが切断されました');
});
} else {
// ワーカープロセスでエラーを発生
setTimeout(() => {
process.exit(1); // プロセスを強制終了
}, 1000);
}
使い分けの判断基準
CPU 集約型処理の場合
CPU 集約型処理では、worker_threads
が適しています。メモリ共有が可能で、通信オーバーヘッドが少ないためです。
javascript// CPU集約型処理の実装例
const { Worker } = require('worker_threads');
function processCPUIntensiveTask(data) {
return new Promise((resolve, reject) => {
const worker = new Worker(
`
const { parentPort } = require('worker_threads');
function fibonacci(n) {
if (n <= 1) return n;
return fibonacci(n - 1) + fibonacci(n - 2);
}
parentPort.on('message', (data) => {
const result = fibonacci(data.n);
parentPort.postMessage({ result });
});
`,
{ eval: true }
);
worker.on('message', (result) => {
resolve(result.result);
worker.terminate();
});
worker.on('error', reject);
worker.postMessage(data);
});
}
// 使用例
async function example() {
try {
const result = await processCPUIntensiveTask({ n: 40 });
console.log('フィボナッチ結果:', result);
} catch (error) {
console.error('計算エラー:', error);
}
}
I/O 集約型処理の場合
I/O 集約型処理では、cluster
が適しています。各プロセスが独立して I/O 処理を行えるためです。
javascript// I/O集約型処理の実装例
const cluster = require('cluster');
const fs = require('fs').promises;
const path = require('path');
if (cluster.isMaster) {
console.log('マスタープロセスが起動');
// 複数のワーカーでファイル処理を並列実行
const files = [
'file1.txt',
'file2.txt',
'file3.txt',
'file4.txt',
];
files.forEach((file, index) => {
const worker = cluster.fork();
worker.send({ file, index });
worker.on('message', (result) => {
console.log(
`ファイル ${result.file} の処理完了:`,
result.size
);
});
});
} else {
process.on('message', async (data) => {
try {
const filePath = path.join(__dirname, data.file);
const content = await fs.readFile(filePath, 'utf8');
const size = content.length;
process.send({
file: data.file,
size,
worker: process.pid,
});
} catch (error) {
console.error(`ファイル処理エラー: ${error.message}`);
process.exit(1);
}
});
}
メモリ使用量を重視する場合
メモリ使用量を重視する場合は、worker_threads
が有利です。メモリ共有により、全体のメモリ使用量を削減できます。
javascript// メモリ効率的な実装例
const {
Worker,
SharedArrayBuffer,
} = require('worker_threads');
function processWithSharedMemory(dataSize) {
// 共有メモリを作成
const sharedBuffer = new SharedArrayBuffer(dataSize * 4); // 4バイト整数
const sharedArray = new Int32Array(sharedBuffer);
// データを初期化
for (let i = 0; i < dataSize; i++) {
sharedArray[i] = i;
}
const worker = new Worker(
`
const { parentPort, workerData } = require('worker_threads');
const sharedArray = new Int32Array(workerData.sharedBuffer);
// 共有メモリ上のデータを処理
let sum = 0;
for (let i = 0; i < sharedArray.length; i++) {
sum += sharedArray[i];
}
parentPort.postMessage({ sum });
`,
{
workerData: { sharedBuffer },
}
);
return new Promise((resolve) => {
worker.on('message', (result) => {
resolve(result.sum);
worker.terminate();
});
});
}
開発・デバッグの容易さ
開発・デバッグの観点では、worker_threads
の方が有利です。同一プロセス内で動作するため、デバッガーが使いやすくなります。
javascript// デバッグしやすい worker_threads の実装
const { Worker } = require('worker_threads');
function debugFriendlyWorker() {
const worker = new Worker(
`
const { parentPort } = require('worker_threads');
// デバッグ用のログ出力
console.log('ワーカースレッドが起動しました');
parentPort.on('message', (data) => {
console.log('ワーカーがメッセージを受信:', data);
try {
const result = processData(data);
console.log('処理結果:', result);
parentPort.postMessage({ success: true, result });
} catch (error) {
console.error('ワーカーでエラーが発生:', error);
parentPort.postMessage({ success: false, error: error.message });
}
});
function processData(data) {
// 実際の処理ロジック
return data.map(x => x * 2);
}
`,
{ eval: true }
);
worker.on('message', (result) => {
if (result.success) {
console.log('処理成功:', result.result);
} else {
console.error('処理失敗:', result.error);
}
});
worker.on('error', (error) => {
console.error('ワーカーエラー:', error);
});
return worker;
}
実装例:worker_threads
基本的な実装パターン
worker_threads
の基本的な実装パターンを理解しましょう。
javascript// 基本的なワーカーの実装
const {
Worker,
isMainThread,
parentPort,
workerData,
} = require('worker_threads');
if (isMainThread) {
// メインスレッド側
function createWorker(data) {
return new Promise((resolve, reject) => {
const worker = new Worker(__filename, {
workerData: data,
});
worker.on('message', resolve);
worker.on('error', reject);
worker.on('exit', (code) => {
if (code !== 0) {
reject(new Error(`ワーカーが異常終了: ${code}`));
}
});
});
}
// 使用例
async function main() {
try {
const result = await createWorker({
numbers: [1, 2, 3, 4, 5],
});
console.log('計算結果:', result);
} catch (error) {
console.error('エラー:', error);
}
}
main();
} else {
// ワーカースレッド側
const { numbers } = workerData;
const sum = numbers.reduce((a, b) => a + b, 0);
parentPort.postMessage({ sum, count: numbers.length });
}
データの受け渡し方法
ワーカースレッドとのデータの受け渡しには、複数の方法があります。
javascript// メッセージングによるデータ受け渡し
const { Worker } = require('worker_threads');
const worker = new Worker(
`
const { parentPort } = require('worker_threads');
parentPort.on('message', (data) => {
switch (data.type) {
case 'CALCULATE':
const result = data.numbers.reduce((sum, num) => sum + num, 0);
parentPort.postMessage({ type: 'RESULT', data: result });
break;
case 'PROCESS_STRING':
const processed = data.text.toUpperCase();
parentPort.postMessage({ type: 'PROCESSED', data: processed });
break;
case 'EXIT':
parentPort.close();
break;
}
});
`,
{ eval: true }
);
// メインスレッドからデータを送信
worker.postMessage({
type: 'CALCULATE',
numbers: [1, 2, 3, 4, 5],
});
worker.on('message', (result) => {
console.log('受信:', result);
});
エラーハンドリング
ワーカースレッドでのエラーハンドリングは、アプリケーションの安定性に重要です。
javascript// 包括的なエラーハンドリング
const { Worker } = require('worker_threads');
class WorkerManager {
constructor() {
this.workers = new Map();
this.retryCount = new Map();
}
createWorker(taskId, workerScript) {
const worker = new Worker(workerScript, { eval: true });
// エラーハンドリング
worker.on('error', (error) => {
console.error(`ワーカー ${taskId} でエラー:`, error);
this.handleWorkerError(taskId, worker, error);
});
worker.on('exit', (code, signal) => {
if (code !== 0) {
console.error(
`ワーカー ${taskId} が異常終了: ${code} (${signal})`
);
this.handleWorkerExit(taskId, worker, code, signal);
}
});
worker.on('message', (message) => {
this.handleWorkerMessage(taskId, message);
});
this.workers.set(taskId, worker);
this.retryCount.set(taskId, 0);
return worker;
}
handleWorkerError(taskId, worker, error) {
const retries = this.retryCount.get(taskId) || 0;
if (retries < 3) {
console.log(
`ワーカー ${taskId} を再起動します (${
retries + 1
}/3)`
);
this.retryCount.set(taskId, retries + 1);
worker.terminate();
// 新しいワーカーを作成
this.createWorker(taskId, worker.workerData);
} else {
console.error(
`ワーカー ${taskId} の再試行回数が上限に達しました`
);
this.workers.delete(taskId);
}
}
handleWorkerExit(taskId, worker, code, signal) {
// クリーンアップ処理
this.workers.delete(taskId);
this.retryCount.delete(taskId);
}
handleWorkerMessage(taskId, message) {
console.log(
`ワーカー ${taskId} からのメッセージ:`,
message
);
}
terminateAll() {
for (const [taskId, worker] of this.workers) {
worker.terminate();
}
this.workers.clear();
this.retryCount.clear();
}
}
実装例:cluster
基本的な実装パターン
cluster
の基本的な実装パターンを理解しましょう。
javascript// 基本的なクラスターの実装
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;
if (cluster.isMaster) {
console.log(`マスタープロセス ${process.pid} が起動`);
// ワーカープロセスを起動
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
// ワーカーの死活監視
cluster.on('exit', (worker, code, signal) => {
console.log(
`ワーカー ${worker.process.pid} が終了 (${
signal || code
})`
);
// 新しいワーカーを起動(フォールトトレランス)
if (code !== 0 && !worker.exitedAfterDisconnect) {
console.log('新しいワーカーを起動します');
cluster.fork();
}
});
// グレースフルシャットダウン
process.on('SIGTERM', () => {
console.log(
'マスタープロセスがシャットダウンシグナルを受信'
);
for (const id in cluster.workers) {
cluster.workers[id].kill();
}
});
} else {
// ワーカープロセス
const server = http.createServer((req, res) => {
res.writeHead(200, {
'Content-Type': 'application/json',
});
res.end(
JSON.stringify({
message: 'Hello from cluster!',
worker: process.pid,
uptime: process.uptime(),
memory: process.memoryUsage(),
})
);
});
server.listen(3000, () => {
console.log(
`ワーカー ${process.pid} がポート 3000 で起動`
);
});
// ワーカープロセスのグレースフルシャットダウン
process.on('SIGTERM', () => {
console.log(
`ワーカー ${process.pid} がシャットダウンシグナルを受信`
);
server.close(() => {
console.log(`ワーカー ${process.pid} が正常終了`);
process.exit(0);
});
});
}
ロードバランシング
cluster
モジュールは、自動的にロードバランシングを行います。
javascript// カスタムロードバランシングの実装
const cluster = require('cluster');
const http = require('http');
const net = require('net');
if (cluster.isMaster) {
// マスタープロセスでロードバランサーを作成
const workers = [];
const numCPUs = require('os').cpus().length;
// ワーカープロセスを起動
for (let i = 0; i < numCPUs; i++) {
const worker = cluster.fork();
workers.push(worker);
}
// ラウンドロビン方式のロードバランシング
let currentWorker = 0;
const server = net.createServer((connection) => {
const worker = workers[currentWorker];
// 接続をワーカーに転送
worker.send('sticky-session:connection', connection);
// 次のワーカーを選択
currentWorker = (currentWorker + 1) % workers.length;
});
server.listen(3000, () => {
console.log('ロードバランサーがポート 3000 で起動');
});
// ワーカーの死活監視
cluster.on('exit', (worker, code, signal) => {
console.log(`ワーカー ${worker.process.pid} が終了`);
// 終了したワーカーを配列から削除
const index = workers.indexOf(worker);
if (index > -1) {
workers.splice(index, 1);
}
// 新しいワーカーを起動
const newWorker = cluster.fork();
workers.push(newWorker);
// currentWorkerを調整
if (index <= currentWorker) {
currentWorker = Math.max(0, currentWorker - 1);
}
});
} else {
// ワーカープロセス
const http = require('http');
const server = http.createServer((req, res) => {
// 重い処理をシミュレート
const start = Date.now();
while (Date.now() - start < Math.random() * 100) {
// ランダムな処理時間
}
res.writeHead(200, {
'Content-Type': 'application/json',
});
res.end(
JSON.stringify({
worker: process.pid,
requestTime: Date.now() - start,
memory: process.memoryUsage(),
})
);
});
// マスターからの接続を受信
process.on('message', (message, connection) => {
if (message === 'sticky-session:connection') {
server.emit('connection', connection);
}
});
console.log(`ワーカー ${process.pid} が起動`);
}
プロセス管理
プロセス管理は、クラスターアプリケーションの重要な要素です。
javascript// 高度なプロセス管理の実装
const cluster = require('cluster');
const http = require('http');
class ClusterManager {
constructor(options = {}) {
this.numWorkers =
options.numWorkers || require('os').cpus().length;
this.workers = new Map();
this.restartCount = new Map();
this.maxRestarts = options.maxRestarts || 5;
this.restartDelay = options.restartDelay || 1000;
}
start() {
if (cluster.isMaster) {
console.log(`マスタープロセス ${process.pid} が起動`);
// 初期ワーカーを起動
for (let i = 0; i < this.numWorkers; i++) {
this.forkWorker();
}
// イベントハンドラーを設定
this.setupEventHandlers();
// ヘルスチェックを開始
this.startHealthCheck();
} else {
this.startWorker();
}
}
forkWorker() {
const worker = cluster.fork();
const workerId = worker.id;
this.workers.set(workerId, worker);
this.restartCount.set(workerId, 0);
console.log(
`ワーカー ${workerId} (PID: ${worker.process.pid}) を起動`
);
return worker;
}
setupEventHandlers() {
cluster.on('exit', (worker, code, signal) => {
const workerId = worker.id;
const restartCount =
this.restartCount.get(workerId) || 0;
console.log(
`ワーカー ${workerId} (PID: ${
worker.process.pid
}) が終了: ${signal || code}`
);
this.workers.delete(workerId);
if (
restartCount < this.maxRestarts &&
!worker.exitedAfterDisconnect
) {
console.log(
`${this.restartDelay}ms 後にワーカー ${workerId} を再起動します`
);
setTimeout(() => {
this.restartCount.set(workerId, restartCount + 1);
this.forkWorker();
}, this.restartDelay);
} else {
console.error(
`ワーカー ${workerId} の再起動回数が上限に達しました`
);
this.restartCount.delete(workerId);
}
});
cluster.on('disconnect', (worker) => {
console.log(`ワーカー ${worker.id} が切断されました`);
});
}
startHealthCheck() {
setInterval(() => {
for (const [workerId, worker] of this.workers) {
// ワーカーの応答をチェック
worker.send('health-check');
}
}, 30000); // 30秒ごと
}
startWorker() {
const server = http.createServer((req, res) => {
res.writeHead(200, {
'Content-Type': 'application/json',
});
res.end(
JSON.stringify({
worker: process.pid,
uptime: process.uptime(),
memory: process.memoryUsage(),
})
);
});
server.listen(3000, () => {
console.log(
`ワーカー ${process.pid} がポート 3000 で起動`
);
});
// ヘルスチェックに応答
process.on('message', (message) => {
if (message === 'health-check') {
process.send('health-ok');
}
});
// グレースフルシャットダウン
process.on('SIGTERM', () => {
console.log(
`ワーカー ${process.pid} がシャットダウンシグナルを受信`
);
server.close(() => {
console.log(`ワーカー ${process.pid} が正常終了`);
process.exit(0);
});
});
}
gracefulShutdown() {
console.log('グレースフルシャットダウンを開始');
for (const [workerId, worker] of this.workers) {
worker.disconnect();
}
setTimeout(() => {
console.log('強制終了を実行');
for (const [workerId, worker] of this.workers) {
worker.kill();
}
process.exit(0);
}, 10000); // 10秒後に強制終了
}
}
// 使用例
if (cluster.isMaster) {
const manager = new ClusterManager({
numWorkers: 4,
maxRestarts: 3,
restartDelay: 2000,
});
manager.start();
// シャットダウンシグナルの処理
process.on('SIGTERM', () => {
manager.gracefulShutdown();
});
}
パフォーマンステストとベンチマーク
実際の性能比較
実際のアプリケーションでの性能を比較してみましょう。
javascript// 性能比較テスト
const { Worker } = require('worker_threads');
const cluster = require('cluster');
const http = require('http');
const { performance } = require('perf_hooks');
// テスト用の重い処理
function heavyTask(iterations) {
let result = 0;
for (let i = 0; i < iterations; i++) {
result += Math.sqrt(i) * Math.sin(i);
}
return result;
}
// worker_threads の性能テスト
async function testWorkerThreads() {
const startTime = performance.now();
const promises = [];
for (let i = 0; i < 4; i++) {
const promise = new Promise((resolve) => {
const worker = new Worker(
`
const { parentPort, workerData } = require('worker_threads');
function heavyTask(iterations) {
let result = 0;
for (let i = 0; i < iterations; i++) {
result += Math.sqrt(i) * Math.sin(i);
}
return result;
}
const result = heavyTask(workerData.iterations);
parentPort.postMessage(result);
`,
{
workerData: { iterations: 1000000 },
eval: true,
}
);
worker.on('message', (result) => {
resolve(result);
worker.terminate();
});
});
promises.push(promise);
}
const results = await Promise.all(promises);
const endTime = performance.now();
return {
method: 'worker_threads',
duration: endTime - startTime,
results: results.length,
};
}
// cluster の性能テスト
function testCluster() {
return new Promise((resolve) => {
if (cluster.isMaster) {
const startTime = performance.now();
const workers = [];
let completedWorkers = 0;
for (let i = 0; i < 4; i++) {
const worker = cluster.fork();
workers.push(worker);
worker.on('message', (result) => {
completedWorkers++;
if (completedWorkers === 4) {
const endTime = performance.now();
// 全ワーカーを終了
workers.forEach((w) => w.kill());
resolve({
method: 'cluster',
duration: endTime - startTime,
results: completedWorkers,
});
}
});
}
} else {
// ワーカープロセス
const result = heavyTask(1000000);
process.send(result);
}
});
}
// メモリ使用量の測定
function measureMemoryUsage() {
const usage = process.memoryUsage();
return {
rss: Math.round(usage.rss / 1024 / 1024), // MB
heapUsed: Math.round(usage.heapUsed / 1024 / 1024), // MB
heapTotal: Math.round(usage.heapTotal / 1024 / 1024), // MB
};
}
// 統合テスト
async function runPerformanceTest() {
console.log('=== 性能比較テスト開始 ===');
// メモリ使用量の初期測定
const initialMemory = measureMemoryUsage();
console.log('初期メモリ使用量:', initialMemory);
// worker_threads テスト
const workerThreadsResult = await testWorkerThreads();
console.log('Worker Threads 結果:', workerThreadsResult);
// 少し待機
await new Promise((resolve) => setTimeout(resolve, 1000));
// cluster テスト
const clusterResult = await testCluster();
console.log('Cluster 結果:', clusterResult);
// 最終メモリ使用量
const finalMemory = measureMemoryUsage();
console.log('最終メモリ使用量:', finalMemory);
// 結果の比較
console.log('\n=== 結果比較 ===');
console.log(
`Worker Threads: ${workerThreadsResult.duration.toFixed(
2
)}ms`
);
console.log(
`Cluster: ${clusterResult.duration.toFixed(2)}ms`
);
console.log(
`メモリ増加: ${finalMemory.rss - initialMemory.rss}MB`
);
}
メモリ使用量の測定
メモリ使用量は、アプリケーションの設計において重要な指標です。
javascript// メモリ使用量の詳細測定
const { Worker } = require('worker_threads');
const cluster = require('cluster');
class MemoryProfiler {
constructor() {
this.measurements = [];
}
measure(label) {
const usage = process.memoryUsage();
const measurement = {
label,
timestamp: Date.now(),
rss: Math.round(usage.rss / 1024 / 1024),
heapUsed: Math.round(usage.heapUsed / 1024 / 1024),
heapTotal: Math.round(usage.heapTotal / 1024 / 1024),
external: Math.round(usage.external / 1024 / 1024),
};
this.measurements.push(measurement);
console.log(`${label}:`, measurement);
return measurement;
}
compare(label1, label2) {
const m1 = this.measurements.find(
(m) => m.label === label1
);
const m2 = this.measurements.find(
(m) => m.label === label2
);
if (m1 && m2) {
const diff = {
rss: m2.rss - m1.rss,
heapUsed: m2.heapUsed - m1.heapUsed,
heapTotal: m2.heapTotal - m1.heapTotal,
external: m2.external - m1.external,
};
console.log(`\n${label1} → ${label2} の差分:`, diff);
return diff;
}
}
generateReport() {
console.log('\n=== メモリ使用量レポート ===');
this.measurements.forEach((m, index) => {
if (index > 0) {
const prev = this.measurements[index - 1];
const diff = {
rss: m.rss - prev.rss,
heapUsed: m.heapUsed - prev.heapUsed,
};
console.log(
`${prev.label} → ${m.label}: +${diff.rss}MB RSS, +${diff.heapUsed}MB Heap`
);
}
});
}
}
// メモリプロファイリングの実行
async function profileMemoryUsage() {
const profiler = new MemoryProfiler();
profiler.measure('初期状態');
// worker_threads のメモリ測定
const workers = [];
for (let i = 0; i < 4; i++) {
const worker = new Worker(
`
const { parentPort } = require('worker_threads');
parentPort.on('message', (data) => {
parentPort.postMessage({ processed: data.length });
});
`,
{ eval: true }
);
workers.push(worker);
}
profiler.measure('Worker Threads 作成後');
// ワーカーを終了
workers.forEach((w) => w.terminate());
await new Promise((resolve) => setTimeout(resolve, 1000));
profiler.measure('Worker Threads 終了後');
// cluster のメモリ測定
if (cluster.isMaster) {
const clusterWorkers = [];
for (let i = 0; i < 4; i++) {
const worker = cluster.fork();
clusterWorkers.push(worker);
}
profiler.measure('Cluster 作成後');
// ワーカーを終了
clusterWorkers.forEach((w) => w.kill());
await new Promise((resolve) =>
setTimeout(resolve, 1000)
);
profiler.measure('Cluster 終了後');
profiler.generateReport();
}
}
スループットの比較
スループットは、実際のアプリケーションでの重要な性能指標です。
javascript// スループット比較テスト
const { Worker } = require('worker_threads');
const cluster = require('cluster');
const http = require('http');
// HTTP リクエストのスループットテスト
function createHttpServer(port, responseTime = 0) {
return http
.createServer((req, res) => {
// 応答時間をシミュレート
setTimeout(() => {
res.writeHead(200, {
'Content-Type': 'application/json',
});
res.end(
JSON.stringify({
worker: process.pid,
timestamp: Date.now(),
memory: process.memoryUsage(),
})
);
}, responseTime);
})
.listen(port);
}
// worker_threads でのスループットテスト
async function testWorkerThreadsThroughput() {
const workers = [];
const results = [];
// 4つのワーカーを作成
for (let i = 0; i < 4; i++) {
const worker = new Worker(
`
const http = require('http');
const server = http.createServer((req, res) => {
setTimeout(() => {
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({
worker: process.pid,
timestamp: Date.now()
}));
}, 10); // 10ms の処理時間
});
server.listen(0, () => {
const port = server.address().port;
process.send({ type: 'READY', port });
});
`,
{ eval: true }
);
workers.push(worker);
}
// 全ワーカーが準備完了するまで待機
const ports = await Promise.all(
workers.map((worker) => {
return new Promise((resolve) => {
worker.on('message', (message) => {
if (message.type === 'READY') {
resolve(message.port);
}
});
});
})
);
// スループットテスト実行
const startTime = Date.now();
const requests = 1000;
const promises = [];
for (let i = 0; i < requests; i++) {
const port = ports[i % ports.length];
const promise = fetch(`http://localhost:${port}`)
.then((res) => res.json())
.then((data) => ({
success: true,
worker: data.worker,
}))
.catch((err) => ({
success: false,
error: err.message,
}));
promises.push(promise);
}
const results = await Promise.all(promises);
const endTime = Date.now();
// ワーカーを終了
workers.forEach((w) => w.terminate());
const successful = results.filter(
(r) => r.success
).length;
const duration = endTime - startTime;
const rps = (successful / duration) * 1000; // リクエスト/秒
return {
method: 'worker_threads',
requests,
successful,
duration,
rps: Math.round(rps),
};
}
// cluster でのスループットテスト
function testClusterThroughput() {
return new Promise((resolve) => {
if (cluster.isMaster) {
const workers = [];
// 4つのワーカープロセスを起動
for (let i = 0; i < 4; i++) {
const worker = cluster.fork();
workers.push(worker);
}
// 全ワーカーが準備完了するまで待機
let readyWorkers = 0;
cluster.on('message', (worker, message) => {
if (message.type === 'READY') {
readyWorkers++;
if (readyWorkers === 4) {
// スループットテスト開始
runThroughputTest().then(resolve);
}
}
});
} else {
// ワーカープロセス
const server = http.createServer((req, res) => {
setTimeout(() => {
res.writeHead(200, {
'Content-Type': 'application/json',
});
res.end(
JSON.stringify({
worker: process.pid,
timestamp: Date.now(),
})
);
}, 10);
});
server.listen(3000, () => {
process.send({ type: 'READY' });
});
}
});
}
async function runThroughputTest() {
const startTime = Date.now();
const requests = 1000;
const promises = [];
for (let i = 0; i < requests; i++) {
const promise = fetch('http://localhost:3000')
.then((res) => res.json())
.then((data) => ({
success: true,
worker: data.worker,
}))
.catch((err) => ({
success: false,
error: err.message,
}));
promises.push(promise);
}
const results = await Promise.all(promises);
const endTime = Date.now();
const successful = results.filter(
(r) => r.success
).length;
const duration = endTime - startTime;
const rps = (successful / duration) * 1000;
return {
method: 'cluster',
requests,
successful,
duration,
rps: Math.round(rps),
};
}
// 統合スループットテスト
async function runThroughputComparison() {
console.log('=== スループット比較テスト ===');
console.log('\nWorker Threads テスト中...');
const workerThreadsResult =
await testWorkerThreadsThroughput();
console.log('Worker Threads 結果:', workerThreadsResult);
console.log('\nCluster テスト中...');
const clusterResult = await testClusterThroughput();
console.log('Cluster 結果:', clusterResult);
console.log('\n=== 比較結果 ===');
console.log(
`Worker Threads: ${workerThreadsResult.rps} RPS`
);
console.log(`Cluster: ${clusterResult.rps} RPS`);
console.log(
`差: ${Math.abs(
workerThreadsResult.rps - clusterResult.rps
)} RPS`
);
}
まとめ
Node.js のマルチスレッド処理技術であるworker_threads
とcluster
は、それぞれ異なる特性と適用場面を持っています。
適切な選択の重要性
技術選択は、アプリケーションの成功に直結します。間違った選択は、性能問題やメンテナンス性の低下を招く可能性があります。
worker_threads を選ぶべき場面:
- CPU 集約型の計算処理
- メモリ使用量を最小限に抑えたい場合
- 開発・デバッグの容易さを重視する場合
- 軽量な並列処理が必要な場合
cluster を選ぶべき場面:
- Web サーバーの負荷分散
- 高可用性が求められるアプリケーション
- プロセスレベルの分離が必要な場合
- 長時間実行されるバッチ処理
実際の開発では、これらの技術を組み合わせることで、より効果的なソリューションを構築できます。例えば、Web サーバーはcluster
で負荷分散し、その中で CPU 集約型処理はworker_threads
で並列化するといったアプローチです。
今後の発展方向性
Node.js の並列処理技術は、今後も進化し続けるでしょう。worker_threads
の機能拡張や、新しい並列処理 API の登場が期待されます。
重要なのは、これらの技術を理解し、適切な場面で使い分けることです。技術の特性を理解することで、より良いアプリケーションを構築できるようになります。
実際のプロジェクトでは、まずはシンプルな実装から始めて、必要に応じて段階的に最適化していくことをお勧めします。パフォーマンステストを定期的に実行し、アプリケーションの特性に合わせて技術選択を見直すことが重要です。
関連リンク
- review
今の自分に満足していますか?『持たざる者の逆襲 まだ何者でもない君へ』溝口勇児
- review
ついに語られた業界の裏側!『フジテレビの正体』堀江貴文が描くテレビ局の本当の姿
- review
愛する勇気を持てば人生が変わる!『幸せになる勇気』岸見一郎・古賀史健のアドラー実践編で真の幸福を手に入れる
- review
週末を変えれば年収も変わる!『世界の一流は「休日」に何をしているのか』越川慎司の一流週末メソッド
- review
新しい自分に会いに行こう!『自分の変え方』村岡大樹の認知科学コーチングで人生リセット
- review
科学革命から AI 時代へ!『サピエンス全史 下巻』ユヴァル・ノア・ハラリが予見する人類の未来