T-CREATOR

Node.js シグナルとプロセス間通信(IPC)の仕組み

Node.js シグナルとプロセス間通信(IPC)の仕組み

Node.js アプリケーションを本格的に開発する上で、避けて通れないのがシグナル処理とプロセス間通信(IPC)の理解です。これらの技術は、アプリケーションの安定性、スケーラビリティ、そして運用性を大きく左右する重要な要素です。

本記事では、シグナルと IPC の基礎から実践的な応用まで、段階的に学んでいきます。特に、本番環境での運用を意識した実装例や、よくあるエラーとその解決方法についても詳しく解説します。

シグナルの基礎知識

シグナルとは何か

シグナルは、オペレーティングシステムがプロセスに送る短いメッセージです。プロセスの状態を変更したり、特定の処理を実行するよう指示したりするために使用されます。

日常生活で例えるなら、シグナルは「緊急事態を知らせるサイレン」や「作業を中断するためのベル」のようなものです。プログラムが予期しない状況に遭遇した時、シグナルによって適切に対応できるようになります。

オペレーティングシステムレベルでのシグナルの役割

オペレーティングシステムは、以下のような場面でシグナルを送信します:

  • プロセスの強制終了:メモリ不足やシステムクラッシュ時
  • ユーザーからの中断要求:Ctrl+C による処理の停止
  • 子プロセスの終了通知:ゾンビプロセスの防止
  • タイマーによる通知:定期的な処理の実行

シグナルは、プロセスが適切にリソースを解放し、データの整合性を保つための重要な仕組みです。

Node.js で扱える主要なシグナル

Node.js では、以下の主要なシグナルを処理できます:

シグナル名説明用途
SIGTERM15終了要求グレースフルシャットダウン
SIGINT2割り込みCtrl+C による中断
SIGUSR110ユーザー定義 1ログローテーション
SIGUSR212ユーザー定義 2設定の再読み込み
SIGHUP1ハングアップ設定の再読み込み

これらのシグナルを適切に処理することで、アプリケーションの安定性が大幅に向上します。

Node.js でのシグナル処理

process.on()メソッドの使い方

Node.js では、process.on()メソッドを使ってシグナルを処理します。基本的な使い方を確認してみましょう。

javascript// 基本的なシグナルハンドラーの実装
process.on('SIGTERM', () => {
  console.log(
    'SIGTERMを受信しました。グレースフルシャットダウンを開始します。'
  );

  // データベース接続のクローズ
  // ログファイルのフラッシュ
  // 進行中の処理の完了待ち

  process.exit(0);
});

console.log('プロセスID:', process.pid);
console.log('シグナルハンドラーが設定されました。');

このコードは、SIGTERM シグナルを受信した際に、適切なクリーンアップ処理を行ってからプロセスを終了します。

よく使用されるシグナルの種類と用途

実際の開発では、以下のシグナルが頻繁に使用されます:

javascript// SIGINT(Ctrl+C)の処理
process.on('SIGINT', () => {
  console.log('\nユーザーによる中断を検知しました。');
  console.log('安全に終了処理を実行します...');

  // 進行中のHTTPリクエストの完了待ち
  // ファイル書き込みの完了待ち

  process.exit(0);
});

// SIGUSR1(ログローテーション)
process.on('SIGUSR1', () => {
  console.log('ログローテーションを実行します。');

  // ログファイルの切り替え
  // 古いログファイルの圧縮

  console.log('ログローテーションが完了しました。');
});

// SIGUSR2(設定の再読み込み)
process.on('SIGUSR2', () => {
  console.log('設定ファイルを再読み込みします。');

  // 設定ファイルの読み込み
  // 新しい設定の適用

  console.log('設定の再読み込みが完了しました。');
});

シグナルハンドラーの実装方法

実用的なシグナルハンドラーを実装する際は、以下の点に注意が必要です:

javascript// エラーハンドリングを含むシグナルハンドラー
let isShuttingDown = false;

