T-CREATOR

Convex で実践する CQRS/イベントソーシング:履歴・再生・集約の設計ガイド

Convex で実践する CQRS/イベントソーシング:履歴・再生・集約の設計ガイド

リアルタイムなアプリケーション開発において、データの変更履歴を完全に追跡したい、複雑な集約処理を効率的に実行したい、そんなニーズはありませんか。

Convex は、バックエンド開発を簡素化する強力なプラットフォームですが、CQRS(Command Query Responsibility Segregation)やイベントソーシングといった高度なアーキテクチャパターンとの相性も抜群です。本記事では、Convex を用いて CQRS/イベントソーシングを実践的に実装する方法を、履歴管理、イベント再生、集約設計という 3 つの視点から徹底解説します。

TypeScript と Convex の組み合わせで、データの一貫性と拡張性を両立した設計を体験していきましょう。

背景

CQRS とイベントソーシングとは

CQRS(Command Query Responsibility Segregation)は、データの書き込み(Command)と読み込み(Query)の責務を明確に分離するアーキテクチャパターンです。 従来の CRUD アプリケーションでは、同じモデルやデータベーステーブルに対して作成・更新・削除・参照のすべての操作を行っていましたが、CQRS では書き込み用のモデルと読み込み用のモデルを分けることで、それぞれに最適化された処理を実現できます。

イベントソーシングは、データの現在の状態だけを保存するのではなく、「状態を変更したイベント」そのものを保存する手法です。 たとえば、銀行口座の残高を直接更新するのではなく、「入金イベント」「出金イベント」といった履歴を記録していき、それらを再生することで現在の残高を計算します。

この 2 つのパターンを組み合わせることで、データの完全な監査証跡(Audit Trail)を実現し、過去の任意の時点の状態を再現できるようになります。

Convex の特徴と適合性

Convex は、リアルタイムデータベース、サーバーレス関数、型安全性を統合したバックエンドプラットフォームです。 以下のような特徴があります。

  • リアルタイム性: データベースの変更が即座にクライアントに反映される
  • 型安全性: TypeScript でスキーマを定義し、フロントエンドとバックエンド間で型を共有できる
  • トランザクション: mutation 関数内で複数のデータベース操作をアトミックに実行可能
  • 関数ベースの API: query、mutation、action という 3 種類の関数でロジックを記述

これらの特徴は、CQRS/イベントソーシングの実装に必要な以下の要件と合致しています。

#要件Convex の対応機能
1イベントの永続化データベーステーブルでイベントストアを実装
2イベントの順序保証トランザクション機能で一貫性を確保
3読み取りモデルの更新mutation でイベント保存と同時に集約テーブルを更新
4リアルタイム通知query のリアルタイム購読機能で UI を自動更新
5型安全性スキーマ定義による厳密な型チェック

以下の図は、Convex における CQRS/イベントソーシングの基本構造を示しています。

mermaidflowchart TB
  client["クライアント<br/>(React など)"]
  mutation["Mutation<br/>(Command 処理)"]
  eventStore[("イベントストア<br/>テーブル")]
  aggregate[("集約テーブル<br/>(Read Model)")]
  query["Query<br/>(データ取得)"]

  client -->|"Command 発行"| mutation
  mutation -->|"イベント保存"| eventStore
  mutation -->|"集約更新"| aggregate
  query -->|"データ取得"| aggregate
  aggregate -->|"リアルタイム購読"| client

図で理解できる要点:

  • Command(書き込み)は mutation 経由でイベントストアに保存され、同時に集約テーブルも更新されます
  • Query(読み込み)は集約テーブルから効率的にデータを取得し、リアルタイムでクライアントに配信されます
  • イベントストアは履歴の完全な記録として機能し、集約テーブルは最新状態の高速な参照先となります

なぜ今、このパターンが注目されるのか

