T-CREATOR

Redis Pub/Sub vs Redis Streams:配信保証とスケーラビリティ比較

Redis Pub/Sub vs Redis Streams:配信保証とスケーラビリティ比較

Redis でリアルタイムメッセージングを実装する際、Pub/Sub と Streams のどちらを選ぶべきか迷ったことはありませんか。

両者は同じ Redis で提供される機能ですが、配信保証の仕組みやスケーラビリティの特性が大きく異なります。適切に選択しないと、メッセージの取りこぼしやシステムのボトルネックにつながる可能性があるのです。

本記事では、Redis Pub/Sub と Redis Streams の配信保証とスケーラビリティに焦点を当てて、それぞれの特徴や使い分けのポイントを解説します。実際のコード例を交えながら、あなたのプロジェトに最適な選択ができるようサポートいたしますね。

背景

Redis におけるメッセージング機能の位置づけ

Redis は高速なインメモリデータストアとして知られていますが、メッセージング機能も提供しています。

リアルタイム通知、チャットアプリケーション、イベント駆動型アーキテクチャなど、様々なユースケースで活用されているのです。Redis のメッセージング機能には主に 2 つの選択肢があります。

Pub/Sub と Streams の登場経緯

Redis Pub/Sub は Redis の初期バージョンから提供されている軽量なメッセージングパターンです。

一方、Redis Streams は Redis 5.0(2018 年リリース)で追加された、より高度なメッセージング機能になります。Streams は Pub/Sub の制約を解消し、メッセージの永続化や配信保証を実現するために設計されました。

以下の図は、Redis におけるメッセージング機能の進化を示しています。

mermaidflowchart LR
  redis_early["Redis 初期版<br/>(Pub/Sub)"] -->|軽量・シンプル| pubsub["Pub/Sub<br/>パターン"]
  redis5["Redis 5.0<br/>(2018)"] -->|永続化・保証| streams["Streams<br/>パターン"]
  pubsub -->|制約| limitations["・配信保証なし<br/>・履歴保持なし"]
  streams -->|解決| solutions["・At-least-once 配信<br/>・メッセージ履歴<br/>・Consumer Group"]

この図から、Streams が Pub/Sub の制約を補完する形で登場したことがわかりますね。

メッセージング要件の多様化

現代のアプリケーションでは、メッセージングに対する要件が多様化しています。

単純な通知だけでなく、確実な配信保証、メッセージの再処理、複数コンシューマーでの負荷分散など、より高度な機能が求められるようになりました。この背景から、Pub/Sub と Streams のどちらを選ぶべきか理解することが重要になります。

課題

配信保証の欠如による問題

Redis Pub/Sub は「Fire and Forget」方式です。

Publisher がメッセージを送信した時点で Subscriber が接続していない場合、そのメッセージは完全に失われてしまいます。これは以下のような問題を引き起こすのです。

#課題影響
1オフライン時のメッセージロスクライアント再接続時にメッセージを受信できない
2ネットワーク障害時のデータ損失一時的な接続断でメッセージが消失
3スケーリング時の課題Subscriber を動的に追加してもメッセージが届かない

以下の図は、Pub/Sub におけるメッセージロスのシナリオを示しています。

mermaidsequenceDiagram
  participant pub as Publisher
  participant redis as Redis Server
  participant sub1 as Subscriber A<br/>(接続中)
  participant sub2 as Subscriber B<br/>(オフライン)

  pub->>redis: メッセージ送信
  redis->>sub1: 配信成功
  redis-xsub2: 配信失敗<br/>(接続なし)

  Note over sub2: メッセージは<br/>完全に失われる

このように、接続していない Subscriber にはメッセージが届かず、再送の仕組みもありません。

メッセージ履歴の不在

Pub/Sub では過去のメッセージを取得する手段がありません。

Subscriber が接続した瞬間から送信されるメッセージのみを受信できるため、アプリケーションの起動順序に依存する問題が発生します。これは特に以下のシナリオで課題となるでしょう。

  • システム再起動後のメッセージ復旧
  • 新しい Subscriber の追加時の初期データ同期
  • デバッグやモニタリング用のメッセージ追跡

スケーラビリティの制約

Pub/Sub では複数の Subscriber が同じチャネルを購読すると、すべての Subscriber に同じメッセージが配信されます。

