T-CREATOR

Gemini CLI を中核にした“AI パイプライン”設計:前処理 → 推論 → 後処理の標準化

Gemini CLI を中核にした“AI パイプライン”設計:前処理 → 推論 → 後処理の標準化

AI を活用したシステム開発において、「推論部分だけ AI に任せればいい」という発想では、保守性や再現性が低下してしまいます。実際には、データの前処理、推論の実行、結果の後処理を一貫したパイプラインとして設計することが、本番運用できる AI システムの鍵となります。

本記事では、Gemini CLI を中核に据えた AI パイプラインの設計パターンを解説します。前処理 → 推論 → 後処理という 3 つのフェーズを明確に分離し、それぞれのフェーズで何をすべきか、どのようにツールを組み合わせるべきかを具体的なコード例とともに紹介していきます。

AI パイプラインとは

AI パイプラインとは、入力データから最終的な出力結果を得るまでの一連の処理を、段階的に分割して設計したワークフローのことです。

パイプライン設計の必要性

AI モデルは単体では機能しません。実際のシステムでは、以下のような要素が必要です。

  • データの正規化: 入力形式を AI が理解できる形式に変換する
  • 推論の実行: AI モデルに問い合わせて結果を取得する
  • 結果の加工: AI の出力を業務システムで利用可能な形式に変換する

これらを統合した処理フローを「パイプライン」として設計することで、保守性・再現性・テスト容易性が飛躍的に向上します。

Gemini CLI をパイプラインの中核に選ぶ理由

Gemini CLI は、Google の生成 AI モデル Gemini をコマンドラインから利用できるツールです。パイプライン設計において以下の利点があります。

#項目内容
1標準入出力対応Unix パイプで他のツールと連携可能
2JSON 出力構造化データとして後処理しやすい
3エラーハンドリング終了コードで正常・異常を判定可能
4スクリプト化シェルスクリプトで自動化しやすい
5再現性コマンドオプションで推論条件を固定できる

次のセクションでは、パイプライン全体のアーキテクチャを図解で理解していきましょう。

パイプラインアーキテクチャの全体像

AI パイプラインは、前処理・推論・後処理の 3 つのフェーズで構成されます。それぞれのフェーズは独立したモジュールとして設計し、Unix 哲学の「小さなツールを組み合わせる」という原則に従います。

3 フェーズ構成の概念図

以下の図は、データが各フェーズをどのように流れていくかを示しています。

mermaidflowchart LR
  raw["生データ<br/>(CSV/JSON/テキスト)"] -->|前処理| normalized["正規化データ<br/>(標準形式)"]
  normalized -->|推論| gemini["Gemini CLI<br/>(AI 推論)"]
  gemini -->|後処理| result["業務データ<br/>(DB/API/ファイル)"]

  style gemini fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
  style raw fill:#fff3e0,stroke:#f57c00
  style normalized fill:#f1f8e9,stroke:#689f38
  style result fill:#fce4ec,stroke:#c2185b

この図から読み取れる要点は以下の通りです。

  • 各フェーズが明確に分離されている
  • Gemini CLI は推論フェーズのみに集中している
  • データ形式の変換は前処理・後処理で担当している

フェーズごとの責務

各フェーズの責務を明確にすることで、コードの見通しが良くなります。

#フェーズ責務入力例出力例
1前処理データ正規化・検証・プロンプト生成CSV ファイルJSON テキスト
2推論AI モデルへの問い合わせプロンプト文字列AI 応答 JSON
3後処理結果の解釈・変換・保存AI 応答 JSONデータベースレコード

次のセクションからは、各フェーズの具体的な実装方法を見ていきます。

前処理フェーズの設計

前処理フェーズでは、生データを AI が理解できる形式に変換します。このフェーズの品質が、AI の推論精度に直接影響します。

前処理の主要タスク

前処理では以下のタスクを実行します。

  1. データの読み込みと検証
  2. 不正データのフィルタリング
  3. プロンプトの組み立て
  4. 標準形式への変換

データ検証とフィルタリング

まず、入力データの妥当性を検証し、不正なデータを除外します。

typescriptimport fs from 'fs/promises';

// CSVファイルを読み込み、データを検証する関数
async function loadAndValidateData(
  filePath: string
): Promise<string[]> {
  const content = await fs.readFile(filePath, 'utf-8');
  const lines = content.split('\n');

  // 空行やヘッダーを除外
  return lines
    .filter((line) => line.trim().length > 0)
    .slice(1); // ヘッダー行をスキップ
}

この関数は、CSV ファイルを読み込み、空行を除外してデータ行のみを返します。

プロンプトの組み立て

次に、検証済みデータから AI 用のプロンプトを組み立てます。

typescriptinterface PromptOptions {
  systemInstruction?: string;
  temperature?: number;
  maxTokens?: number;
}

// データからプロンプトを生成する関数
function buildPrompt(
  data: string[],
  options: PromptOptions = {}
): string {
  const {
    systemInstruction = 'あなたは優秀なデータアナリストです。',
  } = options;

  const dataSection = data
    .map((line, index) => `${index + 1}. ${line}`)
    .join('\n');

  return `${systemInstruction}

以下のデータを分析してください:

${dataSection}`;
}

このコードは、配列形式のデータを番号付きリストとしてプロンプトに組み込みます。

JSON 形式での出力

前処理の最後に、プロンプトを JSON 形式で出力します。

typescriptinterface PreprocessedData {
  prompt: string;
  metadata: {
    recordCount: number;
    timestamp: string;
    source: string;
  };
}

// 前処理結果をJSON形式で出力する関数
function outputPreprocessedData(
  prompt: string,
  recordCount: number,
  source: string
): string {
  const data: PreprocessedData = {
    prompt,
    metadata: {
      recordCount,
      timestamp: new Date().toISOString(),
      source,
    },
  };

  return JSON.stringify(data, null, 2);
}