近年、マイクロサービスアーキテクチャや分散システムの普及により、データの一貫性と追跡可能性の重要性が高まっています。 特に、以下のようなユースケースでは CQRS/イベントソーシングが強力な解決策となるでしょう。

  • 金融系アプリケーション: 全ての取引履歴を完全に記録し、監査可能にする必要がある
  • EC サイト: 在庫の変動履歴を追跡し、複数の集約ビュー(在庫数、売上統計など)を効率的に提供
  • コラボレーションツール: ドキュメントの変更履歴を保存し、任意の時点の状態を復元可能にする
  • IoT システム: センサーデータをイベントとして記録し、リアルタイムに集計・分析

Convex のリアルタイム性と型安全性は、これらのユースケースにおいて開発速度と品質の両方を向上させます。

課題

従来のアプローチにおける問題点

従来の CRUD ベースのアプリケーション設計では、以下のような課題がありました。

データの履歴が失われる

通常のデータベース更新では、古いデータは新しいデータで上書きされます。 そのため、「誰が」「いつ」「何を」変更したのかという情報が失われてしまいます。

typescript// 従来の CRUD アプローチ
// 残高を直接更新(履歴が失われる)
await db.update('accounts', accountId, {
  balance: newBalance,
});

この方法では、残高が変更された理由や、過去のどの時点でどのような値だったかを追跡できません。

複雑な集約処理のパフォーマンス問題

リアルタイムで複雑な集計や統計を表示する場合、毎回すべてのレコードを読み込んで計算すると、データ量が増えるにつれてパフォーマンスが劣化します。

typescript// 毎回すべての取引を集計(非効率)
const transactions = await db
  .query('transactions')
  .filter((q) => q.eq(q.field('accountId'), accountId))
  .collect();

const balance = transactions.reduce((sum, tx) => {
  return sum + tx.amount;
}, 0);

この処理は、取引件数が増えるほど遅くなり、ユーザーエクスペリエンスを損ないます。

読み取りと書き込みの競合

同じデータモデルを読み書き両方で使用すると、以下のような問題が発生します。

  • パフォーマンスのトレードオフ: 書き込みに最適化すると読み取りが遅くなり、その逆も然り
  • 複雑なクエリ: 読み取り用に非正規化したテーブルを書き込み時に一貫性を保つのが困難
  • スケーラビリティ: 読み取りと書き込みの負荷特性が異なるため、スケーリング戦略が複雑化

以下の図は、従来の CRUD アプローチにおける問題点を示しています。

mermaidflowchart LR
  app["アプリケーション"]
  db[("単一データモデル")]

  app -->|"書き込み<br/>(上書き更新)"| db
  app -->|"複雑な集約<br/>クエリ"| db
  db -->|"履歴喪失<br/>パフォーマンス低下"| app

  style db fill:#ffcccc

図で理解できる要点:

  • 単一のデータモデルに対して書き込みと読み取りの両方を行うことで、履歴の喪失やパフォーマンス問題が発生します
  • データの上書き更新により、変更の理由や過去の状態が失われます

イベントソーシング実装時の課題

イベントソーシングを導入する際にも、いくつかの技術的な課題があります。

イベントの整合性管理

複数のイベントが同時に発生した場合、その順序を正しく保証し、集約の一貫性を維持する必要があります。

イベントの再生パフォーマンス

イベント数が膨大になると、すべてのイベントを再生して現在の状態を復元するのに時間がかかります。

スナップショット戦略

パフォーマンスを維持するため、定期的に現在の状態をスナップショットとして保存し、それ以降のイベントだけを再生する仕組みが必要です。

これらの課題を Convex でどのように解決するか、次のセクションで詳しく見ていきましょう。

解決策

Convex による CQRS アーキテクチャの実装

Convex を使った CQRS/イベントソーシングの実装では、以下の 3 つの要素が中心となります。

イベントストアの設計

すべての状態変化をイベントとして記録するテーブルを用意します。 イベントには、発生日時、イベントタイプ、関連するエンティティ ID、ペイロードなどの情報を含めます。

集約テーブル(Read Model)の設計

クエリのパフォーマンスを最適化するため、イベントから生成された集約済みの状態を保存するテーブルを用意します。 これにより、毎回イベントを再生することなく、最新の状態に即座にアクセスできます。

