T-CREATOR

Python 標準ライブラリが強い理由:pathlib・itertools・functools 活用レシピ 30 選

Python 標準ライブラリが強い理由:pathlib・itertools・functools 活用レシピ 30 選

Python の魅力の一つは、豊富で強力な標準ライブラリです。特にpathlibitertoolsfunctoolsは、コードを簡潔にし、パフォーマンスを向上させる優れたツールとして開発者に愛用されています。

これらのライブラリを使うことで、複雑な処理を数行で実現でき、可読性が高く保守しやすいコードが書けるようになります。本記事では、実際に業務で使える 30 個の実践的なレシピをご紹介しますね。

背景

現代の Python 開発において、標準ライブラリの活用は必須スキルとなっています。

mermaidflowchart TB
    traditional[従来の開発] --> modern[モダンなPython開発]

    traditional --> |問題点| prob1[冗長なコード]
    traditional --> |問題点| prob2[低いパフォーマンス]
    traditional --> |問題点| prob3[可読性の低さ]

    modern --> |解決| sol1[標準ライブラリの活用]
    sol1 --> benefit1[簡潔なコード]
    sol1 --> benefit2[高いパフォーマンス]
    sol1 --> benefit3[優れた可読性]

図で理解できる要点

  • 従来の手法では冗長で非効率なコードになりがち
  • 標準ライブラリを活用することで、コード品質が飛躍的に向上
  • pathlib、itertools、functools が現代 Python 開発の核となる

Python で Web アプリケーションを開発していると、ファイル操作やデータ処理、関数の最適化といった場面に頻繁に出会います。これらの処理を従来の方法で実装すると、長くて理解しにくいコードになってしまうことがありますね。

しかし、標準ライブラリの力を借りることで、驚くほどエレガントで効率的なコードが書けるのです。多くの開発者がこれらのライブラリの存在を知っていても、その真の力を引き出せていないのが現状でしょう。

課題

Python 開発者が直面する主な課題を整理してみましょう。

mermaidstateDiagram-v2
    [*] --> 開発開始
    開発開始 --> ファイル処理課題
    開発開始 --> データ処理課題
    開発開始 --> 関数設計課題

    ファイル処理課題 --> 複雑なパス操作
    ファイル処理課題 --> 大量ファイル処理
    ファイル処理課題 --> 例外処理の煩雑さ

    データ処理課題 --> 非効率な反復処理
    データ処理課題 --> メモリ不足
    データ処理課題 --> 組み合わせ処理の複雑さ

    関数設計課題 --> 重複処理の多発
    関数設計課題 --> キャッシュの未実装
    関数設計課題 --> コードの可読性低下

ファイル操作での課題

従来のos.pathを使った処理では、パス結合や拡張子の取得などが煩雑になります。Windows と macOS でのパス区切り文字の違いも頭痛の種ですね。

データ処理での課題

大量のデータを扱う際、メモリ効率を意識せずに処理すると、システムがクラッシュしてしまうことがあります。また、組み合わせや順列の生成も、自前で実装すると複雑になりがちです。

関数設計での課題

同じ処理を何度も実行する関数で、計算結果をキャッシュしていないと、パフォーマンスが大幅に低下します。デコレータの活用方法も、意外と知らない開発者が多いのではないでしょうか。

解決策

これらの課題を解決するのが、pathlibitertoolsfunctoolsの 3 つの標準ライブラリです。

mermaidflowchart LR
    課題 --> pathlib[pathlib ライブラリ]
    課題 --> itertools[itertools ライブラリ]
    課題 --> functools[functools ライブラリ]

    pathlib --> pathlib_benefit[ファイル操作の簡潔化]
    itertools --> itertools_benefit[データ処理の効率化]
    functools --> functools_benefit[関数設計の最適化]

    pathlib_benefit --> result[生産性向上]
    itertools_benefit --> result
    functools_benefit --> result

pathlib の威力

オブジェクト指向的なファイルパス操作により、直感的で読みやすいコードが書けます。プラットフォーム間の差異も自動的に吸収してくれるため、クロスプラットフォーム対応が簡単になりますね。

itertools の効率性

メモリ効率の良いイテレータを提供し、大量データの処理でもメモリ使用量を抑えることができます。複雑な組み合わせ処理も、関数一つで実現可能です。

functools の最適化

関数の合成やキャッシュ機能により、コードの再利用性とパフォーマンスが劇的に向上します。デコレータとしても活用でき、関数型プログラミングの恩恵を受けられるでしょう。

pathlib 活用レシピ集(10 選)

ファイル操作系レシピ

レシピ 1:スマートなファイル作成

従来の方法では複数行必要だったファイル作成を、pathlib なら一行で実現できます。

pythonfrom pathlib import Path

# ディレクトリも同時に作成
file_path = Path("data/output/result.txt")
file_path.parent.mkdir(parents=True, exist_ok=True)
file_path.write_text("処理結果データ", encoding="utf-8")

parents=Trueでディレクトリの階層も自動作成でき、exist_ok=Trueで既存ディレクトリのエラーも回避できますね。

レシピ 2:安全なファイル読み込み

ファイルの存在確認と読み込みを効率的に行います。

pythonfrom pathlib import Path

def safe_read_file(file_path_str):
    file_path = Path(file_path_str)

    if not file_path.exists():
        return None

    if not file_path.is_file():
        return None

    return file_path.read_text(encoding="utf-8")

# 使用例
content = safe_read_file("config/settings.json")
if content:
    print("ファイル読み込み成功")

エラーハンドリングも含めて、堅牢なファイル操作が実現できます。

レシピ 3:ファイルのバックアップ作成

元ファイルを保持したまま、バックアップファイルを作成する処理です。

pythonfrom pathlib import Path
import shutil
from datetime import datetime

def create_backup(file_path_str):
    original_path = Path(file_path_str)

    if not original_path.exists():
        return False

    # タイムスタンプ付きバックアップファイル名
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_path = original_path.with_name(
        f"{original_path.stem}_{timestamp}{original_path.suffix}"
    )

    shutil.copy2(original_path, backup_path)
    return backup_path

# 使用例
backup_file = create_backup("important_data.csv")
print(f"バックアップ作成: {backup_file}")

with_name()メソッドを使うことで、ファイル名の変更が簡単にできますね。

パス操作系レシピ

レシピ 4:プラットフォーム間対応パス結合

Windows、macOS、Linux で動作するパス結合処理です。

pythonfrom pathlib import Path

# 従来の問題のあるコード
# path = "data" + os.sep + "images" + os.sep + "photo.jpg"

# pathlibを使った正しい実装
base_dir = Path("data")
image_dir = base_dir / "images"
photo_path = image_dir / "photo.jpg"

print(f"写真パス: {photo_path}")
# Windows: data\images\photo.jpg
# macOS/Linux: data/images/photo.jpg

​/​ 演算子でパス結合ができ、プラットフォームの違いを意識する必要がありません。

