T-CREATOR

Haystack とは?RAG 検索 × 生成 AI を実務投入するための完全入門【2025 年版】

Haystack とは?RAG 検索 × 生成 AI を実務投入するための完全入門【2025 年版】

生成 AI の活用が進む中、社内文書や製品マニュアルなど、独自データを活用した AI アプリケーションの需要が急増しています。その中核技術として注目されているのが RAG(Retrieval-Augmented Generation)です。

Haystack は、RAG システムの構築を強力に支援するオープンソースフレームワークであり、検索エンジンと生成 AI を組み合わせた高度な質問応答システムを実現できます。本記事では、Haystack の基礎から実務投入までのノウハウを、初心者の方にもわかりやすく解説いたします。

Haystack とは何か

Haystack は、deepset 社が開発した Python 製のオープンソースフレームワークで、検索拡張生成(RAG)を中心とした NLP(自然言語処理)パイプラインを構築するためのツールです。

2019 年のリリース以来、エンタープライズ領域での採用が進み、2025 年現在では v2.x 系が主流となっています。従来の検索システムでは「キーワードマッチング」が中心でしたが、Haystack では意味的な検索(セマンティック検索)と生成 AI を組み合わせることで、より自然で正確な回答を生成できるのが最大の特徴です。

Haystack の主要機能

Haystack が提供する主な機能を以下の表にまとめます。

#機能カテゴリ概要代表的なコンポーネント
1ドキュメント処理PDF、HTML、テキストファイルの読み込みと前処理FileConverter, PreProcessor
2ベクトル検索埋め込みベクトルを用いたセマンティック検索EmbeddingRetriever, DocumentStore
3キーワード検索従来型の全文検索機能BM25Retriever, ElasticsearchRetriever
4生成 AI 統合OpenAI、Hugging Face などのモデル連携PromptNode, Generator
5パイプライン構築検索から生成までの処理フローを柔軟に定義Pipeline, Component

Haystack が解決する課題

従来の検索システムや AI チャットボットでは、以下のような課題がありました。

  • 情報の鮮度: 学習済みモデルは最新情報を持たない
  • ドメイン特化性: 汎用モデルでは専門分野の質問に答えられない
  • ハルシネーション: 生成 AI が事実と異なる内容を生成してしまう
  • 検索精度: キーワードマッチだけでは意図を汲み取れない

Haystack はこれらの課題に対し、外部知識ベースと生成 AI を組み合わせることで、正確かつ最新の情報に基づいた回答を提供します。

背景

RAG(Retrieval-Augmented Generation)の登場

2020 年に Meta AI(旧 Facebook AI)が発表した RAG は、生成 AI の弱点を補完する画期的なアプローチとして注目されました。

RAG の基本的なアイデアは「生成前に関連情報を検索し、その情報をコンテキストとして与える」というシンプルなものですが、これにより以下のメリットが得られます。

  • 最新情報への対応: 外部データベースから情報を取得するため、リアルタイム性が確保されます
  • 根拠の明示: 回答の根拠となる文書を提示でき、信頼性が向上します
  • ハルシネーション削減: 実際のデータに基づいて生成するため、誤情報の生成を抑制できます

RAG の仕組みを図で表すと以下のようになります。

mermaidflowchart TB
    user["ユーザー"] -->|質問| query["クエリ処理"]
    query -->|ベクトル化| retriever["検索エンジン<br/>(Retriever)"]
    retriever -->|関連文書取得| store[("ドキュメント<br/>ストア")]
    retriever -->|Top-k 文書| generator["生成 AI<br/>(Generator)"]
    query -->|元の質問| generator
    generator -->|コンテキスト付き<br/>プロンプト| llm["大規模言語<br/>モデル"]
    llm -->|回答生成| user

この図から、RAG が「検索」と「生成」の 2 段階で動作することがわかります。

Haystack が選ばれる理由

RAG を実装するフレームワークは複数存在しますが、Haystack が特に支持される理由は以下の通りです。

#特徴詳細競合との違い
1モジュール性コンポーネントを自由に組み合わせられるLangChain より型安全で堅牢
2エンタープライズ対応大規模データに対応した設計商用利用を前提とした安定性
3豊富な統合Elasticsearch、OpenSearch、Pinecone など多様な検索エンジンに対応プラグイン形式で拡張可能
4評価機能パイプラインの精度を定量的に測定できる継続的な改善が可能

特に、型システムを活用した堅牢な設計は、プロダクション環境での運用において大きなアドバンテージとなります。

2025 年版 Haystack の進化

Haystack v2.x では、以下の大きな変更が加えられました。

  • パイプラインの再設計: より直感的で柔軟な構文
  • 非同期処理の強化: 大量リクエストへの対応
  • マルチモーダル対応: テキスト以外の画像や音声データにも対応
  • 観察可能性の向上: ログとトレーシングの充実