Mutation による Command 処理

書き込み操作(Command)は mutation 関数として実装します。 mutation 内で、イベントの保存と集約テーブルの更新を同一トランザクション内で実行することで、データの一貫性を保証します。

以下の図は、Convex における CQRS の処理フローを詳細に示しています。

mermaidsequenceDiagram
  participant Client as クライアント
  participant Mutation as Mutation<br/>(Command)
  participant EventStore as イベントストア
  participant Aggregate as 集約テーブル
  participant Query as Query

  Client->>Mutation: Command 実行<br/>(例: 入金)
  activate Mutation
  Mutation->>EventStore: イベント保存<br/>(DepositEvent)
  Mutation->>Aggregate: 残高更新<br/>(+1000円)
  Mutation-->>Client: 処理完了
  deactivate Mutation

  Client->>Query: データ取得<br/>(残高照会)
  Query->>Aggregate: 集約データ取得
  Aggregate-->>Query: 最新残高
  Query-->>Client: リアルタイム更新

図で理解できる要点:

  • Command(入金など)は mutation で処理され、イベントストアと集約テーブルの両方が更新されます
  • Query はイベントを再生せず、集約テーブルから高速にデータを取得します
  • Convex のリアルタイム機能により、集約テーブルの変更が自動的にクライアントに反映されます

イベント再生とスナップショット戦略

イベント数が増加しても高速に状態を復元できるよう、以下の 2 つの戦略を組み合わせます。

定期的なスナップショット作成

一定のイベント数ごと、または一定期間ごとに、現在の集約状態をスナップショットとして保存します。 これにより、再生時にはスナップショット以降のイベントだけを処理すればよくなります。

段階的なイベント再生

過去の特定時点の状態を復元する場合、その時点に最も近いスナップショットを取得し、そこから必要なイベントだけを再生します。

以下の図は、スナップショットを活用したイベント再生の流れを示しています。

mermaidflowchart LR
  snapshot["スナップショット<br/>(1000件目)"]
  event1["イベント<br/>1001"]
  event2["イベント<br/>1002"]
  event3["イベント<br/>1003"]
  current["現在の状態"]

  snapshot --> event1
  event1 --> event2
  event2 --> event3
  event3 --> current

  style snapshot fill:#ccffcc
  style current fill:#ccccff

図で理解できる要点:

  • スナップショットから再生を開始することで、1 ~ 1000 件目のイベント処理をスキップできます
  • 最新の状態を復元する際のパフォーマンスが大幅に向上します

トランザクションとデータ整合性の確保

Convex の mutation 関数は、関数内のすべてのデータベース操作がアトミックに実行されることを保証します。 これにより、イベントの保存と集約の更新が不可分な単位として処理され、データの不整合が発生しません。

万が一、mutation の実行中にエラーが発生した場合、すべての変更がロールバックされます。

具体例

プロジェクトのセットアップ

まず、Convex プロジェクトを作成し、必要なパッケージをインストールします。

bash# Convex プロジェクトの初期化
yarn create convex
bash# 依存パッケージのインストール
cd my-convex-app
yarn install

スキーマ定義

Convex のスキーマを定義して、イベントストアと集約テーブルの構造を決定します。

typescript// convex/schema.ts
import { defineSchema, defineTable } from "convex/server";
import { v } from "convex/values";

