Zustandでの非同期処理とfetch連携パターン(パターン 4: WebSocket とリアルタイム更新)

リアルタイムデータを扱う Web アプリケーションでは、WebSocket が重要な通信手段となります。この記事では、Zustand と WebSocket を組み合わせたリアルタイム更新の実装パターンについて詳しく解説します。
パターン 4: WebSocket とリアルタイム更新
リアルタイムデータを扱うアプリケーションでは、WebSocket が一般的に使用されます。Zustand との統合方法を見ていきましょう。
ユースケース: チャットアプリケーション
リアルタイムチャットアプリケーションでの WebSocket 統合の例:
typescript// src/stores/chatStore.ts
import { create } from 'zustand';
interface Message {
id: string;
senderId: string;
content: string;
timestamp: string;
}
interface ChatStore {
messages: Message[];
isConnected: boolean;
isLoading: boolean;
error: string | null;
// WebSocket関連
connect: (roomId: string) => void;
disconnect: () => void;
sendMessage: (content: string, senderId: string) => void;
}
export const useChatStore = create<ChatStore>(
(set, get) => {
// WebSocketインスタンスを保持する変数
let socket: WebSocket | null = null;
return {
messages: [],
isConnected: false,
isLoading: false,
error: null,
connect: (roomId) => {
// すでに接続している場合は一度切断
if (socket) {
socket.close();
}
set({ isLoading: true, error: null });
// WebSocket接続の確立
try {
socket = new WebSocket(
`wss://api.example.com/chat/${roomId}`
);
// 接続イベント
socket.addEventListener('open', () => {
set({ isConnected: true, isLoading: false });
console.log('WebSocket接続が確立されました');
});
// メッセージ受信イベント
socket.addEventListener('message', (event) => {
const message = JSON.parse(event.data);
// メッセージタイプに応じた処理
if (message.type === 'chat_message') {
set((state) => ({
messages: [...state.messages, message.data],
}));
}
// 他のメッセージタイプ(ユーザー参加、退出など)も必要に応じて処理
});
// エラーイベント
socket.addEventListener('error', (error) => {
set({
error: 'WebSocket接続エラー',
isLoading: false,
});
console.error('WebSocketエラー:', error);
});
// 切断イベント
socket.addEventListener('close', (event) => {
set({ isConnected: false });
if (!event.wasClean) {
set({
error: `接続が予期せず閉じられました (コード: ${event.code})`,
});
}
console.log(
'WebSocket接続が閉じられました',
event
);
});
} catch (error) {
set({
error:
error instanceof Error
? error.message
: '接続エラー',
isLoading: false,
});
}
},
disconnect: () => {
if (socket) {
socket.close(1000, '意図的な切断');
socket = null;
set({ isConnected: false });
}
},
sendMessage: (content, senderId) => {
if (
!socket ||
socket.readyState !== WebSocket.OPEN
) {
set({
error:
'接続されていません。再接続してください。',
});
return;
}
// 一時的なIDを生成(サーバーから確定IDが返ってくる前に表示用)
const tempId = `temp-${Date.now()}`;
// 楽観的に即時表示
const tempMessage: Message = {
id: tempId,
senderId,
content,
timestamp: new Date().toISOString(),
};
set((state) => ({
messages: [...state.messages, tempMessage],
}));
// メッセージ送信
try {
socket.send(
JSON.stringify({
type: 'send_message',
data: {
content,
senderId,
tempId, // サーバーに一時IDを送信して紐付け可能に
},
})
);
} catch (error) {
// 送信エラー時は楽観的に追加したメッセージを削除
set((state) => ({
messages: state.messages.filter(
(msg) => msg.id !== tempId
),
error:
'送信エラー:メッセージを送信できませんでした',
}));
}
},
};
}
);
コンポーネント実装
WebSocket を使用するチャットコンポーネントの例:
tsx// src/components/ChatRoom.tsx
import React, { useEffect, useState } from 'react';
import { useChatStore } from '../stores/chatStore';
interface ChatRoomProps {
roomId: string;
userId: string;
}
export const ChatRoom: React.FC<ChatRoomProps> = ({
roomId,
userId,
}) => {
const {
messages,
isConnected,
error,
connect,
disconnect,
sendMessage,
} = useChatStore();
const [newMessage, setNewMessage] = useState('');
// コンポーネントマウント時に接続、アンマウント時に切断
useEffect(() => {
connect(roomId);
return () => {
disconnect();
};
}, [roomId]);
const handleSendMessage = (e: React.FormEvent) => {
e.preventDefault();
if (newMessage.trim() && isConnected) {
sendMessage(newMessage, userId);
setNewMessage('');
}
};
return (
<div className='chat-room'>
<div className='connection-status'>
{isConnected ? (
<span className='connected'>接続中</span>
) : (
<span className='disconnected'>
切断されました
</span>
)}
</div>
{error && <div className='error-banner'>{error}</div>}
<div className='messages-container'>
{messages.map((message) => (
<div
key={message.id}
className={`message ${
message.senderId === userId
? 'sent'
: 'received'
}`}
>
<div className='message-content'>
{message.content}
</div>
<div className='message-time'>
{new Date(
message.timestamp
).toLocaleTimeString()}
</div>
</div>
))}
</div>
<form
onSubmit={handleSendMessage}
className='message-form'
>
<input
type='text'
value={newMessage}
onChange={(e) => setNewMessage(e.target.value)}
placeholder='メッセージを入力...'
disabled={!isConnected}
/>
<button
type='submit'
disabled={!isConnected || !newMessage.trim()}
>
送信
</button>
</form>
</div>
);
};
オフライン対応の強化
WebSocket アプリケーションでオフライン対応を強化するためのパターン:
typescript// WebSocketストアを拡張
const useChatStore = create<ChatStore>((set, get) => {
let socket: WebSocket | null = null;
let reconnectAttempts = 0;
let reconnectTimeout: NodeJS.Timeout | null = null;
// 再接続関数
const attemptReconnect = (roomId: string) => {
const maxReconnectAttempts = 5;
const reconnectDelay = Math.min(
1000 * Math.pow(2, reconnectAttempts),
30000
);
if (reconnectAttempts < maxReconnectAttempts) {
set({
error: `再接続中... (${
reconnectAttempts + 1
}/${maxReconnectAttempts})`,
});
reconnectTimeout = setTimeout(() => {
reconnectAttempts++;
get().connect(roomId);
}, reconnectDelay);
} else {
set({
error:
'接続を確立できませんでした。ネットワーク接続を確認してください。',
isConnected: false,
isLoading: false,
});
}
};
return {
// ... 基本的な状態
connect: (roomId) => {
// ... 基本的な接続処理
// 切断イベントでの再接続ロジック
socket.addEventListener('close', (event) => {
set({ isConnected: false });
if (!event.wasClean) {
// 予期せぬ切断の場合は再接続を試みる
attemptReconnect(roomId);
} else {
// 意図的な切断の場合
reconnectAttempts = 0;
if (reconnectTimeout) {
clearTimeout(reconnectTimeout);
reconnectTimeout = null;
}
}
});
},
// オフラインキューの追加
offlineQueue: [],
sendMessage: (content, senderId) => {
// オンライン/オフラインで処理を分岐
if (
!navigator.onLine ||
!socket ||
socket.readyState !== WebSocket.OPEN
) {
// オフライン時はキューに追加
const offlineMessage = {
content,
senderId,
timestamp: new Date().toISOString(),
pending: true,
};
set((state) => ({
offlineQueue: [
...state.offlineQueue,
offlineMessage,
],
messages: [
...state.messages,
{
...offlineMessage,
id: `offline-${Date.now()}`,
status: 'pending',
},
],
}));
return;
}
// オンライン時の通常処理...
},
// オフラインキューの処理
processPendingMessages: () => {
const { offlineQueue } = get();
if (
offlineQueue.length > 0 &&
socket?.readyState === WebSocket.OPEN
) {
// キューからメッセージを取り出して送信
[...offlineQueue].forEach((message) => {
// メッセージ送信処理
// 成功したらキューから削除
});
}
},
};
});
WebSocket と HTTP の併用パターン
多くのアプリケーションでは、初期データは HTTP 経由で取得し、リアルタイム更新は WebSocket で受け取るハイブリッドなアプローチが使われます:
typescript// ハイブリッドアプローチ
const useDataStore = create<DataStore>((set, get) => {
let socket: WebSocket | null = null;
return {
items: [],
isLoading: false,
error: null,
isSocketConnected: false,
// HTTP経由の初期データ取得
fetchInitialData: async () => {
set({ isLoading: true, error: null });
try {
const response = await fetch('/api/items');
const data = await response.json();
set({
items: data,
isLoading: false,
});
// 初期データ取得後にWebSocket接続を確立
get().connectToUpdates();
} catch (error) {
set({
error:
error instanceof Error
? error.message
: '取得エラー',
isLoading: false,
});
}
},
// WebSocket経由のリアルタイム更新
connectToUpdates: () => {
if (socket) socket.close();
socket = new WebSocket(
'wss://api.example.com/updates'
);
socket.addEventListener('open', () => {
set({ isSocketConnected: true });
});
socket.addEventListener('message', (event) => {
const update = JSON.parse(event.data);
// 更新タイプに応じた処理
switch (update.type) {
case 'ITEM_ADDED':
set((state) => ({
items: [...state.items, update.item],
}));
break;
case 'ITEM_UPDATED':
set((state) => ({
items: state.items.map((item) =>
item.id === update.item.id
? update.item
: item
),
}));
break;
case 'ITEM_DELETED':
set((state) => ({
items: state.items.filter(
(item) => item.id !== update.itemId
),
}));
break;
}
});
// エラーと切断の処理
socket.addEventListener('error', (error) => {
set({
error: 'WebSocket接続エラー',
isSocketConnected: false,
});
});
socket.addEventListener('close', () => {
set({ isSocketConnected: false });
});
},
// クリーンアップ
disconnect: () => {
if (socket) {
socket.close();
socket = null;
}
},
};
});
複数の WebSocket コネクションの管理
複数の WebSocket コネクションを管理するパターン:
typescript// 複数のWebSocketコネクションを管理するストア
const useWebSocketStore = create<WebSocketStore>(
(set, get) => {
// コネクションをIDで管理
const connections: Record<string, WebSocket> = {};
return {
connectionStatus: {},
// 新しいコネクションを確立
connect: (id: string, url: string) => {
// 既存のコネクションを閉じる
if (connections[id]) {
connections[id].close();
}
// 接続ステータスを更新
set((state) => ({
connectionStatus: {
...state.connectionStatus,
[id]: {
isConnected: false,
isConnecting: true,
error: null,
},
},
}));
// 新しいコネクションを作成
const socket = new WebSocket(url);
socket.addEventListener('open', () => {
set((state) => ({
connectionStatus: {
...state.connectionStatus,
[id]: {
isConnected: true,
isConnecting: false,
error: null,
},
},
}));
});
socket.addEventListener('error', (error) => {
set((state) => ({
connectionStatus: {
...state.connectionStatus,
[id]: {
isConnected: false,
isConnecting: false,
error: '接続エラーが発生しました',
},
},
}));
});
socket.addEventListener('close', () => {
set((state) => ({
connectionStatus: {
...state.connectionStatus,
[id]: {
isConnected: false,
isConnecting: false,
error: null,
},
},
}));
// コネクション一覧から削除
delete connections[id];
});
// コネクションを保存
connections[id] = socket;
// メッセージハンドラの設定(個別のストアや処理で実装)
return socket;
},
// 特定のコネクションにメッセージを送信
sendMessage: (connectionId: string, message: any) => {
const socket = connections[connectionId];
if (
!socket ||
socket.readyState !== WebSocket.OPEN
) {
return false;
}
try {
socket.send(JSON.stringify(message));
return true;
} catch (error) {
console.error('メッセージ送信エラー:', error);
return false;
}
},
// 特定のコネクションを切断
disconnect: (id: string) => {
if (connections[id]) {
connections[id].close();
delete connections[id];
}
},
// すべてのコネクションを切断
disconnectAll: () => {
Object.values(connections).forEach((socket) => {
socket.close();
});
// コネクション一覧をクリア
Object.keys(connections).forEach((id) => {
delete connections[id];
});
set({ connectionStatus: {} });
},
};
}
);
ポイント
WebSocket とリアルタイム更新の実装における重要なポイント:
-
接続管理: WebSocket 接続の確立、維持、再接続の適切な処理。
-
状態同期: サーバーとクライアント間の状態同期の確保。
-
オフライン対応: 接続が切れた場合のユーザー通知とデータの一時保存。
-
メモリリーク防止: コンポーネントのアンマウント時に WebSocket 接続を適切に閉じる。
-
エラー処理: さまざまな接続エラーシナリオに対する適切な対応。
WebSocket と Zustand の組み合わせのメリット
-
中央集権的な状態管理: WebSocket からのデータを Zustand ストアに集約することで、コンポーネント間で簡単に共有できます。
-
宣言的な UI 更新: WebSocket からデータが届くたびに、Zustand の状態が更新され、それに依存する UI が自動的に再レンダリングされます。
-
接続ライフサイクルの管理: コンポーネントのライフサイクルとは別に、WebSocket 接続のライフサイクルを管理できます。
-
テスト容易性: WebSocket ロジックを Zustand ストアに封じ込めることで、UI ロジックとは分離してテストできます。
まとめ
この記事では、Zustand と WebSocket を組み合わせたリアルタイムデータ更新の実装パターンについて解説しました。WebSocket を使用することで、サーバーからクライアントへのプッシュ通知やリアルタイムの双方向通信が可能になります。
Zustand の柔軟な状態管理モデルは、WebSocket からのリアルタイムデータの扱いに特に適しています。WebSocket のイベントを受け取り、それに応じて状態を更新することで、UI に即座に反映することができます。
リアルタイムアプリケーションを構築する際の重要なポイントとしては、接続管理、状態同期、オフライン対応、そしてエラー処理があります。特に再接続ロジックとオフラインキューの実装は、堅牢なユーザー体験を提供するために重要です。
複雑なアプリケーションでは、HTTP API と WebSocket を併用するハイブリッドなアプローチや、複数の WebSocket コネクションを管理するパターンも検討すると良いでしょう。
関連リンク
- Zustand 公式ドキュメント
- MDN - WebSocket API
- Socket.IO - WebSocket をベースにした双方向通信ライブラリ
- Firebase Realtime Database - リアルタイムデータベース
- Network Information API - ネットワーク状態の検出
- article
Zustandでの非同期処理とfetch連携パターン(パターン 5: リクエストの依存関係と連鎖)
- article
Zustandでの非同期処理とfetch連携パターン(パターン 3: 無限スクロールとページネーション)
- article
Zustandでの非同期処理とfetch連携パターン(パターン 2: 楽観的更新(Optimistic Updates))
- article
Zustandでの非同期処理とfetch連携パターン(パターン1: 基本的なデータフェッチング)
- article
Zustandでの非同期処理とfetch連携パターン(基本的な導入と概要)
- article
Zustand × TypeScript:型安全にストアを構築する設計
- article
Zustandでの非同期処理とfetch連携パターン(パターン 5: リクエストの依存関係と連鎖)
- article
Zustandでの非同期処理とfetch連携パターン(パターン 3: 無限スクロールとページネーション)
- article
Zustandでの非同期処理とfetch連携パターン(パターン 2: 楽観的更新(Optimistic Updates))
- article
Zustandでの非同期処理とfetch連携パターン(パターン1: 基本的なデータフェッチング)
- article
Zustandでの非同期処理とfetch連携パターン(基本的な導入と概要)
- article
React × Suspenseを組み合わせてスケーラブルな非同期UIを実現する方法