APIの認証・認可をためしてみる

この記事では、Pythonで効率的かつ迅速なAPIを実装するための最先端フレームワークであるFastAPIに、認証・認可の仕組みを追加する方法についてご紹介します。

FastAPIの環境構築や基本的な使い方は、以下の記事で紹介していますので参考にしてください。

APIの認証・認可をためしてみる

今回は、OAuth 2.0という権限を認可するためのオープンスタンダードな仕組みを FastAPI に組み込んで、ユーザーの認証・認可を行う RESTful API を実装していきます。

OAuth 2.0は、クライアントが操作するウェブアプリケーション、デスクトップアプリケーション、スマートフォンなどのアプリケーション開発に対し、リソースへのアクセスに対するクライアントの権限を認可するための簡単な方法を提供してくれます。

認証・認可のロールとシーケンス

OAuth 2.0は、以下の4種類のロール(役割)を考えることが必要です。

クライアントリソースへのアクセスを行うアプリケーションやサービスのこと
リソースにアクセスする クライアントがリソース(データや機能)にアクセスすること
リソースオーナーリソースへアクセスするクライアントの正当性を確認し認証する
認可サーバクライアントの正当なアクセスに対して、アクセストークンを発行するサーバ
リソースサーバアクセストークンの内容に基づいて、クライアントにリソースを提供するサーバ
OAuth 2.0のロール(役割)一覧

認証・認可のシーケンスは、以下のとおりです。

認証・認可のシーケンス図

FastAPIの実行環境を構築する

この記事では、Pythonの仮想環境を利用しています。

まずはじめにFastAPIの実行環境を構築します。fastapiuvicornをインストールします。

(FastAPI) [workuser@devsrv01 FastAPI]$ pip3 install fastapi uvicorn

また、クライアントのウェブアプリケーションをPythonで模擬するため、pytho-multipartrequestsもあわせてインストールしておきます。

(FastAPI) [workuser@devsrv01 FastAPI]$ pip3 install python-multipart requests

インストールされたモジュールの一覧は、以下のとおりです。

(FastAPI) [workuser@devsrv01 FastAPI]$ pip3 list
Package Version
------------------ --------
annotated-types 0.6.0
anyio 4.3.0
certifi 2024.2.2
charset-normalizer 3.3.2
click 8.1.7
fastapi 0.110.2
h11 0.14.0
idna 3.7
pip 23.2.1
pydantic 2.7.0
pydantic_core 2.18.1
python-multipart 0.0.9
requests 2.31.0
sniffio 1.3.1
starlette 0.37.2
typing_extensions 4.11.0
urllib3 2.2.1
uvicorn 0.29.0

認証・認可付きAPIのコードを作成する

認証・認可付きAPIのコードを作成していきます。コードの全体は、以下のとおりです。

from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
import secrets