export default defineSchema({
  // イベントストアテーブル
  events: defineTable({
    // イベントタイプ(入金、出金など)
    eventType: v.string(),
    // 集約 ID(口座 ID など)
    aggregateId: v.string(),
typescript    // イベント発生日時
    timestamp: v.number(),
    // イベントのペイロード(金額、メモなど)
    payload: v.any(),
    // イベントのバージョン番号(楽観的ロック用)
    version: v.number(),
  })
    .index("by_aggregate", ["aggregateId", "timestamp"])
    .index("by_type", ["eventType", "timestamp"]),

このスキーマでは、events テーブルにイベントを保存します。 aggregateIdtimestamp でインデックスを作成することで、特定の集約に関するイベントを時系列順に高速に取得できます。

typescript  // 集約テーブル(口座の現在状態)
  accounts: defineTable({
    // 口座 ID
    accountId: v.string(),
    // 口座名義
    ownerName: v.string(),
typescript    // 現在の残高
    balance: v.number(),
    // 最後に処理したイベントのバージョン
    lastEventVersion: v.number(),
    // 更新日時
    updatedAt: v.number(),
  }).index("by_account", ["accountId"]),

accounts テーブルは、口座の最新状態を保持する集約テーブルです。 lastEventVersion を記録することで、どこまでのイベントが反映されているかを追跡できます。

typescript  // スナップショットテーブル
  snapshots: defineTable({
    // 集約 ID
    aggregateId: v.string(),
    // スナップショット取得時のバージョン
    version: v.number(),
typescript    // スナップショット時の状態
    state: v.any(),
    // 作成日時
    createdAt: v.number(),
  })
    .index("by_aggregate", ["aggregateId", "version"]),
});

snapshots テーブルは、定期的に集約の状態を保存し、イベント再生のパフォーマンスを向上させます。

Command の実装(Mutation)

次に、入金や出金といった Command を mutation 関数として実装します。

typescript// convex/commands.ts
import { mutation } from "./_generated/server";
import { v } from "convex/values";

// 入金 Command
export const deposit = mutation({
  args: {
    accountId: v.string(),
    amount: v.number(),
    memo: v.optional(v.string()),
  },

このように、mutation の引数をバリデーションすることで、型安全性を確保します。

typescript  handler: async (ctx, args) => {
    // 1. 現在の口座情報を取得
    const account = await ctx.db
      .query("accounts")
      .withIndex("by_account", (q) =>
        q.eq("accountId", args.accountId)
      )
      .first();

まず、対象の口座の現在状態を集約テーブルから取得します。

typescriptif (!account) {
  throw new Error(`Account ${args.accountId} not found`);
}

// 2. 新しいバージョン番号を計算
const newVersion = account.lastEventVersion + 1;

次に、イベントのバージョン番号をインクリメントします。 これにより、イベントの順序を保証できます。

typescript// 3. イベントをイベントストアに保存
await ctx.db.insert('events', {
  eventType: 'DepositEvent',
  aggregateId: args.accountId,
  timestamp: Date.now(),
  payload: {
    amount: args.amount,
    memo: args.memo || '',
  },
  version: newVersion,
});

イベントストアに入金イベントを保存します。 このイベントは、後から履歴として参照したり、状態を再生したりする際に使用されます。

typescript// 4. 集約テーブルを更新
await ctx.db.patch(account._id, {
  balance: account.balance + args.amount,
  lastEventVersion: newVersion,
  updatedAt: Date.now(),
});

同一トランザクション内で、集約テーブルの残高を更新します。 これにより、イベントの保存と集約の更新がアトミックに実行されます。

typescript    // 5. スナップショット作成判定(100 イベントごと)
    if (newVersion % 100 === 0) {
      await ctx.db.insert("snapshots", {
        aggregateId: args.accountId,
        version: newVersion,
        state: {
          balance: account.balance + args.amount,
          ownerName: account.ownerName,
        },
        createdAt: Date.now(),
      });
    }

    return { success: true, newBalance: account.balance + args.amount };
  },
});

100 イベントごとにスナップショットを作成し、イベント再生のパフォーマンスを最適化します。

同様に、出金 Command も実装します。

typescript// 出金 Command
export const withdraw = mutation({
  args: {
    accountId: v.string(),
    amount: v.number(),
    memo: v.optional(v.string()),
  },
  handler: async (ctx, args) => {
typescriptconst account = await ctx.db
  .query('accounts')
  .withIndex('by_account', (q) =>
    q.eq('accountId', args.accountId)
  )
  .first();

if (!account) {
  throw new Error(`Account ${args.accountId} not found`);
}
typescript// 残高不足チェック
if (account.balance < args.amount) {
  throw new Error('Insufficient balance');
}

const newVersion = account.lastEventVersion + 1;
typescript// イベント保存
await ctx.db.insert('events', {
  eventType: 'WithdrawEvent',
  aggregateId: args.accountId,
  timestamp: Date.now(),
  payload: {
    amount: args.amount,
    memo: args.memo || '',
  },
  version: newVersion,
});
typescript    // 集約更新
    await ctx.db.patch(account._id, {
      balance: account.balance - args.amount,
      lastEventVersion: newVersion,
      updatedAt: Date.now(),
    });

    return { success: true, newBalance: account.balance - args.amount };
  },
});