レシピ 5:相対パスから絶対パスへの変換

現在のディレクトリを基準とした絶対パス変換処理です。

pythonfrom pathlib import Path

def get_absolute_path(relative_path_str):
    relative_path = Path(relative_path_str)

    # 現在のワーキングディレクトリ基準
    absolute_path = relative_path.resolve()

    return absolute_path

# 使用例
relative = "../config/database.yaml"
absolute = get_absolute_path(relative)
print(f"絶対パス: {absolute}")

resolve()メソッドにより、シンボリックリンクの解決も同時に行われます。

レシピ 6:ファイル拡張子の一括変換

複数ファイルの拡張子を効率的に変換する処理です。

pythonfrom pathlib import Path

def change_extensions(directory_path, old_ext, new_ext):
    dir_path = Path(directory_path)
    changed_files = []

    for file_path in dir_path.glob(f"*.{old_ext}"):
        new_file_path = file_path.with_suffix(f".{new_ext}")
        file_path.rename(new_file_path)
        changed_files.append(new_file_path)

    return changed_files

# 使用例:.txtファイルを.mdに変換
changed = change_extensions("./documents", "txt", "md")
print(f"変換完了: {len(changed)}件")

with_suffix()で拡張子を変更し、rename()でファイルを移動できますね。

検索・フィルタリング系レシピ

レシピ 7:条件付きファイル検索

特定の条件に合致するファイルを効率的に検索します。

pythonfrom pathlib import Path
from datetime import datetime, timedelta

def find_recent_files(directory_path, days=7, extensions=None):
    dir_path = Path(directory_path)
    cutoff_date = datetime.now() - timedelta(days=days)
    recent_files = []

    # 検索パターンの設定
    patterns = extensions if extensions else ["*"]

    for pattern in patterns:
        for file_path in dir_path.rglob(pattern):
            if file_path.is_file():
                # ファイル更新時刻の確認
                modified_time = datetime.fromtimestamp(file_path.stat().st_mtime)
                if modified_time > cutoff_date:
                    recent_files.append(file_path)

    return recent_files

# 使用例:過去3日間の.pyファイル検索
recent_python_files = find_recent_files("./src", days=3, extensions=["*.py"])
print(f"最近更新されたPythonファイル: {len(recent_python_files)}件")

rglob()でサブディレクトリも含めた再帰検索ができ、stat()でファイル情報を取得できます。

レシピ 8:ファイルサイズ別フィルタリング

ファイルサイズを条件にしたフィルタリング処理です。

pythonfrom pathlib import Path

def filter_files_by_size(directory_path, min_size_mb=None, max_size_mb=None):
    dir_path = Path(directory_path)
    filtered_files = []

    for file_path in dir_path.rglob("*"):
        if file_path.is_file():
            size_mb = file_path.stat().st_size / (1024 * 1024)  # MB変換

            # サイズ条件のチェック
            if min_size_mb and size_mb < min_size_mb:
                continue
            if max_size_mb and size_mb > max_size_mb:
                continue

            filtered_files.append({
                'path': file_path,
                'size_mb': round(size_mb, 2)
            })

    return filtered_files

# 使用例:1MB以上10MB以下のファイル検索
large_files = filter_files_by_size("./data", min_size_mb=1, max_size_mb=10)
for file_info in large_files:
    print(f"{file_info['path']}: {file_info['size_mb']}MB")

ファイルサイズの計算とフィルタリングを効率的に実行できますね。

レシピ 9:重複ファイルの検出

同名ファイルや内容が同じファイルを検出する処理です。

pythonfrom pathlib import Path
import hashlib
from collections import defaultdict

def find_duplicate_files(directory_path):
    dir_path = Path(directory_path)
    file_hashes = defaultdict(list)

    for file_path in dir_path.rglob("*"):
        if file_path.is_file():
            # ファイルのハッシュ値を計算
            file_hash = calculate_file_hash(file_path)
            file_hashes[file_hash].append(file_path)

    # 重複ファイル(ハッシュが同じ)を抽出
    duplicates = {hash_val: paths for hash_val, paths in file_hashes.items() if len(paths) > 1}

    return duplicates

def calculate_file_hash(file_path):
    hash_md5 = hashlib.md5()
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()

# 使用例
duplicates = find_duplicate_files("./downloads")
for hash_val, file_paths in duplicates.items():
    print(f"重複ファイル群 (hash: {hash_val[:8]}...):")
    for path in file_paths:
        print(f"  {path}")

MD5 ハッシュを使って、確実な重複ファイル検出が可能です。

レシピ 10:ディレクトリ構造の可視化

ディレクトリの構造をツリー形式で表示する処理です。

pythonfrom pathlib import Path

def display_directory_tree(directory_path, max_depth=3, current_depth=0, prefix=""):
    if current_depth > max_depth:
        return

    dir_path = Path(directory_path)

    if current_depth == 0:
        print(f"{dir_path.name}/")

    try:
        items = sorted(dir_path.iterdir(), key=lambda x: (not x.is_dir(), x.name))

        for i, item in enumerate(items):
            is_last = i == len(items) - 1
            current_prefix = "└── " if is_last else "├── "
            next_prefix = "    " if is_last else "│   "

            print(f"{prefix}{current_prefix}{item.name}{'/' if item.is_dir() else ''}")

            if item.is_dir() and current_depth < max_depth:
                display_directory_tree(
                    item,
                    max_depth,
                    current_depth + 1,
                    prefix + next_prefix
                )

    except PermissionError:
        print(f"{prefix}[アクセス権限エラー]")

# 使用例
display_directory_tree("./project", max_depth=2)

再帰処理により、美しいツリー構造表示が実現できますね。

itertools 活用レシピ集(10 選)

組み合わせ・順列系レシピ

レシピ 11:効率的な組み合わせ生成

大量データから組み合わせを生成する際のメモリ効率的な実装です。

pythonimport itertools

def generate_combinations(items, r):
    # メモリ効率的な組み合わせ生成
    for combo in itertools.combinations(items, r):
        yield combo

def find_optimal_team(members, team_size, score_func):
    best_team = None
    best_score = float('-inf')

    for team in generate_combinations(members, team_size):
        score = score_func(team)
        if score > best_score:
            best_score = score
            best_team = team

    return best_team, best_score

# 使用例:チーム編成の最適化
members = [
    {"name": "田中", "skill": 8, "experience": 5},
    {"name": "佐藤", "skill": 7, "experience": 8},
    {"name": "山田", "skill": 9, "experience": 3},
    {"name": "鈴木", "skill": 6, "experience": 7}
]

def team_score(team):
    return sum(member["skill"] + member["experience"] for member in team)

optimal_team, score = find_optimal_team(members, 3, team_score)
print(f"最適チーム: {[m['name'] for m in optimal_team]}, スコア: {score}")

combinations()により、全ての組み合わせを効率的に生成できますね。

