T-CREATOR

Dify ワークフロー定型 30:分岐・並列・リトライ・サーキットブレーカの型

Dify ワークフロー定型 30:分岐・並列・リトライ・サーキットブレーカの型

AI ワークフローの構築において、エラーハンドリングや効率的な処理制御は避けて通れない課題です。Dify では v0.8.0 以降、並列処理機能が追加され、v0.14.0 ではエラーハンドリングが大幅に強化されました。

本記事では、Dify ワークフローにおける「分岐」「並列」「リトライ」「サーキットブレーカ」という 4 つの重要なパターンを、実装例とともに解説していきます。これらのパターンを理解することで、より堅牢で効率的な AI ワークフローが構築できるようになるでしょう。

4 つのパターン早見表

以下の表は、本記事で解説する 4 つの処理制御パターンの概要を示しています。

#パターン名用途主な効果実装方法対応バージョン
1条件分岐(IF/ELSE)条件に応じた処理の切り替えビジネスロジックの柔軟な実装IF/ELSE ノード + 条件式(AND/OR 組み合わせ可)全バージョン
2並列処理(Parallel)複数タスクの同時実行処理時間 60% 短縮(例:15 秒 → 5 秒)1 ノードから複数ノードへ接続v0.8.0 以降
3リトライ(Retry)一時的な障害への自動再試行耐障害性向上エラー自動回復Code ノード + 待機処理 + カウンタ管理v0.14.0 で強化
4サーキットブレーカ連鎖障害の防止システム全体の安定性維持影響範囲の局所化エラーカウンタ + 状態管理(CLOSED/OPEN/HALF-OPEN)v0.14.0 で強化

各パターンの組み合わせ効果

これら 4 つのパターンは、単独でも効果を発揮しますが、組み合わせることでさらに強力になります。

  • 分岐 + 並列:条件に応じて異なる並列処理フローを実行
  • 並列 + リトライ:各並列分岐で独立したリトライ処理を実行
  • リトライ + サーキットブレーカ:リトライ上限到達時にサーキットを開いて全体を保護
  • 4 つ全て:堅牢で高速、かつ柔軟なエンタープライズグレードのワークフロー

背景

Dify ワークフローの進化

Dify は AI アプリケーションを構築するためのオープンソースプラットフォームで、ノーコード・ローコードでワークフローを設計できる点が特徴です。

初期のバージョンでは、ワークフローのノードは順次実行のみでしたが、v0.8.0 で並列処理が導入され、複数のタスクを同時実行できるようになりました。さらに v0.14.0 では、エラーハンドリング機能が強化され、障害発生時の処理継続や代替ルートの自動選択が可能になっています。

ワークフロー設計の重要性

AI ワークフローでは、以下のような課題が頻繁に発生します。

  • LLM API の一時的な障害やレート制限
  • HTTP リクエストのタイムアウトや 404/500 エラー
  • 複数の処理を並行実行したい場合のパフォーマンス課題
  • 条件によって処理フローを変更したい要件

これらの課題に対応するため、分岐・並列・リトライ・サーキットブレーカという 4 つのパターンが重要になります。

以下の図は、Dify ワークフローにおける処理制御パターンの全体像を示しています。

mermaidflowchart TB
  start["ワークフロー開始"] --> condition["条件判定"]

  condition -->|条件A| branchA["分岐A<br/>特定処理"]
  condition -->|条件B| branchB["分岐B<br/>代替処理"]

  branchA --> parallel["並列処理開始"]

  parallel --> task1["タスク1<br/>並列実行"]
  parallel --> task2["タスク2<br/>並列実行"]
  parallel --> task3["タスク3<br/>並列実行"]

  task1 --> errorCheck1["エラー検出"]
  errorCheck1 -->|成功| merge["結果統合"]
  errorCheck1 -->|失敗| retry1["リトライ処理"]

  retry1 -->|再試行| task1
  retry1 -->|上限到達| circuit["サーキット<br/>ブレーカ発動"]

  task2 --> merge
  task3 --> merge
  branchB --> merge
  circuit --> merge

  merge --> done["ワークフロー完了"]