# ダミーのユーザーデータベース
tempo_users_db = {
"user1": {
"username": "user1",
"password": "password1",
},
"user2": {
"username": "user2",
}

# ユーザーを検証する関数
async def authenticate_user(username: str, password: str):
user = tempo_users_db.get(username)
if not user:
return False
if not password == user["password"]:
return False
return user

# ランダムなトークンを生成する関数
def generate_random_token(length=32):
alphabet = string.ascii_letters + string.digits
token = ''.join(secrets.choice(alphabet) for _ in range(length))
return token

# トークンを検証する関数
async def get_current_user(token: str = Depends(OAuth2PasswordBearer(tokenUrl="token"))):
for user in tempo_users_db.values():
if user.get("access_token") == token and user.get("token_expire_time") > datetime.utcnow():
return user
raise HTTPException(
status_code=401,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)

# FastAPIのインスタンスを作成する
app = FastAPI()

# トークンを取得するエンドポイント
@app.post("/token")
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = await authenticate_user(form_data.username, form_data.password)

if not user:
raise HTTPException(
status_code=401,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
# トークンの有効期限を15分に設定
access_token_expires = timedelta(minutes=15)

# トークンの有効期限と発行時刻を計算
access_token_expire_time = datetime.utcnow() + access_token_expires

# ユーザーが認証された場合、ランダムなトークンを生成して返す
access_token = generate_random_token()
user["access_token"] = access_token
user["token_expire_time"] = access_token_expire_time
return {"access_token": access_token, "token_type": "bearer",
"expires_at": access_token_expire_time
}


# プロテクトされたエンドポイント
@app.get("/protected")
async def protected_route(user: dict = Depends(get_current_user)):
# ユーザーが認証された場合、結果を返す
return {"message": f"Success to authentication, {user['username']}!"}

コードの内容について解説していきます。

認証・認可を実装するために必要となるFastAPIのクラスは、DependsHTTPExceptionOAuth2PasswordBearerOAuth2PasswordRequestFormです。

from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
import secrets
Depends依存性注入システムを使用して、エンドポイントのロジックの前に実行する必要がある依存関係を解決する関数またはデコレータ
HTTPExceptionHTTPステータスコードとエラーメッセージを指定して、エラーレスポンスを生成するために使用するFastAPIのクラス
OAuthPasswordBearerOAuth 2.0のパスワードフローを使用して、認証ヘッダーからアクセストークンを取得し、認証されたユーザーを識別するために使用するクラス
OAuth2PasswordRequestFormパスワードフローの認証リクエストを受け取るためのデータモデルのクラス
認証・認可に必要なFastAPIのクラスと概要説明

以下は、辞書で作成したダミーのユーザデータベースです。

通常、クライアントの認証情報はデータベースを利用して構築されることが一般的ですが、今回のトライアルでは、クライアントの認証情報を辞書ベースで模擬しています。

# ダミーのユーザーデータベース
tempo_users_db = {
"user1": {
"username": "user1",
"password": "password1",
},
"user2": {
"username": "user2",
"password": "password2",
},
}

以下では、ユーザを検証するための関数を定義しています。

この関数では、引数で渡されるユーザの名前とパスワードがダミーのユーザーデータベースに登録されているか確認し、登録がない場合はFalseを返し、登録されている場合はユーザーの名前を返します。

# ユーザーを検証する関数
async def authenticate_user(username: str, password: str):
user = tempo_users_db.get(username)
if not user:
return False
if not password == user["password"]:
return False
return user

以下では、ランダムなトークンを生成する関数を定義しています。

この関数では、32文字のランダムなトークンが生成されます。文字列モジュールから英字と数字を含むアルファベットを取得し、それらの文字を組み合わせてランダムなトークンを生成して返します。

# ランダムなトークンを生成する関数
def generate_random_token(length=32):
alphabet = string.ascii_letters + string.digits
token = ''.join(secrets.choice(alphabet) for _ in range(length))
return token

以下では、トークンを検証する関数を定義しています。

この関数では、アクセストークンを受け取り、そのトークンを持つユーザーを検証します。アクセストークンが有効であり、トークンの有効期限が現在の時刻よりも後である場合、対応するユーザー情報が返されます。

有効なユーザーが見つからない場合、HTTPステータスコード401(認証エラー)が返され、詳細には「Invalid authentication credentials」というエラーメッセージを出力します。また、WWW-AuthenticateヘッダーにはBearerスキームを指定しています。

BearerはHTTP認証スキームの一種です。通常は、アクセストークンとして使用されます。

Bearerトークンは、HTTPリクエストのAuthorizationヘッダーに含まれ、そのヘッダーにはBearerというキーワードの後にアクセストークンが続く文字列が記録されています。このスキームは、OAuth 2.0などのプロトコルで一般的に使用されています。

# トークンを検証する関数
async def get_current_user(token: str = Depends(OAuth2PasswordBearer(tokenUrl="token"))):
for user in tempo_users_db.values():
if user.get("access_token") == token and user.get("token_expire_time") > datetime.utcnow():
return user
raise HTTPException(
status_code=401,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)

以下では、FastAPIのインスタンスを作成しています。

# FastAPIのインスタンスを作成する
app = FastAPI()

以下では、アクセストークンを取得するFastAPIのエンドポイントを定義しています。

「/token」にPOSTリクエストが送信されると、指定されたフォームデータ(ユーザーの名前とパスワード)を使用してユーザーを認証します。

認証が成功した場合は、ランダムなアクセストークンが生成され、トークンの有効期限が15分に設定されます。

その後、生成されたアクセストークンとトークンの有効期限が含まれたレスポンスが返されます。もし認証に失敗した場合、HTTPステータスコード401が返され、詳細には「Incorrect username or password」というエラーメッセージを出力します。

# トークンを取得するエンドポイント
@app.post("/token")
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = await authenticate_user(form_data.username, form_data.password)

if not user:
raise HTTPException(
status_code=401,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
# トークンの有効期限を15分に設定
access_token_expires = timedelta(minutes=15)

# トークンの有効期限と発行時刻を計算
access_token_expire_time = datetime.utcnow() + access_token_expires

# ユーザーが認証された場合、ランダムなトークンを生成して返す
access_token = generate_random_token()
user["access_token"] = access_token
user["token_expire_time"] = access_token_expire_time
return {"access_token": access_token, "token_type": "bearer",
"expires_at": access_token_expire_time
}

以下では、認証・認可されたユーザーにリソースを提供するFastAPIのエンドポイントを定義しています。

「/protected」にGETリクエストが送信されると、トークンを起点としてユーザーが認証され、そのユーザーが認可されたリソースにアクセスすることが許可されます。

以下では、認可に成功した場合にユーザー名を含むメッセージが含まれたレスポンスが返されます。

# プロテクトされたエンドポイント
@app.get("/protected")
async def protected_route(user: dict = Depends(get_current_user)):
# ユーザーが認証された場合、結果を返す
return {"message": f"Success to authentication, {user['username']}!"}

APIを起動する

uvicornを使って、作成したAPIを起動します。特にエラーが出力されていなければ、APIはアクセス待ちの状態になっています。

(FastAPI) [workuser@devsrv01 FastAPI]$ uvicorn main:app --host=0.0.0.0 --port=8001 --reload
INFO: Will watch for changes in these directories: ['/home/workuser/FastAPI']
INFO: Uvicorn running on http://0.0.0.0:8001 (Press CTRL+C to quit)
INFO: Started reloader process [3360] using StatReload
INFO: Started server process [3362]
INFO: Waiting for application startup.
INFO: Application startup complete.

APIにアクセスしてみる

作成したAPIにアクセスするには、クライアント側からウェブアプリケーションを介して行う必要があります。

今回はフロントエンドの開発までは含んでいないため、Pythonを使ってクライアントのウェブアクセスを模擬していきます。

ウェブアクセスを模擬するコードを作成する

クライアントのウェブアクセスを模擬するコードを作成します。コードの全体は、以下のとおりです。

import requests

def get_access_token(username, password):
# トークンを取得するためのエンドポイント
token_url = "http://192.168.11.200:8001/token"

# ユーザー名とパスワードを含むフォームデータ
form_data = {
"username": username,
"password": password
}

# POSTリクエストを送信してトークンを取得
response = requests.post(token_url, data=form_data, timeout=10)

# レスポンスのステータスコードを確認
if response.status_code == 200:
# レスポンスからトークンを抽出
access_token = response.json()["access_token"]
return access_token
else:
# エラーが発生した場合はエラーメッセージを表示
print("Error:", response.text)
return None

# ユーザー名とパスワードを指定してトークンを取得
username = "user1"
password = "password1"
access_token = get_access_token(username, password)

if access_token:
print("Access Token:", access_token )
else:
print("Failed to get access token.")

### エンドポイントにアクセスする
url = "http://192.168.11.200:8001/protected"

headers = {"Authorization": f"Bearer {access_token}"}

print(headers)

response = requests.get(url, headers=headers)

# レスポンスの内容を表示
print(response.text)

まずはじめに、pythonでウェブアクセスするためにrequestsをインポートします。

次に、トークンを取得するためのエンドポイントにアクセスする関数を定義します。この関数では、トークンを取得するためのエンドポイントのURLと引数で渡されるユーザーの名前とパスワードをフォームデータを作成します。

その後、POSTリクエストを使用してフォームデータをトークン取得エンドポイントに送信し、レスポンスのステータスコードを確認します。

成功した場合は、レスポンスからアクセストークンを抽出して返します。エラーが発生した場合は、エラーメッセージを表示します。

import requests

def get_access_token(username, password):
# トークンを取得するためのエンドポイント
token_url = "http://192.168.11.200:8001/token"

# ユーザー名とパスワードを含むフォームデータ
form_data = {
"username": username,
"password": password
}

# POSTリクエストを送信してトークンを取得
response = requests.post(token_url, data=form_data, timeout=10)

# レスポンスのステータスコードを確認
if response.status_code == 200:
# レスポンスからトークンを抽出
access_token = response.json()["access_token"]
return access_token
else:
# エラーが発生した場合はエラーメッセージを表示
print("Error:", response.text)
return None

以下では、ユーザの名前とパスワードを設定し、トークン取得エンドポイントにアクセスするget_access_token関数を実行します。

アクセストークンが返された場合は、アクセストークンの文字列を表示し、アクセストークンの取得が失敗した場合は、「Failed to get access token.」のメッセージを表示します。

# ユーザー名とパスワードを指定してトークンを取得
username = "user1"
password = "password1"
access_token = get_access_token(username, password)

if access_token:
print("Access Token:", access_token )
else:
print("Failed to get access token.")

以下では、取得したアクセストークンをリクエストヘッダに追加して、リソースを取得するエンドポイントにウェブアクセスし、レスポンスの内容を表示させます。

### エンドポイントにアクセスする
url = "http://192.168.11.200:8001/protected"

headers = {"Authorization": f"Bearer {access_token}"}

print(headers)

response = requests.get(url, headers=headers)

# レスポンスの内容を表示
print(response.text)

ウェブアクセスを実行してみる

作成したクライアントのウェブアクセスを模擬するコードを実行します。

ダミーのユーザーデータベースに登録されているユーザーの場合は、認証・認可が成功してアクセストークンが発行されています。

取得したアクセストークンを使ってリソースエンドポイントにアクセスし、ユーザ名が正しく取得できています。

(FastAPI) [workuser@devsrv01 FastAPI]$ python3 client.py
Access Token: 2PI2oI7kXXSVG64lz5GonsUiAnkfcTLy
{'Authorization': 'Bearer 2PI2oI7kXXSVG64lz5GonsUiAnkfcTLy'}
{"message":"Success to authentication, user1!"}

ダミーのユーザーデータベースに登録されていないユーザーの場合は、認証の失敗に関するメッセージとアクセストークンの取得失敗に関するメッセージが出力されます。

アクセストークンを保持していないため、リソースエンドポイントにアクセスしても、認証・認可でエラーが返されています。

(FastAPI) [workuser@devsrv01 FastAPI]$ python3 client.py
Error: {"detail":"Incorrect username or password"}
Failed to get access token.
{'Authorization': 'Bearer None'}
{"detail":"Invalid authentication credentials"}

まとめ

いかがでしたでしょうか。

今回は、FastAPIに認証・認可の仕組みを追加する方法についてご紹介しました。

APIに認証・認可の仕組みを導入することにより、不正なアクセスや悪意のある攻撃からシステムを保護することができます。認証を通じて、システムにアクセスするユーザーが誰であるかを確認し、認可を通じて、それぞれのユーザーに対して適切なリソースや操作へのアクセス権を与えることが可能となります。

また、個人情報や機密情報を取り扱うシステムやサービスでは特に重要であり、様々な認証・認可の仕組みが利用されています。

APIの認証・認可の仕組みは、システムのセキュリティと信頼性を確保し、法的要件を遵守するために不可欠なのです。

参考になれば幸いです。

システムのお悩みについてご相談ください

本サイトの掲載内容に関するお問い合わせは、こちらから承ります。
SOHOのシステム運用管理に関するお悩みごとについて、なんでもお気兼ねなくご相談ください。
現役システムエンジニアのスタッフが、ボランティアでご相談にご対応させていただきます。