process.on('SIGTERM', async () => {
  if (isShuttingDown) {
    console.log('既にシャットダウン処理中です。');
    return;
  }

  isShuttingDown = true;
  console.log('グレースフルシャットダウンを開始します...');

  try {
    // データベース接続のクローズ
    await closeDatabaseConnections();

    // ログファイルのフラッシュ
    await flushLogFiles();

    // 進行中の処理の完了待ち(最大30秒)
    await waitForOngoingTasks(30000);

    console.log('シャットダウン処理が完了しました。');
    process.exit(0);
  } catch (error) {
    console.error(
      'シャットダウン処理中にエラーが発生しました:',
      error
    );
    process.exit(1);
  }
});

// ヘルパー関数
async function closeDatabaseConnections() {
  console.log('データベース接続をクローズ中...');
  // 実際のデータベース接続クローズ処理
}

async function flushLogFiles() {
  console.log('ログファイルをフラッシュ中...');
  // 実際のログフラッシュ処理
}

async function waitForOngoingTasks(timeout) {
  console.log('進行中の処理の完了を待機中...');
  // 実際の待機処理
}

プロセス間通信(IPC)の概要

IPC が必要な場面

プロセス間通信が必要になる場面は多岐にわたります:

  • マイクロサービスアーキテクチャ:複数のサービス間でのデータ交換
  • 負荷分散:マスタープロセスとワーカープロセス間の通信
  • データ処理:大量データの並列処理
  • 監視・ログ収集:複数プロセスからのログの集約

IPC を適切に実装することで、アプリケーションの性能と信頼性が大幅に向上します。

Node.js で利用可能な IPC 手法の種類

Node.js では、以下の IPC 手法が利用可能です:

手法特徴用途パフォーマンス
パイプ単純で高速親子プロセス間通信
Unix Domain Socketセキュアで高速同一マシン内通信
TCP/IP ソケットネットワーク越し通信分散システム
メッセージキュー非同期通信マイクロサービス

各手法の特徴と使い分け

各 IPC 手法の特徴を理解し、適切に使い分けることが重要です:

javascript// パイプを使った通信(高速、同一マシン内)
const { spawn } = require('child_process');

const child = spawn('node', ['worker.js'], {
  stdio: ['pipe', 'pipe', 'pipe'],
});

// 子プロセスにデータを送信
child.stdin.write(
  JSON.stringify({ type: 'task', data: '処理データ' })
);

// 子プロセスからの応答を受信
child.stdout.on('data', (data) => {
  const response = JSON.parse(data.toString());
  console.log('子プロセスからの応答:', response);
});
javascript// Unix Domain Socketを使った通信(セキュア)
const net = require('net');
const fs = require('fs');

const server = net.createServer((socket) => {
  socket.on('data', (data) => {
    const message = JSON.parse(data.toString());
    console.log('受信したメッセージ:', message);

    // 応答を送信
    socket.write(JSON.stringify({ status: 'success' }));
  });
});

// Unix Domain Socketファイルを作成
server.listen('/tmp/app.sock', () => {
  console.log('Unix Domain Socketサーバーが起動しました。');
});

子プロセスとの通信

child_process モジュールの活用

Node.js のchild_processモジュールは、子プロセスとの通信を簡単に実現できます。

javascript// 基本的な子プロセス作成と通信
const { spawn } = require('child_process');

// 子プロセスを起動
const worker = spawn('node', ['worker.js'], {
  stdio: ['pipe', 'pipe', 'pipe'],
});

// エラーハンドリング
worker.on('error', (error) => {
  console.error('子プロセスでエラーが発生しました:', error);
});

// 子プロセスの終了処理
worker.on('close', (code) => {
  console.log(
    `子プロセスが終了しました。終了コード: ${code}`
  );
});