この図では、4 つのパターンがどのように組み合わさってワークフローを構成するかを示しています。各パターンは独立して使用することもできますが、組み合わせることでより堅牢な設計が実現できます。

課題

Dify ワークフロー設計における典型的な課題

Dify でワークフローを構築する際、以下のような課題に直面することがあります。

課題 1:条件による処理の切り替え

ユーザーの入力内容や処理結果に応じて、異なる処理フローを実行したい場合があります。例えば、検索クエリの種類によって異なる LLM モデルを使い分けたり、データの存在有無で処理を分岐させたりするケースです。

課題 2:処理時間の長さ

複数の API 呼び出しやデータ取得処理を順次実行すると、全体の処理時間が各タスクの合計時間になってしまいます。例えば、3 つの API 呼び出しがそれぞれ 5 秒かかる場合、順次実行では合計 15 秒必要ですが、並列実行できれば約 5 秒で完了します。

課題 3:外部 API の不安定性

LLM API や外部サービスは、レート制限、一時的な障害、ネットワークエラーなどで失敗する可能性があります。1 回の失敗でワークフロー全体が停止してしまうと、ユーザー体験が大幅に低下します。

課題 4:連鎖的な障害の拡大

一部のノードでエラーが発生した際、そのエラーが他の処理に波及し、システム全体が停止してしまうことがあります。特に並列処理では、1 つの分岐の失敗が全体に影響を及ぼすケースが課題となります。

以下の図は、これらの課題が発生するシナリオを示しています。

mermaidflowchart LR
  user["ユーザー入力"] --> seq["順次処理"]

  seq --> api1["API呼び出し1<br/>(5秒)"]
  api1 --> api2["API呼び出し2<br/>(5秒)"]
  api2 --> api3["API呼び出し3<br/>(5秒)"]
  api3 --> result["結果<br/>(合計15秒)"]

  api2 -.->|失敗時| error["エラー<br/>全体停止"]

  style error fill:#ff6b6b
  style result fill:#51cf66

このような順次処理では、処理時間が長くなり、途中でのエラーが全体に影響を与えてしまいます。

Dify の制約事項

Dify ワークフローには、いくつかの制約があることも理解しておく必要があります。

#制約項目内容
1ループ禁止ノードのループはできず、循環参照は不可
2再帰制限Cloud.Dify では Workflow の呼び出し回数が 5 回まで
3リトライ実装LLM アクセスの Exponential Backoff+Jitter は未実装
4エラー伝播並列処理での 1 分岐の失敗が全体に影響する可能性

これらの制約を踏まえた上で、適切な設計パターンを選択することが重要です。

解決策

4 つの処理制御パターン

Dify では、分岐・並列・リトライ・サーキットブレーカという 4 つのパターンを組み合わせることで、上記の課題を解決できます。以下、それぞれのパターンについて詳しく解説します。

パターン 1:条件分岐(IF/ELSE ノード)

条件分岐ノードは、IF/ELSE 条件に基づいてワークフローを 2 つの分岐に分ける機能です。

条件分岐の基本構造

条件分岐ノードは、以下の 6 つのパスで構成されます。

  • 入力パス:前のノードから条件分岐ノードへの接続
  • IF パス:条件が真の場合の処理フロー
  • ELSE パス:条件が偽の場合の処理フロー
  • 出力パス:IF または ELSE の処理後に次のノードへ接続

利用可能な条件タイプ

変数のデータ型によって、以下のような条件を設定できます。

#データ型利用可能な条件
1文字列含む/含まない、開始・終了パターン、空判定
2数値等しい、より大きい、より小さい、範囲内
3ブール値真偽値の直接判定
4配列含有判定、空判定
5オブジェクトプロパティの存在判定、値の比較

複数条件の組み合わせ

複雑な判断が必要な場合、複数条件を設定し、条件間に AND または OR を設定することで、交集や和集を取ることができます。

以下の図は、条件分岐の処理フローを示しています。

