Node.js のストリーム入門:大量データ処理の基礎

Node.js でアプリケーションを開発していると、大量のデータを効率的に処理する必要に迫られることがあります。従来のメモリベースの処理では、巨大なファイルやリアルタイムデータを扱う際にメモリ不足やパフォーマンスの問題が発生しがちです。
そんな課題を解決する強力な仕組みが「ストリーム」です。ストリームを理解することで、メモリ効率的で高速なデータ処理が可能になり、Node.js アプリケーションの品質が格段に向上します。
本記事では、Node.js ストリームの基本概念から実装方法まで、初心者の方にもわかりやすく解説していきます。
Node.js ストリームとは何か
Node.js におけるストリームとは、データを小さなチャンク(塊)に分割して、連続的に処理する仕組みのことです。水道管を流れる水のように、データが途切れることなく流れ続けるイメージで理解していただけるでしょう。
ストリームの基本的な概念
ストリームは、以下の特徴を持つデータ処理のパラダイムです。
javascript// 従来の方法(非効率)
const fs = require('fs');
// ファイル全体をメモリに読み込む
const data = fs.readFileSync('large-file.txt');
console.log(data.toString());
// 大きなファイルの場合、メモリ不足が発生する可能性
上記のコードは、ファイル全体をメモリに読み込んでから処理を行います。これに対して、ストリームを使用した場合は以下のようになります。
javascript// ストリームを使用した方法(効率的)
const fs = require('fs');
// ストリームでファイルを読み込む
const readStream = fs.createReadStream('large-file.txt');
readStream.on('data', (chunk) => {
// データのチャンクごとに処理
console.log(
`受信したチャンクサイズ: ${chunk.length} bytes`
);
process.stdout.write(chunk);
});
readStream.on('end', () => {
console.log('\nファイル読み込み完了');
});
ストリームの利点
利点 | 説明 | 具体例 |
---|---|---|
メモリ効率 | データを小さなチャンクで処理するため、大量データでもメモリ使用量が一定 | 10GB のログファイルを数 MB のメモリで処理 |
リアルタイム処理 | データが到着次第、即座に処理を開始 | チャットアプリケーションのメッセージ配信 |
パイプライン処理 | 複数の処理を連鎖させて効率的なデータフロー構築 | 読み込み → 変換 → 圧縮 → 書き込み |
バックプレッシャー | 処理速度の違いを自動調整 | 高速データ生成と低速データ処理の調和 |
ストリームが解決する問題
Node.js でデータ処理を行う際に直面する主要な問題と、ストリームによる解決策を見ていきましょう。
メモリ制限の問題
Node.js のデフォルトのメモリ制限は約 1.4GB です。大きなファイルを一度にメモリに読み込もうとすると、この制限に引っかかります。
javascript// 問題のあるコード例
const fs = require('fs');
try {
// 2GB のファイルを読み込もうとする
const largeData = fs.readFileSync('huge-dataset.json');
const parsed = JSON.parse(largeData.toString());
console.log(parsed);
} catch (error) {
console.error('メモリ不足エラー:', error.message);
// "JavaScript heap out of memory" エラーが発生
}
ストリームを使用することで、この問題を解決できます。
javascript// ストリームによる解決例
const fs = require('fs');
const { Transform } = require('stream');
// JSON Lines 形式のファイルを行ごとに処理
const lineProcessor = new Transform({
objectMode: true,
transform(chunk, encoding, callback) {
const lines = chunk.toString().split('\n');
lines.forEach((line) => {
if (line.trim()) {
try {
const jsonData = JSON.parse(line);
// 各行を個別に処理
this.push(jsonData);
} catch (error) {
console.error(
'JSON パースエラー:',
error.message
);
}
}
});
callback();
},
});
fs.createReadStream('huge-dataset.jsonl')
.pipe(lineProcessor)
.on('data', (data) => {
// 各 JSON オブジェクトを処理
console.log('処理中のデータ:', data.id);
})
.on('end', () => {
console.log('全データの処理が完了しました');
});
レスポンス時間の問題
従来の方法では、すべてのデータが準備できるまでユーザーは待機する必要があります。ストリームを使用することで、部分的なデータでも即座にレスポンスを開始できます。
javascript// Express.js でのストリーミングレスポンス例
const express = require('express');
const fs = require('fs');
const app = express();
app.get('/download/:filename', (req, res) => {
const filename = req.params.filename;
const filePath = `./files/${filename}`;
// ファイルの存在確認
if (!fs.existsSync(filePath)) {
return res.status(404).send('ファイルが見つかりません');
}
// ストリーミングダウンロード
const readStream = fs.createReadStream(filePath);
// 適切なヘッダーを設定
res.setHeader('Content-Type', 'application/octet-stream');
res.setHeader(
'Content-Disposition',
`attachment; filename="${filename}"`
);
// ストリームをレスポンスにパイプ
readStream.pipe(res);
// エラーハンドリング
readStream.on('error', (error) => {
console.error('ファイル読み込みエラー:', error);
res.status(500).send('ファイル読み込みに失敗しました');
});
});
app.listen(3000, () => {
console.log('サーバーがポート 3000 で起動しました');
});
リソース使用効率の問題
複数の処理を同時に実行する際、ストリームを使用することでシステムリソースを効率的に活用できます。
javascript// 複数ファイルの並列処理例
const fs = require('fs');
const path = require('path');
const { pipeline } = require('stream/promises');
const { Transform } = require('stream');
class FileProcessor {
static async processMultipleFiles(inputDir, outputDir) {
const files = await fs.promises.readdir(inputDir);
const txtFiles = files.filter(
(file) => path.extname(file) === '.txt'
);
// 各ファイルを並列で処理
const promises = txtFiles.map(async (filename) => {
const inputPath = path.join(inputDir, filename);
const outputPath = path.join(
outputDir,
`processed-${filename}`
);
// テキスト変換ストリーム
const textTransformer = new Transform({
transform(chunk, encoding, callback) {
// 文字列を大文字に変換
const upperCaseText = chunk
.toString()
.toUpperCase();
callback(null, upperCaseText);
},
});
try {
await pipeline(
fs.createReadStream(inputPath),
textTransformer,
fs.createWriteStream(outputPath)
);
console.log(`処理完了: ${filename}`);
return { success: true, filename };
} catch (error) {
console.error(
`処理エラー ${filename}:`,
error.message
);
return {
success: false,
filename,
error: error.message,
};
}
});
const results = await Promise.all(promises);
return results;
}
}
// 使用例
async function main() {
try {
const results =
await FileProcessor.processMultipleFiles(
'./input',
'./output'
);
const successful = results.filter((r) => r.success);
const failed = results.filter((r) => !r.success);
console.log(
`成功: ${successful.length}件, 失敗: ${failed.length}件`
);
} catch (error) {
console.error('処理中にエラーが発生:', error);
}
}
main();
4 つのストリームタイプの詳細
Node.js では、用途に応じて 4 つの異なるタイプのストリームが提供されています。それぞれの特徴と使用場面を詳しく見ていきましょう。
Readable ストリーム
Readable ストリームは、データソースからデータを読み取るためのストリームです。ファイル読み込み、HTTP リクエストの受信、データベースからのデータ取得などに使用されます。
基本的な使用方法
javascriptconst fs = require('fs');
const { Readable } = require('stream');
// ファイルから読み取るReadableストリーム
const fileStream = fs.createReadStream('data.txt', {
encoding: 'utf8', // 文字エンコーディング
highWaterMark: 1024, // バッファサイズ(1KB)
start: 0, // 読み取り開始位置
end: 100, // 読み取り終了位置
});
// データイベントの監視
fileStream.on('data', (chunk) => {
console.log(`受信データ: ${chunk.length} bytes`);
console.log('内容:', chunk);
});
// 読み取り完了イベント
fileStream.on('end', () => {
console.log('ファイル読み取り完了');
});
// エラーイベント
fileStream.on('error', (error) => {
console.error('読み取りエラー:', error.message);
});
カスタム Readable ストリームの作成
独自のデータソースから読み取るストリームを作成することができます。
javascriptconst { Readable } = require('stream');
class NumberGenerator extends Readable {
constructor(options = {}) {
super(options);
this.currentNumber = 1;
this.maxNumber = options.max || 10;
}
_read() {
if (this.currentNumber <= this.maxNumber) {
// データをストリームにプッシュ
const data = `数値: ${this.currentNumber}\n`;
this.push(data);
this.currentNumber++;
} else {
// データの終了を示す
this.push(null);
}
}
}
// 使用例
const numberStream = new NumberGenerator({ max: 5 });
numberStream.on('data', (chunk) => {
process.stdout.write(chunk);
});
numberStream.on('end', () => {
console.log('数値生成完了');
});
実用的な Readable ストリーム例
API からのデータ取得を例に、実用的な使用方法を見てみましょう。
javascriptconst https = require('https');
const { Readable } = require('stream');
class ApiDataStream extends Readable {
constructor(apiUrl, options = {}) {
super(options);
this.apiUrl = apiUrl;
this.pageNumber = 1;
this.maxPages = options.maxPages || 5;
this.fetching = false;
}
_read() {
if (this.fetching || this.pageNumber > this.maxPages) {
return;
}
this.fetching = true;
const url = `${this.apiUrl}?page=${this.pageNumber}`;
https
.get(url, (res) => {
let data = '';
res.on('data', (chunk) => {
data += chunk;
});
res.on('end', () => {
try {
const jsonData = JSON.parse(data);
// データをストリームにプッシュ
this.push(JSON.stringify(jsonData) + '\n');
this.pageNumber++;
this.fetching = false;
// 最後のページの場合は終了
if (this.pageNumber > this.maxPages) {
this.push(null);
}
} catch (error) {
this.emit('error', error);
}
});
})
.on('error', (error) => {
this.emit('error', error);
});
}
}
// 使用例
const apiStream = new ApiDataStream(
'https://api.example.com/data',
{ maxPages: 3 }
);
apiStream.on('data', (chunk) => {
console.log('取得したデータ:', chunk.toString());
});
apiStream.on('end', () => {
console.log('全ページの取得完了');
});
apiStream.on('error', (error) => {
console.error('API取得エラー:', error.message);
});
Writable ストリーム
Writable ストリームは、データの書き込み先となるストリームです。ファイルへの書き込み、HTTP レスポンスの送信、データベースへのデータ保存などに使用されます。
基本的な使用方法
javascriptconst fs = require('fs');
const { Writable } = require('stream');
// ファイルへ書き込むWritableストリーム
const fileWriteStream = fs.createWriteStream('output.txt', {
encoding: 'utf8',
flags: 'w', // 書き込みモード('a'は追記モード)
highWaterMark: 1024, // バッファサイズ
});
// データの書き込み
fileWriteStream.write('最初の行\n');
fileWriteStream.write('2番目の行\n');
fileWriteStream.write('3番目の行\n');
// 書き込み完了
fileWriteStream.end('最後の行\n');
// イベント監視
fileWriteStream.on('finish', () => {
console.log('ファイル書き込み完了');
});
fileWriteStream.on('error', (error) => {
console.error('書き込みエラー:', error.message);
});
カスタム Writable ストリームの作成
データを特定の形式で処理・保存するストリームを作成できます。
javascriptconst { Writable } = require('stream');
class DataLogger extends Writable {
constructor(options = {}) {
super(options);
this.logCount = 0;
this.logFile = options.logFile || './application.log';
this.fs = require('fs');
// ログファイルの初期化
if (options.clearLog) {
this.fs.writeFileSync(this.logFile, '');
}
}
_write(chunk, encoding, callback) {
this.logCount++;
// タイムスタンプ付きでログを記録
const timestamp = new Date().toISOString();
const logEntry = `[${timestamp}] #${
this.logCount
}: ${chunk.toString()}\n`;
this.fs.appendFile(this.logFile, logEntry, (error) => {
if (error) {
callback(error);
} else {
console.log(
`ログ記録完了: エントリ #${this.logCount}`
);
callback();
}
});
}
_final(callback) {
// ストリーム終了時の処理
const summary = `\n=== ログセッション終了 ===\n総エントリ数: ${
this.logCount
}\n終了時刻: ${new Date().toISOString()}\n\n`;
this.fs.appendFile(this.logFile, summary, callback);
}
}
// 使用例
const logger = new DataLogger({
logFile: './app.log',
clearLog: true,
});
logger.write('アプリケーション開始');
logger.write('ユーザーログイン: user123');
logger.write('データ処理開始');
logger.end('データ処理完了');
logger.on('finish', () => {
console.log('ログセッション完了');
});
実用的な Writable ストリーム例
CSV ファイルを生成するストリームの例を見てみましょう。
javascriptconst { Writable } = require('stream');
const fs = require('fs');
class CsvWriter extends Writable {
constructor(filename, headers, options = {}) {
super({ objectMode: true, ...options });
this.filename = filename;
this.headers = headers;
this.isFirstRow = true;
this.writeStream = fs.createWriteStream(filename);
// ヘッダー行を書き込み
this.writeStream.write(headers.join(',') + '\n');
}
_write(data, encoding, callback) {
try {
// オブジェクトから CSV 行を生成
const row = this.headers.map((header) => {
const value = data[header] || '';
// カンマやダブルクォートを含む値をエスケープ
return this.escapeCsvValue(value.toString());
});
this.writeStream.write(row.join(',') + '\n');
callback();
} catch (error) {
callback(error);
}
}
_final(callback) {
this.writeStream.end(callback);
}
escapeCsvValue(value) {
if (
value.includes(',') ||
value.includes('"') ||
value.includes('\n')
) {
return `"${value.replace(/"/g, '""')}"`;
}
return value;
}
}
// 使用例
const csvWriter = new CsvWriter('users.csv', [
'id',
'name',
'email',
'age',
]);
// ユーザーデータを書き込み
const users = [
{
id: 1,
name: '田中太郎',
email: 'tanaka@example.com',
age: 30,
},
{
id: 2,
name: '佐藤花子',
email: 'sato@example.com',
age: 25,
},
{
id: 3,
name: '鈴木次郎',
email: 'suzuki@example.com',
age: 35,
},
];
users.forEach((user) => {
csvWriter.write(user);
});
csvWriter.end();
csvWriter.on('finish', () => {
console.log('CSV ファイルの作成が完了しました');
});
Transform ストリーム
Transform ストリームは、データを読み取り、変換処理を行い、変換されたデータを出力するストリームです。Readable と Writable の機能を組み合わせた双方向ストリームです。
基本的な使用方法
javascriptconst { Transform } = require('stream');
const fs = require('fs');
// テキストを大文字に変換するTransformストリーム
const upperCaseTransform = new Transform({
transform(chunk, encoding, callback) {
// 入力データを大文字に変換
const upperCaseData = chunk.toString().toUpperCase();
// 変換されたデータを出力
callback(null, upperCaseData);
},
});
// ストリームのパイプライン
fs.createReadStream('input.txt')
.pipe(upperCaseTransform) // データを変換
.pipe(fs.createWriteStream('output.txt'))
.on('finish', () => {
console.log('変換処理が完了しました');
});
カスタム Transform ストリームの作成
JSON データを CSV 形式に変換するストリームを作成してみましょう。
javascriptconst { Transform } = require('stream');
class JsonToCsvTransform extends Transform {
constructor(options = {}) {
super({ objectMode: true, ...options });
this.headers = options.headers || [];
this.isFirstRow = true;
}
_transform(chunk, encoding, callback) {
try {
const data = JSON.parse(chunk.toString());
// 最初の行の場合、ヘッダーを生成
if (this.isFirstRow) {
if (this.headers.length === 0) {
this.headers = Object.keys(data);
}
// ヘッダー行を出力
this.push(this.headers.join(',') + '\n');
this.isFirstRow = false;
}
// データ行を生成
const row = this.headers.map((header) => {
const value = data[header] || '';
return this.escapeCsvValue(value.toString());
});
this.push(row.join(',') + '\n');
callback();
} catch (error) {
callback(error);
}
}
escapeCsvValue(value) {
if (
value.includes(',') ||
value.includes('"') ||
value.includes('\n')
) {
return `"${value.replace(/"/g, '""')}"`;
}
return value;
}
}
// 使用例
const fs = require('fs');
const jsonToCsv = new JsonToCsvTransform({
headers: ['id', 'name', 'email', 'department'],
});
fs.createReadStream('employees.jsonl') // JSON Lines形式のファイル
.pipe(jsonToCsv)
.pipe(fs.createWriteStream('employees.csv'))
.on('finish', () => {
console.log('JSON から CSV への変換が完了しました');
});
高度な Transform ストリーム例
ログファイルの解析と集計を行うストリームの例です。
javascriptconst { Transform } = require('stream');
class LogAnalyzer extends Transform {
constructor(options = {}) {
super(options);
this.stats = {
totalLines: 0,
errorCount: 0,
warningCount: 0,
infoCount: 0,
ipAddresses: new Map(),
statusCodes: new Map(),
};
this.logPattern =
/^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z) \[(\w+)\] (\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}) - (\d{3}) - (.+)$/;
}
_transform(chunk, encoding, callback) {
const lines = chunk.toString().split('\n');
lines.forEach((line) => {
if (line.trim()) {
this.analyzeLine(line);
}
});
// 解析結果を出力
this.push(
JSON.stringify(this.getAnalysisResult()) + '\n'
);
callback();
}
analyzeLine(line) {
this.stats.totalLines++;
const match = line.match(this.logPattern);
if (match) {
const [, timestamp, level, ip, statusCode, message] =
match;
// ログレベル集計
switch (level.toLowerCase()) {
case 'error':
this.stats.errorCount++;
break;
case 'warning':
case 'warn':
this.stats.warningCount++;
break;
case 'info':
this.stats.infoCount++;
break;
}
// IP アドレス集計
const currentIpCount =
this.stats.ipAddresses.get(ip) || 0;
this.stats.ipAddresses.set(ip, currentIpCount + 1);
// ステータスコード集計
const currentStatusCount =
this.stats.statusCodes.get(statusCode) || 0;
this.stats.statusCodes.set(
statusCode,
currentStatusCount + 1
);
}
}
getAnalysisResult() {
return {
timestamp: new Date().toISOString(),
totalLines: this.stats.totalLines,
logLevels: {
error: this.stats.errorCount,
warning: this.stats.warningCount,
info: this.stats.infoCount,
},
topIpAddresses: Array.from(
this.stats.ipAddresses.entries()
)
.sort((a, b) => b[1] - a[1])
.slice(0, 10),
statusCodeDistribution: Object.fromEntries(
this.stats.statusCodes
),
};
}
_flush(callback) {
// ストリーム終了時の最終分析結果
const finalResult = {
...this.getAnalysisResult(),
analysis: 'complete',
};
this.push('\n' + JSON.stringify(finalResult, null, 2));
callback();
}
}
// 使用例
const fs = require('fs');
const logAnalyzer = new LogAnalyzer();
fs.createReadStream('access.log')
.pipe(logAnalyzer)
.pipe(fs.createWriteStream('log-analysis.json'))
.on('finish', () => {
console.log('ログ解析が完了しました');
});
Duplex ストリーム
Duplex ストリームは、読み取りと書き込みの両方の機能を持つ独立したストリームです。ネットワーク接続(TCP ソケット)やプロセス間通信などで使用されます。
基本的な概念
javascriptconst { Duplex } = require('stream');
const net = require('net');
// TCP サーバーの例
const server = net.createServer((socket) => {
console.log('クライアントが接続しました');
// socket は Duplex ストリーム
socket.on('data', (data) => {
console.log('受信:', data.toString());
// エコーバック
socket.write(`エコー: ${data.toString()}`);
});
socket.on('end', () => {
console.log('クライアントが切断しました');
});
});
server.listen(8080, () => {
console.log('サーバーがポート 8080 で起動しました');
});
カスタム Duplex ストリームの作成
チャットルームのような双方向通信を実装してみましょう。
javascriptconst { Duplex } = require('stream');
const EventEmitter = require('events');
class ChatRoom extends EventEmitter {
constructor() {
super();
this.clients = new Map();
this.messageHistory = [];
}
addClient(clientId) {
const client = new ChatClient(clientId, this);
this.clients.set(clientId, client);
// 過去のメッセージを送信
this.messageHistory.forEach((message) => {
client.push(JSON.stringify(message) + '\n');
});
this.broadcastMessage({
type: 'system',
message: `${clientId} がチャットルームに参加しました`,
timestamp: new Date().toISOString(),
});
return client;
}
removeClient(clientId) {
this.clients.delete(clientId);
this.broadcastMessage({
type: 'system',
message: `${clientId} がチャットルームから退出しました`,
timestamp: new Date().toISOString(),
});
}
broadcastMessage(message) {
this.messageHistory.push(message);
// 履歴は最新100件まで保持
if (this.messageHistory.length > 100) {
this.messageHistory.shift();
}
const messageString = JSON.stringify(message) + '\n';
this.clients.forEach((client, clientId) => {
if (message.from !== clientId) {
client.push(messageString);
}
});
}
}
class ChatClient extends Duplex {
constructor(clientId, chatRoom) {
super({ objectMode: false });
this.clientId = clientId;
this.chatRoom = chatRoom;
}
_read() {
// 読み取り要求(実際の実装では何もしない)
}
_write(chunk, encoding, callback) {
try {
const message = JSON.parse(chunk.toString());
// メッセージにクライアント情報を追加
const enrichedMessage = {
...message,
from: this.clientId,
timestamp: new Date().toISOString(),
};
// チャットルームにメッセージをブロードキャスト
this.chatRoom.broadcastMessage(enrichedMessage);
callback();
} catch (error) {
callback(error);
}
}
_final(callback) {
// クライアント切断時の処理
this.chatRoom.removeClient(this.clientId);
callback();
}
}
// 使用例
const chatRoom = new ChatRoom();
// クライアント1
const client1 = chatRoom.addClient('ユーザー1');
client1.on('data', (data) => {
console.log('クライアント1が受信:', data.toString());
});
// クライアント2
const client2 = chatRoom.addClient('ユーザー2');
client2.on('data', (data) => {
console.log('クライアント2が受信:', data.toString());
});
// メッセージ送信テスト
setTimeout(() => {
client1.write(
JSON.stringify({
type: 'message',
message: 'こんにちは!',
})
);
}, 1000);
setTimeout(() => {
client2.write(
JSON.stringify({
type: 'message',
message: 'こんにちは!元気ですか?',
})
);
}, 2000);
実用的な Duplex ストリーム例
WebSocket のような双方向通信を模擬するストリームです。
javascriptconst { Duplex } = require('stream');
const crypto = require('crypto');
class MockWebSocket extends Duplex {
constructor(options = {}) {
super(options);
this.connectionId = crypto.randomUUID();
this.isConnected = false;
this.heartbeatInterval = null;
this.lastPing = null;
// 接続を開始
this.connect();
}
connect() {
this.isConnected = true;
console.log(`WebSocket 接続確立: ${this.connectionId}`);
// ハートビート開始
this.startHeartbeat();
// 接続確立メッセージを送信
this.push(
JSON.stringify({
type: 'connection',
connectionId: this.connectionId,
timestamp: new Date().toISOString(),
}) + '\n'
);
}
_read() {
// クライアントからの読み取り要求
}
_write(chunk, encoding, callback) {
if (!this.isConnected) {
return callback(
new Error('WebSocket が切断されています')
);
}
try {
const message = JSON.parse(chunk.toString());
// メッセージタイプに応じた処理
switch (message.type) {
case 'ping':
this.handlePing(message);
break;
case 'message':
this.handleMessage(message);
break;
case 'subscribe':
this.handleSubscribe(message);
break;
default:
console.log(
'未知のメッセージタイプ:',
message.type
);
}
callback();
} catch (error) {
callback(error);
}
}
handlePing(message) {
this.lastPing = Date.now();
// Pong を返送
this.push(
JSON.stringify({
type: 'pong',
timestamp: new Date().toISOString(),
latency: 0, // 実際の実装では計算する
}) + '\n'
);
}
handleMessage(message) {
// エコーメッセージとして返送
this.push(
JSON.stringify({
type: 'echo',
originalMessage: message.data,
timestamp: new Date().toISOString(),
}) + '\n'
);
}
handleSubscribe(message) {
console.log(`チャンネル購読: ${message.channel}`);
// 購読確認を送信
this.push(
JSON.stringify({
type: 'subscribed',
channel: message.channel,
timestamp: new Date().toISOString(),
}) + '\n'
);
// 定期的にデータを送信
this.startDataStream(message.channel);
}
startHeartbeat() {
this.heartbeatInterval = setInterval(() => {
if (this.isConnected) {
this.push(
JSON.stringify({
type: 'heartbeat',
timestamp: new Date().toISOString(),
}) + '\n'
);
}
}, 30000); // 30秒間隔
}
startDataStream(channel) {
// チャンネルに応じたデータストリーム
const dataInterval = setInterval(() => {
if (!this.isConnected) {
clearInterval(dataInterval);
return;
}
this.push(
JSON.stringify({
type: 'data',
channel: channel,
data: {
value: Math.random() * 100,
timestamp: new Date().toISOString(),
},
}) + '\n'
);
}, 5000); // 5秒間隔
}
_final(callback) {
this.disconnect();
callback();
}
disconnect() {
this.isConnected = false;
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval);
}
console.log(`WebSocket 切断: ${this.connectionId}`);
}
}
// 使用例
const mockWs = new MockWebSocket();
mockWs.on('data', (data) => {
console.log('受信:', data.toString());
});
// メッセージ送信テスト
setTimeout(() => {
mockWs.write(
JSON.stringify({
type: 'subscribe',
channel: 'sensor-data',
})
);
}, 1000);
setTimeout(() => {
mockWs.write(
JSON.stringify({
type: 'message',
data: 'Hello WebSocket!',
})
);
}, 3000);
setTimeout(() => {
mockWs.write(
JSON.stringify({
type: 'ping',
})
);
}, 5000);
// 10秒後に切断
setTimeout(() => {
mockWs.end();
}, 10000);
ストリームのライフサイクルとイベント
Node.js ストリームは、明確なライフサイクルを持ち、各段階で適切なイベントが発行されます。これらのイベントを理解することで、効果的なエラーハンドリングと制御が可能になります。
ストリームの基本イベント
イベント | 対象ストリーム | 説明 | 発行タイミング |
---|---|---|---|
data | Readable, Transform, Duplex | データチャンクが利用可能になった時 | データ受信時 |
end | Readable, Transform, Duplex | すべてのデータが読み取られた時 | ストリーム終了時 |
finish | Writable, Transform, Duplex | すべてのデータが書き込まれた時 | 書き込み完了時 |
close | すべて | ストリームが閉じられた時 | リソース解放時 |
error | すべて | エラーが発生した時 | 異常時 |
詳細なライフサイクル管理
javascriptconst fs = require('fs');
const { pipeline } = require('stream');
const { Transform } = require('stream');
class StreamLifecycleMonitor extends Transform {
constructor(name, options = {}) {
super(options);
this.name = name;
this.startTime = Date.now();
this.bytesProcessed = 0;
this.chunksProcessed = 0;
// ライフサイクルイベントの監視
this.setupEventMonitoring();
}
setupEventMonitoring() {
// ストリーム開始
this.on('pipe', (src) => {
console.log(
`[${this.name}] パイプ接続: ${src.constructor.name} から受信開始`
);
});
// データ処理
this.on('data', (chunk) => {
this.bytesProcessed += chunk.length;
console.log(
`[${this.name}] データ出力: ${chunk.length} bytes`
);
});
// 読み取り終了
this.on('end', () => {
const duration = Date.now() - this.startTime;
console.log(
`[${this.name}] 読み取り終了: ${this.chunksProcessed} chunks, ${this.bytesProcessed} bytes, ${duration}ms`
);
});
// 書き込み終了
this.on('finish', () => {
const duration = Date.now() - this.startTime;
console.log(
`[${this.name}] 書き込み終了: 処理時間 ${duration}ms`
);
});
// ストリーム閉鎖
this.on('close', () => {
console.log(
`[${this.name}] ストリーム閉鎖: リソース解放完了`
);
});
// エラー処理
this.on('error', (error) => {
console.error(
`[${this.name}] エラー発生:`,
error.message
);
});
}
_transform(chunk, encoding, callback) {
this.chunksProcessed++;
// 簡単なデータ変換(文字列の場合は行番号を追加)
if (
typeof chunk === 'string' ||
chunk instanceof Buffer
) {
const lines = chunk.toString().split('\n');
lines.forEach((line) => {
if (line.trim()) {
return `${this.chunksProcessed}-${line}`;
}
return line;
});
callback(null, lines.join('\n'));
} else {
callback(null, chunk);
}
}
}
// 使用例
async function demonstrateLifecycle() {
const monitor1 = new StreamLifecycleMonitor('変換処理1');
const monitor2 = new StreamLifecycleMonitor('変換処理2');
try {
await pipeline(
fs.createReadStream('input.txt'),
monitor1,
monitor2,
fs.createWriteStream('output.txt')
);
console.log('パイプライン処理が正常に完了しました');
} catch (error) {
console.error(
'パイプライン処理中にエラーが発生:',
error.message
);
}
}
// demonstrateLifecycle();
エラーハンドリングのベストプラクティス
javascriptconst fs = require('fs');
const { pipeline } = require('stream/promises');
const { Transform } = require('stream');
class SafeJsonParser extends Transform {
constructor(options = {}) {
super({ objectMode: true, ...options });
this.errorCount = 0;
this.successCount = 0;
this.maxErrors = options.maxErrors || 10;
}
_transform(chunk, encoding, callback) {
try {
const lines = chunk.toString().split('\n');
lines.forEach((line) => {
if (line.trim()) {
try {
const jsonData = JSON.parse(line);
this.push(jsonData);
this.successCount++;
} catch (parseError) {
this.errorCount++;
// エラー情報を詳細に記録
const errorInfo = {
error: 'JSON_PARSE_ERROR',
line: line.substring(0, 100), // 最初の100文字のみ
message: parseError.message,
lineNumber:
this.successCount + this.errorCount,
};
// エラーイベントを発行(但し、致命的でない場合は継続)
this.emit('parseError', errorInfo);
// 最大エラー数を超えた場合は処理を停止
if (this.errorCount > this.maxErrors) {
return callback(
new Error(
`最大エラー数(${this.maxErrors})を超えました`
)
);
}
}
}
});
callback();
} catch (error) {
callback(error);
}
}
_flush(callback) {
// 最終統計を出力
console.log(
`解析完了: 成功 ${this.successCount}件, エラー ${this.errorCount}件`
);
callback();
}
}
// エラーハンドリングの使用例
async function safeJsonProcessing() {
const parser = new SafeJsonParser({ maxErrors: 5 });
// 非致命的エラーの監視
parser.on('parseError', (errorInfo) => {
console.warn(
`解析エラー (行 ${errorInfo.lineNumber}):`,
errorInfo.message
);
});
try {
await pipeline(
fs.createReadStream('data.jsonl'),
parser,
new Transform({
objectMode: true,
transform(data, encoding, callback) {
// 有効なJSONオブジェクトのみ処理
console.log('処理中:', data.id || 'ID不明');
callback(null, JSON.stringify(data) + '\n');
},
}),
fs.createWriteStream('cleaned-data.jsonl')
);
console.log('JSON処理が完了しました');
} catch (error) {
console.error(
'処理中に致命的エラーが発生:',
error.message
);
}
}
基本的な使用方法とコード例
ストリームの実用的な使用パターンを、具体的なコード例とともに解説します。
パイプライン処理の基本
javascriptconst fs = require('fs');
const { pipeline } = require('stream/promises');
const { Transform } = require('stream');
const zlib = require('zlib');
// データ処理パイプラインの例
async function processDataPipeline() {
// ログファイルの処理と圧縮
const logProcessor = new Transform({
transform(chunk, encoding, callback) {
const lines = chunk.toString().split('\n');
// エラーログのみ抽出
const errorLines = lines.filter(
(line) =>
line.includes('[ERROR]') ||
line.includes('[FATAL]')
);
if (errorLines.length > 0) {
callback(null, errorLines.join('\n') + '\n');
} else {
callback();
}
},
});
try {
await pipeline(
fs.createReadStream('application.log'), // 入力
logProcessor, // 変換
zlib.createGzip(), // 圧縮
fs.createWriteStream('errors-only.log.gz') // 出力
);
console.log('ログ処理が完了しました');
} catch (error) {
console.error('パイプライン処理エラー:', error.message);
}
}
バックプレッシャーの処理
javascriptconst { Writable, Readable } = require('stream');
class SlowProcessor extends Writable {
constructor(options = {}) {
super(options);
this.processDelay = options.delay || 1000;
this.processedCount = 0;
}
async _write(chunk, encoding, callback) {
// 意図的に遅い処理をシミュレート
await new Promise((resolve) =>
setTimeout(resolve, this.processDelay)
);
this.processedCount++;
console.log(
`処理完了 #${this.processedCount}: ${chunk.length} bytes`
);
callback();
}
}
class FastDataGenerator extends Readable {
constructor(options = {}) {
super(options);
this.count = 0;
this.maxCount = options.maxCount || 10;
}
_read() {
if (this.count < this.maxCount) {
const data = `高速データ生成 #${++this.count}\n`;
console.log(`生成: ${data.trim()}`);
this.push(data);
} else {
this.push(null);
}
}
}
// バックプレッシャーのデモンストレーション
function demonstrateBackpressure() {
const generator = new FastDataGenerator({ maxCount: 5 });
const processor = new SlowProcessor({ delay: 2000 });
// バックプレッシャーの監視
generator.on('pause', () => {
console.log(
'生成側が一時停止されました(バックプレッシャー)'
);
});
generator.on('resume', () => {
console.log('生成側が再開されました');
});
generator.pipe(processor);
processor.on('finish', () => {
console.log('すべての処理が完了しました');
});
}
// demonstrateBackpressure();
並列ストリーム処理
javascriptconst fs = require('fs');
const path = require('path');
const { Transform } = require('stream');
const { pipeline } = require('stream/promises');
class ParallelFileProcessor {
constructor(concurrency = 3) {
this.concurrency = concurrency;
this.activePromises = [];
}
async processDirectory(inputDir, outputDir, processor) {
const files = await fs.promises.readdir(inputDir);
const txtFiles = files.filter(
(file) => path.extname(file) === '.txt'
);
console.log(
`${txtFiles.length} ファイルを ${this.concurrency} 並列で処理開始`
);
// ファイルを並列処理用にグループ化
const fileGroups = this.chunkArray(
txtFiles,
this.concurrency
);
for (const group of fileGroups) {
const promises = group.map(async (filename) => {
const inputPath = path.join(inputDir, filename);
const outputPath = path.join(
outputDir,
`processed-${filename}`
);
try {
await pipeline(
fs.createReadStream(inputPath),
processor(),
fs.createWriteStream(outputPath)
);
console.log(`完了: ${filename}`);
return { success: true, filename };
} catch (error) {
console.error(
`エラー ${filename}:`,
error.message
);
return {
success: false,
filename,
error: error.message,
};
}
});
// 並列実行
const results = await Promise.all(promises);
console.log(`グループ処理完了:`, results);
}
}
chunkArray(array, chunkSize) {
const chunks = [];
for (let i = 0; i < array.length; i += chunkSize) {
chunks.push(array.slice(i, i + chunkSize));
}
return chunks;
}
}
// 使用例
async function parallelProcessingExample() {
const processor = new ParallelFileProcessor(2);
// テキスト変換プロセッサーを定義
const createProcessor = () =>
new Transform({
transform(chunk, encoding, callback) {
// すべての単語を大文字に変換
const processedText = chunk
.toString()
.split(' ')
.map((word) => word.toUpperCase())
.join(' ');
callback(null, processedText);
},
});
try {
await processor.processDirectory(
'./input',
'./output',
createProcessor
);
console.log('並列処理が完了しました');
} catch (error) {
console.error('並列処理エラー:', error.message);
}
}
ストリームとバッファリングの関係
ストリームにおけるバッファリングは、パフォーマンスとメモリ使用量のバランスを取る重要な仕組みです。
バッファサイズの最適化
javascriptconst fs = require('fs');
const { Transform } = require('stream');
class BufferAnalyzer extends Transform {
constructor(options = {}) {
super(options);
this.chunkSizes = [];
this.totalBytes = 0;
this.chunkCount = 0;
}
_transform(chunk, encoding, callback) {
this.chunkCount++;
this.totalBytes += chunk.length;
this.chunkSizes.push(chunk.length);
console.log(
`チャンク #${this.chunkCount}: ${chunk.length} bytes`
);
// そのまま通す
callback(null, chunk);
}
_flush(callback) {
// 統計情報を出力
const avgChunkSize = Math.round(
this.totalBytes / this.chunkCount
);
const minChunkSize = Math.min(...this.chunkSizes);
const maxChunkSize = Math.max(...this.chunkSizes);
console.log('\n=== バッファ統計 ===');
console.log(`総バイト数: ${this.totalBytes}`);
console.log(`チャンク数: ${this.chunkCount}`);
console.log(
`平均チャンクサイズ: ${avgChunkSize} bytes`
);
console.log(
`最小チャンクサイズ: ${minChunkSize} bytes`
);
console.log(
`最大チャンクサイズ: ${maxChunkSize} bytes`
);
callback();
}
}
// 異なるバッファサイズでの比較
async function compareBufferSizes() {
const bufferSizes = [512, 1024, 4096, 16384, 65536]; // 0.5KB から 64KB
for (const bufferSize of bufferSizes) {
console.log(
`\n--- バッファサイズ: ${bufferSize} bytes ---`
);
const analyzer = new BufferAnalyzer();
const startTime = Date.now();
try {
await new Promise((resolve, reject) => {
const readStream = fs.createReadStream(
'large-file.txt',
{
highWaterMark: bufferSize,
}
);
readStream
.pipe(analyzer)
.pipe(
fs.createWriteStream(`output-${bufferSize}.txt`)
)
.on('finish', resolve)
.on('error', reject);
});
const duration = Date.now() - startTime;
console.log(`処理時間: ${duration}ms`);
} catch (error) {
console.error(
`エラー (バッファサイズ ${bufferSize}):`,
error.message
);
}
}
}
メモリ使用量の監視
javascriptconst { Transform } = require('stream');
const fs = require('fs');
class MemoryMonitor extends Transform {
constructor(options = {}) {
super(options);
this.monitorInterval = setInterval(() => {
this.logMemoryUsage();
}, 1000);
this.processedBytes = 0;
this.startTime = Date.now();
}
_transform(chunk, encoding, callback) {
this.processedBytes += chunk.length;
callback(null, chunk);
}
_flush(callback) {
clearInterval(this.monitorInterval);
const duration = Date.now() - this.startTime;
const throughput = Math.round(
this.processedBytes / (duration / 1000)
);
console.log('\n=== 最終統計 ===');
console.log(`処理バイト数: ${this.processedBytes}`);
console.log(`処理時間: ${duration}ms`);
console.log(`スループット: ${throughput} bytes/sec`);
this.logMemoryUsage();
callback();
}
logMemoryUsage() {
const memUsage = process.memoryUsage();
const formatBytes = (bytes) =>
Math.round((bytes / 1024 / 1024) * 100) / 100;
console.log(
`メモリ使用量 - RSS: ${formatBytes(
memUsage.rss
)}MB, ` +
`Heap Used: ${formatBytes(memUsage.heapUsed)}MB, ` +
`External: ${formatBytes(memUsage.external)}MB`
);
}
}
// 使用例
async function monitorMemoryUsage() {
const monitor = new MemoryMonitor();
const largeDataProcessor = new Transform({
transform(chunk, encoding, callback) {
// 意図的に重い処理をシミュレート
const data = chunk.toString();
const processedData = data
.split('\n')
.map((line) => line.toUpperCase())
.join('\n');
callback(null, processedData);
},
});
try {
fs.createReadStream('very-large-file.txt')
.pipe(monitor)
.pipe(largeDataProcessor)
.pipe(fs.createWriteStream('processed-output.txt'))
.on('finish', () => {
console.log('メモリ監視付き処理が完了しました');
});
} catch (error) {
console.error('処理エラー:', error.message);
}
}
バックプレッシャーの詳細制御
javascriptconst { Writable, Readable } = require('stream');
class AdaptiveWriter extends Writable {
constructor(options = {}) {
super(options);
this.writeDelay = options.initialDelay || 100;
this.maxDelay = options.maxDelay || 2000;
this.minDelay = options.minDelay || 50;
this.adaptationFactor = options.adaptationFactor || 1.1;
this.queueLength = 0;
}
_write(chunk, encoding, callback) {
this.queueLength++;
// キューの長さに応じて遅延を調整
if (this.queueLength > 10) {
this.writeDelay = Math.min(
this.writeDelay * this.adaptationFactor,
this.maxDelay
);
console.log(
`書き込み速度を下げました: ${this.writeDelay}ms`
);
} else if (this.queueLength < 3) {
this.writeDelay = Math.max(
this.writeDelay / this.adaptationFactor,
this.minDelay
);
console.log(
`書き込み速度を上げました: ${this.writeDelay}ms`
);
}
setTimeout(() => {
this.queueLength--;
console.log(
`書き込み完了 (キュー: ${this.queueLength}, 遅延: ${this.writeDelay}ms)`
);
callback();
}, this.writeDelay);
}
}
class BurstDataGenerator extends Readable {
constructor(options = {}) {
super(options);
this.burstSize = options.burstSize || 5;
this.burstInterval = options.burstInterval || 3000;
this.dataCount = 0;
this.maxData = options.maxData || 50;
this.startBurstGeneration();
}
startBurstGeneration() {
this.burstTimer = setInterval(() => {
for (
let i = 0;
i < this.burstSize && this.dataCount < this.maxData;
i++
) {
this.generateData();
}
if (this.dataCount >= this.maxData) {
clearInterval(this.burstTimer);
this.push(null);
}
}, this.burstInterval);
}
generateData() {
this.dataCount++;
const data = `バーストデータ #${
this.dataCount
} - ${new Date().toISOString()}\n`;
console.log(`生成: ${data.trim()}`);
this.push(data);
}
_read() {
// 実際の読み取りはタイマーで制御
}
}
// 適応的バックプレッシャーのデモ
function demonstrateAdaptiveBackpressure() {
const generator = new BurstDataGenerator({
burstSize: 3,
burstInterval: 1000,
maxData: 20,
});
const writer = new AdaptiveWriter({
initialDelay: 500,
maxDelay: 3000,
minDelay: 100,
adaptationFactor: 1.2,
});
generator.pipe(writer);
writer.on('finish', () => {
console.log(
'適応的バックプレッシャー処理が完了しました'
);
});
}
// demonstrateAdaptiveBackpressure();
まとめ
Node.js ストリームは、大量データ処理において欠かせない強力な仕組みです。本記事で解説した内容を振り返ってみましょう。
ストリームの重要なポイント
Node.js ストリームを効果的に活用するために、以下の重要なポイントを押さえておきましょう。
項目 | 重要なポイント | 実践のコツ |
---|---|---|
4 つのタイプ | 用途に応じた適切なストリーム選択 | Readable(読み取り)、Writable(書き込み)、Transform(変換)、Duplex(双方向)の特性を理解 |
メモリ効率 | 大量データを一定メモリで処理 | highWaterMark でバッファサイズを最適化 |
エラーハンドリング | 堅牢なエラー処理の実装 | error 、end 、finish 、close イベントの適切な監視 |
バックプレッシャー | 処理速度の自動調整 | 遅い処理に対してデータ生成を一時停止する仕組み |
パイプライン | 複数処理の効率的な連鎖 | pipeline() 関数を使用した安全な処理チェーン |
ストリーム活用のベストプラクティス
実際のプロジェクトでストリームを活用する際は、以下のベストプラクティスを参考にしてください。
javascript// ベストプラクティスの統合例
const fs = require('fs');
const { pipeline } = require('stream/promises');
const { Transform } = require('stream');
class ProductionStreamProcessor {
static async processFile(
inputPath,
outputPath,
options = {}
) {
const processor = new Transform({
highWaterMark: options.bufferSize || 16384, // 16KB
transform(chunk, encoding, callback) {
try {
// データ変換処理
const processedData = chunk
.toString()
.split('\n')
.filter((line) => line.trim())
.map((line) => line.toUpperCase())
.join('\n');
callback(null, processedData + '\n');
} catch (error) {
callback(error);
}
},
});
// エラーハンドリングとモニタリング
processor.on('error', (error) => {
console.error('変換処理エラー:', error.message);
});
try {
await pipeline(
fs.createReadStream(inputPath, {
highWaterMark: options.bufferSize || 16384,
}),
processor,
fs.createWriteStream(outputPath)
);
console.log(
`ファイル処理完了: ${inputPath} → ${outputPath}`
);
return { success: true };
} catch (error) {
console.error(
'パイプライン処理エラー:',
error.message
);
return { success: false, error: error.message };
}
}
}
// 使用例
// ProductionStreamProcessor.processFile('./input.txt', './output.txt', { bufferSize: 32768 });
パフォーマンス向上のコツ
ストリームのパフォーマンスを最大限に引き出すためのテクニックをまとめました。
- 適切なバッファサイズの選択: 処理するデータの特性に応じて
highWaterMark
を調整する - 並列処理の活用: 複数ファイルの処理では並列ストリームを利用する
- メモリ監視: 長時間稼働するアプリケーションではメモリ使用量を定期的に監視する
- エラー処理の最適化: 致命的でないエラーは継続処理し、統計情報として記録する
- 適切なクリーンアップ: ストリーム終了時のリソース解放を確実に実行する
今後の学習ステップ
Node.js ストリームをさらに深く理解するために、以下の学習ステップをお勧めします。
- 実際のプロジェクトでの実践: 小さなファイル処理ツールから始めて経験を積む
- パフォーマンス測定: 様々なバッファサイズやストリーム構成でベンチマークを取る
- 高度なパターン: ストリームプールやカスタムストリームの応用例を学習
- 他のライブラリとの組み合わせ: Express.js や WebSocket との連携パターンを習得
- デバッグ技術: ストリーム処理のトラブルシューティング手法をマスター
Node.js ストリームをマスターすることで、効率的で拡張性の高いアプリケーションを構築できるようになります。メモリ効率とパフォーマンスを両立させながら、大量データを滑らかに処理する魅力を、ぜひ実際のプロジェクトで体験してみてください。
ストリームの世界は奥深く、学び続けることで新たな発見や最適化の機会が見つかるでしょう。今回学んだ基礎知識を土台として、さらなるスキルアップを目指していきましょう!