これらの改善により、実務での利用がさらに容易になりました。

課題

従来の検索システムの限界

キーワードベースの検索では、以下のような問題が頻発します。

問題 1: 同義語や言い換えへの対応不足

例えば、「車の修理方法」と「自動車の整備手順」は同じ意味ですが、キーワード検索では別物として扱われてしまいます。

問題 2: 文脈の理解不足

「Python」というキーワードが、プログラミング言語なのか、蛇なのか、判別できません。

問題 3: 回答生成の欠如

検索結果は文書リストとして返されるだけで、ユーザーは複数の文書を読み込んで自分で答えを組み立てる必要がありました。

生成 AI 単体の課題

一方、ChatGPT のような生成 AI を単体で使う場合も問題があります。

mermaidstateDiagram-v2
    [*] --> question: ユーザーが質問
    question --> generate: モデルが回答生成
    generate --> hallucination: 学習データに<br/>情報がない
    generate --> outdated: 学習時期より<br/>新しい情報
    generate --> correct: 学習済み情報で<br/>正確に回答
    hallucination --> incorrect: 誤った情報を<br/>生成
    outdated --> incorrect
    incorrect --> [*]: 信頼性低下
    correct --> [*]: 信頼性確保

上図のように、生成 AI 単体では学習データの範囲外の質問に対して、誤った情報を生成するリスクがあります。

RAG 実装の技術的課題

RAG を自前で実装しようとすると、以下の技術的ハードルが存在します。

#課題具体例影響
1埋め込みモデルの選定どのモデルが最適か判断が難しい検索精度に直結
2ベクトルストアの構築インフラの準備と運用コスト導入障壁が高い
3チャンク分割の最適化文書をどう分割するか検索ヒット率に影響
4プロンプトエンジニアリング適切なプロンプト設計が必要回答品質を左右
5パフォーマンス最適化レスポンスタイムの確保ユーザー体験に影響

これらの課題を解決するために、Haystack のような統合フレームワークが必要とされているのです。

解決策

Haystack による RAG の実装アプローチ

Haystack は、RAG システムの構築に必要なすべてのコンポーネントを提供し、以下の流れで実装を進められます。

mermaidflowchart LR
    step1["1. データ準備"] --> step2["2. ドキュメント<br/>ストア構築"]
    step2 --> step3["3. Retriever<br/>設定"]
    step3 --> step4["4. Generator<br/>設定"]
    step4 --> step5["5. パイプライン<br/>構築"]
    step5 --> step6["6. 評価・改善"]

各ステップについて、具体的なコードと共に解説していきます。

アーキテクチャの全体像

Haystack を用いた RAG システムの全体構成は以下のようになります。

mermaidflowchart TB
    subgraph input["入力層"]
        user_query["ユーザークエリ"]
        documents["元文書<br/>(PDF/HTML/TXT)"]
    end

    subgraph processing["処理層"]
        converter["FileConverter<br/>ファイル読込"]
        preprocessor["PreProcessor<br/>チャンク分割"]
        embedder["Embedder<br/>ベクトル化"]
    end

    subgraph storage["ストレージ層"]
        docstore[("DocumentStore<br/>Elasticsearch/Qdrant")]
    end

    subgraph retrieval["検索層"]
        retriever["Retriever<br/>類似度検索"]
    end

    subgraph generation["生成層"]
        prompt["PromptBuilder<br/>プロンプト構築"]
        llm["Generator<br/>OpenAI/HF"]
    end

    documents --> converter
    converter --> preprocessor
    preprocessor --> embedder
    embedder --> docstore
    user_query --> retriever
    docstore --> retriever
    retriever --> prompt
    user_query --> prompt
    prompt --> llm
    llm --> answer["回答"]

この図から、Haystack が複数のコンポーネントを連携させていることがわかります。

環境構築とインストール

まず、Haystack を利用するための環境を準備しましょう。

必要な環境

  • Python 3.8 以上(推奨は 3.10 以上)
  • pip または Yarn(パッケージ管理)
  • 4GB 以上のメモリ

基本パッケージのインストール

以下のコマンドで Haystack をインストールします。

bash# 仮想環境の作成(推奨)
python -m venv haystack-env
source haystack-env/bin/activate  # Windows: haystack-env\Scripts\activate

# Haystack のインストール
pip install haystack-ai

このコマンドで、Haystack の最新版がインストールされます。

追加コンポーネントのインストール

用途に応じて、以下のパッケージも追加します。

bash# OpenAI を使う場合
pip install haystack-ai[openai]

# Elasticsearch をドキュメントストアとして使う場合
pip install haystack-ai[elasticsearch]

