データベースを組み合わせたFastAPIの実装

この記事では、Pythonで効率的かつ迅速なAPIを実装するための最先端フレームワークであるFastAPIを使って、PostgreSQLデータベースから検索したデータを提供するための方法についてご紹介します。

FastAPIの環境構築や基本的な使い方は、以下の記事で紹介していますので参考にしてください。

また、PythonでPostgreSQLデータベースを検索する方法については、以下の記事で紹介していますのでこちらも参考にしてください。

データベースの検索結果を返すコードを作成する

今回は、PostgreSQLデータベースに登録されているデータの中から、指定した日時の検索結果を返すRESTful APIを作成していきます。

Pythonのコードは、メインとクラスに分けて作成します。

クラスコードの作成

はじめにクラスを作成します。クラスコードの全体は、以下のとおりです。

import psycopg2
import pandas as pd
import json
import os

class DBSelectModule :
def __init__(self,_host, _database, _user) :

# 環境変数からデータベース接続用のパスワードを取得する
_auth = os.environ.get('DB_PASSWORD')

# データベース接続文字列を作成する
self.dsn = f"host={_host} dbname={_database} user={_user} password={_auth}"

def selectAmedasData(self, ymd) :

# 初期化する
conn = None
json_data = None

try:
# データベースに接続する
conn = psycopg2.connect(self.dsn)

# ここでデータベース操作を行う
cur = conn.cursor()
cur.execute("SELECT * FROM obsdata.amedas WHERE TO_CHAR(ymdi, 'YYYY-MM-DD HH24:MI:SS') LIKE %s",(ymd + '%',))

# カラム名を取得する
cols = [col.name for col in cur.description]

# 取得したデータをデータフレームに入れる
df = pd.DataFrame(cur.fetchall(), columns = cols)

# データフレームをJSONに変換する
json_data = df.to_json(orient='records',date_format='iso',force_ascii=False)

except psycopg2.Error as e:
print("データベース接続エラー:", e)

finally:
if conn is not None:
conn.close()

return json_data

クラスコードで使用するPythonモジュールは、psycopg2pandasjsonです。また、環境変数に設定したデータベースユーザのパスワードを読み取るため、osもインポートしています。

import psycopg2
import pandas as pd
import json
import os

次は、コンストラクタメソッドで環境変数に設定したデータベースユーザのパスワードを取得し、メインコードから渡されるホストなどのデータベース接続情報とあわせて、データベース接続文字列を作成しています。

  def __init__(self,_host, _database, _user) :

# 環境変数からデータベース接続用のパスワードを取得する
_auth = os.environ.get('DB_PASSWORD')

# データベース接続文字列を作成する
self.dsn = f"host={_host} dbname={_database} user={_user} password={_auth}"

selectAmedasDataメソッドは、「データベース接続」→「SQLの実行」→「Pandasデータフレームへ代入」→「JSONデータへ変換」→「データベース切断」の順で処理を行います。

SQL実行などのデータベース操作は、cursor(カーソル)を介して実行されます。cursor.execute()

でSQLを実行し、cursor.fetchall()でSQLの結果をすべて取得しています。

  def selectAmedasData(self, ymd) :

# 初期化する
conn = None
json_data = None

try:
# データベースに接続する
conn = psycopg2.connect(self.dsn)

# ここでデータベース操作を行う
cur = conn.cursor()
cur.execute("SELECT * FROM obsdata.amedas WHERE TO_CHAR(ymdi, 'YYYY-MM-DD HH24:MI:SS') LIKE %s",(ymd + '%',))

# カラム名を取得する
cols = [col.name for col in cur.description]

# 取得したデータをデータフレームに入れる
df = pd.DataFrame(cur.fetchall(), columns = cols)

# データフレームをJSONに変換する
json_data = df.to_json(orient='records',date_format='iso',force_ascii=False)

except psycopg2.Error as e:
print("データベース接続エラー:", e)

finally:
if conn is not None:
conn.close()

return json_data

メインコードの作成

次に、メインを作成します。メインコードの全体は、以下のとおりです。

from fastapi import FastAPI
from fastapi.responses import JSONResponse
from typing import Optional
from extention.DBSelectModule import DBSelectModule

# PostgreSQL接続情報
_host = '192.168.11.220'
_database = 'test_database'
_user = 'workuser'

# FastAPIをインスタンス化する
app = FastAPI()

