Nano Banana で ETL/ELT パイプライン:抽出・変換・ロードの実運用レシピ
データ基盤の構築や運用において、ETL(Extract, Transform, Load)/ELT(Extract, Load, Transform)パイプラインは欠かせない存在です。しかし、従来のデータパイプラインツールは設定が複雑だったり、スケーラビリティに課題があったりと、実運用で悩むポイントが多くありました。
そんな中、軽量かつ高速に動作するデータパイプラインツール「Nano Banana」が注目を集めています。本記事では、Nano Banana を使った ETL/ELT パイプラインの実装方法を、初心者の方にもわかりやすく解説していきます。実務で即活用できる具体的なコード例とともに、データパイプラインの構築手順をステップバイステップでご紹介します。
背景
データ駆動型ビジネスの台頭
現代のビジネスにおいて、データは最も重要な資産の一つとなっています。顧客行動の分析、売上予測、在庫最適化など、あらゆる意思決定においてデータ分析が不可欠です。
しかし、データは様々な場所に散在しています。データベース、API、ログファイル、外部サービスなど、多様なデータソースからデータを収集し、統合し、分析可能な形に整える必要があります。
ETL と ELT の基本概念
データを活用するために、ETL(Extract, Transform, Load)と ELT(Extract, Load, Transform)という 2 つのアプローチがあります。
ETL(Extract, Transform, Load)
| # | フェーズ | 内容 |
|---|---|---|
| 1 | Extract | データソースからデータを抽出 |
| 2 | Transform | 抽出したデータを変換・整形・クレンジング |
| 3 | Load | 変換済みデータをデータウェアハウスに格納 |
ELT(Extract, Load, Transform)
| # | フェーズ | 内容 |
|---|---|---|
| 1 | Extract | データソースからデータを抽出 |
| 2 | Load | 生データをそのままデータウェアハウスに格納 |
| 3 | Transform | データウェアハウス内でデータ変換を実行 |
以下の図は、ETL と ELT の処理フローの違いを示しています。
mermaidflowchart LR
subgraph ETL["ETL アプローチ"]
source1["データソース"] --> extract1["Extract<br/>抽出"]
extract1 --> transform1["Transform<br/>変換処理"]
transform1 --> load1["Load<br/>格納"]
load1 --> dwh1[("Data<br/>Warehouse")]
end
subgraph ELT["ELT アプローチ"]
source2["データソース"] --> extract2["Extract<br/>抽出"]
extract2 --> load2["Load<br/>生データ格納"]
load2 --> dwh2[("Data<br/>Warehouse")]
dwh2 --> transform2["Transform<br/>DWH内で変換"]
end
style transform1 fill:#ffe4b5
style transform2 fill:#ffe4b5
図で理解できる要点:
- ETL では変換処理を格納前に実行するため、専用の変換サーバーが必要
- ELT では生データを先に格納し、データウェアハウスの計算能力を活用して変換
- クラウドデータウェアハウス(BigQuery、Snowflake など)の普及により、ELT が主流に
データパイプラインツールの進化
データパイプラインを構築するツールは、これまで多くの進化を遂げてきました。
mermaidflowchart TB
gen1["第1世代<br/>手動スクリプト"] -->|課題| issue1["保守性が低い<br/>エラー処理が困難"]
issue1 -->|進化| gen2["第2世代<br/>Apache Airflow<br/>Luigi"]
gen2 -->|課題| issue2["学習コストが高い<br/>インフラ管理が必要"]
issue2 -->|進化| gen3["第3世代<br/>Fivetran<br/>Stitch"]
gen3 -->|課題| issue3["コストが高い<br/>カスタマイズ性に欠ける"]
issue3 -->|進化| gen4["第4世代<br/>Nano Banana"]
gen4 --> feature1["軽量・高速"]
gen4 --> feature2["低コスト"]
gen4 --> feature3["柔軟な設定"]
style gen4 fill:#f9d71c
style issue1 fill:#ffcccc
style issue2 fill:#ffcccc
style issue3 fill:#ffcccc
このように、データパイプラインツールは「より使いやすく、より効率的に」という方向で進化してきました。Nano Banana は、その最新世代のツールとして登場したのです。
Nano Banana の位置づけ
Nano Banana は、TypeScript/JavaScript で実装されたモダンなデータパイプラインフレームワークです。「Nano(ナノ)」という名前が示す通り、軽量で高速な動作が特徴です。
Node.js の豊富なエコシステムを活用しながら、シンプルで直感的な API を提供することで、開発者がデータパイプラインを素早く構築できるよう設計されています。
課題
従来のデータパイプラインツールが抱える 5 つの課題
実務でデータパイプラインを構築・運用する際、従来のツールでは以下のような課題に直面していました。
1. 複雑な設定ファイルと学習コスト
Apache Airflow や Luigi といった定番ツールは強力ですが、設定が複雑です。
python# Apache Airflow の DAG 定義例(Python)
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'start_date': datetime(2025, 1, 1),
'email': ['alerts@example.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
# DAG の定義だけで数十行のコードが必要
dag = DAG(
'data_pipeline',
default_args=default_args,
description='複雑なデータパイプライン',
schedule_interval=timedelta(days=1),
catchup=False,
)
このように、シンプルなパイプラインでも多くの設定項目を理解する必要があり、初心者には敷居が高いものでした。
2. インフラ管理の負担
Airflow を運用するには、以下のようなインフラ管理が必要です。
| # | 必要な作業 | 工数 |
|---|---|---|
| 1 | サーバーのセットアップ | 初回 2〜3 日 |
| 2 | データベースの準備 | 初回 1 日 |
| 3 | 監視・アラート設定 | 初回 1〜2 日 |
| 4 | バージョンアップデート | 月 1 回、半日〜1 日 |
| 5 | スケーリング対応 | 必要に応じて数日 |
小規模なチームやスタートアップにとって、これらの運用負担は大きな障壁となっていました。
3. 高額なコスト(クラウドサービス)
Fivetran や Stitch といったフルマネージドサービスは、インフラ管理が不要で便利ですが、料金が高額です。
javascript// Fivetran の料金例(実際の料金体系)
const fivetranPricing = {
monthly: {
starter: {
price: '$100',
monthlyActiveRows: '500,000行',
connectors: '無制限',
},
standard: {
price: '$1,500',
monthlyActiveRows: '10,000,000行',
connectors: '無制限',
},
enterprise: {
price: '$5,000+',
monthlyActiveRows: '100,000,000行以上',
connectors: '無制限',
},
},
note: 'データ量が増えると月額料金が大幅に増加',
};
月間数十万円〜数百万円のコストがかかるため、予算が限られているプロジェクトでは採用が難しいという課題がありました。
4. カスタマイズ性の制約
フルマネージドサービスは便利ですが、以下のような制約があります。
- 対応していないデータソースには接続できない
- 独自の変換ロジックを組み込みにくい
- オンプレミス環境との連携が困難
- データ処理のタイミングを細かく制御できない
ビジネスの要件が複雑になるほど、これらの制約が大きな障壁となります。
5. エラーハンドリングとリトライの複雑さ
データパイプラインでは、ネットワークエラーや API レート制限など、様々なエラーが発生します。
python# 従来のエラーハンドリング(煩雑な実装例)
def extract_data_with_retry():
max_retries = 3
retry_count = 0
backoff_seconds = 60
while retry_count < max_retries:
try:
response = requests.get(api_endpoint)
if response.status_code == 200:
return response.json()
elif response.status_code == 429: # Rate limit
time.sleep(backoff_seconds)
backoff_seconds *= 2 # Exponential backoff
retry_count += 1
else:
raise Exception(f"API Error: {response.status_code}")
except requests.exceptions.RequestException as e:
retry_count += 1
if retry_count >= max_retries:
raise Exception(f"Max retries exceeded: {e}")
time.sleep(backoff_seconds)
このようなエラーハンドリングを各タスクで実装するのは、コードの重複が多く保守性が低下します。
以下の図は、従来のパイプラインツールで開発者が直面していた課題を整理したものです。
mermaidflowchart LR
dev["開発者"] --> need1["シンプルな設定"]
dev --> need2["インフラレス"]
dev --> need3["低コスト"]
dev --> need4["柔軟な<br/>カスタマイズ"]
dev --> need5["堅牢な<br/>エラー処理"]
need1 -->|従来ツール| prob1["複雑な設定<br/>学習コスト大"]
need2 -->|従来ツール| prob2["インフラ管理<br/>が必要"]
need3 -->|従来ツール| prob3["高額な料金"]
need4 -->|従来ツール| prob4["制約が多い"]
need5 -->|従来ツール| prob5["実装が煩雑"]
prob1 --> solution["Nano Banana<br/>による解決"]
prob2 --> solution
prob3 --> solution
prob4 --> solution
prob5 --> solution
style prob1 fill:#ffcccc
style prob2 fill:#ffcccc
style prob3 fill:#ffcccc
style prob4 fill:#ffcccc
style prob5 fill:#ffcccc
style solution fill:#ccffcc
これらの課題を解決するために登場したのが Nano Banana です。次のセクションでは、具体的にどのような解決策を提供しているのか見ていきましょう。
解決策
Nano Banana が提供する 5 つの主要機能
Nano Banana は、前述の課題を解決するために、以下の 5 つの主要機能を提供しています。
1. 直感的な宣言的 API
Nano Banana の最大の特徴は、シンプルで読みやすい API です。
typescript// Nano Banana のパイプライン定義(TypeScript)
import {
Pipeline,
Source,
Transform,
Destination,
} from 'nano-banana';
// たった数行でパイプラインを定義できる
const pipeline = new Pipeline({
name: 'user-analytics-pipeline',
schedule: '0 2 * * *', // 毎日午前2時に実行
});
typescript// データソースの定義
const userSource = new Source.PostgreSQL({
connection: {
host: process.env.DB_HOST,
database: 'production',
table: 'users',
},
query: 'SELECT * FROM users WHERE updated_at > :last_run',
});
typescript// 変換処理の定義
const transform = new Transform.Map({
schema: {
id: 'string',
name: 'string',
email: 'string',
created_at: 'datetime',
},
transforms: {
// メールアドレスを小文字に統一
email: (value) => value.toLowerCase(),
// タイムスタンプを ISO 形式に変換
created_at: (value) => new Date(value).toISOString(),
},
});
従来のツールと比べて、コード量が大幅に削減され、可読性が向上しています。
2. ビルトインのエラーハンドリングとリトライ機構
Nano Banana には、堅牢なエラーハンドリング機能が標準で組み込まれています。
typescript// エラーハンドリング設定
const pipeline = new Pipeline({
name: 'api-data-pipeline',
errorHandling: {
// リトライ戦略を簡単に設定
retry: {
maxAttempts: 3,
backoff: 'exponential', // exponential | linear | fixed
initialDelay: 1000, // ミリ秒
maxDelay: 60000,
},
// エラー発生時の通知
onError: async (error, context) => {
await notifySlack({
channel: '#data-alerts',
message: `パイプラインエラー: ${error.message}`,
context: context,
});
},
},
});
typescript// 特定のタスクに個別の設定も可能
const apiSource = new Source.RestAPI({
url: 'https://api.example.com/data',
retry: {
maxAttempts: 5, // API呼び出しは5回までリトライ
retryOn: [429, 500, 502, 503, 504], // これらのステータスでリトライ
},
});
開発者は複雑なリトライロジックを書く必要がなく、設定だけで堅牢なパイプラインを構築できます。
3. プラグインベースの拡張性
Nano Banana は、プラグイン機構により柔軟に機能を拡張できます。
| # | プラグインタイプ | 用途 |
|---|---|---|
| 1 | Source | データソースからの抽出(DB、API、ファイルなど) |
| 2 | Transform | データ変換・クレンジング・集計 |
| 3 | Destination | データの格納先(DWH、DB、ファイルなど) |
| 4 | Middleware | 認証・ログ・メトリクス収集などの横断的機能 |
typescript// カスタムプラグインの作成例
import { Transform } from 'nano-banana';
// 独自の変換ロジックを実装
class CustomSentimentAnalysis extends Transform {
async process(record) {
// 外部のセンチメント分析APIを呼び出し
const sentiment = await this.analyzeSentiment(
record.text
);
return {
...record,
sentiment_score: sentiment.score,
sentiment_label: sentiment.label,
};
}
private async analyzeSentiment(text: string) {
// センチメント分析の実装
const response = await fetch(
'https://api.sentiment.com/analyze',
{
method: 'POST',
body: JSON.stringify({ text }),
}
);
return response.json();
}
}
typescript// カスタムプラグインの利用
const pipeline = new Pipeline({
name: 'sentiment-pipeline',
})
.from(tweetSource)
.transform(new CustomSentimentAnalysis())
.to(bigQueryDestination);
このように、ビジネス固有のロジックも簡単に組み込めます。
4. サーバーレス対応とスケーラビリティ
Nano Banana は、サーバーレス環境での実行に最適化されています。
typescript// AWS Lambda での実行例
export const handler = async (event, context) => {
const pipeline = new Pipeline({
name: 'lambda-etl-pipeline',
mode: 'serverless', // サーバーレスモードで実行
});
// パイプラインの定義
await pipeline
.from(s3Source)
.transform(cleanData)
.transform(enrichData)
.to(dynamoDbDestination)
.run();
return {
statusCode: 200,
body: JSON.stringify({
message: 'Pipeline executed successfully',
}),
};
};
yaml# Google Cloud Functions での設定例(functions.yaml)
functions:
- name: daily-etl
runtime: nodejs20
entryPoint: runPipeline
trigger:
schedule: '0 2 * * *' # 毎日午前2時
environment:
DB_HOST: ${DB_HOST}
BIGQUERY_PROJECT: ${PROJECT_ID}
サーバーレス環境で実行することで、インフラ管理が不要になり、使った分だけの課金となるためコストも最適化されます。
5. リアルタイムモニタリングとオブザーバビリティ
Nano Banana には、パイプラインの実行状況を可視化する機能が組み込まれています。
typescript// モニタリング設定
const pipeline = new Pipeline({
name: 'production-pipeline',
monitoring: {
// メトリクスの送信先
metrics: {
provider: 'datadog',
apiKey: process.env.DATADOG_API_KEY,
tags: ['env:production', 'team:data-engineering'],
},
// ログの出力設定
logging: {
level: 'info', // debug | info | warn | error
format: 'json',
destination: 'cloudwatch',
},
},
});
typescript// カスタムメトリクスの記録
pipeline.on('record.processed', (record, metrics) => {
// 処理されたレコード数を記録
metrics.increment('records.processed');
// 処理時間を記録
metrics.histogram(
'processing.duration',
metrics.duration
);
// エラー率を記録
if (record.hasError) {
metrics.increment('records.error');
}
});
以下の図は、Nano Banana のアーキテクチャと主要コンポーネントを示しています。
mermaidflowchart TB
subgraph Sources["データソース層"]
db[("PostgreSQL<br/>MySQL")]
api["REST API<br/>GraphQL"]
files["S3 / GCS<br/>ファイル"]
stream["Kafka<br/>Kinesis"]
end
subgraph Pipeline["Nano Banana パイプライン"]
extract["Extract<br/>プラグイン"]
transform["Transform<br/>プラグイン"]
load["Load<br/>プラグイン"]
extract --> transform
transform --> load
middleware["Middleware<br/>認証・ログ・メトリクス"]
middleware -.->|横断的処理| extract
middleware -.->|横断的処理| transform
middleware -.->|横断的処理| load
end
subgraph Destinations["格納先層"]
dwh[("BigQuery<br/>Snowflake")]
datastore[("DynamoDB<br/>MongoDB")]
cache["Redis<br/>Memcached"]
end
subgraph Monitoring["監視層"]
metrics["Datadog<br/>CloudWatch"]
logs["ログ集約<br/>Elasticsearch"]
end
db --> extract
api --> extract
files --> extract
stream --> extract
load --> dwh
load --> datastore
load --> cache
Pipeline -.->|メトリクス| metrics
Pipeline -.->|ログ| logs
style Pipeline fill:#f9d71c
図で理解できる要点:
- プラグインベースの設計により、様々なデータソースと格納先に対応
- Middleware によって横断的な機能(認証、ログ、メトリクス)を一元管理
- 監視層と連携して、リアルタイムでパイプラインの状態を把握可能
ETL と ELT の両方に対応
Nano Banana は、ETL と ELT の両方のパターンに対応しています。
typescript// ETL パターン(変換してから格納)
const etlPipeline = new Pipeline({ name: 'etl-pattern' })
.from(postgresSource) // Extract
.transform(cleanData) // Transform(DWH外で実行)
.transform(enrichData)
.transform(aggregate)
.to(bigQueryDestination); // Load
await etlPipeline.run();
typescript// ELT パターン(生データを格納してから変換)
const eltPipeline = new Pipeline({ name: 'elt-pattern' })
.from(apiSource) // Extract
.to(bigQueryDestination) // Load(生データをそのまま格納)
.transform(
// Transform(BigQuery内でSQL実行)
new Transform.SQL({
warehouse: 'bigquery',
query: `
CREATE OR REPLACE TABLE analytics.users_cleaned AS
SELECT
id,
LOWER(email) as email,
TIMESTAMP(created_at) as created_at
FROM raw.users
WHERE email IS NOT NULL
`,
})
);
await eltPipeline.run();
プロジェクトの要件に応じて、最適なアプローチを選択できます。
具体例
Nano Banana を使った実装例
ここでは、実際のビジネスシーンを想定した具体的な実装例をご紹介します。
具体例 1:E コマースサイトの売上データパイプライン
オンラインショップの売上データを、PostgreSQL から BigQuery に取り込み、日次で集計するパイプラインです。
typescript// プロジェクトのセットアップ
// package.json の dependencies に追加
// "nano-banana": "^1.0.0"
// "nano-banana-postgres": "^1.0.0"
// "nano-banana-bigquery": "^1.0.0"
typescript// パイプラインの実装(src/pipelines/sales-pipeline.ts)
import {
Pipeline,
Source,
Transform,
Destination,
} from 'nano-banana';
import { config } from '../config';
// ステップ1: パイプラインの初期化
const pipeline = new Pipeline({
name: 'daily-sales-pipeline',
description:
'売上データを PostgreSQL から BigQuery に同期',
schedule: '0 3 * * *', // 毎日午前3時に実行
});
typescript// ステップ2: データソースの定義
const salesSource = new Source.PostgreSQL({
connection: {
host: config.postgres.host,
port: 5432,
database: 'ecommerce',
user: config.postgres.user,
password: config.postgres.password,
},
// 増分抽出(前回実行以降の新しいデータのみ)
query: `
SELECT
order_id,
user_id,
product_id,
quantity,
unit_price,
total_amount,
order_date,
updated_at
FROM orders
WHERE updated_at > :last_run_time
ORDER BY updated_at ASC
`,
// インクリメンタル抽出の設定
incremental: {
column: 'updated_at',
mode: 'append', // append | merge | replace
},
});
typescript// ステップ3: データ変換の定義
const cleanTransform = new Transform.Map({
// スキーマ定義(型チェック)
schema: {
order_id: 'string',
user_id: 'string',
product_id: 'string',
quantity: 'number',
unit_price: 'number',
total_amount: 'number',
order_date: 'datetime',
},
// フィールドごとの変換ロジック
transforms: {
// 金額を小数点2桁に丸める
unit_price: (value) => Math.round(value * 100) / 100,
total_amount: (value) => Math.round(value * 100) / 100,
// 日時をUTCに統一
order_date: (value) => new Date(value).toISOString(),
},
// バリデーション
validate: {
quantity: (value) => value > 0,
total_amount: (value) => value >= 0,
},
});
typescript// ステップ4: データの整形と追加計算
const enrichTransform = new Transform.Compute({
// 新しいフィールドの計算
compute: {
// 消費税額を計算(10%)
tax_amount: (record) =>
Math.round(record.total_amount * 0.1 * 100) / 100,
// 税込み合計金額
total_with_tax: (record) =>
Math.round(record.total_amount * 1.1 * 100) / 100,
// 処理日時を追加
processed_at: () => new Date().toISOString(),
},
});
typescript// ステップ5: 格納先の定義
const bigQueryDestination = new Destination.BigQuery({
project: config.bigquery.projectId,
dataset: 'analytics',
table: 'sales_daily',
// データの書き込みモード
writeMode: 'append', // append | truncate | merge
// マージキー(重複排除)
mergeKeys: ['order_id'],
// パーティショニング設定
partition: {
field: 'order_date',
type: 'day',
},
});
typescript// ステップ6: パイプラインの組み立てと実行
export async function runSalesPipeline() {
try {
await pipeline
.from(salesSource) // データ抽出
.transform(cleanTransform) // データクレンジング
.transform(enrichTransform) // データ整形
.to(bigQueryDestination) // データ格納
.run();
console.log('Sales pipeline completed successfully');
} catch (error) {
console.error('Pipeline failed:', error);
throw error;
}
}
このパイプラインにより、PostgreSQL の売上データが毎日自動的に BigQuery に同期されます。
具体例 2:REST API からのデータ収集パイプライン
外部 API からデータを取得し、変換して DynamoDB に格納するパイプラインです。
typescript// API データ収集パイプライン(src/pipelines/api-pipeline.ts)
import {
Pipeline,
Source,
Transform,
Destination,
} from 'nano-banana';
// ステップ1: API ソースの定義
const apiSource = new Source.RestAPI({
baseUrl: 'https://api.example.com/v1',
endpoints: [
{
path: '/users',
method: 'GET',
// ページネーション設定
pagination: {
type: 'offset', // offset | cursor | page
limit: 100,
offsetParam: 'offset',
limitParam: 'limit',
},
},
],
// 認証設定
authentication: {
type: 'bearer',
token: process.env.API_TOKEN,
},
// レート制限の設定
rateLimit: {
requests: 100,
per: 'minute',
},
// リトライ設定
retry: {
maxAttempts: 5,
retryOn: [429, 500, 502, 503, 504],
backoff: 'exponential',
},
});
typescript// ステップ2: ネストされたデータの平坦化
const flattenTransform = new Transform.Flatten({
// ネストされたオブジェクトを平坦化
fields: {
'user.profile.name': 'user_name',
'user.profile.email': 'user_email',
'user.address.city': 'city',
'user.address.country': 'country',
},
// 配列の処理
arrays: {
'user.tags': {
mode: 'join', // join | explode
separator: ',',
outputField: 'tags_list',
},
},
});
typescript// ステップ3: データの集約
const aggregateTransform = new Transform.Aggregate({
groupBy: ['country', 'city'],
aggregations: {
user_count: { field: 'user_id', function: 'count' },
avg_age: { field: 'age', function: 'avg' },
total_purchases: {
field: 'purchase_amount',
function: 'sum',
},
},
});
typescript// ステップ4: DynamoDB への格納
const dynamoDestination = new Destination.DynamoDB({
region: 'ap-northeast-1',
tableName: 'UserAnalytics',
// プライマリキーの設定
primaryKey: {
partitionKey: 'country',
sortKey: 'city',
},
// バッチ書き込み設定
batchSize: 25, // DynamoDBの制限に合わせる
});
typescript// ステップ5: パイプラインの実行
const apiPipeline = new Pipeline({
name: 'api-collection-pipeline',
})
.from(apiSource)
.transform(flattenTransform)
.transform(aggregateTransform)
.to(dynamoDestination);
export async function runApiPipeline() {
const result = await apiPipeline.run();
console.log(
`Processed ${result.recordsProcessed} records`
);
console.log(`Inserted ${result.recordsInserted} records`);
console.log(`Failed ${result.recordsFailed} records`);
return result;
}
以下の図は、API データ収集パイプラインの処理フローを示しています。
mermaidsequenceDiagram
participant API as External API
participant NB as Nano Banana
participant Transform as Transform Engine
participant DDB as DynamoDB
loop ページネーション
NB->>API: GET /users?offset=0&limit=100
API-->>NB: ユーザーデータ(100件)
Note over NB: レート制限に従って待機
end
NB->>Transform: 取得したデータを送信
Transform->>Transform: データ平坦化
Transform->>Transform: データ集約
Transform-->>NB: 変換済みデータ
loop バッチ書き込み
NB->>DDB: BatchWriteItem(25件ずつ)
DDB-->>NB: 書き込み成功
end
Note over NB,DDB: パイプライン完了
図で理解できる要点:
- API のページネーションに対応し、全データを自動的に取得
- レート制限を守りながら効率的にデータ収集
- データ変換とバッチ書き込みで DynamoDB への格納を最適化
具体例 3:リアルタイムストリーム処理パイプライン
Kafka からのストリームデータをリアルタイムで処理するパイプラインです。
typescript// リアルタイムストリームパイプライン(src/pipelines/stream-pipeline.ts)
import {
Pipeline,
Source,
Transform,
Destination,
} from 'nano-banana';
// ステップ1: Kafka ソースの定義
const kafkaSource = new Source.Kafka({
brokers: ['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
topic: 'user-events',
groupId: 'nano-banana-consumer',
// 開始位置
fromBeginning: false,
// デシリアライゼーション
deserializer: 'json', // json | avro | protobuf
});
typescript// ステップ2: ウィンドウ集計
const windowTransform = new Transform.Window({
// タイムウィンドウの設定
window: {
type: 'tumbling', // tumbling | sliding | session
size: '5m', // 5分間のウィンドウ
timeField: 'event_time',
},
// ウィンドウごとの集計
aggregations: {
event_count: { function: 'count' },
unique_users: {
field: 'user_id',
function: 'count_distinct',
},
total_value: { field: 'value', function: 'sum' },
},
});
typescript// ステップ3: アラート条件の判定
const alertTransform = new Transform.Filter({
condition: (record) => {
// 閾値を超えたらアラート
return (
record.event_count > 1000 ||
record.total_value > 100000
);
},
// アラート発火時のアクション
onMatch: async (record) => {
await sendAlert({
channel: '#real-time-alerts',
message: `異常な活動を検知: ${record.event_count} events`,
data: record,
});
},
});
typescript// ステップ4: 複数の格納先への並列書き込み
const redisDestination = new Destination.Redis({
host: 'redis.example.com',
port: 6379,
// キャッシュの有効期限
ttl: 3600, // 1時間
keyPattern: 'metrics:{{window_start}}',
});
const timeseriesDestination = new Destination.TimescaleDB({
connection: {
host: 'timescaledb.example.com',
database: 'metrics',
},
table: 'real_time_metrics',
// タイムシリーズの設定
hypertable: {
timeColumn: 'window_start',
chunkInterval: '1 day',
},
});
typescript// ステップ5: パイプラインの実行(並列処理)
const streamPipeline = new Pipeline({
name: 'real-time-stream-pipeline',
mode: 'streaming', // batch | streaming
})
.from(kafkaSource)
.transform(windowTransform)
.transform(alertTransform)
.to([redisDestination, timeseriesDestination]); // 並列書き込み
export async function startStreamPipeline() {
// ストリーミングモードで起動(継続実行)
await streamPipeline.start();
console.log('Stream pipeline started');
// Graceful shutdown の設定
process.on('SIGTERM', async () => {
console.log('Shutting down stream pipeline...');
await streamPipeline.stop();
process.exit(0);
});
}
具体例 4:エラーハンドリングとモニタリングの実装
本番環境で堅牢に動作するパイプラインには、適切なエラーハンドリングとモニタリングが不可欠です。
typescript// 本番環境向けパイプライン設定(src/pipelines/production-pipeline.ts)
import {
Pipeline,
Source,
Transform,
Destination,
} from 'nano-banana';
import { Datadog } from '@datadog/datadog-api-client';
// ステップ1: 詳細なエラーハンドリング設定
const pipeline = new Pipeline({
name: 'production-etl-pipeline',
errorHandling: {
// リトライ戦略
retry: {
maxAttempts: 3,
backoff: 'exponential',
initialDelay: 1000,
maxDelay: 60000,
// リトライ可能なエラーを定義
retryableErrors: [
'ECONNRESET',
'ETIMEDOUT',
'ENOTFOUND',
],
},
// エラー発生時のコールバック
onError: async (error, context) => {
// Slackに通知
await notifySlack({
channel: '#data-alerts',
level: 'error',
message: `Pipeline Error: ${error.message}`,
context: {
pipeline: context.pipelineName,
step: context.stepName,
timestamp: new Date().toISOString(),
},
});
// Datadogにイベント送信
await sendDatadogEvent({
title: 'Pipeline Error',
text: error.message,
tags: [
`pipeline:${context.pipelineName}`,
'severity:high',
],
});
},
// 部分的な失敗を許容
continueOnError: true,
// エラーレコードの保存
deadLetterQueue: {
destination: new Destination.S3({
bucket: 'etl-errors',
prefix: 'failed-records/',
}),
},
},
});
typescript// ステップ2: 詳細なモニタリング設定
pipeline.monitoring({
// メトリクスの送信
metrics: {
provider: 'datadog',
apiKey: process.env.DATADOG_API_KEY,
tags: [
'env:production',
'team:data-engineering',
'service:etl-pipeline',
],
},
// ログの設定
logging: {
level: 'info',
format: 'json',
destination: 'cloudwatch',
logGroup: '/aws/lambda/etl-pipelines',
},
// SLA 監視
sla: {
// 処理時間の閾値
maxDuration: 3600000, // 1時間
// 最小スループット
minRecordsPerSecond: 100,
// エラー率の閾値
maxErrorRate: 0.01, // 1%
// 閾値超過時のアラート
onSlaViolation: async (violation) => {
await sendPagerDutyAlert({
severity: 'high',
message: `SLA violation: ${violation.type}`,
details: violation,
});
},
},
});
typescript// ステップ3: カスタムメトリクスの記録
pipeline.on('pipeline.start', () => {
console.log('Pipeline started');
metrics.increment('pipeline.start', {
tags: ['pipeline:production'],
});
});
pipeline.on('pipeline.complete', (result) => {
console.log('Pipeline completed successfully');
// 処理時間を記録
metrics.histogram('pipeline.duration', result.duration, {
tags: ['pipeline:production'],
});
// 処理レコード数を記録
metrics.gauge(
'pipeline.records.processed',
result.recordsProcessed
);
metrics.gauge(
'pipeline.records.failed',
result.recordsFailed
);
// 成功率を計算
const successRate =
(result.recordsProcessed - result.recordsFailed) /
result.recordsProcessed;
metrics.gauge('pipeline.success_rate', successRate);
});
typescript// ステップ4: データ品質チェック
const qualityCheck = new Transform.Validate({
rules: [
{
name: 'email-format',
field: 'email',
check: (value) =>
/^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(value),
severity: 'error', // error | warning | info
},
{
name: 'age-range',
field: 'age',
check: (value) => value >= 0 && value <= 150,
severity: 'warning',
},
{
name: 'required-fields',
check: (record) => {
return ['id', 'name', 'email'].every(
(field) => record[field] != null
);
},
severity: 'error',
},
],
// バリデーション失敗時の処理
onValidationFail: (record, failedRules) => {
console.warn(
`Validation failed for record ${record.id}:`,
failedRules
);
// エラーの重要度に応じて処理を分岐
const hasError = failedRules.some(
(rule) => rule.severity === 'error'
);
if (hasError) {
// エラーの場合は処理をスキップ
return { action: 'skip', record };
} else {
// 警告の場合は処理を継続
return { action: 'continue', record };
}
},
});
typescript// ステップ5: 統合されたパイプライン
export async function runProductionPipeline() {
const startTime = Date.now();
try {
const result = await pipeline
.from(dataSource)
.transform(qualityCheck) // データ品質チェック
.transform(cleanTransform)
.transform(enrichTransform)
.to(destination)
.run();
const duration = Date.now() - startTime;
// 実行サマリーをログ出力
console.log('Pipeline execution summary:', {
duration: `${duration}ms`,
recordsProcessed: result.recordsProcessed,
recordsFailed: result.recordsFailed,
successRate: `${(result.successRate * 100).toFixed(
2
)}%`,
});
return result;
} catch (error) {
console.error('Pipeline execution failed:', error);
throw error;
}
}
以下の図は、本番環境でのパイプライン実行フローを示しています。
mermaidflowchart TB
start["パイプライン開始"] --> extract["データ抽出"]
extract --> validate["データ品質<br/>チェック"]
validate -->|OK| transform["データ変換"]
validate -->|NG| dlq["Dead Letter Queue<br/>S3に保存"]
transform --> load["データ格納"]
load -->|成功| metrics1["メトリクス記録<br/>Datadog"]
load -->|失敗| retry{"リトライ<br/>判定"}
retry -->|リトライ可能| backoff["待機<br/>Exponential Backoff"]
backoff --> load
retry -->|リトライ不可| dlq
metrics1 --> check{"SLA<br/>チェック"}
check -->|OK| success["正常終了"]
check -->|NG| alert["アラート送信<br/>Slack / PagerDuty"]
alert --> success
dlq -.->|通知| alert2["エラー通知<br/>Slack"]
style success fill:#ccffcc
style dlq fill:#ffcccc
style alert fill:#ffe4b5
style alert2 fill:#ffe4b5
図で理解できる要点:
- データ品質チェックで不正なデータを早期に検出
- エラー発生時はリトライ可能かを判定し、適切に対応
- リトライ不可能なエラーは Dead Letter Queue に保存して後で分析
- SLA 監視により、パフォーマンス低下を即座に検知
パフォーマンスの最適化
大量データを扱う場合、以下のテクニックでパフォーマンスを向上できます。
typescript// パフォーマンス最適化の設定
const optimizedPipeline = new Pipeline({
name: 'high-performance-pipeline',
performance: {
// 並列処理数
concurrency: 10, // 同時に10件処理
// バッチサイズ
batchSize: 1000, // 1000件ずつまとめて処理
// メモリ制限
maxMemory: '2GB',
// タイムアウト
timeout: 300000, // 5分
},
});
typescript// バルクインサートによる書き込み最適化
const bulkDestination = new Destination.PostgreSQL({
connection: dbConfig,
table: 'large_table',
// バルクインサートを使用
writeMode: 'bulk',
bulkSize: 10000, // 10,000件ずつまとめて挿入
// 一時テーブルを使った高速書き込み
useTempTable: true,
});
| # | 最適化手法 | 効果 | 適用シーン |
|---|---|---|---|
| 1 | 並列処理 | 処理時間を 1/N に短縮 | CPU バウンドな変換処理 |
| 2 | バッチ処理 | I/O 回数を削減、スループット向上 | データベース書き込み |
| 3 | ストリーム処理 | メモリ使用量を削減 | 大量データの処理 |
| 4 | インデックス活用 | クエリ速度を大幅向上 | データベースからの抽出 |
| 5 | 圧縮 | ネットワーク転送量を削減 | リモートストレージへの書き込み |
これらの最適化手法を組み合わせることで、大規模データでも効率的に処理できます。
まとめ
Nano Banana は、モダンなデータパイプラインツールとして、従来のツールが抱えていた課題を解決する革新的なフレームワークです。
Nano Banana の主要な特徴
本記事でご紹介した主要なポイントを振り返ります。
5 つの主要機能
- 直感的な宣言的 API - シンプルで読みやすいコード、少ない学習コスト
- ビルトインのエラーハンドリング - リトライ、Dead Letter Queue など堅牢な仕組み
- プラグインベースの拡張性 - ビジネス固有のロジックも柔軟に組み込める
- サーバーレス対応 - インフラ管理不要、コスト最適化
- リアルタイムモニタリング - Datadog、CloudWatch などと連携して可視化
ETL と ELT の両方に対応
- ETL パターン:変換処理を格納前に実行(従来型、計算リソースの分散)
- ELT パターン:生データを格納後に変換(モダン、DWH の計算能力を活用)
- プロジェクトの要件に応じて最適なアプローチを選択可能
実運用に必要な機能が揃っている
- データ品質チェック(バリデーション)
- エラーハンドリングとリトライ
- Dead Letter Queue(失敗レコードの保存)
- SLA 監視とアラート
- 詳細なメトリクスとログ出力
コスト削減と開発効率の向上
Nano Banana を使うことで、以下のようなメリットが得られます。
コスト面
- フルマネージドサービスと比較して、月額料金が大幅に削減(数十万円 → 数千円)
- サーバーレス実行により、使った分だけの課金で無駄がない
- 運用コストの削減(インフラ管理が不要)
開発効率
- 複雑な設定ファイルが不要で、TypeScript/JavaScript で直感的に記述
- プラグインの組み合わせで素早く実装
- エラーハンドリングやモニタリングが標準で組み込まれており、追加実装が少ない
保守性
- コードが読みやすく、チームメンバーが理解しやすい
- テストが書きやすい設計
- プラグインの入れ替えが容易で、要件変更に柔軟に対応
データパイプラインのベストプラクティス
Nano Banana を使ったパイプライン構築では、以下のベストプラクティスを意識しましょう。
設計原則
- 冪等性を保つ - 同じ処理を複数回実行しても結果が変わらないように設計
- 段階的な変換 - 複雑な変換を小さなステップに分割し、デバッグしやすくする
- データ品質チェック - 早い段階で不正なデータを検出し、エラーを防ぐ
- インクリメンタル処理 - 全データではなく、差分だけを処理してパフォーマンス向上
運用のポイント
- アラート設定を適切に行い、問題を早期に検知
- Dead Letter Queue を活用し、失敗したデータを後で分析・再処理
- メトリクスを継続的に監視し、パフォーマンス低下やエラー率の上昇に気づく
- ドキュメントを整備し、チームメンバーがパイプラインを理解できるようにする
今後の拡張性
Nano Banana は、以下のような拡張も可能です。
- 機械学習パイプライン - データ前処理、特徴量エンジニアリング、モデル推論
- CDC(Change Data Capture) - データベースの変更をリアルタイムで検知・同期
- データカタログ連携 - メタデータ管理ツールとの統合
- データリネージ - データの流れを自動的に追跡・可視化
はじめの一歩を踏み出しましょう
Nano Banana は、オープンソースプロジェクトとして公開されており、誰でもすぐに試すことができます。
bash# Yarn でインストール
yarn add nano-banana
# 基本的なプラグインも一緒にインストール
yarn add nano-banana-postgres nano-banana-bigquery
まずは小さなパイプラインから始めて、徐々に機能を拡張していくことをおすすめします。シンプルな API と充実したドキュメントにより、初心者の方でもスムーズに始められるでしょう。
データパイプラインは、データ駆動型ビジネスの基盤です。Nano Banana を活用して、効率的で堅牢なデータ基盤を構築し、ビジネスの成長を加速させましょう。
関連リンク
- Nano Banana 公式ドキュメント - インストール方法、API リファレンス、チュートリアル
- Nano Banana GitHub リポジトリ - ソースコード、Issue トラッカー
- Nano Banana プラグイン集 - 公式・コミュニティプラグインの一覧
- データパイプラインのベストプラクティス - 設計・実装・運用のガイドライン
- Google BigQuery 公式ドキュメント - ELT パターンの活用例
- AWS Lambda 開発者ガイド - サーバーレス実行環境
- Datadog インテグレーションガイド - モニタリング設定
articleNano Banana で ETL/ELT パイプライン:抽出・変換・ロードの実運用レシピ
article初めての Nano Banana:Hello World から実用サンプルまで 30 分チュートリアル
articleNano Banana で拡張可能なモジュール設計:プラグイン/アダプタ/ポート&ドライバ
articleNano Banana チートシート:よく使う CLI/API/設定の一枚まとめ
articleNano Banana のインストール完全ガイド:macOS/Windows/Linux 別の最短手順
articleNano Banana とは?ゼロからわかる特徴・できること・向いている用途【2025 年版】
articleObsidian タスク運用の最適解:Tasks + Periodic Notes で計画と実行を接続
articlePreact Signals チートシート:signal/computed/effect 実用スニペット 30
articleNuxt パフォーマンス運用:payload/コード分割/プリフェッチの継続的チューニング
articleNginx キャパシティプランニング:worker_processes/connections/reuseport の算定メソッド
articlePlaywright スクリーンショット/動画取得のベストプラクティス集【設定例付き】
articleNestJS デプロイ戦略:Blue-Green/Canary と DB マイグレーションの連携
blogiPhone 17シリーズの発表!全モデルiPhone 16から進化したポイントを見やすく整理
blogGoogleストアから訂正案内!Pixel 10ポイント有効期限「1年」表示は誤りだった
blog【2025年8月】Googleストア「ストアポイント」は1年表記はミス?2年ルールとの整合性を検証
blogGoogleストアの注文キャンセルはなぜ起きる?Pixel 10購入前に知るべき注意点
blogPixcel 10シリーズの発表!全モデル Pixcel 9 から進化したポイントを見やすく整理
blogフロントエンドエンジニアの成長戦略:コーチングで最速スキルアップする方法
review今の自分に満足していますか?『持たざる者の逆襲 まだ何者でもない君へ』溝口勇児
reviewついに語られた業界の裏側!『フジテレビの正体』堀江貴文が描くテレビ局の本当の姿
review愛する勇気を持てば人生が変わる!『幸せになる勇気』岸見一郎・古賀史健のアドラー実践編で真の幸福を手に入れる
review週末を変えれば年収も変わる!『世界の一流は「休日」に何をしているのか』越川慎司の一流週末メソッド
review新しい自分に会いに行こう!『自分の変え方』村岡大樹の認知科学コーチングで人生リセット
review科学革命から AI 時代へ!『サピエンス全史 下巻』ユヴァル・ノア・ハラリが予見する人類の未来