mermaidflowchart TB
  input["入力変数"] --> ifelse["IF/ELSEノード"]

  ifelse -->|条件式を評価| check{"条件判定"}

  check -->|真| ifPath["IFパス"]
  check -->|偽| elsePath["ELSEパス"]

  ifPath --> process1["処理A<br/>(LLMモデル1)"]
  elsePath --> process2["処理B<br/>(LLMモデル2)"]

  process1 --> output["出力"]
  process2 --> output

  style check fill:#ffd43b
  style process1 fill:#51cf66
  style process2 fill:#74c0fc

この構造により、入力内容に応じて適切な処理ルートを選択できます。

ELIF 条件の活用

複数の条件を順序的に評価したい場合、ELIF 条件を活用することで、より柔軟な分岐ロジックを構築できます。

パターン 2:並列処理(Parallel Branch)

Dify v0.8.0 以降、一般のノードからも通常の接続方法で並列実行が可能になりました。

並列処理のメリット

並列処理を活用することで、以下のようなメリットが得られます。

  • 処理時間の短縮:複数タスクを同時実行し、全体の処理時間を最も長い分岐の時間に短縮
  • リソースの効率的活用:独立したタスクを同時に処理し、待ち時間を削減
  • スケーラビリティの向上:処理を分散することで、より多くのタスクを効率的に処理

並列処理の実装方法

並列処理は、1 つのノードから複数のノードに接続することで実現します。Dify は自動的にこれらを並列実行として認識し、同時に処理を開始します。

以下の図は、並列処理の構造を示しています。

mermaidflowchart TB
  trigger["開始トリガー"] --> split["分岐点"]

  split --> branch1["分岐1<br/>API呼び出し1"]
  split --> branch2["分岐2<br/>API呼び出し2"]
  split --> branch3["分岐3<br/>データ取得"]

  branch1 --> result1["結果1<br/>(5秒)"]
  branch2 --> result2["結果2<br/>(3秒)"]
  branch3 --> result3["結果3<br/>(4秒)"]

  result1 --> merge["結果統合"]
  result2 --> merge
  result3 --> merge

  merge --> final["最終出力<br/>(約5秒で完了)"]

  style split fill:#ffd43b
  style merge fill:#51cf66

並列処理では、全体の処理時間は最も長い分岐(この場合は 5 秒)に近づきます。順次実行なら 12 秒(5+3+4)かかるところを、大幅に短縮できます。

並列処理の注意点

並列処理を実装する際は、以下の点に注意が必要です。

  • 各分岐が独立していることを確認する(相互依存がないこと)
  • リソース制限(API レート制限など)を考慮する
  • エラーハンドリングを各分岐に適切に設定する

パターン 3:リトライ処理(Retry Mechanism)

外部 API の一時的な障害やネットワークエラーに対応するため、リトライ処理を実装します。

リトライ処理の基本設計

Dify v0.14.0 では、HTTP ノードなどでリトライ機能が強化されました。リトライ処理では、以下の要素を設定します。

  • リトライ回数:最大何回まで再試行するか
  • リトライ間隔:再試行の間隔(固定または指数バックオフ)
  • 代替処理:リトライ上限到達時のフォールバック処理

リトライ間隔の実装

Dify では、Code ノードを使用して待機処理を実装できます。

以下は、Python を使用したリトライ間隔の実装例です。

pythonimport time

def main():
    # リトライ間隔を5秒に設定
    time.sleep(5)
    return {
        "result": "待機完了"
    }

このコードは、リトライの間隔を制御するための待機処理を実装しています。time.sleep(5) により、5 秒間処理を待機させることができます。

リトライカウンタの実装

リトライ回数を管理するため、変数を使用してカウンタを実装します。

pythondef main(retry_count: int):
    # 現在のリトライ回数をインクリメント
    current_count = retry_count + 1

    # 最大リトライ回数を定義
    max_retries = 3

    return {
        "retry_count": current_count,
        "should_retry": current_count < max_retries,
        "message": f"リトライ {current_count}/{max_retries}"
    }

この実装では、リトライ回数をカウントし、最大リトライ回数に到達したかを判定します。should_retry フラグを使用して、条件分岐ノードで再試行するか代替処理に進むかを決定できます。

リトライフローの構築

以下の図は、リトライ処理の全体フローを示しています。