これは「ファンアウト」パターンとしては便利ですが、負荷分散には不向きです。複数のワーカーでメッセージ処理を分散させたい場合、Pub/Sub では実現が困難になります。

処理失敗時のリトライ問題

Subscriber がメッセージを受信しても、処理中にエラーが発生した場合の再処理メカニズムがありません。

メッセージの Acknowledge(確認応答)の仕組みがないため、処理の成功・失敗を Redis 側で追跡できないのです。これは信頼性が求められるシステムでは大きな課題となります。

解決策

Redis Streams による配信保証の実現

Redis Streams は、メッセージを永続化し、配信保証を提供します。

各メッセージには一意の ID が割り当てられ、Redis サーバー上に保存されるため、Subscriber がオフラインでもメッセージは失われません。以下の特徴により、確実な配信が可能になるのです。

Streams の主要機能

#機能説明
1メッセージ永続化Stream に追加されたメッセージは明示的に削除されるまで保持
2Consumer Group複数コンシューマーでメッセージを分散処理
3Acknowledge 機能処理完了を明示的に確認
4Pending Entry List未確認メッセージの追跡と再処理

以下の図は、Streams における配信保証の仕組みを示しています。

mermaidflowchart TD
  producer["Producer"] -->|XADD| stream["Redis Stream<br/>(永続化)"]
  stream -->|XREADGROUP| cg["Consumer Group"]
  cg -->|メッセージ割り当て| c1["Consumer 1"]
  cg -->|メッセージ割り当て| c2["Consumer 2"]
  c1 -->|処理| process1["処理成功"]
  c2 -->|処理| process2["処理失敗"]
  process1 -->|XACK| stream
  process2 -->|未確認| pending["Pending List"]
  pending -->|XCLAIM で再割り当て| c1

この図から、処理失敗時のメッセージが Pending List に保持され、再処理できることがわかりますね。

Pub/Sub の軽量性を活かす使い方

一方、Pub/Sub にも明確な利点があります。

配信保証が不要で、リアルタイム性と低レイテンシが重要な場合には、Pub/Sub が最適な選択となります。以下のようなユースケースでは、Pub/Sub のシンプルさが強みになるのです。

Pub/Sub が適しているケース

  • リアルタイムチャットの一時的な通知
  • ダッシュボードのライブ更新
  • キャッシュ無効化の通知
  • 一時的なイベントブロードキャスト

Consumer Group によるスケーラビリティ

Redis Streams の Consumer Group は、複数のコンシューマー間でメッセージを自動的に分配します。

各メッセージは Consumer Group 内の 1 つのコンシューマーにのみ配信されるため、水平スケーリングが容易になります。これにより、処理能力をコンシューマー数に応じて線形に拡張できるのです。

mermaidflowchart LR
  stream[("Stream<br/>メッセージキュー")] -->|XREADGROUP| group["Consumer Group<br/>'workers'"]
  group -->|msg-1| w1["Worker 1"]
  group -->|msg-2| w2["Worker 2"]
  group -->|msg-3| w3["Worker 3"]
  w1 -->|XACK| ack1["処理完了"]
  w2 -->|XACK| ack2["処理完了"]
  w3 -->|XACK| ack3["処理完了"]

この図のように、メッセージが自動的に分散されることで、効率的な負荷分散が実現されます。

用途に応じた使い分け戦略

2 つの機能を組み合わせることで、より柔軟なアーキテクチャを構築できます。

例えば、リアルタイム通知には Pub/Sub を使用し、重要なイベント処理には Streams を使用するハイブリッドアプローチも有効でしょう。以下の表は、選択基準をまとめたものです。

#要件推奨理由
1メッセージ損失が許容できないStreams永続化と Acknowledge 機能
2超低レイテンシが必要Pub/Subオーバーヘッドが最小
3複数ワーカーで負荷分散StreamsConsumer Group による分散
4過去のメッセージ参照Streamsメッセージ履歴の保持
5シンプルなブロードキャストPub/Sub実装が簡潔

具体例

Pub/Sub の基本実装

まず、Redis Pub/Sub の基本的な実装を見てみましょう。

Pub/Sub は非常にシンプルで、Publisher と Subscriber の 2 つのコンポーネントで構成されます。以下は TypeScript での実装例です。

