MENU

Python + FastAPI: StreamingResponseを活用した効率的なレスポンス処理

大量のデータを扱うAPIを構築する際、クエリ結果をすべて一度のレスポンスに詰め込むとメモリ不足やタイムアウトなどのエラーが発生する場合があります。
これを解決するのが、FastAPIのStreamingResponseです。

以下では、StreamingResponseを利用して効率的にデータをクライアントに送信する方法を汎用的な形で説明します。


目次

1. StreamingResponseとは?

StreamingResponseは、データをチャンク(小さな部分)単位でクライアントに逐次送信するための仕組みです。
これにより、メモリ消費を抑えながら、大量のデータを効率的に処理できます。


2. 典型的なユースケース

  • データベースから大量のクエリ結果を取得する場合
    DynamoDBやPostgreSQLのクエリ結果が膨大な場合に利用できます。
  • ログやリアルタイムデータのストリーミング
    リアルタイムのログやセンサー情報などをクライアントに継続的に配信する用途に適しています。

3. 実装手順

ステップ1: データを生成するジェネレータ関数の作成

ジェネレータ関数を作成し、データを1件ずつ生成します。

from typing import Iterator
import json

def generate_large_dataset() -> Iterator[str]:
    """
    大量のデータを生成するジェネレータ関数
    """
    for i in range(1, 10001):  # 仮に1万件のデータを生成
        yield json.dumps({"id": i, "value": f"Data {i}"}) + "\n"
  • yieldで1件ずつデータを返します。
  • JSONデータに改行コード(\n)を付与することで、ストリームを区切ります。

ステップ2: StreamingResponseでデータを返すエンドポイントを定義

FastAPIのエンドポイントでStreamingResponseを利用します。

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.get("/stream-data")
def stream_data():
    """
    ストリーミングレスポンスでデータを返す
    """
    return StreamingResponse(
        generate_large_dataset(),  # ジェネレータ関数を指定
        media_type="application/json"
    )

4. クライアント側でのデータ受信

クライアント側では、改行区切りのJSONデータを解析する必要があります。

Pythonの例

import requests

response = requests.get("http://localhost:8000/stream-data", stream=True)

for line in response.iter_lines(decode_unicode=True):
    if line:  # 空行をスキップ
        data = json.loads(line)
        print(data)

JavaScriptの例

fetch("http://localhost:8000/stream-data")
  .then(response => {
    const reader = response.body.getReader();
    const decoder = new TextDecoder("utf-8");

    return reader.read().then(function process({ done, value }) {
      if (done) return;
      console.log(JSON.parse(decoder.decode(value))); // デコードして表示
      return reader.read().then(process);
    });
  })
  .catch(console.error);

5. 注意点

  1. データ形式:
    ストリーミングでは改行コード(\n)でデータを区切ることが一般的です。
    • JSON Lines(NDJSON)形式が適している場合が多いです。
  2. タイムアウト:
    クライアントが一定時間データを受信しない場合、タイムアウトする可能性があります。
    サーバー側では、クライアントの切断を検知する必要があります。
  3. エラーハンドリング:
    ジェネレータ関数内で例外が発生した場合、適切なエラーメッセージを送信する必要があります。
def generate_data_with_error_handling():
    for i in range(10):
        try:
            if i == 5: # テスト用のエラー発生
                raise ValueError("Test error")
            yield json.dumps({"id": i, "value": f"Data {i}"}) + "\n"
        except Exception as e:
            yield json.dumps({"error": str(e)}) + "\n"</code>

6. メリットとデメリット

メリット

  • メモリ使用量を大幅に削減。
  • 膨大なデータを効率的に送信可能。
  • リアルタイムデータ配信に対応。

デメリット

  • クライアント側でのデータ処理がやや複雑。
  • ストリーミング中にエラーが発生した場合のハンドリングが難しい。

7. まとめ

StreamingResponseは、FastAPIで膨大なデータを扱う際の強力なツールです。
ジェネレータ関数でデータを逐次生成し、クライアントにストリーミングすることで、効率的でスケーラブルなAPIを構築できます。

改行区切りのデータ形式を採用し、クライアント側で適切にデータを処理すれば、さまざまなユースケースに対応可能です。

備忘録として、今後のプロジェクトでぜひ活用してください! 🚀

目次