T-CREATOR

Python ドメインイベント設計:Outbox・整合性・再試行をコードで実装

Python ドメインイベント設計:Outbox・整合性・再試行をコードで実装

マイクロサービスやイベント駆動アーキテクチャを採用する際、最も頭を悩ませるのが「データの整合性」と「イベント配信の信頼性」ではないでしょうか。

データベースへの書き込みとイベント発行を別々に行うと、片方が失敗した場合にシステム全体の整合性が崩れてしまいます。また、ネットワーク障害やサービス停止時には、イベントが正しく配信されない可能性もあるでしょう。

本記事では、こうした課題を解決する「Outbox パターン」を中心に、Python で実装するドメインイベント設計の実践的な手法をご紹介します。再試行メカニズムやトランザクション管理を含めた完全なコード例とともに、初心者の方にも理解しやすい形で解説していきますね。

背景

ドメイン駆動設計とイベントソーシング

ドメイン駆動設計(DDD)では、ビジネスロジックを「ドメインモデル」として表現し、システムの変化を「ドメインイベント」として扱います。例えば、EC サイトで注文が作成されたら「OrderCreated」イベントを発行し、在庫管理システムや決済システムがそのイベントを受け取って処理を行う仕組みです。

このアプローチには大きなメリットがあります。システムを疎結合に保ちながら、複数のサービス間で協調動作を実現できるのです。

マイクロサービスアーキテクチャの普及

近年、モノリシックなアプリケーションからマイクロサービスへの移行が加速しています。各サービスが独立したデータベースを持ち、イベントを通じて連携するアーキテクチャが主流になってきました。

Python は、FastAPI や Flask などの軽量フレームワークと豊富なライブラリエコシステムにより、マイクロサービス構築に最適な言語の一つとして注目されていますね。

下図は、典型的なマイクロサービスアーキテクチャにおけるイベントフローを示したものです。

mermaidflowchart LR
  order["注文サービス"] -->|OrderCreated| broker["メッセージブローカー<br/>(Kafka/RabbitMQ)"]
  broker -->|イベント配信| inventory["在庫サービス"]
  broker -->|イベント配信| payment["決済サービス"]
  broker -->|イベント配信| notification["通知サービス"]
  order -.->|DB書き込み| orderDB[("注文DB")]
  inventory -.->|DB書き込み| inventoryDB[("在庫DB")]

この図からわかるように、注文サービスが OrderCreated イベントを発行すると、複数の下流サービスがそれぞれ独立して処理を行います。

イベント駆動アーキテクチャの利点

#利点説明
1疎結合サービス間の依存関係を最小化し、独立した開発・デプロイが可能
2スケーラビリティイベント消費者を水平スケールさせることで処理能力を向上
3監査ログすべてのイベントが記録されるため、システムの状態変化を追跡可能
4非同期処理時間のかかる処理を非同期化し、ユーザー体験を向上

課題

デュアル書き込み問題

最も深刻な課題は「デュアル書き込み問題」と呼ばれるものです。データベースへの書き込みとメッセージブローカーへのイベント発行を別々のトランザクションで行うと、以下のような問題が発生します。

データベースへの書き込みは成功したものの、その直後にアプリケーションがクラッシュしてイベントが発行されない場合、他のサービスは状態変化を知ることができません。逆に、イベントは発行されたがデータベース書き込みに失敗した場合、存在しないデータに対するイベントが流れてしまいます。

下図は、デュアル書き込み問題が発生する典型的なシナリオを表しています。

mermaidsequenceDiagram
  participant App as アプリケーション
  participant DB as データベース
  participant Broker as メッセージブローカー

  App->>DB: トランザクション開始
  App->>DB: 注文データ書き込み
  DB-->>App: 成功
  App->>DB: コミット
  DB-->>App: 成功

  Note over App: ここでクラッシュ!

  App-xBroker: イベント発行失敗

  Note over Broker,DB: データは保存されたが<br/>イベントは配信されず<br/>システム整合性が崩れる

ネットワーク障害とメッセージロスト

分散システムでは、ネットワーク障害は避けられません。メッセージブローカーへの接続が一時的に失われた場合、イベントが失われてしまう可能性があるでしょう。

また、メッセージブローカー自体がダウンしている場合、イベント発行のリトライを実装していないと、重要なビジネスイベントが消失してしまいます。