// 子プロセスにタスクを送信
function sendTask(taskData) {
  const message = JSON.stringify({
    id: Date.now(),
    task: taskData,
    timestamp: new Date().toISOString(),
  });

  worker.stdin.write(message + '\n');
}

// 子プロセスからの応答を受信
worker.stdout.on('data', (data) => {
  const responses = data.toString().trim().split('\n');

  responses.forEach((response) => {
    if (response) {
      const result = JSON.parse(response);
      console.log('タスク結果:', result);
    }
  });
});

パイプを使った通信

パイプは、親子プロセス間で最も効率的な通信方法です。

javascript// 親プロセス(master.js)
const { spawn } = require('child_process');

class ProcessManager {
  constructor() {
    this.workers = [];
    this.taskQueue = [];
  }

  // ワーカープロセスを起動
  startWorker() {
    const worker = spawn('node', ['worker.js'], {
      stdio: ['pipe', 'pipe', 'pipe'],
    });

    // ワーカーの応答を処理
    worker.stdout.on('data', (data) => {
      this.handleWorkerResponse(worker, data);
    });

    // ワーカーのエラーを処理
    worker.stderr.on('data', (data) => {
      console.error('ワーカーエラー:', data.toString());
    });

    this.workers.push(worker);
    console.log(
      `ワーカープロセス ${worker.pid} を起動しました。`
    );

    return worker;
  }

  // タスクをワーカーに送信
  sendTask(worker, task) {
    const message = JSON.stringify({
      id: task.id,
      type: task.type,
      data: task.data,
    });

    worker.stdin.write(message + '\n');
  }

  // ワーカーからの応答を処理
  handleWorkerResponse(worker, data) {
    const responses = data.toString().trim().split('\n');

    responses.forEach((response) => {
      if (response) {
        const result = JSON.parse(response);
        console.log(
          `ワーカー ${worker.pid} からの応答:`,
          result
        );
      }
    });
  }
}

// 使用例
const manager = new ProcessManager();
const worker = manager.startWorker();

// タスクを送信
manager.sendTask(worker, {
  id: 1,
  type: 'calculation',
  data: { numbers: [1, 2, 3, 4, 5] },
});

メッセージパッシングによる通信

child_processsend()メソッドを使ったメッセージパッシングも便利です。

javascript// 親プロセス(parent.js)
const { fork } = require('child_process');

// 子プロセスをフォーク
const child = fork('./child.js');

// 子プロセスにメッセージを送信
child.send({
  type: 'task',
  data: { message: 'Hello from parent!' },
});

// 子プロセスからの応答を受信
child.on('message', (message) => {
  console.log('子プロセスからの応答:', message);

  if (message.type === 'result') {
    console.log('処理結果:', message.data);
  }
});

// 子プロセスの終了を監視
child.on('exit', (code, signal) => {
  console.log(
    `子プロセスが終了しました。コード: ${code}, シグナル: ${signal}`
  );
});
javascript// 子プロセス(child.js)
process.on('message', (message) => {
  console.log('親プロセスからメッセージを受信:', message);

  if (message.type === 'task') {
    // タスクを処理
    const result = processTask(message.data);

    // 結果を親プロセスに送信
    process.send({
      type: 'result',
      data: result,
      timestamp: new Date().toISOString(),
    });
  }
});

function processTask(data) {
  // 実際のタスク処理
  return {
    processed: true,
    originalData: data,
    processedAt: new Date().toISOString(),
  };
}

console.log('子プロセスが起動しました。PID:', process.pid);

クラスターモードでの IPC

cluster モジュールでのワーカープロセス間通信

Node.js のclusterモジュールを使うと、マルチコア CPU を効率的に活用できます。

javascript// マスタープロセス(master.js)
const cluster = require('cluster');
const os = require('os');

