コンテンツへスキップ

SQL(リレーショナル)データベースと Peewee (非推奨)

"非推奨"

このチュートリアルは非推奨であり、将来のバージョンで削除されます。

警告

これから始める場合は、SQLAlchemyを使用するチュートリアルSQL (リレーショナル) データベースで十分です。

これをスキップしても構いません。

Peeweeは、FastAPIではasync Pythonとの相性が良くないため、推奨されません。いくつかのより良い代替手段があります。

情報

これらのドキュメントはPydantic v1を前提としています。

Peweeはasyncとの相性が良くなく、より良い代替手段があるため、Pydantic v2のドキュメントは更新しません。これらは、歴史的な目的のためにのみ現時点では保持されます。

ここにある例は、(以前のように) CIではテストされなくなりました。

プロジェクトをゼロから始める場合は、おそらくSQLAlchemy ORM ( SQL (リレーショナル) データベース) またはその他の非同期ORMを使用する方が良いでしょう。

すでにPeewee ORMを使用するコードベースをお持ちの場合は、FastAPIでそれを使用する方法をここで確認できます。

"Python 3.7+が必要"

FastAPIでPeeweeを安全に使用するには、Python 3.7以降が必要です。

asyncのためのPeewee

Peeweeは、非同期フレームワーク向け、またはそれらを念頭に置いて設計されたものではありません。

Peeweeは、そのデフォルトと使用方法について、いくつかの大きな前提を持っています。

古い非同期フレームワークを使用してアプリケーションを開発していて、そのすべてのデフォルトで作業できる場合、優れたツールとなる可能性があります

ただし、デフォルトの一部を変更したり、事前に定義された複数のデータベースをサポートしたり、(FastAPIのような) 非同期フレームワークを使用したりする必要がある場合は、これらのデフォルトをオーバーライドするために、かなり複雑な追加コードを追加する必要があります。

それにもかかわらず、それを行うことは可能であり、ここではFastAPIでPeeweeを使用できるようにするために追加する必要があるコードを正確に確認できます。

"技術的な詳細"

PeeweeのPythonにおけるasyncに関する立場について詳しくは、ドキュメントイシューPRを参照してください。

同じアプリ

SQLAlchemyチュートリアル(SQL (リレーショナル) データベース)と同じアプリケーションを作成します。

ほとんどのコードは実際には同じです。

そのため、違いにのみ焦点を当てます。

ファイル構成

my_super_projectという名前のディレクトリがあり、その中にsql_appというサブディレクトリがあり、次のような構造になっているとします。

.
└── sql_app
    ├── __init__.py
    ├── crud.py
    ├── database.py
    ├── main.py
    └── schemas.py

これは、SQLAlchemyチュートリアルで見たものとほぼ同じ構造です。

それでは、各ファイル/モジュールが何をするかを見ていきましょう。

Peewee のパーツを作成する

ファイル sql_app/database.py を参照しましょう。

標準的な Peewee コード

まず、通常の Peewee コードをすべて確認し、Peewee データベースを作成します。

from contextvars import ContextVar

import peewee

DATABASE_NAME = "test.db"
db_state_default = {"closed": None, "conn": None, "ctx": None, "transactions": None}
db_state = ContextVar("db_state", default=db_state_default.copy())


class PeeweeConnectionState(peewee._ConnectionState):
    def __init__(self, **kwargs):
        super().__setattr__("_state", db_state)
        super().__init__(**kwargs)

    def __setattr__(self, name, value):
        self._state.get()[name] = value

    def __getattr__(self, name):
        return self._state.get()[name]


db = peewee.SqliteDatabase(DATABASE_NAME, check_same_thread=False)

db._state = PeeweeConnectionState()

ヒント

PostgreSQL のような別のデータベースを使用したい場合は、文字列を変更するだけではいけないことに注意してください。異なる Peewee データベースクラスを使用する必要があります。

引数

check_same_thread=False

は SQLAlchemy チュートリアルのものと同等です。

connect_args={"check_same_thread": False}

...これは SQLite の場合にのみ必要です。

"技術的な詳細"

SQL(リレーショナル)データベースの技術的な詳細とまったく同じです。

Peewee を非同期互換にする PeeweeConnectionState

Peewee と FastAPI の主な問題は、Peewee が Python の threading.local に大きく依存しており、オーバーライドしたり、接続/セッションを直接処理させたりする直接的な方法がないことです(SQLAlchemy チュートリアルで行われているように)。

そして、threading.local は、最近の Python の新しい非同期機能と互換性がありません。

"技術的な詳細"

threading.local は、スレッドごとに異なる値を持つ「マジック」変数を保持するために使用されます。

これは、リクエストごとに 1 つのスレッドを持つように設計された古いフレームワークで役立ちました。

これを使用することで、各リクエストは独自のデータベース接続/セッションを持つことになり、これが実際の最終目標です。