レシピ 12:重複を許可した組み合わせ

同じ要素を複数回選択できる組み合わせ生成処理です。

pythonimport itertools

def generate_menu_combinations(dishes, courses):
    # 重複を許可した組み合わせ
    for menu in itertools.combinations_with_replacement(dishes, courses):
        yield menu

def calculate_menu_cost(menu, prices):
    return sum(prices.get(dish, 0) for dish in menu)

# 使用例:レストランメニューの組み合わせ
dishes = ["サラダ", "パスタ", "ピザ", "デザート"]
prices = {"サラダ": 800, "パスタ": 1200, "ピザ": 1500, "デザート": 600}

print("3品コースの組み合わせ:")
for i, menu in enumerate(generate_menu_combinations(dishes, 3), 1):
    cost = calculate_menu_cost(menu, prices)
    print(f"{i:2d}. {' + '.join(menu)} = {cost:,}円")
    if i >= 10:  # 表示制限
        break

combinations_with_replacement()で重複を許可した組み合わせが生成できます。

レシピ 13:順列生成とパターンマッチング

文字列の順列を生成してパターンマッチングを行う処理です。

pythonimport itertools

def find_anagrams(word, word_list):
    # 指定した単語のアナグラムを検索
    target_chars = sorted(word.lower())
    anagrams = []

    for candidate in word_list:
        if sorted(candidate.lower()) == target_chars:
            anagrams.append(candidate)

    return anagrams

def generate_all_permutations(word):
    # 全順列を生成(重複文字を考慮)
    unique_perms = set()
    for perm in itertools.permutations(word.lower()):
        unique_perms.add(''.join(perm))

    return sorted(unique_perms)

# 使用例
word_list = ["listen", "silent", "enlist", "cat", "act", "tac"]
target_word = "listen"

anagrams = find_anagrams(target_word, word_list)
print(f"'{target_word}'のアナグラム: {anagrams}")

# 短い単語の全順列生成
short_word = "abc"
all_perms = generate_all_permutations(short_word)
print(f"'{short_word}'の全順列: {all_perms}")

permutations()により、文字列の全順列を簡単に生成できますね。

無限イテレータ系レシピ

レシピ 14:カウンタとサイクル処理

無限にカウントやサイクルを実行する処理です。

pythonimport itertools

def create_id_generator(prefix="ID", start=1, step=1):
    # 無限にIDを生成
    counter = itertools.count(start, step)
    while True:
        yield f"{prefix}{next(counter):04d}"

def process_items_with_cycling(items, processors):
    # プロセッサーを循環させながら処理
    processor_cycle = itertools.cycle(processors)

    for item in items:
        processor = next(processor_cycle)
        result = processor(item)
        yield f"処理者: {processor.__name__}, 結果: {result}"

# 使用例:ID生成
id_gen = create_id_generator("USER", start=100, step=5)
for _ in range(5):
    print(next(id_gen))

# 処理の循環実行
def process_a(x): return f"A処理({x})"
def process_b(x): return f"B処理({x})"
def process_c(x): return f"C処理({x})"

items = ["データ1", "データ2", "データ3", "データ4", "データ5"]
processors = [process_a, process_b, process_c]

for result in process_items_with_cycling(items, processors):
    print(result)

count()cycle()により、無限イテレータを効率的に活用できます。

レシピ 15:無限反復とリミット制御

無限反復処理にリミットを設ける制御機能です。

pythonimport itertools
import time

def retry_with_backoff(func, max_retries=5, base_delay=1):
    # 指数バックオフでリトライ実行
    delays = itertools.accumulate(itertools.repeat(base_delay), lambda x, y: min(x * 2, 60))

    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            if attempt == max_retries - 1:
                raise e

            delay = next(delays, 60)
            print(f"試行{attempt + 1}失敗。{delay}秒後にリトライ... エラー: {e}")
            time.sleep(delay)

def simulate_api_call():
    # API呼び出しシミュレーション(たまに失敗)
    import random
    if random.random() < 0.7:  # 70%の確率で失敗
        raise Exception("ネットワークエラー")
    return "API呼び出し成功"

# 使用例
try:
    result = retry_with_backoff(simulate_api_call, max_retries=3)
    print(f"最終結果: {result}")
except Exception as e:
    print(f"全てのリトライが失敗: {e}")

accumulate()repeat()を組み合わせて、柔軟なリトライ機構が実現できますね。

グルーピング・フィルタリング系レシピ

レシピ 16:データのグルーピング処理

データを条件に応じてグループ化する効率的な実装です。

pythonimport itertools
from operator import itemgetter

def group_data_by_key(data, key_func):
    # データをキー関数でグループ化
    sorted_data = sorted(data, key=key_func)
    grouped_data = {}

    for key, group in itertools.groupby(sorted_data, key_func):
        grouped_data[key] = list(group)

    return grouped_data

def analyze_sales_data(sales_data):
    # 売上データの分析
    results = {}

    # 部門別グルーピング
    by_department = group_data_by_key(sales_data, itemgetter('department'))
    results['by_department'] = by_department

    # 月別グルーピング
    by_month = group_data_by_key(sales_data, lambda x: x['date'][:7])
    results['by_month'] = by_month

    return results

# 使用例
sales_data = [
    {'date': '2024-01-15', 'department': '営業', 'amount': 50000, 'rep': '田中'},
    {'date': '2024-01-20', 'department': '営業', 'amount': 75000, 'rep': '佐藤'},
    {'date': '2024-02-10', 'department': 'マーケ', 'amount': 30000, 'rep': '山田'},
    {'date': '2024-02-15', 'department': '営業', 'amount': 60000, 'rep': '田中'},
    {'date': '2024-02-20', 'department': 'マーケ', 'amount': 45000, 'rep': '鈴木'}
]

analysis = analyze_sales_data(sales_data)

print("部門別売上:")
for dept, records in analysis['by_department'].items():
    total = sum(record['amount'] for record in records)
    print(f"  {dept}: {total:,}円 ({len(records)}件)")

groupby()により、データの効率的なグループ化が実現できます。

レシピ 17:条件フィルタリングの連鎖

複数の条件を連鎖させたフィルタリング処理です。

pythonimport itertools

def chain_filters(*filter_functions):
    # フィルタ関数を連鎖実行
    def combined_filter(data):
        result = data
        for filter_func in filter_functions:
            result = filter(filter_func, result)
        return result
    return combined_filter

def create_product_filters():
    # 商品フィルタ関数群
    def price_filter(min_price, max_price):
        return lambda product: min_price <= product['price'] <= max_price

    def category_filter(categories):
        return lambda product: product['category'] in categories

    def stock_filter(min_stock):
        return lambda product: product['stock'] >= min_stock

    return price_filter, category_filter, stock_filter

