コンテンツへスキップ

バックグラウンドタスク

レスポンスが返されたに実行されるバックグラウンドタスクを定義できます。

これは、リクエスト後に発生する必要があるが、クライアントがレスポンスを受け取る前に操作が完了するのを待つ必要がない操作に役立ちます。

これには、例えば以下のようなものが含まれます。

  • アクション実行後に送信されるメール通知
    • メールサーバーへの接続やメール送信は「遅い」(数秒かかる)傾向があるため、すぐにレスポンスを返し、メール通知をバックグラウンドで送信することができます。
  • データの処理
    • たとえば、時間のかかる処理が必要なファイルを受け取った場合、「Accepted」(HTTP 202)のレスポンスを返し、ファイルをバックグラウンドで処理することができます。

BackgroundTasks の使用

まず、BackgroundTasks をインポートし、パス操作関数BackgroundTasksの型宣言を持つパラメータを定義します。

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()


def write_notification(email: str, message=""):
    with open("log.txt", mode="w") as email_file:
        content = f"notification for {email}: {message}"
        email_file.write(content)


@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_notification, email, message="some notification")
    return {"message": "Notification sent in the background"}

FastAPIBackgroundTasks型のオブジェクトを自動的に作成し、そのパラメータとして渡します。

タスク関数を作成する

バックグラウンドタスクとして実行される関数を作成します。

これは、パラメータを受け取ることができる標準的な関数です。

async defまたは通常のdef関数でも構いません。FastAPIは適切に処理する方法を知っています。

この場合、タスク関数はファイルに書き込みます(メール送信をシミュレートしています)。

書き込み操作はasyncawaitを使用しないため、関数は通常のdefで定義します。

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()


def write_notification(email: str, message=""):
    with open("log.txt", mode="w") as email_file:
        content = f"notification for {email}: {message}"
        email_file.write(content)


@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_notification, email, message="some notification")
    return {"message": "Notification sent in the background"}

バックグラウンドタスクを追加する

パス操作関数内で、.add_task()メソッドを使用して、タスク関数をバックグラウンドタスクオブジェクトに渡します。

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()


def write_notification(email: str, message=""):
    with open("log.txt", mode="w") as email_file:
        content = f"notification for {email}: {message}"
        email_file.write(content)


@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_notification, email, message="some notification")
    return {"message": "Notification sent in the background"}

.add_task()は以下の引数を受け取ります。

  • バックグラウンドで実行されるタスク関数(write_notification)。
  • タスク関数に順番に渡される引数のシーケンス(email)。
  • タスク関数に渡されるキーワード引数(message="some notification")。

依存性注入

BackgroundTasksの使用は依存性注入システムとも連携し、パス操作関数、依存関係(依存可能なオブジェクト)、サブ依存関係など、複数のレベルでBackgroundTasks型のパラメータを宣言できます。

FastAPIは各ケースで何をすべきか、そして同じオブジェクトをどのように再利用すべきかを知っており、すべてのバックグラウンドタスクは結合され、後でバックグラウンドで実行されます。

from typing import Annotated

from fastapi import BackgroundTasks, Depends, FastAPI

app = FastAPI()


def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


def get_query(background_tasks: BackgroundTasks, q: str | None = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q


@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks, q: Annotated[str, Depends(get_query)]
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}
🤓 その他のバージョンとバリアント
from typing import Annotated, Union

from fastapi import BackgroundTasks, Depends, FastAPI

app = FastAPI()


def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


def get_query(background_tasks: BackgroundTasks, q: Union[str, None] = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q


@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks, q: Annotated[str, Depends(get_query)]
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}
from typing import Union

from fastapi import BackgroundTasks, Depends, FastAPI
from typing_extensions import Annotated

app = FastAPI()


def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


def get_query(background_tasks: BackgroundTasks, q: Union[str, None] = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q


@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks, q: Annotated[str, Depends(get_query)]
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}

ヒント

可能であれば`Annotated`バージョンを使用することをお勧めします。

from fastapi import BackgroundTasks, Depends, FastAPI

app = FastAPI()


def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


def get_query(background_tasks: BackgroundTasks, q: str | None = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q


@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks, q: str = Depends(get_query)
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}

ヒント

可能であれば`Annotated`バージョンを使用することをお勧めします。

from typing import Union

from fastapi import BackgroundTasks, Depends, FastAPI

app = FastAPI()


def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


def get_query(background_tasks: BackgroundTasks, q: Union[str, None] = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q


@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks, q: str = Depends(get_query)
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}

この例では、メッセージはレスポンスが送信されたlog.txtファイルに書き込まれます。

リクエストにクエリがある場合、それはバックグラウンドタスクでログに書き込まれます。

そして、パス操作関数で生成された別のバックグラウンドタスクが、emailパスパラメータを使用してメッセージを書き込みます。

技術的な詳細

BackgroundTasksクラスは、starlette.backgroundから直接来ています。

これはFastAPIに直接インポート/含まれているため、fastapiからインポートでき、誤ってstarlette.backgroundから代替のBackgroundTask(末尾にsがない)をインポートするのを避けることができます。

BackgroundTasks(およびBackgroundTaskではない)のみを使用することで、パス操作関数のパラメータとして使用でき、Requestオブジェクトを直接使用する場合と同じように、FastAPIが残りを処理してくれます。

FastAPIでBackgroundTaskを単独で使用することも可能ですが、コードでオブジェクトを作成し、それを含むStarlette Responseを返す必要があります。

詳細については、Starletteのバックグラウンドタスクに関する公式ドキュメントを参照してください。

注意点

重いバックグラウンド処理を実行する必要があり、必ずしも同じプロセスで実行する必要がない場合(例えば、メモリ、変数などを共有する必要がない場合)、Celeryのようなより大きなツールを使用すると良いでしょう。

これらはより複雑な設定、RabbitMQやRedisのようなメッセージ/ジョブキューマネージャーを必要とする傾向がありますが、複数のプロセス、特に複数のサーバーでバックグラウンドタスクを実行できます。

しかし、同じFastAPIアプリから変数やオブジェクトにアクセスする必要がある場合、または小さなバックグラウンドタスク(メール通知の送信など)を実行する必要がある場合は、単にBackgroundTasksを使用するだけで十分です。

まとめ

バックグラウンドタスクを追加するには、パス操作関数と依存関係でBackgroundTasksをパラメータとともにインポートして使用します。