if (cluster.isMaster) {
  console.log(
    `マスタープロセス ${process.pid} が起動しました。`
  );

  // CPUコア数分のワーカーを起動
  const numCPUs = os.cpus().length;

  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  // ワーカーの終了を監視
  cluster.on('exit', (worker, code, signal) => {
    console.log(
      `ワーカー ${worker.process.pid} が終了しました。`
    );
    console.log('新しいワーカーを起動します。');
    cluster.fork();
  });

  // ワーカーからのメッセージを処理
  cluster.on('message', (worker, message) => {
    console.log(
      `ワーカー ${worker.process.pid} からのメッセージ:`,
      message
    );

    // 他のワーカーにメッセージを転送
    for (const id in cluster.workers) {
      if (cluster.workers[id].id !== worker.id) {
        cluster.workers[id].send(message);
      }
    }
  });
} else {
  // ワーカープロセス
  console.log(
    `ワーカープロセス ${process.pid} が起動しました。`
  );

  // マスタープロセスからのメッセージを受信
  process.on('message', (message) => {
    console.log(
      `ワーカー ${process.pid} がメッセージを受信:`,
      message
    );

    // メッセージに応じた処理を実行
    handleMessage(message);
  });

  // 定期的にマスタープロセスに状態を報告
  setInterval(() => {
    process.send({
      type: 'status',
      workerId: process.pid,
      memory: process.memoryUsage(),
      uptime: process.uptime(),
    });
  }, 5000);
}

function handleMessage(message) {
  switch (message.type) {
    case 'task':
      console.log('タスクを処理中...');
      // 実際のタスク処理
      break;
    case 'status':
      console.log('状態確認要求を受信');
      break;
    default:
      console.log('未知のメッセージタイプ:', message.type);
  }
}

マスタープロセスとワーカープロセスの役割分担

クラスターモードでは、マスターとワーカーが明確に役割を分担します。

javascript// マスタープロセスの役割
const cluster = require('cluster');
const http = require('http');

if (cluster.isMaster) {
  // ロードバランサーの実装
  const workers = [];
  let currentWorker = 0;

  // ワーカーを起動
  for (let i = 0; i < 4; i++) {
    const worker = cluster.fork();
    workers.push(worker);
  }

  // HTTPサーバーを作成(ロードバランサー)
  const server = http.createServer((req, res) => {
    // ラウンドロビン方式でワーカーを選択
    const worker = workers[currentWorker];
    currentWorker = (currentWorker + 1) % workers.length;

    // リクエストをワーカーに転送
    worker.send({
      type: 'request',
      url: req.url,
      method: req.method,
      headers: req.headers,
    });

    // ワーカーからの応答を待機
    worker.once('message', (response) => {
      res.writeHead(response.statusCode, response.headers);
      res.end(response.body);
    });
  });

  server.listen(3000, () => {
    console.log(
      'ロードバランサーがポート3000で起動しました。'
    );
  });
} else {
  // ワーカープロセスの役割
  console.log(`ワーカー ${process.pid} が起動しました。`);

  // マスタープロセスからのリクエストを処理
  process.on('message', (message) => {
    if (message.type === 'request') {
      handleRequest(message);
    }
  });

  function handleRequest(requestData) {
    // 実際のリクエスト処理
    const response = {
      statusCode: 200,
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        message: 'Hello from worker!',
        workerId: process.pid,
        url: requestData.url,
      }),
    };

    // マスタープロセスに応答を送信
    process.send(response);
  }
}

負荷分散と IPC の関係

クラスターモードでの負荷分散は、IPC と密接に関連しています。

javascript// 高度な負荷分散システム
const cluster = require('cluster');
const os = require('os');