# 使用例
products = [
    {'name': 'ノートPC', 'price': 80000, 'category': '電子機器', 'stock': 5},
    {'name': 'マウス', 'price': 2000, 'category': '電子機器', 'stock': 20},
    {'name': '本', 'price': 1500, 'category': '書籍', 'stock': 0},
    {'name': 'キーボード', 'price': 8000, 'category': '電子機器', 'stock': 15},
    {'name': '雑誌', 'price': 800, 'category': '書籍', 'stock': 30}
]

price_filter, category_filter, stock_filter = create_product_filters()

# フィルタ条件を組み合わせ
combined_filter = chain_filters(
    price_filter(1000, 50000),        # 価格帯フィルタ
    category_filter(['電子機器']),     # カテゴリフィルタ
    stock_filter(10)                  # 在庫フィルタ
)

filtered_products = list(combined_filter(products))
print("フィルタリング結果:")
for product in filtered_products:
    print(f"  {product['name']}: {product['price']:,}円 (在庫: {product['stock']})")

複数のフィルタを効率的に連鎖実行できる仕組みが作れますね。

レシピ 18:チャンク処理とバッチ実行

大量データを一定サイズに分割して処理する仕組みです。

pythonimport itertools

def chunked(iterable, chunk_size):
    # データを指定サイズのチャンクに分割
    iterator = iter(iterable)
    while True:
        chunk = list(itertools.islice(iterator, chunk_size))
        if not chunk:
            break
        yield chunk

def process_in_batches(data, batch_size, process_func):
    # バッチ処理の実行
    results = []

    for i, batch in enumerate(chunked(data, batch_size), 1):
        print(f"バッチ {i} を処理中... ({len(batch)}件)")
        batch_result = process_func(batch)
        results.extend(batch_result)

    return results

def simulate_database_insert(batch):
    # データベース挿入のシミュレーション
    import time
    time.sleep(0.1)  # 処理時間のシミュレーション
    return [f"{item}_processed" for item in batch]

# 使用例
large_dataset = [f"データ{i:03d}" for i in range(1, 101)]

# 10件ずつバッチ処理
processed_data = process_in_batches(
    large_dataset,
    batch_size=10,
    process_func=simulate_database_insert
)

print(f"処理完了: {len(processed_data)}件")
print(f"サンプル結果: {processed_data[:5]}")

islice()を使ったチャンク分割により、メモリ効率的な大量データ処理が可能です。

レシピ 19:複数イテレータの同期処理

複数のデータソースを同期して処理する仕組みです。

pythonimport itertools

def sync_process_multiple_sources(*sources, fill_value=None):
    # 複数のデータソースを同期処理
    for items in itertools.zip_longest(*sources, fillvalue=fill_value):
        yield items

def merge_datasets(dataset1, dataset2, dataset3):
    # データセットのマージ処理
    merged_data = []

    for item1, item2, item3 in sync_process_multiple_sources(dataset1, dataset2, dataset3, fill_value="N/A"):
        merged_record = {
            'source1': item1,
            'source2': item2,
            'source3': item3,
            'timestamp': f"record_{len(merged_data) + 1}"
        }
        merged_data.append(merged_record)

    return merged_data

# 使用例
sales_data = ["売上100万", "売上80万", "売上120万"]
customer_data = ["顧客A", "顧客B", "顧客C", "顧客D"]
product_data = ["商品X", "商品Y"]

merged = merge_datasets(sales_data, customer_data, product_data)

print("マージ結果:")
for record in merged:
    print(f"  {record}")

zip_longest()により、長さの異なるデータソースも安全に同期処理できますね。

レシピ 20:イテレータチェーンの構築

複数のイテレータを連結してシーケンシャル処理を行う仕組みです。

pythonimport itertools

def create_processing_chain(*iterators):
    # イテレータを連結してチェーン構築
    return itertools.chain(*iterators)

def multi_source_data_processing():
    # 複数データソースからの統合処理

    # データソース1: ファイルデータ
    def file_data_generator():
        files = ["file1.txt", "file2.txt", "file3.txt"]
        for file_name in files:
            yield f"ファイルデータ: {file_name}"

    # データソース2: API データ
    def api_data_generator():
        endpoints = ["users", "products", "orders"]
        for endpoint in endpoints:
            yield f"APIデータ: {endpoint}"

    # データソース3: データベースデータ
    def db_data_generator():
        tables = ["customers", "transactions"]
        for table in tables:
            yield f"DBデータ: {table}"

    # 全データソースを連結
    all_data = create_processing_chain(
        file_data_generator(),
        api_data_generator(),
        db_data_generator()
    )

    return all_data

# 使用例
print("統合データ処理:")
for data in multi_source_data_processing():
    print(f"  処理中: {data}")

# フラットな結合の例
nested_lists = [
    ["項目1-1", "項目1-2"],
    ["項目2-1", "項目2-2", "項目2-3"],
    ["項目3-1"]
]

flattened = list(itertools.chain(*nested_lists))
print(f"\nフラット化結果: {flattened}")

chain()により、複数のイテレータを効率的に連結・統合処理できます。

functools 活用レシピ集(10 選)

デコレータ系レシピ

レシピ 21:関数実行時間の計測

関数の実行時間を自動的に計測するデコレータです。

pythonimport functools
import time
from typing import Callable, Any

def measure_time(func: Callable) -> Callable:
    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()

        execution_time = end_time - start_time
        print(f"{func.__name__} 実行時間: {execution_time:.4f}秒")
        return result

    return wrapper

@measure_time
def heavy_calculation(n: int) -> int:
    # 重い計算処理のシミュレーション
    total = 0
    for i in range(n):
        total += i * i
    return total

@measure_time
def data_processing(data_size: int) -> list:
    # データ処理のシミュレーション
    time.sleep(0.1)  # 処理時間のシミュレーション
    return [f"処理済み_{i}" for i in range(data_size)]

# 使用例
result1 = heavy_calculation(100000)
result2 = data_processing(50)

print(f"計算結果: {result1}")
print(f"データ処理結果数: {len(result2)}")

functools.wraps()により、元の関数のメタデータが保持されますね。

レシピ 22:引数検証デコレータ

関数の引数を自動検証するデコレータシステムです。

pythonimport functools
from typing import Callable, Any

def validate_args(**validators):
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # 引数名の取得
            import inspect
            sig = inspect.signature(func)
            bound_args = sig.bind(*args, **kwargs)
            bound_args.apply_defaults()

            # バリデーション実行
            for param_name, validator in validators.items():
                if param_name in bound_args.arguments:
                    value = bound_args.arguments[param_name]
                    if not validator(value):
                        raise ValueError(f"引数'{param_name}'の値'{value}'は無効です")

            return func(*args, **kwargs)
        return wrapper
    return decorator

# バリデータ関数群
def positive_number(x):
    return isinstance(x, (int, float)) and x > 0

def non_empty_string(s):
    return isinstance(s, str) and len(s.strip()) > 0

def valid_email(email):
    return isinstance(email, str) and "@" in email and "." in email

