Node.js シグナルとプロセス間通信(IPC)の仕組み

Node.js アプリケーションを本格的に開発する上で、避けて通れないのがシグナル処理とプロセス間通信(IPC)の理解です。これらの技術は、アプリケーションの安定性、スケーラビリティ、そして運用性を大きく左右する重要な要素です。
本記事では、シグナルと IPC の基礎から実践的な応用まで、段階的に学んでいきます。特に、本番環境での運用を意識した実装例や、よくあるエラーとその解決方法についても詳しく解説します。
シグナルの基礎知識
シグナルとは何か
シグナルは、オペレーティングシステムがプロセスに送る短いメッセージです。プロセスの状態を変更したり、特定の処理を実行するよう指示したりするために使用されます。
日常生活で例えるなら、シグナルは「緊急事態を知らせるサイレン」や「作業を中断するためのベル」のようなものです。プログラムが予期しない状況に遭遇した時、シグナルによって適切に対応できるようになります。
オペレーティングシステムレベルでのシグナルの役割
オペレーティングシステムは、以下のような場面でシグナルを送信します:
- プロセスの強制終了:メモリ不足やシステムクラッシュ時
- ユーザーからの中断要求:Ctrl+C による処理の停止
- 子プロセスの終了通知:ゾンビプロセスの防止
- タイマーによる通知:定期的な処理の実行
シグナルは、プロセスが適切にリソースを解放し、データの整合性を保つための重要な仕組みです。
Node.js で扱える主要なシグナル
Node.js では、以下の主要なシグナルを処理できます:
シグナル名 | 値 | 説明 | 用途 |
---|---|---|---|
SIGTERM | 15 | 終了要求 | グレースフルシャットダウン |
SIGINT | 2 | 割り込み | Ctrl+C による中断 |
SIGUSR1 | 10 | ユーザー定義 1 | ログローテーション |
SIGUSR2 | 12 | ユーザー定義 2 | 設定の再読み込み |
SIGHUP | 1 | ハングアップ | 設定の再読み込み |
これらのシグナルを適切に処理することで、アプリケーションの安定性が大幅に向上します。
Node.js でのシグナル処理
process.on()メソッドの使い方
Node.js では、process.on()
メソッドを使ってシグナルを処理します。基本的な使い方を確認してみましょう。
javascript// 基本的なシグナルハンドラーの実装
process.on('SIGTERM', () => {
console.log(
'SIGTERMを受信しました。グレースフルシャットダウンを開始します。'
);
// データベース接続のクローズ
// ログファイルのフラッシュ
// 進行中の処理の完了待ち
process.exit(0);
});
console.log('プロセスID:', process.pid);
console.log('シグナルハンドラーが設定されました。');
このコードは、SIGTERM シグナルを受信した際に、適切なクリーンアップ処理を行ってからプロセスを終了します。
よく使用されるシグナルの種類と用途
実際の開発では、以下のシグナルが頻繁に使用されます:
javascript// SIGINT(Ctrl+C)の処理
process.on('SIGINT', () => {
console.log('\nユーザーによる中断を検知しました。');
console.log('安全に終了処理を実行します...');
// 進行中のHTTPリクエストの完了待ち
// ファイル書き込みの完了待ち
process.exit(0);
});
// SIGUSR1(ログローテーション)
process.on('SIGUSR1', () => {
console.log('ログローテーションを実行します。');
// ログファイルの切り替え
// 古いログファイルの圧縮
console.log('ログローテーションが完了しました。');
});
// SIGUSR2(設定の再読み込み)
process.on('SIGUSR2', () => {
console.log('設定ファイルを再読み込みします。');
// 設定ファイルの読み込み
// 新しい設定の適用
console.log('設定の再読み込みが完了しました。');
});
シグナルハンドラーの実装方法
実用的なシグナルハンドラーを実装する際は、以下の点に注意が必要です:
javascript// エラーハンドリングを含むシグナルハンドラー
let isShuttingDown = false;
process.on('SIGTERM', async () => {
if (isShuttingDown) {
console.log('既にシャットダウン処理中です。');
return;
}
isShuttingDown = true;
console.log('グレースフルシャットダウンを開始します...');
try {
// データベース接続のクローズ
await closeDatabaseConnections();
// ログファイルのフラッシュ
await flushLogFiles();
// 進行中の処理の完了待ち(最大30秒)
await waitForOngoingTasks(30000);
console.log('シャットダウン処理が完了しました。');
process.exit(0);
} catch (error) {
console.error(
'シャットダウン処理中にエラーが発生しました:',
error
);
process.exit(1);
}
});
// ヘルパー関数
async function closeDatabaseConnections() {
console.log('データベース接続をクローズ中...');
// 実際のデータベース接続クローズ処理
}
async function flushLogFiles() {
console.log('ログファイルをフラッシュ中...');
// 実際のログフラッシュ処理
}
async function waitForOngoingTasks(timeout) {
console.log('進行中の処理の完了を待機中...');
// 実際の待機処理
}
プロセス間通信(IPC)の概要
IPC が必要な場面
プロセス間通信が必要になる場面は多岐にわたります:
- マイクロサービスアーキテクチャ:複数のサービス間でのデータ交換
- 負荷分散:マスタープロセスとワーカープロセス間の通信
- データ処理:大量データの並列処理
- 監視・ログ収集:複数プロセスからのログの集約
IPC を適切に実装することで、アプリケーションの性能と信頼性が大幅に向上します。
Node.js で利用可能な IPC 手法の種類
Node.js では、以下の IPC 手法が利用可能です:
手法 | 特徴 | 用途 | パフォーマンス |
---|---|---|---|
パイプ | 単純で高速 | 親子プロセス間通信 | 高 |
Unix Domain Socket | セキュアで高速 | 同一マシン内通信 | 高 |
TCP/IP ソケット | ネットワーク越し通信 | 分散システム | 中 |
メッセージキュー | 非同期通信 | マイクロサービス | 中 |
各手法の特徴と使い分け
各 IPC 手法の特徴を理解し、適切に使い分けることが重要です:
javascript// パイプを使った通信(高速、同一マシン内)
const { spawn } = require('child_process');
const child = spawn('node', ['worker.js'], {
stdio: ['pipe', 'pipe', 'pipe'],
});
// 子プロセスにデータを送信
child.stdin.write(
JSON.stringify({ type: 'task', data: '処理データ' })
);
// 子プロセスからの応答を受信
child.stdout.on('data', (data) => {
const response = JSON.parse(data.toString());
console.log('子プロセスからの応答:', response);
});
javascript// Unix Domain Socketを使った通信(セキュア)
const net = require('net');
const fs = require('fs');
const server = net.createServer((socket) => {
socket.on('data', (data) => {
const message = JSON.parse(data.toString());
console.log('受信したメッセージ:', message);
// 応答を送信
socket.write(JSON.stringify({ status: 'success' }));
});
});
// Unix Domain Socketファイルを作成
server.listen('/tmp/app.sock', () => {
console.log('Unix Domain Socketサーバーが起動しました。');
});
子プロセスとの通信
child_process モジュールの活用
Node.js のchild_process
モジュールは、子プロセスとの通信を簡単に実現できます。
javascript// 基本的な子プロセス作成と通信
const { spawn } = require('child_process');
// 子プロセスを起動
const worker = spawn('node', ['worker.js'], {
stdio: ['pipe', 'pipe', 'pipe'],
});
// エラーハンドリング
worker.on('error', (error) => {
console.error('子プロセスでエラーが発生しました:', error);
});
// 子プロセスの終了処理
worker.on('close', (code) => {
console.log(
`子プロセスが終了しました。終了コード: ${code}`
);
});
// 子プロセスにタスクを送信
function sendTask(taskData) {
const message = JSON.stringify({
id: Date.now(),
task: taskData,
timestamp: new Date().toISOString(),
});
worker.stdin.write(message + '\n');
}
// 子プロセスからの応答を受信
worker.stdout.on('data', (data) => {
const responses = data.toString().trim().split('\n');
responses.forEach((response) => {
if (response) {
const result = JSON.parse(response);
console.log('タスク結果:', result);
}
});
});
パイプを使った通信
パイプは、親子プロセス間で最も効率的な通信方法です。
javascript// 親プロセス(master.js)
const { spawn } = require('child_process');
class ProcessManager {
constructor() {
this.workers = [];
this.taskQueue = [];
}
// ワーカープロセスを起動
startWorker() {
const worker = spawn('node', ['worker.js'], {
stdio: ['pipe', 'pipe', 'pipe'],
});
// ワーカーの応答を処理
worker.stdout.on('data', (data) => {
this.handleWorkerResponse(worker, data);
});
// ワーカーのエラーを処理
worker.stderr.on('data', (data) => {
console.error('ワーカーエラー:', data.toString());
});
this.workers.push(worker);
console.log(
`ワーカープロセス ${worker.pid} を起動しました。`
);
return worker;
}
// タスクをワーカーに送信
sendTask(worker, task) {
const message = JSON.stringify({
id: task.id,
type: task.type,
data: task.data,
});
worker.stdin.write(message + '\n');
}
// ワーカーからの応答を処理
handleWorkerResponse(worker, data) {
const responses = data.toString().trim().split('\n');
responses.forEach((response) => {
if (response) {
const result = JSON.parse(response);
console.log(
`ワーカー ${worker.pid} からの応答:`,
result
);
}
});
}
}
// 使用例
const manager = new ProcessManager();
const worker = manager.startWorker();
// タスクを送信
manager.sendTask(worker, {
id: 1,
type: 'calculation',
data: { numbers: [1, 2, 3, 4, 5] },
});
メッセージパッシングによる通信
child_process
のsend()
メソッドを使ったメッセージパッシングも便利です。
javascript// 親プロセス(parent.js)
const { fork } = require('child_process');
// 子プロセスをフォーク
const child = fork('./child.js');
// 子プロセスにメッセージを送信
child.send({
type: 'task',
data: { message: 'Hello from parent!' },
});
// 子プロセスからの応答を受信
child.on('message', (message) => {
console.log('子プロセスからの応答:', message);
if (message.type === 'result') {
console.log('処理結果:', message.data);
}
});
// 子プロセスの終了を監視
child.on('exit', (code, signal) => {
console.log(
`子プロセスが終了しました。コード: ${code}, シグナル: ${signal}`
);
});
javascript// 子プロセス(child.js)
process.on('message', (message) => {
console.log('親プロセスからメッセージを受信:', message);
if (message.type === 'task') {
// タスクを処理
const result = processTask(message.data);
// 結果を親プロセスに送信
process.send({
type: 'result',
data: result,
timestamp: new Date().toISOString(),
});
}
});
function processTask(data) {
// 実際のタスク処理
return {
processed: true,
originalData: data,
processedAt: new Date().toISOString(),
};
}
console.log('子プロセスが起動しました。PID:', process.pid);
クラスターモードでの IPC
cluster モジュールでのワーカープロセス間通信
Node.js のcluster
モジュールを使うと、マルチコア CPU を効率的に活用できます。
javascript// マスタープロセス(master.js)
const cluster = require('cluster');
const os = require('os');
if (cluster.isMaster) {
console.log(
`マスタープロセス ${process.pid} が起動しました。`
);
// CPUコア数分のワーカーを起動
const numCPUs = os.cpus().length;
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
// ワーカーの終了を監視
cluster.on('exit', (worker, code, signal) => {
console.log(
`ワーカー ${worker.process.pid} が終了しました。`
);
console.log('新しいワーカーを起動します。');
cluster.fork();
});
// ワーカーからのメッセージを処理
cluster.on('message', (worker, message) => {
console.log(
`ワーカー ${worker.process.pid} からのメッセージ:`,
message
);
// 他のワーカーにメッセージを転送
for (const id in cluster.workers) {
if (cluster.workers[id].id !== worker.id) {
cluster.workers[id].send(message);
}
}
});
} else {
// ワーカープロセス
console.log(
`ワーカープロセス ${process.pid} が起動しました。`
);
// マスタープロセスからのメッセージを受信
process.on('message', (message) => {
console.log(
`ワーカー ${process.pid} がメッセージを受信:`,
message
);
// メッセージに応じた処理を実行
handleMessage(message);
});
// 定期的にマスタープロセスに状態を報告
setInterval(() => {
process.send({
type: 'status',
workerId: process.pid,
memory: process.memoryUsage(),
uptime: process.uptime(),
});
}, 5000);
}
function handleMessage(message) {
switch (message.type) {
case 'task':
console.log('タスクを処理中...');
// 実際のタスク処理
break;
case 'status':
console.log('状態確認要求を受信');
break;
default:
console.log('未知のメッセージタイプ:', message.type);
}
}
マスタープロセスとワーカープロセスの役割分担
クラスターモードでは、マスターとワーカーが明確に役割を分担します。
javascript// マスタープロセスの役割
const cluster = require('cluster');
const http = require('http');
if (cluster.isMaster) {
// ロードバランサーの実装
const workers = [];
let currentWorker = 0;
// ワーカーを起動
for (let i = 0; i < 4; i++) {
const worker = cluster.fork();
workers.push(worker);
}
// HTTPサーバーを作成(ロードバランサー)
const server = http.createServer((req, res) => {
// ラウンドロビン方式でワーカーを選択
const worker = workers[currentWorker];
currentWorker = (currentWorker + 1) % workers.length;
// リクエストをワーカーに転送
worker.send({
type: 'request',
url: req.url,
method: req.method,
headers: req.headers,
});
// ワーカーからの応答を待機
worker.once('message', (response) => {
res.writeHead(response.statusCode, response.headers);
res.end(response.body);
});
});
server.listen(3000, () => {
console.log(
'ロードバランサーがポート3000で起動しました。'
);
});
} else {
// ワーカープロセスの役割
console.log(`ワーカー ${process.pid} が起動しました。`);
// マスタープロセスからのリクエストを処理
process.on('message', (message) => {
if (message.type === 'request') {
handleRequest(message);
}
});
function handleRequest(requestData) {
// 実際のリクエスト処理
const response = {
statusCode: 200,
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
message: 'Hello from worker!',
workerId: process.pid,
url: requestData.url,
}),
};
// マスタープロセスに応答を送信
process.send(response);
}
}
負荷分散と IPC の関係
クラスターモードでの負荷分散は、IPC と密接に関連しています。
javascript// 高度な負荷分散システム
const cluster = require('cluster');
const os = require('os');
if (cluster.isMaster) {
const workers = new Map();
const taskQueue = [];
// ワーカーの負荷状況を監視
function monitorWorkers() {
workers.forEach((worker, id) => {
if (worker.load > 0.8) {
// 負荷が80%を超えた場合
console.log(
`ワーカー ${id} の負荷が高いため、新しいワーカーを起動します。`
);
spawnWorker();
}
});
}
// 新しいワーカーを起動
function spawnWorker() {
const worker = cluster.fork();
workers.set(worker.id, {
process: worker,
load: 0,
tasks: 0,
});
worker.on('message', (message) => {
handleWorkerMessage(worker, message);
});
}
// ワーカーメッセージを処理
function handleWorkerMessage(worker, message) {
const workerInfo = workers.get(worker.id);
switch (message.type) {
case 'load_update':
workerInfo.load = message.load;
workerInfo.tasks = message.tasks;
break;
case 'task_complete':
workerInfo.tasks--;
// タスクキューから次のタスクを割り当て
assignNextTask(worker);
break;
}
}
// タスクをワーカーに割り当て
function assignNextTask(worker) {
if (taskQueue.length > 0) {
const task = taskQueue.shift();
worker.send({ type: 'new_task', task });
}
}
// 初期ワーカーを起動
for (let i = 0; i < os.cpus().length; i++) {
spawnWorker();
}
// 定期的にワーカーの負荷を監視
setInterval(monitorWorkers, 1000);
} else {
let currentLoad = 0;
let activeTasks = 0;
// 負荷状況を定期的に報告
setInterval(() => {
process.send({
type: 'load_update',
load: currentLoad,
tasks: activeTasks,
});
}, 1000);
// マスタープロセスからのタスクを受信
process.on('message', (message) => {
if (message.type === 'new_task') {
activeTasks++;
currentLoad = Math.min(1.0, currentLoad + 0.1);
// タスクを処理
processTask(message.task);
}
});
function processTask(task) {
// 実際のタスク処理
setTimeout(() => {
activeTasks--;
currentLoad = Math.max(0, currentLoad - 0.1);
process.send({
type: 'task_complete',
taskId: task.id,
});
}, Math.random() * 5000); // ランダムな処理時間
}
}
実践的な応用例
グレースフルシャットダウンの実装
本番環境では、グレースフルシャットダウンが必須です。
javascript// グレースフルシャットダウンの実装
const http = require('http');
const cluster = require('cluster');
class GracefulShutdown {
constructor() {
this.isShuttingDown = false;
this.activeConnections = new Set();
this.shutdownTimeout = 30000; // 30秒
this.setupSignalHandlers();
}
// シグナルハンドラーを設定
setupSignalHandlers() {
process.on('SIGTERM', () =>
this.handleShutdown('SIGTERM')
);
process.on('SIGINT', () =>
this.handleShutdown('SIGINT')
);
}
// シャットダウン処理
async handleShutdown(signal) {
if (this.isShuttingDown) {
console.log('既にシャットダウン処理中です。');
return;
}
this.isShuttingDown = true;
console.log(
`${signal}を受信しました。グレースフルシャットダウンを開始します。`
);
try {
// 新しい接続の受け入れを停止
await this.stopAcceptingConnections();
// 既存の接続の完了を待機
await this.waitForConnections();
// リソースのクリーンアップ
await this.cleanup();
console.log(
'グレースフルシャットダウンが完了しました。'
);
process.exit(0);
} catch (error) {
console.error(
'シャットダウン処理中にエラーが発生しました:',
error
);
process.exit(1);
}
}
// 新しい接続の受け入れを停止
async stopAcceptingConnections() {
console.log('新しい接続の受け入れを停止中...');
if (cluster.isMaster) {
// マスタープロセスの場合、全ワーカーにシャットダウンを通知
for (const id in cluster.workers) {
cluster.workers[id].send({ type: 'shutdown' });
}
}
// HTTPサーバーの停止
if (this.server) {
this.server.close();
}
}
// 既存の接続の完了を待機
async waitForConnections() {
console.log(
`既存の接続の完了を待機中... (最大${this.shutdownTimeout}ms)`
);
return new Promise((resolve) => {
const timeout = setTimeout(() => {
console.log(
'シャットダウンタイムアウト。強制終了します。'
);
resolve();
}, this.shutdownTimeout);
const checkConnections = () => {
if (this.activeConnections.size === 0) {
clearTimeout(timeout);
resolve();
} else {
console.log(
`残りの接続数: ${this.activeConnections.size}`
);
setTimeout(checkConnections, 1000);
}
};
checkConnections();
});
}
// リソースのクリーンアップ
async cleanup() {
console.log('リソースのクリーンアップを実行中...');
// データベース接続のクローズ
// ログファイルのフラッシュ
// キャッシュの保存
console.log('クリーンアップが完了しました。');
}
// 接続を追跡
trackConnection(req, res) {
if (this.isShuttingDown) {
res.writeHead(503, {
'Content-Type': 'application/json',
});
res.end(
JSON.stringify({ error: 'Service unavailable' })
);
return;
}
this.activeConnections.add(res);
res.on('finish', () => {
this.activeConnections.delete(res);
});
}
}
// 使用例
const gracefulShutdown = new GracefulShutdown();
const server = http.createServer((req, res) => {
gracefulShutdown.trackConnection(req, res);
// 実際のリクエスト処理
res.writeHead(200, {
'Content-Type': 'application/json',
});
res.end(JSON.stringify({ message: 'Hello World' }));
});
server.listen(3000, () => {
console.log('サーバーがポート3000で起動しました。');
});
gracefulShutdown.server = server;
ログローテーションとシグナル処理
ログローテーションは、シグナル処理と組み合わせることで効果的に実装できます。
javascript// ログローテーションシステム
const fs = require('fs');
const path = require('path');
class LogRotator {
constructor(options = {}) {
this.logDir = options.logDir || './logs';
this.maxSize = options.maxSize || 10 * 1024 * 1024; // 10MB
this.maxFiles = options.maxFiles || 5;
this.currentLogFile = path.join(this.logDir, 'app.log');
this.setupSignalHandlers();
this.ensureLogDirectory();
}
// シグナルハンドラーを設定
setupSignalHandlers() {
process.on('SIGUSR1', () => this.rotateLog());
process.on('SIGUSR2', () => this.reloadConfig());
}
// ログディレクトリの作成
ensureLogDirectory() {
if (!fs.existsSync(this.logDir)) {
fs.mkdirSync(this.logDir, { recursive: true });
}
}
// ログローテーション
async rotateLog() {
console.log('ログローテーションを開始します...');
try {
// 現在のログファイルのサイズをチェック
const stats = fs.statSync(this.currentLogFile);
if (stats.size > this.maxSize) {
await this.performRotation();
} else {
console.log(
'ログファイルサイズが閾値を超えていません。'
);
}
} catch (error) {
console.error(
'ログローテーション中にエラーが発生しました:',
error
);
}
}
// ログローテーションの実行
async performRotation() {
const timestamp = new Date()
.toISOString()
.replace(/[:.]/g, '-');
const newLogFile = path.join(
this.logDir,
`app-${timestamp}.log`
);
// 現在のログファイルをリネーム
fs.renameSync(this.currentLogFile, newLogFile);
// 新しいログファイルを作成
fs.writeFileSync(this.currentLogFile, '');
// 古いログファイルを削除
this.cleanupOldLogs();
console.log(
`ログローテーションが完了しました: ${newLogFile}`
);
}
// 古いログファイルの削除
cleanupOldLogs() {
const logFiles = fs
.readdirSync(this.logDir)
.filter(
(file) =>
file.startsWith('app-') && file.endsWith('.log')
)
.map((file) => ({
name: file,
path: path.join(this.logDir, file),
mtime: fs.statSync(path.join(this.logDir, file))
.mtime,
}))
.sort((a, b) => b.mtime - a.mtime);
// 最大ファイル数を超えた古いファイルを削除
if (logFiles.length > this.maxFiles) {
const filesToDelete = logFiles.slice(this.maxFiles);
filesToDelete.forEach((file) => {
fs.unlinkSync(file.path);
console.log(
`古いログファイルを削除しました: ${file.name}`
);
});
}
}
// 設定の再読み込み
reloadConfig() {
console.log('設定ファイルを再読み込みします...');
try {
// 設定ファイルの読み込み処理
// 実際の実装では、設定ファイルを読み込んで
// アプリケーションの設定を更新する
console.log('設定の再読み込みが完了しました。');
} catch (error) {
console.error(
'設定の再読み込み中にエラーが発生しました:',
error
);
}
}
// ログの書き込み
writeLog(level, message) {
const timestamp = new Date().toISOString();
const logEntry = `[${timestamp}] [${level}] ${message}\n`;
fs.appendFileSync(this.currentLogFile, logEntry);
}
}
// 使用例
const logRotator = new LogRotator({
logDir: './logs',
maxSize: 5 * 1024 * 1024, // 5MB
maxFiles: 3,
});
// ログの書き込みテスト
setInterval(() => {
logRotator.writeLog(
'INFO',
'アプリケーションが正常に動作しています。'
);
}, 1000);
console.log('ログローテーションシステムが起動しました。');
console.log(
'SIGUSR1でログローテーション、SIGUSR2で設定再読み込みを実行できます。'
);
複数プロセス間でのデータ共有
複数のプロセス間でデータを共有する方法をいくつか紹介します。
javascript// Redisを使ったプロセス間データ共有
const redis = require('redis');
const { promisify } = require('util');
class SharedDataManager {
constructor() {
this.client = redis.createClient();
this.getAsync = promisify(this.client.get).bind(
this.client
);
this.setAsync = promisify(this.client.set).bind(
this.client
);
this.publishAsync = promisify(this.client.publish).bind(
this.client
);
this.setupSubscriber();
}
// Redisサブスクライバーの設定
setupSubscriber() {
this.subscriber = redis.createClient();
this.subscriber.subscribe('data-updates');
this.subscriber.on('message', (channel, message) => {
const data = JSON.parse(message);
this.handleDataUpdate(data);
});
}
// データの更新処理
handleDataUpdate(data) {
console.log('データ更新を受信:', data);
// 実際のデータ更新処理
switch (data.type) {
case 'user_login':
this.updateUserSession(data);
break;
case 'cache_invalidation':
this.invalidateCache(data);
break;
default:
console.log('未知のデータ更新タイプ:', data.type);
}
}
// ユーザーセッションの更新
updateUserSession(data) {
console.log(
`ユーザー ${data.userId} のセッションを更新しました。`
);
}
// キャッシュの無効化
invalidateCache(data) {
console.log(
`キャッシュキー ${data.key} を無効化しました。`
);
}
// データの共有
async shareData(key, value) {
await this.setAsync(key, JSON.stringify(value));
await this.publishAsync(
'data-updates',
JSON.stringify({
type: 'data_shared',
key,
value,
timestamp: new Date().toISOString(),
})
);
}
// データの取得
async getData(key) {
const data = await this.getAsync(key);
return data ? JSON.parse(data) : null;
}
}
// 使用例
const dataManager = new SharedDataManager();
// 定期的にデータを共有
setInterval(async () => {
await dataManager.shareData('system_status', {
uptime: process.uptime(),
memory: process.memoryUsage(),
timestamp: new Date().toISOString(),
});
}, 5000);
javascript// ファイルベースのプロセス間通信
const fs = require('fs');
const path = require('path');
class FileBasedIPC {
constructor(ipcDir = './ipc') {
this.ipcDir = ipcDir;
this.messageQueue = [];
this.ensureIPCDirectory();
this.startMessageListener();
}
// IPCディレクトリの作成
ensureIPCDirectory() {
if (!fs.existsSync(this.ipcDir)) {
fs.mkdirSync(this.ipcDir, { recursive: true });
}
}
// メッセージリスナーの開始
startMessageListener() {
setInterval(() => {
this.checkForMessages();
}, 1000);
}
// メッセージの確認
checkForMessages() {
const files = fs
.readdirSync(this.ipcDir)
.filter((file) => file.endsWith('.msg'))
.sort();
files.forEach((file) => {
this.processMessage(file);
});
}
// メッセージの処理
processMessage(filename) {
const filepath = path.join(this.ipcDir, filename);
try {
const content = fs.readFileSync(filepath, 'utf8');
const message = JSON.parse(content);
this.handleMessage(message);
// 処理済みメッセージファイルを削除
fs.unlinkSync(filepath);
} catch (error) {
console.error(
`メッセージ処理中にエラーが発生しました: ${filename}`,
error
);
}
}
// メッセージの処理
handleMessage(message) {
console.log('メッセージを受信:', message);
switch (message.type) {
case 'task':
this.processTask(message.data);
break;
case 'status':
this.updateStatus(message.data);
break;
default:
console.log(
'未知のメッセージタイプ:',
message.type
);
}
}
// タスクの処理
processTask(data) {
console.log('タスクを処理中:', data);
// 実際のタスク処理
}
// ステータスの更新
updateStatus(data) {
console.log('ステータスを更新:', data);
// 実際のステータス更新処理
}
// メッセージの送信
sendMessage(type, data) {
const message = {
type,
data,
timestamp: new Date().toISOString(),
pid: process.pid,
};
const filename = `msg_${Date.now()}_${process.pid}.msg`;
const filepath = path.join(this.ipcDir, filename);
fs.writeFileSync(
filepath,
JSON.stringify(message, null, 2)
);
console.log(`メッセージを送信しました: ${filename}`);
}
}
// 使用例
const ipc = new FileBasedIPC();
// 定期的にメッセージを送信
setInterval(() => {
ipc.sendMessage('status', {
uptime: process.uptime(),
memory: process.memoryUsage(),
});
}, 3000);
まとめ
Node.js におけるシグナルとプロセス間通信(IPC)の仕組みについて、基礎から実践的な応用まで詳しく解説しました。
シグナル処理は、アプリケーションの安定性と信頼性を確保するための重要な技術です。適切なシグナルハンドラーを実装することで、グレースフルシャットダウンやログローテーションなどの運用に不可欠な機能を実現できます。
プロセス間通信は、マルチプロセスアーキテクチャやマイクロサービスにおいて、プロセス間でのデータ交換や協調動作を実現するための基盤技術です。パイプ、ソケット、メッセージキューなど、用途に応じて適切な手法を選択することが重要です。
これらの技術を組み合わせることで、高可用性、スケーラビリティ、保守性に優れた Node.js アプリケーションを構築できます。本番環境での運用を意識した実装例を参考に、実際のプロジェクトで活用していただければと思います。
関連リンク
- review
今の自分に満足していますか?『持たざる者の逆襲 まだ何者でもない君へ』溝口勇児
- review
ついに語られた業界の裏側!『フジテレビの正体』堀江貴文が描くテレビ局の本当の姿
- review
愛する勇気を持てば人生が変わる!『幸せになる勇気』岸見一郎・古賀史健のアドラー実践編で真の幸福を手に入れる
- review
週末を変えれば年収も変わる!『世界の一流は「休日」に何をしているのか』越川慎司の一流週末メソッド
- review
新しい自分に会いに行こう!『自分の変え方』村岡大樹の認知科学コーチングで人生リセット
- review
科学革命から AI 時代へ!『サピエンス全史 下巻』ユヴァル・ノア・ハラリが予見する人類の未来