しかし、FastAPI は、新しい非同期機能を使用しており、同じスレッドで複数のリクエストを処理することができます。また、単一のリクエストに対して、async def を使用するか、通常の def を使用するかによって、複数のことを異なるスレッド(スレッドプール内)で実行することができます。これが FastAPI のパフォーマンス向上につながっています。

しかし、Python 3.7 以降では、threading.local の代わりに、threading.local が使用される場所で使用でき、新しい非同期機能と互換性のある、より高度な代替手段が提供されています。

それを使用します。それは contextvars と呼ばれます。

threading.local を使用する Peewee の内部パーツをオーバーライドし、それらを contextvars に置き換え、対応する更新を行います。

これは少し複雑に見えるかもしれませんが(実際そうです)、実際に使用するために、その仕組みを完全に理解する必要はありません。

PeeweeConnectionState を作成します。

from contextvars import ContextVar

import peewee

DATABASE_NAME = "test.db"
db_state_default = {"closed": None, "conn": None, "ctx": None, "transactions": None}
db_state = ContextVar("db_state", default=db_state_default.copy())


class PeeweeConnectionState(peewee._ConnectionState):
    def __init__(self, **kwargs):
        super().__setattr__("_state", db_state)
        super().__init__(**kwargs)

    def __setattr__(self, name, value):
        self._state.get()[name] = value

    def __getattr__(self, name):
        return self._state.get()[name]


db = peewee.SqliteDatabase(DATABASE_NAME, check_same_thread=False)

db._state = PeeweeConnectionState()

このクラスは、Peewee で使用される特別な内部クラスから継承します。

これには、Peewee が threading.local の代わりに contextvars を使用するようにするためのすべてのロジックが含まれています。

contextvarsthreading.local とは少し異なる動作をします。しかし、Peewee の残りの内部コードは、このクラスが threading.local で動作することを前提としています。

したがって、あたかも threading.local を使用しているかのように動作させるために、いくつかの追加のトリックを実行する必要があります。__init____setattr__、および __getattr__ は、FastAPI と互換性があることを知らなくても、Peewee で使用できるようにするために必要なすべてのトリックを実装します。

ヒント

これにより、FastAPI で使用した場合に、Peewee が正しく動作するようになります。使用中の接続をランダムに開いたり閉じたり、エラーが発生したりすることはありません。

しかし、Peewee に非同期のスーパーパワーを与えるわけではありません。引き続き、async def ではなく、通常の def 関数を使用する必要があります。

カスタム PeeweeConnectionState クラスを使用する

次に、新しい PeeweeConnectionState を使用して、Peewee データベースの db オブジェクトの ._state 内部属性を上書きします。

from contextvars import ContextVar

import peewee

DATABASE_NAME = "test.db"
db_state_default = {"closed": None, "conn": None, "ctx": None, "transactions": None}
db_state = ContextVar("db_state", default=db_state_default.copy())


class PeeweeConnectionState(peewee._ConnectionState):
    def __init__(self, **kwargs):
        super().__setattr__("_state", db_state)
        super().__init__(**kwargs)

    def __setattr__(self, name, value):
        self._state.get()[name] = value

    def __getattr__(self, name):
        return self._state.get()[name]


db = peewee.SqliteDatabase(DATABASE_NAME, check_same_thread=False)

db._state = PeeweeConnectionState()

ヒント

db を作成したに、db._state を上書きしてください。

ヒント

PostgresqlDatabaseMySQLDatabase など、他のすべての Peewee データベースについても同様に行います。

データベースモデルを作成する

次に、ファイル sql_app/models.py を見てみましょう。

データ用の Peewee モデルを作成する

次に、UserItem の Peewee モデル(クラス)を作成します。

これは、Peewee チュートリアルに従い、SQLAlchemy チュートリアルと同じデータを持つようにモデルを更新した場合と同じです。

ヒント

Peewee は、データベースと対話するこれらのクラスとインスタンスを指すために「モデル」という用語も使用します。

しかし、Pydantic も「モデル」という用語を、データの検証、変換、およびドキュメントのクラスとインスタンスという異なるものを指すために使用します。

database (上記のファイル database.py) から db をインポートし、ここで使用します。

import peewee

from .database import db


class User(peewee.Model):
    email = peewee.CharField(unique=True, index=True)
    hashed_password = peewee.CharField()
    is_active = peewee.BooleanField(default=True)

    class Meta:
        database = db


class Item(peewee.Model):
    title = peewee.CharField(index=True)
    description = peewee.CharField(index=True)
    owner = peewee.ForeignKeyField(User, backref="items")

    class Meta:
        database = db

ヒント

Peewee はいくつかのマジック属性を作成します。

自動的に、主キーとなる整数として id 属性を追加します。

クラス名に基づいてテーブルの名前を選択します。