必要なパッケージのインストール

bashyarn add redis
yarn add -D @types/node typescript

このコマンドで Redis クライアントライブラリをインストールします。

Publisher の実装

Publisher はメッセージをチャネルに送信する役割を担います。

typescript// publisher.ts
import { createClient } from 'redis';

// Redis クライアントの作成
const publisher = createClient({
  url: 'redis://localhost:6379',
});

// エラーハンドリング
publisher.on('error', (err) => {
  console.error('Redis Publisher Error:', err);
});

まず、Redis への接続を確立します。

typescript// 接続処理
async function connect() {
  await publisher.connect();
  console.log('Publisher connected to Redis');
}

接続が完了したら、メッセージの送信が可能になります。

typescript// メッセージ送信関数
async function publishMessage(
  channel: string,
  message: string
) {
  try {
    // PUBLISH コマンドでメッセージを送信
    const subscriberCount = await publisher.publish(
      channel,
      message
    );
    console.log(
      `Message published to ${subscriberCount} subscribers`
    );
  } catch (error) {
    console.error('Publish error:', error);
  }
}

publish メソッドは、メッセージを受信した Subscriber の数を返します。

typescript// 実行例
async function main() {
  await connect();

  // 通知チャネルにメッセージを送信
  await publishMessage(
    'notifications',
    JSON.stringify({
      type: 'info',
      message: 'New user registered',
      timestamp: new Date().toISOString(),
    })
  );

  // クリーンアップ
  await publisher.quit();
}

main();

この例では、JSON 形式のメッセージを送信しています。

Subscriber の実装

Subscriber はチャネルを購読し、メッセージを受信します。

typescript// subscriber.ts
import { createClient } from 'redis';

// Subscriber 用のクライアント作成
const subscriber = createClient({
  url: 'redis://localhost:6379',
});

subscriber.on('error', (err) => {
  console.error('Redis Subscriber Error:', err);
});

Pub/Sub モードでは、専用のクライアントが必要です。

typescript// メッセージ受信のハンドラー設定
async function setupSubscriber() {
  await subscriber.connect();

  // チャネルを購読
  await subscriber.subscribe('notifications', (message) => {
    console.log('Received message:', message);

    // メッセージの解析と処理
    try {
      const data = JSON.parse(message);
      console.log(
        `Type: ${data.type}, Message: ${data.message}`
      );
    } catch (error) {
      console.error('Message parse error:', error);
    }
  });

  console.log('Subscribed to notifications channel');
}

setupSubscriber();

subscribe メソッドでチャネルを購読し、コールバック関数でメッセージを処理します。

Pub/Sub の制限を確認

Pub/Sub の配信保証の欠如を実際に確認してみましょう。

typescript// test-pubsub-limitation.ts
import { createClient } from 'redis';

async function demonstrateLimitation() {
  const publisher = createClient({
    url: 'redis://localhost:6379',
  });
  await publisher.connect();

  // Subscriber が接続する前にメッセージを送信
  console.log(
    'Publishing message before subscriber connects...'
  );
  await publisher.publish(
    'test-channel',
    'This message will be lost'
  );

  // 1秒待機
  await new Promise((resolve) => setTimeout(resolve, 1000));

  // ここで Subscriber を接続
  const subscriber = createClient({
    url: 'redis://localhost:6379',
  });
  await subscriber.connect();
  await subscriber.subscribe('test-channel', (msg) => {
    console.log('Received:', msg);
  });

  // Subscriber 接続後にメッセージを送信
  console.log(
    'Publishing message after subscriber connects...'
  );
  await publisher.publish(
    'test-channel',
    'This message will be received'
  );
}

このコードを実行すると、最初のメッセージは受信されないことがわかります。

Streams の基本実装

次に、Redis Streams の実装を見てみましょう。

Streams は Pub/Sub よりも複雑ですが、強力な配信保証を提供します。

Producer の実装

Producer はメッセージを Stream に追加します。

typescript// stream-producer.ts
import { createClient } from 'redis';

const client = createClient({
  url: 'redis://localhost:6379',
});

client.on('error', (err) => {
  console.error('Redis Client Error:', err);
});

同じく Redis クライアントを作成します。