@validate_args(
    age=positive_number,
    name=non_empty_string,
    email=valid_email
)
def create_user(name: str, age: int, email: str) -> dict:
    return {
        "name": name,
        "age": age,
        "email": email,
        "created_at": time.time()
    }

# 使用例
try:
    user1 = create_user("田中太郎", 25, "tanaka@example.com")
    print(f"ユーザー作成成功: {user1}")

    # エラーケース
    user2 = create_user("", 25, "invalid-email")
except ValueError as e:
    print(f"バリデーションエラー: {e}")

引数の自動検証により、堅牢な関数設計が実現できます。

レシピ 23:リトライ機能付きデコレータ

失敗時に自動リトライを行うデコレータです。

pythonimport functools
import time
import random
from typing import Callable, Type, Tuple

def retry(max_attempts: int = 3, delay: float = 1, exceptions: Tuple[Type[Exception], ...] = (Exception,)):
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None

            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    last_exception = e
                    if attempt == max_attempts:
                        print(f"{func.__name__}: 全{max_attempts}回の試行が失敗しました")
                        raise e

                    wait_time = delay * attempt  # 指数バックオフ的な遅延
                    print(f"{func.__name__}: 試行{attempt}失敗。{wait_time}秒後にリトライ...")
                    time.sleep(wait_time)

            raise last_exception
        return wrapper
    return decorator

@retry(max_attempts=3, delay=0.5, exceptions=(ConnectionError, TimeoutError))
def unreliable_api_call(endpoint: str) -> dict:
    # 不安定なAPI呼び出しのシミュレーション
    if random.random() < 0.7:  # 70%の確率で失敗
        raise ConnectionError(f"API '{endpoint}' への接続に失敗しました")

    return {"status": "success", "data": f"{endpoint}のデータ", "timestamp": time.time()}

@retry(max_attempts=2, delay=1)
def file_operation(filename: str) -> str:
    # ファイル操作のシミュレーション
    if random.random() < 0.5:  # 50%の確率で失敗
        raise IOError(f"ファイル '{filename}' の読み込みに失敗しました")

    return f"ファイル '{filename}' の内容"

# 使用例
try:
    result = unreliable_api_call("user-data")
    print(f"API呼び出し成功: {result}")
except ConnectionError as e:
    print(f"最終的にAPI呼び出しが失敗: {e}")

try:
    content = file_operation("config.json")
    print(f"ファイル読み込み成功: {content}")
except IOError as e:
    print(f"ファイル操作が失敗: {e}")

指定した例外に対してのみリトライを実行する、柔軟なリトライ機構が作れますね。

高階関数系レシピ

レシピ 24:関数の部分適用

functools.partialを使った関数の部分適用テクニックです。

pythonimport functools
from typing import Callable

def create_logger(level: str, prefix: str = "") -> Callable:
    def log_message(message: str, timestamp: bool = True) -> None:
        import datetime
        ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") if timestamp else ""
        prefix_str = f"[{prefix}]" if prefix else ""
        print(f"{ts} {prefix_str} [{level}] {message}")

    return log_message

# 部分適用でログ関数をカスタマイズ
base_logger = create_logger("INFO", "システム")

# 特定用途のログ関数を作成
error_logger = functools.partial(base_logger, timestamp=True)
debug_logger = functools.partial(create_logger("DEBUG", "デバッグ"))

def calculate_with_logging(x: float, y: float, operation: str) -> float:
    debug_logger(f"計算開始: {x} {operation} {y}")

    operations = {
        "+": lambda a, b: a + b,
        "-": lambda a, b: a - b,
        "*": lambda a, b: a * b,
        "/": lambda a, b: a / b if b != 0 else None
    }

    if operation not in operations:
        error_logger(f"不正な演算子: {operation}")
        return None

    if operation == "/" and y == 0:
        error_logger("ゼロ除算エラー")
        return None

    result = operations[operation](x, y)
    base_logger(f"計算完了: {x} {operation} {y} = {result}")

    return result

# 使用例
result1 = calculate_with_logging(10, 5, "+")
result2 = calculate_with_logging(10, 0, "/")
result3 = calculate_with_logging(8, 4, "*")

partialにより、関数の引数を事前に固定して、用途別の関数を効率的に作成できます。

レシピ 25:関数の合成とパイプライン

複数の関数を合成してデータ処理パイプラインを構築します。

pythonimport functools
from typing import Callable, Any

def compose(*functions: Callable) -> Callable:
    # 関数合成:右から左に適用
    def composed_function(data: Any) -> Any:
        return functools.reduce(lambda result, func: func(result), reversed(functions), data)
    return composed_function

def pipe(*functions: Callable) -> Callable:
    # パイプライン:左から右に適用
    def pipeline(data: Any) -> Any:
        return functools.reduce(lambda result, func: func(result), functions, data)
    return pipeline

# データ処理関数群
def clean_text(text: str) -> str:
    return text.strip().lower()

def remove_punctuation(text: str) -> str:
    import string
    return ''.join(char for char in text if char not in string.punctuation)

def split_words(text: str) -> list:
    return text.split()

def count_words(words: list) -> dict:
    word_count = {}
    for word in words:
        word_count[word] = word_count.get(word, 0) + 1
    return word_count

def filter_common_words(word_dict: dict, min_count: int = 2) -> dict:
    return {word: count for word, count in word_dict.items() if count >= min_count}

# パイプライン構築
text_analysis_pipeline = pipe(
    clean_text,
    remove_punctuation,
    split_words,
    count_words,
    functools.partial(filter_common_words, min_count=2)
)

# 関数合成版(同じ処理を逆順で定義)
text_analysis_composed = compose(
    functools.partial(filter_common_words, min_count=2),
    count_words,
    split_words,
    remove_punctuation,
    clean_text
)

# 使用例
sample_text = "  Hello World! This is a test. Hello again, World!  "

print("パイプライン処理結果:")
result1 = text_analysis_pipeline(sample_text)
print(result1)

print("\n関数合成処理結果:")
result2 = text_analysis_composed(sample_text)
print(result2)

関数合成により、複雑なデータ処理も段階的で理解しやすい形で実装できますね。

レシピ 26:高階関数による条件分岐

関数を引数として受け取り、動的に処理を切り替える仕組みです。

pythonimport functools
from typing import Callable, Any, Dict

def conditional_processor(condition_func: Callable, true_func: Callable, false_func: Callable):
    # 条件に応じて処理を分岐
    def processor(data: Any) -> Any:
        if condition_func(data):
            return true_func(data)
        else:
            return false_func(data)
    return processor

def create_data_validator(rules: Dict[str, Callable]) -> Callable:
    # 複数ルールでデータ検証
    def validator(data: dict) -> dict:
        results = {"valid": True, "errors": [], "processed_data": data.copy()}

        for field, rule in rules.items():
            if field in data:
                try:
                    is_valid = rule(data[field])
                    if not is_valid:
                        results["valid"] = False
                        results["errors"].append(f"{field}: 検証失敗")
                except Exception as e:
                    results["valid"] = False
                    results["errors"].append(f"{field}: {str(e)}")

        return results
    return validator