@app.get("/")
async def getAmedasApi(ymd: Optional[str] = '1970-01-01') :

# データベース接続クラスをインスタンス化する
dbconn = DBSelectModule(_host, _database, _user)

# データベースを検索するメソッドを実行する
res_json = dbconn.selectAmedasData(ymd)

# JSON形式でデータをレスポンスする
return JSONResponse(res_json, media_type="application/json")

メインコードで使用するPythonモジュールは、Restful APIを実装するために必要なPythonのFastAPIモジュールとPythonの型ヒントを使用するため、Optionalモジュールもあわせてインポートします。

また、先に作成したクラスコードをインポートします。以下の例では、main.pyと同一階層のextentionディレクトリに配置されたクラスコードをインポートしています。

from fastapi import FastAPI
from fastapi.responses import JSONResponse
from typing import Optional
from extention.DBSelectModule import DBSelectModule

以下は、クラスに渡すデータベース接続情報を設定します。

# PostgreSQL接続情報
_host = '192.168.11.220'
_database = 'test_database'
_user = 'workuser'

最後に、FastAPIをインスタンス化し、デコレーターを使用してGETリクエストを処理するためのエンドポイントを定義しています。

async def getAmedasApi関数で、非同期処理で動作するAPIを定義します。この関数の中では、データベースへ接続する処理と返されたJSON形式の結果をAPIレスポンスする処理が含まれています。

# FastAPIをインスタンス化する
app = FastAPI()

@app.get("/")
async def getAmedasApi(ymd: Optional[str] = '1970-01-01') :

# データベース接続クラスをインスタンス化する
dbconn = DBSelectModule(_host, _database, _user)

# データベースを検索するメソッドを実行する
res_json = dbconn.selectAmedasData(ymd)

# JSON形式でデータをレスポンスする
return JSONResponse(res_json, media_type="application/json")

APIを実行してみる

早速、作成したAPIを実行してみます。作成したAPIはuvicornを使って起動します。全てのネットワークに対する応答を許可するために、–host=0.0.0.0を設定します。

リッスンポートは--port=8001で指定します。

(test_env01) [workuser@devsrv01 PostgreSQL]$ uvicorn main:app --host=0.0.0.0 --port=8001 --reload
INFO: Will watch for changes in these directories: ['/home/workuser/PostgreSQL']
INFO: Uvicorn running on http://0.0.0.0:8001 (Press CTRL+C to quit)
INFO: Started reloader process [220] using StatReload
INFO: Started server process [222]
INFO: Waiting for application startup.
INFO: Application startup complete.

ブラウザから、APIを実装したサーバの8001番ポートにアクセスしてみます。

クエリ変数を何もつけていないので、空のJSONが返されました。次に検索日付をクエリ変数に設定してアクセスしてみます。

データベースには、2024年2月分の東京のアメダス観測結果が入っています。クエリ変数はymdで、値には検索する日付をDate型で与えてみます。

クエリ条件にマッチするデータベースの内容は、以下のようにJSON形式で表示されます。

次に、検索する日付をTimestamp型で指定してみます。条件にマッチする1つの結果が以下のように表示されます。

まとめ

いかがでしたでしょうか。

今回は、FastAPIを使って、PostgreSQLデータベースから検索したデータをAPIで提供するための実装方法について紹介しました。

FastAPIを利用することで、システム間において標準的なhttpプロトコルを使用したAPI連携が容易に実装できるため、効率的なシステム開発や機動的なシステム運用が可能なります。

特にクラウド・バイ・デフォルトを掲げる日本政府の指針においては、システム連接やデータ連携は標準化されたAPIで実装することが求められており、これにより、行政や自治体の電子サービスの構築を柔軟にして、より良いサービス提供を目標としています。

APIを介してクライアントが保有する様々なデバイスにも効率的に気象データを提供できるようになれば、エッジコンピューティングを活用した気象防災ソリューションの発達に寄与し、SDGsの目標13「気候変動に具体的な対策を」の達成に貢献できるかもしれません。

参考になれば幸いです。

システムのお悩みについてご相談ください

本サイトの掲載内容に関するお問い合わせは、こちらから承ります。
SOHOのシステム運用管理に関するお悩みごとについて、なんでもお気兼ねなくご相談ください。
現役システムエンジニアのスタッフが、ボランティアでご相談にご対応させていただきます。