# PDF ファイルを扱う場合
pip install haystack-ai[pdf]

必要な機能に応じて、ブラケット内に追加モジュール名を指定することで、関連パッケージが一括インストールされます。

バージョン確認

インストールが完了したら、バージョンを確認しましょう。

python# version_check.py
import haystack

# Haystack のバージョンを表示
print(f"Haystack version: {haystack.__version__}")

このスクリプトを実行して、v2.x 系が表示されれば準備完了です。

ドキュメントストアの設定

ドキュメントストアは、文書とその埋め込みベクトルを保存するデータベースです。Haystack では複数の選択肢が用意されています。

インメモリストアの利用(開発・テスト用)

まずは、最もシンプルなインメモリストアから始めましょう。

python# document_store_setup.py
from haystack.document_stores.in_memory import InMemoryDocumentStore

# インメモリドキュメントストアの初期化
document_store = InMemoryDocumentStore()

print("インメモリドキュメントストアが初期化されました")

インメモリストアは、データをメモリ上に保持するため、高速ですが永続化されません。開発段階での動作確認に最適です。

Elasticsearch の利用(本番環境推奨)

本番環境では、Elasticsearch のような永続化されたストアを使用します。

python# elasticsearch_store.py
from haystack.document_stores.elasticsearch import ElasticsearchDocumentStore

# Elasticsearch ドキュメントストアの設定
document_store = ElasticsearchDocumentStore(
    host="localhost",  # Elasticsearch サーバーのアドレス
    port=9200,         # デフォルトポート
    index="documents", # インデックス名
    embedding_dim=768  # 埋め込みベクトルの次元数
)

print("Elasticsearch ドキュメントストアが構築されました")

この設定により、文書データが Elasticsearch に永続化され、システム再起動後もデータが保持されます。

Docker で Elasticsearch を起動

Elasticsearch を簡単に起動するには、Docker を使うと便利です。

yaml# docker-compose.yml
version: '3.8'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
    container_name: haystack-elasticsearch
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      - 'ES_JAVA_OPTS=-Xms512m -Xmx512m'
    ports:
      - '9200:9200'
    volumes:
      - es-data:/usr/share/elasticsearch/data

volumes:
  es-data:
    driver: local

この設定ファイルにより、開発環境に適した Elasticsearch が起動します。

bash# Elasticsearch の起動
docker-compose up -d

# 起動確認
curl http://localhost:9200

正常に起動すれば、JSON 形式でクラスター情報が返されます。

文書の読み込みと前処理

次に、PDF やテキストファイルなどの文書を読み込み、検索可能な形式に変換します。

ファイルコンバーターの使用

まず、ファイルを読み込むためのコンバーターを設定します。

python# file_converter.py
from haystack.components.converters import PDFToDocument, TextFileToDocument
from pathlib import Path

# PDF ファイルの読み込み
pdf_converter = PDFToDocument()

# テキストファイルの読み込み
txt_converter = TextFileToDocument()

# ファイルパスを指定
file_paths = [
    Path("./data/manual_001.pdf"),
    Path("./data/faq_002.txt")
]

コンバーターは、ファイル形式に応じて適切なパーサーを自動選択します。

文書の変換実行

実際にファイルを Document オブジェクトに変換しましょう。

python# convert_documents.py
from haystack import Document

# PDF ファイルを変換
documents = []
for file_path in file_paths:
    if file_path.suffix == ".pdf":
        result = pdf_converter.run(sources=[file_path])
        documents.extend(result["documents"])
    elif file_path.suffix == ".txt":
        result = txt_converter.run(sources=[file_path])
        documents.extend(result["documents"])

print(f"{len(documents)} 件の文書が読み込まれました")

この処理により、各ファイルが Haystack の Document 形式に変換されます。

PreProcessor によるチャンク分割

長い文書は、検索精度を高めるために適切なサイズに分割する必要があります。

python# preprocessor_setup.py
from haystack.components.preprocessors import DocumentSplitter

# ドキュメント分割器の設定
splitter = DocumentSplitter(
    split_by="word",        # 単語単位で分割
    split_length=200,       # 1チャンクあたり200単語
    split_overlap=50,       # 隣接チャンク間で50単語重複
    split_threshold=0       # 最小チャンクサイズ
)

print("PreProcessor が設定されました")

split_overlap を設定することで、チャンクの境界で文脈が途切れるのを防ぎます。

チャンク分割の実行

設定した PreProcessor を使って、文書を分割しましょう。

python# split_documents.py

# 文書を分割
result = splitter.run(documents=documents)
split_documents = result["documents"]

print(f"{len(split_documents)} 個のチャンクに分割されました")

