【Python】FastAPIのディレクトリ構成
クラウドソーシングでFastAPI
の実装を行う機会があったので、調べた事などをメモ。
作成したソースコードはGit Hubに上げてあります。
ドキュメント
公式ドキュメントは分かりやすく書かれていた印象です。
ディレクトリ構成
公式ドキュメントや現場のPJを参考に、以下の構成にしました。
※ Docker関連ファイルなどは除外してあります。
. ├── __init__.py ├── app.py ├── cli.py ├── database.py ├── db_config.py ├── dependencies │ ├── __init__.py │ ├── auth.py │ └── request.py ├── exception.py ├── gunicorn_conf.py ├── handlers │ ├── __init__.py │ └── exception.py ├── midlewares │ ├── __init__.py │ └── timer.py ├── model │ ├── __init__.py │ └── users.py ├── plugins │ └── __init__.py ├── requirements.txt ├── response.py └── routers ├── __init__.py ├── index.py └── users.py
app.py
create_app
でルーターやDB、ミドルウェア、エラーハンドリングの設定を行っています。
uvicorn
やgunicorn
からこちらのモジュールを指定して起動します。
cli.py
開発時の動作確認用。
uvicorn.run
で起動する他、ArgumentParser
でホストとポートを指定可能です。
起動コマンドはpython cli.py
database.py
SQLite、MySQLと単体テスト用のデータベース設定を定義しています。
ORMはsqlalchemy
。
exception.py
独自のExceptionクラスを定義しています。
response.py
JSONResponse
を継承した、APIのレスポンスクラスを定義しています。
orjson
を組み込むだけならORJSONResponse
クラスを直接利用すれば良いと思います。
gunicorn_conf.py
本番起動時のgunicorn
に関する設定を記載しています。
詳細は以下を参照してください。
dependencies/*
fastapi.Depends
でrouter
モジュール内のエンドポイント定義関数の引数に設定するモジュール群です。
公式で言うと以下の辺りです。
handlers/exception.py
handlers
配下に定義したエラーハンドリング関数をapp.py
でadd_exception_handler
する事により、エラーハンドリングを集約する事ができます。
midlewares/*
ミドルウェア用関数を定義しています。
app.py
でadd_middleware
する事により使用可能となります。
各エンドポイントのレスポンス返却前に、何かしらの処理を追加できます。
公式に記載があるのですが、yield
を使った依存関係をもつ場合の終了コードはミドルウェアの後に実行されるようです。
model/*
model
配下にsqlalchemy
のモデルクラスを定義します。
plugins/*
PJ独自のライブラリを使用した共通処理がある場合などで、実装モジュールを分けられる場合に使用します。
routers/*
fastapi.APIRouter
で各エンドポイントを定義します。
app.py
でinclude時にsqlalchemy.orm.Session
を注入してエンドポイントで利用できるようにしています。
まとめ
FastAPI
を利用したい時にサッと引っ張ってこれる雛形が欲しいという事もあり、この記事の執筆に至りました。
おかしな点などありましたら、教えて頂けると嬉しいです😊
関係ないのですが、クラウドソーシングをしていると動けば良いというスタンスのコードをよく見かけるので、こういった記事が増える事により、〇〇コードが少しでも減ってくれれば良いなと思います。
(構成以前に謎文法な事も多いので、まずは公開されているpythonコードで自分が実装する分野と近しい実装を見て欲しいです、、、😂 )
【Python】pyhive で Presto 接続
PyHiveでPresto接続方法のメモ。
認証方法はプロダクト毎に違うと思いますので、適宜変更。
この辺りを見れば実装方法が分かると思います。
インストール
$ pip3 install pyhive[presto]==0.6.4
実装
# -*- coding: utf-8 -*- import base64 from pyhive import presto def execute(): cursor = presto.connect( '<ホスト名>', port=443, catalog='<カタログ名>', username='<ユーザー名>', protocol='https', ).cursor() # パラメータは辞書・リスト・tupleのいずれか cursor.execute('SELECT * FROM sample WHERE test_column = %s', parameters=('param_value',)) print(cursor.fetchall()) if __name__ == '__main__': execute()
参考
逐次処理等がしたい場合はこちらを参考にすると良いかも。
機会があれば実装しようと思います。
オススメ書籍
色々と考えさせられる一冊です!
ページ数も少なく、時間がない人でも読める量だと思います(`・∀・´)
【Kubernetes】Job・CronJobの設定方法
Job・CronJobの実装方法をメモ。
kubernetes環境上で実行するjobは並列起動してもエラーとならない様な設計にすると良いそうです。
後はdockerイメージ更新はタグをベースに行う事により、imagePullPolicy: IfNotPresent
で更新の必要がない時はpullをskipさせる事ができます。
タグがlatest
だと更新漏れ等事故の元となる為、お勧めしません。
Jobの定義
マニフェストに定義してkubectl apply -f job.yaml
で実行できます。
jobの確認方法はkubectl get job
など、他のリソースと同じです。
apiVersion: batch/v1 kind: Job metadata: name: test-job spec: completions: 5 parallelism: 3 backoffLimit: 1 activeDeadlineSeconds: 3600 # この秒数を超える処理はエラーとなるので注意 template: spec: restartPolicy: Never containers: - name: test-job-container image: test-image:0.0.1 imagePullPolicy: IfNotPresent command: - sh - -c - "python3 main.py" env: - name: MYSQL_PASSWORD #jsonをsecretに格納した場合 valueFrom: secretKeyRef: name: test.secret.mysql key: passwd
CronJobの定義
実行環境によりますが毎時起動などで0分もしくは5の倍数分を指定するPJが多い為、若干ずらして指定すると環境に優しいです。
実行後のPOD保管数をsuccessfulJobsHistoryLimit
とfailedJobsHistoryLimit
で指定できます。
もしPOD内にクレデンシャル情報などが生成される場合、不要になった時点で削除しておくと安全かもしれません。
apiVersion: batch/v1beta1 kind: CronJob metadata: name: test-cronjob spec: schedule: "02 * * * *" concurrencyPolicy: Allow startingDeadlineSeconds: 2000 # 開始時刻 + startingDeadlineSeconds までの間に起動可能 successfulJobsHistoryLimit: 3 # 成功したジョブのPOD保管数 failedJobsHistoryLimit: 3 # 失敗したジョブのPOD保管数 jobTemplate: spec: backoffLimit: 1 activeDeadlineSeconds: 3600 # この秒数を超える処理はエラーとなるので注意 template: spec: containers: - name: test-cronjob-container image: test-image:0.0.1 imagePullPolicy: IfNotPresent command: - sh - -c - "python3 main.py" env: - name: MYSQL_PASSWORD valueFrom: secretKeyRef: name: test.secret.mysql key: passwd
【Kubernetes】PODの安全な起動・終了について
KubernetesのPODを安全に起動・終了する方法をメモ。
アプリケーションのGraceful shutdownは言語によって異なる為、都度実装する事になります。
The Twelve-Factor App
現在のPJではThe Twelve-Factor Appに沿った開発が推奨されています。
PODの安全な起動・停止に関する考慮も重要です。
12factor.net
安全に停止させるには
deploymentのマニフェストにPreStop
を入れる事で、PODの破棄命令〜SIGTERMまでの時間を稼ぐ事ができます。
下記は単純にsleepしているだけですが、何かしらの処理を入れる事も可能です。
deployment.yaml
lifecycle: preStop: exec: command: ["/bin/sh", "-c", "sleep 20"]
起動時にアプリケーションのスタンバイが間に合わない場合
アプリケーションがReady状態になったかを確認するエンドポイントを設定する事ができます。
こちらもマニフェストに入れる事が可能です。
deployment.yaml
readinessProbe: initialDelaySeconds: 10 timeoutSeconds: 1 httpGet: path: /status port: 443 scheme: HTTPS
ローリングアップデート時のPOD数
PodDisruptionBudget
を作成する事により、クラスタ内に必ずPODが残るように退避しながらノードが入れ替わっていきます。
これにより、ローリングアップデート時にクラスタ内にPODが存在しないという状態を回避できます。
PDB設定例
apiVersion: policy/v1beta1 kind: PodDisruptionBudget metadata: name: <任意のPDB名> spec: maxUnavaliable: 1 selector: matchLabels: app: <PODのラベル名>
【curl】通信時間の計測
curlのオプションで通信時にかかる時間の計測ができます。
どの時間帯が長引いているか等で、不具合の原因切り分けができるかもしれません。
manページ
コマンド
$ curl -w "\ntime_namelookup: %{time_namelookup}\ntime_connect: %{time_connect}\ntime_appconnect: %{time_appconnect}\ntime_pretransfer: %{time_pretransfer}\ntime_redirect: %{time_redirect}\ntime_starttransfer: %{time_starttransfer}\ntime_total: %{time_total}\n" https://curl.se/docs/manpage.html <レスポンス内容...> time_namelookup: 0.004318 time_connect: 0.061376 time_appconnect: 0.180256 time_pretransfer: 0.180681 time_redirect: 0.000000 time_starttransfer: 0.217821 time_total: 0.375091
参考
time_namelookup
が大きい場合...名前解決に時間がかかっている
time_pretransfer
とtime_starttransfer
の差が大きい場合...サーバー側の処理に時間がかかっている
time_namelookup: <開始から名前解決完了までにかかった時間> time_connect: <開始からTCP接続完了までにかかった時間> time_appconnect: <開始からSSLハンドシェイク成功までにかかった時間> time_pretransfer: <開始からファイル転送開始までにかかった時間> time_redirect: <リダイレクトにかかった時間> time_starttransfer: <開始からレスポンスの最初の1byteが到達するまでにかかった時間> time_total: <開始から終了まで>
【Python】Sanicでoracle接続
pythonでoracle接続を実装したのでメモ。
非同期処理用のフレームワークsanic
を使用しました。
ドキュメント
sanic
Python v3.7~が必要です。
v3.6は2021年末にサポートが終了するので、バージョンアップを検討した方が良いと思います。
sanic.readthedocs.io
cx_Oracle
oracle接続用のc++モジュールがありました。
python bindしてあるので、呼び出すだけで接続できます。
cx_Oracle_async
非同期処理版はこちら。
oracle cloudで使用しているのですが環境上ではインストールできなかった為、git hubソースからビルドしました。
pypi.org
実装
app.py
# -*- coding: utf-8 -*- import os from src.api.base import app cpu_count = os.cpu_count() workers = cpu_count - 1 if cpu_count > 1 else 1 if __name__ == '__main__': # ssl = {'cert': "./ssl/server.crt", 'key': "./ssl/server.key"} ssl通信の場合 app.run(host="0.0.0.0", port=8000, workers=workers, access_log=True, debug=True)
database.py
DB接続用モジュール。
# -*- coding: utf-8 -*- import os import time import cx_Oracle import cx_Oracle_async from sanic.log import logger from functools import wraps class SanicOracle: def __init__(self, app, dsn, user, password): @app.listener('before_server_start') async def setup_oracle_session_listener(_app, loop): await self.start(_app, loop, dsn, user, password) @app.listener('after_server_stop') async def teardown_oracle_session_listener(_app, loop): logger.info('closing oracle connection for [pid:{}]'.format(os.getpid())) await _app.db.close(force=True) async def start(self, _app, loop, dsn, user, password): oracle_pool = await cx_Oracle_async.create_pool( dsn=dsn, user=user, password=password, min=2, max=100, loop=loop ) async def _fetch_all(query: str, args: tuple = None): result = [] async with oracle_pool.acquire() as connection: async with connection.cursor() as cursor: if args: await cursor.execute(query, args) else: await cursor.execute(query) for r in await cursor.fetchall(): result.append(r) return result async def _execute_query(query: str, args: list = None): async with oracle_pool.acquire() as connection: async with connection.cursor() as cursor: await cursor.execute(query, args) await connection.commit() setattr(oracle_pool, 'fetch_all', _fetch_all) setattr(oracle_pool, 'execute_query', _execute_query) _app.db = oracle_pool def retry(f): """リトライ用""" _attempts = 5 _delay = 1 @wraps def wrapper(*args, **kwargs): _retry = 0 while True: try: _retry += 1 result = f(*args, **kwargs) return result except cx_Oracle.OperationalError as ex: if _retry >= _attempts: error, = ex.args logger.error(error.message) raise ex time.sleep(_delay) return wrapper
src/api/base.py
APIの基本設定モジュール。
# -*- coding: utf-8 -*- import os from sanic import Sanic from sanic.exceptions import NotFound, InvalidUsage, MethodNotSupported,\ ServerError, Unauthorized from sanic.response import text, json from sanic.log import logger from database import SanicOracle from src.api.test import test_bp app_name = 'test-app' def create_app(): sanic_app = Sanic(app_name) sanic_app.blueprint(test_bp) os.environ['TNS_ADMIN'] = '<Oracle Net Services構成ファイルの配置先>' dsn = 'xxxxxxxxxx_high' user = os.getenv('USER') password = os.getenv('PASS') SanicOracle(sanic_app, dsn, user, password) logger.info('api initialized.') @sanic_app.route('/') async def index(request): return text('ok') @sanic_app.middleware('response') async def prevent_xss(request, response): response.headers['x-xss-protection'] = '1; mode=block' return sanic_app app = create_app() @app.exception(InvalidUsage) async def bad_request(request, exception): logger.error(exception) return json({'message': exception.args[0]}, status=400) @app.exception(Unauthorized) async def unauthorized(request, exception): logger.error(exception) return json({'message': exception.args[0]}, status=401) @app.exception(NotFound) async def not_found(request, exception): logger.error(exception) return json({'message': exception.args[0]}, status=404) @app.exception(MethodNotSupported) async def method_not_allowed(request, exception): logger.error(exception) return json({'message': exception.args[0]}, status=405) @app.exception(ServerError) async def internal_server_error(request, exception): logger.error(exception) return json({'message': exception.args[0]}, status=500)
src/api/test.py
blueprintの割り振り先。
実際はフロントエンドにvue.jsを使用している為、jsonレスポンスの返却のみとなります。
# -*- coding: utf-8 -*- import traceback from sanic import Blueprint from sanic.exceptions import abort from sanic.log import logger from sanic.response import json test_bp = Blueprint(__name__) msg_500 = 'Internal sever error' @test_bp.route('/get_sample') async def get_sample(request): try: query = 'SELECT id, name, modified FROM ( SELECT id, name, modified FROM user ORDER BY modified DESC ) WHERE ROWNUM <= 200' rows = await request.app.db.fetch_all(query) result = [] for row in rows: result.append({ 'id': row[0], 'name': row[1], 'modified': row[2] }) return json({'result': result}) except: logger.error(traceback.format_exc()) abort(500, msg_500)
実行
$ python3.8 app.py > /var/log/api/api.log &
【shell】便利コマンド
忘れそうなので、業務で実装したバッチ処理などで利用したshellコマンドのメモ。
随時、追加して行きます。
現在日時からの加減算
# 1時間後 $ echo $(date "+%Y%m%d %H%M%S" -d '1 hour') # 1時間前 $ echo $(date "+%Y%m%d %H%M%S" -d '1 hour ago')
ファイルの拡張子を取得
$ for file in $(find <パス> -type f); do ext=${file##*.}; echo ${ext}; done
最終更新日時 < 30分前のファイルを削除
$ find . -type f -mmin +30 -print0 | xargs -0 --no-run-if-empty rm -f
shellでsendmail
# 関数化 send_mail() { __mail_to=$1 __mail_from=$2 __mail_subject=$3 __mail_message=$4 <send_mail格納先のパス>/sendmail -f "$__mail_from" -t "$__mail_to" << _EOB_ Content-Type: text/plain; charset=euc-jp Content-transfer-encoding: 7bit To: $__mail_to From: $__mail_from Subject: $__mail_subject $__mail_message _EOB_ }
# 送信 $ send_mail ${TO} ${FROM} "<subject>" "<メッセージ>"
# trap $ trap "send_mail ${TO} ${FROM} \"<subject>\" \"<メッセージ>\"" INT HUP TERM
拡張子を指定したfindの実行結果をtarにアーカイブ
$ find <検索対象パス> ( -name "*.png" -o -name "*.jpg" -o -name "*.jpeg" ) -print0 | tar -cvz -T - --null -f <アーカイブファイル名>
特定の拡張子を除外してアーカイブ
$ tar --exclude *.png --exclude *.jpg --exclude *.jpeg -cvf <アーカイブファイル名> <アーカイブ対象パス>
パイプラインで実行した際の終了ステータスを確認したい場合
$ find input/ -type f -print0 | xargs --no-run-if-empty -0 -I {} cp -p {} output/ $ for st in ${PIPESTATUS[@]}; do if [ $st -ne 0 ]; then echo "エラー発生"; fi
ファイルから指定した文字列を削除
$ sed -i -e 's/<削除したい文字列>//g' <対象のファイルパス>
apacheのログ出力をlog rotateに変更
$ sed -i \ -e "s/ErrorLog \"logs\/error_log\"/ErrorLog \"|\/usr\/sbin\/rotatelogs \/var\/log\/httpd\/error\.\%Y\%m\%d\.log 86400 540\"/g" -e "s/CustomLog \"logs\/access_log\" combined /CustomLog \"|\/usr\/sbin\/rotatelogs \/var\/log\/httpd\/access\.\%Y\%m\%d\.log 86400 540\" combined/g" \ /etc/httpd/conf/httpd.conf
ssl証明書内容をopensslで確認
# 証明書ファイルの内容を確認 $ openssl x509 -text -noout -in <サーバ証明書のパス> # 秘密鍵ファイルの内容を確認 $ openssl rsa -text -noout -in <秘密鍵のパス> # CSRファイルの内容を確認 $ openssl req -text -noout -in <CSRファイルのパス>