mermaidflowchart TB
  start["処理開始"] --> execute["API呼び出し"]

  execute --> check{"成功判定"}

  check -->|成功| success["処理完了"]
  check -->|失敗| counter["リトライカウンタ<br/>インクリメント"]

  counter --> limit{"上限チェック"}

  limit -->|未到達| wait["待機処理<br/>(5秒)"]
  wait --> execute

  limit -->|到達| fallback["代替処理<br/>(Fail Branch)"]

  fallback --> done["終了"]
  success --> done

  style check fill:#ffd43b
  style success fill:#51cf66
  style fallback fill:#ff8787

このフローでは、API 呼び出しが失敗した場合、リトライカウンタをインクリメントし、上限に到達していなければ待機後に再試行します。上限に達した場合は、代替処理(Fail Branch)に進みます。

エラーブランチの設定

Dify v0.14.0 では、各ノードにエラーブランチを設定できます。エラーが発生した際に、このブランチに処理が移り、代替ルートを実行できます。

パターン 4:サーキットブレーカ(Circuit Breaker)

サーキットブレーカパターンは、連鎖的な障害の拡大を防ぐための設計パターンです。

サーキットブレーカの概念

サーキットブレーカは、電気回路のブレーカーと同様に、障害が発生した際に「回路を開く」ことで、問題のあるサービスへの呼び出しを停止し、システム全体への影響を最小限に抑えます。

サーキットブレーカには、以下の 3 つの状態があります。

#状態説明動作
1Closed(閉じている)正常状態通常通り処理を実行
2Open(開いている)障害検出状態即座にエラーを返し、サービスを呼び出さない
3Half-Open(半開き)回復確認状態限定的にサービスを呼び出し、回復を確認

Dify でのサーキットブレーカ実装

Dify では、エラーカウンタと条件分岐を組み合わせてサーキットブレーカを実装します。

以下は、エラーカウンタの実装例です。

pythondef main(error_count: int, error_threshold: int = 5):
    # エラー閾値を定義
    threshold = error_threshold

    # 現在のエラー数をインクリメント
    current_errors = error_count + 1

    # サーキットブレーカの状態を判定
    if current_errors >= threshold:
        circuit_state = "OPEN"  # 回路を開く
    else:
        circuit_state = "CLOSED"  # 回路は閉じたまま

    return {
        "error_count": current_errors,
        "circuit_state": circuit_state,
        "is_circuit_open": current_errors >= threshold,
        "message": f"エラー数: {current_errors}/{threshold}"
    }

このコードでは、エラー発生回数をカウントし、閾値(デフォルトは 5 回)に到達したかを判定します。閾値に達すると、サーキットブレーカが「OPEN」状態に遷移します。

サーキットブレーカの状態管理

サーキットブレーカの状態に応じて処理を分岐します。

pythondef main(circuit_state: str):
    # サーキットブレーカの状態に応じた処理
    if circuit_state == "OPEN":
        # 回路が開いている場合は即座に失敗を返す
        return {
            "should_call_service": False,
            "message": "サービスは現在利用できません",
            "use_fallback": True
        }
    elif circuit_state == "HALF_OPEN":
        # 半開き状態では限定的に試行
        return {
            "should_call_service": True,
            "message": "サービスの回復を確認中",
            "use_fallback": False
        }
    else:  # CLOSED
        # 通常処理を実行
        return {
            "should_call_service": True,
            "message": "通常動作中",
            "use_fallback": False
        }

この実装では、サーキットブレーカの状態に応じて、サービスを呼び出すか代替処理を使用するかを決定します。

以下の図は、サーキットブレーカの状態遷移を示しています。

mermaidstateDiagram-v2
  [*] --> Closed

  Closed --> Open: エラー閾値到達
  Closed --> Closed: 正常処理

  Open --> HalfOpen: タイムアウト後
  Open --> Open: 待機中

  HalfOpen --> Closed: 試行成功
  HalfOpen --> Open: 試行失敗

  note right of Closed
    正常状態
    通常通り処理実行
  end note

  note right of Open
    障害検出状態
    即座にエラー返却
  end note

  note right of HalfOpen
    回復確認状態
    限定的に試行
  end note