べき等性の欠如

ネットワークの不安定さやリトライ処理により、同じイベントが複数回配信される可能性があります。イベント処理がべき等でない場合、重複処理によってデータの不整合が発生してしまうのです。

例えば、在庫減少イベントが 2 回処理されると、実際よりも多く在庫が減ってしまいますね。

トランザクション境界の不明瞭さ

複数のドメインイベントを一つのトランザクション内で発行する場合、どのイベントがどの順序で発行されるべきか、トランザクションが失敗した場合にどう処理するかといった設計が複雑になります。

主な課題を表にまとめると以下のようになります。

#課題影響発生頻度
1デュアル書き込み問題データとイベントの不整合★★★
2ネットワーク障害イベントロスト★★☆
3べき等性の欠如重複処理による不整合★★☆
4トランザクション管理複雑性の増加★★★

解決策

Outbox パターンの採用

Outbox パターンは、デュアル書き込み問題を解決する最も確実な方法です。基本的な考え方はシンプルで、ドメインイベントを専用の Outbox テーブルに保存し、別プロセスでそれを読み取ってメッセージブローカーに発行します。

重要なのは、ビジネスデータと Outbox テーブルへの書き込みを同一のデータベーストランザクション内で行うことです。これにより、両方が成功するか、両方が失敗するかのどちらかになり、整合性が保証されます。

下図は、Outbox パターンの全体フローを示しています。

mermaidflowchart TB
  app["アプリケーション"] -->|1. 同一トランザクション内| db[("データベース")]
  app -->|1. 同一トランザクション内| outbox[("Outboxテーブル")]

  poller["Outboxポーラー<br/>(別プロセス)"] -->|2. 定期的にポーリング| outbox
  poller -->|3. イベント発行| broker["メッセージブローカー"]
  poller -->|4. 発行済みマーク| outbox

  broker -->|5. イベント配信| consumer["イベント消費者"]

  style outbox fill:#f9f,stroke:#333,stroke-width:2px

この図から、Outbox パターンが以下のステップで動作することがわかります。

要点整理:

  • ビジネスデータとイベントを同一トランザクションで保存
  • 別プロセスが Outbox テーブルをポーリング
  • 発行済みイベントはマークして重複配信を防止

再試行メカニズムの実装

Outbox ポーラーがメッセージブローカーへのイベント発行に失敗した場合、再試行メカニズムが必要です。一般的な戦略は以下の通りです。

#戦略説明適用ケース
1指数バックオフ待機時間を指数的に増加させる一時的な障害
2最大リトライ回数一定回数で諦める恒久的な障害
3デッドレターキュー失敗したイベントを別キューへ手動対応が必要
4サーキットブレーカー連続失敗時に一時停止ブローカーダウン

べき等性の保証

イベント消費者側では、各イベントにユニークな ID を付与し、処理済みイベントの ID をデータベースに記録することでべき等性を実現します。

同じイベント ID が再度来た場合は、スキップするだけで重複処理を防げるのです。

トランザクション整合性の設計方針

Outbox パターンを採用することで、以下の整合性が保証されます。

結果整合性(Eventual Consistency):ビジネスデータの変更は即座に反映されますが、イベント配信には若干の遅延が生じます。しかし、最終的にはすべてのサービスが整合した状態になるでしょう。

At-Least-Once 配信:Outbox ポーラーの実装により、各イベントは少なくとも 1 回は配信されることが保証されます。消費者側のべき等性実装と組み合わせることで、Exactly-Once セマンティクスを実現できますね。

具体例

それでは、Python での具体的な実装を見ていきましょう。今回は、EC サイトの注文処理を例に、Outbox パターンと再試行メカニズムを実装します。

環境とライブラリ

まず、必要なライブラリをインストールします。

python# requirements.txt
sqlalchemy>=2.0.0
pydantic>=2.0.0
python-dotenv>=1.0.0
kafka-python>=2.0.2
tenacity>=8.2.0

依存関係をインストールするコマンドは以下の通りです。

bashyarn add python  # Pythonプロジェクトでもyarnでの管理を想定
pip install -r requirements.txt

データモデルの定義

まず、ドメインモデルと Outbox テーブルのモデルを定義します。SQLAlchemy を使用して、データベーススキーマを表現しましょう。

