【Python】FastAPIのディレクトリ構成

f:id:tm200:20210725133513p:plain

クラウドソーシングでFastAPIの実装を行う機会があったので、調べた事などをメモ。
作成したソースコードGit Hubに上げてあります。

ドキュメント

公式ドキュメントは分かりやすく書かれていた印象です。

fastapi.tiangolo.com

ディレクトリ構成

公式ドキュメントや現場のPJを参考に、以下の構成にしました。
※ Docker関連ファイルなどは除外してあります。

.
├── __init__.py
├── app.py
├── cli.py
├── database.py
├── db_config.py
├── dependencies
│   ├── __init__.py
│   ├── auth.py
│   └── request.py
├── exception.py
├── gunicorn_conf.py
├── handlers
│   ├── __init__.py
│   └── exception.py
├── midlewares
│   ├── __init__.py
│   └── timer.py
├── model
│   ├── __init__.py
│   └── users.py
├── plugins
│   └── __init__.py
├── requirements.txt
├── response.py
└──  routers
    ├── __init__.py
    ├── index.py
    └── users.py

app.py

create_appルーターやDB、ミドルウェア、エラーハンドリングの設定を行っています。
uvicorngunicornからこちらのモジュールを指定して起動します。

cli.py

開発時の動作確認用。
uvicorn.runで起動する他、ArgumentParserでホストとポートを指定可能です。
起動コマンドはpython cli.py

database.py

SQLiteMySQL単体テスト用のデータベース設定を定義しています。
ORMはsqlalchemy

exception.py

独自のExceptionクラスを定義しています。

response.py

JSONResponseを継承した、APIのレスポンスクラスを定義しています。
orjsonを組み込むだけならORJSONResponseクラスを直接利用すれば良いと思います。

gunicorn_conf.py

本番起動時のgunicornに関する設定を記載しています。
詳細は以下を参照してください。

docs.gunicorn.org

