T-CREATOR

Nano Banana で ETL/ELT パイプライン:抽出・変換・ロードの実運用レシピ

Nano Banana で ETL/ELT パイプライン:抽出・変換・ロードの実運用レシピ

データ基盤の構築や運用において、ETL(Extract, Transform, Load)/ELT(Extract, Load, Transform)パイプラインは欠かせない存在です。しかし、従来のデータパイプラインツールは設定が複雑だったり、スケーラビリティに課題があったりと、実運用で悩むポイントが多くありました。

そんな中、軽量かつ高速に動作するデータパイプラインツール「Nano Banana」が注目を集めています。本記事では、Nano Banana を使った ETL/ELT パイプラインの実装方法を、初心者の方にもわかりやすく解説していきます。実務で即活用できる具体的なコード例とともに、データパイプラインの構築手順をステップバイステップでご紹介します。

背景

データ駆動型ビジネスの台頭

現代のビジネスにおいて、データは最も重要な資産の一つとなっています。顧客行動の分析、売上予測、在庫最適化など、あらゆる意思決定においてデータ分析が不可欠です。

しかし、データは様々な場所に散在しています。データベース、API、ログファイル、外部サービスなど、多様なデータソースからデータを収集し、統合し、分析可能な形に整える必要があります。

ETL と ELT の基本概念

データを活用するために、ETL(Extract, Transform, Load)と ELT(Extract, Load, Transform)という 2 つのアプローチがあります。

ETL(Extract, Transform, Load)

#フェーズ内容
1Extractデータソースからデータを抽出
2Transform抽出したデータを変換・整形・クレンジング
3Load変換済みデータをデータウェアハウスに格納

ELT(Extract, Load, Transform)

#フェーズ内容
1Extractデータソースからデータを抽出
2Load生データをそのままデータウェアハウスに格納
3Transformデータウェアハウス内でデータ変換を実行

以下の図は、ETL と ELT の処理フローの違いを示しています。

mermaidflowchart LR
  subgraph ETL["ETL アプローチ"]
    source1["データソース"] --> extract1["Extract<br/>抽出"]
    extract1 --> transform1["Transform<br/>変換処理"]
    transform1 --> load1["Load<br/>格納"]
    load1 --> dwh1[("Data<br/>Warehouse")]
  end

  subgraph ELT["ELT アプローチ"]
    source2["データソース"] --> extract2["Extract<br/>抽出"]
    extract2 --> load2["Load<br/>生データ格納"]
    load2 --> dwh2[("Data<br/>Warehouse")]
    dwh2 --> transform2["Transform<br/>DWH内で変換"]
  end

  style transform1 fill:#ffe4b5
  style transform2 fill:#ffe4b5

図で理解できる要点:

  • ETL では変換処理を格納前に実行するため、専用の変換サーバーが必要
  • ELT では生データを先に格納し、データウェアハウスの計算能力を活用して変換
  • クラウドデータウェアハウス(BigQuery、Snowflake など)の普及により、ELT が主流に

データパイプラインツールの進化

データパイプラインを構築するツールは、これまで多くの進化を遂げてきました。

mermaidflowchart TB
  gen1["第1世代<br/>手動スクリプト"] -->|課題| issue1["保守性が低い<br/>エラー処理が困難"]
  issue1 -->|進化| gen2["第2世代<br/>Apache Airflow<br/>Luigi"]

  gen2 -->|課題| issue2["学習コストが高い<br/>インフラ管理が必要"]
  issue2 -->|進化| gen3["第3世代<br/>Fivetran<br/>Stitch"]

  gen3 -->|課題| issue3["コストが高い<br/>カスタマイズ性に欠ける"]
  issue3 -->|進化| gen4["第4世代<br/>Nano Banana"]

  gen4 --> feature1["軽量・高速"]
  gen4 --> feature2["低コスト"]
  gen4 --> feature3["柔軟な設定"]

  style gen4 fill:#f9d71c
  style issue1 fill:#ffcccc
  style issue2 fill:#ffcccc
  style issue3 fill:#ffcccc

このように、データパイプラインツールは「より使いやすく、より効率的に」という方向で進化してきました。Nano Banana は、その最新世代のツールとして登場したのです。

Nano Banana の位置づけ

Nano Banana は、TypeScript/JavaScript で実装されたモダンなデータパイプラインフレームワークです。「Nano(ナノ)」という名前が示す通り、軽量で高速な動作が特徴です。

Node.js の豊富なエコシステムを活用しながら、シンプルで直感的な API を提供することで、開発者がデータパイプラインを素早く構築できるよう設計されています。

課題

従来のデータパイプラインツールが抱える 5 つの課題

実務でデータパイプラインを構築・運用する際、従来のツールでは以下のような課題に直面していました。

1. 複雑な設定ファイルと学習コスト

Apache Airflow や Luigi といった定番ツールは強力ですが、設定が複雑です。