# 条件判定関数群
def is_adult(person: dict) -> bool:
    return person.get("age", 0) >= 18

def is_premium_user(user: dict) -> bool:
    return user.get("membership", "basic") == "premium"

# 処理関数群
def adult_processing(person: dict) -> dict:
    result = person.copy()
    result["access_level"] = "full"
    result["can_purchase"] = True
    return result

def minor_processing(person: dict) -> dict:
    result = person.copy()
    result["access_level"] = "restricted"
    result["can_purchase"] = False
    return result

def premium_service(user: dict) -> dict:
    result = user.copy()
    result["features"] = ["advanced_analytics", "priority_support", "unlimited_storage"]
    return result

def basic_service(user: dict) -> dict:
    result = user.copy()
    result["features"] = ["basic_analytics", "standard_support"]
    return result

# 条件付きプロセッサの作成
age_processor = conditional_processor(is_adult, adult_processing, minor_processing)
service_processor = conditional_processor(is_premium_user, premium_service, basic_service)

# バリデーションルール
validation_rules = {
    "age": lambda x: isinstance(x, int) and 0 <= x <= 120,
    "name": lambda x: isinstance(x, str) and len(x.strip()) > 0,
    "email": lambda x: isinstance(x, str) and "@" in x
}

user_validator = create_data_validator(validation_rules)

# 使用例
users = [
    {"name": "田中太郎", "age": 25, "email": "tanaka@example.com", "membership": "premium"},
    {"name": "佐藤花子", "age": 16, "email": "sato@example.com", "membership": "basic"},
    {"name": "山田次郎", "age": 30, "email": "yamada@example.com", "membership": "basic"}
]

print("ユーザー処理結果:")
for user in users:
    # バリデーション
    validation_result = user_validator(user)
    if not validation_result["valid"]:
        print(f"  {user['name']}: 検証エラー - {validation_result['errors']}")
        continue

    # 年齢ベース処理
    processed_user = age_processor(user)

    # サービスレベル処理
    final_user = service_processor(processed_user)

    print(f"  {final_user['name']}: アクセス={final_user['access_level']}, 機能={len(final_user['features'])}個")

高階関数により、複雑な条件分岐も柔軟で保守しやすい形で実装できます。

パフォーマンス向上系レシピ

レシピ 27:メモ化による高速化

計算結果をキャッシュして処理速度を向上させる実装です。

pythonimport functools
import time
from typing import Dict, Any

# カスタムメモ化デコレータ
def custom_memoize(maxsize: int = 128, ttl: int = None):
    def decorator(func):
        cache: Dict[Any, Any] = {}
        cache_times: Dict[Any, float] = {}

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # キーの作成(引数のハッシュ値)
            key = str(args) + str(sorted(kwargs.items()))
            current_time = time.time()

            # TTL(有効期限)チェック
            if ttl and key in cache_times:
                if current_time - cache_times[key] > ttl:
                    cache.pop(key, None)
                    cache_times.pop(key, None)

            # キャッシュヒット
            if key in cache:
                print(f"キャッシュヒット: {func.__name__}")
                return cache[key]

            # キャッシュサイズ制限
            if len(cache) >= maxsize:
                # 最も古いエントリを削除(LRU風)
                oldest_key = min(cache_times.keys(), key=lambda k: cache_times[k])
                cache.pop(oldest_key)
                cache_times.pop(oldest_key)

            # 関数実行とキャッシュ保存
            print(f"計算実行: {func.__name__}")
            result = func(*args, **kwargs)
            cache[key] = result
            cache_times[key] = current_time

            return result

        # キャッシュ情報の取得メソッド
        wrapper.cache_info = lambda: {
            "cache_size": len(cache),
            "cache_keys": list(cache.keys())[:5]  # 最初の5個のみ表示
        }

        wrapper.cache_clear = lambda: cache.clear() or cache_times.clear()

        return wrapper
    return decorator

@functools.lru_cache(maxsize=128)
def fibonacci_cached(n: int) -> int:
    if n < 2:
        return n
    return fibonacci_cached(n - 1) + fibonacci_cached(n - 2)

@custom_memoize(maxsize=50, ttl=10)
def expensive_calculation(x: int, y: int) -> int:
    # 重い計算のシミュレーション
    time.sleep(0.5)
    return x ** 2 + y ** 2 + x * y

@custom_memoize(maxsize=100)
def api_call_simulation(endpoint: str, params: dict = None) -> dict:
    # API呼び出しのシミュレーション
    time.sleep(0.2)
    return {
        "endpoint": endpoint,
        "params": params or {},
        "response": f"データ from {endpoint}",
        "timestamp": time.time()
    }

# 使用例
print("=== フィボナッチ数列(標準lru_cache)===")
start_time = time.time()
result1 = fibonacci_cached(30)
print(f"結果: {result1}, 時間: {time.time() - start_time:.3f}秒")

# 同じ計算(キャッシュから取得)
start_time = time.time()
result2 = fibonacci_cached(30)
print(f"結果: {result2}, 時間: {time.time() - start_time:.3f}秒")

print("\n=== カスタムメモ化の使用例 ===")
# 最初の計算
result3 = expensive_calculation(5, 3)
print(f"計算結果: {result3}")

# 同じ引数での再計算(キャッシュから取得)
result4 = expensive_calculation(5, 3)
print(f"計算結果: {result4}")

print(f"キャッシュ情報: {expensive_calculation.cache_info()}")

メモ化により、同じ引数での関数呼び出しを劇的に高速化できますね。

レシピ 28:singledispatchによる型別処理最適化

引数の型に応じて最適化された処理を自動選択する仕組みです。

pythonimport functools
from typing import Union, List, Dict, Any

@functools.singledispatch
def serialize_data(data) -> str:
    # デフォルト処理(未対応型の場合)
    return f"未対応の型: {type(data).__name__}"

@serialize_data.register
def _(data: str) -> str:
    # 文字列の場合の最適化処理
    return f'"{"".join(data)}"'  # JSON風文字列

@serialize_data.register
def _(data: int) -> str:
    # 整数の場合の処理
    return str(data)

@serialize_data.register
def _(data: float) -> str:
    # 浮動小数点の場合の処理
    return f"{data:.6f}"

@serialize_data.register
def _(data: list) -> str:
    # リストの場合の再帰処理
    serialized_items = [serialize_data(item) for item in data]
    return f"[{', '.join(serialized_items)}]"

@serialize_data.register
def _(data: dict) -> str:
    # 辞書の場合の処理
    serialized_items = [f'"{k}": {serialize_data(v)}' for k, v in data.items()]
    return f"{{{', '.join(serialized_items)}}}"