この状態遷移図では、サーキットブレーカがどのように状態を変更し、システムを保護するかを示しています。

タイムアウト設定

Open 状態から Half-Open 状態への遷移には、タイムアウト(例:30 秒)を設定します。これにより、障害が解消される可能性を考慮して、定期的に回復を確認できます。

pythonimport time
from datetime import datetime, timedelta

def main(circuit_opened_at: str, timeout_seconds: int = 30):
    # サーキットが開いた時刻
    opened_time = datetime.fromisoformat(circuit_opened_at)

    # 現在時刻
    current_time = datetime.now()

    # 経過時間を計算
    elapsed = (current_time - opened_time).total_seconds()

    # タイムアウトに達したかチェック
    should_transition = elapsed >= timeout_seconds

    return {
        "elapsed_seconds": elapsed,
        "should_transition_to_half_open": should_transition,
        "next_state": "HALF_OPEN" if should_transition else "OPEN"
    }

この実装では、サーキットが開いてからの経過時間を計算し、タイムアウトに達したら Half-Open 状態に遷移する判定を行います。

具体例

実践:4 つのパターンを組み合わせたワークフロー

ここでは、実際の業務シナリオを想定し、4 つのパターンを組み合わせたワークフローを構築します。

シナリオ:マルチソース情報収集システム

複数の外部 API から情報を収集し、統合して返すシステムを構築します。このシステムには、以下の要件があります。

  • ユーザーの入力に応じて検索対象を切り替える(分岐)
  • 複数の API を同時に呼び出してパフォーマンスを向上させる(並列)
  • API 障害時には自動的にリトライする(リトライ)
  • 連続したエラー発生時にはサービスを一時停止する(サーキットブレーカ)

ワークフロー全体構成

以下の図は、4 つのパターンを統合したワークフロー全体の構成を示しています。

mermaidflowchart TB
  input["ユーザー入力"] --> validate["入力検証"]

  validate --> condition["条件分岐<br/>(検索タイプ判定)"]

  condition -->|ニュース検索| newsFlow["ニュース検索フロー"]
  condition -->|商品検索| productFlow["商品検索フロー"]

  newsFlow --> parallel["並列処理開始"]

  parallel --> api1["API-1<br/>(News)"]
  parallel --> api2["API-2<br/>(Blog)"]
  parallel --> api3["API-3<br/>(SNS)"]

  api1 --> circuit1["サーキット<br/>ブレーカ1"]
  api2 --> circuit2["サーキット<br/>ブレーカ2"]
  api3 --> circuit3["サーキット<br/>ブレーカ3"]

  circuit1 --> error1{"エラー?"}
  circuit2 --> error2{"エラー?"}
  circuit3 --> error3{"エラー?"}

  error1 -->|Yes| retry1["リトライ1"]
  error2 -->|Yes| retry2["リトライ2"]
  error3 -->|Yes| retry3["リトライ3"]

  retry1 --> api1
  retry2 --> api2
  retry3 --> api3

  error1 -->|No| merge["結果統合"]
  error2 -->|No| merge
  error3 -->|No| merge
  productFlow --> merge

  merge --> format["結果フォーマット"]
  format --> output["出力"]

  style condition fill:#ffd43b
  style parallel fill:#74c0fc
  style merge fill:#51cf66

この図では、入力から出力までの全体的な流れと、各パターンがどのように連携するかを示しています。

ステップ 1:入力検証と条件分岐

まず、ユーザーの入力を検証し、検索タイプに応じて処理を分岐します。

入力検証コード

pythondef main(user_input: str):
    # 入力の検証
    if not user_input or len(user_input.strip()) == 0:
        return {
            "is_valid": False,
            "error_message": "入力が空です"
        }

    # 検索タイプの判定
    search_type = "news"  # デフォルト

    if "商品" in user_input or "price" in user_input.lower():
        search_type = "product"
    elif "ニュース" in user_input or "news" in user_input.lower():
        search_type = "news"

    return {
        "is_valid": True,
        "search_type": search_type,
        "query": user_input.strip()
    }