python# models.py - インポート部分
from sqlalchemy import Column, String, Integer, DateTime, Text, Boolean
from sqlalchemy.ext.declarative import declarative_base
from datetime import datetime
import json

Base = declarative_base()

次に、注文テーブルのモデルを定義します。これはビジネスデータを保存するテーブルです。

python# models.py - 注文モデル
class Order(Base):
    """注文を表すドメインモデル"""
    __tablename__ = 'orders'

    # 注文の一意識別子
    id = Column(String(36), primary_key=True)
    # 顧客ID
    customer_id = Column(String(36), nullable=False)
    # 注文金額
    amount = Column(Integer, nullable=False)
    # 注文ステータス(pending, confirmed, cancelledなど)
    status = Column(String(20), nullable=False, default='pending')
    # 作成日時
    created_at = Column(DateTime, default=datetime.utcnow)

続いて、Outbox テーブルのモデルを定義します。これがイベントを一時保存するテーブルになります。

python# models.py - Outboxモデル
class OutboxEvent(Base):
    """イベントを保存するOutboxテーブル"""
    __tablename__ = 'outbox_events'

    # イベントの一意識別子
    id = Column(String(36), primary_key=True)
    # 集約ID(どのエンティティに関するイベントか)
    aggregate_id = Column(String(36), nullable=False)
    # イベントタイプ(OrderCreated, OrderCancelledなど)
    event_type = Column(String(100), nullable=False)
    # イベントのペイロード(JSON形式)
    payload = Column(Text, nullable=False)
    # 発行済みフラグ
    published = Column(Boolean, default=False)
    # 作成日時
    created_at = Column(DateTime, default=datetime.utcnow)
    # 発行日時
    published_at = Column(DateTime, nullable=True)

ドメインイベントの定義

ドメインイベントを Pydantic モデルとして定義します。これにより、型安全性とバリデーションが実現できます。

python# events.py - インポート
from pydantic import BaseModel, Field
from datetime import datetime
from typing import Any, Dict
import uuid

基底イベントクラスを定義し、すべてのイベントが持つべき共通フィールドを定義します。

python# events.py - 基底イベントクラス
class DomainEvent(BaseModel):
    """すべてのドメインイベントの基底クラス"""
    # イベントの一意識別子(べき等性のために使用)
    event_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    # イベント発生日時
    occurred_at: datetime = Field(default_factory=datetime.utcnow)
    # イベントタイプ(クラス名から自動取得)
    event_type: str = Field(default="")

    def __init__(self, **data):
        super().__init__(**data)
        # event_typeが未設定の場合、クラス名を使用
        if not self.event_type:
            self.event_type = self.__class__.__name__

具体的なイベントクラスを定義します。ここでは注文作成イベントを例にします。

python# events.py - 注文作成イベント
class OrderCreatedEvent(DomainEvent):
    """注文が作成された際に発行されるイベント"""
    # 注文ID
    order_id: str
    # 顧客ID
    customer_id: str
    # 注文金額
    amount: int
    # 注文ステータス
    status: str = "pending"

    def to_dict(self) -> Dict[str, Any]:
        """イベントを辞書形式に変換(JSON化のため)"""
        return self.model_dump()

サービスレイヤーの実装

注文を作成し、同時に Outbox にイベントを保存するサービスクラスを実装します。

python# order_service.py - インポート
from sqlalchemy.orm import Session
from models import Order, OutboxEvent
from events import OrderCreatedEvent
import uuid
import json
from typing import Optional

注文サービスのメインクラスを定義します。このクラスがビジネスロジックとイベント発行を担当します。

python# order_service.py - 注文サービスクラス
class OrderService:
    """注文に関するビジネスロジックを担当するサービス"""

    def __init__(self, db_session: Session):
        # データベースセッション(トランザクション管理に使用)
        self.db = db_session

    def create_order(
        self,
        customer_id: str,
        amount: int
    ) -> Order:
        """
        新規注文を作成し、OrderCreatedEventを発行する

        重要:注文データとイベントを同一トランザクション内で保存
        """
        # 注文IDを生成
        order_id = str(uuid.uuid4())

        # 注文エンティティを作成
        order = Order(
            id=order_id,
            customer_id=customer_id,
            amount=amount,
            status='pending'
        )

注文作成処理の続きで、イベントを Outbox に保存します。これが重要なポイントです。