if (cluster.isMaster) {
  const workers = new Map();
  const taskQueue = [];

  // ワーカーの負荷状況を監視
  function monitorWorkers() {
    workers.forEach((worker, id) => {
      if (worker.load > 0.8) {
        // 負荷が80%を超えた場合
        console.log(
          `ワーカー ${id} の負荷が高いため、新しいワーカーを起動します。`
        );
        spawnWorker();
      }
    });
  }

  // 新しいワーカーを起動
  function spawnWorker() {
    const worker = cluster.fork();
    workers.set(worker.id, {
      process: worker,
      load: 0,
      tasks: 0,
    });

    worker.on('message', (message) => {
      handleWorkerMessage(worker, message);
    });
  }

  // ワーカーメッセージを処理
  function handleWorkerMessage(worker, message) {
    const workerInfo = workers.get(worker.id);

    switch (message.type) {
      case 'load_update':
        workerInfo.load = message.load;
        workerInfo.tasks = message.tasks;
        break;
      case 'task_complete':
        workerInfo.tasks--;
        // タスクキューから次のタスクを割り当て
        assignNextTask(worker);
        break;
    }
  }

  // タスクをワーカーに割り当て
  function assignNextTask(worker) {
    if (taskQueue.length > 0) {
      const task = taskQueue.shift();
      worker.send({ type: 'new_task', task });
    }
  }

  // 初期ワーカーを起動
  for (let i = 0; i < os.cpus().length; i++) {
    spawnWorker();
  }

  // 定期的にワーカーの負荷を監視
  setInterval(monitorWorkers, 1000);
} else {
  let currentLoad = 0;
  let activeTasks = 0;

  // 負荷状況を定期的に報告
  setInterval(() => {
    process.send({
      type: 'load_update',
      load: currentLoad,
      tasks: activeTasks,
    });
  }, 1000);

  // マスタープロセスからのタスクを受信
  process.on('message', (message) => {
    if (message.type === 'new_task') {
      activeTasks++;
      currentLoad = Math.min(1.0, currentLoad + 0.1);

      // タスクを処理
      processTask(message.task);
    }
  });

  function processTask(task) {
    // 実際のタスク処理
    setTimeout(() => {
      activeTasks--;
      currentLoad = Math.max(0, currentLoad - 0.1);

      process.send({
        type: 'task_complete',
        taskId: task.id,
      });
    }, Math.random() * 5000); // ランダムな処理時間
  }
}

実践的な応用例

グレースフルシャットダウンの実装

本番環境では、グレースフルシャットダウンが必須です。

javascript// グレースフルシャットダウンの実装
const http = require('http');
const cluster = require('cluster');

class GracefulShutdown {
  constructor() {
    this.isShuttingDown = false;
    this.activeConnections = new Set();
    this.shutdownTimeout = 30000; // 30秒

    this.setupSignalHandlers();
  }

  // シグナルハンドラーを設定
  setupSignalHandlers() {
    process.on('SIGTERM', () =>
      this.handleShutdown('SIGTERM')
    );
    process.on('SIGINT', () =>
      this.handleShutdown('SIGINT')
    );
  }

  // シャットダウン処理
  async handleShutdown(signal) {
    if (this.isShuttingDown) {
      console.log('既にシャットダウン処理中です。');
      return;
    }

    this.isShuttingDown = true;
    console.log(
      `${signal}を受信しました。グレースフルシャットダウンを開始します。`
    );

    try {
      // 新しい接続の受け入れを停止
      await this.stopAcceptingConnections();

      // 既存の接続の完了を待機
      await this.waitForConnections();

      // リソースのクリーンアップ
      await this.cleanup();

      console.log(
        'グレースフルシャットダウンが完了しました。'
      );
      process.exit(0);
    } catch (error) {
      console.error(
        'シャットダウン処理中にエラーが発生しました:',
        error
      );
      process.exit(1);
    }
  }

  // 新しい接続の受け入れを停止
  async stopAcceptingConnections() {
    console.log('新しい接続の受け入れを停止中...');

    if (cluster.isMaster) {
      // マスタープロセスの場合、全ワーカーにシャットダウンを通知
      for (const id in cluster.workers) {
        cluster.workers[id].send({ type: 'shutdown' });
      }
    }

    // HTTPサーバーの停止
    if (this.server) {
      this.server.close();
    }
  }