dependencies/*

fastapi.Dependsrouterモジュール内のエンドポイント定義関数の引数に設定するモジュール群です。
公式で言うと以下の辺りです。

fastapi.tiangolo.com

handlers/exception.py

handlers配下に定義したエラーハンドリング関数をapp.pyadd_exception_handlerする事により、エラーハンドリングを集約する事ができます。

midlewares/*

ミドルウェア用関数を定義しています。
app.pyadd_middlewareする事により使用可能となります。
各エンドポイントのレスポンス返却前に、何かしらの処理を追加できます。
公式に記載があるのですが、yieldを使った依存関係をもつ場合の終了コードはミドルウェアの後に実行されるようです。

fastapi.tiangolo.com

model/*

model配下にsqlalchemyのモデルクラスを定義します。

plugins/*

PJ独自のライブラリを使用した共通処理がある場合などで、実装モジュールを分けられる場合に使用します。

routers/*

fastapi.APIRouterで各エンドポイントを定義します。
app.pyでinclude時にsqlalchemy.orm.Sessionを注入してエンドポイントで利用できるようにしています。

まとめ

FastAPIを利用したい時にサッと引っ張ってこれる雛形が欲しいという事もあり、この記事の執筆に至りました。
おかしな点などありましたら、教えて頂けると嬉しいです😊

関係ないのですが、クラウドソーシングをしていると動けば良いというスタンスのコードをよく見かけるので、こういった記事が増える事により、〇〇コードが少しでも減ってくれれば良いなと思います。
(構成以前に謎文法な事も多いので、まずは公開されているpythonコードで自分が実装する分野と近しい実装を見て欲しいです、、、😂 )

【Python】pyhive で Presto 接続

f:id:tm200:20210704143739p:plain


PyHiveでPresto接続方法のメモ。
認証方法はプロダクト毎に違うと思いますので、適宜変更。
この辺りを見れば実装方法が分かると思います。

github.com

インストール

$ pip3 install pyhive[presto]==0.6.4

実装

# -*- coding: utf-8 -*-
import base64
from pyhive import presto

def execute():
    cursor = presto.connect(
        '<ホスト名>',
        port=443,
        catalog='<カタログ名>',
        username='<ユーザー名>',
        protocol='https',
    ).cursor()

    # パラメータは辞書・リスト・tupleのいずれか
    cursor.execute('SELECT * FROM sample WHERE test_column = %s', parameters=('param_value',))
    print(cursor.fetchall())


if __name__ == '__main__':
    execute()

参考

逐次処理等がしたい場合はこちらを参考にすると良いかも。
機会があれば実装しようと思います。

github.com

www.python.org

オススメ書籍

色々と考えさせられる一冊です!
ページ数も少なく、時間がない人でも読める量だと思います(`・∀・´)

【Kubernetes】Job・CronJobの設定方法

f:id:tm200:20210611174228p:plain

Job・CronJobの実装方法をメモ。
kubernetes環境上で実行するjobは並列起動してもエラーとならない様な設計にすると良いそうです。
後はdockerイメージ更新はタグをベースに行う事により、imagePullPolicy: IfNotPresentで更新の必要がない時はpullをskipさせる事ができます。
タグがlatestだと更新漏れ等事故の元となる為、お勧めしません。

Jobの定義

マニフェストに定義してkubectl apply -f job.yamlで実行できます。
jobの確認方法はkubectl get jobなど、他のリソースと同じです。

apiVersion: batch/v1
kind: Job
metadata:
  name: test-job
spec:
  completions: 5
  parallelism: 3
  backoffLimit: 1
  activeDeadlineSeconds: 3600  # この秒数を超える処理はエラーとなるので注意
  template:
    spec:
      restartPolicy: Never
      containers:
      - name: test-job-container
         image: test-image:0.0.1
         imagePullPolicy: IfNotPresent
         command:
           - sh
           - -c
           - "python3 main.py"
         env:
         - name: MYSQL_PASSWORD  #jsonをsecretに格納した場合
           valueFrom:
             secretKeyRef:
               name: test.secret.mysql
               key: passwd

CronJobの定義

実行環境によりますが毎時起動などで0分もしくは5の倍数分を指定するPJが多い為、若干ずらして指定すると環境に優しいです。
実行後のPOD保管数をsuccessfulJobsHistoryLimitfailedJobsHistoryLimitで指定できます。
もしPOD内にクレデンシャル情報などが生成される場合、不要になった時点で削除しておくと安全かもしれません。

apiVersion: batch/v1beta1
kind: CronJob
metadata:
  name: test-cronjob
spec:
  schedule: "02 * * * *"
  concurrencyPolicy: Allow
  startingDeadlineSeconds: 2000  # 開始時刻 + startingDeadlineSeconds までの間に起動可能
  successfulJobsHistoryLimit: 3 # 成功したジョブのPOD保管数
  failedJobsHistoryLimit: 3 # 失敗したジョブのPOD保管数
  jobTemplate:
    spec:
      backoffLimit: 1
      activeDeadlineSeconds: 3600  # この秒数を超える処理はエラーとなるので注意
      template:
        spec:
          containers:
          - name: test-cronjob-container
            image: test-image:0.0.1
            imagePullPolicy: IfNotPresent
            command:
              - sh
              - -c
              - "python3 main.py"
            env:
            - name: MYSQL_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: test.secret.mysql
                  key: passwd

【Kubernetes】PODの安全な起動・終了について

f:id:tm200:20210611174228p:plain

KubernetesのPODを安全に起動・終了する方法をメモ。
アプリケーションのGraceful shutdownは言語によって異なる為、都度実装する事になります。

The Twelve-Factor App

現在のPJではThe Twelve-Factor Appに沿った開発が推奨されています。
PODの安全な起動・停止に関する考慮も重要です。
12factor.net

安全に停止させるには

deploymentのマニフェストPreStopを入れる事で、PODの破棄命令〜SIGTERMまでの時間を稼ぐ事ができます。
下記は単純にsleepしているだけですが、何かしらの処理を入れる事も可能です。

deployment.yaml
lifecycle:
  preStop:
    exec:
      command: ["/bin/sh", "-c", "sleep 20"]

起動時にアプリケーションのスタンバイが間に合わない場合

アプリケーションがReady状態になったかを確認するエンドポイントを設定する事ができます。
こちらもマニフェストに入れる事が可能です。

deployment.yaml
readinessProbe:
  initialDelaySeconds: 10
  timeoutSeconds: 1
  httpGet:
    path: /status
  port: 443
  scheme: HTTPS

ローリングアップデート時のPOD数

PodDisruptionBudgetを作成する事により、クラスタ内に必ずPODが残るように退避しながらノードが入れ替わっていきます。
これにより、ローリングアップデート時にクラスタ内にPODが存在しないという状態を回避できます。

PDB設定例
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
  name: <任意のPDB名>
spec:
  maxUnavaliable: 1
  selector:
    matchLabels:
      app: <PODのラベル名>

【curl】通信時間の計測

f:id:tm200:20210610165136p:plain

curlのオプションで通信時にかかる時間の計測ができます。  
どの時間帯が長引いているか等で、不具合の原因切り分けができるかもしれません。

manページ

curl.se  

コマンド

$ curl -w "\ntime_namelookup: %{time_namelookup}\ntime_connect: %{time_connect}\ntime_appconnect: %{time_appconnect}\ntime_pretransfer: %{time_pretransfer}\ntime_redirect: %{time_redirect}\ntime_starttransfer: %{time_starttransfer}\ntime_total: %{time_total}\n" https://curl.se/docs/manpage.html

<レスポンス内容...>

time_namelookup: 0.004318
time_connect: 0.061376
time_appconnect: 0.180256
time_pretransfer: 0.180681
time_redirect: 0.000000
time_starttransfer: 0.217821
time_total: 0.375091

参考

time_namelookupが大きい場合...名前解決に時間がかかっている
time_pretransfertime_starttransferの差が大きい場合...サーバー側の処理に時間がかかっている

time_namelookup: <開始から名前解決完了までにかかった時間>
time_connect: <開始からTCP接続完了までにかかった時間>
time_appconnect: <開始からSSLハンドシェイク成功までにかかった時間>
time_pretransfer: <開始からファイル転送開始までにかかった時間>
time_redirect: <リダイレクトにかかった時間>
time_starttransfer: <開始からレスポンスの最初の1byteが到達するまでにかかった時間>
time_total: <開始から終了まで>

【Python】Sanicでoracle接続

f:id:tm200:20210530152852p:plain
f:id:tm200:20210530153108j:plain

pythonoracle接続を実装したのでメモ。
非同期処理用のフレームワークsanicを使用しました。

ドキュメント

sanic

Python v3.7~が必要です。
v3.6は2021年末にサポートが終了するので、バージョンアップを検討した方が良いと思います。 sanic.readthedocs.io

cx_Oracle

oracle接続用のc++モジュールがありました。
python bindしてあるので、呼び出すだけで接続できます。

cx-oracle.readthedocs.io

cx_Oracle_async

非同期処理版はこちら。
oracle cloudで使用しているのですが環境上ではインストールできなかった為、git hubソースからビルドしました。 pypi.org

実装

app.py
# -*- coding: utf-8 -*-
import os

from src.api.base import app

cpu_count = os.cpu_count()
workers = cpu_count - 1 if cpu_count > 1 else 1

if __name__ == '__main__':
    # ssl = {'cert': "./ssl/server.crt", 'key': "./ssl/server.key"} ssl通信の場合
    app.run(host="0.0.0.0", port=8000, workers=workers, access_log=True, debug=True)
database.py

DB接続用モジュール。

# -*- coding: utf-8 -*-
import os
import time

import cx_Oracle
import cx_Oracle_async

from sanic.log import logger
from functools import wraps


class SanicOracle:
    def __init__(self, app, dsn, user, password):
        @app.listener('before_server_start')
        async def setup_oracle_session_listener(_app, loop):
            await self.start(_app, loop, dsn, user, password)

        @app.listener('after_server_stop')
        async def teardown_oracle_session_listener(_app, loop):
            logger.info('closing oracle connection for [pid:{}]'.format(os.getpid()))
            await _app.db.close(force=True)

    async def start(self, _app, loop, dsn, user, password):
        oracle_pool = await cx_Oracle_async.create_pool(
            dsn=dsn,
            user=user,
            password=password,
            min=2,
            max=100,
            loop=loop
        )

        async def _fetch_all(query: str, args: tuple = None):
            result = []
            async with oracle_pool.acquire() as connection:
                async with connection.cursor() as cursor:
                    if args:
                        await cursor.execute(query, args)
                    else:
                        await cursor.execute(query)
                    for r in await cursor.fetchall():
                        result.append(r)
            return result

        async def _execute_query(query: str, args: list = None):
            async with oracle_pool.acquire() as connection:
                async with connection.cursor() as cursor:
                    await cursor.execute(query, args)
                    await connection.commit()

        setattr(oracle_pool, 'fetch_all', _fetch_all)
        setattr(oracle_pool, 'execute_query', _execute_query)

        _app.db = oracle_pool


def retry(f):
    """リトライ用"""
    _attempts = 5
    _delay = 1

    @wraps
    def wrapper(*args, **kwargs):
        _retry = 0
        while True:
            try:
                _retry += 1
                result = f(*args, **kwargs)
                return result
            except cx_Oracle.OperationalError as ex:
                if _retry >= _attempts:
                    error, = ex.args
                    logger.error(error.message)
                    raise ex
                time.sleep(_delay)
    return wrapper
src/api/base.py

APIの基本設定モジュール。

# -*- coding: utf-8 -*-
import os

from sanic import Sanic
from sanic.exceptions import NotFound, InvalidUsage, MethodNotSupported,\
    ServerError, Unauthorized
from sanic.response import text, json
from sanic.log import logger

from database import SanicOracle
from src.api.test import test_bp

app_name = 'test-app'


def create_app():
    sanic_app = Sanic(app_name)
    sanic_app.blueprint(test_bp)
    os.environ['TNS_ADMIN'] = '<Oracle Net Services構成ファイルの配置先>'
    dsn = 'xxxxxxxxxx_high'
    user = os.getenv('USER')
    password = os.getenv('PASS')
    SanicOracle(sanic_app, dsn, user, password)
    logger.info('api initialized.')

    @sanic_app.route('/')
    async def index(request):
        return text('ok')

    @sanic_app.middleware('response')
    async def prevent_xss(request, response):
        response.headers['x-xss-protection'] = '1; mode=block'

    return sanic_app


app = create_app()


@app.exception(InvalidUsage)
async def bad_request(request, exception):
    logger.error(exception)
    return json({'message': exception.args[0]}, status=400)


@app.exception(Unauthorized)
async def unauthorized(request, exception):
    logger.error(exception)
    return json({'message': exception.args[0]}, status=401)


@app.exception(NotFound)
async def not_found(request, exception):
    logger.error(exception)
    return json({'message': exception.args[0]}, status=404)


@app.exception(MethodNotSupported)
async def method_not_allowed(request, exception):
    logger.error(exception)
    return json({'message': exception.args[0]}, status=405)


@app.exception(ServerError)
async def internal_server_error(request, exception):
    logger.error(exception)
    return json({'message': exception.args[0]}, status=500)
src/api/test.py

blueprintの割り振り先。
実際はフロントエンドにvue.jsを使用している為、jsonレスポンスの返却のみとなります。

# -*- coding: utf-8 -*-
import traceback

from sanic import Blueprint
from sanic.exceptions import abort
from sanic.log import logger
from sanic.response import json


test_bp = Blueprint(__name__)
msg_500 = 'Internal sever error'


@test_bp.route('/get_sample')
async def get_sample(request):
    try:
        query = 'SELECT id, name, modified FROM ( SELECT id, name, modified FROM user ORDER BY modified DESC ) WHERE ROWNUM <= 200'
        rows = await request.app.db.fetch_all(query)
        result = []
        for row in rows:
            result.append({
                'id': row[0],
                'name': row[1],
                'modified': row[2]
        })
        return json({'result': result})
    except:
        logger.error(traceback.format_exc())
        abort(500, msg_500)

実行

$ python3.8 app.py > /var/log/api/api.log &

【shell】便利コマンド

f:id:tm200:20210529165647j:plain

忘れそうなので、業務で実装したバッチ処理などで利用したshellコマンドのメモ。
随時、追加して行きます。  

現在日時からの加減算

# 1時間後
$ echo $(date "+%Y%m%d %H%M%S" -d '1 hour')
# 1時間前
$ echo $(date "+%Y%m%d %H%M%S" -d '1 hour ago')

ファイルの拡張子を取得

$ for file in $(find <パス> -type f); do ext=${file##*.}; echo ${ext}; done

最終更新日時 < 30分前のファイルを削除

$ find . -type f -mmin +30 -print0 | xargs -0 --no-run-if-empty rm -f

shellでsendmail

# 関数化
send_mail() {
    __mail_to=$1
    __mail_from=$2
    __mail_subject=$3
    __mail_message=$4

    <send_mail格納先のパス>/sendmail -f "$__mail_from" -t "$__mail_to" << _EOB_
Content-Type: text/plain; charset=euc-jp
Content-transfer-encoding: 7bit
To: $__mail_to
From: $__mail_from
Subject: $__mail_subject

$__mail_message

_EOB_

}
# 送信
$ send_mail ${TO} ${FROM} "<subject>" "<メッセージ>"
# trap
$ trap "send_mail ${TO} ${FROM} \"<subject>\" \"<メッセージ>\"" INT HUP TERM

拡張子を指定したfindの実行結果をtarにアーカイブ

$ find <検索対象パス> ( -name "*.png" -o -name "*.jpg" -o -name "*.jpeg" ) -print0 | tar -cvz -T - --null -f <アーカイブファイル名>

特定の拡張子を除外してアーカイブ

$ tar --exclude *.png --exclude *.jpg --exclude *.jpeg -cvf <アーカイブファイル名> <アーカイブ対象パス>

パイプラインで実行した際の終了ステータスを確認したい場合

$ find input/ -type f -print0 | xargs --no-run-if-empty -0 -I {} cp -p {} output/

$ for st in ${PIPESTATUS[@]}; do if [ $st -ne 0 ]; then echo "エラー発生"; fi