出金の場合は、残高不足のチェックも行います。

Query の実装(読み取り専用)

Query 関数では、集約テーブルから最新の状態を高速に取得します。

typescript// convex/queries.ts
import { query } from "./_generated/server";
import { v } from "convex/values";

// 口座残高を取得する Query
export const getAccountBalance = query({
  args: { accountId: v.string() },
typescript  handler: async (ctx, args) => {
    const account = await ctx.db
      .query("accounts")
      .withIndex("by_account", (q) =>
        q.eq("accountId", args.accountId)
      )
      .first();

    if (!account) {
      return null;
    }
typescript    return {
      accountId: account.accountId,
      ownerName: account.ownerName,
      balance: account.balance,
      lastUpdated: account.updatedAt,
    };
  },
});

この Query は、集約テーブルから直接データを取得するため、イベントを再生する必要がなく高速です。

イベント履歴の取得

特定の口座のすべてのイベント履歴を取得する Query も実装します。

typescript// イベント履歴を取得する Query
export const getAccountHistory = query({
  args: { accountId: v.string() },
  handler: async (ctx, args) => {
typescriptconst events = await ctx.db
  .query('events')
  .withIndex('by_aggregate', (q) =>
    q.eq('aggregateId', args.accountId)
  )
  .order('asc')
  .collect();
typescript    return events.map(event => ({
      eventType: event.eventType,
      timestamp: event.timestamp,
      payload: event.payload,
      version: event.version,
    }));
  },
});

この Query により、過去のすべての入金・出金イベントを時系列順に取得できます。

イベント再生による状態復元

過去の特定時点の状態を復元する関数を実装します。

typescript// 指定バージョン時点の状態を再生する Query
export const replayToVersion = query({
  args: {
    accountId: v.string(),
    targetVersion: v.number(),
  },
typescript  handler: async (ctx, args) => {
    // 1. 最も近いスナップショットを取得
    const snapshot = await ctx.db
      .query("snapshots")
      .withIndex("by_aggregate", (q) =>
        q.eq("aggregateId", args.accountId)
      )
      .filter((q) =>
        q.lte(q.field("version"), args.targetVersion)
      )
      .order("desc")
      .first();

まず、目標バージョン以前の最も新しいスナップショットを取得します。

typescript// 2. スナップショット以降のイベントを取得
const startVersion = snapshot ? snapshot.version : 0;
const events = await ctx.db
  .query('events')
  .withIndex('by_aggregate', (q) =>
    q.eq('aggregateId', args.accountId)
  )
  .filter((q) =>
    q.and(
      q.gt(q.field('version'), startVersion),
      q.lte(q.field('version'), args.targetVersion)
    )
  )
  .order('asc')
  .collect();

次に、スナップショット以降、目標バージョンまでのイベントを取得します。

typescript// 3. スナップショットの状態から開始
let balance = snapshot ? snapshot.state.balance : 0;
let ownerName = snapshot ? snapshot.state.ownerName : '';

// 4. イベントを順に再生
for (const event of events) {
  if (event.eventType === 'DepositEvent') {
    balance += event.payload.amount;
  } else if (event.eventType === 'WithdrawEvent') {
    balance -= event.payload.amount;
  }
}

取得したイベントを順に適用して、目標バージョン時点の状態を復元します。

typescript    return {
      accountId: args.accountId,
      ownerName,
      balance,
      version: args.targetVersion,
    };
  },
});

この関数により、過去の任意の時点の残高を正確に再現できます。

クライアント側の実装

React などのフロントエンドから Convex の Query と Mutation を利用します。