@serialize_data.register
def _(data: bool) -> str:
    # ブール値の場合の処理
    return "true" if data else "false"

# 処理パフォーマンス測定用の型別処理
@functools.singledispatch
def process_efficiently(data) -> str:
    return f"汎用処理: {data}"

@process_efficiently.register
def _(data: str) -> str:
    # 文字列処理の最適化
    return data.upper().replace(" ", "_")

@process_efficiently.register
def _(data: list) -> str:
    # リスト処理の最適化(内包表記使用)
    return f"処理済みリスト({len(data)}要素): " + ",".join(str(x) for x in data[:3])

@process_efficiently.register
def _(data: dict) -> str:
    # 辞書処理の最適化
    return f"処理済み辞書({len(data)}キー): " + ",".join(list(data.keys())[:3])

# 使用例
test_data = [
    "Hello World",
    42,
    3.14159,
    [1, 2, 3, "test"],
    {"name": "田中", "age": 30, "active": True},
    True,
    None  # 未対応型のテスト
]

print("=== 型別シリアライゼーション ===")
for data in test_data:
    serialized = serialize_data(data)
    print(f"{type(data).__name__:>10}: {data} -> {serialized}")

print("\n=== 型別最適化処理 ===")
processing_test_data = [
    "hello world test",
    [1, 2, 3, 4, 5],
    {"key1": "value1", "key2": "value2", "key3": "value3", "key4": "value4"}
]

for data in processing_test_data:
    result = process_efficiently(data)
    print(f"{type(data).__name__:>10}: {result}")

# 登録された型の確認
print(f"\nserialize_data 登録型: {serialize_data.registry.keys()}")

singledispatchにより、型に応じた最適化処理を自動選択でき、パフォーマンスが向上します。

レシピ 29:reduceを使った効率的な集約処理

大量データの集約処理を効率的に実行する実装です。

pythonimport functools
import operator
from typing import List, Dict, Any, Callable

def efficient_aggregation(data: List[Dict], operations: Dict[str, Callable]) -> Dict:
    # 複数の集約操作を効率的に実行
    results = {}

    for operation_name, operation_func in operations.items():
        try:
            result = operation_func(data)
            results[operation_name] = result
        except Exception as e:
            results[operation_name] = f"エラー: {e}"

    return results

# 集約操作関数群
def sum_by_key(data: List[Dict], key: str) -> float:
    return functools.reduce(
        operator.add,
        (item.get(key, 0) for item in data),
        0
    )

def product_by_key(data: List[Dict], key: str) -> float:
    return functools.reduce(
        operator.mul,
        (item.get(key, 1) for item in data if item.get(key, 0) > 0),
        1
    )

def find_max_item(data: List[Dict], key: str) -> Dict:
    return functools.reduce(
        lambda a, b: a if a.get(key, 0) > b.get(key, 0) else b,
        data
    )

def concatenate_strings(data: List[Dict], key: str) -> str:
    return functools.reduce(
        lambda a, b: a + b,
        (item.get(key, "") for item in data),
        ""
    )

def merge_all_dicts(dicts: List[Dict]) -> Dict:
    # 複数の辞書をマージ
    return functools.reduce(
        lambda a, b: {**a, **b},
        dicts,
        {}
    )

def calculate_running_total(numbers: List[float]) -> List[float]:
    # 累積合計の計算
    running_totals = []
    functools.reduce(
        lambda total, num: running_totals.append(total + num) or total + num,
        numbers,
        0
    )
    return running_totals

# 複雑なデータ変換
def transform_and_aggregate(data: List[Dict]) -> Dict:
    # データ変換と集約を組み合わせ
    transformations = {
        "平均売上": lambda items: functools.reduce(
            operator.add, (item.get("sales", 0) for item in items), 0
        ) / len(items) if items else 0,

        "最高売上商品": lambda items: functools.reduce(
            lambda a, b: a if a.get("sales", 0) > b.get("sales", 0) else b,
            items
        ).get("product", "なし"),

        "売上合計": lambda items: functools.reduce(
            operator.add, (item.get("sales", 0) for item in items), 0
        ),

        "商品名一覧": lambda items: functools.reduce(
            lambda a, b: a + ", " + b,
            (item.get("product", "") for item in items),
            ""
        ).lstrip(", ")
    }

    return efficient_aggregation(data, transformations)

# 使用例
sales_data = [
    {"product": "ノートPC", "sales": 150000, "quantity": 5},
    {"product": "マウス", "sales": 8000, "quantity": 20},
    {"product": "キーボード", "sales": 25000, "quantity": 10},
    {"product": "モニター", "sales": 80000, "quantity": 8}
]

print("=== 基本的な集約処理 ===")
total_sales = sum_by_key(sales_data, "sales")
print(f"売上合計: {total_sales:,}円")

max_sales_item = find_max_item(sales_data, "sales")
print(f"最高売上商品: {max_sales_item['product']} - {max_sales_item['sales']:,}円")

print("\n=== 複合的な変換・集約 ===")
analysis_result = transform_and_aggregate(sales_data)
for key, value in analysis_result.items():
    print(f"{key}: {value}")

print("\n=== 累積計算の例 ===")
monthly_sales = [50000, 75000, 60000, 90000, 120000]
running_totals = calculate_running_total(monthly_sales)
print("月別売上:", monthly_sales)
print("累積売上:", running_totals)

# 辞書のマージ例
config_parts = [
    {"database": {"host": "localhost", "port": 5432}},
    {"api": {"timeout": 30, "retries": 3}},
    {"logging": {"level": "INFO", "file": "app.log"}}
]

merged_config = merge_all_dicts(config_parts)
print(f"\n統合設定: {merged_config}")

functools.reduceにより、大量データの集約処理を効率的かつ関数型的に実装できますね。

レシピ 30:キャッシュ戦略の使い分け

異なるキャッシュ戦略を用途に応じて使い分ける実装です。

pythonimport functools
import weakref
import threading
import time
from typing import Any, Dict, Optional

