【Python】Sanicでoracle接続

f:id:tm200:20210530152852p:plain
f:id:tm200:20210530153108j:plain

pythonoracle接続を実装したのでメモ。
非同期処理用のフレームワークsanicを使用しました。

ドキュメント

sanic

Python v3.7~が必要です。
v3.6は2021年末にサポートが終了するので、バージョンアップを検討した方が良いと思います。 sanic.readthedocs.io

cx_Oracle

oracle接続用のc++モジュールがありました。
python bindしてあるので、呼び出すだけで接続できます。

cx-oracle.readthedocs.io

cx_Oracle_async

非同期処理版はこちら。
oracle cloudで使用しているのですが環境上ではインストールできなかった為、git hubソースからビルドしました。 pypi.org

実装

app.py
# -*- coding: utf-8 -*-
import os

from src.api.base import app

cpu_count = os.cpu_count()
workers = cpu_count - 1 if cpu_count > 1 else 1

if __name__ == '__main__':
    # ssl = {'cert': "./ssl/server.crt", 'key': "./ssl/server.key"} ssl通信の場合
    app.run(host="0.0.0.0", port=8000, workers=workers, access_log=True, debug=True)
database.py

DB接続用モジュール。

# -*- coding: utf-8 -*-
import os
import time

import cx_Oracle
import cx_Oracle_async

from sanic.log import logger
from functools import wraps


class SanicOracle:
    def __init__(self, app, dsn, user, password):
        @app.listener('before_server_start')
        async def setup_oracle_session_listener(_app, loop):
            await self.start(_app, loop, dsn, user, password)

        @app.listener('after_server_stop')
        async def teardown_oracle_session_listener(_app, loop):
            logger.info('closing oracle connection for [pid:{}]'.format(os.getpid()))
            await _app.db.close(force=True)

    async def start(self, _app, loop, dsn, user, password):
        oracle_pool = await cx_Oracle_async.create_pool(
            dsn=dsn,
            user=user,
            password=password,
            min=2,
            max=100,
            loop=loop
        )

        async def _fetch_all(query: str, args: tuple = None):
            result = []
            async with oracle_pool.acquire() as connection:
                async with connection.cursor() as cursor:
                    if args:
                        await cursor.execute(query, args)
                    else:
                        await cursor.execute(query)
                    for r in await cursor.fetchall():
                        result.append(r)
            return result

        async def _execute_query(query: str, args: list = None):
            async with oracle_pool.acquire() as connection:
                async with connection.cursor() as cursor:
                    await cursor.execute(query, args)
                    await connection.commit()

        setattr(oracle_pool, 'fetch_all', _fetch_all)
        setattr(oracle_pool, 'execute_query', _execute_query)

        _app.db = oracle_pool


def retry(f):
    """リトライ用"""
    _attempts = 5
    _delay = 1

    @wraps
    def wrapper(*args, **kwargs):
        _retry = 0
        while True:
            try:
                _retry += 1
                result = f(*args, **kwargs)
                return result
            except cx_Oracle.OperationalError as ex:
                if _retry >= _attempts:
                    error, = ex.args
                    logger.error(error.message)
                    raise ex
                time.sleep(_delay)
    return wrapper
src/api/base.py

APIの基本設定モジュール。

# -*- coding: utf-8 -*-
import os

from sanic import Sanic
from sanic.exceptions import NotFound, InvalidUsage, MethodNotSupported,\
    ServerError, Unauthorized
from sanic.response import text, json
from sanic.log import logger

from database import SanicOracle
from src.api.test import test_bp

app_name = 'test-app'


def create_app():
    sanic_app = Sanic(app_name)
    sanic_app.blueprint(test_bp)
    os.environ['TNS_ADMIN'] = '<Oracle Net Services構成ファイルの配置先>'
    dsn = 'xxxxxxxxxx_high'
    user = os.getenv('USER')
    password = os.getenv('PASS')
    SanicOracle(sanic_app, dsn, user, password)
    logger.info('api initialized.')

    @sanic_app.route('/')
    async def index(request):
        return text('ok')

    @sanic_app.middleware('response')
    async def prevent_xss(request, response):
        response.headers['x-xss-protection'] = '1; mode=block'

    return sanic_app


app = create_app()


@app.exception(InvalidUsage)
async def bad_request(request, exception):
    logger.error(exception)
    return json({'message': exception.args[0]}, status=400)


@app.exception(Unauthorized)
async def unauthorized(request, exception):
    logger.error(exception)
    return json({'message': exception.args[0]}, status=401)


@app.exception(NotFound)
async def not_found(request, exception):
    logger.error(exception)
    return json({'message': exception.args[0]}, status=404)


@app.exception(MethodNotSupported)
async def method_not_allowed(request, exception):
    logger.error(exception)
    return json({'message': exception.args[0]}, status=405)


@app.exception(ServerError)
async def internal_server_error(request, exception):
    logger.error(exception)
    return json({'message': exception.args[0]}, status=500)
src/api/test.py

blueprintの割り振り先。
実際はフロントエンドにvue.jsを使用している為、jsonレスポンスの返却のみとなります。

# -*- coding: utf-8 -*-
import traceback

from sanic import Blueprint
from sanic.exceptions import abort
from sanic.log import logger
from sanic.response import json


test_bp = Blueprint(__name__)
msg_500 = 'Internal sever error'


@test_bp.route('/get_sample')
async def get_sample(request):
    try:
        query = 'SELECT id, name, modified FROM ( SELECT id, name, modified FROM user ORDER BY modified DESC ) WHERE ROWNUM <= 200'
        rows = await request.app.db.fetch_all(query)
        result = []
        for row in rows:
            result.append({
                'id': row[0],
                'name': row[1],
                'modified': row[2]
        })
        return json({'result': result})
    except:
        logger.error(traceback.format_exc())
        abort(500, msg_500)

実行

$ python3.8 app.py > /var/log/api/api.log &