Item の場合、User の整数 ID を持つ属性 owner_id を作成しますが、どこにも宣言しません。

Pydantic モデルを作成する

次に、ファイル sql_app/schemas.py を確認しましょう。

ヒント

Peewee のモデルと Pydantic のモデルの間の混乱を避けるために、Peewee モデルを含むファイル models.py と、Pydantic モデルを含むファイル schemas.py を用意します。

これらの Pydantic モデルは、多かれ少なかれ「スキーマ」(有効なデータ形状)を定義します。

これにより、両方を使用する際の混乱を避けることができます。

Pydantic のモデル/スキーマを作成する

SQLAlchemy チュートリアルと同じすべての Pydantic モデルを作成します。

from typing import Any, List, Union

import peewee
from pydantic import BaseModel
from pydantic.utils import GetterDict


class PeeweeGetterDict(GetterDict):
    def get(self, key: Any, default: Any = None):
        res = getattr(self._obj, key, default)
        if isinstance(res, peewee.ModelSelect):
            return list(res)
        return res


class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True
        getter_dict = PeeweeGetterDict


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: List[Item] = []

    class Config:
        orm_mode = True
        getter_dict = PeeweeGetterDict

ヒント

ここでは、id を持つモデルを作成しています。

Peewee モデルでは id 属性を明示的に指定しませんでしたが、Peewee は自動的に 1 つ追加します。

また、マジック owner_id 属性を Item に追加しています。

Pydantic のモデル/スキーマ用の PeeweeGetterDict を作成する

some_user.items のように、Peewee オブジェクトでリレーションシップにアクセスすると、Peewee は Itemlist を提供しません。

クラス ModelSelect の特別なカスタムオブジェクトを提供します。

list(some_user.items) を使用して、その項目の list を作成できます。

ただし、オブジェクト自体は list ではありません。また、実際の Python ジェネレーターでもありません。このため、Pydantic はデフォルトでそれを Pydantic のモデル/スキーマの list に変換する方法を知りません。

しかし、Pydantic の最近のバージョンでは、pydantic.utils.GetterDict から継承するカスタムクラスを提供して、ORM モデル属性の値を取得するために orm_mode = True を使用する際に使用される機能を提供できます。

カスタムの PeeweeGetterDict クラスを作成し、orm_mode を使用するすべての同じ Pydantic のモデル/スキーマで使用します。

from typing import Any, List, Union

import peewee
from pydantic import BaseModel
from pydantic.utils import GetterDict


class PeeweeGetterDict(GetterDict):
    def get(self, key: Any, default: Any = None):
        res = getattr(self._obj, key, default)
        if isinstance(res, peewee.ModelSelect):
            return list(res)
        return res


class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True
        getter_dict = PeeweeGetterDict


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: List[Item] = []

    class Config:
        orm_mode = True
        getter_dict = PeeweeGetterDict

ここでは、アクセスされている属性(たとえば、some_user.items.items)が peewee.ModelSelect のインスタンスであるかどうかを確認します。

そして、その場合、それを含む list を返すだけです。

次に、構成変数 getter_dict = PeeweeGetterDict を使用して、orm_mode = True を使用する Pydantic のモデル/スキーマで使用します。

ヒント

PeeweeGetterDict クラスは 1 つだけ作成する必要があり、すべての Pydantic のモデル/スキーマで使用できます。

CRUD ユーティリティ

次に、ファイル sql_app/crud.py を見てみましょう。

すべての CRUD ユーティリティを作成する

SQLAlchemy チュートリアルと同じすべての CRUD ユーティリティを作成します。コードは非常に似ています。

from . import models, schemas


def get_user(user_id: int):
    return models.User.filter(models.User.id == user_id).first()


def get_user_by_email(email: str):
    return models.User.filter(models.User.email == email).first()


def get_users(skip: int = 0, limit: int = 100):
    return list(models.User.select().offset(skip).limit(limit))


def create_user(user: schemas.UserCreate):
    fake_hashed_password = user.password + "notreallyhashed"
    db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
    db_user.save()
    return db_user


def get_items(skip: int = 0, limit: int = 100):
    return list(models.Item.select().offset(skip).limit(limit))


def create_user_item(item: schemas.ItemCreate, user_id: int):
    db_item = models.Item(**item.dict(), owner_id=user_id)
    db_item.save()
    return db_item

SQLAlchemy チュートリアルのコードとはいくつかの違いがあります。

db 属性を渡しません。代わりに、モデルを直接使用します。これは、db オブジェクトが、すべての接続ロジックを含むグローバルオブジェクトであるためです。だからこそ、上記ですべての contextvars の更新を行う必要がありました。

また、get_users のように、いくつかのオブジェクトを返す場合は、次のように、直接 list を呼び出します。

list(models.User.select())