# 最初のチャンクを表示
if split_documents:
    print("\n--- サンプルチャンク ---")
    print(split_documents[0].content[:200])

この処理により、長文が検索に適したサイズのチャンクに分割されます。

埋め込みベクトルの生成

セマンティック検索を実現するために、テキストを埋め込みベクトルに変換します。

Embedder の設定

OpenAI の埋め込みモデルを使用する例です。

python# embedder_setup.py
import os
from haystack.components.embedders import OpenAIDocumentEmbedder

# 環境変数から API キーを取得
os.environ["OPENAI_API_KEY"] = "your-api-key-here"

# Document Embedder の初期化
doc_embedder = OpenAIDocumentEmbedder(
    model="text-embedding-3-small",  # 埋め込みモデル
    dimensions=768                    # ベクトル次元数
)

print("Document Embedder が初期化されました")

text-embedding-3-small は、コストパフォーマンスに優れた埋め込みモデルです。

埋め込みベクトルの生成と保存

文書に埋め込みベクトルを追加し、ドキュメントストアに保存します。

python# embed_and_store.py

# 文書に埋め込みを追加
result = doc_embedder.run(documents=split_documents)
embedded_documents = result["documents"]

# ドキュメントストアに保存
document_store.write_documents(embedded_documents)

print(f"{len(embedded_documents)} 件の文書が保存されました")

これで、ベクトル検索の準備が整いました。

Retriever の構築

検索を実行するための Retriever コンポーネントを設定します。

セマンティック検索用 Retriever

埋め込みベクトルを用いた類似度検索を行う Retriever です。

python# retriever_setup.py
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.components.embedders import OpenAITextEmbedder

# クエリ用 Embedder(ユーザーの質問をベクトル化)
query_embedder = OpenAITextEmbedder(
    model="text-embedding-3-small"
)

# Embedding Retriever の設定
retriever = InMemoryEmbeddingRetriever(
    document_store=document_store,
    top_k=3  # 上位3件の文書を取得
)

print("Retriever が構築されました")

top_k パラメータで、検索結果として返す文書の数を制御できます。

ハイブリッド検索の実装

セマンティック検索とキーワード検索を組み合わせることで、より高精度な検索が可能です。

python# hybrid_retriever.py
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever

# BM25 Retriever(キーワードベース検索)
bm25_retriever = InMemoryBM25Retriever(
    document_store=document_store,
    top_k=3
)

print("ハイブリッド検索用の Retriever が準備されました")

BM25 は、従来型の全文検索アルゴリズムで、キーワードマッチに優れています。

Generator の設定

検索結果を基に、自然な回答を生成する Generator を設定します。

OpenAI Generator の初期化

OpenAI の GPT モデルを使用する場合の設定です。

python# generator_setup.py
from haystack.components.generators import OpenAIGenerator

# OpenAI Generator の初期化
generator = OpenAIGenerator(
    model="gpt-4o-mini",     # 使用するモデル
    generation_kwargs={
        "temperature": 0.3,  # 生成のランダム性(低いほど安定)
        "max_tokens": 500    # 最大トークン数
    }
)

print("Generator が初期化されました")

temperature を低く設定することで、事実に忠実な回答が生成されやすくなります。

プロンプトテンプレートの作成

検索結果を効果的に活用するためのプロンプトを設計します。

python# prompt_template.py
from haystack.components.builders import PromptBuilder

# プロンプトテンプレートの定義
template = """
以下のコンテキスト情報を参考にして、質問に答えてください。
コンテキストに情報がない場合は、「情報が見つかりませんでした」と答えてください。

コンテキスト:
{% for doc in documents %}
{{ doc.content }}
---
{% endfor %}

質問: {{ query }}

回答:
"""

# PromptBuilder の作成
prompt_builder = PromptBuilder(template=template)

print("プロンプトテンプレートが作成されました")

Jinja2 テンプレートを使用することで、動的にコンテキストを挿入できます。

パイプラインの構築

これまで設定したコンポーネントを組み合わせて、RAG パイプラインを構築します。

パイプラインの定義

各コンポーネントを接続し、データの流れを定義します。

python# pipeline_setup.py
from haystack import Pipeline

# パイプラインの初期化
rag_pipeline = Pipeline()

# コンポーネントの追加
rag_pipeline.add_component("query_embedder", query_embedder)
rag_pipeline.add_component("retriever", retriever)
rag_pipeline.add_component("prompt_builder", prompt_builder)
rag_pipeline.add_component("generator", generator)

print("パイプラインが初期化されました")

各コンポーネントに名前を付けることで、後で接続先を指定できます。

コンポーネント間の接続

データの流れを定義するために、コンポーネントを接続します。

python# pipeline_connections.py