python# order_service.py - イベント保存処理
        # ドメインイベントを作成
        event = OrderCreatedEvent(
            order_id=order_id,
            customer_id=customer_id,
            amount=amount,
            status='pending'
        )

        # Outboxテーブルにイベントを保存
        outbox_event = OutboxEvent(
            id=event.event_id,
            aggregate_id=order_id,
            event_type=event.event_type,
            payload=json.dumps(event.to_dict())
        )

        # 同一トランザクション内で両方を保存
        self.db.add(order)
        self.db.add(outbox_event)
        self.db.commit()

        return order

Outbox ポーラーの実装

別プロセスとして動作する Outbox ポーラーを実装します。これが未発行のイベントを定期的にチェックし、メッセージブローカーに発行する役割を担います。

python# outbox_poller.py - インポート
from sqlalchemy.orm import Session
from sqlalchemy import create_engine
from models import OutboxEvent
from kafka import KafkaProducer
from tenacity import retry, stop_after_attempt, wait_exponential
import json
import time
from datetime import datetime
from typing import List

Kafka プロデューサーを初期化する部分を実装します。

python# outbox_poller.py - Kafka設定
class OutboxPoller:
    """Outboxテーブルをポーリングしてイベントを発行"""

    def __init__(
        self,
        db_url: str,
        kafka_servers: List[str],
        topic: str = 'domain-events'
    ):
        # データベース接続
        self.engine = create_engine(db_url)
        # Kafkaプロデューサー
        self.producer = KafkaProducer(
            bootstrap_servers=kafka_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        self.topic = topic

再試行メカニズムを含むイベント発行メソッドを実装します。tenacity ライブラリを使用して、指数バックオフを実現します。

python# outbox_poller.py - イベント発行メソッド
    @retry(
        stop=stop_after_attempt(5),  # 最大5回リトライ
        wait=wait_exponential(multiplier=1, min=2, max=60)  # 指数バックオフ
    )
    def publish_event(self, event: OutboxEvent) -> None:
        """
        単一のイベントをKafkaに発行する

        失敗時は自動的にリトライされる(tenacityデコレータ)
        """
        # ペイロードをパース
        payload = json.loads(event.payload)

        # Kafkaに送信
        future = self.producer.send(
            self.topic,
            key=event.aggregate_id.encode('utf-8'),
            value=payload
        )

        # 送信完了を待機
        future.get(timeout=10)

ポーリングループのメイン処理を実装します。

python# outbox_poller.py - ポーリングループ
    def poll_and_publish(self) -> None:
        """
        Outboxテーブルから未発行イベントを取得し、
        Kafkaに発行してマークする
        """
        with Session(self.engine) as session:
            # 未発行イベントを取得(作成日時順)
            events = session.query(OutboxEvent)\
                .filter(OutboxEvent.published == False)\
                .order_by(OutboxEvent.created_at)\
                .limit(100)\
                .all()

            for event in events:
                try:
                    # イベント発行(リトライ付き)
                    self.publish_event(event)

イベント発行後の処理を実装します。発行済みフラグを立てて、重複配信を防ぎます。

python# outbox_poller.py - 発行済みマーク
                    # 発行済みマークを付ける
                    event.published = True
                    event.published_at = datetime.utcnow()
                    session.commit()

                    print(f"Published event {event.id} of type {event.event_type}")

                except Exception as e:
                    # リトライを使い果たした場合
                    print(f"Failed to publish event {event.id}: {e}")
                    session.rollback()
                    # 次のイベントへ続行(デッドレターキュー実装も検討)

ポーラーを継続的に実行するメソッドを実装します。

python# outbox_poller.py - 実行ループ
    def run(self, interval_seconds: int = 5) -> None:
        """
        指定間隔でポーリングを継続的に実行

        Args:
            interval_seconds: ポーリング間隔(秒)
        """
        print(f"Starting Outbox Poller (interval: {interval_seconds}s)")

        while True:
            try:
                self.poll_and_publish()
                time.sleep(interval_seconds)
            except KeyboardInterrupt:
                print("Stopping Outbox Poller...")
                self.producer.close()
                break
            except Exception as e:
                print(f"Poller error: {e}")
                time.sleep(interval_seconds)

イベント消費者の実装

イベントを受信して処理する消費者側の実装も見ていきましょう。べき等性を保証するため、処理済みイベント ID を記録します。

python# event_consumer.py - インポート
from kafka import KafkaConsumer
from sqlalchemy import create_engine, Column, String, DateTime
from sqlalchemy.orm import Session, declarative_base
from datetime import datetime
import json
from typing import Callable, Dict

処理済みイベントを記録するテーブルモデルを定義します。

python# event_consumer.py - 処理済みイベントテーブル
Base = declarative_base()

class ProcessedEvent(Base):
    """べき等性保証のための処理済みイベント記録"""
    __tablename__ = 'processed_events'

    # イベントID(主キー)
    event_id = Column(String(36), primary_key=True)
    # イベントタイプ
    event_type = Column(String(100), nullable=False)
    # 処理日時
    processed_at = Column(DateTime, default=datetime.utcnow)

イベント消費者のメインクラスを実装します。

python# event_consumer.py - 消費者クラス
class EventConsumer:
    """ドメインイベントを消費して処理する"""

    def __init__(self, db_url: str, kafka_servers: list, topic: str):
        # データベース接続
        self.engine = create_engine(db_url)
        Base.metadata.create_all(self.engine)

        # Kafka消費者
        self.consumer = KafkaConsumer(
            topic,
            bootstrap_servers=kafka_servers,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            group_id='order-processing-group'
        )

        # イベントハンドラーの登録
        self.handlers: Dict[str, Callable] = {}

イベントハンドラーを登録するメソッドと、べき等性チェックを実装します。

python# event_consumer.py - ハンドラー登録とべき等性チェック
    def register_handler(self, event_type: str, handler: Callable):
        """特定のイベントタイプに対するハンドラーを登録"""
        self.handlers[event_type] = handler

    def is_processed(self, session: Session, event_id: str) -> bool:
        """イベントが既に処理済みかチェック"""
        return session.query(ProcessedEvent)\
            .filter(ProcessedEvent.event_id == event_id)\
            .first() is not None

    def mark_processed(self, session: Session, event_id: str, event_type: str):
        """イベントを処理済みとしてマーク"""
        processed = ProcessedEvent(
            event_id=event_id,
            event_type=event_type
        )
        session.add(processed)

メッセージ消費のメインループを実装します。

python# event_consumer.py - 消費ループ
    def consume(self) -> None:
        """イベントを継続的に消費して処理"""
        print("Starting Event Consumer...")

        for message in self.consumer:
            event_data = message.value
            event_id = event_data.get('event_id')
            event_type = event_data.get('event_type')

            with Session(self.engine) as session:
                try:
                    # べき等性チェック
                    if self.is_processed(session, event_id):
                        print(f"Event {event_id} already processed, skipping")
                        continue

                    # ハンドラーを実行
                    if event_type in self.handlers:
                        self.handlers[event_type](event_data)

                        # 処理済みマーク
                        self.mark_processed(session, event_id, event_type)
                        session.commit()

                        print(f"Processed event {event_id} of type {event_type}")
                    else:
                        print(f"No handler for event type {event_type}")

                except Exception as e:
                    print(f"Error processing event {event_id}: {e}")
                    session.rollback()

実行例

すべてを統合して実行する例を示します。まず、データベース初期化とアプリケーション起動のコードです。

python# main.py - データベース初期化
from sqlalchemy import create_engine
from models import Base, Order
from order_service import OrderService
from outbox_poller import OutboxPoller
from event_consumer import EventConsumer
from sqlalchemy.orm import Session

# データベース接続文字列
DATABASE_URL = "postgresql://user:password@localhost/orders_db"

# データベース初期化
engine = create_engine(DATABASE_URL)
Base.metadata.create_all(engine)

注文作成のサンプルコードです。

python# main.py - 注文作成
def create_sample_order():
    """サンプル注文を作成する例"""
    with Session(engine) as session:
        # OrderServiceのインスタンス作成
        order_service = OrderService(session)

        # 注文作成(自動的にOutboxにイベントも保存される)
        order = order_service.create_order(
            customer_id="customer-123",
            amount=15000
        )

        print(f"Created order: {order.id}")
        print("Event stored in outbox table")

Outbox ポーラーの起動コードです。これは別プロセスとして実行します。

python# main.py - Outboxポーラー起動
def start_outbox_poller():
    """Outboxポーラーを起動(別プロセスで実行)"""
    poller = OutboxPoller(
        db_url=DATABASE_URL,
        kafka_servers=['localhost:9092'],
        topic='domain-events'
    )

    # 5秒間隔でポーリング
    poller.run(interval_seconds=5)

イベント消費者の起動コードです。

python# main.py - イベント消費者起動
def start_event_consumer():
    """イベント消費者を起動(別プロセスで実行)"""
    consumer = EventConsumer(
        db_url=DATABASE_URL,
        kafka_servers=['localhost:9092'],
        topic='domain-events'
    )

    # OrderCreatedイベントのハンドラーを登録
    def handle_order_created(event_data):
        print(f"Handling OrderCreated: {event_data['order_id']}")
        # ここで在庫確保、決済処理などを実行
        # 例:inventory_service.reserve(event_data['order_id'])

    consumer.register_handler('OrderCreatedEvent', handle_order_created)
    consumer.consume()

実際の実行方法をまとめます。

python# main.py - エントリーポイント
if __name__ == "__main__":
    import sys

    if len(sys.argv) < 2:
        print("Usage: python main.py [create|poller|consumer]")
        sys.exit(1)

    command = sys.argv[1]

    if command == "create":
        # 注文作成
        create_sample_order()
    elif command == "poller":
        # Outboxポーラー起動
        start_outbox_poller()
    elif command == "consumer":
        # イベント消費者起動
        start_event_consumer()
    else:
        print(f"Unknown command: {command}")

実行方法は以下の通りです。

bash# ターミナル1: Outboxポーラーを起動
python main.py poller

# ターミナル2: イベント消費者を起動
python main.py consumer

# ターミナル3: 注文を作成
python main.py create

アーキテクチャ全体像

最後に、実装したシステムの全体フローを図で確認しましょう。

mermaidsequenceDiagram
  participant API as REST API
  participant Service as OrderService
  participant DB as PostgreSQL
  participant Outbox as Outboxテーブル
  participant Poller as OutboxPoーラー
  participant Kafka as Kafka
  participant Consumer as EventConsumer

  API->>Service: create_order()

  Service->>DB: BEGIN TRANSACTION
  Service->>DB: INSERT Order
  Service->>Outbox: INSERT Event
  Service->>DB: COMMIT
  DB-->>Service: 成功
  Service-->>API: Order作成完了

  Note over Poller: 5秒ごとにポーリング

  Poller->>Outbox: SELECT未発行イベント
  Outbox-->>Poller: イベントリスト
  Poller->>Kafka: イベント発行
  Kafka-->>Poller: ACK
  Poller->>Outbox: UPDATE published=true

  Kafka->>Consumer: イベント配信
  Consumer->>Consumer: べき等性チェック
  Consumer->>Consumer: ハンドラー実行
  Consumer->>DB: 処理済みマーク保存

この図が示す重要ポイント:

  • ビジネスデータとイベントが同一トランザクションで保存される
  • ポーラーが非同期にイベントを発行する
  • 消費者がべき等性を保証して処理する

まとめ

本記事では、Python を使ったドメインイベント設計の実践的な実装方法を解説しました。

Outbox パターンを採用することで、データベース書き込みとイベント発行の整合性を保証できます。同一トランザクション内で両方を処理することが、このパターンの核心ですね。

再試行メカニズムとべき等性の実装により、ネットワーク障害やサービス停止があっても、イベントが確実に配信され、重複処理も防げるようになりました。

主要な実装ポイントを振り返りましょう。

#実装ポイント目的使用技術
1Outbox パターントランザクション整合性SQLAlchemy
2指数バックオフ再試行ネットワーク障害対応tenacity
3べき等性保証重複処理防止ProcessedEvent テーブル
4非同期ポーリングパフォーマンス向上別プロセス実行

マイクロサービスアーキテクチャやイベント駆動設計を採用する際、本記事で紹介した手法は非常に有効です。特に、金融システムや EC サイトなど、データの整合性が重要なシステムでは必須のパターンと言えるでしょう。

今回のコード例をベースに、実際のプロジェクトに適用していただければ幸いです。サーキットブレーカーやデッドレターキューなど、さらに高度な機能を追加することで、より堅牢なシステムを構築できますね。

分散システム設計の奥深さを感じながら、一歩ずつ実装を進めていきましょう。

関連リンク