T-CREATOR

LangChain JSON 出力が壊れる問題を解く:ストリーミング整形・スキーマ強制・再試行設計

LangChain JSON 出力が壊れる問題を解く:ストリーミング整形・スキーマ強制・再試行設計

LangChain を使って LLM から JSON 形式のデータを取得する際、出力が壊れたり、期待したフォーマットにならないという問題に遭遇したことはありませんか。 特にストリーミング出力を利用する場合、JSON が途中で切れてしまったり、スキーマと異なる構造が返ってきたりすることがあります。

本記事では、LangChain における JSON 出力の問題を解決するための 3 つのアプローチ、「ストリーミング整形」「スキーマ強制」「再試行設計」について詳しく解説いたします。 実践的なコード例とともに、安定した JSON 出力を実現する方法をご紹介しますので、ぜひ最後までお読みください。

背景

LangChain における JSON 出力の重要性

LangChain は、大規模言語モデル(LLM)を活用したアプリケーション開発を支援する強力なフレームワークです。 多くの実装では、LLM からの出力を構造化された JSON データとして受け取り、それをアプリケーションで処理する必要があります。

たとえば、ユーザーの質問に対して商品情報を返すチャットボットでは、商品名、価格、在庫状況などを JSON 形式で取得し、フロントエンドで表示するといったケースが一般的でしょう。 このようなシナリオでは、JSON の構造が正確であることが極めて重要になります。

ストリーミング出力の利点と課題

ストリーミング出力は、LLM の応答をリアルタイムでユーザーに表示できるため、体感速度が向上し、優れたユーザー体験を提供できます。 しかし、JSON のような構造化データをストリーミングで扱う場合、データが完全に揃うまでパースできないという技術的な制約があります。

以下の図は、LangChain における JSON 出力フローの全体像を示しています。

mermaidflowchart TB
    user["ユーザー"] -->|プロンプト| chain["LangChain"]
    chain -->|リクエスト| llm["LLM<br/>(GPT-4など)"]
    llm -->|ストリーミング応答| parser["JSON パーサー"]
    parser -->|検証| validator["スキーマ検証"]
    validator -->|OK| app["アプリケーション"]
    validator -->|NG| retry["再試行処理"]
    retry -->|再リクエスト| chain

図で理解できる要点:

  • ユーザーから LLM までのリクエストフロー
  • JSON パーサーとスキーマ検証の位置づけ
  • 検証失敗時の再試行ループ

TypeScript と LangChain の組み合わせ

TypeScript は型安全性を提供するため、JSON 出力のスキーマ定義と検証を強力にサポートします。 LangChain の TypeScript 実装(LangChain.js)を使用することで、開発時点でのエラー検出が容易になり、より堅牢なアプリケーションを構築できるでしょう。

課題

JSON 出力が壊れる主な原因

LangChain で JSON 出力を扱う際に発生する問題には、いくつかの典型的なパターンがあります。 これらの問題を理解することが、効果的な解決策を実装する第一歩となります。

#問題タイプ説明発生頻度
1不完全な JSONストリーミング中断により閉じ括弧が欠損★★★
2スキーマ不一致期待したフィールドが存在しない、または型が異なる★★★★
3余分なテキストJSON の前後に説明文が含まれる★★★
4エスケープ処理文字列内の特殊文字が正しくエスケープされない★★
5トークン制限出力が途中で切れてしまう★★

ストリーミング時の JSON パース問題

ストリーミング出力では、データが段階的に到着するため、完全な JSON が揃うまでパースを待つ必要があります。 しかし、ストリーミングが途中で中断されたり、LLM が予期せず出力を終了したりすると、不完全な JSON が残ってしまいます。

以下は、ストリーミングで発生する問題を図解したものです。