この関数は、プロンプトとメタデータを含む JSON を生成します。

前処理の実行例

以下は、前処理を実行するメインスクリプトの例です。

typescript// 前処理パイプラインのメイン関数
async function preprocessPipeline(
  inputFile: string
): Promise<void> {
  try {
    // データの読み込みと検証
    const validData = await loadAndValidateData(inputFile);

    // プロンプトの組み立て
    const prompt = buildPrompt(validData, {
      systemInstruction: 'データの傾向を分析してください。',
    });

    // JSON形式で出力
    const output = outputPreprocessedData(
      prompt,
      validData.length,
      inputFile
    );
    console.log(output);
  } catch (error) {
    console.error('前処理エラー:', error);
    process.exit(1);
  }
}

// スクリプト実行
preprocessPipeline(process.argv[2]);

このスクリプトは、コマンドライン引数でファイルパスを受け取り、前処理を実行して JSON を標準出力に出力します。

前処理フェーズの設計により、AI への入力品質が保証されます。次のセクションでは、この出力を Gemini CLI で処理する推論フェーズを見ていきましょう。

推論フェーズの設計

推論フェーズでは、前処理で生成されたプロンプトを Gemini CLI に渡し、AI の応答を取得します。このフェーズは極力シンプルに保ち、推論に専念させることが重要です。

Gemini CLI の基本的な使い方

Gemini CLI は、標準入力からプロンプトを受け取り、標準出力に結果を返します。

bash# 前処理の出力をGemini CLIに渡す基本パターン
echo "こんにちは" | gemini generate

このコマンドは、テキストを Gemini に送信し、応答を受け取ります。

JSON 入力の処理

前処理フェーズで生成した JSON から、プロンプト部分だけを抽出して Gemini CLI に渡します。

bash# jqを使用してJSONからプロンプトを抽出
cat preprocessed.json | jq -r '.prompt' | gemini generate

jq コマンドで JSON を解析し、prompt フィールドの値だけを取り出しています。

推論パラメータの指定

Gemini CLI では、推論の振る舞いを制御するためのオプションが用意されています。

bash# 温度・最大トークン数・モデルを指定した推論
cat preprocessed.json | jq -r '.prompt' | \
  gemini generate \
    --temperature 0.2 \
    --max-tokens 1024 \
    --model gemini-pro

このコマンドは、温度を低く設定することで決定論的な応答を得やすくしています。

エラーハンドリングの実装

推論フェーズでは、API エラーやネットワークエラーが発生する可能性があります。

bash# エラーハンドリング付きの推論スクリプト
#!/bin/bash

set -euo pipefail

INPUT_JSON="$1"
OUTPUT_FILE="$2"

# JSONからプロンプトを抽出
PROMPT=$(jq -r '.prompt' "$INPUT_JSON")

# Gemini CLIで推論を実行
if ! RESULT=$(echo "$PROMPT" | gemini generate --temperature 0.2 2>&1); then
  echo "Error: Gemini CLI failed" >&2
  echo "Details: $RESULT" >&2
  exit 1
fi

# 結果を出力ファイルに保存
echo "$RESULT" > "$OUTPUT_FILE"
echo "推論完了: $OUTPUT_FILE" >&2

このスクリプトは、set -euo pipefail でエラーを即座に検出し、エラーメッセージを標準エラー出力に書き込みます。

推論結果の構造化

Gemini CLI の出力を JSON 形式で受け取るために、--output-format json オプションを使用します。

bash# JSON形式で推論結果を取得
echo "$PROMPT" | \
  gemini generate \
    --output-format json \
    --temperature 0.2 > inference_result.json

このコマンドは、AI の応答をそのまま JSON として保存します。

リトライ機能の実装

API の一時的な障害に対応するため、リトライ機能を実装します。

bash#!/bin/bash

MAX_RETRIES=3
RETRY_DELAY=5

function inference_with_retry() {
  local prompt="$1"
  local attempt=1

  while [ $attempt -le $MAX_RETRIES ]; do
    echo "推論試行 $attempt/$MAX_RETRIES..." >&2

    # Gemini CLIで推論を実行
    if result=$(echo "$prompt" | gemini generate --output-format json 2>&1); then
      echo "$result"
      return 0
    fi

    # リトライ前の待機
    if [ $attempt -lt $MAX_RETRIES ]; then
      echo "エラー発生。${RETRY_DELAY}秒後にリトライします..." >&2
      sleep $RETRY_DELAY
    fi

    attempt=$((attempt + 1))
  done

  echo "Error: 最大リトライ回数に到達しました" >&2
  return 1
}

# 使用例
PROMPT=$(jq -r '.prompt' input.json)
inference_with_retry "$PROMPT"

この関数は、最大 3 回まで推論を試行し、失敗時には 5 秒待機してからリトライします。

推論フェーズの実行フロー

以下の図は、推論フェーズの実行フローを示しています。

mermaidflowchart TD
  start["前処理JSON"] --> extract["jqでプロンプト抽出"]
  extract --> inference["Gemini CLI実行"]
  inference --> check{"成功?"}
  check -->|Yes| output["JSON出力"]
  check -->|No| retry{"リトライ<br/>可能?"}
  retry -->|Yes| wait["待機"]
  wait --> inference
  retry -->|No| error["エラー終了"]
  output --> done["次フェーズへ"]

  style inference fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
  style error fill:#ffebee,stroke:#c62828
  style done fill:#e8f5e9,stroke:#2e7d32

この図から分かるように、推論フェーズはシンプルな構造ですが、エラーハンドリングとリトライ機能により堅牢性を確保しています。

推論フェーズでは、Gemini CLI を中心に据えつつ、エラー処理とリトライ機能で安定性を高めることが重要です。次のセクションでは、AI の応答を業務システムで利用可能な形式に変換する後処理フェーズを解説します。

