コンテンツへスキップ

バックグラウンドタスク

レスポンスを返すに実行されるバックグラウンドタスクを定義できます。

これは、リクエスト後に発生する必要があるが、クライアントがレスポンスを受け取る前に操作の完了を待つ必要がない操作に役立ちます。

これには、例えば以下が含まれます。

  • アクションの実行後に送信されるメール通知
    • メールサーバーへの接続とメールの送信は「遅い」(数秒かかる)傾向があるため、すぐにレスポンスを返し、バックグラウンドでメール通知を送信できます。
  • データの処理
    • たとえば、遅いプロセスを経る必要があるファイルを受け取ったとします。その場合、「承認済み」(HTTP 202)のレスポンスを返し、バックグラウンドでファイルを処理できます。

BackgroundTasksの使用

まず、BackgroundTasksをインポートし、パスオペレーション関数BackgroundTasksの型宣言を持つパラメータを定義します。

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()


def write_notification(email: str, message=""):
    with open("log.txt", mode="w") as email_file:
        content = f"notification for {email}: {message}"
        email_file.write(content)


@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_notification, email, message="some notification")
    return {"message": "Notification sent in the background"}

FastAPIは、BackgroundTasks型のオブジェクトを作成し、そのパラメータとして渡します。

タスク関数の作成

バックグラウンドタスクとして実行される関数を作成します。

これは、パラメータを受け取ることができる標準的な関数です。

async def関数でも通常のdef関数でもかまいません。FastAPIは、それを正しく処理する方法を認識します。

この場合、タスク関数はファイルに書き込みます(メールの送信をシミュレートします)。

そして、書き込み操作はasyncawaitを使用しないため、通常のdefで関数を定義します。

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()


def write_notification(email: str, message=""):
    with open("log.txt", mode="w") as email_file:
        content = f"notification for {email}: {message}"
        email_file.write(content)


@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_notification, email, message="some notification")
    return {"message": "Notification sent in the background"}

バックグラウンドタスクの追加

パスオペレーション関数内で、.add_task()メソッドを使用して、タスク関数をバックグラウンドタスクオブジェクトに渡します。

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()


def write_notification(email: str, message=""):
    with open("log.txt", mode="w") as email_file:
        content = f"notification for {email}: {message}"
        email_file.write(content)


@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_notification, email, message="some notification")
    return {"message": "Notification sent in the background"}

.add_task()は、引数として以下を受け取ります。

  • バックグラウンドで実行されるタスク関数(write_notification)。
  • タスク関数に順番に渡される引数のシーケンス(email)。
  • タスク関数に渡されるキーワード引数(message="some notification")。

依存関係の注入

BackgroundTasksは、依存関係注入システムでも機能します。BackgroundTasks型のパラメータを複数レベルで宣言できます。パスオペレーション関数内、依存関係(依存可能)内、サブ依存関係内などです。

FastAPIは、それぞれのケースで何をするべきか、そして同じオブジェクトをどのように再利用するかを認識するため、すべてのバックグラウンドタスクがマージされ、後でバックグラウンドで実行されます。

from typing import Annotated

from fastapi import BackgroundTasks, Depends, FastAPI

app = FastAPI()


def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


def get_query(background_tasks: BackgroundTasks, q: str | None = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q


@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks, q: Annotated[str, Depends(get_query)]
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}
from typing import Annotated, Union

from fastapi import BackgroundTasks, Depends, FastAPI

app = FastAPI()


def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


def get_query(background_tasks: BackgroundTasks, q: Union[str, None] = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q


@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks, q: Annotated[str, Depends(get_query)]
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}
from typing import Union

from fastapi import BackgroundTasks, Depends, FastAPI
from typing_extensions import Annotated

app = FastAPI()


def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


def get_query(background_tasks: BackgroundTasks, q: Union[str, None] = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q


@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks, q: Annotated[str, Depends(get_query)]
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}

ヒント

可能であれば、Annotatedバージョンを使用することをお勧めします。

from fastapi import BackgroundTasks, Depends, FastAPI

app = FastAPI()


def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


def get_query(background_tasks: BackgroundTasks, q: str | None = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q


@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks, q: str = Depends(get_query)
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}

ヒント

可能であれば、Annotatedバージョンを使用することをお勧めします。

from typing import Union

from fastapi import BackgroundTasks, Depends, FastAPI

app = FastAPI()


def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


def get_query(background_tasks: BackgroundTasks, q: Union[str, None] = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q


@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks, q: str = Depends(get_query)
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}

この例では、メッセージはレスポンスが送信されたlog.txtファイルに書き込まれます。

リクエストにクエリがあった場合、バックグラウンドタスクでログに書き込まれます。

そして、パスオペレーション関数で生成された別のバックグラウンドタスクが、emailパスパラメータを使用してメッセージを書き込みます。

技術的な詳細

BackgroundTasksクラスは、starlette.backgroundから直接取得されます。

FastAPIに直接インポート/インクルードされているため、fastapiからインポートでき、starlette.backgroundから別のBackgroundTask(末尾の`s`がないもの)を誤ってインポートすることを防ぎます。

BackgroundTasksのみを使用することで(BackgroundTaskは使用せず)、パスオペレーション関数のパラメータとして使用し、Requestオブジェクトを直接使用する場合と同様に、残りの処理を**FastAPI**に任せることができます。

FastAPIではBackgroundTaskを単独で使用することもできますが、コード内でオブジェクトを作成し、それを含むStarletteのResponseを返す必要があります。

Starletteのバックグラウンドタスクに関する公式ドキュメントで詳細を確認できます。

注意点

負荷の高いバックグラウンド計算を実行する必要があり、必ずしも同じプロセスで実行する必要がない場合(たとえば、メモリ、変数などを共有する必要がない場合)、Celeryなどのより高度なツールを使用すると効果的です。

これらはより複雑な設定、RabbitMQやRedisなどのメッセージ/ジョブキューマネージャーを必要とする傾向がありますが、複数のプロセスで、特に複数のサーバーでバックグラウンドタスクを実行できます。

例については、プロジェクトジェネレーターを参照してください。これらにはすべて、Celeryが既に設定されています。

しかし、同じ**FastAPI**アプリの変数やオブジェクトにアクセスする必要がある場合、または小さなバックグラウンドタスク(メール通知の送信など)を実行する必要がある場合は、BackgroundTasksを使用するだけで済みます。

要約

パスオペレーション関数と依存関係のパラメータを使用してBackgroundTasksをインポートおよび使用し、バックグラウンドタスクを追加します。