膨大なデータをストリーミング形式で送信する際、データの整合性を保つためにPydanticで検証を行うのが重要です。ここでは、StreamingResponse
とPydanticを組み合わせて、データ検証を取り入れる方法を解説します。
目次
1. Pydanticの役割
Pydanticは、データのスキーマ定義とバリデーションを行うライブラリです。
ストリーミングデータの各アイテムをPydanticモデルで検証することで、
不正なデータや型の不整合を防ぎ、堅牢なAPIを構築できます。
2. 実装例
データモデルの定義
from pydantic import BaseModel, Field
class QueueItem(BaseModel):
id: int
value: str
created_at: str = Field(regex=r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}$") # ISO8601形式
class Config:
schema_extra = {
"example": {
"id": 1,
"value": "Sample Data",
"created_at": "2025-01-06T10:00:00",
}
}
ジェネレータ関数でデータを検証
ジェネレータ関数内で、各アイテムをPydanticモデルで検証します。
from typing import Iterator
from pydantic import ValidationError
def generate_data_with_validation() -> Iterator[str]:
"""
データをストリーム形式で生成し、Pydanticで検証する
"""
raw_data = [
{"id": 1, "value": "Data 1", "created_at": "2025-01-06T10:00:00"},
{"id": 2, "value": "Data 2", "created_at": "Invalid Date"}, # 不正なデータ
{"id": 3, "value": "Data 3", "created_at": "2025-01-06T11:00:00"},
]
for item in raw_data:
try:
# Pydanticモデルで検証
validated_item = QueueItem(**item)
yield validated_item.json() + "\n" # 検証済みデータをJSON形式で返す
except ValidationError as e:
# 検証エラー時のレスポンス
yield f'{{"error": "{str(e)}"}}\n'
- ポイント:
- 不正なデータは
ValidationError
としてキャッチ。 - クライアントにエラーメッセージを返す。
- 不正なデータは
エンドポイントの定義
ストリーミングレスポンスを返すエンドポイントを定義します。
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
@app.get("/validated-stream", response_class=StreamingResponse)
def stream_validated_data():
"""
Pydanticで検証したデータをストリーミングで返す
"""
return StreamingResponse(
generate_data_with_validation(),
media_type="application/json",
)
3. クライアント側での受信例
受信側ではストリーミングデータを1行ずつ処理します。
Pythonでの受信例
import requests
import json
response = requests.get("http://localhost:8000/validated-stream", stream=True)
for line in response.iter_lines(decode_unicode=True):
if line:
data = json.loads(line)
if "error" in data:
print("Validation Error:", data["error"])
else:
print("Validated Data:", data)
- 出力例:
Validated Data: {'id': 1, 'value': 'Data 1', 'created_at': '2025-01-06T10:00:00'} Validation Error: 1 validation error for QueueItem created_at string does not match regex "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}$" (type=value_error.str.regex; pattern=^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}$) Validated Data: {'id': 3, 'value': 'Data 3', 'created_at': '2025-01-06T11:00:00'}
4. 注意点
- パフォーマンス:
Pydanticの検証は高速ですが、大量データをリアルタイムで処理する際には十分なリソースが必要です。 - エラーハンドリング:
クライアント側でエラーデータを処理するロジックを実装しておくと便利です。 - ログ記録:
バリデーションエラーやデータ生成の進行状況をログに記録すると、デバッグが容易になります。
5. メリットとユースケース
メリット
- データの整合性を保証。
- 不正なデータを個別に処理可能。
- ストリーミングで効率的にデータを送信。
ユースケース
- データベースクエリ結果の配信。
- APIゲートウェイからの大量データ処理。
- ログやセンサーデータのリアルタイム配信。
6. まとめ
StreamingResponse
とPydanticの組み合わせは、膨大なデータを安全かつ効率的に送信するための強力な手法です。
- ジェネレータでデータを生成し、逐次検証。
- クライアントにエラーと有効データを区別して送信。
- メモリ使用量を抑えつつ、リアルタイム処理を実現。
これを活用すれば、高パフォーマンスなAPIを構築できるでしょう! 🚀