これは、カスタムの PeeweeGetterDict を作成する必要があったのと同じ理由です。しかし、peewee.ModelSelect の代わりにすでに list であるものを返すことで、List[models.User] を使用したパス操作response_model (後で説明します) が正しく動作します。

メインの FastAPI アプリ

次に、ファイル sql_app/main.py で、前に作成した他のすべてのパーツを統合して使用しましょう。

データベーステーブルを作成する

非常に単純な方法で、データベーステーブルを作成します。

import time
from typing import List

from fastapi import Depends, FastAPI, HTTPException

from . import crud, database, models, schemas
from .database import db_state_default

database.db.connect()
database.db.create_tables([models.User, models.Item])
database.db.close()

app = FastAPI()

sleep_time = 10


async def reset_db_state():
    database.db._state._state.set(db_state_default.copy())
    database.db._state.reset()


def get_db(db_state=Depends(reset_db_state)):
    try:
        database.db.connect()
        yield
    finally:
        if not database.db.is_closed():
            database.db.close()


@app.post("/users/", response_model=schemas.User, dependencies=[Depends(get_db)])
def create_user(user: schemas.UserCreate):
    db_user = crud.get_user_by_email(email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(user=user)


@app.get("/users/", response_model=List[schemas.User], dependencies=[Depends(get_db)])
def read_users(skip: int = 0, limit: int = 100):
    users = crud.get_users(skip=skip, limit=limit)
    return users


@app.get(
    "/users/{user_id}", response_model=schemas.User, dependencies=[Depends(get_db)]
)
def read_user(user_id: int):
    db_user = crud.get_user(user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post(
    "/users/{user_id}/items/",
    response_model=schemas.Item,
    dependencies=[Depends(get_db)],
)
def create_item_for_user(user_id: int, item: schemas.ItemCreate):
    return crud.create_user_item(item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item], dependencies=[Depends(get_db)])
def read_items(skip: int = 0, limit: int = 100):
    items = crud.get_items(skip=skip, limit=limit)
    return items


@app.get(
    "/slowusers/", response_model=List[schemas.User], dependencies=[Depends(get_db)]
)
def read_slow_users(skip: int = 0, limit: int = 100):
    global sleep_time
    sleep_time = max(0, sleep_time - 1)
    time.sleep(sleep_time)  # Fake long processing request
    users = crud.get_users(skip=skip, limit=limit)
    return users

依存関係を作成する

リクエストの開始時にデータベースに接続し、終了時に切断する依存関係を作成します。

import time
from typing import List

from fastapi import Depends, FastAPI, HTTPException

from . import crud, database, models, schemas
from .database import db_state_default

database.db.connect()
database.db.create_tables([models.User, models.Item])
database.db.close()

app = FastAPI()

sleep_time = 10


async def reset_db_state():
    database.db._state._state.set(db_state_default.copy())
    database.db._state.reset()


def get_db(db_state=Depends(reset_db_state)):
    try:
        database.db.connect()
        yield
    finally:
        if not database.db.is_closed():
            database.db.close()


@app.post("/users/", response_model=schemas.User, dependencies=[Depends(get_db)])
def create_user(user: schemas.UserCreate):
    db_user = crud.get_user_by_email(email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(user=user)


@app.get("/users/", response_model=List[schemas.User], dependencies=[Depends(get_db)])
def read_users(skip: int = 0, limit: int = 100):
    users = crud.get_users(skip=skip, limit=limit)
    return users


@app.get(
    "/users/{user_id}", response_model=schemas.User, dependencies=[Depends(get_db)]
)
def read_user(user_id: int):
    db_user = crud.get_user(user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post(
    "/users/{user_id}/items/",
    response_model=schemas.Item,
    dependencies=[Depends(get_db)],
)
def create_item_for_user(user_id: int, item: schemas.ItemCreate):
    return crud.create_user_item(item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item], dependencies=[Depends(get_db)])
def read_items(skip: int = 0, limit: int = 100):
    items = crud.get_items(skip=skip, limit=limit)
    return items


@app.get(
    "/slowusers/", response_model=List[schemas.User], dependencies=[Depends(get_db)]
)
def read_slow_users(skip: int = 0, limit: int = 100):
    global sleep_time
    sleep_time = max(0, sleep_time - 1)
    time.sleep(sleep_time)  # Fake long processing request
    users = crud.get_users(skip=skip, limit=limit)
    return users

ここでは、実際にはデータベースオブジェクトを直接使用していないため、空の yield があります。

データベースに接続し、各リクエストに独立した内部変数(上記の contextvars トリックを使用)に接続データを格納します。

データベース接続は I/O ブロックになる可能性があるため、この依存関係は通常の def 関数で作成されます。

次に、データベースにアクセスする必要がある各パス操作関数に、依存関係として追加します。

ただし、この依存関係によって与えられた値を使用しません(実際には、空の yield があるため、値は与えられません)。したがって、パス操作関数には追加しませんが、dependencies パラメーター内のパス操作デコレーターに追加します。

import time
from typing import List

from fastapi import Depends, FastAPI, HTTPException

from . import crud, database, models, schemas
from .database import db_state_default

database.db.connect()
database.db.create_tables([models.User, models.Item])
database.db.close()

app = FastAPI()

sleep_time = 10


async def reset_db_state():
    database.db._state._state.set(db_state_default.copy())
    database.db._state.reset()


def get_db(db_state=Depends(reset_db_state)):
    try:
        database.db.connect()
        yield
    finally:
        if not database.db.is_closed():
            database.db.close()


@app.post("/users/", response_model=schemas.User, dependencies=[Depends(get_db)])
def create_user(user: schemas.UserCreate):
    db_user = crud.get_user_by_email(email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(user=user)


@app.get("/users/", response_model=List[schemas.User], dependencies=[Depends(get_db)])
def read_users(skip: int = 0, limit: int = 100):
    users = crud.get_users(skip=skip, limit=limit)
    return users


@app.get(
    "/users/{user_id}", response_model=schemas.User, dependencies=[Depends(get_db)]
)
def read_user(user_id: int):
    db_user = crud.get_user(user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post(
    "/users/{user_id}/items/",
    response_model=schemas.Item,
    dependencies=[Depends(get_db)],
)
def create_item_for_user(user_id: int, item: schemas.ItemCreate):
    return crud.create_user_item(item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item], dependencies=[Depends(get_db)])
def read_items(skip: int = 0, limit: int = 100):
    items = crud.get_items(skip=skip, limit=limit)
    return items


@app.get(
    "/slowusers/", response_model=List[schemas.User], dependencies=[Depends(get_db)]
)
def read_slow_users(skip: int = 0, limit: int = 100):
    global sleep_time
    sleep_time = max(0, sleep_time - 1)
    time.sleep(sleep_time)  # Fake long processing request
    users = crud.get_users(skip=skip, limit=limit)
    return users

コンテキスト変数サブ依存関係

contextvars のすべてのパーツが機能するためには、データベースを使用する各リクエストの ContextVar に独立した値があることを確認する必要があります。また、その値はリクエスト全体のデータベース状態(接続、トランザクションなど)として使用されます。

そのため、get_db() でサブ依存関係として使用される別の async 依存関係 reset_db_state() を作成する必要があります。これにより、リクエスト全体のデータベース状態として使用されるコンテキスト変数(デフォルトの dict を使用)の値が設定されます。次に、依存関係 get_db() が、データベース状態(接続、トランザクションなど)をそこに格納します。

import time
from typing import List

from fastapi import Depends, FastAPI, HTTPException

from . import crud, database, models, schemas
from .database import db_state_default

database.db.connect()
database.db.create_tables([models.User, models.Item])
database.db.close()

app = FastAPI()

sleep_time = 10


async def reset_db_state():
    database.db._state._state.set(db_state_default.copy())
    database.db._state.reset()


def get_db(db_state=Depends(reset_db_state)):
    try:
        database.db.connect()
        yield
    finally:
        if not database.db.is_closed():
            database.db.close()


@app.post("/users/", response_model=schemas.User, dependencies=[Depends(get_db)])
def create_user(user: schemas.UserCreate):
    db_user = crud.get_user_by_email(email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(user=user)


@app.get("/users/", response_model=List[schemas.User], dependencies=[Depends(get_db)])
def read_users(skip: int = 0, limit: int = 100):
    users = crud.get_users(skip=skip, limit=limit)
    return users


@app.get(
    "/users/{user_id}", response_model=schemas.User, dependencies=[Depends(get_db)]
)
def read_user(user_id: int):
    db_user = crud.get_user(user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post(
    "/users/{user_id}/items/",
    response_model=schemas.Item,
    dependencies=[Depends(get_db)],
)
def create_item_for_user(user_id: int, item: schemas.ItemCreate):
    return crud.create_user_item(item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item], dependencies=[Depends(get_db)])
def read_items(skip: int = 0, limit: int = 100):
    items = crud.get_items(skip=skip, limit=limit)
    return items


@app.get(
    "/slowusers/", response_model=List[schemas.User], dependencies=[Depends(get_db)]
)
def read_slow_users(skip: int = 0, limit: int = 100):
    global sleep_time
    sleep_time = max(0, sleep_time - 1)
    time.sleep(sleep_time)  # Fake long processing request
    users = crud.get_users(skip=skip, limit=limit)
    return users

次のリクエストでは、async 依存関係 reset_db_state() でそのコンテキスト変数を再度リセットし、次に get_db() 依存関係で新しい接続を作成するため、その新しいリクエストは独自のデータベース状態(接続、トランザクションなど)を持つことになります。

ヒント

FastAPI は非同期フレームワークであるため、1 つのリクエストの処理が開始され、完了する前に、別のリクエストが受信されて処理も開始され、すべてが同じスレッドで処理される可能性があります。

しかし、コンテキスト変数はこれらの非同期機能を認識しているため、async 依存関係 reset_db_state() で設定された Peewee データベース状態は、リクエスト全体で独自のデータを保持します。

同時に、別の同時リクエストは、リクエスト全体に対して独立した独自のデータベース状態を持ちます。

Peeweeプロキシ

Peeweeプロキシを使用している場合、実際のデータベースはdb.objにあります。

そのため、以下のようにリセットします。

async def reset_db_state():
    database.db.obj._state._state.set(db_state_default.copy())
    database.db.obj._state.reset()

FastAPIパス操作を作成する

さて、最後に、標準的なFastAPIパス操作のコードです。

import time
from typing import List

from fastapi import Depends, FastAPI, HTTPException

from . import crud, database, models, schemas
from .database import db_state_default

database.db.connect()
database.db.create_tables([models.User, models.Item])
database.db.close()

app = FastAPI()

sleep_time = 10


async def reset_db_state():
    database.db._state._state.set(db_state_default.copy())
    database.db._state.reset()


def get_db(db_state=Depends(reset_db_state)):
    try:
        database.db.connect()
        yield
    finally:
        if not database.db.is_closed():
            database.db.close()


@app.post("/users/", response_model=schemas.User, dependencies=[Depends(get_db)])
def create_user(user: schemas.UserCreate):
    db_user = crud.get_user_by_email(email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(user=user)


@app.get("/users/", response_model=List[schemas.User], dependencies=[Depends(get_db)])
def read_users(skip: int = 0, limit: int = 100):
    users = crud.get_users(skip=skip, limit=limit)
    return users


@app.get(
    "/users/{user_id}", response_model=schemas.User, dependencies=[Depends(get_db)]
)
def read_user(user_id: int):
    db_user = crud.get_user(user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post(
    "/users/{user_id}/items/",
    response_model=schemas.Item,
    dependencies=[Depends(get_db)],
)
def create_item_for_user(user_id: int, item: schemas.ItemCreate):
    return crud.create_user_item(item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item], dependencies=[Depends(get_db)])
def read_items(skip: int = 0, limit: int = 100):
    items = crud.get_items(skip=skip, limit=limit)
    return items


@app.get(
    "/slowusers/", response_model=List[schemas.User], dependencies=[Depends(get_db)]
)
def read_slow_users(skip: int = 0, limit: int = 100):
    global sleep_time
    sleep_time = max(0, sleep_time - 1)
    time.sleep(sleep_time)  # Fake long processing request
    users = crud.get_users(skip=skip, limit=limit)
    return users

defasync defについて

SQLAlchemyと同様に、次のようなことはしません。

user = await models.User.select().first()

...代わりに、次のように使用します。

user = models.User.select().first()

したがって、ここでも、パス操作関数と依存関係は、async defを使用せず、通常のdefを使用して宣言する必要があります。

# Something goes here
def read_users(skip: int = 0, limit: int = 100):
    # Something goes here

Peeweeを非同期でテストする

この例には、time.sleep(sleep_time)を使用して長い処理リクエストをシミュレートする追加のパス操作が含まれています。

これは、最初にデータベース接続を開き、応答するまでに数秒間待機します。そして、新しいリクエストごとに1秒ずつ待機時間が短くなります。

これにより、PeeweeとFastAPIを使用したアプリが、スレッドに関するすべての事柄で正しく動作していることを簡単にテストできます。

Peeweeを修正せずに使用した場合にアプリがどのように壊れるかを確認したい場合は、sql_app/database.pyファイルに移動して、次の行をコメントアウトしてください。

# db._state = PeeweeConnectionState()

そして、sql_app/main.pyファイルで、async依存関係reset_db_state()の本体をコメントアウトし、passに置き換えます。

async def reset_db_state():
#     database.db._state._state.set(db_state_default.copy())
#     database.db._state.reset()
    pass

次に、Uvicornでアプリを実行します。

$ uvicorn sql_app.main:app --reload

<span style="color: green;">INFO</span>:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)

http://127.0.0.1:8000/docsでブラウザを開き、ユーザーを数人作成します。

次に、http://127.0.0.1:8000/docs#/default/read_slow_users_slowusers__getで10個のタブを同時に開きます。

すべてのタブでパス操作 "Get /slowusers/"に移動します。「試してみる」ボタンを使用して、各タブで順番にリクエストを実行します。

タブはしばらく待機し、一部のタブにはInternal Server Errorが表示されます。

何が起こるか

最初のタブでは、アプリがデータベースへの接続を作成し、応答してデータベース接続を閉じるまで数秒間待機します。

次に、次のタブのリクエストでは、アプリは1秒短い時間待機し、以下同様です。

つまり、最後のタブのリクエストの一部が、以前のタブのリクエストよりも早く完了することになります。

次に、待機時間が短い最後の要求の1つがデータベース接続を開こうとしますが、他のタブに対する以前の要求の1つが最初の要求と同じスレッドで処理される可能性があるため、既に開いている同じデータベース接続を持ち、Peeweeはエラーをスローし、ターミナルに表示され、レスポンスにはInternal Server Errorが表示されます。

これは、複数のタブで発生する可能性があります。

複数のクライアントがアプリに同時に接続している場合、これが起こる可能性があります。

また、アプリが同時に処理するクライアントが増えるにつれて、エラーをトリガーするために、単一のリクエストでの待ち時間は短くする必要があります。

FastAPIでPeeweeを修正する

次に、sql_app/database.pyファイルに戻り、次の行のコメントを解除します。

db._state = PeeweeConnectionState()

そして、sql_app/main.pyファイルで、async依存関係reset_db_state()の本体のコメントを解除します。

async def reset_db_state():
    database.db._state._state.set(db_state_default.copy())
    database.db._state.reset()

実行中のアプリを終了し、再度起動します。

10個のタブで同じプロセスを繰り返します。今回はすべてが待機し、エラーなしですべての結果が得られます。

...修正されました!

すべてのファイルを確認する

my_super_project(または任意)という名前のディレクトリがあり、その中にsql_appというサブディレクトリが含まれている必要があります。

sql_appには、次のファイルが必要です。

  • sql_app/__init__.py:空のファイルです。

  • sql_app/database.py:

from contextvars import ContextVar

import peewee

DATABASE_NAME = "test.db"
db_state_default = {"closed": None, "conn": None, "ctx": None, "transactions": None}
db_state = ContextVar("db_state", default=db_state_default.copy())


class PeeweeConnectionState(peewee._ConnectionState):
    def __init__(self, **kwargs):
        super().__setattr__("_state", db_state)
        super().__init__(**kwargs)

    def __setattr__(self, name, value):
        self._state.get()[name] = value

    def __getattr__(self, name):
        return self._state.get()[name]


db = peewee.SqliteDatabase(DATABASE_NAME, check_same_thread=False)

db._state = PeeweeConnectionState()
  • sql_app/models.py:
import peewee

from .database import db


class User(peewee.Model):
    email = peewee.CharField(unique=True, index=True)
    hashed_password = peewee.CharField()
    is_active = peewee.BooleanField(default=True)

    class Meta:
        database = db


class Item(peewee.Model):
    title = peewee.CharField(index=True)
    description = peewee.CharField(index=True)
    owner = peewee.ForeignKeyField(User, backref="items")

    class Meta:
        database = db
  • sql_app/schemas.py:
from typing import Any, List, Union

import peewee
from pydantic import BaseModel
from pydantic.utils import GetterDict


class PeeweeGetterDict(GetterDict):
    def get(self, key: Any, default: Any = None):
        res = getattr(self._obj, key, default)
        if isinstance(res, peewee.ModelSelect):
            return list(res)
        return res


class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True
        getter_dict = PeeweeGetterDict


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: List[Item] = []

    class Config:
        orm_mode = True
        getter_dict = PeeweeGetterDict
  • sql_app/crud.py:
from . import models, schemas


def get_user(user_id: int):
    return models.User.filter(models.User.id == user_id).first()


def get_user_by_email(email: str):
    return models.User.filter(models.User.email == email).first()


def get_users(skip: int = 0, limit: int = 100):
    return list(models.User.select().offset(skip).limit(limit))


def create_user(user: schemas.UserCreate):
    fake_hashed_password = user.password + "notreallyhashed"
    db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
    db_user.save()
    return db_user


def get_items(skip: int = 0, limit: int = 100):
    return list(models.Item.select().offset(skip).limit(limit))


def create_user_item(item: schemas.ItemCreate, user_id: int):
    db_item = models.Item(**item.dict(), owner_id=user_id)
    db_item.save()
    return db_item
  • sql_app/main.py:
import time
from typing import List

from fastapi import Depends, FastAPI, HTTPException

from . import crud, database, models, schemas
from .database import db_state_default

database.db.connect()
database.db.create_tables([models.User, models.Item])
database.db.close()

app = FastAPI()

sleep_time = 10


async def reset_db_state():
    database.db._state._state.set(db_state_default.copy())
    database.db._state.reset()


def get_db(db_state=Depends(reset_db_state)):
    try:
        database.db.connect()
        yield
    finally:
        if not database.db.is_closed():
            database.db.close()


@app.post("/users/", response_model=schemas.User, dependencies=[Depends(get_db)])
def create_user(user: schemas.UserCreate):
    db_user = crud.get_user_by_email(email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(user=user)


@app.get("/users/", response_model=List[schemas.User], dependencies=[Depends(get_db)])
def read_users(skip: int = 0, limit: int = 100):
    users = crud.get_users(skip=skip, limit=limit)
    return users


@app.get(
    "/users/{user_id}", response_model=schemas.User, dependencies=[Depends(get_db)]
)
def read_user(user_id: int):
    db_user = crud.get_user(user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post(
    "/users/{user_id}/items/",
    response_model=schemas.Item,
    dependencies=[Depends(get_db)],
)
def create_item_for_user(user_id: int, item: schemas.ItemCreate):
    return crud.create_user_item(item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item], dependencies=[Depends(get_db)])
def read_items(skip: int = 0, limit: int = 100):
    items = crud.get_items(skip=skip, limit=limit)
    return items


@app.get(
    "/slowusers/", response_model=List[schemas.User], dependencies=[Depends(get_db)]
)
def read_slow_users(skip: int = 0, limit: int = 100):
    global sleep_time
    sleep_time = max(0, sleep_time - 1)
    time.sleep(sleep_time)  # Fake long processing request
    users = crud.get_users(skip=skip, limit=limit)
    return users

技術的な詳細

警告

これらは、おそらく必要のない非常に技術的な詳細です。

問題点

Peeweeは、データベースの「状態」データ(接続、トランザクションなど)を格納するために、デフォルトでthreading.localを使用します。

threading.localは現在のスレッドに排他的な値を作成しますが、非同期フレームワークはすべてのコード(たとえば、リクエストごとに)を同じスレッドで、おそらく順番に実行しません。

さらに、非同期フレームワークは、同じリクエストに属するものであっても、スレッドプール(asyncio.run_in_executorを使用)で同期コードを実行する可能性があります。

これは、Peeweeの現在の実装では、複数のタスクが同じthreading.local変数を使用して、同じ接続とデータ(共有すべきではない)を共有する可能性があることを意味します。同時に、スレッドプールで同期I/Oブロッキングコードを実行する場合(FastAPIの通常のdef関数の場合のように、パス操作と依存関係)、そのコードはデータベース状態変数にアクセスできません。たとえそれが同じリクエストの一部であり、同じデータベース状態にアクセスできるべきであってもです。

コンテキスト変数

Python 3.7には、contextvarsがあり、threading.localと非常によく似たローカル変数を作成できますが、これらの非同期機能もサポートしています。

留意すべき点がいくつかあります。

ContextVarは、次のように、モジュールの先頭で作成する必要があります。

some_var = ContextVar("some_var", default="default value")

現在の「コンテキスト」(たとえば、現在のリクエスト)で使用される値を設定するには、次のように使用します。

some_var.set("new value")

コンテキスト内の任意の場所で(たとえば、現在のリクエストを処理する任意の部分で)値を取得するには、次のように使用します。

some_var.get()

async依存関係reset_db_state()でコンテキスト変数を設定する

非同期コードの一部がsome_var.set("updated in function")を使用して値を設定した場合(たとえば、async依存関係のように)、その後のコード(awaitで呼び出されたasync関数内のコードを含む)は、その新しい値を参照します。

そのため、この場合、async依存関係で(デフォルトのdictを使用して)Peewee状態変数を設定すると、アプリの内部コードの残りの部分はすべてこの値を参照し、リクエスト全体で再利用できるようになります。

また、コンテキスト変数は、同時実行であっても、次のリクエストのために再度設定されます。

依存関係get_db()でデータベース状態を設定する

get_db()は通常のdef関数であるため、FastAPIはそれをスレッドプールで実行し、コンテキスト変数の同じ値(リセットされたデータベース状態を持つdict)を保持する「コンテキスト」のコピーを使用します。次に、接続などのデータベース状態をそのdictに追加できます。

ただし、コンテキスト変数(デフォルトのdict)の値がその通常のdef関数で設定されている場合、スレッドプールのそのスレッドのみにとどまる新しい値が作成され、残りのコード(パス操作関数など)はそれにアクセスできません。get_db()では、dict自体ではなく、dictに値のみを設定できます。

したがって、async依存関係reset_db_state()を使用して、コンテキスト変数にdictを設定する必要があります。これにより、すべてのコードが単一のリクエストのデータベース状態に対して同じdictにアクセスできるようになります。

依存関係get_db()で接続と切断を行う

次に、get_db()ではなく、async依存関係自体でデータベースを接続および切断しないのはなぜかという疑問が生じます。

async依存関係は、リクエストの残りの部分でコンテキスト変数が保持されるようにasyncである必要がありますが、データベース接続の作成と終了は潜在的にブロッキングであるため、そこにあるとパフォーマンスが低下する可能性があります。

そのため、通常のdef依存関係get_db()も必要です。