後処理フェーズの設計

後処理フェーズでは、Gemini CLI から得られた AI 応答を解釈し、業務システムで利用可能な形式に変換します。このフェーズの品質が、システム全体の実用性を左右します。

後処理の主要タスク

後処理では以下のタスクを実行します。

  1. AI 応答の構造解析
  2. データの検証と整形
  3. 業務形式への変換
  4. 永続化(データベース保存・ファイル出力)

AI 応答の構造解析

まず、Gemini CLI から返された JSON を解析します。

typescriptinterface GeminiResponse {
  candidates: Array<{
    content: {
      parts: Array<{
        text: string;
      }>;
    };
    finishReason: string;
  }>;
  usageMetadata?: {
    promptTokenCount: number;
    candidatesTokenCount: number;
    totalTokenCount: number;
  };
}

// Gemini CLIの応答を解析する関数
function parseGeminiResponse(
  jsonText: string
): string | null {
  try {
    const response: GeminiResponse = JSON.parse(jsonText);

    // 最初の候補から応答テキストを取得
    const firstCandidate = response.candidates?.[0];
    if (!firstCandidate) {
      console.error('応答に候補が含まれていません');
      return null;
    }

    const text = firstCandidate.content?.parts?.[0]?.text;
    if (!text) {
      console.error('応答テキストが空です');
      return null;
    }

    return text;
  } catch (error) {
    console.error('JSON解析エラー:', error);
    return null;
  }
}

この関数は、Gemini CLI の応答構造から実際のテキスト部分だけを抽出します。

構造化データの抽出

AI の応答が構造化されたデータ(JSON や表形式)を含む場合、それを抽出します。

typescriptinterface ExtractedData {
  items: Array<{
    name: string;
    value: number;
  }>;
}

// AIの応答からJSON部分を抽出する関数
function extractStructuredData(
  responseText: string
): ExtractedData | null {
  // マークダウンコードブロック内のJSONを抽出
  const jsonMatch = responseText.match(
    /```json\n([\s\S]+?)\n```/
  );

  if (!jsonMatch) {
    console.error('構造化データが見つかりません');
    return null;
  }

  try {
    return JSON.parse(jsonMatch[1]);
  } catch (error) {
    console.error('構造化データの解析エラー:', error);
    return null;
  }
}

このコードは、Markdown コードブロック内の JSON を正規表現で抽出し、パースします。

データの検証と正規化

抽出したデータの妥当性を検証し、業務ロジックに適合する形式に正規化します。

typescriptinterface ValidatedItem {
  name: string;
  value: number;
  processedAt: string;
}

// データを検証して正規化する関数
function validateAndNormalize(
  data: ExtractedData
): ValidatedItem[] {
  return data.items
    .filter((item) => {
      // 必須フィールドのチェック
      if (!item.name || typeof item.value !== 'number') {
        console.warn('無効なアイテムをスキップ:', item);
        return false;
      }

      // 値の範囲チェック
      if (item.value < 0 || item.value > 1000000) {
        console.warn('範囲外の値をスキップ:', item);
        return false;
      }

      return true;
    })
    .map((item) => ({
      name: item.name.trim(),
      value: Math.round(item.value), // 整数化
      processedAt: new Date().toISOString(),
    }));
}

この関数は、不正なデータを除外し、値を整数化して処理時刻を追加します。

データベースへの保存

検証済みデータをデータベースに保存します。

typescriptimport { Pool } from 'pg';

// データベース接続プールの作成
const pool = new Pool({
  host: process.env.DB_HOST,
  database: process.env.DB_NAME,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
});

// データベースにデータを保存する関数
async function saveToDatabase(
  items: ValidatedItem[]
): Promise<void> {
  const client = await pool.connect();

  try {
    await client.query('BEGIN');

    // 各アイテムをINSERT
    for (const item of items) {
      await client.query(
        'INSERT INTO analysis_results (name, value, processed_at) VALUES ($1, $2, $3)',
        [item.name, item.value, item.processedAt]
      );
    }

    await client.query('COMMIT');
    console.log(
      `${items.length}件のレコードを保存しました`
    );
  } catch (error) {
    await client.query('ROLLBACK');
    console.error('データベース保存エラー:', error);
    throw error;
  } finally {
    client.release();
  }
}

このコードは、トランザクションを使用して複数レコードを安全に保存します。

ファイル出力による永続化

データベースの代わりに、CSV や JSON ファイルとして出力することもできます。

typescriptimport fs from 'fs/promises';

// CSVファイルとして出力する関数
async function saveAsCsv(
  items: ValidatedItem[],
  outputPath: string
): Promise<void> {
  const header = 'name,value,processed_at\n';
  const rows = items
    .map(
      (item) =>
        `"${item.name}",${item.value},"${item.processedAt}"`
    )
    .join('\n');

  const csv = header + rows;
  await fs.writeFile(outputPath, csv, 'utf-8');
  console.log(`CSVファイルを保存しました: ${outputPath}`);
}

// JSONファイルとして出力する関数
async function saveAsJson(
  items: ValidatedItem[],
  outputPath: string
): Promise<void> {
  const json = JSON.stringify(items, null, 2);
  await fs.writeFile(outputPath, json, 'utf-8');
  console.log(`JSONファイルを保存しました: ${outputPath}`);
}

これらの関数は、データをファイルとして永続化する代替手段を提供します。

後処理パイプラインの統合

以下は、後処理の全ステップを統合したメイン関数です。