typescript// Stream にメッセージを追加
async function addMessage(
  streamKey: string,
  data: Record<string, string>
) {
  try {
    // XADD コマンドでメッセージを追加
    // '*' は自動的に ID を生成することを意味する
    const messageId = await client.xAdd(
      streamKey,
      '*',
      data
    );
    console.log(`Message added with ID: ${messageId}`);
    return messageId;
  } catch (error) {
    console.error('Error adding message:', error);
    throw error;
  }
}

xAdd メソッドは、メッセージの一意な ID を返します。

typescript// 実行例
async function main() {
  await client.connect();

  // Stream にメッセージを追加
  await addMessage('orders', {
    orderId: '12345',
    userId: 'user-001',
    product: 'laptop',
    quantity: '1',
    timestamp: new Date().toISOString(),
  });

  await client.quit();
}

main();

フィールドと値のペアとしてデータを格納します。

Consumer Group の作成と Consumer の実装

Consumer Group を使用することで、複数の Consumer でメッセージを分散処理できます。

typescript// stream-consumer.ts
import { createClient } from 'redis';

const client = createClient({
  url: 'redis://localhost:6379',
});

await client.connect();

接続後、Consumer Group を作成します。

typescript// Consumer Group の作成
async function createConsumerGroup(
  streamKey: string,
  groupName: string
) {
  try {
    // XGROUP CREATE コマンドで Consumer Group を作成
    // '0' は Stream の最初から読み取ることを意味する
    await client.xGroupCreate(streamKey, groupName, '0', {
      MKSTREAM: true, // Stream が存在しない場合は作成
    });
    console.log(`Consumer group '${groupName}' created`);
  } catch (error: any) {
    // すでに存在する場合はエラーを無視
    if (error.message.includes('BUSYGROUP')) {
      console.log(
        `Consumer group '${groupName}' already exists`
      );
    } else {
      throw error;
    }
  }
}

MKSTREAM オプションにより、Stream が自動的に作成されます。

typescript// メッセージの読み取りと処理
async function consumeMessages(
  streamKey: string,
  groupName: string,
  consumerName: string
) {
  console.log(`Consumer '${consumerName}' started`);

  while (true) {
    try {
      // XREADGROUP コマンドでメッセージを読み取る
      // '>' は未配信のメッセージのみを取得
      const messages = await client.xReadGroup(
        groupName,
        consumerName,
        { key: streamKey, id: '>' },
        { COUNT: 10, BLOCK: 5000 } // 最大10件、5秒待機
      );

      if (!messages || messages.length === 0) {
        continue;
      }

      // メッセージの処理
      for (const stream of messages) {
        for (const msg of stream.messages) {
          await processMessage(streamKey, groupName, msg);
        }
      }
    } catch (error) {
      console.error('Error consuming messages:', error);
      await new Promise((resolve) =>
        setTimeout(resolve, 1000)
      );
    }
  }
}

BLOCK オプションにより、新しいメッセージが来るまで待機します。

typescript// メッセージ処理関数
async function processMessage(
  streamKey: string,
  groupName: string,
  message: any
) {
  const { id, message: data } = message;

  try {
    console.log(`Processing message ${id}:`, data);

    // ビジネスロジックの実行(例:注文処理)
    await handleOrder(data);

    // 処理成功を確認(XACK)
    await client.xAck(streamKey, groupName, id);
    console.log(`Message ${id} acknowledged`);
  } catch (error) {
    console.error(`Error processing message ${id}:`, error);
    // エラー時は XACK しないため、Pending List に残る
  }
}

xAck で処理完了を通知することが重要です。

typescript// ビジネスロジックの例
async function handleOrder(data: Record<string, string>) {
  // 実際の処理をシミュレート
  await new Promise((resolve) => setTimeout(resolve, 100));
  console.log(
    `Order ${data.orderId} processed for user ${data.userId}`
  );
}

実際の処理では、データベース更新や外部 API 呼び出しなどを行います。

Pending メッセージの再処理

処理に失敗したメッセージは Pending List に残ります。

typescript// pending-recovery.ts
import { createClient } from 'redis';

const client = createClient({
  url: 'redis://localhost:6379',
});
await client.connect();

