Gemini CLI を中核にした“AI パイプライン”設計:前処理 → 推論 → 後処理の標準化
AI を活用したシステム開発において、「推論部分だけ AI に任せればいい」という発想では、保守性や再現性が低下してしまいます。実際には、データの前処理、推論の実行、結果の後処理を一貫したパイプラインとして設計することが、本番運用できる AI システムの鍵となります。
本記事では、Gemini CLI を中核に据えた AI パイプラインの設計パターンを解説します。前処理 → 推論 → 後処理という 3 つのフェーズを明確に分離し、それぞれのフェーズで何をすべきか、どのようにツールを組み合わせるべきかを具体的なコード例とともに紹介していきます。
AI パイプラインとは
AI パイプラインとは、入力データから最終的な出力結果を得るまでの一連の処理を、段階的に分割して設計したワークフローのことです。
パイプライン設計の必要性
AI モデルは単体では機能しません。実際のシステムでは、以下のような要素が必要です。
- データの正規化: 入力形式を AI が理解できる形式に変換する
- 推論の実行: AI モデルに問い合わせて結果を取得する
- 結果の加工: AI の出力を業務システムで利用可能な形式に変換する
これらを統合した処理フローを「パイプライン」として設計することで、保守性・再現性・テスト容易性が飛躍的に向上します。
Gemini CLI をパイプラインの中核に選ぶ理由
Gemini CLI は、Google の生成 AI モデル Gemini をコマンドラインから利用できるツールです。パイプライン設計において以下の利点があります。
| # | 項目 | 内容 |
|---|---|---|
| 1 | 標準入出力対応 | Unix パイプで他のツールと連携可能 |
| 2 | JSON 出力 | 構造化データとして後処理しやすい |
| 3 | エラーハンドリング | 終了コードで正常・異常を判定可能 |
| 4 | スクリプト化 | シェルスクリプトで自動化しやすい |
| 5 | 再現性 | コマンドオプションで推論条件を固定できる |
次のセクションでは、パイプライン全体のアーキテクチャを図解で理解していきましょう。
パイプラインアーキテクチャの全体像
AI パイプラインは、前処理・推論・後処理の 3 つのフェーズで構成されます。それぞれのフェーズは独立したモジュールとして設計し、Unix 哲学の「小さなツールを組み合わせる」という原則に従います。
3 フェーズ構成の概念図
以下の図は、データが各フェーズをどのように流れていくかを示しています。
mermaidflowchart LR
raw["生データ<br/>(CSV/JSON/テキスト)"] -->|前処理| normalized["正規化データ<br/>(標準形式)"]
normalized -->|推論| gemini["Gemini CLI<br/>(AI 推論)"]
gemini -->|後処理| result["業務データ<br/>(DB/API/ファイル)"]
style gemini fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
style raw fill:#fff3e0,stroke:#f57c00
style normalized fill:#f1f8e9,stroke:#689f38
style result fill:#fce4ec,stroke:#c2185b
この図から読み取れる要点は以下の通りです。
- 各フェーズが明確に分離されている
- Gemini CLI は推論フェーズのみに集中している
- データ形式の変換は前処理・後処理で担当している
フェーズごとの責務
各フェーズの責務を明確にすることで、コードの見通しが良くなります。
| # | フェーズ | 責務 | 入力例 | 出力例 |
|---|---|---|---|---|
| 1 | 前処理 | データ正規化・検証・プロンプト生成 | CSV ファイル | JSON テキスト |
| 2 | 推論 | AI モデルへの問い合わせ | プロンプト文字列 | AI 応答 JSON |
| 3 | 後処理 | 結果の解釈・変換・保存 | AI 応答 JSON | データベースレコード |
次のセクションからは、各フェーズの具体的な実装方法を見ていきます。
前処理フェーズの設計
前処理フェーズでは、生データを AI が理解できる形式に変換します。このフェーズの品質が、AI の推論精度に直接影響します。
前処理の主要タスク
前処理では以下のタスクを実行します。
- データの読み込みと検証
- 不正データのフィルタリング
- プロンプトの組み立て
- 標準形式への変換
データ検証とフィルタリング
まず、入力データの妥当性を検証し、不正なデータを除外します。
typescriptimport fs from 'fs/promises';
// CSVファイルを読み込み、データを検証する関数
async function loadAndValidateData(
filePath: string
): Promise<string[]> {
const content = await fs.readFile(filePath, 'utf-8');
const lines = content.split('\n');
// 空行やヘッダーを除外
return lines
.filter((line) => line.trim().length > 0)
.slice(1); // ヘッダー行をスキップ
}
この関数は、CSV ファイルを読み込み、空行を除外してデータ行のみを返します。
プロンプトの組み立て
次に、検証済みデータから AI 用のプロンプトを組み立てます。
typescriptinterface PromptOptions {
systemInstruction?: string;
temperature?: number;
maxTokens?: number;
}
// データからプロンプトを生成する関数
function buildPrompt(
data: string[],
options: PromptOptions = {}
): string {
const {
systemInstruction = 'あなたは優秀なデータアナリストです。',
} = options;
const dataSection = data
.map((line, index) => `${index + 1}. ${line}`)
.join('\n');
return `${systemInstruction}
以下のデータを分析してください:
${dataSection}`;
}
このコードは、配列形式のデータを番号付きリストとしてプロンプトに組み込みます。
JSON 形式での出力
前処理の最後に、プロンプトを JSON 形式で出力します。
typescriptinterface PreprocessedData {
prompt: string;
metadata: {
recordCount: number;
timestamp: string;
source: string;
};
}
// 前処理結果をJSON形式で出力する関数
function outputPreprocessedData(
prompt: string,
recordCount: number,
source: string
): string {
const data: PreprocessedData = {
prompt,
metadata: {
recordCount,
timestamp: new Date().toISOString(),
source,
},
};
return JSON.stringify(data, null, 2);
}
この関数は、プロンプトとメタデータを含む JSON を生成します。
前処理の実行例
以下は、前処理を実行するメインスクリプトの例です。
typescript// 前処理パイプラインのメイン関数
async function preprocessPipeline(
inputFile: string
): Promise<void> {
try {
// データの読み込みと検証
const validData = await loadAndValidateData(inputFile);
// プロンプトの組み立て
const prompt = buildPrompt(validData, {
systemInstruction: 'データの傾向を分析してください。',
});
// JSON形式で出力
const output = outputPreprocessedData(
prompt,
validData.length,
inputFile
);
console.log(output);
} catch (error) {
console.error('前処理エラー:', error);
process.exit(1);
}
}
// スクリプト実行
preprocessPipeline(process.argv[2]);
このスクリプトは、コマンドライン引数でファイルパスを受け取り、前処理を実行して JSON を標準出力に出力します。
前処理フェーズの設計により、AI への入力品質が保証されます。次のセクションでは、この出力を Gemini CLI で処理する推論フェーズを見ていきましょう。
推論フェーズの設計
推論フェーズでは、前処理で生成されたプロンプトを Gemini CLI に渡し、AI の応答を取得します。このフェーズは極力シンプルに保ち、推論に専念させることが重要です。
Gemini CLI の基本的な使い方
Gemini CLI は、標準入力からプロンプトを受け取り、標準出力に結果を返します。
bash# 前処理の出力をGemini CLIに渡す基本パターン
echo "こんにちは" | gemini generate
このコマンドは、テキストを Gemini に送信し、応答を受け取ります。
JSON 入力の処理
前処理フェーズで生成した JSON から、プロンプト部分だけを抽出して Gemini CLI に渡します。
bash# jqを使用してJSONからプロンプトを抽出
cat preprocessed.json | jq -r '.prompt' | gemini generate
jq コマンドで JSON を解析し、prompt フィールドの値だけを取り出しています。
推論パラメータの指定
Gemini CLI では、推論の振る舞いを制御するためのオプションが用意されています。
bash# 温度・最大トークン数・モデルを指定した推論
cat preprocessed.json | jq -r '.prompt' | \
gemini generate \
--temperature 0.2 \
--max-tokens 1024 \
--model gemini-pro
このコマンドは、温度を低く設定することで決定論的な応答を得やすくしています。
エラーハンドリングの実装
推論フェーズでは、API エラーやネットワークエラーが発生する可能性があります。
bash# エラーハンドリング付きの推論スクリプト
#!/bin/bash
set -euo pipefail
INPUT_JSON="$1"
OUTPUT_FILE="$2"
# JSONからプロンプトを抽出
PROMPT=$(jq -r '.prompt' "$INPUT_JSON")
# Gemini CLIで推論を実行
if ! RESULT=$(echo "$PROMPT" | gemini generate --temperature 0.2 2>&1); then
echo "Error: Gemini CLI failed" >&2
echo "Details: $RESULT" >&2
exit 1
fi
# 結果を出力ファイルに保存
echo "$RESULT" > "$OUTPUT_FILE"
echo "推論完了: $OUTPUT_FILE" >&2
このスクリプトは、set -euo pipefail でエラーを即座に検出し、エラーメッセージを標準エラー出力に書き込みます。
推論結果の構造化
Gemini CLI の出力を JSON 形式で受け取るために、--output-format json オプションを使用します。
bash# JSON形式で推論結果を取得
echo "$PROMPT" | \
gemini generate \
--output-format json \
--temperature 0.2 > inference_result.json
このコマンドは、AI の応答をそのまま JSON として保存します。
リトライ機能の実装
API の一時的な障害に対応するため、リトライ機能を実装します。
bash#!/bin/bash
MAX_RETRIES=3
RETRY_DELAY=5
function inference_with_retry() {
local prompt="$1"
local attempt=1
while [ $attempt -le $MAX_RETRIES ]; do
echo "推論試行 $attempt/$MAX_RETRIES..." >&2
# Gemini CLIで推論を実行
if result=$(echo "$prompt" | gemini generate --output-format json 2>&1); then
echo "$result"
return 0
fi
# リトライ前の待機
if [ $attempt -lt $MAX_RETRIES ]; then
echo "エラー発生。${RETRY_DELAY}秒後にリトライします..." >&2
sleep $RETRY_DELAY
fi
attempt=$((attempt + 1))
done
echo "Error: 最大リトライ回数に到達しました" >&2
return 1
}
# 使用例
PROMPT=$(jq -r '.prompt' input.json)
inference_with_retry "$PROMPT"
この関数は、最大 3 回まで推論を試行し、失敗時には 5 秒待機してからリトライします。
推論フェーズの実行フロー
以下の図は、推論フェーズの実行フローを示しています。
mermaidflowchart TD
start["前処理JSON"] --> extract["jqでプロンプト抽出"]
extract --> inference["Gemini CLI実行"]
inference --> check{"成功?"}
check -->|Yes| output["JSON出力"]
check -->|No| retry{"リトライ<br/>可能?"}
retry -->|Yes| wait["待機"]
wait --> inference
retry -->|No| error["エラー終了"]
output --> done["次フェーズへ"]
style inference fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
style error fill:#ffebee,stroke:#c62828
style done fill:#e8f5e9,stroke:#2e7d32
この図から分かるように、推論フェーズはシンプルな構造ですが、エラーハンドリングとリトライ機能により堅牢性を確保しています。
推論フェーズでは、Gemini CLI を中心に据えつつ、エラー処理とリトライ機能で安定性を高めることが重要です。次のセクションでは、AI の応答を業務システムで利用可能な形式に変換する後処理フェーズを解説します。
後処理フェーズの設計
後処理フェーズでは、Gemini CLI から得られた AI 応答を解釈し、業務システムで利用可能な形式に変換します。このフェーズの品質が、システム全体の実用性を左右します。
後処理の主要タスク
後処理では以下のタスクを実行します。
- AI 応答の構造解析
- データの検証と整形
- 業務形式への変換
- 永続化(データベース保存・ファイル出力)
AI 応答の構造解析
まず、Gemini CLI から返された JSON を解析します。
typescriptinterface GeminiResponse {
candidates: Array<{
content: {
parts: Array<{
text: string;
}>;
};
finishReason: string;
}>;
usageMetadata?: {
promptTokenCount: number;
candidatesTokenCount: number;
totalTokenCount: number;
};
}
// Gemini CLIの応答を解析する関数
function parseGeminiResponse(
jsonText: string
): string | null {
try {
const response: GeminiResponse = JSON.parse(jsonText);
// 最初の候補から応答テキストを取得
const firstCandidate = response.candidates?.[0];
if (!firstCandidate) {
console.error('応答に候補が含まれていません');
return null;
}
const text = firstCandidate.content?.parts?.[0]?.text;
if (!text) {
console.error('応答テキストが空です');
return null;
}
return text;
} catch (error) {
console.error('JSON解析エラー:', error);
return null;
}
}
この関数は、Gemini CLI の応答構造から実際のテキスト部分だけを抽出します。
構造化データの抽出
AI の応答が構造化されたデータ(JSON や表形式)を含む場合、それを抽出します。
typescriptinterface ExtractedData {
items: Array<{
name: string;
value: number;
}>;
}
// AIの応答からJSON部分を抽出する関数
function extractStructuredData(
responseText: string
): ExtractedData | null {
// マークダウンコードブロック内のJSONを抽出
const jsonMatch = responseText.match(
/```json\n([\s\S]+?)\n```/
);
if (!jsonMatch) {
console.error('構造化データが見つかりません');
return null;
}
try {
return JSON.parse(jsonMatch[1]);
} catch (error) {
console.error('構造化データの解析エラー:', error);
return null;
}
}
このコードは、Markdown コードブロック内の JSON を正規表現で抽出し、パースします。
データの検証と正規化
抽出したデータの妥当性を検証し、業務ロジックに適合する形式に正規化します。
typescriptinterface ValidatedItem {
name: string;
value: number;
processedAt: string;
}
// データを検証して正規化する関数
function validateAndNormalize(
data: ExtractedData
): ValidatedItem[] {
return data.items
.filter((item) => {
// 必須フィールドのチェック
if (!item.name || typeof item.value !== 'number') {
console.warn('無効なアイテムをスキップ:', item);
return false;
}
// 値の範囲チェック
if (item.value < 0 || item.value > 1000000) {
console.warn('範囲外の値をスキップ:', item);
return false;
}
return true;
})
.map((item) => ({
name: item.name.trim(),
value: Math.round(item.value), // 整数化
processedAt: new Date().toISOString(),
}));
}
この関数は、不正なデータを除外し、値を整数化して処理時刻を追加します。
データベースへの保存
検証済みデータをデータベースに保存します。
typescriptimport { Pool } from 'pg';
// データベース接続プールの作成
const pool = new Pool({
host: process.env.DB_HOST,
database: process.env.DB_NAME,
user: process.env.DB_USER,
password: process.env.DB_PASSWORD,
});
// データベースにデータを保存する関数
async function saveToDatabase(
items: ValidatedItem[]
): Promise<void> {
const client = await pool.connect();
try {
await client.query('BEGIN');
// 各アイテムをINSERT
for (const item of items) {
await client.query(
'INSERT INTO analysis_results (name, value, processed_at) VALUES ($1, $2, $3)',
[item.name, item.value, item.processedAt]
);
}
await client.query('COMMIT');
console.log(
`${items.length}件のレコードを保存しました`
);
} catch (error) {
await client.query('ROLLBACK');
console.error('データベース保存エラー:', error);
throw error;
} finally {
client.release();
}
}
このコードは、トランザクションを使用して複数レコードを安全に保存します。
ファイル出力による永続化
データベースの代わりに、CSV や JSON ファイルとして出力することもできます。
typescriptimport fs from 'fs/promises';
// CSVファイルとして出力する関数
async function saveAsCsv(
items: ValidatedItem[],
outputPath: string
): Promise<void> {
const header = 'name,value,processed_at\n';
const rows = items
.map(
(item) =>
`"${item.name}",${item.value},"${item.processedAt}"`
)
.join('\n');
const csv = header + rows;
await fs.writeFile(outputPath, csv, 'utf-8');
console.log(`CSVファイルを保存しました: ${outputPath}`);
}
// JSONファイルとして出力する関数
async function saveAsJson(
items: ValidatedItem[],
outputPath: string
): Promise<void> {
const json = JSON.stringify(items, null, 2);
await fs.writeFile(outputPath, json, 'utf-8');
console.log(`JSONファイルを保存しました: ${outputPath}`);
}
これらの関数は、データをファイルとして永続化する代替手段を提供します。
後処理パイプラインの統合
以下は、後処理の全ステップを統合したメイン関数です。
typescript// 後処理パイプラインのメイン関数
async function postprocessPipeline(
inferenceResultPath: string
): Promise<void> {
try {
// 1. Gemini応答の読み込み
const jsonText = await fs.readFile(
inferenceResultPath,
'utf-8'
);
// 2. 応答の解析
const responseText = parseGeminiResponse(jsonText);
if (!responseText) {
throw new Error('応答の解析に失敗しました');
}
// 3. 構造化データの抽出
const extractedData =
extractStructuredData(responseText);
if (!extractedData) {
throw new Error('構造化データの抽出に失敗しました');
}
// 4. データの検証と正規化
const validatedItems =
validateAndNormalize(extractedData);
if (validatedItems.length === 0) {
console.warn('有効なデータが見つかりませんでした');
return;
}
// 5. データベースへの保存
await saveToDatabase(validatedItems);
// 6. バックアップとしてJSON出力
await saveAsJson(validatedItems, 'output/result.json');
console.log('後処理が完了しました');
} catch (error) {
console.error('後処理エラー:', error);
process.exit(1);
}
}
// スクリプト実行
postprocessPipeline(process.argv[2]);
このスクリプトは、AI 応答の解析からデータベース保存までを一貫して処理します。
後処理フローの可視化
以下の図は、後処理フェーズの処理フローを示しています。
mermaidflowchart TD
input["Gemini応答JSON"] --> parse["JSON解析"]
parse --> extract["構造化データ抽出"]
extract --> validate["データ検証"]
validate --> filter{"有効データ<br/>あり?"}
filter -->|No| warn["警告ログ出力"]
filter -->|Yes| save["データベース保存"]
save --> backup["JSONバックアップ"]
backup --> done["完了"]
warn --> done
style input fill:#e3f2fd,stroke:#1976d2
style save fill:#e8f5e9,stroke:#2e7d32,stroke-width:3px
style done fill:#f1f8e9,stroke:#689f38
style warn fill:#fff3e0,stroke:#f57c00
この図から、後処理フェーズが多段階の検証とエラーハンドリングを経て、データを安全に永続化していることが分かります。
後処理フェーズでは、AI 応答を業務システムに統合するための重要な役割を果たします。次のセクションでは、3 つのフェーズを統合した完全なパイプラインの実装を見ていきましょう。
パイプライン全体の統合
ここまで設計した前処理・推論・後処理の 3 つのフェーズを統合し、エンドツーエンドで動作する AI パイプラインを構築します。
パイプライン統合の方針
統合にあたり、以下の設計原則を適用します。
| # | 原則 | 理由 |
|---|---|---|
| 1 | 各フェーズは独立したスクリプト | テスト容易性・再利用性が向上 |
| 2 | フェーズ間はファイルまたはパイプで連携 | 疎結合を保ちデバッグしやすい |
| 3 | エラーは即座に検出して停止 | 異常なデータが後続フェーズに流れない |
| 4 | ログは標準エラー出力に記録 | データフローを汚染しない |
シェルスクリプトによる統合
Unix パイプを使用して、3 つのフェーズを連結します。
bash#!/bin/bash
# pipeline.sh - AI パイプライン統合スクリプト
set -euo pipefail
# カラー出力用の定義
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[0;33m'
NC='\033[0m' # No Color
function log_info() {
echo -e "${GREEN}[INFO]${NC} $1" >&2
}
function log_warn() {
echo -e "${YELLOW}[WARN]${NC} $1" >&2
}
function log_error() {
echo -e "${RED}[ERROR]${NC} $1" >&2
}
# 使用方法の表示
if [ $# -lt 1 ]; then
echo "Usage: $0 <input-file>" >&2
exit 1
fi
INPUT_FILE="$1"
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
WORK_DIR="work/${TIMESTAMP}"
# 作業ディレクトリの作成
mkdir -p "$WORK_DIR"
log_info "AIパイプラインを開始します: $INPUT_FILE"
# フェーズ1: 前処理
log_info "[1/3] 前処理フェーズ"
PREPROCESSED="${WORK_DIR}/preprocessed.json"
if ! node src/preprocess.js "$INPUT_FILE" > "$PREPROCESSED"; then
log_error "前処理に失敗しました"
exit 1
fi
log_info "前処理完了: $PREPROCESSED"
このスクリプトは、色付きログ出力とタイムスタンプ付き作業ディレクトリを使用して、処理の進捗を視覚的に追跡できます。
推論フェーズの統合
前処理の結果を Gemini CLI に渡します。
bash# フェーズ2: 推論
log_info "[2/3] 推論フェーズ"
INFERENCE_RESULT="${WORK_DIR}/inference_result.json"
# JSONからプロンプトを抽出
PROMPT=$(jq -r '.prompt' "$PREPROCESSED")
# Gemini CLIで推論を実行(リトライ機能付き)
MAX_RETRIES=3
RETRY_COUNT=0
while [ $RETRY_COUNT -lt $MAX_RETRIES ]; do
log_info "推論を実行します (試行 $((RETRY_COUNT + 1))/$MAX_RETRIES)"
if echo "$PROMPT" | gemini generate \
--output-format json \
--temperature 0.2 \
--max-tokens 2048 > "$INFERENCE_RESULT" 2>&1; then
log_info "推論完了: $INFERENCE_RESULT"
break
fi
RETRY_COUNT=$((RETRY_COUNT + 1))
if [ $RETRY_COUNT -lt $MAX_RETRIES ]; then
log_warn "推論に失敗しました。5秒後にリトライします..."
sleep 5
else
log_error "推論に失敗しました(最大リトライ回数に到達)"
exit 1
fi
done
このコードは、推論の失敗時に自動的にリトライする機能を実装しています。
後処理フェーズの統合
最後に、推論結果を業務データに変換します。
bash# フェーズ3: 後処理
log_info "[3/3] 後処理フェーズ"
FINAL_OUTPUT="${WORK_DIR}/final_result.json"
if ! node src/postprocess.js "$INFERENCE_RESULT" > "$FINAL_OUTPUT"; then
log_error "後処理に失敗しました"
exit 1
fi
log_info "後処理完了: $FINAL_OUTPUT"
# 結果サマリーの表示
RECORD_COUNT=$(jq '. | length' "$FINAL_OUTPUT")
log_info "===================="
log_info "パイプライン完了"
log_info "処理レコード数: $RECORD_COUNT"
log_info "出力先: $FINAL_OUTPUT"
log_info "===================="
このスクリプトは、最終的な処理結果のサマリーを表示します。
パイプライン実行フローの可視化
以下の図は、統合されたパイプライン全体の実行フローを示しています。
mermaidflowchart TD
start["入力ファイル"] --> mkdir["作業ディレクトリ作成"]
mkdir --> phase1["前処理<br/>(Node.js)"]
phase1 --> check1{"成功?"}
check1 -->|No| error1["エラー終了"]
check1 -->|Yes| phase2["推論<br/>(Gemini CLI)"]
phase2 --> check2{"成功?"}
check2 -->|No| retry{"リトライ<br/>可能?"}
retry -->|Yes| phase2
retry -->|No| error2["エラー終了"]
check2 -->|Yes| phase3["後処理<br/>(Node.js)"]
phase3 --> check3{"成功?"}
check3 -->|No| error3["エラー終了"]
check3 -->|Yes| summary["結果サマリー表示"]
summary --> done["完了"]
style phase1 fill:#f1f8e9,stroke:#689f38,stroke-width:2px
style phase2 fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
style phase3 fill:#fce4ec,stroke:#c2185b,stroke-width:2px
style done fill:#e8f5e9,stroke:#2e7d32
style error1 fill:#ffebee,stroke:#c62828
style error2 fill:#ffebee,stroke:#c62828
style error3 fill:#ffebee,stroke:#c62828
この図から、各フェーズが独立して動作しつつ、エラーハンドリングとリトライ機能により堅牢性が確保されていることが分かります。
並列処理による高速化
複数の入力ファイルを並列処理することで、スループットを向上させることができます。
bash#!/bin/bash
# parallel_pipeline.sh - 並列実行対応パイプライン
set -euo pipefail
INPUT_DIR="$1"
MAX_PARALLEL=4
# 入力ファイルを取得
INPUT_FILES=($(find "$INPUT_DIR" -name "*.csv"))
log_info "入力ファイル数: ${#INPUT_FILES[@]}"
log_info "並列度: $MAX_PARALLEL"
# GNU parallelを使用して並列実行
export -f log_info log_warn log_error
printf '%s\n' "${INPUT_FILES[@]}" | \
parallel -j "$MAX_PARALLEL" \
'./pipeline.sh {}'
log_info "全ての入力ファイルの処理が完了しました"
このスクリプトは、GNU parallel を使用して複数のパイプラインを同時に実行します。
TypeScript による統合(代替案)
シェルスクリプトの代わりに、TypeScript で統合することもできます。
typescriptimport { spawn } from 'child_process';
import { promises as fs } from 'fs';
import path from 'path';
interface PipelineConfig {
inputFile: string;
workDir: string;
maxRetries: number;
}
// パイプライン実行クラス
class AIPipeline {
constructor(private config: PipelineConfig) {}
// 外部コマンドを実行するヘルパー関数
private async runCommand(
command: string,
args: string[],
input?: string
): Promise<string> {
return new Promise((resolve, reject) => {
const proc = spawn(command, args);
let stdout = '';
let stderr = '';
if (input) {
proc.stdin.write(input);
proc.stdin.end();
}
proc.stdout.on('data', (data) => (stdout += data));
proc.stderr.on('data', (data) => (stderr += data));
proc.on('close', (code) => {
if (code === 0) {
resolve(stdout);
} else {
reject(new Error(`Command failed: ${stderr}`));
}
});
});
}
// パイプライン実行
async run(): Promise<void> {
const { inputFile, workDir, maxRetries } = this.config;
// 作業ディレクトリの作成
await fs.mkdir(workDir, { recursive: true });
console.log('[1/3] 前処理フェーズ');
const preprocessedPath = path.join(
workDir,
'preprocessed.json'
);
const preprocessedData = await this.runCommand('node', [
'src/preprocess.js',
inputFile,
]);
await fs.writeFile(preprocessedPath, preprocessedData);
console.log('[2/3] 推論フェーズ');
const prompt = JSON.parse(preprocessedData).prompt;
let inferenceResult: string | null = null;
for (let retry = 0; retry < maxRetries; retry++) {
try {
inferenceResult = await this.runCommand(
'gemini',
[
'generate',
'--output-format',
'json',
'--temperature',
'0.2',
],
prompt
);
break;
} catch (error) {
console.warn(`リトライ ${retry + 1}/${maxRetries}`);
if (retry === maxRetries - 1) throw error;
await new Promise((resolve) =>
setTimeout(resolve, 5000)
);
}
}
const inferenceResultPath = path.join(
workDir,
'inference_result.json'
);
await fs.writeFile(
inferenceResultPath,
inferenceResult!
);
console.log('[3/3] 後処理フェーズ');
const finalResult = await this.runCommand('node', [
'src/postprocess.js',
inferenceResultPath,
]);
const finalOutputPath = path.join(
workDir,
'final_result.json'
);
await fs.writeFile(finalOutputPath, finalResult);
console.log('パイプライン完了:', finalOutputPath);
}
}
// メイン実行
const pipeline = new AIPipeline({
inputFile: process.argv[2],
workDir: `work/${Date.now()}`,
maxRetries: 3,
});
pipeline.run().catch((error) => {
console.error('パイプライン実行エラー:', error);
process.exit(1);
});
このコードは、TypeScript の型安全性とエラーハンドリングを活用して、より保守性の高いパイプラインを実現しています。
パイプライン全体の統合により、前処理・推論・後処理が一貫したワークフローとして動作します。次のセクションでは、実際の業務シナリオでこのパイプラインを適用する具体例を紹介します。
実践例:顧客フィードバック分析パイプライン
ここでは、実際の業務シナリオとして、顧客からのフィードバックコメントを AI で分析し、感情分類とキーワード抽出を行うパイプラインを構築します。
シナリオ概要
以下のような業務要件があるとします。
- 入力: 顧客フィードバックを含む CSV ファイル(1000 件/日)
- 処理: 各フィードバックを「ポジティブ/ネガティブ/ニュートラル」に分類し、重要キーワードを抽出
- 出力: 分析結果をデータベースに保存し、ダッシュボードで可視化
入力データ形式
CSV ファイルの形式は以下の通りです。
csvid,customer_name,feedback,timestamp
1,山田太郎,製品の使い心地が非常に良いです,2025-01-15T10:30:00Z
2,佐藤花子,サポートの対応が遅すぎる,2025-01-15T11:00:00Z
3,鈴木一郎,価格は高いが品質に満足している,2025-01-15T11:30:00Z
各行に顧客 ID、名前、フィードバック、タイムスタンプが含まれています。
前処理の実装
CSV ファイルを読み込み、AI 用のプロンプトを生成します。
typescriptimport fs from 'fs/promises';
import { parse } from 'csv-parse/sync';
interface FeedbackRecord {
id: string;
customer_name: string;
feedback: string;
timestamp: string;
}
// CSVファイルを読み込む関数
async function loadFeedbackCsv(
filePath: string
): Promise<FeedbackRecord[]> {
const content = await fs.readFile(filePath, 'utf-8');
const records = parse(content, {
columns: true,
skip_empty_lines: true,
});
return records;
}
この関数は、CSV ファイルをパースしてオブジェクトの配列として返します。
次に、フィードバック分析用のプロンプトを構築します。
typescript// フィードバック分析用プロンプトを生成する関数
function buildAnalysisPrompt(
records: FeedbackRecord[]
): string {
const feedbackList = records
.map(
(record, index) =>
`${index + 1}. [ID:${record.id}] ${record.feedback}`
)
.join('\n');
return `あなたは顧客フィードバックの分析エキスパートです。
以下の顧客フィードバックを分析し、各フィードバックについて以下の情報をJSON形式で出力してください:
1. sentiment: "positive" / "negative" / "neutral" のいずれか
2. keywords: 重要なキーワードを3つまで抽出(配列形式)
3. priority: 対応優先度(1-5の整数、5が最優先)
フィードバック一覧:
${feedbackList}
出力形式(JSON配列):
\`\`\`json
[
{
"id": "1",
"sentiment": "positive",
"keywords": ["使い心地", "良い"],
"priority": 2
}
]
\`\`\`
上記の形式で分析結果を出力してください。`;
}
このプロンプトは、AI に対して明確な出力形式を指示しています。
推論フェーズの実行
プロンプトを Gemini CLI に渡して推論を実行します。
bash#!/bin/bash
# feedback_inference.sh
set -euo pipefail
PREPROCESSED_JSON="$1"
OUTPUT_JSON="$2"
# JSONからプロンプトを抽出
PROMPT=$(jq -r '.prompt' "$PREPROCESSED_JSON")
# Gemini CLIで推論を実行
echo "$PROMPT" | \
gemini generate \
--output-format json \
--temperature 0.1 \
--max-tokens 4096 \
> "$OUTPUT_JSON"
echo "推論完了: $OUTPUT_JSON" >&2
温度を 0.1 に設定することで、より一貫性のある分析結果を得られます。
後処理の実装
AI の分析結果を解析し、データベースに保存します。
typescriptimport { Pool } from 'pg';
interface AnalysisResult {
id: string;
sentiment: 'positive' | 'negative' | 'neutral';
keywords: string[];
priority: number;
}
// Gemini応答から分析結果を抽出する関数
function extractAnalysisResults(
geminiResponseJson: string
): AnalysisResult[] {
const response = JSON.parse(geminiResponseJson);
const text =
response.candidates?.[0]?.content?.parts?.[0]?.text;
if (!text) {
throw new Error('応答テキストが空です');
}
// JSONコードブロックを抽出
const jsonMatch = text.match(/```json\n([\s\S]+?)\n```/);
if (!jsonMatch) {
throw new Error('JSON形式の分析結果が見つかりません');
}
return JSON.parse(jsonMatch[1]);
}
この関数は、Gemini の応答から JSON 部分だけを抽出します。
次に、分析結果をデータベースに保存します。
typescriptconst pool = new Pool({
host: process.env.DB_HOST,
database: 'feedback_analysis',
user: process.env.DB_USER,
password: process.env.DB_PASSWORD,
});
// 分析結果をデータベースに保存する関数
async function saveAnalysisResults(
results: AnalysisResult[]
): Promise<void> {
const client = await pool.connect();
try {
await client.query('BEGIN');
for (const result of results) {
await client.query(
`INSERT INTO feedback_analysis
(feedback_id, sentiment, keywords, priority, analyzed_at)
VALUES ($1, $2, $3, $4, NOW())
ON CONFLICT (feedback_id)
DO UPDATE SET
sentiment = EXCLUDED.sentiment,
keywords = EXCLUDED.keywords,
priority = EXCLUDED.priority,
analyzed_at = NOW()`,
[
result.id,
result.sentiment,
result.keywords,
result.priority,
]
);
}
await client.query('COMMIT');
console.log(
`${results.length}件の分析結果を保存しました`
);
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
このコードは、ON CONFLICT を使用して、既存レコードの更新にも対応しています。
統合実行スクリプト
3 つのフェーズを統合して実行します。
bash#!/bin/bash
# feedback_pipeline.sh - 顧客フィードバック分析パイプライン
set -euo pipefail
INPUT_CSV="$1"
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
WORK_DIR="work/feedback_${TIMESTAMP}"
mkdir -p "$WORK_DIR"
echo "[1/3] 前処理: CSVからプロンプトを生成"
node src/feedback_preprocess.js "$INPUT_CSV" > "${WORK_DIR}/preprocessed.json"
echo "[2/3] 推論: Gemini CLIで分析実行"
bash scripts/feedback_inference.sh \
"${WORK_DIR}/preprocessed.json" \
"${WORK_DIR}/inference_result.json"
echo "[3/3] 後処理: 分析結果をデータベースに保存"
node src/feedback_postprocess.js "${WORK_DIR}/inference_result.json"
echo "フィードバック分析パイプライン完了"
このスクリプトにより、CSV ファイルからデータベース保存までが自動化されます。
実行結果の可視化
以下の図は、フィードバック分析パイプラインのデータフローを示しています。
mermaidflowchart LR
csv["顧客フィードバック<br/>CSV"] -->|前処理| prompt["分析プロンプト<br/>JSON"]
prompt -->|推論| gemini["Gemini CLI<br/>感情・キーワード分析"]
gemini -->|後処理| parse["JSON解析"]
parse --> db[("PostgreSQL<br/>分析結果DB")]
db --> dashboard["ダッシュボード<br/>可視化"]
style csv fill:#fff3e0,stroke:#f57c00
style gemini fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
style db fill:#e8f5e9,stroke:#2e7d32
style dashboard fill:#fce4ec,stroke:#c2185b
このパイプラインにより、日次で自動的に顧客フィードバックを分析し、ビジネスインサイトを得ることができます。
パフォーマンス最適化
大量のフィードバックを効率的に処理するため、バッチ処理を実装します。
typescript// バッチ処理用の関数
async function processFeedbackInBatches(
records: FeedbackRecord[],
batchSize: number = 50
): Promise<void> {
const batches: FeedbackRecord[][] = [];
// レコードをバッチに分割
for (let i = 0; i < records.length; i += batchSize) {
batches.push(records.slice(i, i + batchSize));
}
console.log(
`全${records.length}件を${batches.length}バッチに分割`
);
// 各バッチを順次処理
for (let i = 0; i < batches.length; i++) {
console.log(
`バッチ ${i + 1}/${batches.length} を処理中...`
);
const prompt = buildAnalysisPrompt(batches[i]);
// プロンプトをファイルに保存して推論フェーズに渡す
const batchWorkDir = `work/batch_${i}`;
await fs.mkdir(batchWorkDir, { recursive: true });
await fs.writeFile(
`${batchWorkDir}/preprocessed.json`,
JSON.stringify({ prompt })
);
// 推論と後処理を実行(詳細は省略)
}
console.log('全バッチの処理が完了しました');
}
このコードは、大量のフィードバックを小さなバッチに分割して処理することで、API のレート制限に対応します。
実践例を通じて、AI パイプラインが実際のビジネス課題をどのように解決するかを示しました。次のセクションでは、パイプライン設計のベストプラクティスをまとめます。
パイプライン設計のベストプラクティス
ここまで見てきた AI パイプラインの設計において、実務で重要となるベストプラクティスをまとめます。
設計原則
AI パイプラインを構築する際に守るべき原則です。
| # | 原則 | 理由 | 実装方法 |
|---|---|---|---|
| 1 | フェーズの明確な分離 | テスト容易性・デバッグしやすさ | 各フェーズを独立したスクリプトにする |
| 2 | 冪等性の確保 | 再実行時の安全性 | DB の UPSERT、ファイルの上書き保存 |
| 3 | エラーの早期検出 | 異常データの伝播防止 | set -e でスクリプト終了 |
| 4 | ログの構造化 | 運用時のトラブルシュート | JSON 形式のログ出力 |
| 5 | リトライ機能 | 一時的障害への対応 | 指数バックオフでリトライ |
エラーハンドリングのパターン
AI パイプラインで発生する典型的なエラーと対処方法です。
typescript// エラー種別の定義
enum PipelineErrorType {
INPUT_VALIDATION = 'INPUT_VALIDATION',
INFERENCE_FAILURE = 'INFERENCE_FAILURE',
OUTPUT_PARSING = 'OUTPUT_PARSING',
DATABASE_ERROR = 'DATABASE_ERROR',
}
// カスタムエラークラス
class PipelineError extends Error {
constructor(
public type: PipelineErrorType,
message: string,
public context?: Record<string, any>
) {
super(message);
this.name = 'PipelineError';
}
}
// エラーハンドリング付きの実行関数
async function executeWithErrorHandling<T>(
fn: () => Promise<T>,
errorType: PipelineErrorType,
context?: Record<string, any>
): Promise<T> {
try {
return await fn();
} catch (error) {
throw new PipelineError(
errorType,
error instanceof Error
? error.message
: String(error),
context
);
}
}
このパターンにより、エラーの種別と発生箇所を明確に追跡できます。
ログ設計のベストプラクティス
構造化ログを使用することで、運用時の問題解決が容易になります。
typescriptinterface LogEntry {
timestamp: string;
level: 'info' | 'warn' | 'error';
phase: 'preprocess' | 'inference' | 'postprocess';
message: string;
metadata?: Record<string, any>;
}
// 構造化ログ出力関数
function logStructured(
level: LogEntry['level'],
phase: LogEntry['phase'],
message: string,
metadata?: Record<string, any>
): void {
const entry: LogEntry = {
timestamp: new Date().toISOString(),
level,
phase,
message,
metadata,
};
// 標準エラー出力にJSON形式で出力
console.error(JSON.stringify(entry));
}
// 使用例
logStructured('info', 'preprocess', 'データ読み込み開始', {
filePath: 'input.csv',
recordCount: 1000,
});
JSON 形式のログは、ログ分析ツール(Elasticsearch、CloudWatch Logs など)で検索・集計しやすくなります。
テスト戦略
各フェーズを独立してテストすることで、品質を確保します。
typescriptimport { describe, it, expect } from '@jest/globals';
describe('前処理フェーズのテスト', () => {
it('正常なCSVデータからプロンプトを生成できる', async () => {
const testCsvPath = 'test/fixtures/valid_feedback.csv';
const records = await loadFeedbackCsv(testCsvPath);
const prompt = buildAnalysisPrompt(records);
expect(prompt).toContain('顧客フィードバックの分析');
expect(prompt).toContain('sentiment');
expect(records.length).toBeGreaterThan(0);
});
it('空のCSVファイルはエラーを投げる', async () => {
const testCsvPath = 'test/fixtures/empty.csv';
await expect(
loadFeedbackCsv(testCsvPath)
).rejects.toThrow('データが空です');
});
});
このようなユニットテストにより、各フェーズの動作を保証します。
モニタリングとアラート
パイプラインの稼働状況を監視するメトリクスを定義します。
| # | メトリクス | 説明 | アラート閾値 |
|---|---|---|---|
| 1 | 処理時間 | パイプライン全体の実行時間 | 平均の 2 倍超 |
| 2 | エラー率 | 全実行に対する失敗の割合 | 5% 超 |
| 3 | トークン消費量 | AI API の使用量 | 日次予算の 90% 超 |
| 4 | レコード処理数 | 処理したレコード数 | 期待値の 50% 未満 |
コスト最適化
AI API の利用コストを最適化する方法です。
typescript// トークン数を推定する関数(簡易版)
function estimateTokenCount(text: string): number {
// 英語:4文字≒1トークン、日本語:2文字≒1トークン
const englishChars =
text.match(/[a-zA-Z0-9\s]/g)?.length || 0;
const japaneseChars = text.length - englishChars;
return Math.ceil(englishChars / 4 + japaneseChars / 2);
}
// コスト推定を含む前処理
function buildPromptWithCostEstimate(
records: FeedbackRecord[]
): {
prompt: string;
estimatedCost: number;
} {
const prompt = buildAnalysisPrompt(records);
const tokenCount = estimateTokenCount(prompt);
// Gemini Proの料金(2025年1月時点の例):$0.00025/1Kトークン
const estimatedCost = (tokenCount / 1000) * 0.00025;
console.warn(
`推定トークン数: ${tokenCount}, 推定コスト: $${estimatedCost.toFixed(
4
)}`
);
return { prompt, estimatedCost };
}
このコードは、推論実行前にコストを推定し、予算超過を事前に防ぎます。
バージョン管理とドキュメント
パイプラインの変更履歴を追跡するため、以下の情報を記録します。
yaml# pipeline_config.yaml
version: 1.2.0
description: 顧客フィードバック分析パイプライン
changelog:
- version: 1.2.0
date: 2025-01-20
changes:
- バッチ処理機能を追加
- リトライロジックを改善
- version: 1.1.0
date: 2025-01-10
changes:
- 後処理でキーワード抽出を追加
phases:
preprocess:
script: src/feedback_preprocess.js
input_format: CSV
output_format: JSON
inference:
cli: gemini generate
options:
temperature: 0.1
max_tokens: 4096
postprocess:
script: src/feedback_postprocess.js
database: feedback_analysis
このような設定ファイルにより、パイプラインの仕様を一元管理できます。
セキュリティとコンプライアンス
AI パイプラインでは、以下のセキュリティ対策が必要です。
| # | 対策 | 実装方法 |
|---|---|---|
| 1 | API キーの保護 | 環境変数または秘密管理サービス(AWS Secrets Manager など) |
| 2 | 個人情報の匿名化 | 前処理フェーズで名前・メールアドレスをマスク |
| 3 | ログの機密情報除外 | ログ出力前に機密データをフィルタリング |
| 4 | アクセス制御 | データベースとファイルのパーミッション設定 |
ベストプラクティスを適用することで、保守性・信頼性・コスト効率の高い AI パイプラインを実現できます。
まとめ
本記事では、Gemini CLI を中核に据えた AI パイプラインの設計手法を、前処理・推論・後処理の 3 フェーズに分けて解説しました。
AI システムを本番運用する際には、推論部分だけでなく、データの前処理と結果の後処理を含めた一貫したワークフローを設計することが不可欠です。各フェーズを独立したモジュールとして構築することで、テスト容易性・保守性・再利用性が飛躍的に向上します。
Gemini CLI は、標準入出力に対応しているため、Unix パイプや既存のツールと自然に統合できる点が大きな強みです。シェルスクリプトによる統合から、TypeScript による型安全な実装まで、プロジェクトの要件に応じた柔軟な設計が可能になります。
顧客フィードバック分析の実践例で示したように、このパイプライン設計パターンは、さまざまな業務シナリオに適用できます。エラーハンドリング、リトライ機能、構造化ログ、コスト最適化といったベストプラクティスを適用することで、信頼性の高い AI システムを構築していきましょう。
AI 技術の進化は目覚ましいですが、それを実務で活用するためには、地道なエンジニアリングの積み重ねが必要です。本記事で紹介した設計パターンが、皆さんの AI プロジェクトの成功に貢献できれば幸いです。
関連リンク
articleGemini CLI のコスト監視ダッシュボード:呼び出し数・トークン・失敗率の可視化
articleGemini CLI を中核にした“AI パイプライン”設計:前処理 → 推論 → 後処理の標準化
articleGemini CLI コマンド&フラグ早見表:入出力・温度・安全設定の定番セット
articleGemini CLI を macOS で最短導入:Homebrew・PATH・シェル補完のベストプラクティス
articleGemini CLI と curl + REST の生産性比較:開発速度・再現性・保守性を計測
articleGemini CLI で JSON が壊れる問題を撲滅:出力拘束・スキーマ・再試行の実務
articleGemini CLI のコスト監視ダッシュボード:呼び出し数・トークン・失敗率の可視化
articleGrok アカウント作成から初回設定まで:5 分で完了するスターターガイド
articleFFmpeg コーデック地図 2025:H.264/H.265/AV1/VP9/ProRes/DNxHR の使いどころ
articleESLint の内部構造を覗く:Parser・Scope・Rule・Fixer の連携を図解
articlegpt-oss の量子化別ベンチ比較:INT8/FP16/FP8 の速度・品質トレードオフ
articleDify で実現する RAG 以外の戦略:ツール実行・関数呼び出し・自律エージェントの全体像
blogiPhone 17シリーズの発表!全モデルiPhone 16から進化したポイントを見やすく整理
blogGoogleストアから訂正案内!Pixel 10ポイント有効期限「1年」表示は誤りだった
blog【2025年8月】Googleストア「ストアポイント」は1年表記はミス?2年ルールとの整合性を検証
blogGoogleストアの注文キャンセルはなぜ起きる?Pixel 10購入前に知るべき注意点
blogPixcel 10シリーズの発表!全モデル Pixcel 9 から進化したポイントを見やすく整理
blogフロントエンドエンジニアの成長戦略:コーチングで最速スキルアップする方法
review今の自分に満足していますか?『持たざる者の逆襲 まだ何者でもない君へ』溝口勇児
reviewついに語られた業界の裏側!『フジテレビの正体』堀江貴文が描くテレビ局の本当の姿
review愛する勇気を持てば人生が変わる!『幸せになる勇気』岸見一郎・古賀史健のアドラー実践編で真の幸福を手に入れる
review週末を変えれば年収も変わる!『世界の一流は「休日」に何をしているのか』越川慎司の一流週末メソッド
review新しい自分に会いに行こう!『自分の変え方』村岡大樹の認知科学コーチングで人生リセット
review科学革命から AI 時代へ!『サピエンス全史 下巻』ユヴァル・ノア・ハラリが予見する人類の未来