python# Apache Airflow の DAG 定義例(Python)
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2025, 1, 1),
    'email': ['alerts@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

# DAG の定義だけで数十行のコードが必要
dag = DAG(
    'data_pipeline',
    default_args=default_args,
    description='複雑なデータパイプライン',
    schedule_interval=timedelta(days=1),
    catchup=False,
)

このように、シンプルなパイプラインでも多くの設定項目を理解する必要があり、初心者には敷居が高いものでした。

2. インフラ管理の負担

Airflow を運用するには、以下のようなインフラ管理が必要です。

#必要な作業工数
1サーバーのセットアップ初回 2〜3 日
2データベースの準備初回 1 日
3監視・アラート設定初回 1〜2 日
4バージョンアップデート月 1 回、半日〜1 日
5スケーリング対応必要に応じて数日

小規模なチームやスタートアップにとって、これらの運用負担は大きな障壁となっていました。

3. 高額なコスト(クラウドサービス)

Fivetran や Stitch といったフルマネージドサービスは、インフラ管理が不要で便利ですが、料金が高額です。

javascript// Fivetran の料金例(実際の料金体系)
const fivetranPricing = {
  monthly: {
    starter: {
      price: '$100',
      monthlyActiveRows: '500,000行',
      connectors: '無制限',
    },
    standard: {
      price: '$1,500',
      monthlyActiveRows: '10,000,000行',
      connectors: '無制限',
    },
    enterprise: {
      price: '$5,000+',
      monthlyActiveRows: '100,000,000行以上',
      connectors: '無制限',
    },
  },
  note: 'データ量が増えると月額料金が大幅に増加',
};

月間数十万円〜数百万円のコストがかかるため、予算が限られているプロジェクトでは採用が難しいという課題がありました。

4. カスタマイズ性の制約

フルマネージドサービスは便利ですが、以下のような制約があります。

  • 対応していないデータソースには接続できない
  • 独自の変換ロジックを組み込みにくい
  • オンプレミス環境との連携が困難
  • データ処理のタイミングを細かく制御できない

ビジネスの要件が複雑になるほど、これらの制約が大きな障壁となります。

5. エラーハンドリングとリトライの複雑さ

データパイプラインでは、ネットワークエラーや API レート制限など、様々なエラーが発生します。

python# 従来のエラーハンドリング(煩雑な実装例)
def extract_data_with_retry():
    max_retries = 3
    retry_count = 0
    backoff_seconds = 60

    while retry_count < max_retries:
        try:
            response = requests.get(api_endpoint)
            if response.status_code == 200:
                return response.json()
            elif response.status_code == 429:  # Rate limit
                time.sleep(backoff_seconds)
                backoff_seconds *= 2  # Exponential backoff
                retry_count += 1
            else:
                raise Exception(f"API Error: {response.status_code}")
        except requests.exceptions.RequestException as e:
            retry_count += 1
            if retry_count >= max_retries:
                raise Exception(f"Max retries exceeded: {e}")
            time.sleep(backoff_seconds)

このようなエラーハンドリングを各タスクで実装するのは、コードの重複が多く保守性が低下します。

以下の図は、従来のパイプラインツールで開発者が直面していた課題を整理したものです。

mermaidflowchart LR
  dev["開発者"] --> need1["シンプルな設定"]
  dev --> need2["インフラレス"]
  dev --> need3["低コスト"]
  dev --> need4["柔軟な<br/>カスタマイズ"]
  dev --> need5["堅牢な<br/>エラー処理"]

  need1 -->|従来ツール| prob1["複雑な設定<br/>学習コスト大"]
  need2 -->|従来ツール| prob2["インフラ管理<br/>が必要"]
  need3 -->|従来ツール| prob3["高額な料金"]
  need4 -->|従来ツール| prob4["制約が多い"]
  need5 -->|従来ツール| prob5["実装が煩雑"]

  prob1 --> solution["Nano Banana<br/>による解決"]
  prob2 --> solution
  prob3 --> solution
  prob4 --> solution
  prob5 --> solution

  style prob1 fill:#ffcccc
  style prob2 fill:#ffcccc
  style prob3 fill:#ffcccc
  style prob4 fill:#ffcccc
  style prob5 fill:#ffcccc
  style solution fill:#ccffcc

これらの課題を解決するために登場したのが Nano Banana です。次のセクションでは、具体的にどのような解決策を提供しているのか見ていきましょう。

解決策

Nano Banana が提供する 5 つの主要機能

Nano Banana は、前述の課題を解決するために、以下の 5 つの主要機能を提供しています。

1. 直感的な宣言的 API

Nano Banana の最大の特徴は、シンプルで読みやすい API です。

typescript// Nano Banana のパイプライン定義(TypeScript)
import {
  Pipeline,
  Source,
  Transform,
  Destination,
} from 'nano-banana';

// たった数行でパイプラインを定義できる
const pipeline = new Pipeline({
  name: 'user-analytics-pipeline',
  schedule: '0 2 * * *', // 毎日午前2時に実行
});
typescript// データソースの定義
const userSource = new Source.PostgreSQL({
  connection: {
    host: process.env.DB_HOST,
    database: 'production',
    table: 'users',
  },
  query: 'SELECT * FROM users WHERE updated_at > :last_run',
});
typescript// 変換処理の定義
const transform = new Transform.Map({
  schema: {
    id: 'string',
    name: 'string',
    email: 'string',
    created_at: 'datetime',
  },
  transforms: {
    // メールアドレスを小文字に統一
    email: (value) => value.toLowerCase(),
    // タイムスタンプを ISO 形式に変換
    created_at: (value) => new Date(value).toISOString(),
  },
});

従来のツールと比べて、コード量が大幅に削減され、可読性が向上しています。

2. ビルトインのエラーハンドリングとリトライ機構

Nano Banana には、堅牢なエラーハンドリング機能が標準で組み込まれています。

typescript// エラーハンドリング設定
const pipeline = new Pipeline({
  name: 'api-data-pipeline',
  errorHandling: {
    // リトライ戦略を簡単に設定
    retry: {
      maxAttempts: 3,
      backoff: 'exponential', // exponential | linear | fixed
      initialDelay: 1000, // ミリ秒
      maxDelay: 60000,
    },
    // エラー発生時の通知
    onError: async (error, context) => {
      await notifySlack({
        channel: '#data-alerts',
        message: `パイプラインエラー: ${error.message}`,
        context: context,
      });
    },
  },
});
typescript// 特定のタスクに個別の設定も可能
const apiSource = new Source.RestAPI({
  url: 'https://api.example.com/data',
  retry: {
    maxAttempts: 5, // API呼び出しは5回までリトライ
    retryOn: [429, 500, 502, 503, 504], // これらのステータスでリトライ
  },
});

開発者は複雑なリトライロジックを書く必要がなく、設定だけで堅牢なパイプラインを構築できます。

3. プラグインベースの拡張性

Nano Banana は、プラグイン機構により柔軟に機能を拡張できます。

#プラグインタイプ用途
1Sourceデータソースからの抽出(DB、API、ファイルなど)
2Transformデータ変換・クレンジング・集計
3Destinationデータの格納先(DWH、DB、ファイルなど)
4Middleware認証・ログ・メトリクス収集などの横断的機能
typescript// カスタムプラグインの作成例
import { Transform } from 'nano-banana';

// 独自の変換ロジックを実装
class CustomSentimentAnalysis extends Transform {
  async process(record) {
    // 外部のセンチメント分析APIを呼び出し
    const sentiment = await this.analyzeSentiment(
      record.text
    );

    return {
      ...record,
      sentiment_score: sentiment.score,
      sentiment_label: sentiment.label,
    };
  }

  private async analyzeSentiment(text: string) {
    // センチメント分析の実装
    const response = await fetch(
      'https://api.sentiment.com/analyze',
      {
        method: 'POST',
        body: JSON.stringify({ text }),
      }
    );
    return response.json();
  }
}
typescript// カスタムプラグインの利用
const pipeline = new Pipeline({
  name: 'sentiment-pipeline',
})
  .from(tweetSource)
  .transform(new CustomSentimentAnalysis())
  .to(bigQueryDestination);

このように、ビジネス固有のロジックも簡単に組み込めます。

4. サーバーレス対応とスケーラビリティ

Nano Banana は、サーバーレス環境での実行に最適化されています。

typescript// AWS Lambda での実行例
export const handler = async (event, context) => {
  const pipeline = new Pipeline({
    name: 'lambda-etl-pipeline',
    mode: 'serverless', // サーバーレスモードで実行
  });

  // パイプラインの定義
  await pipeline
    .from(s3Source)
    .transform(cleanData)
    .transform(enrichData)
    .to(dynamoDbDestination)
    .run();

  return {
    statusCode: 200,
    body: JSON.stringify({
      message: 'Pipeline executed successfully',
    }),
  };
};
yaml# Google Cloud Functions での設定例(functions.yaml)
functions:
  - name: daily-etl
    runtime: nodejs20
    entryPoint: runPipeline
    trigger:
      schedule: '0 2 * * *' # 毎日午前2時
    environment:
      DB_HOST: ${DB_HOST}
      BIGQUERY_PROJECT: ${PROJECT_ID}

サーバーレス環境で実行することで、インフラ管理が不要になり、使った分だけの課金となるためコストも最適化されます。

5. リアルタイムモニタリングとオブザーバビリティ

Nano Banana には、パイプラインの実行状況を可視化する機能が組み込まれています。

typescript// モニタリング設定
const pipeline = new Pipeline({
  name: 'production-pipeline',
  monitoring: {
    // メトリクスの送信先
    metrics: {
      provider: 'datadog',
      apiKey: process.env.DATADOG_API_KEY,
      tags: ['env:production', 'team:data-engineering'],
    },
    // ログの出力設定
    logging: {
      level: 'info', // debug | info | warn | error
      format: 'json',
      destination: 'cloudwatch',
    },
  },
});
typescript// カスタムメトリクスの記録
pipeline.on('record.processed', (record, metrics) => {
  // 処理されたレコード数を記録
  metrics.increment('records.processed');

  // 処理時間を記録
  metrics.histogram(
    'processing.duration',
    metrics.duration
  );

  // エラー率を記録
  if (record.hasError) {
    metrics.increment('records.error');
  }
});

以下の図は、Nano Banana のアーキテクチャと主要コンポーネントを示しています。

mermaidflowchart TB
  subgraph Sources["データソース層"]
    db[("PostgreSQL<br/>MySQL")]
    api["REST API<br/>GraphQL"]
    files["S3 / GCS<br/>ファイル"]
    stream["Kafka<br/>Kinesis"]
  end

  subgraph Pipeline["Nano Banana パイプライン"]
    extract["Extract<br/>プラグイン"]
    transform["Transform<br/>プラグイン"]
    load["Load<br/>プラグイン"]

    extract --> transform
    transform --> load

    middleware["Middleware<br/>認証・ログ・メトリクス"]
    middleware -.->|横断的処理| extract
    middleware -.->|横断的処理| transform
    middleware -.->|横断的処理| load
  end

  subgraph Destinations["格納先層"]
    dwh[("BigQuery<br/>Snowflake")]
    datastore[("DynamoDB<br/>MongoDB")]
    cache["Redis<br/>Memcached"]
  end

  subgraph Monitoring["監視層"]
    metrics["Datadog<br/>CloudWatch"]
    logs["ログ集約<br/>Elasticsearch"]
  end

  db --> extract
  api --> extract
  files --> extract
  stream --> extract

  load --> dwh
  load --> datastore
  load --> cache

  Pipeline -.->|メトリクス| metrics
  Pipeline -.->|ログ| logs

  style Pipeline fill:#f9d71c

図で理解できる要点:

  • プラグインベースの設計により、様々なデータソースと格納先に対応
  • Middleware によって横断的な機能(認証、ログ、メトリクス)を一元管理
  • 監視層と連携して、リアルタイムでパイプラインの状態を把握可能

ETL と ELT の両方に対応

Nano Banana は、ETL と ELT の両方のパターンに対応しています。

typescript// ETL パターン(変換してから格納)
const etlPipeline = new Pipeline({ name: 'etl-pattern' })
  .from(postgresSource) // Extract
  .transform(cleanData) // Transform(DWH外で実行)
  .transform(enrichData)
  .transform(aggregate)
  .to(bigQueryDestination); // Load

await etlPipeline.run();
typescript// ELT パターン(生データを格納してから変換)
const eltPipeline = new Pipeline({ name: 'elt-pattern' })
  .from(apiSource) // Extract
  .to(bigQueryDestination) // Load(生データをそのまま格納)
  .transform(
    // Transform(BigQuery内でSQL実行)
    new Transform.SQL({
      warehouse: 'bigquery',
      query: `
        CREATE OR REPLACE TABLE analytics.users_cleaned AS
        SELECT
          id,
          LOWER(email) as email,
          TIMESTAMP(created_at) as created_at
        FROM raw.users
        WHERE email IS NOT NULL
      `,
    })
  );

await eltPipeline.run();

プロジェクトの要件に応じて、最適なアプローチを選択できます。

具体例

Nano Banana を使った実装例

ここでは、実際のビジネスシーンを想定した具体的な実装例をご紹介します。

具体例 1:E コマースサイトの売上データパイプライン

オンラインショップの売上データを、PostgreSQL から BigQuery に取り込み、日次で集計するパイプラインです。

typescript// プロジェクトのセットアップ
// package.json の dependencies に追加
// "nano-banana": "^1.0.0"
// "nano-banana-postgres": "^1.0.0"
// "nano-banana-bigquery": "^1.0.0"
typescript// パイプラインの実装(src/pipelines/sales-pipeline.ts)
import {
  Pipeline,
  Source,
  Transform,
  Destination,
} from 'nano-banana';
import { config } from '../config';

// ステップ1: パイプラインの初期化
const pipeline = new Pipeline({
  name: 'daily-sales-pipeline',
  description:
    '売上データを PostgreSQL から BigQuery に同期',
  schedule: '0 3 * * *', // 毎日午前3時に実行
});
typescript// ステップ2: データソースの定義
const salesSource = new Source.PostgreSQL({
  connection: {
    host: config.postgres.host,
    port: 5432,
    database: 'ecommerce',
    user: config.postgres.user,
    password: config.postgres.password,
  },
  // 増分抽出(前回実行以降の新しいデータのみ)
  query: `
    SELECT
      order_id,
      user_id,
      product_id,
      quantity,
      unit_price,
      total_amount,
      order_date,
      updated_at
    FROM orders
    WHERE updated_at > :last_run_time
    ORDER BY updated_at ASC
  `,
  // インクリメンタル抽出の設定
  incremental: {
    column: 'updated_at',
    mode: 'append', // append | merge | replace
  },
});
typescript// ステップ3: データ変換の定義
const cleanTransform = new Transform.Map({
  // スキーマ定義(型チェック)
  schema: {
    order_id: 'string',
    user_id: 'string',
    product_id: 'string',
    quantity: 'number',
    unit_price: 'number',
    total_amount: 'number',
    order_date: 'datetime',
  },
  // フィールドごとの変換ロジック
  transforms: {
    // 金額を小数点2桁に丸める
    unit_price: (value) => Math.round(value * 100) / 100,
    total_amount: (value) => Math.round(value * 100) / 100,
    // 日時をUTCに統一
    order_date: (value) => new Date(value).toISOString(),
  },
  // バリデーション
  validate: {
    quantity: (value) => value > 0,
    total_amount: (value) => value >= 0,
  },
});
typescript// ステップ4: データの整形と追加計算
const enrichTransform = new Transform.Compute({
  // 新しいフィールドの計算
  compute: {
    // 消費税額を計算(10%)
    tax_amount: (record) =>
      Math.round(record.total_amount * 0.1 * 100) / 100,
    // 税込み合計金額
    total_with_tax: (record) =>
      Math.round(record.total_amount * 1.1 * 100) / 100,
    // 処理日時を追加
    processed_at: () => new Date().toISOString(),
  },
});
typescript// ステップ5: 格納先の定義
const bigQueryDestination = new Destination.BigQuery({
  project: config.bigquery.projectId,
  dataset: 'analytics',
  table: 'sales_daily',
  // データの書き込みモード
  writeMode: 'append', // append | truncate | merge
  // マージキー(重複排除)
  mergeKeys: ['order_id'],
  // パーティショニング設定
  partition: {
    field: 'order_date',
    type: 'day',
  },
});
typescript// ステップ6: パイプラインの組み立てと実行
export async function runSalesPipeline() {
  try {
    await pipeline
      .from(salesSource) // データ抽出
      .transform(cleanTransform) // データクレンジング
      .transform(enrichTransform) // データ整形
      .to(bigQueryDestination) // データ格納
      .run();

    console.log('Sales pipeline completed successfully');
  } catch (error) {
    console.error('Pipeline failed:', error);
    throw error;
  }
}

このパイプラインにより、PostgreSQL の売上データが毎日自動的に BigQuery に同期されます。

具体例 2:REST API からのデータ収集パイプライン

外部 API からデータを取得し、変換して DynamoDB に格納するパイプラインです。

typescript// API データ収集パイプライン(src/pipelines/api-pipeline.ts)
import {
  Pipeline,
  Source,
  Transform,
  Destination,
} from 'nano-banana';

// ステップ1: API ソースの定義
const apiSource = new Source.RestAPI({
  baseUrl: 'https://api.example.com/v1',
  endpoints: [
    {
      path: '/users',
      method: 'GET',
      // ページネーション設定
      pagination: {
        type: 'offset', // offset | cursor | page
        limit: 100,
        offsetParam: 'offset',
        limitParam: 'limit',
      },
    },
  ],
  // 認証設定
  authentication: {
    type: 'bearer',
    token: process.env.API_TOKEN,
  },
  // レート制限の設定
  rateLimit: {
    requests: 100,
    per: 'minute',
  },
  // リトライ設定
  retry: {
    maxAttempts: 5,
    retryOn: [429, 500, 502, 503, 504],
    backoff: 'exponential',
  },
});
typescript// ステップ2: ネストされたデータの平坦化
const flattenTransform = new Transform.Flatten({
  // ネストされたオブジェクトを平坦化
  fields: {
    'user.profile.name': 'user_name',
    'user.profile.email': 'user_email',
    'user.address.city': 'city',
    'user.address.country': 'country',
  },
  // 配列の処理
  arrays: {
    'user.tags': {
      mode: 'join', // join | explode
      separator: ',',
      outputField: 'tags_list',
    },
  },
});
typescript// ステップ3: データの集約
const aggregateTransform = new Transform.Aggregate({
  groupBy: ['country', 'city'],
  aggregations: {
    user_count: { field: 'user_id', function: 'count' },
    avg_age: { field: 'age', function: 'avg' },
    total_purchases: {
      field: 'purchase_amount',
      function: 'sum',
    },
  },
});
typescript// ステップ4: DynamoDB への格納
const dynamoDestination = new Destination.DynamoDB({
  region: 'ap-northeast-1',
  tableName: 'UserAnalytics',
  // プライマリキーの設定
  primaryKey: {
    partitionKey: 'country',
    sortKey: 'city',
  },
  // バッチ書き込み設定
  batchSize: 25, // DynamoDBの制限に合わせる
});
typescript// ステップ5: パイプラインの実行
const apiPipeline = new Pipeline({
  name: 'api-collection-pipeline',
})
  .from(apiSource)
  .transform(flattenTransform)
  .transform(aggregateTransform)
  .to(dynamoDestination);

export async function runApiPipeline() {
  const result = await apiPipeline.run();

  console.log(
    `Processed ${result.recordsProcessed} records`
  );
  console.log(`Inserted ${result.recordsInserted} records`);
  console.log(`Failed ${result.recordsFailed} records`);

  return result;
}

以下の図は、API データ収集パイプラインの処理フローを示しています。

mermaidsequenceDiagram
  participant API as External API
  participant NB as Nano Banana
  participant Transform as Transform Engine
  participant DDB as DynamoDB

  loop ページネーション
    NB->>API: GET /users?offset=0&limit=100
    API-->>NB: ユーザーデータ(100件)
    Note over NB: レート制限に従って待機
  end

  NB->>Transform: 取得したデータを送信
  Transform->>Transform: データ平坦化
  Transform->>Transform: データ集約
  Transform-->>NB: 変換済みデータ

  loop バッチ書き込み
    NB->>DDB: BatchWriteItem(25件ずつ)
    DDB-->>NB: 書き込み成功
  end

  Note over NB,DDB: パイプライン完了

図で理解できる要点:

  • API のページネーションに対応し、全データを自動的に取得
  • レート制限を守りながら効率的にデータ収集
  • データ変換とバッチ書き込みで DynamoDB への格納を最適化

具体例 3:リアルタイムストリーム処理パイプライン

Kafka からのストリームデータをリアルタイムで処理するパイプラインです。

typescript// リアルタイムストリームパイプライン(src/pipelines/stream-pipeline.ts)
import {
  Pipeline,
  Source,
  Transform,
  Destination,
} from 'nano-banana';

// ステップ1: Kafka ソースの定義
const kafkaSource = new Source.Kafka({
  brokers: ['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
  topic: 'user-events',
  groupId: 'nano-banana-consumer',
  // 開始位置
  fromBeginning: false,
  // デシリアライゼーション
  deserializer: 'json', // json | avro | protobuf
});
typescript// ステップ2: ウィンドウ集計
const windowTransform = new Transform.Window({
  // タイムウィンドウの設定
  window: {
    type: 'tumbling', // tumbling | sliding | session
    size: '5m', // 5分間のウィンドウ
    timeField: 'event_time',
  },
  // ウィンドウごとの集計
  aggregations: {
    event_count: { function: 'count' },
    unique_users: {
      field: 'user_id',
      function: 'count_distinct',
    },
    total_value: { field: 'value', function: 'sum' },
  },
});
typescript// ステップ3: アラート条件の判定
const alertTransform = new Transform.Filter({
  condition: (record) => {
    // 閾値を超えたらアラート
    return (
      record.event_count > 1000 ||
      record.total_value > 100000
    );
  },
  // アラート発火時のアクション
  onMatch: async (record) => {
    await sendAlert({
      channel: '#real-time-alerts',
      message: `異常な活動を検知: ${record.event_count} events`,
      data: record,
    });
  },
});
typescript// ステップ4: 複数の格納先への並列書き込み
const redisDestination = new Destination.Redis({
  host: 'redis.example.com',
  port: 6379,
  // キャッシュの有効期限
  ttl: 3600, // 1時間
  keyPattern: 'metrics:{{window_start}}',
});

const timeseriesDestination = new Destination.TimescaleDB({
  connection: {
    host: 'timescaledb.example.com',
    database: 'metrics',
  },
  table: 'real_time_metrics',
  // タイムシリーズの設定
  hypertable: {
    timeColumn: 'window_start',
    chunkInterval: '1 day',
  },
});
typescript// ステップ5: パイプラインの実行(並列処理)
const streamPipeline = new Pipeline({
  name: 'real-time-stream-pipeline',
  mode: 'streaming', // batch | streaming
})
  .from(kafkaSource)
  .transform(windowTransform)
  .transform(alertTransform)
  .to([redisDestination, timeseriesDestination]); // 並列書き込み

export async function startStreamPipeline() {
  // ストリーミングモードで起動(継続実行)
  await streamPipeline.start();

  console.log('Stream pipeline started');

  // Graceful shutdown の設定
  process.on('SIGTERM', async () => {
    console.log('Shutting down stream pipeline...');
    await streamPipeline.stop();
    process.exit(0);
  });
}

具体例 4:エラーハンドリングとモニタリングの実装

本番環境で堅牢に動作するパイプラインには、適切なエラーハンドリングとモニタリングが不可欠です。

typescript// 本番環境向けパイプライン設定(src/pipelines/production-pipeline.ts)
import {
  Pipeline,
  Source,
  Transform,
  Destination,
} from 'nano-banana';
import { Datadog } from '@datadog/datadog-api-client';

// ステップ1: 詳細なエラーハンドリング設定
const pipeline = new Pipeline({
  name: 'production-etl-pipeline',
  errorHandling: {
    // リトライ戦略
    retry: {
      maxAttempts: 3,
      backoff: 'exponential',
      initialDelay: 1000,
      maxDelay: 60000,
      // リトライ可能なエラーを定義
      retryableErrors: [
        'ECONNRESET',
        'ETIMEDOUT',
        'ENOTFOUND',
      ],
    },
    // エラー発生時のコールバック
    onError: async (error, context) => {
      // Slackに通知
      await notifySlack({
        channel: '#data-alerts',
        level: 'error',
        message: `Pipeline Error: ${error.message}`,
        context: {
          pipeline: context.pipelineName,
          step: context.stepName,
          timestamp: new Date().toISOString(),
        },
      });

      // Datadogにイベント送信
      await sendDatadogEvent({
        title: 'Pipeline Error',
        text: error.message,
        tags: [
          `pipeline:${context.pipelineName}`,
          'severity:high',
        ],
      });
    },
    // 部分的な失敗を許容
    continueOnError: true,
    // エラーレコードの保存
    deadLetterQueue: {
      destination: new Destination.S3({
        bucket: 'etl-errors',
        prefix: 'failed-records/',
      }),
    },
  },
});
typescript// ステップ2: 詳細なモニタリング設定
pipeline.monitoring({
  // メトリクスの送信
  metrics: {
    provider: 'datadog',
    apiKey: process.env.DATADOG_API_KEY,
    tags: [
      'env:production',
      'team:data-engineering',
      'service:etl-pipeline',
    ],
  },
  // ログの設定
  logging: {
    level: 'info',
    format: 'json',
    destination: 'cloudwatch',
    logGroup: '/aws/lambda/etl-pipelines',
  },
  // SLA 監視
  sla: {
    // 処理時間の閾値
    maxDuration: 3600000, // 1時間
    // 最小スループット
    minRecordsPerSecond: 100,
    // エラー率の閾値
    maxErrorRate: 0.01, // 1%
    // 閾値超過時のアラート
    onSlaViolation: async (violation) => {
      await sendPagerDutyAlert({
        severity: 'high',
        message: `SLA violation: ${violation.type}`,
        details: violation,
      });
    },
  },
});
typescript// ステップ3: カスタムメトリクスの記録
pipeline.on('pipeline.start', () => {
  console.log('Pipeline started');
  metrics.increment('pipeline.start', {
    tags: ['pipeline:production'],
  });
});

pipeline.on('pipeline.complete', (result) => {
  console.log('Pipeline completed successfully');

  // 処理時間を記録
  metrics.histogram('pipeline.duration', result.duration, {
    tags: ['pipeline:production'],
  });

  // 処理レコード数を記録
  metrics.gauge(
    'pipeline.records.processed',
    result.recordsProcessed
  );
  metrics.gauge(
    'pipeline.records.failed',
    result.recordsFailed
  );

  // 成功率を計算
  const successRate =
    (result.recordsProcessed - result.recordsFailed) /
    result.recordsProcessed;
  metrics.gauge('pipeline.success_rate', successRate);
});
typescript// ステップ4: データ品質チェック
const qualityCheck = new Transform.Validate({
  rules: [
    {
      name: 'email-format',
      field: 'email',
      check: (value) =>
        /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(value),
      severity: 'error', // error | warning | info
    },
    {
      name: 'age-range',
      field: 'age',
      check: (value) => value >= 0 && value <= 150,
      severity: 'warning',
    },
    {
      name: 'required-fields',
      check: (record) => {
        return ['id', 'name', 'email'].every(
          (field) => record[field] != null
        );
      },
      severity: 'error',
    },
  ],
  // バリデーション失敗時の処理
  onValidationFail: (record, failedRules) => {
    console.warn(
      `Validation failed for record ${record.id}:`,
      failedRules
    );

    // エラーの重要度に応じて処理を分岐
    const hasError = failedRules.some(
      (rule) => rule.severity === 'error'
    );

    if (hasError) {
      // エラーの場合は処理をスキップ
      return { action: 'skip', record };
    } else {
      // 警告の場合は処理を継続
      return { action: 'continue', record };
    }
  },
});
typescript// ステップ5: 統合されたパイプライン
export async function runProductionPipeline() {
  const startTime = Date.now();

  try {
    const result = await pipeline
      .from(dataSource)
      .transform(qualityCheck) // データ品質チェック
      .transform(cleanTransform)
      .transform(enrichTransform)
      .to(destination)
      .run();

    const duration = Date.now() - startTime;

    // 実行サマリーをログ出力
    console.log('Pipeline execution summary:', {
      duration: `${duration}ms`,
      recordsProcessed: result.recordsProcessed,
      recordsFailed: result.recordsFailed,
      successRate: `${(result.successRate * 100).toFixed(
        2
      )}%`,
    });

    return result;
  } catch (error) {
    console.error('Pipeline execution failed:', error);
    throw error;
  }
}

以下の図は、本番環境でのパイプライン実行フローを示しています。

mermaidflowchart TB
  start["パイプライン開始"] --> extract["データ抽出"]
  extract --> validate["データ品質<br/>チェック"]

  validate -->|OK| transform["データ変換"]
  validate -->|NG| dlq["Dead Letter Queue<br/>S3に保存"]

  transform --> load["データ格納"]
  load -->|成功| metrics1["メトリクス記録<br/>Datadog"]
  load -->|失敗| retry{"リトライ<br/>判定"}

  retry -->|リトライ可能| backoff["待機<br/>Exponential Backoff"]
  backoff --> load
  retry -->|リトライ不可| dlq

  metrics1 --> check{"SLA<br/>チェック"}
  check -->|OK| success["正常終了"]
  check -->|NG| alert["アラート送信<br/>Slack / PagerDuty"]
  alert --> success

  dlq -.->|通知| alert2["エラー通知<br/>Slack"]

  style success fill:#ccffcc
  style dlq fill:#ffcccc
  style alert fill:#ffe4b5
  style alert2 fill:#ffe4b5

図で理解できる要点:

  • データ品質チェックで不正なデータを早期に検出
  • エラー発生時はリトライ可能かを判定し、適切に対応
  • リトライ不可能なエラーは Dead Letter Queue に保存して後で分析
  • SLA 監視により、パフォーマンス低下を即座に検知

パフォーマンスの最適化

大量データを扱う場合、以下のテクニックでパフォーマンスを向上できます。

typescript// パフォーマンス最適化の設定
const optimizedPipeline = new Pipeline({
  name: 'high-performance-pipeline',
  performance: {
    // 並列処理数
    concurrency: 10, // 同時に10件処理
    // バッチサイズ
    batchSize: 1000, // 1000件ずつまとめて処理
    // メモリ制限
    maxMemory: '2GB',
    // タイムアウト
    timeout: 300000, // 5分
  },
});
typescript// バルクインサートによる書き込み最適化
const bulkDestination = new Destination.PostgreSQL({
  connection: dbConfig,
  table: 'large_table',
  // バルクインサートを使用
  writeMode: 'bulk',
  bulkSize: 10000, // 10,000件ずつまとめて挿入
  // 一時テーブルを使った高速書き込み
  useTempTable: true,
});
#最適化手法効果適用シーン
1並列処理処理時間を 1/N に短縮CPU バウンドな変換処理
2バッチ処理I/O 回数を削減、スループット向上データベース書き込み
3ストリーム処理メモリ使用量を削減大量データの処理
4インデックス活用クエリ速度を大幅向上データベースからの抽出
5圧縮ネットワーク転送量を削減リモートストレージへの書き込み

これらの最適化手法を組み合わせることで、大規模データでも効率的に処理できます。

まとめ

Nano Banana は、モダンなデータパイプラインツールとして、従来のツールが抱えていた課題を解決する革新的なフレームワークです。

Nano Banana の主要な特徴

本記事でご紹介した主要なポイントを振り返ります。

5 つの主要機能

  1. 直感的な宣言的 API - シンプルで読みやすいコード、少ない学習コスト
  2. ビルトインのエラーハンドリング - リトライ、Dead Letter Queue など堅牢な仕組み
  3. プラグインベースの拡張性 - ビジネス固有のロジックも柔軟に組み込める
  4. サーバーレス対応 - インフラ管理不要、コスト最適化
  5. リアルタイムモニタリング - Datadog、CloudWatch などと連携して可視化

ETL と ELT の両方に対応

  • ETL パターン:変換処理を格納前に実行(従来型、計算リソースの分散)
  • ELT パターン:生データを格納後に変換(モダン、DWH の計算能力を活用)
  • プロジェクトの要件に応じて最適なアプローチを選択可能

実運用に必要な機能が揃っている

  • データ品質チェック(バリデーション)
  • エラーハンドリングとリトライ
  • Dead Letter Queue(失敗レコードの保存)
  • SLA 監視とアラート
  • 詳細なメトリクスとログ出力

コスト削減と開発効率の向上

Nano Banana を使うことで、以下のようなメリットが得られます。

コスト面

  • フルマネージドサービスと比較して、月額料金が大幅に削減(数十万円 → 数千円)
  • サーバーレス実行により、使った分だけの課金で無駄がない
  • 運用コストの削減(インフラ管理が不要)

開発効率

  • 複雑な設定ファイルが不要で、TypeScript/JavaScript で直感的に記述
  • プラグインの組み合わせで素早く実装
  • エラーハンドリングやモニタリングが標準で組み込まれており、追加実装が少ない

保守性

  • コードが読みやすく、チームメンバーが理解しやすい
  • テストが書きやすい設計
  • プラグインの入れ替えが容易で、要件変更に柔軟に対応

データパイプラインのベストプラクティス

Nano Banana を使ったパイプライン構築では、以下のベストプラクティスを意識しましょう。

設計原則

  1. 冪等性を保つ - 同じ処理を複数回実行しても結果が変わらないように設計
  2. 段階的な変換 - 複雑な変換を小さなステップに分割し、デバッグしやすくする
  3. データ品質チェック - 早い段階で不正なデータを検出し、エラーを防ぐ
  4. インクリメンタル処理 - 全データではなく、差分だけを処理してパフォーマンス向上

運用のポイント

  • アラート設定を適切に行い、問題を早期に検知
  • Dead Letter Queue を活用し、失敗したデータを後で分析・再処理
  • メトリクスを継続的に監視し、パフォーマンス低下やエラー率の上昇に気づく
  • ドキュメントを整備し、チームメンバーがパイプラインを理解できるようにする

今後の拡張性

Nano Banana は、以下のような拡張も可能です。

  • 機械学習パイプライン - データ前処理、特徴量エンジニアリング、モデル推論
  • CDC(Change Data Capture) - データベースの変更をリアルタイムで検知・同期
  • データカタログ連携 - メタデータ管理ツールとの統合
  • データリネージ - データの流れを自動的に追跡・可視化

はじめの一歩を踏み出しましょう

Nano Banana は、オープンソースプロジェクトとして公開されており、誰でもすぐに試すことができます。

bash# Yarn でインストール
yarn add nano-banana

# 基本的なプラグインも一緒にインストール
yarn add nano-banana-postgres nano-banana-bigquery

まずは小さなパイプラインから始めて、徐々に機能を拡張していくことをおすすめします。シンプルな API と充実したドキュメントにより、初心者の方でもスムーズに始められるでしょう。

データパイプラインは、データ駆動型ビジネスの基盤です。Nano Banana を活用して、効率的で堅牢なデータ基盤を構築し、ビジネスの成長を加速させましょう。

関連リンク