MENU

Python + FastAPI: StreamingResponseとPydanticによるデータ検証

膨大なデータをストリーミング形式で送信する際、データの整合性を保つためにPydanticで検証を行うのが重要です。ここでは、StreamingResponseとPydanticを組み合わせて、データ検証を取り入れる方法を解説します。


目次

1. Pydanticの役割

Pydanticは、データのスキーマ定義とバリデーションを行うライブラリです。
ストリーミングデータの各アイテムをPydanticモデルで検証することで、
不正なデータや型の不整合を防ぎ、堅牢なAPIを構築できます。


2. 実装例

データモデルの定義

from pydantic import BaseModel, Field

class QueueItem(BaseModel):
    id: int
    value: str
    created_at: str = Field(regex=r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}$")  # ISO8601形式

    class Config:
        schema_extra = {
            "example": {
                "id": 1,
                "value": "Sample Data",
                "created_at": "2025-01-06T10:00:00",
            }
        }

ジェネレータ関数でデータを検証

ジェネレータ関数内で、各アイテムをPydanticモデルで検証します。

from typing import Iterator
from pydantic import ValidationError

def generate_data_with_validation() -> Iterator[str]:
    """
    データをストリーム形式で生成し、Pydanticで検証する
    """
    raw_data = [
        {"id": 1, "value": "Data 1", "created_at": "2025-01-06T10:00:00"},
        {"id": 2, "value": "Data 2", "created_at": "Invalid Date"},  # 不正なデータ
        {"id": 3, "value": "Data 3", "created_at": "2025-01-06T11:00:00"},
    ]

    for item in raw_data:
        try:
            # Pydanticモデルで検証
            validated_item = QueueItem(**item)
            yield validated_item.json() + "\n"  # 検証済みデータをJSON形式で返す
        except ValidationError as e:
            # 検証エラー時のレスポンス
            yield f'{{"error": "{str(e)}"}}\n'
  • ポイント:
    • 不正なデータはValidationErrorとしてキャッチ。
    • クライアントにエラーメッセージを返す。

エンドポイントの定義

ストリーミングレスポンスを返すエンドポイントを定義します。

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.get("/validated-stream", response_class=StreamingResponse)
def stream_validated_data():
    """
    Pydanticで検証したデータをストリーミングで返す
    """
    return StreamingResponse(
        generate_data_with_validation(),
        media_type="application/json",
    )

3. クライアント側での受信例

受信側ではストリーミングデータを1行ずつ処理します。

Pythonでの受信例

import requests
import json

response = requests.get("http://localhost:8000/validated-stream", stream=True)

for line in response.iter_lines(decode_unicode=True):
    if line:
        data = json.loads(line)
        if "error" in data:
            print("Validation Error:", data["error"])
        else:
            print("Validated Data:", data)
  • 出力例: Validated Data: {'id': 1, 'value': 'Data 1', 'created_at': '2025-01-06T10:00:00'} Validation Error: 1 validation error for QueueItem created_at string does not match regex "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}$" (type=value_error.str.regex; pattern=^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}$) Validated Data: {'id': 3, 'value': 'Data 3', 'created_at': '2025-01-06T11:00:00'}

4. 注意点

  1. パフォーマンス:
    Pydanticの検証は高速ですが、大量データをリアルタイムで処理する際には十分なリソースが必要です。
  2. エラーハンドリング:
    クライアント側でエラーデータを処理するロジックを実装しておくと便利です。
  3. ログ記録:
    バリデーションエラーやデータ生成の進行状況をログに記録すると、デバッグが容易になります。

5. メリットとユースケース

メリット

  • データの整合性を保証。
  • 不正なデータを個別に処理可能。
  • ストリーミングで効率的にデータを送信。

ユースケース

  • データベースクエリ結果の配信。
  • APIゲートウェイからの大量データ処理。
  • ログやセンサーデータのリアルタイム配信。

6. まとめ

StreamingResponseとPydanticの組み合わせは、膨大なデータを安全かつ効率的に送信するための強力な手法です。

  1. ジェネレータでデータを生成し、逐次検証。
  2. クライアントにエラーと有効データを区別して送信。
  3. メモリ使用量を抑えつつ、リアルタイム処理を実現。

これを活用すれば、高パフォーマンスなAPIを構築できるでしょう! 🚀

目次