  // 既存の接続の完了を待機
  async waitForConnections() {
    console.log(
      `既存の接続の完了を待機中... (最大${this.shutdownTimeout}ms)`
    );

    return new Promise((resolve) => {
      const timeout = setTimeout(() => {
        console.log(
          'シャットダウンタイムアウト。強制終了します。'
        );
        resolve();
      }, this.shutdownTimeout);

      const checkConnections = () => {
        if (this.activeConnections.size === 0) {
          clearTimeout(timeout);
          resolve();
        } else {
          console.log(
            `残りの接続数: ${this.activeConnections.size}`
          );
          setTimeout(checkConnections, 1000);
        }
      };

      checkConnections();
    });
  }

  // リソースのクリーンアップ
  async cleanup() {
    console.log('リソースのクリーンアップを実行中...');

    // データベース接続のクローズ
    // ログファイルのフラッシュ
    // キャッシュの保存

    console.log('クリーンアップが完了しました。');
  }

  // 接続を追跡
  trackConnection(req, res) {
    if (this.isShuttingDown) {
      res.writeHead(503, {
        'Content-Type': 'application/json',
      });
      res.end(
        JSON.stringify({ error: 'Service unavailable' })
      );
      return;
    }

    this.activeConnections.add(res);

    res.on('finish', () => {
      this.activeConnections.delete(res);
    });
  }
}

// 使用例
const gracefulShutdown = new GracefulShutdown();

const server = http.createServer((req, res) => {
  gracefulShutdown.trackConnection(req, res);

  // 実際のリクエスト処理
  res.writeHead(200, {
    'Content-Type': 'application/json',
  });
  res.end(JSON.stringify({ message: 'Hello World' }));
});

server.listen(3000, () => {
  console.log('サーバーがポート3000で起動しました。');
});

gracefulShutdown.server = server;

ログローテーションとシグナル処理

ログローテーションは、シグナル処理と組み合わせることで効果的に実装できます。

javascript// ログローテーションシステム
const fs = require('fs');
const path = require('path');

class LogRotator {
  constructor(options = {}) {
    this.logDir = options.logDir || './logs';
    this.maxSize = options.maxSize || 10 * 1024 * 1024; // 10MB
    this.maxFiles = options.maxFiles || 5;
    this.currentLogFile = path.join(this.logDir, 'app.log');

    this.setupSignalHandlers();
    this.ensureLogDirectory();
  }

  // シグナルハンドラーを設定
  setupSignalHandlers() {
    process.on('SIGUSR1', () => this.rotateLog());
    process.on('SIGUSR2', () => this.reloadConfig());
  }

  // ログディレクトリの作成
  ensureLogDirectory() {
    if (!fs.existsSync(this.logDir)) {
      fs.mkdirSync(this.logDir, { recursive: true });
    }
  }

  // ログローテーション
  async rotateLog() {
    console.log('ログローテーションを開始します...');

    try {
      // 現在のログファイルのサイズをチェック
      const stats = fs.statSync(this.currentLogFile);

      if (stats.size > this.maxSize) {
        await this.performRotation();
      } else {
        console.log(
          'ログファイルサイズが閾値を超えていません。'
        );
      }
    } catch (error) {
      console.error(
        'ログローテーション中にエラーが発生しました:',
        error
      );
    }
  }

  // ログローテーションの実行
  async performRotation() {
    const timestamp = new Date()
      .toISOString()
      .replace(/[:.]/g, '-');
    const newLogFile = path.join(
      this.logDir,
      `app-${timestamp}.log`
    );

    // 現在のログファイルをリネーム
    fs.renameSync(this.currentLogFile, newLogFile);

    // 新しいログファイルを作成
    fs.writeFileSync(this.currentLogFile, '');

    // 古いログファイルを削除
    this.cleanupOldLogs();

    console.log(
      `ログローテーションが完了しました: ${newLogFile}`
    );
  }

