コンテンツへスキップ

バックグラウンドタスク

レスポンスを返したに実行されるバックグラウンドタスクを定義できます。

これは、リクエスト後に発生する必要があるが、クライアントがレスポンスを受け取る前に操作の完了を待つ必要がない操作に役立ちます。

これには、例えば以下が含まれます。

  • アクション実行後に送信されるメール通知
    • メールサーバーへの接続とメール送信は「遅い」(数秒かかる)傾向があるため、すぐにレスポンスを返し、メール通知をバックグラウンドで送信できます。
  • データの処理
    • たとえば、処理に時間がかかるファイルを受け取った場合、「Accepted」(HTTP 202)のレスポンスを返し、ファイルをバックグラウンドで処理できます。

BackgroundTasks の使用

まず、BackgroundTasks をインポートし、パス操作関数BackgroundTasks の型宣言を持つパラメータを定義します。

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()


def write_notification(email: str, message=""):
    with open("log.txt", mode="w") as email_file:
        content = f"notification for {email}: {message}"
        email_file.write(content)


@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_notification, email, message="some notification")
    return {"message": "Notification sent in the background"}

FastAPI はあなたのために BackgroundTasks 型のオブジェクトを作成し、それをそのパラメータとして渡します。

タスク関数の作成

バックグラウンドタスクとして実行する関数を作成します。

これは、パラメータを受け取ることができる標準的な関数です。

async def または通常の def 関数のどちらでも構いません。FastAPI が正しく処理する方法を知っています。

この場合、タスク関数はファイルに書き込みます(メール送信をシミュレートしています)。

そして、書き込み操作は asyncawait を使用しないため、通常の def で関数を定義します。

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()


def write_notification(email: str, message=""):
    with open("log.txt", mode="w") as email_file:
        content = f"notification for {email}: {message}"
        email_file.write(content)


@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_notification, email, message="some notification")
    return {"message": "Notification sent in the background"}

バックグラウンドタスクの追加

パス操作関数内で、タスク関数を .add_task() メソッドでバックグラウンドタスクオブジェクトに渡します。

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()


def write_notification(email: str, message=""):
    with open("log.txt", mode="w") as email_file:
        content = f"notification for {email}: {message}"
        email_file.write(content)


@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_notification, email, message="some notification")
    return {"message": "Notification sent in the background"}

.add_task() は引数として以下を受け取ります。

  • バックグラウンドで実行されるタスク関数(write_notification)。
  • タスク関数に順番に渡されるべき引数のシーケンス(email)。
  • タスク関数に渡されるべきキーワード引数(message="some notification")。

依存性注入

BackgroundTasks の使用は依存性注入システムとも連携します。パス操作関数、依存関係 (dependable)、サブ依存関係など、複数のレベルで BackgroundTasks 型のパラメーターを宣言できます。

FastAPI は各ケースで何をすべきか、そして同じオブジェクトを再利用する方法を知っているので、すべてのバックグラウンドタスクは統合され、後でバックグラウンドで実行されます。

from typing import Annotated

from fastapi import BackgroundTasks, Depends, FastAPI

app = FastAPI()


def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


def get_query(background_tasks: BackgroundTasks, q: str | None = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q


@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks, q: Annotated[str, Depends(get_query)]
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}
🤓 その他のバージョンとバリアント
from typing import Annotated, Union

from fastapi import BackgroundTasks, Depends, FastAPI

app = FastAPI()


def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


def get_query(background_tasks: BackgroundTasks, q: Union[str, None] = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q


@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks, q: Annotated[str, Depends(get_query)]
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}
from typing import Union

from fastapi import BackgroundTasks, Depends, FastAPI
from typing_extensions import Annotated

app = FastAPI()


def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


def get_query(background_tasks: BackgroundTasks, q: Union[str, None] = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q


@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks, q: Annotated[str, Depends(get_query)]
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}

ヒント

可能であれば`Annotated`バージョンを使用することをお勧めします。

from fastapi import BackgroundTasks, Depends, FastAPI

app = FastAPI()


def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


def get_query(background_tasks: BackgroundTasks, q: str | None = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q


@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks, q: str = Depends(get_query)
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}

ヒント

可能であれば`Annotated`バージョンを使用することをお勧めします。

from typing import Union

from fastapi import BackgroundTasks, Depends, FastAPI

app = FastAPI()


def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


def get_query(background_tasks: BackgroundTasks, q: Union[str, None] = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q


@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks, q: str = Depends(get_query)
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}

この例では、メッセージはレスポンスが送信されたlog.txt ファイルに書き込まれます。

リクエストにクエリがあった場合、それはバックグラウンドタスクでログに書き込まれます。

そして、パス操作関数で生成された別のバックグラウンドタスクが、email パスパラメータを使用してメッセージを書き込みます。

技術的な詳細

BackgroundTasks クラスは starlette.background から直接来ています。

これは FastAPI に直接インポート/含まれているため、fastapi からインポートでき、誤って starlette.background から代替の BackgroundTask (末尾の s なし) をインポートするのを防ぐことができます。

BackgroundTasks のみを使用する (BackgroundTask は使用しない) ことで、パス操作関数のパラメーターとして使用し、Request オブジェクトを直接使用するのと同じように FastAPI が残りを処理できるようにすることができます。

FastAPI で BackgroundTask を単独で使用することも可能ですが、コードでオブジェクトを作成し、それを含む Starlette Response を返す必要があります。

詳細については、Starlette のバックグラウンドタスクに関する公式ドキュメントを参照してください。

注意点

重いバックグラウンド計算を実行する必要があり、必ずしも同じプロセスで実行する必要がない場合(例えば、メモリや変数を共有する必要がない場合)、Celery のようなより大きなツールを使用するとメリットがあるかもしれません。

これらは通常、より複雑な設定、RabbitMQやRedisのようなメッセージ/ジョブキューマネージャーを必要としますが、複数のプロセス、特に複数のサーバーでバックグラウンドタスクを実行できます。

しかし、同じ FastAPI アプリから変数やオブジェクトにアクセスする必要がある場合、または小さなバックグラウンドタスク(メール通知の送信など)を実行する必要がある場合は、BackgroundTasks を使用するだけで済みます。

まとめ

バックグラウンドタスクを追加するために、パス操作関数と依存関係で BackgroundTasks をインポートしてパラメータとともに使用します。