この検証ロジックでは、入力の妥当性をチェックし、検索タイプを自動判定します。検索タイプは後続の条件分岐ノードで使用されます。

条件分岐ノードの設定

IF/ELSE ノードで search_type 変数を評価し、以下のように設定します。

  • IF 条件search_type"news" に等しい
  • IF パス:ニュース検索フロー
  • ELSE パス:商品検索フロー

ステップ 2:並列処理の実装

ニュース検索フローでは、3 つの異なる API を並列に呼び出します。

API 呼び出しノードの設定

3 つの HTTP ノードを作成し、それぞれ異なる API エンドポイントを設定します。

typescript// API-1: News API
const newsApiConfig = {
  method: 'GET',
  url: 'https://api.news-service.com/v1/search',
  params: {
    q: '{{query}}',
    apiKey: '{{api_key_news}}',
  },
  timeout: 10000, // 10秒タイムアウト
};

このコードは、News API を呼び出すための設定を定義しています。クエリパラメータとして検索キーワードと API キーを渡し、タイムアウトを 10 秒に設定しています。

typescript// API-2: Blog API
const blogApiConfig = {
  method: 'GET',
  url: 'https://api.blog-service.com/v2/articles',
  params: {
    search: '{{query}}',
    limit: 10,
  },
  timeout: 10000,
};

Blog API の設定です。同様にクエリパラメータを設定し、取得件数の上限を 10 件としています。

typescript// API-3: SNS API
const snsApiConfig = {
  method: 'GET',
  url: 'https://api.sns-service.com/posts',
  headers: {
    Authorization: 'Bearer {{api_token_sns}}',
  },
  params: {
    keyword: '{{query}}',
  },
  timeout: 10000,
};

SNS API の設定です。認証にベアラートークンを使用し、検索キーワードをパラメータとして渡します。

これらの 3 つの HTTP ノードを、1 つの親ノードから接続することで、Dify は自動的に並列実行を認識します。

ステップ 3:サーキットブレーカの実装

各 API 呼び出しの後に、サーキットブレーカロジックを配置します。

エラーカウント管理

pythondef main(
    previous_error_count: int,
    current_status: str,
    error_threshold: int = 3
):
    # 現在の呼び出しが成功したかチェック
    if current_status == "success":
        # 成功した場合はカウントをリセット
        new_error_count = 0
        circuit_state = "CLOSED"
    else:
        # 失敗した場合はカウントをインクリメント
        new_error_count = previous_error_count + 1

        # 閾値チェック
        if new_error_count >= error_threshold:
            circuit_state = "OPEN"
        else:
            circuit_state = "CLOSED"

    return {
        "error_count": new_error_count,
        "circuit_state": circuit_state,
        "is_circuit_open": circuit_state == "OPEN",
        "message": f"エラー数: {new_error_count}/{error_threshold}"
    }

このコードでは、API 呼び出しの成功・失敗に応じてエラーカウントを管理し、閾値(デフォルト 3 回)に達したらサーキットを開きます。成功した場合は、カウントをリセットして通常状態に戻します。

サーキット状態による処理分岐

サーキットブレーカの後に IF/ELSE ノードを配置し、以下のように設定します。

  • IF 条件is_circuit_opentrue に等しい
  • IF パス:代替データ返却(キャッシュデータや空の結果)
  • ELSE パス:通常の処理継続

ステップ 4:リトライ処理の実装

サーキットが開いていない場合で、エラーが発生した場合は、リトライ処理を実行します。

リトライロジック

pythondef main(
    retry_count: int,
    max_retries: int = 3,
    base_delay: int = 2
):
    # 現在のリトライ回数をインクリメント
    current_retry = retry_count + 1

    # リトライ可能かチェック
    can_retry = current_retry <= max_retries

    # 指数バックオフによる待機時間の計算
    # delay = base_delay * (2 ^ retry_count)
    delay_seconds = base_delay * (2 ** retry_count)

    return {
        "retry_count": current_retry,
        "can_retry": can_retry,
        "delay_seconds": delay_seconds,
        "message": f"リトライ {current_retry}/{max_retries} (待機: {delay_seconds}秒)"
    }