// Pending メッセージの確認と再処理
async function recoverPendingMessages(
  streamKey: string,
  groupName: string,
  consumerName: string
) {
  try {
    // XPENDING コマンドで未確認メッセージを取得
    const pending = await client.xPending(
      streamKey,
      groupName
    );
    console.log(`Pending messages: ${pending.pending}`);

    if (pending.pending === 0) {
      return;
    }

    // 詳細な Pending 情報を取得
    const details = await client.xPendingRange(
      streamKey,
      groupName,
      '-',
      '+',
      10
    );

    for (const entry of details) {
      // 一定時間経過したメッセージを再割り当て
      if (entry.millisecondsSinceLastDelivery > 60000) {
        // 60秒
        console.log(`Claiming message ${entry.id}`);

        // XCLAIM で自分に再割り当て
        const claimed = await client.xClaim(
          streamKey,
          groupName,
          consumerName,
          60000,
          [entry.id]
        );

        // 再処理
        for (const msg of claimed.messages) {
          await processMessage(streamKey, groupName, msg);
        }
      }
    }
  } catch (error) {
    console.error(
      'Error recovering pending messages:',
      error
    );
  }
}

この仕組みにより、障害からの自動復旧が可能になります。

パフォーマンス比較の実装

実際にパフォーマンスを測定してみましょう。

typescript// performance-test.ts
import { createClient } from 'redis';

// Pub/Sub のパフォーマンステスト
async function testPubSubPerformance(messageCount: number) {
  const publisher = createClient({
    url: 'redis://localhost:6379',
  });
  const subscriber = createClient({
    url: 'redis://localhost:6379',
  });

  await publisher.connect();
  await subscriber.connect();

  let receivedCount = 0;
  const startTime = Date.now();

  // Subscriber を設定
  await subscriber.subscribe('perf-test', () => {
    receivedCount++;
  });

  // メッセージ送信
  for (let i = 0; i < messageCount; i++) {
    await publisher.publish('perf-test', `Message ${i}`);
  }

  // すべて受信するまで待機
  while (receivedCount < messageCount) {
    await new Promise((resolve) => setTimeout(resolve, 10));
  }

  const elapsed = Date.now() - startTime;
  console.log(
    `Pub/Sub: ${messageCount} messages in ${elapsed}ms`
  );
  console.log(
    `Throughput: ${(
      (messageCount / elapsed) *
      1000
    ).toFixed(2)} msg/s`
  );

  await publisher.quit();
  await subscriber.quit();
}

このテストでレイテンシとスループットを測定できます。

typescript// Streams のパフォーマンステスト
async function testStreamsPerformance(
  messageCount: number
) {
  const producer = createClient({
    url: 'redis://localhost:6379',
  });
  const consumer = createClient({
    url: 'redis://localhost:6379',
  });

  await producer.connect();
  await consumer.connect();

  const streamKey = 'perf-stream';
  const groupName = 'perf-group';

  // Consumer Group を作成
  try {
    await consumer.xGroupCreate(streamKey, groupName, '0', {
      MKSTREAM: true,
    });
  } catch (error: any) {
    if (!error.message.includes('BUSYGROUP')) {
      throw error;
    }
  }

  const startTime = Date.now();

  // メッセージ追加
  for (let i = 0; i < messageCount; i++) {
    await producer.xAdd(streamKey, '*', {
      message: `Message ${i}`,
    });
  }

  // メッセージ読み取りと確認
  let processedCount = 0;
  while (processedCount < messageCount) {
    const messages = await consumer.xReadGroup(
      groupName,
      'perf-consumer',
      { key: streamKey, id: '>' },
      { COUNT: 100 }
    );

    if (messages && messages.length > 0) {
      for (const stream of messages) {
        const ids = stream.messages.map((m) => m.id);
        await consumer.xAck(streamKey, groupName, ids);
        processedCount += ids.length;
      }
    }
  }

  const elapsed = Date.now() - startTime;
  console.log(
    `Streams: ${messageCount} messages in ${elapsed}ms`
  );
  console.log(
    `Throughput: ${(
      (messageCount / elapsed) *
      1000
    ).toFixed(2)} msg/s`
  );

  await producer.quit();
  await consumer.quit();
}

Streams は永続化のオーバーヘッドがあるため、Pub/Sub よりも若干遅くなります。

typescript// 比較実行
async function runComparison() {
  const messageCount = 10000;

  console.log('=== Performance Comparison ===\n');

  await testPubSubPerformance(messageCount);
  console.log();
  await testStreamsPerformance(messageCount);
}

