【Python】Sanicでoracle接続
pythonでoracle接続を実装したのでメモ。
非同期処理用のフレームワークsanic
を使用しました。
ドキュメント
sanic
Python v3.7~が必要です。
v3.6は2021年末にサポートが終了するので、バージョンアップを検討した方が良いと思います。
sanic.readthedocs.io
cx_Oracle
oracle接続用のc++モジュールがありました。
python bindしてあるので、呼び出すだけで接続できます。
cx_Oracle_async
非同期処理版はこちら。
oracle cloudで使用しているのですが環境上ではインストールできなかった為、git hubソースからビルドしました。
pypi.org
実装
app.py
# -*- coding: utf-8 -*- import os from src.api.base import app cpu_count = os.cpu_count() workers = cpu_count - 1 if cpu_count > 1 else 1 if __name__ == '__main__': # ssl = {'cert': "./ssl/server.crt", 'key': "./ssl/server.key"} ssl通信の場合 app.run(host="0.0.0.0", port=8000, workers=workers, access_log=True, debug=True)
database.py
DB接続用モジュール。
# -*- coding: utf-8 -*- import os import time import cx_Oracle import cx_Oracle_async from sanic.log import logger from functools import wraps class SanicOracle: def __init__(self, app, dsn, user, password): @app.listener('before_server_start') async def setup_oracle_session_listener(_app, loop): await self.start(_app, loop, dsn, user, password) @app.listener('after_server_stop') async def teardown_oracle_session_listener(_app, loop): logger.info('closing oracle connection for [pid:{}]'.format(os.getpid())) await _app.db.close(force=True) async def start(self, _app, loop, dsn, user, password): oracle_pool = await cx_Oracle_async.create_pool( dsn=dsn, user=user, password=password, min=2, max=100, loop=loop ) async def _fetch_all(query: str, args: tuple = None): result = [] async with oracle_pool.acquire() as connection: async with connection.cursor() as cursor: if args: await cursor.execute(query, args) else: await cursor.execute(query) for r in await cursor.fetchall(): result.append(r) return result async def _execute_query(query: str, args: list = None): async with oracle_pool.acquire() as connection: async with connection.cursor() as cursor: await cursor.execute(query, args) await connection.commit() setattr(oracle_pool, 'fetch_all', _fetch_all) setattr(oracle_pool, 'execute_query', _execute_query) _app.db = oracle_pool def retry(f): """リトライ用""" _attempts = 5 _delay = 1 @wraps def wrapper(*args, **kwargs): _retry = 0 while True: try: _retry += 1 result = f(*args, **kwargs) return result except cx_Oracle.OperationalError as ex: if _retry >= _attempts: error, = ex.args logger.error(error.message) raise ex time.sleep(_delay) return wrapper
src/api/base.py
APIの基本設定モジュール。
# -*- coding: utf-8 -*- import os from sanic import Sanic from sanic.exceptions import NotFound, InvalidUsage, MethodNotSupported,\ ServerError, Unauthorized from sanic.response import text, json from sanic.log import logger from database import SanicOracle from src.api.test import test_bp app_name = 'test-app' def create_app(): sanic_app = Sanic(app_name) sanic_app.blueprint(test_bp) os.environ['TNS_ADMIN'] = '<Oracle Net Services構成ファイルの配置先>' dsn = 'xxxxxxxxxx_high' user = os.getenv('USER') password = os.getenv('PASS') SanicOracle(sanic_app, dsn, user, password) logger.info('api initialized.') @sanic_app.route('/') async def index(request): return text('ok') @sanic_app.middleware('response') async def prevent_xss(request, response): response.headers['x-xss-protection'] = '1; mode=block' return sanic_app app = create_app() @app.exception(InvalidUsage) async def bad_request(request, exception): logger.error(exception) return json({'message': exception.args[0]}, status=400) @app.exception(Unauthorized) async def unauthorized(request, exception): logger.error(exception) return json({'message': exception.args[0]}, status=401) @app.exception(NotFound) async def not_found(request, exception): logger.error(exception) return json({'message': exception.args[0]}, status=404) @app.exception(MethodNotSupported) async def method_not_allowed(request, exception): logger.error(exception) return json({'message': exception.args[0]}, status=405) @app.exception(ServerError) async def internal_server_error(request, exception): logger.error(exception) return json({'message': exception.args[0]}, status=500)
src/api/test.py
blueprintの割り振り先。
実際はフロントエンドにvue.jsを使用している為、jsonレスポンスの返却のみとなります。
# -*- coding: utf-8 -*- import traceback from sanic import Blueprint from sanic.exceptions import abort from sanic.log import logger from sanic.response import json test_bp = Blueprint(__name__) msg_500 = 'Internal sever error' @test_bp.route('/get_sample') async def get_sample(request): try: query = 'SELECT id, name, modified FROM ( SELECT id, name, modified FROM user ORDER BY modified DESC ) WHERE ROWNUM <= 200' rows = await request.app.db.fetch_all(query) result = [] for row in rows: result.append({ 'id': row[0], 'name': row[1], 'modified': row[2] }) return json({'result': result}) except: logger.error(traceback.format_exc()) abort(500, msg_500)
実行
$ python3.8 app.py > /var/log/api/api.log &