class AdvancedCache:
    def __init__(self, maxsize: int = 128, ttl: Optional[int] = None):
        self.maxsize = maxsize
        self.ttl = ttl
        self.cache: Dict[Any, Any] = {}
        self.access_times: Dict[Any, float] = {}
        self.access_counts: Dict[Any, int] = {}
        self.lock = threading.Lock()

    def get(self, key: Any) -> Any:
        with self.lock:
            if key not in self.cache:
                return None

            # TTL チェック
            if self.ttl:
                if time.time() - self.access_times[key] > self.ttl:
                    self._remove_key(key)
                    return None

            # アクセス情報更新
            self.access_times[key] = time.time()
            self.access_counts[key] = self.access_counts.get(key, 0) + 1
            return self.cache[key]

    def set(self, key: Any, value: Any) -> None:
        with self.lock:
            # キャッシュサイズ制限
            if len(self.cache) >= self.maxsize and key not in self.cache:
                self._evict_lru()

            self.cache[key] = value
            self.access_times[key] = time.time()
            self.access_counts[key] = self.access_counts.get(key, 0) + 1

    def _evict_lru(self) -> None:
        # 最近最少使用(LRU)アイテムを削除
        if not self.access_times:
            return

        lru_key = min(self.access_times.keys(), key=lambda k: self.access_times[k])
        self._remove_key(lru_key)

    def _remove_key(self, key: Any) -> None:
        self.cache.pop(key, None)
        self.access_times.pop(key, None)
        self.access_counts.pop(key, None)

    def clear(self) -> None:
        with self.lock:
            self.cache.clear()
            self.access_times.clear()
            self.access_counts.clear()

    def info(self) -> Dict:
        return {
            "cache_size": len(self.cache),
            "max_size": self.maxsize,
            "hit_rate": self._calculate_hit_rate()
        }

    def _calculate_hit_rate(self) -> float:
        total_accesses = sum(self.access_counts.values())
        return len(self.cache) / total_accesses if total_accesses > 0 else 0

# 各種キャッシュデコレータ
def smart_cache(cache_type="lru", **cache_kwargs):
    def decorator(func):
        if cache_type == "lru":
            return functools.lru_cache(maxsize=cache_kwargs.get("maxsize", 128))(func)
        elif cache_type == "advanced":
            cache_instance = AdvancedCache(**cache_kwargs)

            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                key = str(args) + str(sorted(kwargs.items()))

                # キャッシュ確認
                result = cache_instance.get(key)
                if result is not None:
                    return result

                # 関数実行してキャッシュ保存
                result = func(*args, **kwargs)
                cache_instance.set(key, result)
                return result

            wrapper.cache_info = cache_instance.info
            wrapper.cache_clear = cache_instance.clear
            return wrapper

        elif cache_type == "weak":
            # 弱参照キャッシュ(メモリプレッシャーで自動削除)
            cache = weakref.WeakValueDictionary()

            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                key = str(args) + str(sorted(kwargs.items()))

                # 弱参照キャッシュは値が生存している間のみ有効
                result = cache.get(key)
                if result is not None:
                    return result

                # 結果をオブジェクトでラップ(弱参照可能にするため)
                class ResultWrapper:
                    def __init__(self, value):
                        self.value = value

                result = func(*args, **kwargs)
                wrapped_result = ResultWrapper(result)
                cache[key] = wrapped_result
                return result

            wrapper.cache_info = lambda: {"cache_size": len(cache)}
            wrapper.cache_clear = cache.clear
            return wrapper

    return decorator

# 使用例
@smart_cache(cache_type="lru", maxsize=50)
def fibonacci_lru(n: int) -> int:
    if n < 2:
        return n
    return fibonacci_lru(n - 1) + fibonacci_lru(n - 2)

@smart_cache(cache_type="advanced", maxsize=30, ttl=5)
def api_data_fetch(endpoint: str) -> dict:
    # API呼び出しシミュレーション
    time.sleep(0.1)
    return {
        "endpoint": endpoint,
        "data": f"APIデータ from {endpoint}",
        "timestamp": time.time()
    }

@smart_cache(cache_type="weak")
def create_large_object(size: int) -> object:
    # 大きなオブジェクト作成のシミュレーション
    class LargeObject:
        def __init__(self, size):
            self.data = list(range(size))
            self.size = size

    return LargeObject(size)

# パフォーマンステスト
print("=== キャッシュ戦略比較 ===")

# LRU キャッシュテスト
start = time.time()
result = fibonacci_lru(30)
print(f"LRU fibonacci(30): {result}, 時間: {time.time() - start:.3f}秒")
print(f"LRU キャッシュ情報: {fibonacci_lru.cache_info()}")

# 高度なキャッシュテスト
print("\n高度なキャッシュ(TTL付き):")
for i in range(3):
    start = time.time()
    result = api_data_fetch("users")
    print(f"  試行{i+1}: {time.time() - start:.3f}秒")

print(f"高度なキャッシュ情報: {api_data_fetch.cache_info()}")

# TTL 期限切れテスト
print("6秒待機(TTL期限切れ)...")
time.sleep(6)
start = time.time()
result = api_data_fetch("users")
print(f"期限切れ後: {time.time() - start:.3f}秒")

# 弱参照キャッシュテスト
print(f"\n弱参照キャッシュテスト:")
obj1 = create_large_object(1000)
obj2 = create_large_object(1000)  # キャッシュヒット
print(f"弱参照キャッシュ情報: {create_large_object.cache_info()}")

異なるキャッシュ戦略により、用途に応じた最適なパフォーマンス向上が実現できますね。

まとめ

Python 標準ライブラリのpathlibitertoolsfunctoolsを活用することで、驚くほど簡潔で効率的なコードが書けることをご理解いただけたでしょうか。

mermaidflowchart LR
    standardLib[Python標準ライブラリ] --> pathlib[pathlib<br/>ファイル操作]
    standardLib --> itertools[itertools<br/>データ処理]
    standardLib --> functools[functools<br/>関数最適化]

    pathlib --> pathlibBenefit[・直感的なパス操作<br/>・クロスプラットフォーム対応<br/>・オブジェクト指向設計]
    itertools --> itertoolsBenefit[・メモリ効率的な処理<br/>・無限イテレータ<br/>・高度な組み合わせ処理]
    functools --> functoolsBenefit[・関数合成とパイプライン<br/>・メモ化による高速化<br/>・型別最適化処理]

    pathlibBenefit --> result[生産性とパフォーマンスの向上]
    itertoolsBenefit --> result
    functoolsBenefit --> result

図で理解できる要点

  • 3 つのライブラリがそれぞれ異なる領域をカバー
  • pathlib:ファイル操作の簡潔化とプラットフォーム対応
  • itertools:大量データの効率的処理
  • functools:関数レベルでの最適化とパフォーマンス向上

実践で得られる効果

これらのライブラリを活用することで、以下のような効果が期待できます。

項目従来の手法標準ライブラリ活用後
コード行数平均 50%削減簡潔で読みやすい
処理速度メモ化で 10-100 倍高速化大幅な性能向上
保守性関数型設計で向上バグの減少
開発効率テンプレート化で向上迅速なプロトタイピング

実装時の推奨アプローチ

  1. 段階的導入: 既存コードを一気に書き換えず、新規機能から適用
  2. テストの充実: リファクタリング時は必ずテストケースを用意
  3. チーム共有: ベストプラクティスをチーム内で共有・標準化
  4. 継続学習: 新しいレシピや手法を定期的にキャッチアップ

30 個のレシピをご紹介しましたが、これらは基礎的な活用方法に過ぎません。実際の開発では、これらを組み合わせて、より複雑で実用的な処理を実装していくことになるでしょう。

標準ライブラリの力を借りて、より効率的で美しい Python コードを書いていきませんか。きっと開発の楽しさが倍増するはずですね。

関連リンク