typescript// 後処理パイプラインのメイン関数
async function postprocessPipeline(
  inferenceResultPath: string
): Promise<void> {
  try {
    // 1. Gemini応答の読み込み
    const jsonText = await fs.readFile(
      inferenceResultPath,
      'utf-8'
    );

    // 2. 応答の解析
    const responseText = parseGeminiResponse(jsonText);
    if (!responseText) {
      throw new Error('応答の解析に失敗しました');
    }

    // 3. 構造化データの抽出
    const extractedData =
      extractStructuredData(responseText);
    if (!extractedData) {
      throw new Error('構造化データの抽出に失敗しました');
    }

    // 4. データの検証と正規化
    const validatedItems =
      validateAndNormalize(extractedData);

    if (validatedItems.length === 0) {
      console.warn('有効なデータが見つかりませんでした');
      return;
    }

    // 5. データベースへの保存
    await saveToDatabase(validatedItems);

    // 6. バックアップとしてJSON出力
    await saveAsJson(validatedItems, 'output/result.json');

    console.log('後処理が完了しました');
  } catch (error) {
    console.error('後処理エラー:', error);
    process.exit(1);
  }
}

// スクリプト実行
postprocessPipeline(process.argv[2]);

このスクリプトは、AI 応答の解析からデータベース保存までを一貫して処理します。

後処理フローの可視化

以下の図は、後処理フェーズの処理フローを示しています。

mermaidflowchart TD
  input["Gemini応答JSON"] --> parse["JSON解析"]
  parse --> extract["構造化データ抽出"]
  extract --> validate["データ検証"]
  validate --> filter{"有効データ<br/>あり?"}
  filter -->|No| warn["警告ログ出力"]
  filter -->|Yes| save["データベース保存"]
  save --> backup["JSONバックアップ"]
  backup --> done["完了"]
  warn --> done

  style input fill:#e3f2fd,stroke:#1976d2
  style save fill:#e8f5e9,stroke:#2e7d32,stroke-width:3px
  style done fill:#f1f8e9,stroke:#689f38
  style warn fill:#fff3e0,stroke:#f57c00

この図から、後処理フェーズが多段階の検証とエラーハンドリングを経て、データを安全に永続化していることが分かります。

後処理フェーズでは、AI 応答を業務システムに統合するための重要な役割を果たします。次のセクションでは、3 つのフェーズを統合した完全なパイプラインの実装を見ていきましょう。

パイプライン全体の統合

ここまで設計した前処理・推論・後処理の 3 つのフェーズを統合し、エンドツーエンドで動作する AI パイプラインを構築します。

パイプライン統合の方針

統合にあたり、以下の設計原則を適用します。

#原則理由
1各フェーズは独立したスクリプトテスト容易性・再利用性が向上
2フェーズ間はファイルまたはパイプで連携疎結合を保ちデバッグしやすい
3エラーは即座に検出して停止異常なデータが後続フェーズに流れない
4ログは標準エラー出力に記録データフローを汚染しない

シェルスクリプトによる統合

Unix パイプを使用して、3 つのフェーズを連結します。

bash#!/bin/bash
# pipeline.sh - AI パイプライン統合スクリプト

set -euo pipefail