  // 古いログファイルの削除
  cleanupOldLogs() {
    const logFiles = fs
      .readdirSync(this.logDir)
      .filter(
        (file) =>
          file.startsWith('app-') && file.endsWith('.log')
      )
      .map((file) => ({
        name: file,
        path: path.join(this.logDir, file),
        mtime: fs.statSync(path.join(this.logDir, file))
          .mtime,
      }))
      .sort((a, b) => b.mtime - a.mtime);

    // 最大ファイル数を超えた古いファイルを削除
    if (logFiles.length > this.maxFiles) {
      const filesToDelete = logFiles.slice(this.maxFiles);

      filesToDelete.forEach((file) => {
        fs.unlinkSync(file.path);
        console.log(
          `古いログファイルを削除しました: ${file.name}`
        );
      });
    }
  }

  // 設定の再読み込み
  reloadConfig() {
    console.log('設定ファイルを再読み込みします...');

    try {
      // 設定ファイルの読み込み処理
      // 実際の実装では、設定ファイルを読み込んで
      // アプリケーションの設定を更新する

      console.log('設定の再読み込みが完了しました。');
    } catch (error) {
      console.error(
        '設定の再読み込み中にエラーが発生しました:',
        error
      );
    }
  }

  // ログの書き込み
  writeLog(level, message) {
    const timestamp = new Date().toISOString();
    const logEntry = `[${timestamp}] [${level}] ${message}\n`;

    fs.appendFileSync(this.currentLogFile, logEntry);
  }
}

// 使用例
const logRotator = new LogRotator({
  logDir: './logs',
  maxSize: 5 * 1024 * 1024, // 5MB
  maxFiles: 3,
});

// ログの書き込みテスト
setInterval(() => {
  logRotator.writeLog(
    'INFO',
    'アプリケーションが正常に動作しています。'
  );
}, 1000);

console.log('ログローテーションシステムが起動しました。');
console.log(
  'SIGUSR1でログローテーション、SIGUSR2で設定再読み込みを実行できます。'
);

複数プロセス間でのデータ共有

複数のプロセス間でデータを共有する方法をいくつか紹介します。

javascript// Redisを使ったプロセス間データ共有
const redis = require('redis');
const { promisify } = require('util');

class SharedDataManager {
  constructor() {
    this.client = redis.createClient();
    this.getAsync = promisify(this.client.get).bind(
      this.client
    );
    this.setAsync = promisify(this.client.set).bind(
      this.client
    );
    this.publishAsync = promisify(this.client.publish).bind(
      this.client
    );

    this.setupSubscriber();
  }

  // Redisサブスクライバーの設定
  setupSubscriber() {
    this.subscriber = redis.createClient();
    this.subscriber.subscribe('data-updates');

    this.subscriber.on('message', (channel, message) => {
      const data = JSON.parse(message);
      this.handleDataUpdate(data);
    });
  }

  // データの更新処理
  handleDataUpdate(data) {
    console.log('データ更新を受信:', data);

    // 実際のデータ更新処理
    switch (data.type) {
      case 'user_login':
        this.updateUserSession(data);
        break;
      case 'cache_invalidation':
        this.invalidateCache(data);
        break;
      default:
        console.log('未知のデータ更新タイプ:', data.type);
    }
  }

  // ユーザーセッションの更新
  updateUserSession(data) {
    console.log(
      `ユーザー ${data.userId} のセッションを更新しました。`
    );
  }

  // キャッシュの無効化
  invalidateCache(data) {
    console.log(
      `キャッシュキー ${data.key} を無効化しました。`
    );
  }

  // データの共有
  async shareData(key, value) {
    await this.setAsync(key, JSON.stringify(value));
    await this.publishAsync(
      'data-updates',
      JSON.stringify({
        type: 'data_shared',
        key,
        value,
        timestamp: new Date().toISOString(),
      })
    );
  }

  // データの取得
  async getData(key) {
    const data = await this.getAsync(key);
    return data ? JSON.parse(data) : null;
  }
}

