バックグラウンドタスク¶
レスポンスを返す後に実行されるバックグラウンドタスクを定義できます。
これは、リクエスト後に発生する必要があるが、クライアントがレスポンスを受け取る前に操作の完了を待つ必要がない操作に役立ちます。
これには、例えば以下が含まれます。
- アクションの実行後に送信されるメール通知
- メールサーバーへの接続とメールの送信は「遅い」(数秒かかる)傾向があるため、すぐにレスポンスを返し、バックグラウンドでメール通知を送信できます。
- データの処理
- たとえば、遅いプロセスを経る必要があるファイルを受け取ったとします。その場合、「承認済み」(HTTP 202)のレスポンスを返し、バックグラウンドでファイルを処理できます。
BackgroundTasks
の使用¶
まず、BackgroundTasks
をインポートし、パスオペレーション関数でBackgroundTasks
の型宣言を持つパラメータを定義します。
from fastapi import BackgroundTasks, FastAPI
app = FastAPI()
def write_notification(email: str, message=""):
with open("log.txt", mode="w") as email_file:
content = f"notification for {email}: {message}"
email_file.write(content)
@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
background_tasks.add_task(write_notification, email, message="some notification")
return {"message": "Notification sent in the background"}
FastAPIは、BackgroundTasks
型のオブジェクトを作成し、そのパラメータとして渡します。
タスク関数の作成¶
バックグラウンドタスクとして実行される関数を作成します。
これは、パラメータを受け取ることができる標準的な関数です。
async def
関数でも通常のdef
関数でもかまいません。FastAPIは、それを正しく処理する方法を認識します。
この場合、タスク関数はファイルに書き込みます(メールの送信をシミュレートします)。
そして、書き込み操作はasync
とawait
を使用しないため、通常のdef
で関数を定義します。
from fastapi import BackgroundTasks, FastAPI
app = FastAPI()
def write_notification(email: str, message=""):
with open("log.txt", mode="w") as email_file:
content = f"notification for {email}: {message}"
email_file.write(content)
@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
background_tasks.add_task(write_notification, email, message="some notification")
return {"message": "Notification sent in the background"}
バックグラウンドタスクの追加¶
パスオペレーション関数内で、.add_task()
メソッドを使用して、タスク関数をバックグラウンドタスクオブジェクトに渡します。
from fastapi import BackgroundTasks, FastAPI
app = FastAPI()
def write_notification(email: str, message=""):
with open("log.txt", mode="w") as email_file:
content = f"notification for {email}: {message}"
email_file.write(content)
@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
background_tasks.add_task(write_notification, email, message="some notification")
return {"message": "Notification sent in the background"}
.add_task()
は、引数として以下を受け取ります。
- バックグラウンドで実行されるタスク関数(
write_notification
)。 - タスク関数に順番に渡される引数のシーケンス(
email
)。 - タスク関数に渡されるキーワード引数(
message="some notification"
)。
依存関係の注入¶
BackgroundTasks
は、依存関係注入システムでも機能します。BackgroundTasks
型のパラメータを複数レベルで宣言できます。パスオペレーション関数内、依存関係(依存可能)内、サブ依存関係内などです。
FastAPIは、それぞれのケースで何をするべきか、そして同じオブジェクトをどのように再利用するかを認識するため、すべてのバックグラウンドタスクがマージされ、後でバックグラウンドで実行されます。
from typing import Annotated
from fastapi import BackgroundTasks, Depends, FastAPI
app = FastAPI()
def write_log(message: str):
with open("log.txt", mode="a") as log:
log.write(message)
def get_query(background_tasks: BackgroundTasks, q: str | None = None):
if q:
message = f"found query: {q}\n"
background_tasks.add_task(write_log, message)
return q
@app.post("/send-notification/{email}")
async def send_notification(
email: str, background_tasks: BackgroundTasks, q: Annotated[str, Depends(get_query)]
):
message = f"message to {email}\n"
background_tasks.add_task(write_log, message)
return {"message": "Message sent"}
from typing import Annotated, Union
from fastapi import BackgroundTasks, Depends, FastAPI
app = FastAPI()
def write_log(message: str):
with open("log.txt", mode="a") as log:
log.write(message)
def get_query(background_tasks: BackgroundTasks, q: Union[str, None] = None):
if q:
message = f"found query: {q}\n"
background_tasks.add_task(write_log, message)
return q
@app.post("/send-notification/{email}")
async def send_notification(
email: str, background_tasks: BackgroundTasks, q: Annotated[str, Depends(get_query)]
):
message = f"message to {email}\n"
background_tasks.add_task(write_log, message)
return {"message": "Message sent"}
from typing import Union
from fastapi import BackgroundTasks, Depends, FastAPI
from typing_extensions import Annotated
app = FastAPI()
def write_log(message: str):
with open("log.txt", mode="a") as log:
log.write(message)
def get_query(background_tasks: BackgroundTasks, q: Union[str, None] = None):
if q:
message = f"found query: {q}\n"
background_tasks.add_task(write_log, message)
return q
@app.post("/send-notification/{email}")
async def send_notification(
email: str, background_tasks: BackgroundTasks, q: Annotated[str, Depends(get_query)]
):
message = f"message to {email}\n"
background_tasks.add_task(write_log, message)
return {"message": "Message sent"}
ヒント
可能であれば、Annotated
バージョンを使用することをお勧めします。
from fastapi import BackgroundTasks, Depends, FastAPI
app = FastAPI()
def write_log(message: str):
with open("log.txt", mode="a") as log:
log.write(message)
def get_query(background_tasks: BackgroundTasks, q: str | None = None):
if q:
message = f"found query: {q}\n"
background_tasks.add_task(write_log, message)
return q
@app.post("/send-notification/{email}")
async def send_notification(
email: str, background_tasks: BackgroundTasks, q: str = Depends(get_query)
):
message = f"message to {email}\n"
background_tasks.add_task(write_log, message)
return {"message": "Message sent"}
ヒント
可能であれば、Annotated
バージョンを使用することをお勧めします。
from typing import Union
from fastapi import BackgroundTasks, Depends, FastAPI
app = FastAPI()
def write_log(message: str):
with open("log.txt", mode="a") as log:
log.write(message)
def get_query(background_tasks: BackgroundTasks, q: Union[str, None] = None):
if q:
message = f"found query: {q}\n"
background_tasks.add_task(write_log, message)
return q
@app.post("/send-notification/{email}")
async def send_notification(
email: str, background_tasks: BackgroundTasks, q: str = Depends(get_query)
):
message = f"message to {email}\n"
background_tasks.add_task(write_log, message)
return {"message": "Message sent"}
この例では、メッセージはレスポンスが送信された後にlog.txt
ファイルに書き込まれます。
リクエストにクエリがあった場合、バックグラウンドタスクでログに書き込まれます。
そして、パスオペレーション関数で生成された別のバックグラウンドタスクが、email
パスパラメータを使用してメッセージを書き込みます。
技術的な詳細¶
BackgroundTasks
クラスは、starlette.background
から直接取得されます。
FastAPIに直接インポート/インクルードされているため、fastapi
からインポートでき、starlette.background
から別のBackgroundTask
(末尾の`s`がないもの)を誤ってインポートすることを防ぎます。
BackgroundTasks
のみを使用することで(BackgroundTask
は使用せず)、パスオペレーション関数のパラメータとして使用し、Request
オブジェクトを直接使用する場合と同様に、残りの処理を**FastAPI**に任せることができます。
FastAPIではBackgroundTask
を単独で使用することもできますが、コード内でオブジェクトを作成し、それを含むStarletteのResponse
を返す必要があります。
Starletteのバックグラウンドタスクに関する公式ドキュメントで詳細を確認できます。
注意点¶
負荷の高いバックグラウンド計算を実行する必要があり、必ずしも同じプロセスで実行する必要がない場合(たとえば、メモリ、変数などを共有する必要がない場合)、Celeryなどのより高度なツールを使用すると効果的です。
これらはより複雑な設定、RabbitMQやRedisなどのメッセージ/ジョブキューマネージャーを必要とする傾向がありますが、複数のプロセスで、特に複数のサーバーでバックグラウンドタスクを実行できます。
例については、プロジェクトジェネレーターを参照してください。これらにはすべて、Celeryが既に設定されています。
しかし、同じ**FastAPI**アプリの変数やオブジェクトにアクセスする必要がある場合、または小さなバックグラウンドタスク(メール通知の送信など)を実行する必要がある場合は、BackgroundTasks
を使用するだけで済みます。
要約¶
パスオペレーション関数と依存関係のパラメータを使用してBackgroundTasks
をインポートおよび使用し、バックグラウンドタスクを追加します。