このリトライロジックでは、指数バックオフアルゴリズムを使用して、リトライ間隔を徐々に増やします。1 回目のリトライは 2 秒後、2 回目は 4 秒後、3 回目は 8 秒後というように待機時間が増加します。

待機処理の実装

pythonimport time

def main(delay_seconds: int):
    # 指定された秒数待機
    time.sleep(delay_seconds)

    return {
        "result": "待機完了",
        "waited_seconds": delay_seconds
    }

この待機処理は、リトライ間隔を制御するために使用します。Code ノードとして実装し、リトライロジックの後に配置します。

リトライフローの接続

リトライフローは以下のように接続します。

  1. API 呼び出し → サーキットブレーカ
  2. サーキットブレーカ(CLOSED)→ エラーチェック
  3. エラーチェック(エラー発生)→ リトライロジック
  4. リトライロジック(can_retry = true)→ 待機処理
  5. 待機処理 → API 呼び出し(再試行)
  6. リトライロジック(can_retry = false)→ 代替処理

ステップ 5:結果統合とフォーマット

並列処理の各分岐からの結果を統合します。

結果統合ロジック

pythondef main(
    news_data: dict,
    blog_data: dict,
    sns_data: dict
):
    # 各ソースからのデータを統合
    combined_results = []

    # News API の結果を追加
    if news_data and "articles" in news_data:
        for article in news_data["articles"]:
            combined_results.append({
                "source": "News",
                "title": article.get("title", ""),
                "url": article.get("url", ""),
                "published_at": article.get("publishedAt", "")
            })

    # Blog API の結果を追加
    if blog_data and "posts" in blog_data:
        for post in blog_data["posts"]:
            combined_results.append({
                "source": "Blog",
                "title": post.get("title", ""),
                "url": post.get("link", ""),
                "published_at": post.get("date", "")
            })

    # SNS API の結果を追加
    if sns_data and "items" in sns_data:
        for item in sns_data["items"]:
            combined_results.append({
                "source": "SNS",
                "title": item.get("text", "")[:100],  # 最初の100文字
                "url": item.get("permalink", ""),
                "published_at": item.get("created_at", "")
            })

    return {
        "total_results": len(combined_results),
        "results": combined_results
    }

この統合ロジックでは、3 つの異なる API からの結果を統一されたフォーマットに変換し、1 つのリストにまとめます。各データソースの構造が異なっていても、共通のフォーマットに変換することで、後続の処理が簡単になります。

フォーマット処理

pythonfrom datetime import datetime

def main(results: list):
    # 公開日時でソート(新しい順)
    sorted_results = sorted(
        results,
        key=lambda x: x.get("published_at", ""),
        reverse=True
    )

    # 上位10件に絞る
    top_results = sorted_results[:10]

    # ユーザーフレンドリーなフォーマットに変換
    formatted_output = {
        "summary": f"{len(top_results)}件の情報を収集しました",
        "timestamp": datetime.now().isoformat(),
        "items": [
            {
                "番号": idx + 1,
                "ソース": item["source"],
                "タイトル": item["title"],
                "URL": item["url"],
                "公開日時": item["published_at"]
            }
            for idx, item in enumerate(top_results)
        ]
    }

    return formatted_output

このフォーマット処理では、結果を公開日時順にソートし、上位 10 件を選択して、ユーザーが読みやすい形式に整形します。

パフォーマンス比較

4 つのパターンを適用した場合と、適用しない場合のパフォーマンスを比較します。

#項目パターン未適用パターン適用改善率
1平均処理時間30 秒(順次実行)12 秒(並列実行)60% 短縮
2エラー時の回復手動対応が必要自動リトライ自動化
3障害の影響範囲システム全体停止該当分岐のみ局所化
4ユーザー体験頻繁なエラー安定した動作大幅改善

このように、4 つのパターンを組み合わせることで、処理時間の短縮、耐障害性の向上、ユーザー体験の改善が実現できます。

エラーハンドリングのベストプラクティス

実装時には、以下のベストプラクティスを守ることが重要です。