mermaidsequenceDiagram
    participant App as アプリケーション
    participant LC as LangChain
    participant LLM as LLM

    App->>LC: JSON リクエスト
    LC->>LLM: プロンプト送信
    LLM-->>LC: { "name": "商品A"
    LC-->>App: 部分データ受信
    LLM-->>LC: , "price": 1000
    LC-->>App: 部分データ受信
    LLM-->>LC: [接続中断]
    LC-->>App: 不完全な JSON
    App->>App: パースエラー発生

図の要約:ストリーミング中に接続が中断されると、閉じ括弧のない不完全な JSON が残り、パースエラーが発生します。

スキーマ検証の欠如

LLM は自然言語生成モデルであるため、指示されたスキーマを完全に守るとは限りません。 たとえば、数値型を期待しているフィールドに文字列が入ったり、必須フィールドが欠落したりすることがあります。

このようなスキーマ不一致を検出せずにアプリケーションで処理すると、ランタイムエラーや予期しない動作につながってしまうでしょう。

エラー時の処理不足

JSON 出力が失敗した際に、単にエラーをスローするだけでは、ユーザー体験が損なわれます。 適切な再試行ロジックやフォールバック処理がないと、一時的な LLM の不調やネットワークエラーでアプリケーション全体が停止してしまう可能性があります。

解決策

解決策の全体像

LangChain における JSON 出力の問題を解決するには、3 つの重要なアプローチを組み合わせることが効果的です。 それぞれの手法は独立して機能しますが、組み合わせることでより堅牢なシステムを構築できるでしょう。

以下の図は、3 つの解決策がどのように連携するかを示しています。

mermaidflowchart LR
    input["入力プロンプト"] --> streaming["1. ストリーミング整形"]
    streaming --> schema["2. スキーマ強制"]
    schema --> validation{検証結果}
    validation -->|成功| output["正常な JSON 出力"]
    validation -->|失敗| retry["3. 再試行設計"]
    retry --> streaming

図で理解できる要点:

  • ストリーミング整形が最初のゲートとして機能
  • スキーマ強制で構造を保証
  • 検証失敗時は再試行で回復

1. ストリーミング整形

ストリーミング出力を扱う場合、JSON が完全に揃うまでバッファリングし、その後でパースする方式が基本となります。 LangChain では、StringOutputParserを拡張して JSON の完全性をチェックする仕組みを実装できます。

ストリーミングバッファの実装

まず、ストリーミングデータを蓄積するバッファクラスを作成します。

typescript// ストリーミング JSON バッファクラス
class StreamingJSONBuffer {
  private buffer: string = '';
  private isComplete: boolean = false;

  // データを追加
  append(chunk: string): void {
    this.buffer += chunk;
    this.checkCompleteness();
  }

  // JSON の完全性をチェック
  private checkCompleteness(): void {
    try {
      JSON.parse(this.buffer);
      this.isComplete = true;
    } catch (error) {
      this.isComplete = false;
    }
  }
}

このクラスは、受信したデータを蓄積し、JSON.parse が成功するかどうかで完全性を判断します。 エラーが発生する場合は、まだデータが不完全であると判断できるわけです。

完全性チェックの実装

次に、JSON が完全かどうかを判定するロジックを実装します。

typescript// JSON の完全性を検証
isComplete(): boolean {
  return this.isComplete;
}

// 完全な JSON を取得
getJSON(): object | null {
  if (!this.isComplete) {
    return null;
  }

  try {
    return JSON.parse(this.buffer);
  } catch (error) {
    console.error('JSON パースエラー:', error);
    return null;
  }
}

この実装により、JSON が完全に揃った時点でのみパース結果を返すことができます。

余分なテキストの除去

LLM が時々 JSON の前後に説明文を追加することがあるため、これを除去する処理も必要です。

typescript// JSON 部分のみを抽出
private extractJSON(text: string): string {
  // 最初の { から最後の } までを抽出
  const startIndex = text.indexOf('{');
  const endIndex = text.lastIndexOf('}');

  if (startIndex === -1 || endIndex === -1) {
    return text;
  }

  return text.substring(startIndex, endIndex + 1);
}

正規表現ではなくシンプルな文字列検索を使うことで、ネストされた JSON オブジェクトにも対応できます。

2. スキーマ強制

スキーマ強制は、LLM の出力が期待した構造に従うことを保証する仕組みです。 LangChain では、Zod などのスキーマ検証ライブラリと組み合わせることで、型安全な JSON 出力を実現できます。

Zod スキーマの定義

まず、期待する JSON の構造を Zod スキーマとして定義します。

typescriptimport { z } from 'zod';

// 商品情報のスキーマ
const ProductSchema = z.object({
  name: z.string().min(1, '商品名は必須です'),
  price: z
    .number()
    .positive('価格は正の数である必要があります'),
  category: z.string(),
  inStock: z.boolean(),
  description: z.string().optional(),
});

// 型推論
type Product = z.infer<typeof ProductSchema>;

Zod を使うことで、スキーマ定義と型定義を一箇所で管理でき、TypeScript の型安全性を最大限に活用できます。

StructuredOutputParser の利用

LangChain のStructuredOutputParserを使って、スキーマを LLM のプロンプトに組み込みます。

typescriptimport { StructuredOutputParser } from 'langchain/output_parsers';

// パーサーの作成
const parser =
  StructuredOutputParser.fromZodSchema(ProductSchema);

// フォーマット指示を取得
const formatInstructions = parser.getFormatInstructions();

このformatInstructionsをプロンプトに含めることで、LLM に期待する JSON 構造を明示的に伝えられます。

プロンプトへのスキーマ組み込み

スキーマをプロンプトテンプレートに統合します。

typescriptimport { PromptTemplate } from '@langchain/core/prompts';

// プロンプトテンプレートの作成
const prompt = PromptTemplate.fromTemplate(
  `以下の質問に対して、商品情報をJSON形式で返してください。

質問: {query}

{format_instructions}

JSON出力:`
);

このテンプレートにより、LLM は常にスキーマに沿った出力を心がけるようになります。

検証とエラーハンドリング

受信した JSON をスキーマで検証し、エラーを適切に処理します。

typescript// JSON を検証
function validateProduct(data: unknown): Product {
  try {
    // Zod で検証
    return ProductSchema.parse(data);
  } catch (error) {
    if (error instanceof z.ZodError) {
      // 検証エラーの詳細をログ出力
      console.error('スキーマ検証エラー:', error.errors);
      throw new Error(
        `JSON がスキーマに適合しません: ${error.errors
          .map((e) => e.message)
          .join(', ')}`
      );
    }
    throw error;
  }
}

Zod のエラーメッセージは詳細で分かりやすいため、デバッグ時に非常に役立ちます。

3. 再試行設計

一時的なエラーや LLM の出力不良に対応するため、自動再試行の仕組みを実装します。 指数バックオフと最大試行回数の制限を組み合わせることで、効率的かつ安全な再試行が可能になります。

再試行ロジックの基本構造

再試行を管理するクラスを作成します。

typescript// 再試行設定の型定義
interface RetryConfig {
  maxAttempts: number;
  initialDelayMs: number;
  maxDelayMs: number;
  backoffMultiplier: number;
}

// デフォルト設定
const defaultRetryConfig: RetryConfig = {
  maxAttempts: 3,
  initialDelayMs: 1000,
  maxDelayMs: 10000,
  backoffMultiplier: 2,
};

この設定により、1 秒から始めて最大 10 秒まで待機時間を延ばしながら、最大 3 回まで再試行します。

指数バックオフの実装

待機時間を指数的に増加させる関数を実装します。

typescript// 待機時間を計算
function calculateDelay(
  attempt: number,
  config: RetryConfig
): number {
  // 指数バックオフ: delay = initial * (multiplier ^ attempt)
  const delay =
    config.initialDelayMs *
    Math.pow(config.backoffMultiplier, attempt - 1);

  // 最大待機時間を超えないように制限
  return Math.min(delay, config.maxDelayMs);
}

// 待機処理
function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

指数バックオフにより、LLM サービスへの負荷を適切にコントロールできます。

再試行関数の実装

汎用的な再試行関数を作成します。

typescript// 再試行実行関数
async function retryWithBackoff<T>(
  fn: () => Promise<T>,
  config: RetryConfig = defaultRetryConfig
): Promise<T> {
  let lastError: Error | undefined;

  for (
    let attempt = 1;
    attempt <= config.maxAttempts;
    attempt++
  ) {
    try {
      // 関数を実行
      return await fn();
    } catch (error) {
      lastError = error as Error;
      console.warn(`試行 ${attempt} 失敗:`, error);

      // 最後の試行でない場合は待機
      if (attempt < config.maxAttempts) {
        const delay = calculateDelay(attempt, config);
        console.log(`${delay}ms 待機してから再試行します`);
        await sleep(delay);
      }
    }
  }

  // すべての試行が失敗
  throw new Error(
    `${config.maxAttempts}回の試行後も失敗: ${lastError?.message}`
  );
}

この関数は、任意の非同期処理に対して再試行ロジックを適用できる汎用的な実装です。

エラー分類と条件付き再試行

すべてのエラーで再試行するのではなく、再試行可能なエラーのみを対象とします。

typescript// 再試行可能なエラーかどうかを判定
function isRetryableError(error: Error): boolean {
  // ネットワークエラー
  if (
    error.message.includes('network') ||
    error.message.includes('timeout')
  ) {
    return true;
  }

  // レート制限エラー
  if (error.message.includes('rate limit')) {
    return true;
  }

  // 一時的なサーバーエラー
  if (
    error.message.includes('503') ||
    error.message.includes('502')
  ) {
    return true;
  }

  // スキーマ検証エラーは再試行(LLMの出力不良の可能性)
  if (error.message.includes('スキーマに適合しません')) {
    return true;
  }

  return false;
}

認証エラーやプログラムのバグなど、再試行しても解決しないエラーは除外することが重要です。

条件付き再試行の実装

エラータイプに応じた再試行ロジックを実装します。

typescript// 条件付き再試行関数
async function retryIfRetryable<T>(
  fn: () => Promise<T>,
  config: RetryConfig = defaultRetryConfig
): Promise<T> {
  let lastError: Error | undefined;

  for (
    let attempt = 1;
    attempt <= config.maxAttempts;
    attempt++
  ) {
    try {
      return await fn();
    } catch (error) {
      lastError = error as Error;

      // 再試行不可能なエラーは即座にスロー
      if (!isRetryableError(lastError)) {
        throw lastError;
      }

      console.warn(
        `再試行可能なエラー (試行 ${attempt}):`,
        error
      );

      if (attempt < config.maxAttempts) {
        const delay = calculateDelay(attempt, config);
        await sleep(delay);
      }
    }
  }

  throw lastError!;
}

この実装により、無駄な再試行を避けつつ、回復可能なエラーには適切に対処できます。

具体例

実践例:商品検索システムの実装

ここまで解説した 3 つの解決策を統合し、実際に動作する商品検索システムを実装してみましょう。 このシステムは、ユーザーの質問に基づいて商品情報を JSON 形式で返す機能を提供します。

プロジェクトのセットアップ

まず、必要なパッケージをインストールします。

bash# LangChain と関連パッケージのインストール
yarn add langchain @langchain/core @langchain/openai

# スキーマ検証ライブラリ
yarn add zod

# 型定義
yarn add -D @types/node typescript

これらのパッケージにより、LangChain の全機能と Zod による型安全性を活用できるようになります。

環境変数の設定

OpenAI API キーを環境変数として設定します。

typescript// .env ファイル
OPENAI_API_KEY = your_api_key_here;

プロジェクトルートに.envファイルを作成し、API キーを記述してください。 本番環境では、シークレット管理サービスの利用を推奨します。

統合クラスの実装

3 つの解決策を統合したクラスを実装します。

typescriptimport { ChatOpenAI } from '@langchain/openai';
import { PromptTemplate } from '@langchain/core/prompts';
import { StructuredOutputParser } from 'langchain/output_parsers';
import { z } from 'zod';

// 商品検索クラス
class ProductSearchAgent {
  private llm: ChatOpenAI;
  private parser: StructuredOutputParser<
    z.ZodType<Product>
  >;
  private prompt: PromptTemplate;

  constructor() {
    // LLM の初期化
    this.llm = new ChatOpenAI({
      modelName: 'gpt-4',
      temperature: 0.1, // 出力の一貫性を高める
    });

    // パーサーの初期化
    this.parser =
      StructuredOutputParser.fromZodSchema(ProductSchema);

    // プロンプトの初期化
    this.initializePrompt();
  }
}

コンストラクタで必要なコンポーネントを初期化し、温度パラメータを低く設定することで JSON 出力の安定性を高めています。

プロンプトの初期化

スキーマ情報を含むプロンプトを作成します。

typescriptprivate initializePrompt(): void {
  const template = `あなたは商品情報を提供するアシスタントです。
ユーザーの質問に基づいて、商品情報を正確なJSON形式で返してください。

質問: {query}

{format_instructions}

重要な注意事項:
- 必ず有効なJSONのみを出力してください
- 説明文やコメントは含めないでください
- すべての必須フィールドを含めてください

JSON出力:`;

  this.prompt = PromptTemplate.fromTemplate(template);
}

明示的な指示により、LLM が余分なテキストを出力する可能性を減らします。

ストリーミング実行メソッド

ストリーミングで JSON を取得し、バッファリングする処理を実装します。

typescriptprivate async executeWithStreaming(query: string): Promise<string> {
  // プロンプトの準備
  const input = await this.prompt.format({
    query,
    format_instructions: this.parser.getFormatInstructions(),
  });

  // ストリーミングバッファの初期化
  const buffer = new StreamingJSONBuffer();

  // ストリーミング実行
  const stream = await this.llm.stream(input);

  for await (const chunk of stream) {
    const content = chunk.content as string;
    buffer.append(content);
  }

  // 完全な JSON を取得
  if (!buffer.isComplete()) {
    throw new Error('不完全な JSON を受信しました');
  }

  const json = buffer.getJSON();
  if (!json) {
    throw new Error('JSON のパースに失敗しました');
  }

  return JSON.stringify(json);
}

ストリーミングが完了するまで待機し、完全性を確認してから JSON を返します。

メインの検索メソッド

再試行ロジックを統合したメイン処理を実装します。

typescriptasync search(query: string): Promise<Product> {
  // 再試行付きで実行
  return await retryIfRetryable(async () => {
    // ストリーミングで JSON 文字列を取得
    const jsonString = await this.executeWithStreaming(query);

    // JSON をパース
    const jsonData = JSON.parse(jsonString);

    // スキーマ検証
    const product = validateProduct(jsonData);

    return product;
  }, {
    maxAttempts: 3,
    initialDelayMs: 1000,
    maxDelayMs: 5000,
    backoffMultiplier: 2,
  });
}

この実装により、エラーが発生しても自動的に再試行され、堅牢なシステムを実現できます。

実行例とエラーハンドリング

実際の使用例とエラーケースを見てみましょう。

typescript// 使用例
async function main() {
  const agent = new ProductSearchAgent();

  try {
    // 商品を検索
    const product = await agent.search(
      'ノートパソコンでおすすめの商品を教えてください'
    );

    console.log('検索結果:', product);
    console.log(`商品名: ${product.name}`);
    console.log(`価格: ¥${product.price.toLocaleString()}`);
    console.log(
      `在庫: ${product.inStock ? 'あり' : 'なし'}`
    );
  } catch (error) {
    console.error('検索エラー:', error);
    // フォールバック処理やユーザーへの通知
  }
}

main();

型安全性により、productオブジェクトのプロパティに安心してアクセスできます。

エラーケースの処理例

さまざまなエラーケースに対する処理を実装します。

typescript// エラー別の処理
async function searchWithErrorHandling(
  agent: ProductSearchAgent,
  query: string
): Promise<Product | null> {
  try {
    return await agent.search(query);
  } catch (error) {
    const err = error as Error;

    // エラータイプ別の処理
    if (err.message.includes('スキーマに適合しません')) {
      console.error('スキーマエラー:', err.message);
      // デフォルト値を返すなどのフォールバック
      return null;
    } else if (err.message.includes('rate limit')) {
      console.error('レート制限エラー:', err.message);
      // ユーザーに再試行を促す
      return null;
    } else if (err.message.includes('network')) {
      console.error('ネットワークエラー:', err.message);
      // キャッシュから返すなどの対応
      return null;
    } else {
      console.error('予期しないエラー:', err);
      throw err;
    }
  }
}

エラーの種類に応じた適切な処理により、ユーザー体験を損なわずにエラーを処理できます。

パフォーマンス最適化

実運用では、キャッシュやタイムアウト設定も重要です。

typescript// キャッシュ付きの検索
class CachedProductSearchAgent extends ProductSearchAgent {
  private cache: Map<
    string,
    { product: Product; timestamp: number }
  >;
  private cacheTTL: number = 1000 * 60 * 60; // 1時間

  constructor() {
    super();
    this.cache = new Map();
  }

  async search(query: string): Promise<Product> {
    // キャッシュチェック
    const cached = this.cache.get(query);
    if (
      cached &&
      Date.now() - cached.timestamp < this.cacheTTL
    ) {
      console.log('キャッシュから返却');
      return cached.product;
    }

    // 通常の検索
    const product = await super.search(query);

    // キャッシュに保存
    this.cache.set(query, {
      product,
      timestamp: Date.now(),
    });

    return product;
  }
}

キャッシュにより、同じクエリに対する LLM 呼び出しを削減でき、コストとレイテンシを大幅に改善できます。

テストの実装

最後に、ユニットテストの例を示します。

typescriptimport { describe, it, expect } from '@jest/globals';

describe('ProductSearchAgent', () => {
  it('有効な商品情報を返す', async () => {
    const agent = new ProductSearchAgent();
    const product = await agent.search('テストクエリ');

    // スキーマに適合することを確認
    expect(product).toHaveProperty('name');
    expect(product).toHaveProperty('price');
    expect(typeof product.price).toBe('number');
    expect(product.price).toBeGreaterThan(0);
  });

  it('再試行により回復可能', async () => {
    // モックで一度失敗してから成功するシナリオをテスト
    // 実装の詳細は省略
  });
});

テストにより、リファクタリング時の安全性が確保され、継続的な改善が可能になります。

以下の図は、実装した商品検索システムの処理フローを示しています。

mermaidsequenceDiagram
    participant U as ユーザー
    participant A as ProductSearchAgent
    participant B as StreamingJSONBuffer
    participant L as LLM
    participant V as Validator

    U->>A: search("ノートPC")
    A->>A: プロンプト生成
    A->>L: ストリーミング開始

    loop ストリーミング
        L-->>B: JSONチャンク受信
        B->>B: バッファに追加
    end

    B->>B: 完全性チェック
    B->>A: 完全なJSON返却
    A->>V: スキーマ検証

    alt 検証成功
        V->>A: 検証済みProduct
        A->>U: Product返却
    else 検証失敗
        V->>A: エラー
        A->>A: 再試行
        A->>L: 再リクエスト
    end

図の要約:ストリーミング受信からスキーマ検証、再試行までの一連の処理フローを示しています。

まとめ

LangChain で JSON 出力を扱う際の問題は、「ストリーミング整形」「スキーマ強制」「再試行設計」の 3 つのアプローチを組み合わせることで効果的に解決できます。

本記事で解説した内容をまとめますと、以下のようになります。

重要なポイント

#ポイント効果
1ストリーミングバッファによる完全性チェック不完全な JSON によるパースエラーを防止
2Zod スキーマによる型安全な検証ランタイムエラーを開発時に検出
3指数バックオフによる再試行一時的なエラーから自動回復
4エラータイプ別の処理無駄な再試行を避けて効率化
5キャッシュによる最適化コストとレイテンシを削減

実装時の推奨事項

これらの解決策を実装する際は、以下の点に注意してください。

まず、スキーマ定義は早期に確定させ、フロントエンドとバックエンドで共有することが重要です。 Zod スキーマから自動的に型定義を生成できるため、開発効率が大幅に向上するでしょう。

次に、再試行回数と待機時間は、アプリケーションの要件と LLM のレート制限に応じて調整してください。 過度な再試行はコストを増加させますが、少なすぎると回復可能なエラーを逃してしまいます。

また、ログ出力を充実させることで、本番環境での問題追跡が容易になります。 特に、どの試行で成功したか、どのようなエラーが発生したかを記録しておくと、システムの改善に役立ちます。

今後の展開

本記事で紹介した手法は、商品検索以外の様々なユースケースにも応用できます。 たとえば、チャットボットの応答構造化、データ抽出パイプライン、自動レポート生成など、LLM からの構造化データ取得が必要なあらゆる場面で活用できるでしょう。

LangChain は日々進化しており、新しい機能や改善が継続的に追加されています。 公式ドキュメントを定期的にチェックし、最新のベストプラクティスを取り入れることをお勧めします。

JSON 出力の安定性を確保することで、LLM を活用したアプリケーションの信頼性が大きく向上します。 本記事が、皆様の LangChain 開発の一助となれば幸いです。

関連リンク