# カラー出力用の定義
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[0;33m'
NC='\033[0m' # No Color

function log_info() {
  echo -e "${GREEN}[INFO]${NC} $1" >&2
}

function log_warn() {
  echo -e "${YELLOW}[WARN]${NC} $1" >&2
}

function log_error() {
  echo -e "${RED}[ERROR]${NC} $1" >&2
}

# 使用方法の表示
if [ $# -lt 1 ]; then
  echo "Usage: $0 <input-file>" >&2
  exit 1
fi

INPUT_FILE="$1"
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
WORK_DIR="work/${TIMESTAMP}"

# 作業ディレクトリの作成
mkdir -p "$WORK_DIR"

log_info "AIパイプラインを開始します: $INPUT_FILE"

# フェーズ1: 前処理
log_info "[1/3] 前処理フェーズ"
PREPROCESSED="${WORK_DIR}/preprocessed.json"

if ! node src/preprocess.js "$INPUT_FILE" > "$PREPROCESSED"; then
  log_error "前処理に失敗しました"
  exit 1
fi

log_info "前処理完了: $PREPROCESSED"

このスクリプトは、色付きログ出力とタイムスタンプ付き作業ディレクトリを使用して、処理の進捗を視覚的に追跡できます。

推論フェーズの統合

前処理の結果を Gemini CLI に渡します。

bash# フェーズ2: 推論
log_info "[2/3] 推論フェーズ"
INFERENCE_RESULT="${WORK_DIR}/inference_result.json"

# JSONからプロンプトを抽出
PROMPT=$(jq -r '.prompt' "$PREPROCESSED")

# Gemini CLIで推論を実行(リトライ機能付き)
MAX_RETRIES=3
RETRY_COUNT=0

while [ $RETRY_COUNT -lt $MAX_RETRIES ]; do
  log_info "推論を実行します (試行 $((RETRY_COUNT + 1))/$MAX_RETRIES)"

  if echo "$PROMPT" | gemini generate \
    --output-format json \
    --temperature 0.2 \
    --max-tokens 2048 > "$INFERENCE_RESULT" 2>&1; then
    log_info "推論完了: $INFERENCE_RESULT"
    break
  fi

  RETRY_COUNT=$((RETRY_COUNT + 1))

  if [ $RETRY_COUNT -lt $MAX_RETRIES ]; then
    log_warn "推論に失敗しました。5秒後にリトライします..."
    sleep 5
  else
    log_error "推論に失敗しました(最大リトライ回数に到達)"
    exit 1
  fi
done

このコードは、推論の失敗時に自動的にリトライする機能を実装しています。

後処理フェーズの統合

最後に、推論結果を業務データに変換します。

bash# フェーズ3: 後処理
log_info "[3/3] 後処理フェーズ"
FINAL_OUTPUT="${WORK_DIR}/final_result.json"

if ! node src/postprocess.js "$INFERENCE_RESULT" > "$FINAL_OUTPUT"; then
  log_error "後処理に失敗しました"
  exit 1
fi

log_info "後処理完了: $FINAL_OUTPUT"

# 結果サマリーの表示
RECORD_COUNT=$(jq '. | length' "$FINAL_OUTPUT")
log_info "===================="
log_info "パイプライン完了"
log_info "処理レコード数: $RECORD_COUNT"
log_info "出力先: $FINAL_OUTPUT"
log_info "===================="

このスクリプトは、最終的な処理結果のサマリーを表示します。

パイプライン実行フローの可視化

以下の図は、統合されたパイプライン全体の実行フローを示しています。

mermaidflowchart TD
  start["入力ファイル"] --> mkdir["作業ディレクトリ作成"]
  mkdir --> phase1["前処理<br/>(Node.js)"]
  phase1 --> check1{"成功?"}
  check1 -->|No| error1["エラー終了"]
  check1 -->|Yes| phase2["推論<br/>(Gemini CLI)"]
  phase2 --> check2{"成功?"}
  check2 -->|No| retry{"リトライ<br/>可能?"}
  retry -->|Yes| phase2
  retry -->|No| error2["エラー終了"]
  check2 -->|Yes| phase3["後処理<br/>(Node.js)"]
  phase3 --> check3{"成功?"}
  check3 -->|No| error3["エラー終了"]
  check3 -->|Yes| summary["結果サマリー表示"]
  summary --> done["完了"]

  style phase1 fill:#f1f8e9,stroke:#689f38,stroke-width:2px
  style phase2 fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
  style phase3 fill:#fce4ec,stroke:#c2185b,stroke-width:2px
  style done fill:#e8f5e9,stroke:#2e7d32
  style error1 fill:#ffebee,stroke:#c62828
  style error2 fill:#ffebee,stroke:#c62828
  style error3 fill:#ffebee,stroke:#c62828

この図から、各フェーズが独立して動作しつつ、エラーハンドリングとリトライ機能により堅牢性が確保されていることが分かります。

並列処理による高速化

複数の入力ファイルを並列処理することで、スループットを向上させることができます。

bash#!/bin/bash
# parallel_pipeline.sh - 並列実行対応パイプライン

set -euo pipefail

INPUT_DIR="$1"
MAX_PARALLEL=4

# 入力ファイルを取得
INPUT_FILES=($(find "$INPUT_DIR" -name "*.csv"))

log_info "入力ファイル数: ${#INPUT_FILES[@]}"
log_info "並列度: $MAX_PARALLEL"

# GNU parallelを使用して並列実行
export -f log_info log_warn log_error
printf '%s\n' "${INPUT_FILES[@]}" | \
  parallel -j "$MAX_PARALLEL" \
    './pipeline.sh {}'

log_info "全ての入力ファイルの処理が完了しました"

このスクリプトは、GNU parallel を使用して複数のパイプラインを同時に実行します。

TypeScript による統合(代替案)

シェルスクリプトの代わりに、TypeScript で統合することもできます。

typescriptimport { spawn } from 'child_process';
import { promises as fs } from 'fs';
import path from 'path';

interface PipelineConfig {
  inputFile: string;
  workDir: string;
  maxRetries: number;
}

// パイプライン実行クラス
class AIPipeline {
  constructor(private config: PipelineConfig) {}

  // 外部コマンドを実行するヘルパー関数
  private async runCommand(
    command: string,
    args: string[],
    input?: string
  ): Promise<string> {
    return new Promise((resolve, reject) => {
      const proc = spawn(command, args);
      let stdout = '';
      let stderr = '';

      if (input) {
        proc.stdin.write(input);
        proc.stdin.end();
      }

      proc.stdout.on('data', (data) => (stdout += data));
      proc.stderr.on('data', (data) => (stderr += data));

      proc.on('close', (code) => {
        if (code === 0) {
          resolve(stdout);
        } else {
          reject(new Error(`Command failed: ${stderr}`));
        }
      });
    });
  }

  // パイプライン実行
  async run(): Promise<void> {
    const { inputFile, workDir, maxRetries } = this.config;

    // 作業ディレクトリの作成
    await fs.mkdir(workDir, { recursive: true });

    console.log('[1/3] 前処理フェーズ');
    const preprocessedPath = path.join(
      workDir,
      'preprocessed.json'
    );
    const preprocessedData = await this.runCommand('node', [
      'src/preprocess.js',
      inputFile,
    ]);
    await fs.writeFile(preprocessedPath, preprocessedData);

    console.log('[2/3] 推論フェーズ');
    const prompt = JSON.parse(preprocessedData).prompt;
    let inferenceResult: string | null = null;

    for (let retry = 0; retry < maxRetries; retry++) {
      try {
        inferenceResult = await this.runCommand(
          'gemini',
          [
            'generate',
            '--output-format',
            'json',
            '--temperature',
            '0.2',
          ],
          prompt
        );
        break;
      } catch (error) {
        console.warn(`リトライ ${retry + 1}/${maxRetries}`);
        if (retry === maxRetries - 1) throw error;
        await new Promise((resolve) =>
          setTimeout(resolve, 5000)
        );
      }
    }

    const inferenceResultPath = path.join(
      workDir,
      'inference_result.json'
    );
    await fs.writeFile(
      inferenceResultPath,
      inferenceResult!
    );

    console.log('[3/3] 後処理フェーズ');
    const finalResult = await this.runCommand('node', [
      'src/postprocess.js',
      inferenceResultPath,
    ]);

    const finalOutputPath = path.join(
      workDir,
      'final_result.json'
    );
    await fs.writeFile(finalOutputPath, finalResult);

    console.log('パイプライン完了:', finalOutputPath);
  }
}

// メイン実行
const pipeline = new AIPipeline({
  inputFile: process.argv[2],
  workDir: `work/${Date.now()}`,
  maxRetries: 3,
});

pipeline.run().catch((error) => {
  console.error('パイプライン実行エラー:', error);
  process.exit(1);
});

このコードは、TypeScript の型安全性とエラーハンドリングを活用して、より保守性の高いパイプラインを実現しています。

パイプライン全体の統合により、前処理・推論・後処理が一貫したワークフローとして動作します。次のセクションでは、実際の業務シナリオでこのパイプラインを適用する具体例を紹介します。

実践例:顧客フィードバック分析パイプライン

ここでは、実際の業務シナリオとして、顧客からのフィードバックコメントを AI で分析し、感情分類とキーワード抽出を行うパイプラインを構築します。

シナリオ概要

以下のような業務要件があるとします。

  • 入力: 顧客フィードバックを含む CSV ファイル(1000 件/日)
  • 処理: 各フィードバックを「ポジティブ/ネガティブ/ニュートラル」に分類し、重要キーワードを抽出
  • 出力: 分析結果をデータベースに保存し、ダッシュボードで可視化

入力データ形式

CSV ファイルの形式は以下の通りです。

csvid,customer_name,feedback,timestamp
1,山田太郎,製品の使い心地が非常に良いです,2025-01-15T10:30:00Z
2,佐藤花子,サポートの対応が遅すぎる,2025-01-15T11:00:00Z
3,鈴木一郎,価格は高いが品質に満足している,2025-01-15T11:30:00Z

各行に顧客 ID、名前、フィードバック、タイムスタンプが含まれています。

前処理の実装

CSV ファイルを読み込み、AI 用のプロンプトを生成します。

typescriptimport fs from 'fs/promises';
import { parse } from 'csv-parse/sync';

interface FeedbackRecord {
  id: string;
  customer_name: string;
  feedback: string;
  timestamp: string;
}

// CSVファイルを読み込む関数
async function loadFeedbackCsv(
  filePath: string
): Promise<FeedbackRecord[]> {
  const content = await fs.readFile(filePath, 'utf-8');
  const records = parse(content, {
    columns: true,
    skip_empty_lines: true,
  });

  return records;
}

この関数は、CSV ファイルをパースしてオブジェクトの配列として返します。

次に、フィードバック分析用のプロンプトを構築します。

typescript// フィードバック分析用プロンプトを生成する関数
function buildAnalysisPrompt(
  records: FeedbackRecord[]
): string {
  const feedbackList = records
    .map(
      (record, index) =>
        `${index + 1}. [ID:${record.id}] ${record.feedback}`
    )
    .join('\n');

  return `あなたは顧客フィードバックの分析エキスパートです。

以下の顧客フィードバックを分析し、各フィードバックについて以下の情報をJSON形式で出力してください:

1. sentiment: "positive" / "negative" / "neutral" のいずれか
2. keywords: 重要なキーワードを3つまで抽出(配列形式)
3. priority: 対応優先度(1-5の整数、5が最優先)

フィードバック一覧:
${feedbackList}

出力形式(JSON配列):
\`\`\`json
[
  {
    "id": "1",
    "sentiment": "positive",
    "keywords": ["使い心地", "良い"],
    "priority": 2
  }
]
\`\`\`

上記の形式で分析結果を出力してください。`;
}

このプロンプトは、AI に対して明確な出力形式を指示しています。

推論フェーズの実行

プロンプトを Gemini CLI に渡して推論を実行します。

bash#!/bin/bash
# feedback_inference.sh

set -euo pipefail

PREPROCESSED_JSON="$1"
OUTPUT_JSON="$2"

# JSONからプロンプトを抽出
PROMPT=$(jq -r '.prompt' "$PREPROCESSED_JSON")

# Gemini CLIで推論を実行
echo "$PROMPT" | \
  gemini generate \
    --output-format json \
    --temperature 0.1 \
    --max-tokens 4096 \
    > "$OUTPUT_JSON"

echo "推論完了: $OUTPUT_JSON" >&2

温度を 0.1 に設定することで、より一貫性のある分析結果を得られます。

後処理の実装

AI の分析結果を解析し、データベースに保存します。

typescriptimport { Pool } from 'pg';

interface AnalysisResult {
  id: string;
  sentiment: 'positive' | 'negative' | 'neutral';
  keywords: string[];
  priority: number;
}

// Gemini応答から分析結果を抽出する関数
function extractAnalysisResults(
  geminiResponseJson: string
): AnalysisResult[] {
  const response = JSON.parse(geminiResponseJson);
  const text =
    response.candidates?.[0]?.content?.parts?.[0]?.text;

  if (!text) {
    throw new Error('応答テキストが空です');
  }

  // JSONコードブロックを抽出
  const jsonMatch = text.match(/```json\n([\s\S]+?)\n```/);
  if (!jsonMatch) {
    throw new Error('JSON形式の分析結果が見つかりません');
  }

  return JSON.parse(jsonMatch[1]);
}

この関数は、Gemini の応答から JSON 部分だけを抽出します。

次に、分析結果をデータベースに保存します。

typescriptconst pool = new Pool({
  host: process.env.DB_HOST,
  database: 'feedback_analysis',
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
});

// 分析結果をデータベースに保存する関数
async function saveAnalysisResults(
  results: AnalysisResult[]
): Promise<void> {
  const client = await pool.connect();

  try {
    await client.query('BEGIN');

    for (const result of results) {
      await client.query(
        `INSERT INTO feedback_analysis
         (feedback_id, sentiment, keywords, priority, analyzed_at)
         VALUES ($1, $2, $3, $4, NOW())
         ON CONFLICT (feedback_id)
         DO UPDATE SET
           sentiment = EXCLUDED.sentiment,
           keywords = EXCLUDED.keywords,
           priority = EXCLUDED.priority,
           analyzed_at = NOW()`,
        [
          result.id,
          result.sentiment,
          result.keywords,
          result.priority,
        ]
      );
    }

    await client.query('COMMIT');
    console.log(
      `${results.length}件の分析結果を保存しました`
    );
  } catch (error) {
    await client.query('ROLLBACK');
    throw error;
  } finally {
    client.release();
  }
}

このコードは、ON CONFLICT を使用して、既存レコードの更新にも対応しています。

統合実行スクリプト

3 つのフェーズを統合して実行します。

bash#!/bin/bash
# feedback_pipeline.sh - 顧客フィードバック分析パイプライン

set -euo pipefail

INPUT_CSV="$1"
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
WORK_DIR="work/feedback_${TIMESTAMP}"

mkdir -p "$WORK_DIR"

echo "[1/3] 前処理: CSVからプロンプトを生成"
node src/feedback_preprocess.js "$INPUT_CSV" > "${WORK_DIR}/preprocessed.json"

echo "[2/3] 推論: Gemini CLIで分析実行"
bash scripts/feedback_inference.sh \
  "${WORK_DIR}/preprocessed.json" \
  "${WORK_DIR}/inference_result.json"

echo "[3/3] 後処理: 分析結果をデータベースに保存"
node src/feedback_postprocess.js "${WORK_DIR}/inference_result.json"

echo "フィードバック分析パイプライン完了"

このスクリプトにより、CSV ファイルからデータベース保存までが自動化されます。

実行結果の可視化

以下の図は、フィードバック分析パイプラインのデータフローを示しています。

mermaidflowchart LR
  csv["顧客フィードバック<br/>CSV"] -->|前処理| prompt["分析プロンプト<br/>JSON"]
  prompt -->|推論| gemini["Gemini CLI<br/>感情・キーワード分析"]
  gemini -->|後処理| parse["JSON解析"]
  parse --> db[("PostgreSQL<br/>分析結果DB")]
  db --> dashboard["ダッシュボード<br/>可視化"]

  style csv fill:#fff3e0,stroke:#f57c00
  style gemini fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
  style db fill:#e8f5e9,stroke:#2e7d32
  style dashboard fill:#fce4ec,stroke:#c2185b

このパイプラインにより、日次で自動的に顧客フィードバックを分析し、ビジネスインサイトを得ることができます。

パフォーマンス最適化

大量のフィードバックを効率的に処理するため、バッチ処理を実装します。

typescript// バッチ処理用の関数
async function processFeedbackInBatches(
  records: FeedbackRecord[],
  batchSize: number = 50
): Promise<void> {
  const batches: FeedbackRecord[][] = [];

  // レコードをバッチに分割
  for (let i = 0; i < records.length; i += batchSize) {
    batches.push(records.slice(i, i + batchSize));
  }

  console.log(
    `全${records.length}件を${batches.length}バッチに分割`
  );

  // 各バッチを順次処理
  for (let i = 0; i < batches.length; i++) {
    console.log(
      `バッチ ${i + 1}/${batches.length} を処理中...`
    );

    const prompt = buildAnalysisPrompt(batches[i]);
    // プロンプトをファイルに保存して推論フェーズに渡す
    const batchWorkDir = `work/batch_${i}`;
    await fs.mkdir(batchWorkDir, { recursive: true });
    await fs.writeFile(
      `${batchWorkDir}/preprocessed.json`,
      JSON.stringify({ prompt })
    );

    // 推論と後処理を実行(詳細は省略)
  }

  console.log('全バッチの処理が完了しました');
}

このコードは、大量のフィードバックを小さなバッチに分割して処理することで、API のレート制限に対応します。

実践例を通じて、AI パイプラインが実際のビジネス課題をどのように解決するかを示しました。次のセクションでは、パイプライン設計のベストプラクティスをまとめます。

パイプライン設計のベストプラクティス

ここまで見てきた AI パイプラインの設計において、実務で重要となるベストプラクティスをまとめます。

設計原則

AI パイプラインを構築する際に守るべき原則です。

#原則理由実装方法
1フェーズの明確な分離テスト容易性・デバッグしやすさ各フェーズを独立したスクリプトにする
2冪等性の確保再実行時の安全性DB の UPSERT、ファイルの上書き保存
3エラーの早期検出異常データの伝播防止set -e でスクリプト終了
4ログの構造化運用時のトラブルシュートJSON 形式のログ出力
5リトライ機能一時的障害への対応指数バックオフでリトライ

エラーハンドリングのパターン

AI パイプラインで発生する典型的なエラーと対処方法です。

typescript// エラー種別の定義
enum PipelineErrorType {
  INPUT_VALIDATION = 'INPUT_VALIDATION',
  INFERENCE_FAILURE = 'INFERENCE_FAILURE',
  OUTPUT_PARSING = 'OUTPUT_PARSING',
  DATABASE_ERROR = 'DATABASE_ERROR',
}

// カスタムエラークラス
class PipelineError extends Error {
  constructor(
    public type: PipelineErrorType,
    message: string,
    public context?: Record<string, any>
  ) {
    super(message);
    this.name = 'PipelineError';
  }
}

// エラーハンドリング付きの実行関数
async function executeWithErrorHandling<T>(
  fn: () => Promise<T>,
  errorType: PipelineErrorType,
  context?: Record<string, any>
): Promise<T> {
  try {
    return await fn();
  } catch (error) {
    throw new PipelineError(
      errorType,
      error instanceof Error
        ? error.message
        : String(error),
      context
    );
  }
}

このパターンにより、エラーの種別と発生箇所を明確に追跡できます。

ログ設計のベストプラクティス

構造化ログを使用することで、運用時の問題解決が容易になります。

typescriptinterface LogEntry {
  timestamp: string;
  level: 'info' | 'warn' | 'error';
  phase: 'preprocess' | 'inference' | 'postprocess';
  message: string;
  metadata?: Record<string, any>;
}

// 構造化ログ出力関数
function logStructured(
  level: LogEntry['level'],
  phase: LogEntry['phase'],
  message: string,
  metadata?: Record<string, any>
): void {
  const entry: LogEntry = {
    timestamp: new Date().toISOString(),
    level,
    phase,
    message,
    metadata,
  };

  // 標準エラー出力にJSON形式で出力
  console.error(JSON.stringify(entry));
}

// 使用例
logStructured('info', 'preprocess', 'データ読み込み開始', {
  filePath: 'input.csv',
  recordCount: 1000,
});

JSON 形式のログは、ログ分析ツール(Elasticsearch、CloudWatch Logs など)で検索・集計しやすくなります。

テスト戦略

各フェーズを独立してテストすることで、品質を確保します。

typescriptimport { describe, it, expect } from '@jest/globals';

describe('前処理フェーズのテスト', () => {
  it('正常なCSVデータからプロンプトを生成できる', async () => {
    const testCsvPath = 'test/fixtures/valid_feedback.csv';
    const records = await loadFeedbackCsv(testCsvPath);
    const prompt = buildAnalysisPrompt(records);

    expect(prompt).toContain('顧客フィードバックの分析');
    expect(prompt).toContain('sentiment');
    expect(records.length).toBeGreaterThan(0);
  });

  it('空のCSVファイルはエラーを投げる', async () => {
    const testCsvPath = 'test/fixtures/empty.csv';

    await expect(
      loadFeedbackCsv(testCsvPath)
    ).rejects.toThrow('データが空です');
  });
});

このようなユニットテストにより、各フェーズの動作を保証します。

モニタリングとアラート

パイプラインの稼働状況を監視するメトリクスを定義します。

#メトリクス説明アラート閾値
1処理時間パイプライン全体の実行時間平均の 2 倍超
2エラー率全実行に対する失敗の割合5% 超
3トークン消費量AI API の使用量日次予算の 90% 超
4レコード処理数処理したレコード数期待値の 50% 未満

コスト最適化

AI API の利用コストを最適化する方法です。

typescript// トークン数を推定する関数(簡易版)
function estimateTokenCount(text: string): number {
  // 英語:4文字≒1トークン、日本語:2文字≒1トークン
  const englishChars =
    text.match(/[a-zA-Z0-9\s]/g)?.length || 0;
  const japaneseChars = text.length - englishChars;

  return Math.ceil(englishChars / 4 + japaneseChars / 2);
}

// コスト推定を含む前処理
function buildPromptWithCostEstimate(
  records: FeedbackRecord[]
): {
  prompt: string;
  estimatedCost: number;
} {
  const prompt = buildAnalysisPrompt(records);
  const tokenCount = estimateTokenCount(prompt);

  // Gemini Proの料金(2025年1月時点の例):$0.00025/1Kトークン
  const estimatedCost = (tokenCount / 1000) * 0.00025;

  console.warn(
    `推定トークン数: ${tokenCount}, 推定コスト: $${estimatedCost.toFixed(
      4
    )}`
  );

  return { prompt, estimatedCost };
}

このコードは、推論実行前にコストを推定し、予算超過を事前に防ぎます。

バージョン管理とドキュメント

パイプラインの変更履歴を追跡するため、以下の情報を記録します。

yaml# pipeline_config.yaml
version: 1.2.0
description: 顧客フィードバック分析パイプライン
changelog:
  - version: 1.2.0
    date: 2025-01-20
    changes:
      - バッチ処理機能を追加
      - リトライロジックを改善
  - version: 1.1.0
    date: 2025-01-10
    changes:
      - 後処理でキーワード抽出を追加

phases:
  preprocess:
    script: src/feedback_preprocess.js
    input_format: CSV
    output_format: JSON
  inference:
    cli: gemini generate
    options:
      temperature: 0.1
      max_tokens: 4096
  postprocess:
    script: src/feedback_postprocess.js
    database: feedback_analysis

このような設定ファイルにより、パイプラインの仕様を一元管理できます。

セキュリティとコンプライアンス

AI パイプラインでは、以下のセキュリティ対策が必要です。

#対策実装方法
1API キーの保護環境変数または秘密管理サービス(AWS Secrets Manager など)
2個人情報の匿名化前処理フェーズで名前・メールアドレスをマスク
3ログの機密情報除外ログ出力前に機密データをフィルタリング
4アクセス制御データベースとファイルのパーミッション設定

ベストプラクティスを適用することで、保守性・信頼性・コスト効率の高い AI パイプラインを実現できます。

まとめ

本記事では、Gemini CLI を中核に据えた AI パイプラインの設計手法を、前処理・推論・後処理の 3 フェーズに分けて解説しました。

AI システムを本番運用する際には、推論部分だけでなく、データの前処理と結果の後処理を含めた一貫したワークフローを設計することが不可欠です。各フェーズを独立したモジュールとして構築することで、テスト容易性・保守性・再利用性が飛躍的に向上します。

Gemini CLI は、標準入出力に対応しているため、Unix パイプや既存のツールと自然に統合できる点が大きな強みです。シェルスクリプトによる統合から、TypeScript による型安全な実装まで、プロジェクトの要件に応じた柔軟な設計が可能になります。

顧客フィードバック分析の実践例で示したように、このパイプライン設計パターンは、さまざまな業務シナリオに適用できます。エラーハンドリング、リトライ機能、構造化ログ、コスト最適化といったベストプラクティスを適用することで、信頼性の高い AI システムを構築していきましょう。

AI 技術の進化は目覚ましいですが、それを実務で活用するためには、地道なエンジニアリングの積み重ねが必要です。本記事で紹介した設計パターンが、皆さんの AI プロジェクトの成功に貢献できれば幸いです。

関連リンク