// 使用例
const dataManager = new SharedDataManager();

// 定期的にデータを共有
setInterval(async () => {
  await dataManager.shareData('system_status', {
    uptime: process.uptime(),
    memory: process.memoryUsage(),
    timestamp: new Date().toISOString(),
  });
}, 5000);
javascript// ファイルベースのプロセス間通信
const fs = require('fs');
const path = require('path');

class FileBasedIPC {
  constructor(ipcDir = './ipc') {
    this.ipcDir = ipcDir;
    this.messageQueue = [];

    this.ensureIPCDirectory();
    this.startMessageListener();
  }

  // IPCディレクトリの作成
  ensureIPCDirectory() {
    if (!fs.existsSync(this.ipcDir)) {
      fs.mkdirSync(this.ipcDir, { recursive: true });
    }
  }

  // メッセージリスナーの開始
  startMessageListener() {
    setInterval(() => {
      this.checkForMessages();
    }, 1000);
  }

  // メッセージの確認
  checkForMessages() {
    const files = fs
      .readdirSync(this.ipcDir)
      .filter((file) => file.endsWith('.msg'))
      .sort();

    files.forEach((file) => {
      this.processMessage(file);
    });
  }

  // メッセージの処理
  processMessage(filename) {
    const filepath = path.join(this.ipcDir, filename);

    try {
      const content = fs.readFileSync(filepath, 'utf8');
      const message = JSON.parse(content);

      this.handleMessage(message);

      // 処理済みメッセージファイルを削除
      fs.unlinkSync(filepath);
    } catch (error) {
      console.error(
        `メッセージ処理中にエラーが発生しました: ${filename}`,
        error
      );
    }
  }

  // メッセージの処理
  handleMessage(message) {
    console.log('メッセージを受信:', message);

    switch (message.type) {
      case 'task':
        this.processTask(message.data);
        break;
      case 'status':
        this.updateStatus(message.data);
        break;
      default:
        console.log(
          '未知のメッセージタイプ:',
          message.type
        );
    }
  }

  // タスクの処理
  processTask(data) {
    console.log('タスクを処理中:', data);
    // 実際のタスク処理
  }

  // ステータスの更新
  updateStatus(data) {
    console.log('ステータスを更新:', data);
    // 実際のステータス更新処理
  }

  // メッセージの送信
  sendMessage(type, data) {
    const message = {
      type,
      data,
      timestamp: new Date().toISOString(),
      pid: process.pid,
    };

    const filename = `msg_${Date.now()}_${process.pid}.msg`;
    const filepath = path.join(this.ipcDir, filename);

    fs.writeFileSync(
      filepath,
      JSON.stringify(message, null, 2)
    );
    console.log(`メッセージを送信しました: ${filename}`);
  }
}

// 使用例
const ipc = new FileBasedIPC();

// 定期的にメッセージを送信
setInterval(() => {
  ipc.sendMessage('status', {
    uptime: process.uptime(),
    memory: process.memoryUsage(),
  });
}, 3000);

まとめ

Node.js におけるシグナルとプロセス間通信(IPC)の仕組みについて、基礎から実践的な応用まで詳しく解説しました。

シグナル処理は、アプリケーションの安定性と信頼性を確保するための重要な技術です。適切なシグナルハンドラーを実装することで、グレースフルシャットダウンやログローテーションなどの運用に不可欠な機能を実現できます。

プロセス間通信は、マルチプロセスアーキテクチャやマイクロサービスにおいて、プロセス間でのデータ交換や協調動作を実現するための基盤技術です。パイプ、ソケット、メッセージキューなど、用途に応じて適切な手法を選択することが重要です。

これらの技術を組み合わせることで、高可用性、スケーラビリティ、保守性に優れた Node.js アプリケーションを構築できます。本番環境での運用を意識した実装例を参考に、実際のプロジェクトで活用していただければと思います。

関連リンク