1. エラーメッセージの詳細化

エラーが発生した際には、デバッグしやすいように詳細な情報を記録します。

pythondef main(error: dict):
    # エラー情報を詳細に記録
    error_log = {
        "timestamp": datetime.now().isoformat(),
        "error_type": error.get("type", "Unknown"),
        "error_code": error.get("code", "N/A"),
        "error_message": error.get("message", ""),
        "api_endpoint": error.get("endpoint", ""),
        "request_params": error.get("params", {}),
        "stack_trace": error.get("trace", "")
    }

    return {
        "error_log": error_log,
        "user_message": "一時的なエラーが発生しました。しばらくしてから再試行してください。"
    }

このエラーログには、エラーコード(例:Error 500: Internal Server ErrorTypeError: Cannot read property 'name' of undefined)を必ず含めます。これにより、検索エンジンで同じエラーに遭遇した他の開発者が解決策を見つけやすくなります。

2. タイムアウト設定の最適化

各 API 呼び出しには適切なタイムアウトを設定し、無限待機を防ぎます。

typescript// タイムアウト設定の例
const apiConfig = {
  timeout: 10000, // 10秒
  retryTimeout: 5000, // リトライ時は5秒に短縮
};

タイムアウトを設定することで、レスポンスが遅い API によってワークフロー全体が停止するのを防ぎます。

3. フォールバックデータの準備

API が利用できない場合に備えて、代替データやキャッシュデータを用意します。

pythondef main(api_failed: bool, cache_data: dict):
    # API失敗時にキャッシュデータを使用
    if api_failed and cache_data:
        return {
            "source": "cache",
            "data": cache_data,
            "message": "キャッシュデータを使用しています"
        }
    elif api_failed:
        return {
            "source": "default",
            "data": {
                "items": [],
                "message": "現在データを取得できません"
            }
        }

フォールバックデータを用意することで、エラー時でもユーザーに何らかの結果を返すことができ、ユーザー体験が向上します。

まとめ

本記事では、Dify ワークフローにおける 4 つの重要な処理制御パターン「分岐」「並列」「リトライ」「サーキットブレーカ」について解説しました。

各パターンの要点

**条件分岐(IF/ELSE)**では、ユーザー入力や処理結果に応じて適切な処理ルートを選択できます。複数条件の組み合わせや ELIF 条件を活用することで、複雑なビジネスロジックも実装可能です。

**並列処理(Parallel Branch)**により、独立したタスクを同時実行し、処理時間を大幅に短縮できます。Dify v0.8.0 以降、簡単な接続だけで並列実行が可能になり、パフォーマンスが飛躍的に向上しました。

**リトライ処理(Retry Mechanism)**では、一時的な障害に対して自動的に再試行し、システムの耐障害性を高めることができます。指数バックオフを使用することで、サーバーへの負荷を抑えながら効果的にリトライできます。

**サーキットブレーカ(Circuit Breaker)**は、連鎖的な障害の拡大を防ぎ、システム全体の安定性を維持します。エラー閾値を設定し、問題のあるサービスへの呼び出しを一時停止することで、他の正常な処理への影響を最小限に抑えられます。

実装時の重要ポイント

これらのパターンを実装する際には、以下の点に注意してください。

まず、エラーメッセージには必ずエラーコード(例:Error 404: Not FoundError 500: Internal Server Error)を含めることで、デバッグや検索が容易になります。

次に、タイムアウトやリトライ回数などのパラメータは、環境や要件に応じて調整する必要があります。開発環境と本番環境で異なる設定を使用することも検討しましょう。

また、Dify の制約(ループ禁止、再帰制限など)を理解した上で、それらを回避する設計を心がけることが大切です。

今後の展望

Dify は継続的にアップデートされており、エラーハンドリング機能もさらに強化されていくと予想されます。現在は手動で実装する必要がある一部の機能(Exponential Backoff など)も、将来的にはノードの標準機能として提供される可能性があります。

本記事で紹介した 4 つのパターンを活用することで、より堅牢で効率的な AI ワークフローを構築できるでしょう。ぜひ実際のプロジェクトで試してみてください。

関連リンク