# コンポーネント間の接続を定義
rag_pipeline.connect("query_embedder.embedding", "retriever.query_embedding")
rag_pipeline.connect("retriever.documents", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder.prompt", "generator.prompt")

print("パイプラインの接続が完了しました")

この接続により、クエリ → 埋め込み → 検索 → プロンプト構築 → 生成という流れが自動化されます。

パイプラインの実行

実際にユーザーの質問に対して回答を生成してみましょう。

python# run_pipeline.py

# 質問を入力
query = "製品の保証期間は何年ですか?"

# パイプラインを実行
result = rag_pipeline.run({
    "query_embedder": {"text": query},
    "prompt_builder": {"query": query}
})

# 回答を取得
answer = result["generator"]["replies"][0]
print(f"質問: {query}")
print(f"回答: {answer}")

このコードにより、質問に対する回答が自動生成されます。

パフォーマンスの最適化

実務環境では、レスポンス速度とコストの最適化が重要です。

キャッシュの活用

同じ質問に対しては、キャッシュを使って高速化できます。

python# cache_setup.py
from haystack.components.caching import CacheChecker
from haystack.document_stores.in_memory import InMemoryDocumentStore

# キャッシュストアの作成
cache_store = InMemoryDocumentStore()

# CacheChecker の設定
cache_checker = CacheChecker(document_store=cache_store)

print("キャッシュ機能が有効化されました")

頻繁に聞かれる質問については、生成コストを削減できます。

バッチ処理の実装

複数の質問を一度に処理することで、効率を向上させます。

python# batch_processing.py

# 複数の質問をリストで定義
queries = [
    "保証期間について教えてください",
    "返品ポリシーはどうなっていますか",
    "配送にかかる日数は?"
]

# バッチ処理
results = []
for query in queries:
    result = rag_pipeline.run({
        "query_embedder": {"text": query},
        "prompt_builder": {"query": query}
    })
    results.append(result["generator"]["replies"][0])

# 結果を表示
for q, a in zip(queries, results):
    print(f"Q: {q}\nA: {a}\n---")

この方法で、複数の質問を効率的に処理できます。

具体例

ユースケース 1: 社内 FAQ システム

企業の社内向け FAQ システムを構築する例を見ていきましょう。

データの準備

まず、FAQ データを CSV 形式で用意します。

csv# faq_data.csv
質問,回答
有給休暇は何日ありますか,年間20日の有給休暇が付与されます。入社半年後から取得可能です。
リモートワークは可能ですか,週3日まで在宅勤務が可能です。事前に上長の承認が必要です。
健康診断はいつ受けられますか,年1回、毎年6月に実施しています。受診日は人事部から通知されます。

このような形式で、よくある質問とその回答をまとめます。

CSV データの読み込み

CSV ファイルを Haystack の Document 形式に変換します。

python# load_faq_csv.py
import csv
from haystack import Document

# CSV ファイルを読み込み
documents = []
with open("faq_data.csv", "r", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    for row in reader:
        # 質問と回答を結合してコンテンツにする
        content = f"質問: {row['質問']}\n回答: {row['回答']}"
        doc = Document(content=content, meta={"category": "FAQ"})
        documents.append(doc)

print(f"{len(documents)} 件の FAQ が読み込まれました")

メタデータに category を追加することで、後でフィルタリングが可能になります。

エンドツーエンドのパイプライン構築

FAQ システム用の完全なパイプラインを構築します。

python# faq_pipeline_full.py
from haystack import Pipeline
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.embedders import OpenAIDocumentEmbedder, OpenAITextEmbedder
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator

# ドキュメントストアの初期化
doc_store = InMemoryDocumentStore()

# 埋め込みを追加
embedder = OpenAIDocumentEmbedder(model="text-embedding-3-small")
embedded_docs = embedder.run(documents=documents)["documents"]
doc_store.write_documents(embedded_docs)

print("FAQ データがストアに保存されました")

これで、FAQ データが検索可能な状態になりました。

パイプラインコンポーネントの組み立て

検索から生成までの一連の流れを構築します。

python# assemble_faq_pipeline.py

# コンポーネントの初期化
query_embedder = OpenAITextEmbedder(model="text-embedding-3-small")
retriever = InMemoryEmbeddingRetriever(document_store=doc_store, top_k=2)

# プロンプトテンプレート
template = """
あなたは社内FAQ の回答アシスタントです。
以下の情報を参考にして、簡潔に答えてください。

{% for doc in documents %}
{{ doc.content }}
---
{% endfor %}

質問: {{ query }}
回答:
"""
prompt_builder = PromptBuilder(template=template)
generator = OpenAIGenerator(model="gpt-4o-mini")

# パイプラインの構築
faq_pipeline = Pipeline()
faq_pipeline.add_component("embedder", query_embedder)
faq_pipeline.add_component("retriever", retriever)
faq_pipeline.add_component("prompt", prompt_builder)
faq_pipeline.add_component("generator", generator)

# 接続
faq_pipeline.connect("embedder.embedding", "retriever.query_embedding")
faq_pipeline.connect("retriever.documents", "prompt.documents")
faq_pipeline.connect("prompt.prompt", "generator.prompt")

print("FAQ パイプラインが構築されました")

これで、質問応答が可能なシステムが完成しました。

実行とテスト

実際に質問を投げて、動作を確認しましょう。

python# test_faq_system.py

# テスト質問
test_query = "在宅勤務のルールについて教えて"

# パイプライン実行
response = faq_pipeline.run({
    "embedder": {"text": test_query},
    "prompt": {"query": test_query}
})

# 結果表示
print(f"質問: {test_query}")
print(f"回答: {response['generator']['replies'][0]}")

適切な FAQ が検索され、自然な回答が生成されることを確認できます。

ユースケース 2: 技術文書の検索システム

次に、製品マニュアルや技術ドキュメントを検索するシステムを構築します。

PDF マニュアルの読み込み

複数の PDF ファイルを一括で処理します。

python# load_pdf_manuals.py
from haystack.components.converters import PDFToDocument
from pathlib import Path
import os

# PDF ファイルが格納されているディレクトリ
pdf_dir = Path("./manuals")

# PDF コンバーターの初期化
converter = PDFToDocument()

# 全 PDF ファイルを読み込み
pdf_files = list(pdf_dir.glob("*.pdf"))
all_documents = []

for pdf_file in pdf_files:
    result = converter.run(sources=[pdf_file])
    # ファイル名をメタデータに追加
    for doc in result["documents"]:
        doc.meta["source_file"] = pdf_file.name
    all_documents.extend(result["documents"])

print(f"{len(all_documents)} ページが読み込まれました")

ファイル名をメタデータに保存することで、回答の出典を明示できます。

チャンク分割と埋め込み

長いマニュアルを適切なサイズに分割します。

python# chunk_and_embed_manuals.py
from haystack.components.preprocessors import DocumentSplitter
from haystack.components.embedders import OpenAIDocumentEmbedder

# チャンク分割(技術文書用に調整)
splitter = DocumentSplitter(
    split_by="sentence",    # 文単位で分割
    split_length=10,        # 10文ごと
    split_overlap=2,        # 2文重複
)

split_docs = splitter.run(documents=all_documents)["documents"]

# 埋め込み生成
embedder = OpenAIDocumentEmbedder(model="text-embedding-3-small")
embedded_docs = embedder.run(documents=split_docs)["documents"]

print(f"{len(embedded_docs)} チャンクが埋め込まれました")

技術文書では、文脈を保つために文単位での分割が効果的です。

メタデータフィルタリング

特定の製品やカテゴリに絞り込んで検索できるようにします。

python# filtered_retrieval.py
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever

# フィルタ付き Retriever
filtered_retriever = InMemoryEmbeddingRetriever(
    document_store=doc_store,
    top_k=5,
    filters={
        "field": "source_file",
        "operator": "==",
        "value": "product_A_manual.pdf"
    }
)

print("フィルタリング機能が追加されました")

これにより、特定のマニュアルのみを対象とした検索が可能になります。

ユースケース 3: カスタマーサポートチャットボット

顧客からの問い合わせに自動応答するシステムを構築します。

会話履歴の管理

複数回のやり取りを考慮したシステムにするため、会話履歴を管理します。

python# conversation_manager.py
from typing import List, Dict

class ConversationManager:
    """会話履歴を管理するクラス"""

    def __init__(self):
        self.history: List[Dict[str, str]] = []

    def add_turn(self, user_message: str, bot_response: str):
        """会話のターンを追加"""
        self.history.append({
            "user": user_message,
            "bot": bot_response
        })

    def get_context(self, last_n: int = 3) -> str:
        """直近 n ターンの会話履歴を文字列として取得"""
        recent = self.history[-last_n:] if len(self.history) > last_n else self.history
        context = ""
        for turn in recent:
            context += f"User: {turn['user']}\nBot: {turn['bot']}\n"
        return context

# マネージャーの初期化
conv_manager = ConversationManager()
print("会話履歴マネージャーが初期化されました")

会話履歴を保持することで、文脈を考慮した応答が可能になります。

会話履歴を含むプロンプト

会話の文脈を考慮したプロンプトテンプレートを作成します。

python# conversational_prompt.py
from haystack.components.builders import PromptBuilder

# 会話型プロンプトテンプレート
conv_template = """
あなたはカスタマーサポートのアシスタントです。
以下の製品情報と会話履歴を参考にして、顧客の質問に答えてください。

製品情報:
{% for doc in documents %}
{{ doc.content }}
---
{% endfor %}

会話履歴:
{{ conversation_history }}

顧客: {{ query }}
アシスタント:
"""

conv_prompt_builder = PromptBuilder(template=conv_template)
print("会話型プロンプトが作成されました")

会話履歴を含めることで、代名詞や省略された情報を正しく解釈できます。

会話型パイプラインの実行

実際に複数ターンの会話をシミュレートします。

python# run_conversation.py

def chat(query: str, pipeline, conv_manager):
    """会話を実行する関数"""
    # 会話履歴を取得
    history = conv_manager.get_context()

    # パイプライン実行
    result = pipeline.run({
        "embedder": {"text": query},
        "prompt": {
            "query": query,
            "conversation_history": history
        }
    })

    # 回答を取得
    response = result["generator"]["replies"][0]

    # 会話履歴に追加
    conv_manager.add_turn(query, response)

    return response

# 会話の実行
print("Bot: こんにちは!何かお困りですか?")
user_input = "配送状況を確認したいです"
bot_reply = chat(user_input, faq_pipeline, conv_manager)
print(f"User: {user_input}")
print(f"Bot: {bot_reply}\n")

user_input = "それはいつ届きますか?"  # 代名詞を使った質問
bot_reply = chat(user_input, faq_pipeline, conv_manager)
print(f"User: {user_input}")
print(f"Bot: {bot_reply}")

このように、会話の流れを踏まえた自然な応答が実現されます。

エラーハンドリングとロバスト性の向上

実務では、様々なエラーケースに対応する必要があります。

検索結果が見つからない場合の処理

関連文書が見つからない場合の対応を実装します。

python# error_handling.py
from haystack import Pipeline

def safe_query(query: str, pipeline: Pipeline, min_score: float = 0.5):
    """スコアが低い場合はフォールバック応答を返す"""
    try:
        result = pipeline.run({
            "embedder": {"text": query},
            "prompt": {"query": query}
        })

        # 検索スコアを確認
        docs = result.get("retriever", {}).get("documents", [])
        if not docs or (docs[0].score < min_score):
            return "申し訳ございません。お問い合わせの内容に関する情報が見つかりませんでした。担当者におつなぎいたします。"

        return result["generator"]["replies"][0]

    except Exception as e:
        print(f"エラーが発生しました: {e}")
        return "システムエラーが発生しました。しばらくしてから再度お試しください。"

print("エラーハンドリング機能が実装されました")

適切なフォールバック処理により、ユーザー体験が向上します。

レート制限への対応

API のレート制限に対応するため、リトライ機能を実装します。

python# rate_limit_handling.py
import time
from typing import Any, Dict

def query_with_retry(query: str, pipeline: Pipeline, max_retries: int = 3):
    """リトライ機能付きクエリ実行"""
    for attempt in range(max_retries):
        try:
            result = pipeline.run({
                "embedder": {"text": query},
                "prompt": {"query": query}
            })
            return result["generator"]["replies"][0]

        except Exception as e:
            if "rate limit" in str(e).lower() and attempt < max_retries - 1:
                wait_time = 2 ** attempt  # 指数バックオフ
                print(f"レート制限に達しました。{wait_time}秒待機します...")
                time.sleep(wait_time)
            else:
                raise e

    return "リクエストを完了できませんでした。時間をおいて再試行してください。"

print("リトライ機能が実装されました")

指数バックオフにより、レート制限を回避しながら処理を続行できます。

評価と改善

RAG システムの品質を継続的に向上させるためには、定量的な評価が不可欠です。

評価指標の設定

Haystack では、以下の指標でシステムを評価できます。

#指標名説明計算方法
1Retrieval Precision取得した文書のうち関連文書の割合関連文書数 / 取得文書数
2Retrieval Recall全関連文書のうち取得できた割合取得した関連文書数 / 全関連文書数
3MRR (Mean Reciprocal Rank)最初の正解文書の順位の逆数の平均1 / 正解文書の順位
4Answer Accuracy生成された回答の正確性人手評価または自動評価

評価データセットの準備

評価用のテストセットを用意します。

python# evaluation_dataset.py
from haystack import Document

# 評価用データセット
eval_dataset = [
    {
        "query": "製品の保証期間は?",
        "relevant_doc_ids": ["doc_001", "doc_002"],
        "expected_answer": "1年間"
    },
    {
        "query": "返品は可能ですか?",
        "relevant_doc_ids": ["doc_010"],
        "expected_answer": "購入後30日以内であれば可能です"
    },
    # ... 他のテストケース
]

print(f"{len(eval_dataset)} 件の評価データが準備されました")

テストケースには、質問、正解文書 ID、期待される回答を含めます。

Retriever の評価

検索精度を測定します。

python# evaluate_retriever.py
from haystack.evaluation import RecallEvaluator, PrecisionEvaluator

# Recall Evaluator の初期化
recall_evaluator = RecallEvaluator()

# 評価の実行
recall_scores = []
for item in eval_dataset:
    # クエリを実行
    result = rag_pipeline.run({
        "embedder": {"text": item["query"]},
        "prompt": {"query": item["query"]}
    })

    # 取得した文書 ID を取得
    retrieved_docs = result["retriever"]["documents"]
    retrieved_ids = [doc.id for doc in retrieved_docs]

    # Recall を計算
    relevant_ids = set(item["relevant_doc_ids"])
    retrieved_ids_set = set(retrieved_ids)
    recall = len(relevant_ids & retrieved_ids_set) / len(relevant_ids)
    recall_scores.append(recall)

# 平均 Recall
avg_recall = sum(recall_scores) / len(recall_scores)
print(f"平均 Recall: {avg_recall:.2%}")

Recall が低い場合は、チャンク分割や埋め込みモデルの見直しが必要です。

回答品質の評価

生成された回答の品質を評価します。

python# evaluate_answers.py
from haystack.components.evaluators import LLMEvaluator

# LLM ベースの評価器
answer_evaluator = LLMEvaluator(
    instructions="""
    生成された回答を以下の基準で0-5点で評価してください:
    - 正確性: 事実と一致しているか
    - 完全性: 質問に十分答えているか
    - 簡潔性: 冗長でないか
    """,
    inputs=[("generated_answer", str), ("expected_answer", str)],
    outputs=["score", "explanation"]
)

print("回答評価器が初期化されました")

LLM を使った自動評価により、大量の回答を効率的に評価できます。

改善サイクルの実装

評価結果に基づいて、システムを改善します。

mermaidflowchart LR
    eval["評価実施"] --> analyze["結果分析"]
    analyze --> identify["課題特定"]
    identify --> improve["改善実施"]
    improve --> eval

    identify --> lowRecall["Recall 低下"]
    identify --> lowPrecision["Precision 低下"]
    identify --> poorAnswer["回答品質低下"]

    lowRecall --> adjustChunk["チャンク分割<br/>見直し"]
    lowPrecision --> adjustTopK["top_k パラメータ<br/>調整"]
    poorAnswer --> improvePrompt["プロンプト<br/>改善"]

この図のように、評価 → 分析 → 改善のサイクルを回すことで、継続的な品質向上が実現できます。

まとめ

本記事では、Haystack を用いた RAG システムの構築について、基礎から実務レベルまでを解説いたしました。

本記事で学んだこと

  • Haystack の基礎: RAG フレームワークとしての特徴と利点
  • 環境構築: インストールからドキュメントストアの設定まで
  • コンポーネント理解: Retriever、Generator、Pipeline の役割
  • 実装方法: FAQ システム、技術文書検索、チャットボットの構築
  • 評価と改善: 定量的な評価指標と改善サイクル

Haystack を選ぶべき理由

Haystack は、以下の点で優れた選択肢です。

  • エンタープライズ対応: 大規模データと高負荷に耐える設計
  • 柔軟性: 多様なコンポーネントを自由に組み合わせられる
  • 拡張性: カスタムコンポーネントの追加が容易
  • コミュニティ: 活発な開発とサポート体制

実務投入への道筋

RAG システムを実務に投入するためには、以下のステップを踏むことをお勧めします。

  1. 小規模な PoC から開始: まずは限定的なデータで動作を確認
  2. 評価基盤の構築: 品質を測定できる仕組みを早期に整備
  3. 段階的なスケールアップ: データ量とユーザー数を徐々に増やす
  4. 継続的な改善: 評価結果に基づいて定期的にチューニング
  5. 運用体制の整備: モニタリングとアラートの仕組みを構築

次のステップ

Haystack をさらに活用するために、以下のトピックにも取り組んでみてください。

  • マルチモーダル RAG: 画像や表を含む文書の処理
  • ファインチューニング: ドメイン特化の埋め込みモデル開発
  • ハイブリッド検索の最適化: セマンティック検索とキーワード検索の重み付け
  • スケーラビリティ: Kubernetes を使った分散デプロイ
  • セキュリティ: アクセス制御と個人情報保護

RAG 技術は急速に進化しており、Haystack もそれに合わせて継続的に改良されています。2025 年以降も、生成 AI を実務で活用する上で、RAG は中核技術であり続けるでしょう。

本記事が、皆様の RAG システム構築の一助となれば幸いです。

関連リンク

公式リソース

学習リソース

コミュニティ

技術スタック関連