Node.js イベントエミッター(EventEmitter)の使い所

Node.js を使った開発において、非同期処理やイベント駆動プログラミングは欠かせない要素です。その中でも EventEmitter は、アプリケーション内でのコンポーネント間通信や状態変化の通知を効率的に行うための強力なツールとして活用されています。
多くの開発者が「いつ EventEmitter を使うべきか」「どのような場面で威力を発揮するのか」という疑問を抱いています。本記事では、EventEmitter の基本概念から実践的な活用場面まで、具体的なコード例とともに詳しく解説していきます。
初心者の方でも理解しやすいよう、段階的に学習を進めていきましょう。実際のプロジェクトで即座に活用できる知識を身につけることで、より柔軟で保守性の高いアプリケーション開発が可能になります。
EventEmitter とは何か
イベント駆動プログラミングの基本概念
EventEmitter を理解するために、まずはイベント駆動プログラミングの基本概念を押さえておきましょう。イベント駆動プログラミングは、プログラムの実行フローがイベントの発生によって制御される設計パラダイムです。
従来の手続き型プログラミングでは、コードが上から下へ順次実行されます。しかし、イベント駆動プログラミングでは、特定のイベントが発生した時点で対応する処理が実行される仕組みになっています。
javascript// 従来の手続き型の例
function processData() {
const data = loadData();
const processed = transformData(data);
saveData(processed);
console.log('処理完了');
}
// イベント駆動の例
const emitter = new EventEmitter();
emitter.on('dataLoaded', (data) => {
const processed = transformData(data);
emitter.emit('dataProcessed', processed);
});
emitter.on('dataProcessed', (processed) => {
saveData(processed);
emitter.emit('processingComplete');
});
emitter.on('processingComplete', () => {
console.log('処理完了');
});
イベント駆動プログラミングの主な利点は以下の通りです。
疎結合性の実現 コンポーネント間の直接的な依存関係を減らし、より柔軟な設計が可能になります。
非同期処理への対応 イベントベースの設計により、非同期処理を自然に扱えるようになります。
拡張性の向上 新しい機能を追加する際、既存のコードを変更することなく、新しいイベントリスナーを追加するだけで対応できます。
Observer パターンとの関係
EventEmitter は、デザインパターンの一つである Observer パターンを実装したものです。Observer パターンは、オブジェクト間の一対多の依存関係を定義し、あるオブジェクトの状態が変化した際に、依存するすべてのオブジェクトに自動的に通知する仕組みです。
# | 要素 | 説明 | EventEmitter での対応 |
---|---|---|---|
1 | Subject(主体) | 状態を持ち、変化を通知する | EventEmitter インスタンス |
2 | Observer(観察者) | 通知を受け取り、処理を実行 | イベントリスナー関数 |
3 | 通知メカニズム | 状態変化の伝達方法 | emit() メソッド |
4 | 購読管理 | Observer の登録・削除 | on()/off() メソッド |
Observer パターンの実装例を見てみましょう。
javascript// Observer パターンの基本実装
class NewsAgency {
constructor() {
this.observers = [];
this.news = '';
}
addObserver(observer) {
this.observers.push(observer);
}
removeObserver(observer) {
this.observers = this.observers.filter(
(obs) => obs !== observer
);
}
setNews(news) {
this.news = news;
this.notifyObservers();
}
notifyObservers() {
this.observers.forEach((observer) =>
observer.update(this.news)
);
}
}
// EventEmitter を使った同等の実装
const EventEmitter = require('events');
class NewsAgency extends EventEmitter {
setNews(news) {
this.news = news;
this.emit('newsUpdate', news);
}
}
const agency = new NewsAgency();
// 複数の購読者を簡単に追加
agency.on('newsUpdate', (news) => {
console.log(`新聞社A: ${news}`);
});
agency.on('newsUpdate', (news) => {
console.log(`新聞社B: ${news}`);
});
agency.setNews('重要なニュースが発生しました');
EventEmitter を使用することで、Observer パターンの実装が大幅に簡素化されることがわかります。
Node.js における EventEmitter の位置づけ
EventEmitter は Node.js の中核的な機能の一つであり、多くの標準モジュールがこれを基盤として構築されています。Node.js のアーキテクチャにおいて、EventEmitter は以下のような重要な役割を果たしています。
Node.js 標準モジュールでの活用例
javascript// HTTP サーバー
const http = require('http');
const server = http.createServer();
// HTTP サーバーは EventEmitter を継承している
server.on('request', (req, res) => {
res.writeHead(200, { 'Content-Type': 'text/plain' });
res.end('Hello World');
});
server.on('listening', () => {
console.log('サーバーがポート3000で起動しました');
});
server.on('error', (err) => {
console.error('サーバーエラー:', err);
});
server.listen(3000);
javascript// ファイルシステム
const fs = require('fs');
// ReadStream は EventEmitter を継承
const readStream = fs.createReadStream('large-file.txt');
readStream.on('data', (chunk) => {
console.log(`データを受信: ${chunk.length} バイト`);
});
readStream.on('end', () => {
console.log('ファイル読み込み完了');
});
readStream.on('error', (err) => {
console.error('読み込みエラー:', err);
});
Node.js における EventEmitter の特徴を整理すると以下のようになります。
# | 特徴 | 説明 | 利点 |
---|---|---|---|
1 | 非同期処理との親和性 | イベントループと自然に統合 | ノンブロッキング処理の実現 |
2 | 標準モジュールとの統合 | 多くの API が EventEmitter ベース | 一貫したプログラミングモデル |
3 | メモリ効率 | 軽量な実装 | 大量のイベント処理に適している |
4 | エラーハンドリング | 統一されたエラー処理機構 | 堅牢なアプリケーション構築 |
EventEmitter が活用される主要な領域
Node.js エコシステムにおいて、EventEmitter は以下のような場面で広く活用されています。
- I/O 操作: ファイル読み書き、ネットワーク通信
- ストリーム処理: データの流れを制御する際の状態通知
- HTTP 通信: リクエスト・レスポンスの処理
- リアルタイム通信: WebSocket、Socket.io などでの双方向通信
- プロセス管理: 子プロセスの状態監視
これらの特徴により、EventEmitter は Node.js アプリケーション開発において、柔軟で保守性の高いアーキテクチャを構築するための重要な基盤となっています。
EventEmitter の基本的な使い方
イベントの発行(emit)と購読(on/once)
EventEmitter を効果的に活用するためには、まず基本的な API の使い方を理解することが重要です。最も基本となるのは、イベントの発行(emit)と購読(on/once)の仕組みです。
javascriptconst EventEmitter = require('events');
// EventEmitter インスタンスを作成
const myEmitter = new EventEmitter();
// イベントリスナーを登録(購読)
myEmitter.on('greeting', (name) => {
console.log(`こんにちは、${name}さん!`);
});
// イベントを発行
myEmitter.emit('greeting', '田中');
// 出力: こんにちは、田中さん!
on() メソッドと once() メソッドの違い
on()
メソッドは永続的なリスナーを登録し、once()
メソッドは一度だけ実行されるリスナーを登録します。
javascriptconst emitter = new EventEmitter();
// 永続的なリスナー
emitter.on('message', (msg) => {
console.log(`永続リスナー: ${msg}`);
});
// 一回限りのリスナー
emitter.once('message', (msg) => {
console.log(`一回限りリスナー: ${msg}`);
});
// イベントを複数回発行
emitter.emit('message', '1回目');
emitter.emit('message', '2回目');
emitter.emit('message', '3回目');
/* 出力:
永続リスナー: 1回目
一回限りリスナー: 1回目
永続リスナー: 2回目
永続リスナー: 3回目
*/
複数の引数を渡す方法
emit() メソッドでは、複数の引数をリスナー関数に渡すことができます。
javascriptconst emitter = new EventEmitter();
emitter.on('userAction', (userId, action, timestamp) => {
console.log(
`ユーザー ${userId} が ${action} を実行しました(${timestamp})`
);
});
emitter.emit(
'userAction',
'user123',
'ログイン',
new Date().toISOString()
);
// 出力: ユーザー user123 が ログイン を実行しました(2024-01-15T10:30:00.000Z)
非同期処理との組み合わせ
EventEmitter は非同期処理と自然に組み合わせることができます。
javascriptconst emitter = new EventEmitter();
// 非同期処理を含むリスナー
emitter.on('processData', async (data) => {
console.log('データ処理開始...');
// 模擬的な非同期処理
await new Promise((resolve) => setTimeout(resolve, 1000));
console.log(`データ処理完了: ${data}`);
emitter.emit('dataProcessed', data);
});
emitter.on('dataProcessed', (data) => {
console.log(`処理済みデータを保存: ${data}`);
});
emitter.emit('processData', 'サンプルデータ');
リスナーの管理(addListener/removeListener)
アプリケーションが複雑になるにつれて、イベントリスナーの適切な管理が重要になります。EventEmitter では、リスナーの追加・削除・確認を行うための様々なメソッドが提供されています。
基本的なリスナー管理
javascriptconst EventEmitter = require('events');
const emitter = new EventEmitter();
// リスナー関数を変数として定義
const messageHandler = (msg) => {
console.log(`メッセージ受信: ${msg}`);
};
const errorHandler = (err) => {
console.error(`エラー発生: ${err.message}`);
};
// リスナーを追加(on() と addListener() は同じ)
emitter.addListener('message', messageHandler);
emitter.on('error', errorHandler);
// リスナーの削除
emitter.removeListener('message', messageHandler);
// または
emitter.off('error', errorHandler);
// 特定イベントのすべてのリスナーを削除
emitter.removeAllListeners('message');
// すべてのイベントのリスナーを削除
emitter.removeAllListeners();
リスナーの存在確認と情報取得
javascriptconst emitter = new EventEmitter();
const handler1 = () => console.log('ハンドラー1');
const handler2 = () => console.log('ハンドラー2');
emitter.on('test', handler1);
emitter.on('test', handler2);
emitter.on('other', () => console.log('その他'));
// リスナー数の確認
console.log(
'testイベントのリスナー数:',
emitter.listenerCount('test')
); // 2
// 登録されているイベント名の一覧
console.log('イベント名一覧:', emitter.eventNames()); // ['test', 'other']
// 特定イベントのリスナー一覧
console.log(
'testイベントのリスナー:',
emitter.listeners('test')
);
// リスナーが存在するかチェック
const hasListener = emitter
.listeners('test')
.includes(handler1);
console.log('handler1が登録されているか:', hasListener); // true
リスナー数の制限管理
デフォルトでは、一つのイベントに対して 10 個以上のリスナーを登録すると警告が表示されます。この制限は調整可能です。
javascriptconst emitter = new EventEmitter();
// 現在の最大リスナー数を確認
console.log(
'現在の最大リスナー数:',
emitter.getMaxListeners()
); // 10
// 最大リスナー数を変更
emitter.setMaxListeners(20);
// 無制限にする場合は0を設定
emitter.setMaxListeners(0);
// 多数のリスナーを登録してテスト
for (let i = 0; i < 15; i++) {
emitter.on('test', () => console.log(`リスナー ${i}`));
}
console.log(
'登録されたリスナー数:',
emitter.listenerCount('test')
);
基本的な API の使い分け
EventEmitter には多くのメソッドが用意されており、適切な使い分けが重要です。主要な API とその使用場面を整理しましょう。
イベント登録メソッドの使い分け
# | メソッド | 特徴 | 使用場面 |
---|---|---|---|
1 | on() / addListener() | 永続的なリスナー | 継続的な監視が必要な場合 |
2 | once() | 一回限りのリスナー | 初期化完了、エラー処理など |
3 | prependListener() | リスナーを先頭に追加 | 優先度の高い処理 |
4 | prependOnceListener() | 一回限り+先頭追加 | 重要な一回限り処理 |
javascriptconst emitter = new EventEmitter();
// 通常のリスナー(後から追加)
emitter.on('start', () => console.log('通常の開始処理'));
// 優先度の高いリスナー(先頭に追加)
emitter.prependListener('start', () =>
console.log('優先度の高い開始処理')
);
// 一回限りの優先処理
emitter.prependOnceListener('start', () =>
console.log('初回のみの優先処理')
);
emitter.emit('start');
/* 出力:
初回のみの優先処理
優先度の高い開始処理
通常の開始処理
*/
emitter.emit('start');
/* 出力:
優先度の高い開始処理
通常の開始処理
*/
エラーハンドリングの特別な扱い
error
イベントは特別な扱いを受けます。リスナーが登録されていない状態でerror
イベントが発行されると、プロセスがクラッシュします。
javascriptconst emitter = new EventEmitter();
// エラーハンドラーを登録しない場合
// emitter.emit('error', new Error('テストエラー')); // プロセスクラッシュ
// 適切なエラーハンドリング
emitter.on('error', (err) => {
console.error('エラーをキャッチしました:', err.message);
});
// 安全にエラーを発行
emitter.emit('error', new Error('テストエラー'));
同期と非同期の実行順序
EventEmitter のリスナーは同期的に実行されます。非同期処理が必要な場合は、明示的に非同期にする必要があります。
javascriptconst emitter = new EventEmitter();
emitter.on('test', () => {
console.log('同期リスナー1');
});
emitter.on('test', () => {
console.log('同期リスナー2');
});
emitter.on('test', () => {
// 非同期で実行したい場合
setImmediate(() => {
console.log('非同期リスナー');
});
});
console.log('イベント発行前');
emitter.emit('test');
console.log('イベント発行後');
/* 出力:
イベント発行前
同期リスナー1
同期リスナー2
イベント発行後
非同期リスナー
*/
戻り値の活用
一部のメソッドは戻り値を返し、メソッドチェーンが可能です。
javascriptconst emitter = new EventEmitter();
// メソッドチェーンを活用
emitter
.on('data', (data) => console.log('データ:', data))
.on('end', () => console.log('終了'))
.on('error', (err) => console.error('エラー:', err));
// emit() は boolean を返す(リスナーが存在したかどうか)
const hasListeners = emitter.emit('data', 'テストデータ');
console.log('リスナーが実行されたか:', hasListeners); // true
const noListeners = emitter.emit('unknown', 'データ');
console.log('リスナーが実行されたか:', noListeners); // false
これらの基本的な API を適切に使い分けることで、効率的で保守性の高い EventEmitter ベースのアプリケーションを構築できます。次のセクションでは、これらの知識を基に実践的な使用場面を見ていきましょう。
実践的な使用場面
EventEmitter の基本的な使い方を理解したところで、実際の開発現場でどのような場面で活用されるのかを具体的に見ていきましょう。ここでは、特に効果的な 3 つの使用場面を詳しく解説します。
ファイル処理の進捗通知
大きなファイルを処理する際、処理の進捗をリアルタイムで通知する仕組みを EventEmitter で実装できます。これにより、ユーザーに適切なフィードバックを提供できます。
javascriptconst EventEmitter = require('events');
const fs = require('fs');
const path = require('path');
class FileProcessor extends EventEmitter {
constructor() {
super();
this.processedBytes = 0;
this.totalBytes = 0;
}
async processFile(filePath) {
try {
// ファイルサイズを取得
const stats = await fs.promises.stat(filePath);
this.totalBytes = stats.size;
this.emit('start', {
fileName: path.basename(filePath),
totalBytes: this.totalBytes,
});
// ファイルを読み込みながら処理
const readStream = fs.createReadStream(filePath, {
highWaterMark: 1024 * 16, // 16KB chunks
});
readStream.on('data', (chunk) => {
this.processedBytes += chunk.length;
// 進捗を計算
const progress = Math.round(
(this.processedBytes / this.totalBytes) * 100
);
// 進捗イベントを発行
this.emit('progress', {
processedBytes: this.processedBytes,
totalBytes: this.totalBytes,
percentage: progress,
chunk: chunk,
});
// 実際の処理(例:データ変換)
this.processChunk(chunk);
});
readStream.on('end', () => {
this.emit('complete', {
fileName: path.basename(filePath),
totalBytes: this.processedBytes,
});
});
readStream.on('error', (error) => {
this.emit('error', error);
});
} catch (error) {
this.emit('error', error);
}
}
processChunk(chunk) {
// 実際のデータ処理ロジック
// 例:文字列の変換、データの集計など
return chunk.toString().toUpperCase();
}
}
// 使用例
const processor = new FileProcessor();
processor.on('start', (info) => {
console.log(
`処理開始: ${info.fileName} (${info.totalBytes} bytes)`
);
});
processor.on('progress', (progress) => {
// プログレスバーの表示
const bar = '='.repeat(
Math.floor(progress.percentage / 2)
);
const empty = ' '.repeat(50 - bar.length);
process.stdout.write(
`\r[${bar}${empty}] ${progress.percentage}%`
);
});
processor.on('complete', (info) => {
console.log(`\n処理完了: ${info.fileName}`);
});
processor.on('error', (error) => {
console.error('エラーが発生しました:', error.message);
});
// ファイル処理を実行
processor.processFile('./large-data.txt');
より高度な進捗管理システム
複数ファイルの一括処理や、処理の一時停止・再開機能を含む例も見てみましょう。
javascriptclass AdvancedFileProcessor extends EventEmitter {
constructor() {
super();
this.queue = [];
this.isProcessing = false;
this.isPaused = false;
this.currentFile = null;
}
addFile(filePath) {
this.queue.push(filePath);
this.emit('fileAdded', {
filePath,
queueLength: this.queue.length,
});
if (!this.isProcessing) {
this.processQueue();
}
}
pause() {
this.isPaused = true;
this.emit('paused');
}
resume() {
this.isPaused = false;
this.emit('resumed');
if (!this.isProcessing && this.queue.length > 0) {
this.processQueue();
}
}
async processQueue() {
if (
this.isProcessing ||
this.isPaused ||
this.queue.length === 0
) {
return;
}
this.isProcessing = true;
this.emit('queueStarted', {
totalFiles: this.queue.length,
});
while (this.queue.length > 0 && !this.isPaused) {
const filePath = this.queue.shift();
this.currentFile = filePath;
try {
await this.processSingleFile(filePath);
} catch (error) {
this.emit('fileError', { filePath, error });
}
}
this.isProcessing = false;
this.currentFile = null;
if (this.queue.length === 0) {
this.emit('queueComplete');
}
}
async processSingleFile(filePath) {
return new Promise((resolve, reject) => {
const processor = new FileProcessor();
processor.on('start', (info) => {
this.emit('fileStart', { ...info, filePath });
});
processor.on('progress', (progress) => {
this.emit('fileProgress', {
...progress,
filePath,
});
});
processor.on('complete', (info) => {
this.emit('fileComplete', { ...info, filePath });
resolve();
});
processor.on('error', reject);
processor.processFile(filePath);
});
}
}
WebSocket サーバーでのリアルタイム通信
WebSocket を使ったリアルタイム通信において、EventEmitter は接続管理やメッセージの配信制御に威力を発揮します。
javascriptconst EventEmitter = require('events');
const WebSocket = require('ws');
class RealtimeServer extends EventEmitter {
constructor(port = 8080) {
super();
this.port = port;
this.clients = new Map();
this.rooms = new Map();
this.server = null;
}
start() {
this.server = new WebSocket.Server({ port: this.port });
this.server.on('connection', (ws, req) => {
const clientId = this.generateClientId();
// クライアント情報を保存
this.clients.set(clientId, {
ws,
id: clientId,
rooms: new Set(),
lastActivity: Date.now(),
});
this.emit('clientConnected', {
clientId,
totalClients: this.clients.size,
});
// メッセージハンドリング
ws.on('message', (data) => {
try {
const message = JSON.parse(data);
this.handleMessage(clientId, message);
} catch (error) {
this.emit('messageError', {
clientId,
error,
data,
});
}
});
// 切断処理
ws.on('close', () => {
this.handleDisconnect(clientId);
});
// エラー処理
ws.on('error', (error) => {
this.emit('clientError', { clientId, error });
});
// 接続確認メッセージを送信
this.sendToClient(clientId, {
type: 'connected',
clientId,
timestamp: Date.now(),
});
});
this.emit('serverStarted', { port: this.port });
}
handleMessage(clientId, message) {
const client = this.clients.get(clientId);
if (!client) return;
client.lastActivity = Date.now();
switch (message.type) {
case 'joinRoom':
this.joinRoom(clientId, message.roomId);
break;
case 'leaveRoom':
this.leaveRoom(clientId, message.roomId);
break;
case 'roomMessage':
this.broadcastToRoom(
message.roomId,
{
type: 'message',
from: clientId,
content: message.content,
timestamp: Date.now(),
},
clientId
);
break;
case 'privateMessage':
this.sendToClient(message.targetId, {
type: 'privateMessage',
from: clientId,
content: message.content,
timestamp: Date.now(),
});
break;
default:
this.emit('unknownMessage', { clientId, message });
}
}
joinRoom(clientId, roomId) {
const client = this.clients.get(clientId);
if (!client) return;
// ルームが存在しない場合は作成
if (!this.rooms.has(roomId)) {
this.rooms.set(roomId, new Set());
this.emit('roomCreated', { roomId });
}
// クライアントをルームに追加
this.rooms.get(roomId).add(clientId);
client.rooms.add(roomId);
this.emit('clientJoinedRoom', {
clientId,
roomId,
roomSize: this.rooms.get(roomId).size,
});
// ルーム参加通知を送信
this.sendToClient(clientId, {
type: 'joinedRoom',
roomId,
timestamp: Date.now(),
});
// 他のメンバーに通知
this.broadcastToRoom(
roomId,
{
type: 'memberJoined',
clientId,
timestamp: Date.now(),
},
clientId
);
}
leaveRoom(clientId, roomId) {
const client = this.clients.get(clientId);
if (!client || !this.rooms.has(roomId)) return;
this.rooms.get(roomId).delete(clientId);
client.rooms.delete(roomId);
// ルームが空になった場合は削除
if (this.rooms.get(roomId).size === 0) {
this.rooms.delete(roomId);
this.emit('roomDeleted', { roomId });
}
this.emit('clientLeftRoom', { clientId, roomId });
// 他のメンバーに通知
this.broadcastToRoom(roomId, {
type: 'memberLeft',
clientId,
timestamp: Date.now(),
});
}
broadcastToRoom(roomId, message, excludeClientId = null) {
const room = this.rooms.get(roomId);
if (!room) return;
let sentCount = 0;
room.forEach((clientId) => {
if (clientId !== excludeClientId) {
if (this.sendToClient(clientId, message)) {
sentCount++;
}
}
});
this.emit('messageBroadcast', {
roomId,
message,
sentCount,
excludeClientId,
});
}
sendToClient(clientId, message) {
const client = this.clients.get(clientId);
if (
!client ||
client.ws.readyState !== WebSocket.OPEN
) {
return false;
}
try {
client.ws.send(JSON.stringify(message));
return true;
} catch (error) {
this.emit('sendError', { clientId, error });
return false;
}
}
handleDisconnect(clientId) {
const client = this.clients.get(clientId);
if (!client) return;
// 参加していたルームから削除
client.rooms.forEach((roomId) => {
this.leaveRoom(clientId, roomId);
});
this.clients.delete(clientId);
this.emit('clientDisconnected', {
clientId,
totalClients: this.clients.size,
});
}
generateClientId() {
return `client_${Date.now()}_${Math.random()
.toString(36)
.substr(2, 9)}`;
}
// サーバー統計情報
getStats() {
return {
totalClients: this.clients.size,
totalRooms: this.rooms.size,
roomDetails: Array.from(this.rooms.entries()).map(
([roomId, clients]) => ({
roomId,
memberCount: clients.size,
})
),
};
}
}
// 使用例
const server = new RealtimeServer(8080);
// サーバーイベントの監視
server.on('serverStarted', (info) => {
console.log(
`WebSocketサーバーがポート${info.port}で開始されました`
);
});
server.on('clientConnected', (info) => {
console.log(
`クライアント接続: ${info.clientId} (総数: ${info.totalClients})`
);
});
server.on('clientJoinedRoom', (info) => {
console.log(
`${info.clientId} がルーム ${info.roomId} に参加 (ルームサイズ: ${info.roomSize})`
);
});
server.on('messageBroadcast', (info) => {
console.log(
`ルーム ${info.roomId} に ${info.sentCount} 人にメッセージを配信`
);
});
server.start();
カスタムクラスへの組み込み
既存のクラスに EventEmitter の機能を組み込むことで、状態変化の通知や非同期処理の制御を効率的に行えます。
javascriptconst EventEmitter = require('events');
// ユーザー管理システムの例
class UserManager extends EventEmitter {
constructor() {
super();
this.users = new Map();
this.sessions = new Map();
this.loginAttempts = new Map();
}
async createUser(userData) {
const { email, password, name } = userData;
// バリデーション
if (this.findUserByEmail(email)) {
const error = new Error(
'このメールアドレスは既に使用されています'
);
this.emit('userCreationFailed', { email, error });
throw error;
}
// ユーザー作成開始を通知
this.emit('userCreationStarted', { email });
try {
// パスワードハッシュ化(模擬)
const hashedPassword = await this.hashPassword(
password
);
const user = {
id: this.generateUserId(),
email,
name,
password: hashedPassword,
createdAt: new Date(),
isActive: true,
loginCount: 0,
lastLogin: null,
};
this.users.set(user.id, user);
// ユーザー作成成功を通知
this.emit('userCreated', {
userId: user.id,
email: user.email,
name: user.name,
});
return {
id: user.id,
email: user.email,
name: user.name,
};
} catch (error) {
this.emit('userCreationFailed', { email, error });
throw error;
}
}
async authenticateUser(email, password) {
const user = this.findUserByEmail(email);
// ログイン試行を記録
this.recordLoginAttempt(email);
if (!user) {
this.emit('loginFailed', {
email,
reason: 'USER_NOT_FOUND',
});
throw new Error('ユーザーが見つかりません');
}
if (!user.isActive) {
this.emit('loginFailed', {
email,
reason: 'USER_INACTIVE',
});
throw new Error('アカウントが無効化されています');
}
// パスワード検証
const isValidPassword = await this.verifyPassword(
password,
user.password
);
if (!isValidPassword) {
this.emit('loginFailed', {
email,
reason: 'INVALID_PASSWORD',
attemptCount: this.getLoginAttemptCount(email),
});
throw new Error('パスワードが正しくありません');
}
// ログイン成功処理
const sessionId = this.createSession(user.id);
user.loginCount++;
user.lastLogin = new Date();
// ログイン試行回数をリセット
this.loginAttempts.delete(email);
this.emit('loginSuccessful', {
userId: user.id,
email: user.email,
sessionId,
loginCount: user.loginCount,
});
return { sessionId, user: this.sanitizeUser(user) };
}
logout(sessionId) {
const session = this.sessions.get(sessionId);
if (!session) {
this.emit('logoutFailed', {
sessionId,
reason: 'INVALID_SESSION',
});
return false;
}
const user = this.users.get(session.userId);
this.sessions.delete(sessionId);
this.emit('logoutSuccessful', {
userId: session.userId,
email: user?.email,
sessionId,
});
return true;
}
deactivateUser(userId, reason = 'MANUAL') {
const user = this.users.get(userId);
if (!user) {
this.emit('userDeactivationFailed', {
userId,
reason: 'USER_NOT_FOUND',
});
return false;
}
user.isActive = false;
user.deactivatedAt = new Date();
user.deactivationReason = reason;
// ユーザーのすべてのセッションを無効化
const userSessions = Array.from(
this.sessions.entries()
).filter(([_, session]) => session.userId === userId);
userSessions.forEach(([sessionId]) => {
this.sessions.delete(sessionId);
});
this.emit('userDeactivated', {
userId,
email: user.email,
reason,
activeSessions: userSessions.length,
});
return true;
}
// ヘルパーメソッド
findUserByEmail(email) {
return Array.from(this.users.values()).find(
(user) => user.email === email
);
}
recordLoginAttempt(email) {
const attempts = this.loginAttempts.get(email) || 0;
this.loginAttempts.set(email, attempts + 1);
// 試行回数が多い場合は警告
if (attempts + 1 >= 5) {
this.emit('suspiciousLoginActivity', {
email,
attemptCount: attempts + 1,
});
}
}
getLoginAttemptCount(email) {
return this.loginAttempts.get(email) || 0;
}
createSession(userId) {
const sessionId = `session_${Date.now()}_${Math.random()
.toString(36)
.substr(2, 9)}`;
this.sessions.set(sessionId, {
userId,
createdAt: new Date(),
lastActivity: new Date(),
});
return sessionId;
}
sanitizeUser(user) {
const { password, ...safeUser } = user;
return safeUser;
}
generateUserId() {
return `user_${Date.now()}_${Math.random()
.toString(36)
.substr(2, 9)}`;
}
async hashPassword(password) {
// 実際の開発では bcrypt などを使用
return `hashed_${password}`;
}
async verifyPassword(password, hashedPassword) {
// 実際の開発では bcrypt.compare などを使用
return `hashed_${password}` === hashedPassword;
}
// 統計情報
getStats() {
const activeUsers = Array.from(
this.users.values()
).filter((user) => user.isActive).length;
const activeSessions = this.sessions.size;
const suspiciousEmails = Array.from(
this.loginAttempts.entries()
).filter(([_, count]) => count >= 3).length;
return {
totalUsers: this.users.size,
activeUsers,
activeSessions,
suspiciousEmails,
};
}
}
// 使用例とイベント監視
const userManager = new UserManager();
// セキュリティ関連のイベント監視
userManager.on('suspiciousLoginActivity', (info) => {
console.warn(
`⚠️ 不審なログイン活動: ${info.email} (${info.attemptCount}回試行)`
);
// 実際のシステムでは、ここでアラート送信やIP制限などを行う
});
userManager.on('loginFailed', (info) => {
console.log(
`❌ ログイン失敗: ${info.email} - ${info.reason}`
);
});
userManager.on('loginSuccessful', (info) => {
console.log(
`✅ ログイン成功: ${info.email} (${info.loginCount}回目)`
);
});
userManager.on('userCreated', (info) => {
console.log(
`👤 新規ユーザー作成: ${info.name} (${info.email})`
);
});
userManager.on('userDeactivated', (info) => {
console.log(
`🚫 ユーザー無効化: ${info.email} - ${info.reason}`
);
});
// 使用例
async function demonstrateUserManager() {
try {
// ユーザー作成
const user = await userManager.createUser({
email: 'test@example.com',
password: 'password123',
name: '田中太郎',
});
// ログイン
const session = await userManager.authenticateUser(
'test@example.com',
'password123'
);
console.log('セッション作成:', session.sessionId);
// 統計情報表示
console.log('システム統計:', userManager.getStats());
} catch (error) {
console.error('エラー:', error.message);
}
}
demonstrateUserManager();
これらの実践例を通じて、EventEmitter がいかに柔軟で強力な仕組みを提供するかがお分かりいただけたでしょう。次のセクションでは、より大規模なアーキテクチャ設計での活用方法を見ていきます。
EventEmitter を活用したアーキテクチャ設計
EventEmitter の真の力は、大規模なアプリケーションにおいて疎結合で拡張性の高いアーキテクチャを構築する際に発揮されます。ここでは、実際のプロジェクトで活用できる設計パターンを詳しく解説します。
疎結合なモジュール設計
従来の密結合な設計では、モジュール間が直接依存し合うため、変更の影響が広範囲に及びます。EventEmitter を活用することで、モジュール間の依存関係を大幅に削減できます。
従来の密結合な設計例
javascript// 密結合な設計の問題例
class OrderService {
constructor(
emailService,
inventoryService,
paymentService
) {
this.emailService = emailService;
this.inventoryService = inventoryService;
this.paymentService = paymentService;
}
async processOrder(orderData) {
try {
// 在庫確認
await this.inventoryService.reserveItems(
orderData.items
);
// 決済処理
await this.paymentService.processPayment(
orderData.payment
);
// メール送信
await this.emailService.sendConfirmation(
orderData.customer
);
return { success: true };
} catch (error) {
// エラー処理が複雑になる
throw error;
}
}
}
EventEmitter を使った疎結合な設計
javascriptconst EventEmitter = require('events');
// 中央イベントバス
class EventBus extends EventEmitter {
constructor() {
super();
this.setMaxListeners(0); // 制限を無効化
}
}
const eventBus = new EventBus();
// 注文サービス(コアロジックのみに集中)
class OrderService {
constructor(eventBus) {
this.eventBus = eventBus;
this.orders = new Map();
}
async processOrder(orderData) {
const orderId = this.generateOrderId();
try {
// 注文データを保存
this.orders.set(orderId, {
...orderData,
id: orderId,
status: 'processing',
createdAt: new Date(),
});
// 注文処理開始イベントを発行
this.eventBus.emit('order.created', {
orderId,
orderData,
timestamp: Date.now(),
});
return { orderId, status: 'processing' };
} catch (error) {
this.eventBus.emit('order.failed', {
orderId,
orderData,
error: error.message,
timestamp: Date.now(),
});
throw error;
}
}
completeOrder(orderId) {
const order = this.orders.get(orderId);
if (order) {
order.status = 'completed';
order.completedAt = new Date();
this.eventBus.emit('order.completed', {
orderId,
order,
timestamp: Date.now(),
});
}
}
generateOrderId() {
return `order_${Date.now()}_${Math.random()
.toString(36)
.substr(2, 9)}`;
}
}
// 在庫サービス(独立したモジュール)
class InventoryService {
constructor(eventBus) {
this.eventBus = eventBus;
this.inventory = new Map();
this.reservations = new Map();
// 注文イベントを監視
this.eventBus.on(
'order.created',
this.handleOrderCreated.bind(this)
);
}
async handleOrderCreated(data) {
const { orderId, orderData } = data;
try {
// 在庫確認と予約
const reservationResult = await this.reserveItems(
orderData.items
);
this.reservations.set(orderId, reservationResult);
this.eventBus.emit('inventory.reserved', {
orderId,
items: orderData.items,
reservationId: reservationResult.id,
timestamp: Date.now(),
});
} catch (error) {
this.eventBus.emit('inventory.reservation_failed', {
orderId,
error: error.message,
timestamp: Date.now(),
});
}
}
async reserveItems(items) {
// 在庫予約ロジック
const reservationId = `res_${Date.now()}`;
for (const item of items) {
const currentStock =
this.inventory.get(item.productId) || 0;
if (currentStock < item.quantity) {
throw new Error(`在庫不足: ${item.productId}`);
}
}
// 在庫を減らす
items.forEach((item) => {
const currentStock = this.inventory.get(
item.productId
);
this.inventory.set(
item.productId,
currentStock - item.quantity
);
});
return { id: reservationId, items };
}
// 初期在庫設定
setStock(productId, quantity) {
this.inventory.set(productId, quantity);
}
}
// 決済サービス(独立したモジュール)
class PaymentService {
constructor(eventBus) {
this.eventBus = eventBus;
this.payments = new Map();
// 在庫予約完了イベントを監視
this.eventBus.on(
'inventory.reserved',
this.handleInventoryReserved.bind(this)
);
}
async handleInventoryReserved(data) {
const { orderId } = data;
try {
// 決済処理を実行
const paymentResult = await this.processPayment(
orderId
);
this.payments.set(orderId, paymentResult);
this.eventBus.emit('payment.completed', {
orderId,
paymentId: paymentResult.id,
amount: paymentResult.amount,
timestamp: Date.now(),
});
} catch (error) {
this.eventBus.emit('payment.failed', {
orderId,
error: error.message,
timestamp: Date.now(),
});
}
}
async processPayment(orderId) {
// 模擬決済処理
await new Promise((resolve) =>
setTimeout(resolve, 1000)
);
return {
id: `pay_${Date.now()}`,
orderId,
amount: Math.floor(Math.random() * 10000) + 1000,
status: 'completed',
};
}
}
// メール通知サービス(独立したモジュール)
class EmailService {
constructor(eventBus) {
this.eventBus = eventBus;
this.sentEmails = [];
// 決済完了イベントを監視
this.eventBus.on(
'payment.completed',
this.handlePaymentCompleted.bind(this)
);
this.eventBus.on(
'order.failed',
this.handleOrderFailed.bind(this)
);
}
async handlePaymentCompleted(data) {
const { orderId } = data;
try {
await this.sendConfirmationEmail(orderId);
this.eventBus.emit('email.sent', {
orderId,
type: 'confirmation',
timestamp: Date.now(),
});
// 注文完了を通知
this.eventBus.emit('order.ready_to_complete', {
orderId,
});
} catch (error) {
this.eventBus.emit('email.failed', {
orderId,
error: error.message,
timestamp: Date.now(),
});
}
}
async handleOrderFailed(data) {
const { orderId } = data;
try {
await this.sendErrorNotification(orderId);
} catch (error) {
console.error('エラー通知メール送信失敗:', error);
}
}
async sendConfirmationEmail(orderId) {
// 模擬メール送信
await new Promise((resolve) =>
setTimeout(resolve, 500)
);
this.sentEmails.push({
orderId,
type: 'confirmation',
sentAt: new Date(),
});
console.log(
`✉️ 注文確認メールを送信しました: ${orderId}`
);
}
async sendErrorNotification(orderId) {
await new Promise((resolve) =>
setTimeout(resolve, 300)
);
this.sentEmails.push({
orderId,
type: 'error',
sentAt: new Date(),
});
console.log(
`⚠️ エラー通知メールを送信しました: ${orderId}`
);
}
}
// オーケストレーター(全体の流れを制御)
class OrderOrchestrator {
constructor(eventBus, orderService) {
this.eventBus = eventBus;
this.orderService = orderService;
// 最終完了イベントを監視
this.eventBus.on(
'order.ready_to_complete',
this.handleReadyToComplete.bind(this)
);
}
handleReadyToComplete(data) {
const { orderId } = data;
this.orderService.completeOrder(orderId);
}
}
// システム全体の初期化と使用例
function initializeSystem() {
const eventBus = new EventBus();
// 各サービスを初期化
const orderService = new OrderService(eventBus);
const inventoryService = new InventoryService(eventBus);
const paymentService = new PaymentService(eventBus);
const emailService = new EmailService(eventBus);
const orchestrator = new OrderOrchestrator(
eventBus,
orderService
);
// 在庫を初期化
inventoryService.setStock('product1', 100);
inventoryService.setStock('product2', 50);
// イベント監視(デバッグ用)
const events = [
'order.created',
'order.completed',
'order.failed',
'inventory.reserved',
'inventory.reservation_failed',
'payment.completed',
'payment.failed',
'email.sent',
'email.failed',
];
events.forEach((eventName) => {
eventBus.on(eventName, (data) => {
console.log(`📡 ${eventName}:`, data);
});
});
return { orderService, eventBus };
}
// 使用例
const { orderService } = initializeSystem();
// 注文処理を実行
orderService.processOrder({
customer: {
email: 'customer@example.com',
name: '田中太郎',
},
items: [
{ productId: 'product1', quantity: 2 },
{ productId: 'product2', quantity: 1 },
],
payment: {
method: 'credit_card',
cardNumber: '**** **** **** 1234',
},
});
プラグインシステムの構築
EventEmitter を使用してプラグインシステムを構築することで、機能を動的に追加・削除できる拡張性の高いアプリケーションを作成できます。
javascriptconst EventEmitter = require('events');
// プラグインマネージャー
class PluginManager extends EventEmitter {
constructor() {
super();
this.plugins = new Map();
this.hooks = new Map();
this.middleware = [];
}
// プラグインの登録
register(pluginName, plugin) {
if (this.plugins.has(pluginName)) {
throw new Error(
`プラグイン "${pluginName}" は既に登録されています`
);
}
// プラグインの初期化
const pluginInstance = new plugin(this);
this.plugins.set(pluginName, pluginInstance);
this.emit('plugin.registered', {
pluginName,
plugin: pluginInstance,
timestamp: Date.now(),
});
console.log(
`🔌 プラグイン "${pluginName}" を登録しました`
);
return pluginInstance;
}
// プラグインの削除
unregister(pluginName) {
const plugin = this.plugins.get(pluginName);
if (!plugin) {
throw new Error(
`プラグイン "${pluginName}" が見つかりません`
);
}
// プラグインの終了処理
if (typeof plugin.destroy === 'function') {
plugin.destroy();
}
this.plugins.delete(pluginName);
this.emit('plugin.unregistered', {
pluginName,
timestamp: Date.now(),
});
console.log(
`🔌 プラグイン "${pluginName}" を削除しました`
);
}
// フックポイントの実行
async executeHook(hookName, data = {}) {
const hookHandlers = this.hooks.get(hookName) || [];
this.emit('hook.executing', {
hookName,
handlerCount: hookHandlers.length,
data,
timestamp: Date.now(),
});
let result = data;
for (const handler of hookHandlers) {
try {
const newResult = await handler(result);
if (newResult !== undefined) {
result = newResult;
}
} catch (error) {
this.emit('hook.error', {
hookName,
handler,
error: error.message,
timestamp: Date.now(),
});
}
}
this.emit('hook.executed', {
hookName,
originalData: data,
finalResult: result,
timestamp: Date.now(),
});
return result;
}
// フックハンドラーの登録
addHook(hookName, handler) {
if (!this.hooks.has(hookName)) {
this.hooks.set(hookName, []);
}
this.hooks.get(hookName).push(handler);
}
// ミドルウェアの追加
use(middleware) {
this.middleware.push(middleware);
}
// ミドルウェアの実行
async executeMiddleware(context) {
let result = context;
for (const middleware of this.middleware) {
try {
result = await middleware(result);
} catch (error) {
this.emit('middleware.error', {
middleware,
error: error.message,
timestamp: Date.now(),
});
}
}
return result;
}
// 登録されたプラグインの一覧
getPlugins() {
return Array.from(this.plugins.keys());
}
// フックの一覧
getHooks() {
return Array.from(this.hooks.keys());
}
}
// プラグインの基底クラス
class BasePlugin {
constructor(pluginManager) {
this.pluginManager = pluginManager;
this.name = this.constructor.name;
this.initialize();
}
initialize() {
// サブクラスでオーバーライド
}
destroy() {
// サブクラスでオーバーライド
}
}
// ログ出力プラグインの例
class LoggingPlugin extends BasePlugin {
initialize() {
console.log('📝 ログ出力プラグインを初期化しました');
// 全てのイベントをログ出力
this.originalEmit = this.pluginManager.emit;
this.pluginManager.emit = (eventName, ...args) => {
console.log(`🔔 イベント発生: ${eventName}`, args);
return this.originalEmit.call(
this.pluginManager,
eventName,
...args
);
};
// フック処理を追加
this.pluginManager.addHook(
'request.start',
this.logRequestStart.bind(this)
);
this.pluginManager.addHook(
'request.end',
this.logRequestEnd.bind(this)
);
}
logRequestStart(data) {
console.log(
`🚀 リクエスト開始: ${data.method} ${data.url}`
);
return { ...data, startTime: Date.now() };
}
logRequestEnd(data) {
const duration = Date.now() - data.startTime;
console.log(
`✅ リクエスト完了: ${data.method} ${data.url} (${duration}ms)`
);
return data;
}
destroy() {
// 元の emit メソッドを復元
this.pluginManager.emit = this.originalEmit;
console.log('📝 ログ出力プラグインを終了しました');
}
}
// キャッシュプラグインの例
class CachePlugin extends BasePlugin {
initialize() {
this.cache = new Map();
this.ttl = 60000; // 1分
console.log('💾 キャッシュプラグインを初期化しました');
// キャッシュ関連のフックを追加
this.pluginManager.addHook(
'request.before',
this.checkCache.bind(this)
);
this.pluginManager.addHook(
'response.after',
this.saveCache.bind(this)
);
// 定期的なキャッシュクリーンアップ
this.cleanupInterval = setInterval(
this.cleanup.bind(this),
30000
);
}
checkCache(data) {
const cacheKey = this.generateCacheKey(data);
const cached = this.cache.get(cacheKey);
if (
cached &&
Date.now() - cached.timestamp < this.ttl
) {
console.log(`💾 キャッシュヒット: ${cacheKey}`);
return {
...data,
cached: true,
response: cached.data,
};
}
return data;
}
saveCache(data) {
if (!data.cached && data.response) {
const cacheKey = this.generateCacheKey(data);
this.cache.set(cacheKey, {
data: data.response,
timestamp: Date.now(),
});
console.log(`💾 キャッシュ保存: ${cacheKey}`);
}
return data;
}
generateCacheKey(data) {
return `${data.method}_${data.url}`;
}
cleanup() {
const now = Date.now();
let cleanedCount = 0;
for (const [key, value] of this.cache.entries()) {
if (now - value.timestamp >= this.ttl) {
this.cache.delete(key);
cleanedCount++;
}
}
if (cleanedCount > 0) {
console.log(
`🧹 期限切れキャッシュを${cleanedCount}件削除しました`
);
}
}
destroy() {
clearInterval(this.cleanupInterval);
this.cache.clear();
console.log('💾 キャッシュプラグインを終了しました');
}
}
// 認証プラグインの例
class AuthPlugin extends BasePlugin {
initialize() {
this.sessions = new Map();
console.log('🔐 認証プラグインを初期化しました');
// 認証チェックのミドルウェアを追加
this.pluginManager.use(this.authMiddleware.bind(this));
// 認証関連のフックを追加
this.pluginManager.addHook(
'request.before',
this.validateAuth.bind(this)
);
}
authMiddleware(context) {
// 認証が不要なパスをスキップ
const publicPaths = ['/login', '/register', '/health'];
if (publicPaths.includes(context.url)) {
return context;
}
const token = context.headers?.authorization;
if (!token) {
throw new Error('認証トークンが必要です');
}
const session = this.sessions.get(token);
if (!session || Date.now() > session.expiresAt) {
throw new Error('無効または期限切れのトークンです');
}
return { ...context, user: session.user };
}
validateAuth(data) {
if (data.user) {
console.log(
`🔐 認証済みユーザー: ${data.user.email}`
);
}
return data;
}
// セッション作成(実際の認証システムから呼び出される)
createSession(user) {
const token = `token_${Date.now()}_${Math.random()
.toString(36)
.substr(2, 9)}`;
this.sessions.set(token, {
user,
createdAt: Date.now(),
expiresAt: Date.now() + 24 * 60 * 60 * 1000, // 24時間
});
return token;
}
destroy() {
this.sessions.clear();
console.log('🔐 認証プラグインを終了しました');
}
}
// 使用例
async function demonstratePluginSystem() {
const pluginManager = new PluginManager();
// プラグインイベントの監視
pluginManager.on('plugin.registered', (data) => {
console.log(`✅ プラグイン登録: ${data.pluginName}`);
});
// プラグインを登録
const loggingPlugin = pluginManager.register(
'logging',
LoggingPlugin
);
const cachePlugin = pluginManager.register(
'cache',
CachePlugin
);
const authPlugin = pluginManager.register(
'auth',
AuthPlugin
);
// 模擬的なリクエスト処理
async function processRequest(requestData) {
try {
// ミドルウェアを実行
let context = await pluginManager.executeMiddleware(
requestData
);
// リクエスト開始フックを実行
context = await pluginManager.executeHook(
'request.before',
context
);
// キャッシュがある場合はそれを返す
if (context.cached) {
return context.response;
}
// リクエスト開始ログ
context = await pluginManager.executeHook(
'request.start',
context
);
// 実際の処理(模擬)
await new Promise((resolve) =>
setTimeout(resolve, 100)
);
const response = {
data: `処理結果: ${context.url}`,
timestamp: Date.now(),
};
context.response = response;
// レスポンス後処理
context = await pluginManager.executeHook(
'response.after',
context
);
// リクエスト終了ログ
await pluginManager.executeHook(
'request.end',
context
);
return response;
} catch (error) {
console.error(
'❌ リクエスト処理エラー:',
error.message
);
throw error;
}
}
// テスト用セッションを作成
const token = authPlugin.createSession({
id: 'user123',
email: 'test@example.com',
});
// リクエストをテスト
const requests = [
{
method: 'GET',
url: '/api/users',
headers: { authorization: token },
},
{
method: 'GET',
url: '/api/users', // 同じリクエスト(キャッシュされるはず)
headers: { authorization: token },
},
{
method: 'POST',
url: '/api/posts',
headers: { authorization: token },
},
];
for (const request of requests) {
try {
console.log('\n--- リクエスト処理開始 ---');
const response = await processRequest(request);
console.log('レスポンス:', response);
console.log('--- リクエスト処理完了 ---\n');
} catch (error) {
console.error('リクエスト失敗:', error.message);
}
}
// プラグイン情報を表示
console.log('\n📊 システム情報:');
console.log(
'登録プラグイン:',
pluginManager.getPlugins()
);
console.log('利用可能フック:', pluginManager.getHooks());
}
demonstratePluginSystem();
このように EventEmitter を活用することで、モジュール間の依存関係を最小限に抑え、機能を動的に追加・削除できる柔軟なアーキテクチャを構築できます。次のセクションでは、マイクロサービス間の通信での活用方法を見ていきましょう。
マイクロサービス間の通信
EventEmitter は単一のプロセス内でのイベント処理に最適化されていますが、マイクロサービス間の通信においても、メッセージブローカーと組み合わせることで強力な分散イベントシステムを構築できます。
javascriptconst EventEmitter = require('events');
// 分散イベントバス(Redis Pub/Sub を模擬)
class DistributedEventBus extends EventEmitter {
constructor(serviceId) {
super();
this.serviceId = serviceId;
this.subscribers = new Map();
this.publishedEvents = [];
this.remoteServices = new Set();
}
// 他のサービスとの接続を模擬
connectToService(otherService) {
this.remoteServices.add(otherService);
otherService.remoteServices.add(this);
console.log(
`🔗 ${this.serviceId} が ${otherService.serviceId} に接続しました`
);
}
// ローカルイベントの発行
emit(eventName, data) {
// ローカルリスナーに通知
super.emit(eventName, data);
// リモートサービスに配信
this.publishToRemoteServices(eventName, data);
return true;
}
// リモートサービスへの配信
publishToRemoteServices(eventName, data) {
const message = {
eventName,
data,
sourceService: this.serviceId,
timestamp: Date.now(),
messageId: `msg_${Date.now()}_${Math.random()
.toString(36)
.substr(2, 9)}`,
};
this.publishedEvents.push(message);
// 他のサービスに配信
this.remoteServices.forEach((service) => {
service.receiveRemoteEvent(message);
});
}
// リモートイベントの受信
receiveRemoteEvent(message) {
const { eventName, data, sourceService, messageId } =
message;
// 重複配信チェック(簡易版)
if (
this.publishedEvents.some(
(event) => event.messageId === messageId
)
) {
return;
}
console.log(
`📨 ${this.serviceId} が ${sourceService} からイベントを受信: ${eventName}`
);
// ローカルイベントとして発行
super.emit(`remote.${eventName}`, {
...data,
_remote: true,
_sourceService: sourceService,
});
}
// サービス固有のイベントを購読
subscribeToService(serviceId, eventName, handler) {
const remoteEventName = `remote.${eventName}`;
this.on(remoteEventName, (data) => {
if (data._sourceService === serviceId) {
handler(data);
}
});
}
}
// ユーザーサービス
class UserService {
constructor(eventBus) {
this.eventBus = eventBus;
this.users = new Map();
// 外部サービスからのイベントを監視
this.eventBus.on(
'remote.order.created',
this.handleOrderCreated.bind(this)
);
}
async createUser(userData) {
const userId = `user_${Date.now()}`;
const user = {
id: userId,
...userData,
createdAt: new Date(),
};
this.users.set(userId, user);
// ユーザー作成イベントを発行
this.eventBus.emit('user.created', {
userId,
email: user.email,
name: user.name,
});
console.log(`👤 ユーザーを作成しました: ${user.name}`);
return user;
}
async updateUserProfile(userId, updates) {
const user = this.users.get(userId);
if (!user) {
throw new Error('ユーザーが見つかりません');
}
Object.assign(user, updates, { updatedAt: new Date() });
this.eventBus.emit('user.updated', {
userId,
updates,
user: this.sanitizeUser(user),
});
return user;
}
handleOrderCreated(data) {
const { userId, orderId } = data;
console.log(
`📦 ユーザー ${userId} の注文 ${orderId} を受信しました`
);
// ユーザーの注文履歴を更新
const user = this.users.get(userId);
if (user) {
if (!user.orderHistory) {
user.orderHistory = [];
}
user.orderHistory.push({
orderId,
createdAt: new Date(),
});
}
}
sanitizeUser(user) {
const { password, ...safeUser } = user;
return safeUser;
}
}
// 注文サービス
class OrderService {
constructor(eventBus) {
this.eventBus = eventBus;
this.orders = new Map();
// 外部サービスからのイベントを監視
this.eventBus.on(
'remote.user.created',
this.handleUserCreated.bind(this)
);
this.eventBus.on(
'remote.payment.completed',
this.handlePaymentCompleted.bind(this)
);
}
async createOrder(orderData) {
const orderId = `order_${Date.now()}`;
const order = {
id: orderId,
...orderData,
status: 'pending',
createdAt: new Date(),
};
this.orders.set(orderId, order);
// 注文作成イベントを発行
this.eventBus.emit('order.created', {
orderId,
userId: order.userId,
items: order.items,
total: order.total,
});
console.log(`📦 注文を作成しました: ${orderId}`);
return order;
}
handleUserCreated(data) {
const { userId, email } = data;
console.log(
`👤 新規ユーザー ${email} (${userId}) が作成されました`
);
// 新規ユーザー向けの特典を提供
this.eventBus.emit('promotion.new_user', {
userId,
promotionType: 'welcome_discount',
discountPercent: 10,
});
}
handlePaymentCompleted(data) {
const { orderId, paymentId } = data;
const order = this.orders.get(orderId);
if (order) {
order.status = 'paid';
order.paymentId = paymentId;
order.paidAt = new Date();
this.eventBus.emit('order.paid', {
orderId,
userId: order.userId,
paymentId,
});
console.log(
`💰 注文 ${orderId} の支払いが完了しました`
);
}
}
}
// 決済サービス
class PaymentService {
constructor(eventBus) {
this.eventBus = eventBus;
this.payments = new Map();
// 注文イベントを監視
this.eventBus.on(
'remote.order.created',
this.handleOrderCreated.bind(this)
);
}
async processPayment(paymentData) {
const paymentId = `payment_${Date.now()}`;
const payment = {
id: paymentId,
...paymentData,
status: 'processing',
createdAt: new Date(),
};
this.payments.set(paymentId, payment);
try {
// 決済処理を模擬
await new Promise((resolve) =>
setTimeout(resolve, 1000)
);
payment.status = 'completed';
payment.completedAt = new Date();
// 決済完了イベントを発行
this.eventBus.emit('payment.completed', {
paymentId,
orderId: payment.orderId,
amount: payment.amount,
userId: payment.userId,
});
console.log(`💰 決済が完了しました: ${paymentId}`);
return payment;
} catch (error) {
payment.status = 'failed';
payment.error = error.message;
this.eventBus.emit('payment.failed', {
paymentId,
orderId: payment.orderId,
error: error.message,
});
throw error;
}
}
handleOrderCreated(data) {
const { orderId, userId, total } = data;
// 自動的に決済処理を開始
setTimeout(() => {
this.processPayment({
orderId,
userId,
amount: total,
method: 'credit_card',
});
}, 500);
}
}
// 通知サービス
class NotificationService {
constructor(eventBus) {
this.eventBus = eventBus;
this.notifications = [];
// 各種イベントを監視
this.eventBus.on(
'remote.user.created',
this.handleUserCreated.bind(this)
);
this.eventBus.on(
'remote.order.paid',
this.handleOrderPaid.bind(this)
);
this.eventBus.on(
'remote.promotion.new_user',
this.handleNewUserPromotion.bind(this)
);
}
async sendNotification(userId, message, type = 'info') {
const notification = {
id: `notif_${Date.now()}`,
userId,
message,
type,
createdAt: new Date(),
read: false,
};
this.notifications.push(notification);
console.log(`📧 通知送信: ${message}`);
this.eventBus.emit('notification.sent', {
notificationId: notification.id,
userId,
type,
});
return notification;
}
handleUserCreated(data) {
const { userId, name } = data;
this.sendNotification(
userId,
`${name}さん、ご登録ありがとうございます!`,
'welcome'
);
}
handleOrderPaid(data) {
const { userId, orderId } = data;
this.sendNotification(
userId,
`注文 ${orderId} の支払いが完了しました。`,
'payment'
);
}
handleNewUserPromotion(data) {
const { userId, discountPercent } = data;
this.sendNotification(
userId,
`新規登録特典として${discountPercent}%割引クーポンをプレゼント!`,
'promotion'
);
}
}
// マイクロサービスシステムの初期化
function initializeMicroservices() {
// 各サービスのイベントバスを作成
const userEventBus = new DistributedEventBus(
'UserService'
);
const orderEventBus = new DistributedEventBus(
'OrderService'
);
const paymentEventBus = new DistributedEventBus(
'PaymentService'
);
const notificationEventBus = new DistributedEventBus(
'NotificationService'
);
// サービス間の接続を確立
userEventBus.connectToService(orderEventBus);
userEventBus.connectToService(notificationEventBus);
orderEventBus.connectToService(paymentEventBus);
orderEventBus.connectToService(notificationEventBus);
paymentEventBus.connectToService(orderEventBus);
paymentEventBus.connectToService(notificationEventBus);
// 各サービスを初期化
const userService = new UserService(userEventBus);
const orderService = new OrderService(orderEventBus);
const paymentService = new PaymentService(
paymentEventBus
);
const notificationService = new NotificationService(
notificationEventBus
);
return {
userService,
orderService,
paymentService,
notificationService,
};
}
// 使用例
async function demonstrateMicroservices() {
const services = initializeMicroservices();
try {
// ユーザー作成
const user = await services.userService.createUser({
name: '田中太郎',
email: 'tanaka@example.com',
password: 'password123',
});
// 少し待ってから注文作成
setTimeout(async () => {
await services.orderService.createOrder({
userId: user.id,
items: [
{
productId: 'product1',
quantity: 2,
price: 1000,
},
{
productId: 'product2',
quantity: 1,
price: 500,
},
],
total: 2500,
});
}, 1000);
} catch (error) {
console.error('エラー:', error.message);
}
}
demonstrateMicroservices();
エラーハンドリングとデバッグ
EventEmitter を使用したアプリケーションでは、適切なエラーハンドリングとデバッグ手法が重要です。特に非同期処理が多い環境では、エラーの追跡が困難になりがちです。
error イベントの適切な処理
EventEmitter においてerror
イベントは特別な扱いを受けます。適切に処理しないとプロセスがクラッシュするため、必ずエラーハンドラーを設定しましょう。
javascriptconst EventEmitter = require('events');
// エラーハンドリングを含む基底クラス
class SafeEventEmitter extends EventEmitter {
constructor() {
super();
// デフォルトエラーハンドラーを設定
this.on('error', this.defaultErrorHandler.bind(this));
}
defaultErrorHandler(error) {
console.error(
`💥 EventEmitter エラー [${this.constructor.name}]:`,
error
);
// エラー詳細をログ出力
console.error('スタックトレース:', error.stack);
console.error(
'エラー発生時刻:',
new Date().toISOString()
);
// 必要に応じてエラー通知やログ送信を行う
this.handleCriticalError(error);
}
handleCriticalError(error) {
// 実際のアプリケーションでは、ここでログサービスへの送信や
// アラート通知などを行う
console.warn('🚨 クリティカルエラーが発生しました');
}
// 安全なemit(エラーを内部でキャッチ)
safeEmit(eventName, ...args) {
try {
return this.emit(eventName, ...args);
} catch (error) {
this.emit(
'error',
new Error(
`イベント '${eventName}' の処理中にエラー: ${error.message}`
)
);
return false;
}
}
// 非同期リスナーのエラーをキャッチ
addAsyncListener(eventName, asyncHandler) {
this.on(eventName, async (...args) => {
try {
await asyncHandler(...args);
} catch (error) {
this.emit(
'error',
new Error(
`非同期リスナー '${eventName}' でエラー: ${error.message}`
)
);
}
});
}
}
// エラー追跡機能付きのサービス例
class RobustFileProcessor extends SafeEventEmitter {
constructor() {
super();
this.processedFiles = [];
this.errorLog = [];
// 各種エラータイプに対応
this.on('fileError', this.handleFileError.bind(this));
this.on(
'validationError',
this.handleValidationError.bind(this)
);
this.on(
'networkError',
this.handleNetworkError.bind(this)
);
}
async processFile(filePath) {
const processId = `process_${Date.now()}`;
try {
// ファイル存在チェック
await this.validateFile(filePath);
this.safeEmit('processingStarted', {
processId,
filePath,
});
// ファイル処理を実行
const result = await this.performFileProcessing(
filePath
);
this.processedFiles.push({
processId,
filePath,
result,
completedAt: new Date(),
});
this.safeEmit('processingCompleted', {
processId,
filePath,
result,
});
return result;
} catch (error) {
// エラータイプに応じて適切なイベントを発行
if (error.code === 'ENOENT') {
this.emit('fileError', {
processId,
filePath,
error,
type: 'FILE_NOT_FOUND',
});
} else if (error.name === 'ValidationError') {
this.emit('validationError', {
processId,
filePath,
error,
});
} else if (error.code === 'ECONNREFUSED') {
this.emit('networkError', {
processId,
filePath,
error,
});
} else {
this.emit('error', error);
}
throw error;
}
}
async validateFile(filePath) {
const fs = require('fs').promises;
try {
const stats = await fs.stat(filePath);
if (!stats.isFile()) {
const error = new Error(
'指定されたパスはファイルではありません'
);
error.name = 'ValidationError';
throw error;
}
if (stats.size === 0) {
const error = new Error('ファイルが空です');
error.name = 'ValidationError';
throw error;
}
} catch (error) {
if (error.code === 'ENOENT') {
error.message = 'ファイルが見つかりません';
}
throw error;
}
}
async performFileProcessing(filePath) {
// 模擬的なファイル処理
await new Promise((resolve) =>
setTimeout(resolve, 1000)
);
// ランダムにエラーを発生させる(テスト用)
if (Math.random() < 0.3) {
const error = new Error(
'処理中にエラーが発生しました'
);
error.code = 'PROCESSING_ERROR';
throw error;
}
return {
processedAt: new Date(),
size: Math.floor(Math.random() * 10000),
checksum: Math.random().toString(36).substr(2, 16),
};
}
handleFileError(data) {
const { processId, filePath, error, type } = data;
this.errorLog.push({
processId,
filePath,
errorType: 'FILE_ERROR',
subType: type,
message: error.message,
timestamp: new Date(),
});
console.error(
`📁 ファイルエラー [${type}]: ${filePath} - ${error.message}`
);
}
handleValidationError(data) {
const { processId, filePath, error } = data;
this.errorLog.push({
processId,
filePath,
errorType: 'VALIDATION_ERROR',
message: error.message,
timestamp: new Date(),
});
console.error(
`✅ バリデーションエラー: ${filePath} - ${error.message}`
);
}
handleNetworkError(data) {
const { processId, filePath, error } = data;
this.errorLog.push({
processId,
filePath,
errorType: 'NETWORK_ERROR',
message: error.message,
timestamp: new Date(),
});
console.error(
`🌐 ネットワークエラー: ${filePath} - ${error.message}`
);
// ネットワークエラーの場合はリトライを試行
this.scheduleRetry(processId, filePath);
}
scheduleRetry(processId, filePath) {
console.log(
`🔄 ${filePath} のリトライを30秒後に実行します`
);
setTimeout(() => {
console.log(`🔄 リトライ実行: ${filePath}`);
this.processFile(filePath).catch((error) => {
console.error(
'リトライも失敗しました:',
error.message
);
});
}, 30000);
}
getErrorSummary() {
const summary = {
totalErrors: this.errorLog.length,
errorsByType: {},
recentErrors: this.errorLog.slice(-10),
};
this.errorLog.forEach((error) => {
const type = error.errorType;
summary.errorsByType[type] =
(summary.errorsByType[type] || 0) + 1;
});
return summary;
}
}
メモリリークの防止策
EventEmitter を長時間実行するアプリケーションで使用する場合、リスナーの適切な管理によってメモリリークを防ぐ必要があります。
javascript// メモリリーク監視機能付きEventEmitter
class MemoryAwareEventEmitter extends EventEmitter {
constructor() {
super();
this.listenerStats = new Map();
this.maxListenerWarningThreshold = 10;
// 定期的にリスナー統計を更新
this.statsInterval = setInterval(
this.updateListenerStats.bind(this),
30000
);
// プロセス終了時のクリーンアップ
process.on('beforeExit', this.cleanup.bind(this));
}
on(eventName, listener) {
const result = super.on(eventName, listener);
this.trackListener(eventName, 'add');
return result;
}
off(eventName, listener) {
const result = super.off(eventName, listener);
this.trackListener(eventName, 'remove');
return result;
}
removeAllListeners(eventName) {
if (eventName) {
const count = this.listenerCount(eventName);
this.trackListener(eventName, 'removeAll', count);
} else {
// 全イベントのリスナーを削除
this.eventNames().forEach((name) => {
const count = this.listenerCount(name);
this.trackListener(name, 'removeAll', count);
});
}
return super.removeAllListeners(eventName);
}
trackListener(eventName, action, count = 1) {
if (!this.listenerStats.has(eventName)) {
this.listenerStats.set(eventName, {
added: 0,
removed: 0,
current: 0,
maxConcurrent: 0,
lastActivity: new Date(),
});
}
const stats = this.listenerStats.get(eventName);
stats.lastActivity = new Date();
switch (action) {
case 'add':
stats.added++;
stats.current++;
stats.maxConcurrent = Math.max(
stats.maxConcurrent,
stats.current
);
break;
case 'remove':
stats.removed++;
stats.current = Math.max(0, stats.current - 1);
break;
case 'removeAll':
stats.removed += count;
stats.current = 0;
break;
}
// 警告チェック
if (stats.current > this.maxListenerWarningThreshold) {
console.warn(
`⚠️ イベント '${eventName}' のリスナー数が多すぎます: ${stats.current}`
);
console.warn(
'メモリリークの可能性があります。リスナーの削除を確認してください。'
);
}
}
updateListenerStats() {
console.log('\n📊 EventEmitter 統計情報:');
for (const [
eventName,
stats,
] of this.listenerStats.entries()) {
const currentCount = this.listenerCount(eventName);
// 実際のリスナー数と統計の整合性をチェック
if (currentCount !== stats.current) {
console.warn(
`⚠️ 統計不整合: ${eventName} (実際: ${currentCount}, 統計: ${stats.current})`
);
stats.current = currentCount;
}
console.log(` ${eventName}:`);
console.log(` 現在のリスナー数: ${stats.current}`);
console.log(
` 最大同時リスナー数: ${stats.maxConcurrent}`
);
console.log(` 追加回数: ${stats.added}`);
console.log(` 削除回数: ${stats.removed}`);
console.log(
` 最終活動: ${stats.lastActivity.toISOString()}`
);
}
// メモリ使用量も表示
const memUsage = process.memoryUsage();
console.log('\n💾 メモリ使用量:');
console.log(
` RSS: ${Math.round(memUsage.rss / 1024 / 1024)} MB`
);
console.log(
` Heap Used: ${Math.round(
memUsage.heapUsed / 1024 / 1024
)} MB`
);
console.log(
` Heap Total: ${Math.round(
memUsage.heapTotal / 1024 / 1024
)} MB`
);
}
cleanup() {
clearInterval(this.statsInterval);
this.removeAllListeners();
console.log('🧹 EventEmitter をクリーンアップしました');
}
// リーク検出のためのヘルパーメソッド
detectPotentialLeaks() {
const suspiciousEvents = [];
for (const [
eventName,
stats,
] of this.listenerStats.entries()) {
const currentCount = this.listenerCount(eventName);
const leakRatio =
stats.added > 0
? (stats.added - stats.removed) / stats.added
: 0;
if (currentCount > 20 || leakRatio > 0.8) {
suspiciousEvents.push({
eventName,
currentListeners: currentCount,
leakRatio: Math.round(leakRatio * 100),
totalAdded: stats.added,
totalRemoved: stats.removed,
});
}
}
return suspiciousEvents;
}
}
デバッグ手法とツール
EventEmitter ベースのアプリケーションをデバッグするための実用的な手法を紹介します。
javascript// デバッグ機能付きEventEmitter
class DebuggableEventEmitter extends EventEmitter {
constructor(debugOptions = {}) {
super();
this.debugEnabled = debugOptions.enabled || false;
this.logLevel = debugOptions.logLevel || 'info';
this.eventHistory = [];
this.maxHistorySize =
debugOptions.maxHistorySize || 1000;
if (this.debugEnabled) {
this.enableDebugMode();
}
}
enableDebugMode() {
console.log('🐛 デバッグモードが有効になりました');
// emit メソッドをラップしてログ出力
const originalEmit = this.emit;
this.emit = (eventName, ...args) => {
const timestamp = new Date();
const eventData = {
eventName,
args,
timestamp,
listenerCount: this.listenerCount(eventName),
};
// イベント履歴に追加
this.eventHistory.push(eventData);
if (this.eventHistory.length > this.maxHistorySize) {
this.eventHistory.shift();
}
// ログ出力
this.logEvent('emit', eventData);
// 元のemitを実行
const result = originalEmit.call(
this,
eventName,
...args
);
// 結果をログ出力
this.logEvent('emitted', {
...eventData,
hasListeners: result,
});
return result;
};
// on メソッドをラップ
const originalOn = this.on;
this.on = (eventName, listener) => {
this.logEvent('listener_added', {
eventName,
listenerName: listener.name || 'anonymous',
totalListeners: this.listenerCount(eventName) + 1,
});
return originalOn.call(this, eventName, listener);
};
// off メソッドをラップ
const originalOff = this.off;
this.off = (eventName, listener) => {
this.logEvent('listener_removed', {
eventName,
listenerName: listener.name || 'anonymous',
totalListeners: Math.max(
0,
this.listenerCount(eventName) - 1
),
});
return originalOff.call(this, eventName, listener);
};
}
logEvent(action, data) {
if (!this.debugEnabled) return;
const logLevels = {
debug: 0,
info: 1,
warn: 2,
error: 3,
};
const currentLevel = logLevels[this.logLevel] || 1;
let logMethod = console.log;
let prefix = '🔔';
let level = 'info';
switch (action) {
case 'emit':
prefix = '📤';
level = 'debug';
break;
case 'emitted':
prefix = data.hasListeners ? '✅' : '❌';
level = data.hasListeners ? 'debug' : 'warn';
break;
case 'listener_added':
prefix = '➕';
level = 'debug';
break;
case 'listener_removed':
prefix = '➖';
level = 'debug';
break;
}
if (logLevels[level] >= currentLevel) {
const timestamp = data.timestamp
? data.timestamp.toISOString()
: new Date().toISOString();
switch (level) {
case 'warn':
logMethod = console.warn;
break;
case 'error':
logMethod = console.error;
break;
default:
logMethod = console.log;
}
logMethod(
`${prefix} [${timestamp}] ${action}:`,
data
);
}
}
// デバッグ情報の取得
getDebugInfo() {
return {
eventNames: this.eventNames(),
listenerCounts: this.eventNames().reduce(
(acc, name) => {
acc[name] = this.listenerCount(name);
return acc;
},
{}
),
recentEvents: this.eventHistory.slice(-10),
totalEventsEmitted: this.eventHistory.length,
debugEnabled: this.debugEnabled,
};
}
// イベント履歴の分析
analyzeEventHistory() {
const analysis = {
eventFrequency: {},
timeRange: {
start: null,
end: null,
},
averageListeners: {},
unusedEvents: [],
};
if (this.eventHistory.length === 0) {
return analysis;
}
analysis.timeRange.start =
this.eventHistory[0].timestamp;
analysis.timeRange.end =
this.eventHistory[
this.eventHistory.length - 1
].timestamp;
// イベント頻度の計算
this.eventHistory.forEach((event) => {
const { eventName, listenerCount } = event;
if (!analysis.eventFrequency[eventName]) {
analysis.eventFrequency[eventName] = 0;
analysis.averageListeners[eventName] = [];
}
analysis.eventFrequency[eventName]++;
analysis.averageListeners[eventName].push(
listenerCount
);
});
// 平均リスナー数の計算
Object.keys(analysis.averageListeners).forEach(
(eventName) => {
const counts = analysis.averageListeners[eventName];
analysis.averageListeners[eventName] =
counts.reduce((sum, count) => sum + count, 0) /
counts.length;
}
);
// 使用されていないイベントの検出
this.eventNames().forEach((eventName) => {
if (!analysis.eventFrequency[eventName]) {
analysis.unusedEvents.push(eventName);
}
});
return analysis;
}
// パフォーマンス測定
measurePerformance(eventName, iterations = 1000) {
console.log(
`⏱️ ${eventName} のパフォーマンス測定開始 (${iterations}回)`
);
const startTime = process.hrtime.bigint();
for (let i = 0; i < iterations; i++) {
this.emit(eventName, {
iteration: i,
timestamp: Date.now(),
});
}
const endTime = process.hrtime.bigint();
const duration = Number(endTime - startTime) / 1000000; // ナノ秒をミリ秒に変換
const results = {
eventName,
iterations,
totalTime: duration,
averageTime: duration / iterations,
eventsPerSecond: (iterations / duration) * 1000,
};
console.log('📊 パフォーマンス結果:');
console.log(
` 総実行時間: ${results.totalTime.toFixed(2)} ms`
);
console.log(
` 平均実行時間: ${results.averageTime.toFixed(4)} ms`
);
console.log(
` 秒間実行回数: ${results.eventsPerSecond.toFixed(
0
)} events/sec`
);
return results;
}
}
// 使用例
const debugEmitter = new DebuggableEventEmitter({
enabled: true,
logLevel: 'debug',
maxHistorySize: 500,
});
// テスト用のリスナーを追加
debugEmitter.on('test', (data) => {
console.log('テストイベント受信:', data);
});
debugEmitter.on('performance', () => {
// パフォーマンステスト用の軽い処理
});
// イベントを発行してテスト
debugEmitter.emit('test', { message: 'Hello World' });
debugEmitter.emit('unknown', { message: 'リスナーなし' });
// パフォーマンス測定
debugEmitter.measurePerformance('performance', 10000);
// デバッグ情報を表示
setTimeout(() => {
console.log('\n🔍 デバッグ情報:');
console.log(
JSON.stringify(debugEmitter.getDebugInfo(), null, 2)
);
console.log('\n📈 イベント履歴分析:');
console.log(
JSON.stringify(
debugEmitter.analyzeEventHistory(),
null,
2
)
);
}, 1000);
まとめ
本記事では、Node.js の EventEmitter について、基本概念から実践的な活用方法まで幅広く解説してきました。
EventEmitter の本質的な価値
EventEmitter は単なるイベント処理の仕組みを超えて、アプリケーション設計における強力な抽象化ツールとしての価値を持っています。Observer パターンの実装により、コンポーネント間の疎結合を実現し、変更に強い柔軟なアーキテクチャを構築できます。
イベント駆動プログラミングの採用により、従来の手続き型プログラミングでは困難だった複雑な非同期処理や状態管理を、より直感的で保守しやすい形で実装できるようになります。
実践での活用場面
ファイル処理の進捗通知では、長時間実行される処理において、ユーザーに適切なフィードバックを提供する仕組みを構築できました。WebSocket サーバーでのリアルタイム通信では、接続管理やメッセージ配信の複雑な制御を効率的に行う方法を学びました。
カスタムクラスへの組み込みでは、既存のビジネスロジックに EventEmitter の機能を統合することで、状態変化の監視や非同期処理の制御を自然に実装する手法を習得しました。
アーキテクチャ設計での威力
疎結合なモジュール設計では、従来の密結合な設計の問題点を解決し、各モジュールが独立して動作する柔軟なシステムを構築する方法を学びました。プラグインシステムの構築では、機能を動的に追加・削除できる拡張性の高いアプリケーションを作成する技術を身につけました。
マイクロサービス間の通信では、分散システムにおいてもイベント駆動アーキテクチャの恩恵を受けられることを確認しました。
エラーハンドリングとデバッグの重要性
EventEmitter を使用したアプリケーションでは、適切なエラーハンドリングが特に重要になります。error
イベントの特別な性質を理解し、メモリリークの防止策を講じることで、安定したプロダクション環境での運用が可能になります。
デバッグ手法とツールの活用により、複雑なイベントフローを持つアプリケーションでも、効率的な問題解決が可能になります。
今後の発展方向
EventEmitter をマスターした次のステップとして、RxJS のような Reactive Programming ライブラリや、Redux のような状態管理ライブラリとの組み合わせを検討することをお勧めします。また、TypeScript との組み合わせにより、型安全なイベント処理システムを構築することも可能です。
実際のプロジェクトでは、パフォーマンス要件やスケーラビリティ要件に応じて、適切な設計パターンを選択することが重要です。EventEmitter は強力なツールですが、すべての問題の解決策ではありません。適切な場面で適切に使用することで、その真価を発揮します。
継続的な学習を通じて、より高度で実用的なイベント駆動アプリケーションの開発スキルを向上させていきましょう。
関連リンク
- article
Dify と外部 API 連携:Webhook・Zapier・REST API 活用法Dify と外部 API 連携:Webhook・Zapier・REST API 活用法
- article
ESLint の自動修正(--fix)の活用事例と注意点
- article
Playwright MCP で大規模 E2E テストを爆速並列化する方法
- article
Storybook × TypeScript で型安全な UI 開発を極める
- article
Zustand でユーザー認証情報を安全に管理する設計パターン
- article
Node.js × TypeScript:バックエンド開発での型活用テクニック
- review
人類はなぜ地球を支配できた?『サピエンス全史 上巻』ユヴァル・ノア・ハラリが解き明かす驚愕の真実
- review
え?世界はこんなに良くなってた!『FACTFULNESS』ハンス・ロスリングが暴く 10 の思い込みの正体
- review
瞬時に答えが出る脳に変身!『ゼロ秒思考』赤羽雄二が贈る思考力爆上げトレーニング
- review
関西弁のゾウに人生変えられた!『夢をかなえるゾウ 1』水野敬也が教えてくれた成功の本質
- review
「なぜ私の考えは浅いのか?」の答えがここに『「具体 ⇄ 抽象」トレーニング』細谷功
- review
もうプレーヤー思考は卒業!『リーダーの仮面』安藤広大で掴んだマネジャー成功の極意