SQL(リレーショナル)データベースと Peewee (非推奨)¶
"非推奨"
このチュートリアルは非推奨であり、将来のバージョンで削除されます。
警告
これから始める場合は、SQLAlchemyを使用するチュートリアルSQL (リレーショナル) データベースで十分です。
これをスキップしても構いません。
Peeweeは、FastAPIではasync Pythonとの相性が良くないため、推奨されません。いくつかのより良い代替手段があります。
情報
これらのドキュメントはPydantic v1を前提としています。
Peweeはasyncとの相性が良くなく、より良い代替手段があるため、Pydantic v2のドキュメントは更新しません。これらは、歴史的な目的のためにのみ現時点では保持されます。
ここにある例は、(以前のように) CIではテストされなくなりました。
プロジェクトをゼロから始める場合は、おそらくSQLAlchemy ORM ( SQL (リレーショナル) データベース) またはその他の非同期ORMを使用する方が良いでしょう。
すでにPeewee ORMを使用するコードベースをお持ちの場合は、FastAPIでそれを使用する方法をここで確認できます。
"Python 3.7+が必要"
FastAPIでPeeweeを安全に使用するには、Python 3.7以降が必要です。
asyncのためのPeewee¶
Peeweeは、非同期フレームワーク向け、またはそれらを念頭に置いて設計されたものではありません。
Peeweeは、そのデフォルトと使用方法について、いくつかの大きな前提を持っています。
古い非同期フレームワークを使用してアプリケーションを開発していて、そのすべてのデフォルトで作業できる場合、優れたツールとなる可能性があります。
ただし、デフォルトの一部を変更したり、事前に定義された複数のデータベースをサポートしたり、(FastAPIのような) 非同期フレームワークを使用したりする必要がある場合は、これらのデフォルトをオーバーライドするために、かなり複雑な追加コードを追加する必要があります。
それにもかかわらず、それを行うことは可能であり、ここではFastAPIでPeeweeを使用できるようにするために追加する必要があるコードを正確に確認できます。
同じアプリ¶
SQLAlchemyチュートリアル(SQL (リレーショナル) データベース)と同じアプリケーションを作成します。
ほとんどのコードは実際には同じです。
そのため、違いにのみ焦点を当てます。
ファイル構成¶
my_super_project
という名前のディレクトリがあり、その中にsql_app
というサブディレクトリがあり、次のような構造になっているとします。
.
└── sql_app
├── __init__.py
├── crud.py
├── database.py
├── main.py
└── schemas.py
これは、SQLAlchemyチュートリアルで見たものとほぼ同じ構造です。
それでは、各ファイル/モジュールが何をするかを見ていきましょう。
Peewee のパーツを作成する¶
ファイル sql_app/database.py
を参照しましょう。
標準的な Peewee コード¶
まず、通常の Peewee コードをすべて確認し、Peewee データベースを作成します。
from contextvars import ContextVar
import peewee
DATABASE_NAME = "test.db"
db_state_default = {"closed": None, "conn": None, "ctx": None, "transactions": None}
db_state = ContextVar("db_state", default=db_state_default.copy())
class PeeweeConnectionState(peewee._ConnectionState):
def __init__(self, **kwargs):
super().__setattr__("_state", db_state)
super().__init__(**kwargs)
def __setattr__(self, name, value):
self._state.get()[name] = value
def __getattr__(self, name):
return self._state.get()[name]
db = peewee.SqliteDatabase(DATABASE_NAME, check_same_thread=False)
db._state = PeeweeConnectionState()
ヒント
PostgreSQL のような別のデータベースを使用したい場合は、文字列を変更するだけではいけないことに注意してください。異なる Peewee データベースクラスを使用する必要があります。
注¶
引数
check_same_thread=False
は SQLAlchemy チュートリアルのものと同等です。
connect_args={"check_same_thread": False}
...これは SQLite
の場合にのみ必要です。
"技術的な詳細"
SQL(リレーショナル)データベースの技術的な詳細とまったく同じです。
Peewee を非同期互換にする PeeweeConnectionState
¶
Peewee と FastAPI の主な問題は、Peewee が Python の threading.local
に大きく依存しており、オーバーライドしたり、接続/セッションを直接処理させたりする直接的な方法がないことです(SQLAlchemy チュートリアルで行われているように)。
そして、threading.local
は、最近の Python の新しい非同期機能と互換性がありません。
"技術的な詳細"
threading.local
は、スレッドごとに異なる値を持つ「マジック」変数を保持するために使用されます。
これは、リクエストごとに 1 つのスレッドを持つように設計された古いフレームワークで役立ちました。
これを使用することで、各リクエストは独自のデータベース接続/セッションを持つことになり、これが実際の最終目標です。
しかし、FastAPI は、新しい非同期機能を使用しており、同じスレッドで複数のリクエストを処理することができます。また、単一のリクエストに対して、async def
を使用するか、通常の def
を使用するかによって、複数のことを異なるスレッド(スレッドプール内)で実行することができます。これが FastAPI のパフォーマンス向上につながっています。
しかし、Python 3.7 以降では、threading.local
の代わりに、threading.local
が使用される場所で使用でき、新しい非同期機能と互換性のある、より高度な代替手段が提供されています。
それを使用します。それは contextvars
と呼ばれます。
threading.local
を使用する Peewee の内部パーツをオーバーライドし、それらを contextvars
に置き換え、対応する更新を行います。
これは少し複雑に見えるかもしれませんが(実際そうです)、実際に使用するために、その仕組みを完全に理解する必要はありません。
PeeweeConnectionState
を作成します。
from contextvars import ContextVar
import peewee
DATABASE_NAME = "test.db"
db_state_default = {"closed": None, "conn": None, "ctx": None, "transactions": None}
db_state = ContextVar("db_state", default=db_state_default.copy())
class PeeweeConnectionState(peewee._ConnectionState):
def __init__(self, **kwargs):
super().__setattr__("_state", db_state)
super().__init__(**kwargs)
def __setattr__(self, name, value):
self._state.get()[name] = value
def __getattr__(self, name):
return self._state.get()[name]
db = peewee.SqliteDatabase(DATABASE_NAME, check_same_thread=False)
db._state = PeeweeConnectionState()
このクラスは、Peewee で使用される特別な内部クラスから継承します。
これには、Peewee が threading.local
の代わりに contextvars
を使用するようにするためのすべてのロジックが含まれています。
contextvars
は threading.local
とは少し異なる動作をします。しかし、Peewee の残りの内部コードは、このクラスが threading.local
で動作することを前提としています。
したがって、あたかも threading.local
を使用しているかのように動作させるために、いくつかの追加のトリックを実行する必要があります。__init__
、__setattr__
、および __getattr__
は、FastAPI と互換性があることを知らなくても、Peewee で使用できるようにするために必要なすべてのトリックを実装します。
ヒント
これにより、FastAPI で使用した場合に、Peewee が正しく動作するようになります。使用中の接続をランダムに開いたり閉じたり、エラーが発生したりすることはありません。
しかし、Peewee に非同期のスーパーパワーを与えるわけではありません。引き続き、async def
ではなく、通常の def
関数を使用する必要があります。
カスタム PeeweeConnectionState
クラスを使用する¶
次に、新しい PeeweeConnectionState
を使用して、Peewee データベースの db
オブジェクトの ._state
内部属性を上書きします。
from contextvars import ContextVar
import peewee
DATABASE_NAME = "test.db"
db_state_default = {"closed": None, "conn": None, "ctx": None, "transactions": None}
db_state = ContextVar("db_state", default=db_state_default.copy())
class PeeweeConnectionState(peewee._ConnectionState):
def __init__(self, **kwargs):
super().__setattr__("_state", db_state)
super().__init__(**kwargs)
def __setattr__(self, name, value):
self._state.get()[name] = value
def __getattr__(self, name):
return self._state.get()[name]
db = peewee.SqliteDatabase(DATABASE_NAME, check_same_thread=False)
db._state = PeeweeConnectionState()
ヒント
db
を作成した後に、db._state
を上書きしてください。
ヒント
PostgresqlDatabase
、MySQLDatabase
など、他のすべての Peewee データベースについても同様に行います。
データベースモデルを作成する¶
次に、ファイル sql_app/models.py
を見てみましょう。
データ用の Peewee モデルを作成する¶
次に、User
と Item
の Peewee モデル(クラス)を作成します。
これは、Peewee チュートリアルに従い、SQLAlchemy チュートリアルと同じデータを持つようにモデルを更新した場合と同じです。
ヒント
Peewee は、データベースと対話するこれらのクラスとインスタンスを指すために「モデル」という用語も使用します。
しかし、Pydantic も「モデル」という用語を、データの検証、変換、およびドキュメントのクラスとインスタンスという異なるものを指すために使用します。
database
(上記のファイル database.py
) から db
をインポートし、ここで使用します。
import peewee
from .database import db
class User(peewee.Model):
email = peewee.CharField(unique=True, index=True)
hashed_password = peewee.CharField()
is_active = peewee.BooleanField(default=True)
class Meta:
database = db
class Item(peewee.Model):
title = peewee.CharField(index=True)
description = peewee.CharField(index=True)
owner = peewee.ForeignKeyField(User, backref="items")
class Meta:
database = db
ヒント
Peewee はいくつかのマジック属性を作成します。
自動的に、主キーとなる整数として id
属性を追加します。
クラス名に基づいてテーブルの名前を選択します。
Item
の場合、User
の整数 ID を持つ属性 owner_id
を作成しますが、どこにも宣言しません。
Pydantic モデルを作成する¶
次に、ファイル sql_app/schemas.py
を確認しましょう。
ヒント
Peewee のモデルと Pydantic のモデルの間の混乱を避けるために、Peewee モデルを含むファイル models.py
と、Pydantic モデルを含むファイル schemas.py
を用意します。
これらの Pydantic モデルは、多かれ少なかれ「スキーマ」(有効なデータ形状)を定義します。
これにより、両方を使用する際の混乱を避けることができます。
Pydantic のモデル/スキーマを作成する¶
SQLAlchemy チュートリアルと同じすべての Pydantic モデルを作成します。
from typing import Any, List, Union
import peewee
from pydantic import BaseModel
from pydantic.utils import GetterDict
class PeeweeGetterDict(GetterDict):
def get(self, key: Any, default: Any = None):
res = getattr(self._obj, key, default)
if isinstance(res, peewee.ModelSelect):
return list(res)
return res
class ItemBase(BaseModel):
title: str
description: Union[str, None] = None
class ItemCreate(ItemBase):
pass
class Item(ItemBase):
id: int
owner_id: int
class Config:
orm_mode = True
getter_dict = PeeweeGetterDict
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
password: str
class User(UserBase):
id: int
is_active: bool
items: List[Item] = []
class Config:
orm_mode = True
getter_dict = PeeweeGetterDict
ヒント
ここでは、id
を持つモデルを作成しています。
Peewee モデルでは id
属性を明示的に指定しませんでしたが、Peewee は自動的に 1 つ追加します。
また、マジック owner_id
属性を Item
に追加しています。
Pydantic のモデル/スキーマ用の PeeweeGetterDict
を作成する¶
some_user.items
のように、Peewee オブジェクトでリレーションシップにアクセスすると、Peewee は Item
の list
を提供しません。
クラス ModelSelect
の特別なカスタムオブジェクトを提供します。
list(some_user.items)
を使用して、その項目の list
を作成できます。
ただし、オブジェクト自体は list
ではありません。また、実際の Python ジェネレーターでもありません。このため、Pydantic はデフォルトでそれを Pydantic のモデル/スキーマの list
に変換する方法を知りません。
しかし、Pydantic の最近のバージョンでは、pydantic.utils.GetterDict
から継承するカスタムクラスを提供して、ORM モデル属性の値を取得するために orm_mode = True
を使用する際に使用される機能を提供できます。
カスタムの PeeweeGetterDict
クラスを作成し、orm_mode
を使用するすべての同じ Pydantic のモデル/スキーマで使用します。
from typing import Any, List, Union
import peewee
from pydantic import BaseModel
from pydantic.utils import GetterDict
class PeeweeGetterDict(GetterDict):
def get(self, key: Any, default: Any = None):
res = getattr(self._obj, key, default)
if isinstance(res, peewee.ModelSelect):
return list(res)
return res
class ItemBase(BaseModel):
title: str
description: Union[str, None] = None
class ItemCreate(ItemBase):
pass
class Item(ItemBase):
id: int
owner_id: int
class Config:
orm_mode = True
getter_dict = PeeweeGetterDict
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
password: str
class User(UserBase):
id: int
is_active: bool
items: List[Item] = []
class Config:
orm_mode = True
getter_dict = PeeweeGetterDict
ここでは、アクセスされている属性(たとえば、some_user.items
の .items
)が peewee.ModelSelect
のインスタンスであるかどうかを確認します。
そして、その場合、それを含む list
を返すだけです。
次に、構成変数 getter_dict = PeeweeGetterDict
を使用して、orm_mode = True
を使用する Pydantic のモデル/スキーマで使用します。
ヒント
PeeweeGetterDict
クラスは 1 つだけ作成する必要があり、すべての Pydantic のモデル/スキーマで使用できます。
CRUD ユーティリティ¶
次に、ファイル sql_app/crud.py
を見てみましょう。
すべての CRUD ユーティリティを作成する¶
SQLAlchemy チュートリアルと同じすべての CRUD ユーティリティを作成します。コードは非常に似ています。
from . import models, schemas
def get_user(user_id: int):
return models.User.filter(models.User.id == user_id).first()
def get_user_by_email(email: str):
return models.User.filter(models.User.email == email).first()
def get_users(skip: int = 0, limit: int = 100):
return list(models.User.select().offset(skip).limit(limit))
def create_user(user: schemas.UserCreate):
fake_hashed_password = user.password + "notreallyhashed"
db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
db_user.save()
return db_user
def get_items(skip: int = 0, limit: int = 100):
return list(models.Item.select().offset(skip).limit(limit))
def create_user_item(item: schemas.ItemCreate, user_id: int):
db_item = models.Item(**item.dict(), owner_id=user_id)
db_item.save()
return db_item
SQLAlchemy チュートリアルのコードとはいくつかの違いがあります。
db
属性を渡しません。代わりに、モデルを直接使用します。これは、db
オブジェクトが、すべての接続ロジックを含むグローバルオブジェクトであるためです。だからこそ、上記ですべての contextvars
の更新を行う必要がありました。
また、get_users
のように、いくつかのオブジェクトを返す場合は、次のように、直接 list
を呼び出します。
list(models.User.select())
これは、カスタムの PeeweeGetterDict
を作成する必要があったのと同じ理由です。しかし、peewee.ModelSelect
の代わりにすでに list
であるものを返すことで、List[models.User]
を使用したパス操作の response_model
(後で説明します) が正しく動作します。
メインの FastAPI アプリ¶
次に、ファイル sql_app/main.py
で、前に作成した他のすべてのパーツを統合して使用しましょう。
データベーステーブルを作成する¶
非常に単純な方法で、データベーステーブルを作成します。
import time
from typing import List
from fastapi import Depends, FastAPI, HTTPException
from . import crud, database, models, schemas
from .database import db_state_default
database.db.connect()
database.db.create_tables([models.User, models.Item])
database.db.close()
app = FastAPI()
sleep_time = 10
async def reset_db_state():
database.db._state._state.set(db_state_default.copy())
database.db._state.reset()
def get_db(db_state=Depends(reset_db_state)):
try:
database.db.connect()
yield
finally:
if not database.db.is_closed():
database.db.close()
@app.post("/users/", response_model=schemas.User, dependencies=[Depends(get_db)])
def create_user(user: schemas.UserCreate):
db_user = crud.get_user_by_email(email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(user=user)
@app.get("/users/", response_model=List[schemas.User], dependencies=[Depends(get_db)])
def read_users(skip: int = 0, limit: int = 100):
users = crud.get_users(skip=skip, limit=limit)
return users
@app.get(
"/users/{user_id}", response_model=schemas.User, dependencies=[Depends(get_db)]
)
def read_user(user_id: int):
db_user = crud.get_user(user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post(
"/users/{user_id}/items/",
response_model=schemas.Item,
dependencies=[Depends(get_db)],
)
def create_item_for_user(user_id: int, item: schemas.ItemCreate):
return crud.create_user_item(item=item, user_id=user_id)
@app.get("/items/", response_model=List[schemas.Item], dependencies=[Depends(get_db)])
def read_items(skip: int = 0, limit: int = 100):
items = crud.get_items(skip=skip, limit=limit)
return items
@app.get(
"/slowusers/", response_model=List[schemas.User], dependencies=[Depends(get_db)]
)
def read_slow_users(skip: int = 0, limit: int = 100):
global sleep_time
sleep_time = max(0, sleep_time - 1)
time.sleep(sleep_time) # Fake long processing request
users = crud.get_users(skip=skip, limit=limit)
return users
依存関係を作成する¶
リクエストの開始時にデータベースに接続し、終了時に切断する依存関係を作成します。
import time
from typing import List
from fastapi import Depends, FastAPI, HTTPException
from . import crud, database, models, schemas
from .database import db_state_default
database.db.connect()
database.db.create_tables([models.User, models.Item])
database.db.close()
app = FastAPI()
sleep_time = 10
async def reset_db_state():
database.db._state._state.set(db_state_default.copy())
database.db._state.reset()
def get_db(db_state=Depends(reset_db_state)):
try:
database.db.connect()
yield
finally:
if not database.db.is_closed():
database.db.close()
@app.post("/users/", response_model=schemas.User, dependencies=[Depends(get_db)])
def create_user(user: schemas.UserCreate):
db_user = crud.get_user_by_email(email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(user=user)
@app.get("/users/", response_model=List[schemas.User], dependencies=[Depends(get_db)])
def read_users(skip: int = 0, limit: int = 100):
users = crud.get_users(skip=skip, limit=limit)
return users
@app.get(
"/users/{user_id}", response_model=schemas.User, dependencies=[Depends(get_db)]
)
def read_user(user_id: int):
db_user = crud.get_user(user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post(
"/users/{user_id}/items/",
response_model=schemas.Item,
dependencies=[Depends(get_db)],
)
def create_item_for_user(user_id: int, item: schemas.ItemCreate):
return crud.create_user_item(item=item, user_id=user_id)
@app.get("/items/", response_model=List[schemas.Item], dependencies=[Depends(get_db)])
def read_items(skip: int = 0, limit: int = 100):
items = crud.get_items(skip=skip, limit=limit)
return items
@app.get(
"/slowusers/", response_model=List[schemas.User], dependencies=[Depends(get_db)]
)
def read_slow_users(skip: int = 0, limit: int = 100):
global sleep_time
sleep_time = max(0, sleep_time - 1)
time.sleep(sleep_time) # Fake long processing request
users = crud.get_users(skip=skip, limit=limit)
return users
ここでは、実際にはデータベースオブジェクトを直接使用していないため、空の yield
があります。
データベースに接続し、各リクエストに独立した内部変数(上記の contextvars
トリックを使用)に接続データを格納します。
データベース接続は I/O ブロックになる可能性があるため、この依存関係は通常の def
関数で作成されます。
次に、データベースにアクセスする必要がある各パス操作関数に、依存関係として追加します。
ただし、この依存関係によって与えられた値を使用しません(実際には、空の yield
があるため、値は与えられません)。したがって、パス操作関数には追加しませんが、dependencies
パラメーター内のパス操作デコレーターに追加します。
import time
from typing import List
from fastapi import Depends, FastAPI, HTTPException
from . import crud, database, models, schemas
from .database import db_state_default
database.db.connect()
database.db.create_tables([models.User, models.Item])
database.db.close()
app = FastAPI()
sleep_time = 10
async def reset_db_state():
database.db._state._state.set(db_state_default.copy())
database.db._state.reset()
def get_db(db_state=Depends(reset_db_state)):
try:
database.db.connect()
yield
finally:
if not database.db.is_closed():
database.db.close()
@app.post("/users/", response_model=schemas.User, dependencies=[Depends(get_db)])
def create_user(user: schemas.UserCreate):
db_user = crud.get_user_by_email(email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(user=user)
@app.get("/users/", response_model=List[schemas.User], dependencies=[Depends(get_db)])
def read_users(skip: int = 0, limit: int = 100):
users = crud.get_users(skip=skip, limit=limit)
return users
@app.get(
"/users/{user_id}", response_model=schemas.User, dependencies=[Depends(get_db)]
)
def read_user(user_id: int):
db_user = crud.get_user(user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post(
"/users/{user_id}/items/",
response_model=schemas.Item,
dependencies=[Depends(get_db)],
)
def create_item_for_user(user_id: int, item: schemas.ItemCreate):
return crud.create_user_item(item=item, user_id=user_id)
@app.get("/items/", response_model=List[schemas.Item], dependencies=[Depends(get_db)])
def read_items(skip: int = 0, limit: int = 100):
items = crud.get_items(skip=skip, limit=limit)
return items
@app.get(
"/slowusers/", response_model=List[schemas.User], dependencies=[Depends(get_db)]
)
def read_slow_users(skip: int = 0, limit: int = 100):
global sleep_time
sleep_time = max(0, sleep_time - 1)
time.sleep(sleep_time) # Fake long processing request
users = crud.get_users(skip=skip, limit=limit)
return users
コンテキスト変数サブ依存関係¶
contextvars
のすべてのパーツが機能するためには、データベースを使用する各リクエストの ContextVar
に独立した値があることを確認する必要があります。また、その値はリクエスト全体のデータベース状態(接続、トランザクションなど)として使用されます。
そのため、get_db()
でサブ依存関係として使用される別の async
依存関係 reset_db_state()
を作成する必要があります。これにより、リクエスト全体のデータベース状態として使用されるコンテキスト変数(デフォルトの dict
を使用)の値が設定されます。次に、依存関係 get_db()
が、データベース状態(接続、トランザクションなど)をそこに格納します。
import time
from typing import List
from fastapi import Depends, FastAPI, HTTPException
from . import crud, database, models, schemas
from .database import db_state_default
database.db.connect()
database.db.create_tables([models.User, models.Item])
database.db.close()
app = FastAPI()
sleep_time = 10
async def reset_db_state():
database.db._state._state.set(db_state_default.copy())
database.db._state.reset()
def get_db(db_state=Depends(reset_db_state)):
try:
database.db.connect()
yield
finally:
if not database.db.is_closed():
database.db.close()
@app.post("/users/", response_model=schemas.User, dependencies=[Depends(get_db)])
def create_user(user: schemas.UserCreate):
db_user = crud.get_user_by_email(email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(user=user)
@app.get("/users/", response_model=List[schemas.User], dependencies=[Depends(get_db)])
def read_users(skip: int = 0, limit: int = 100):
users = crud.get_users(skip=skip, limit=limit)
return users
@app.get(
"/users/{user_id}", response_model=schemas.User, dependencies=[Depends(get_db)]
)
def read_user(user_id: int):
db_user = crud.get_user(user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post(
"/users/{user_id}/items/",
response_model=schemas.Item,
dependencies=[Depends(get_db)],
)
def create_item_for_user(user_id: int, item: schemas.ItemCreate):
return crud.create_user_item(item=item, user_id=user_id)
@app.get("/items/", response_model=List[schemas.Item], dependencies=[Depends(get_db)])
def read_items(skip: int = 0, limit: int = 100):
items = crud.get_items(skip=skip, limit=limit)
return items
@app.get(
"/slowusers/", response_model=List[schemas.User], dependencies=[Depends(get_db)]
)
def read_slow_users(skip: int = 0, limit: int = 100):
global sleep_time
sleep_time = max(0, sleep_time - 1)
time.sleep(sleep_time) # Fake long processing request
users = crud.get_users(skip=skip, limit=limit)
return users
次のリクエストでは、async
依存関係 reset_db_state()
でそのコンテキスト変数を再度リセットし、次に get_db()
依存関係で新しい接続を作成するため、その新しいリクエストは独自のデータベース状態(接続、トランザクションなど)を持つことになります。
ヒント
FastAPI は非同期フレームワークであるため、1 つのリクエストの処理が開始され、完了する前に、別のリクエストが受信されて処理も開始され、すべてが同じスレッドで処理される可能性があります。
しかし、コンテキスト変数はこれらの非同期機能を認識しているため、async
依存関係 reset_db_state()
で設定された Peewee データベース状態は、リクエスト全体で独自のデータを保持します。
同時に、別の同時リクエストは、リクエスト全体に対して独立した独自のデータベース状態を持ちます。
Peeweeプロキシ¶
Peeweeプロキシを使用している場合、実際のデータベースはdb.obj
にあります。
そのため、以下のようにリセットします。
async def reset_db_state():
database.db.obj._state._state.set(db_state_default.copy())
database.db.obj._state.reset()
FastAPIのパス操作を作成する¶
さて、最後に、標準的なFastAPIのパス操作のコードです。
import time
from typing import List
from fastapi import Depends, FastAPI, HTTPException
from . import crud, database, models, schemas
from .database import db_state_default
database.db.connect()
database.db.create_tables([models.User, models.Item])
database.db.close()
app = FastAPI()
sleep_time = 10
async def reset_db_state():
database.db._state._state.set(db_state_default.copy())
database.db._state.reset()
def get_db(db_state=Depends(reset_db_state)):
try:
database.db.connect()
yield
finally:
if not database.db.is_closed():
database.db.close()
@app.post("/users/", response_model=schemas.User, dependencies=[Depends(get_db)])
def create_user(user: schemas.UserCreate):
db_user = crud.get_user_by_email(email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(user=user)
@app.get("/users/", response_model=List[schemas.User], dependencies=[Depends(get_db)])
def read_users(skip: int = 0, limit: int = 100):
users = crud.get_users(skip=skip, limit=limit)
return users
@app.get(
"/users/{user_id}", response_model=schemas.User, dependencies=[Depends(get_db)]
)
def read_user(user_id: int):
db_user = crud.get_user(user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post(
"/users/{user_id}/items/",
response_model=schemas.Item,
dependencies=[Depends(get_db)],
)
def create_item_for_user(user_id: int, item: schemas.ItemCreate):
return crud.create_user_item(item=item, user_id=user_id)
@app.get("/items/", response_model=List[schemas.Item], dependencies=[Depends(get_db)])
def read_items(skip: int = 0, limit: int = 100):
items = crud.get_items(skip=skip, limit=limit)
return items
@app.get(
"/slowusers/", response_model=List[schemas.User], dependencies=[Depends(get_db)]
)
def read_slow_users(skip: int = 0, limit: int = 100):
global sleep_time
sleep_time = max(0, sleep_time - 1)
time.sleep(sleep_time) # Fake long processing request
users = crud.get_users(skip=skip, limit=limit)
return users
def
とasync def
について¶
SQLAlchemyと同様に、次のようなことはしません。
user = await models.User.select().first()
...代わりに、次のように使用します。
user = models.User.select().first()
したがって、ここでも、パス操作関数と依存関係は、async def
を使用せず、通常のdef
を使用して宣言する必要があります。
# Something goes here
def read_users(skip: int = 0, limit: int = 100):
# Something goes here
Peeweeを非同期でテストする¶
この例には、time.sleep(sleep_time)
を使用して長い処理リクエストをシミュレートする追加のパス操作が含まれています。
これは、最初にデータベース接続を開き、応答するまでに数秒間待機します。そして、新しいリクエストごとに1秒ずつ待機時間が短くなります。
これにより、PeeweeとFastAPIを使用したアプリが、スレッドに関するすべての事柄で正しく動作していることを簡単にテストできます。
Peeweeを修正せずに使用した場合にアプリがどのように壊れるかを確認したい場合は、sql_app/database.py
ファイルに移動して、次の行をコメントアウトしてください。
# db._state = PeeweeConnectionState()
そして、sql_app/main.py
ファイルで、async
依存関係reset_db_state()
の本体をコメントアウトし、pass
に置き換えます。
async def reset_db_state():
# database.db._state._state.set(db_state_default.copy())
# database.db._state.reset()
pass
次に、Uvicornでアプリを実行します。
$ uvicorn sql_app.main:app --reload
<span style="color: green;">INFO</span>: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
http://127.0.0.1:8000/docsでブラウザを開き、ユーザーを数人作成します。
次に、http://127.0.0.1:8000/docs#/default/read_slow_users_slowusers__getで10個のタブを同時に開きます。
すべてのタブでパス操作 "Get /slowusers/
"に移動します。「試してみる」ボタンを使用して、各タブで順番にリクエストを実行します。
タブはしばらく待機し、一部のタブにはInternal Server Error
が表示されます。
何が起こるか¶
最初のタブでは、アプリがデータベースへの接続を作成し、応答してデータベース接続を閉じるまで数秒間待機します。
次に、次のタブのリクエストでは、アプリは1秒短い時間待機し、以下同様です。
つまり、最後のタブのリクエストの一部が、以前のタブのリクエストよりも早く完了することになります。
次に、待機時間が短い最後の要求の1つがデータベース接続を開こうとしますが、他のタブに対する以前の要求の1つが最初の要求と同じスレッドで処理される可能性があるため、既に開いている同じデータベース接続を持ち、Peeweeはエラーをスローし、ターミナルに表示され、レスポンスにはInternal Server Error
が表示されます。
これは、複数のタブで発生する可能性があります。
複数のクライアントがアプリに同時に接続している場合、これが起こる可能性があります。
また、アプリが同時に処理するクライアントが増えるにつれて、エラーをトリガーするために、単一のリクエストでの待ち時間は短くする必要があります。
FastAPIでPeeweeを修正する¶
次に、sql_app/database.py
ファイルに戻り、次の行のコメントを解除します。
db._state = PeeweeConnectionState()
そして、sql_app/main.py
ファイルで、async
依存関係reset_db_state()
の本体のコメントを解除します。
async def reset_db_state():
database.db._state._state.set(db_state_default.copy())
database.db._state.reset()
実行中のアプリを終了し、再度起動します。
10個のタブで同じプロセスを繰り返します。今回はすべてが待機し、エラーなしですべての結果が得られます。
...修正されました!
すべてのファイルを確認する¶
my_super_project
(または任意)という名前のディレクトリがあり、その中にsql_app
というサブディレクトリが含まれている必要があります。
sql_app
には、次のファイルが必要です。
-
sql_app/__init__.py
:空のファイルです。 -
sql_app/database.py
:
from contextvars import ContextVar
import peewee
DATABASE_NAME = "test.db"
db_state_default = {"closed": None, "conn": None, "ctx": None, "transactions": None}
db_state = ContextVar("db_state", default=db_state_default.copy())
class PeeweeConnectionState(peewee._ConnectionState):
def __init__(self, **kwargs):
super().__setattr__("_state", db_state)
super().__init__(**kwargs)
def __setattr__(self, name, value):
self._state.get()[name] = value
def __getattr__(self, name):
return self._state.get()[name]
db = peewee.SqliteDatabase(DATABASE_NAME, check_same_thread=False)
db._state = PeeweeConnectionState()
sql_app/models.py
:
import peewee
from .database import db
class User(peewee.Model):
email = peewee.CharField(unique=True, index=True)
hashed_password = peewee.CharField()
is_active = peewee.BooleanField(default=True)
class Meta:
database = db
class Item(peewee.Model):
title = peewee.CharField(index=True)
description = peewee.CharField(index=True)
owner = peewee.ForeignKeyField(User, backref="items")
class Meta:
database = db
sql_app/schemas.py
:
from typing import Any, List, Union
import peewee
from pydantic import BaseModel
from pydantic.utils import GetterDict
class PeeweeGetterDict(GetterDict):
def get(self, key: Any, default: Any = None):
res = getattr(self._obj, key, default)
if isinstance(res, peewee.ModelSelect):
return list(res)
return res
class ItemBase(BaseModel):
title: str
description: Union[str, None] = None
class ItemCreate(ItemBase):
pass
class Item(ItemBase):
id: int
owner_id: int
class Config:
orm_mode = True
getter_dict = PeeweeGetterDict
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
password: str
class User(UserBase):
id: int
is_active: bool
items: List[Item] = []
class Config:
orm_mode = True
getter_dict = PeeweeGetterDict
sql_app/crud.py
:
from . import models, schemas
def get_user(user_id: int):
return models.User.filter(models.User.id == user_id).first()
def get_user_by_email(email: str):
return models.User.filter(models.User.email == email).first()
def get_users(skip: int = 0, limit: int = 100):
return list(models.User.select().offset(skip).limit(limit))
def create_user(user: schemas.UserCreate):
fake_hashed_password = user.password + "notreallyhashed"
db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
db_user.save()
return db_user
def get_items(skip: int = 0, limit: int = 100):
return list(models.Item.select().offset(skip).limit(limit))
def create_user_item(item: schemas.ItemCreate, user_id: int):
db_item = models.Item(**item.dict(), owner_id=user_id)
db_item.save()
return db_item
sql_app/main.py
:
import time
from typing import List
from fastapi import Depends, FastAPI, HTTPException
from . import crud, database, models, schemas
from .database import db_state_default
database.db.connect()
database.db.create_tables([models.User, models.Item])
database.db.close()
app = FastAPI()
sleep_time = 10
async def reset_db_state():
database.db._state._state.set(db_state_default.copy())
database.db._state.reset()
def get_db(db_state=Depends(reset_db_state)):
try:
database.db.connect()
yield
finally:
if not database.db.is_closed():
database.db.close()
@app.post("/users/", response_model=schemas.User, dependencies=[Depends(get_db)])
def create_user(user: schemas.UserCreate):
db_user = crud.get_user_by_email(email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(user=user)
@app.get("/users/", response_model=List[schemas.User], dependencies=[Depends(get_db)])
def read_users(skip: int = 0, limit: int = 100):
users = crud.get_users(skip=skip, limit=limit)
return users
@app.get(
"/users/{user_id}", response_model=schemas.User, dependencies=[Depends(get_db)]
)
def read_user(user_id: int):
db_user = crud.get_user(user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post(
"/users/{user_id}/items/",
response_model=schemas.Item,
dependencies=[Depends(get_db)],
)
def create_item_for_user(user_id: int, item: schemas.ItemCreate):
return crud.create_user_item(item=item, user_id=user_id)
@app.get("/items/", response_model=List[schemas.Item], dependencies=[Depends(get_db)])
def read_items(skip: int = 0, limit: int = 100):
items = crud.get_items(skip=skip, limit=limit)
return items
@app.get(
"/slowusers/", response_model=List[schemas.User], dependencies=[Depends(get_db)]
)
def read_slow_users(skip: int = 0, limit: int = 100):
global sleep_time
sleep_time = max(0, sleep_time - 1)
time.sleep(sleep_time) # Fake long processing request
users = crud.get_users(skip=skip, limit=limit)
return users
技術的な詳細¶
警告
これらは、おそらく必要のない非常に技術的な詳細です。
問題点¶
Peeweeは、データベースの「状態」データ(接続、トランザクションなど)を格納するために、デフォルトでthreading.local
を使用します。
threading.local
は現在のスレッドに排他的な値を作成しますが、非同期フレームワークはすべてのコード(たとえば、リクエストごとに)を同じスレッドで、おそらく順番に実行しません。
さらに、非同期フレームワークは、同じリクエストに属するものであっても、スレッドプール(asyncio.run_in_executor
を使用)で同期コードを実行する可能性があります。
これは、Peeweeの現在の実装では、複数のタスクが同じthreading.local
変数を使用して、同じ接続とデータ(共有すべきではない)を共有する可能性があることを意味します。同時に、スレッドプールで同期I/Oブロッキングコードを実行する場合(FastAPIの通常のdef
関数の場合のように、パス操作と依存関係)、そのコードはデータベース状態変数にアクセスできません。たとえそれが同じリクエストの一部であり、同じデータベース状態にアクセスできるべきであってもです。
コンテキスト変数¶
Python 3.7には、contextvars
があり、threading.local
と非常によく似たローカル変数を作成できますが、これらの非同期機能もサポートしています。
留意すべき点がいくつかあります。
ContextVar
は、次のように、モジュールの先頭で作成する必要があります。
some_var = ContextVar("some_var", default="default value")
現在の「コンテキスト」(たとえば、現在のリクエスト)で使用される値を設定するには、次のように使用します。
some_var.set("new value")
コンテキスト内の任意の場所で(たとえば、現在のリクエストを処理する任意の部分で)値を取得するには、次のように使用します。
some_var.get()
async
依存関係reset_db_state()
でコンテキスト変数を設定する¶
非同期コードの一部がsome_var.set("updated in function")
を使用して値を設定した場合(たとえば、async
依存関係のように)、その後のコード(await
で呼び出されたasync
関数内のコードを含む)は、その新しい値を参照します。
そのため、この場合、async
依存関係で(デフォルトのdict
を使用して)Peewee状態変数を設定すると、アプリの内部コードの残りの部分はすべてこの値を参照し、リクエスト全体で再利用できるようになります。
また、コンテキスト変数は、同時実行であっても、次のリクエストのために再度設定されます。
依存関係get_db()
でデータベース状態を設定する¶
get_db()
は通常のdef
関数であるため、FastAPIはそれをスレッドプールで実行し、コンテキスト変数の同じ値(リセットされたデータベース状態を持つdict
)を保持する「コンテキスト」のコピーを使用します。次に、接続などのデータベース状態をそのdict
に追加できます。
ただし、コンテキスト変数(デフォルトのdict
)の値がその通常のdef
関数で設定されている場合、スレッドプールのそのスレッドのみにとどまる新しい値が作成され、残りのコード(パス操作関数など)はそれにアクセスできません。get_db()
では、dict
自体ではなく、dict
に値のみを設定できます。
したがって、async
依存関係reset_db_state()
を使用して、コンテキスト変数にdict
を設定する必要があります。これにより、すべてのコードが単一のリクエストのデータベース状態に対して同じdict
にアクセスできるようになります。
依存関係get_db()
で接続と切断を行う¶
次に、get_db()
ではなく、async
依存関係自体でデータベースを接続および切断しないのはなぜかという疑問が生じます。
async
依存関係は、リクエストの残りの部分でコンテキスト変数が保持されるようにasync
である必要がありますが、データベース接続の作成と終了は潜在的にブロッキングであるため、そこにあるとパフォーマンスが低下する可能性があります。
そのため、通常のdef
依存関係get_db()
も必要です。