runComparison();

この比較により、用途に応じた最適な選択ができるようになります。

実用的なユースケース例

最後に、実際のアプリケーションでの使い分けを見てみましょう。

typescript// hybrid-messaging.ts
import { createClient } from 'redis';

/**
 * リアルタイム通知には Pub/Sub を使用
 * 一時的で失われても問題ない情報向け
 */
class NotificationService {
  private client;

  constructor() {
    this.client = createClient({
      url: 'redis://localhost:6379',
    });
  }

  async connect() {
    await this.client.connect();
  }

  // ライブ通知を送信
  async sendLiveNotification(
    userId: string,
    message: string
  ) {
    const channel = `user:${userId}:notifications`;
    await this.client.publish(
      channel,
      JSON.stringify({
        type: 'live',
        message,
        timestamp: Date.now(),
      })
    );
  }
}

リアルタイム性が重要な通知には Pub/Sub が最適です。

typescript/**
 * 重要なイベント処理には Streams を使用
 * 確実な配信と処理が必要な場合向け
 */
class OrderProcessingService {
  private client;
  private streamKey = 'orders';
  private groupName = 'order-processors';

  constructor() {
    this.client = createClient({
      url: 'redis://localhost:6379',
    });
  }

  async connect() {
    await this.client.connect();

    // Consumer Group を初期化
    try {
      await this.client.xGroupCreate(
        this.streamKey,
        this.groupName,
        '0',
        { MKSTREAM: true }
      );
    } catch (error: any) {
      if (!error.message.includes('BUSYGROUP')) {
        throw error;
      }
    }
  }

  // 注文をキューに追加
  async submitOrder(order: {
    orderId: string;
    userId: string;
    items: string;
    total: string;
  }) {
    const messageId = await this.client.xAdd(
      this.streamKey,
      '*',
      order
    );
    console.log(
      `Order ${order.orderId} queued with ID ${messageId}`
    );
    return messageId;
  }
}

注文処理のような重要なイベントには Streams を使用します。

typescript// 使用例
async function demonstrateHybridApproach() {
  const notifications = new NotificationService();
  const orders = new OrderProcessingService();

  await notifications.connect();
  await orders.connect();

  // 重要な注文は Streams で確実に処理
  await orders.submitOrder({
    orderId: 'ORD-001',
    userId: 'user-123',
    items: JSON.stringify([{ product: 'laptop', qty: 1 }]),
    total: '1500',
  });

  // リアルタイム通知は Pub/Sub で即座に配信
  await notifications.sendLiveNotification(
    'user-123',
    'Your order has been received!'
  );

  console.log('Hybrid messaging approach demonstrated');
}

demonstrateHybridApproach();

このハイブリッドアプローチにより、各機能の利点を最大限に活用できますね。

図で理解できる要点

  • Pub/Sub は軽量で低レイテンシだが配信保証なし
  • Streams は永続化により確実な配信を実現
  • Consumer Group で複数ワーカーへの負荷分散が可能
  • 用途に応じて使い分けることで最適なシステムを構築

まとめ

Redis Pub/Sub と Redis Streams は、それぞれ異なる強みを持つメッセージング機能です。

Pub/Sub は軽量でシンプル、超低レイテンシが特徴ですが、配信保証やメッセージ履歴がありません。一方、Streams はメッセージの永続化、Consumer Group による負荷分散、Acknowledge 機能により確実な配信を実現します。

選択のポイントは以下の通りです。

Pub/Sub を選ぶべきケース

  • リアルタイム性が最優先
  • メッセージの損失が許容できる
  • シンプルなブロードキャストで十分
  • 超低レイテンシが求められる

Streams を選ぶべきケース

  • メッセージの損失が許容できない
  • 過去のメッセージを参照したい
  • 複数ワーカーで負荷分散したい
  • 処理失敗時の再試行が必要

多くの実用的なアプリケーションでは、両者を組み合わせたハイブリッドアプローチが効果的でしょう。重要なビジネスロジックには Streams を、一時的な通知には Pub/Sub を使用することで、信頼性とパフォーマンスの両立が可能になります。

あなたのプロジェクトの要件に合わせて、最適なメッセージング戦略を選択してくださいね。

関連リンク