typescript// src/App.tsx
import { useQuery, useMutation } from "convex/react";
import { api } from "../convex/_generated/api";

export default function AccountDashboard() {
  // リアルタイムで口座情報を取得
  const account = useQuery(
    api.queries.getAccountBalance,
    { accountId: "account-001" }
  );

useQuery フックを使うことで、集約テーブルの変更がリアルタイムで反映されます。

typescript// 入金 Mutation
const deposit = useMutation(api.commands.deposit);

// 出金 Mutation
const withdraw = useMutation(api.commands.withdraw);

const handleDeposit = async () => {
  await deposit({
    accountId: 'account-001',
    amount: 1000,
    memo: '給料',
  });
};
typescriptconst handleWithdraw = async () => {
  await withdraw({
    accountId: 'account-001',
    amount: 500,
    memo: '買い物',
  });
};

if (!account) {
  return <div>読み込み中...</div>;
}
typescript  return (
    <div>
      <h1>{account.ownerName} の口座</h1>
      <p>残高: ¥{account.balance.toLocaleString()}</p>
      <button onClick={handleDeposit}>¥1,000 入金</button>
      <button onClick={handleWithdraw}>¥500 出金</button>
    </div>
  );
}

このように、Convex のリアルタイム機能により、mutation 実行後すぐに UI が更新されます。

以下の図は、クライアントからの Command 実行とリアルタイム更新のフローを示しています。

mermaidsequenceDiagram
  participant User as ユーザー
  participant UI as React UI
  participant Mutation as Mutation
  participant DB as Convex DB
  participant Query as Query

  User->>UI: 入金ボタンクリック
  UI->>Mutation: deposit() 実行
  Mutation->>DB: イベント保存<br/>+ 集約更新
  DB-->>Query: リアルタイム通知
  Query-->>UI: 残高自動更新
  UI-->>User: 新しい残高表示

図で理解できる要点:

  • ユーザーが入金ボタンをクリックすると、mutation が実行され、データベースが更新されます
  • Convex のリアルタイム機能により、Query が自動的に再実行され、UI が即座に更新されます
  • 手動でのリロードや State 管理が不要です

まとめ

本記事では、Convex を用いて CQRS/イベントソーシングを実装する方法を詳しく解説しました。

重要なポイントを振り返りましょう。

まず、CQRS はデータの書き込みと読み込みを分離するアーキテクチャパターンであり、イベントソーシングは状態変化をイベントとして記録する手法です。 これらを組み合わせることで、データの完全な履歴を保持しつつ、高速なクエリを実現できます。

Convex は、リアルタイム性、型安全性、トランザクション機能を備えており、CQRS/イベントソーシングの実装に最適な環境を提供してくれますね。 特に、mutation 関数内でイベントの保存と集約の更新をアトミックに実行できる点が、データ整合性の確保において非常に有効でした。

実装では、以下の 3 つのテーブルを設計しました。

#テーブル役割
1eventsすべての状態変化をイベントとして記録
2accounts集約済みの最新状態を保持(Read Model)
3snapshots定期的な状態のスナップショットを保存

Command(入金・出金)は mutation として実装し、Query(残高照会・履歴取得)は query として実装することで、責務を明確に分離できました。

スナップショット戦略により、イベント数が増加してもパフォーマンスを維持できます。 100 イベントごとにスナップショットを作成することで、イベント再生時に処理するイベント数を削減し、高速な状態復元を実現しました。

このアーキテクチャは、以下のようなケースで特に威力を発揮するでしょう。

  • 完全な監査証跡が必要な業務アプリケーション
  • 複数の集約ビューを効率的に提供したい場合
  • 過去の任意の時点の状態を復元する必要がある場合
  • リアルタイム性と拡張性を両立したいアプリケーション

Convex の型安全性とリアルタイム機能を活用することで、複雑なアーキテクチャパターンをシンプルに実装でき、開発効率と保守性を高められます。

ぜひ、あなたのプロジェクトでも Convex を使った CQRS/イベントソーシングを試してみてください。

関連リンク