【Python】Sanicでoracle接続
pythonでoracle接続を実装したのでメモ。
非同期処理用のフレームワークsanic
を使用しました。
ドキュメント
sanic
Python v3.7~が必要です。
v3.6は2021年末にサポートが終了するので、バージョンアップを検討した方が良いと思います。
sanic.readthedocs.io
cx_Oracle
oracle接続用のc++モジュールがありました。
python bindしてあるので、呼び出すだけで接続できます。
cx_Oracle_async
非同期処理版はこちら。
oracle cloudで使用しているのですが環境上ではインストールできなかった為、git hubソースからビルドしました。
pypi.org
実装
app.py
# -*- coding: utf-8 -*- import os from src.api.base import app cpu_count = os.cpu_count() workers = cpu_count - 1 if cpu_count > 1 else 1 if __name__ == '__main__': # ssl = {'cert': "./ssl/server.crt", 'key': "./ssl/server.key"} ssl通信の場合 app.run(host="0.0.0.0", port=8000, workers=workers, access_log=True, debug=True)
database.py
DB接続用モジュール。
# -*- coding: utf-8 -*- import os import time import cx_Oracle import cx_Oracle_async from sanic.log import logger from functools import wraps class SanicOracle: def __init__(self, app, dsn, user, password): @app.listener('before_server_start') async def setup_oracle_session_listener(_app, loop): await self.start(_app, loop, dsn, user, password) @app.listener('after_server_stop') async def teardown_oracle_session_listener(_app, loop): logger.info('closing oracle connection for [pid:{}]'.format(os.getpid())) await _app.db.close(force=True) async def start(self, _app, loop, dsn, user, password): oracle_pool = await cx_Oracle_async.create_pool( dsn=dsn, user=user, password=password, min=2, max=100, loop=loop ) async def _fetch_all(query: str, args: tuple = None): result = [] async with oracle_pool.acquire() as connection: async with connection.cursor() as cursor: if args: await cursor.execute(query, args) else: await cursor.execute(query) for r in await cursor.fetchall(): result.append(r) return result async def _execute_query(query: str, args: list = None): async with oracle_pool.acquire() as connection: async with connection.cursor() as cursor: await cursor.execute(query, args) await connection.commit() setattr(oracle_pool, 'fetch_all', _fetch_all) setattr(oracle_pool, 'execute_query', _execute_query) _app.db = oracle_pool def retry(f): """リトライ用""" _attempts = 5 _delay = 1 @wraps def wrapper(*args, **kwargs): _retry = 0 while True: try: _retry += 1 result = f(*args, **kwargs) return result except cx_Oracle.OperationalError as ex: if _retry >= _attempts: error, = ex.args logger.error(error.message) raise ex time.sleep(_delay) return wrapper
src/api/base.py
APIの基本設定モジュール。
# -*- coding: utf-8 -*- import os from sanic import Sanic from sanic.exceptions import NotFound, InvalidUsage, MethodNotSupported,\ ServerError, Unauthorized from sanic.response import text, json from sanic.log import logger from database import SanicOracle from src.api.test import test_bp app_name = 'test-app' def create_app(): sanic_app = Sanic(app_name) sanic_app.blueprint(test_bp) os.environ['TNS_ADMIN'] = '<Oracle Net Services構成ファイルの配置先>' dsn = 'xxxxxxxxxx_high' user = os.getenv('USER') password = os.getenv('PASS') SanicOracle(sanic_app, dsn, user, password) logger.info('api initialized.') @sanic_app.route('/') async def index(request): return text('ok') @sanic_app.middleware('response') async def prevent_xss(request, response): response.headers['x-xss-protection'] = '1; mode=block' return sanic_app app = create_app() @app.exception(InvalidUsage) async def bad_request(request, exception): logger.error(exception) return json({'message': exception.args[0]}, status=400) @app.exception(Unauthorized) async def unauthorized(request, exception): logger.error(exception) return json({'message': exception.args[0]}, status=401) @app.exception(NotFound) async def not_found(request, exception): logger.error(exception) return json({'message': exception.args[0]}, status=404) @app.exception(MethodNotSupported) async def method_not_allowed(request, exception): logger.error(exception) return json({'message': exception.args[0]}, status=405) @app.exception(ServerError) async def internal_server_error(request, exception): logger.error(exception) return json({'message': exception.args[0]}, status=500)
src/api/test.py
blueprintの割り振り先。
実際はフロントエンドにvue.jsを使用している為、jsonレスポンスの返却のみとなります。
# -*- coding: utf-8 -*- import traceback from sanic import Blueprint from sanic.exceptions import abort from sanic.log import logger from sanic.response import json test_bp = Blueprint(__name__) msg_500 = 'Internal sever error' @test_bp.route('/get_sample') async def get_sample(request): try: query = 'SELECT id, name, modified FROM ( SELECT id, name, modified FROM user ORDER BY modified DESC ) WHERE ROWNUM <= 200' rows = await request.app.db.fetch_all(query) result = [] for row in rows: result.append({ 'id': row[0], 'name': row[1], 'modified': row[2] }) return json({'result': result}) except: logger.error(traceback.format_exc()) abort(500, msg_500)
実行
$ python3.8 app.py > /var/log/api/api.log &
【shell】便利コマンド
忘れそうなので、業務で実装したバッチ処理などで利用したshellコマンドのメモ。
随時、追加して行きます。
現在日時からの加減算
# 1時間後 $ echo $(date "+%Y%m%d %H%M%S" -d '1 hour') # 1時間前 $ echo $(date "+%Y%m%d %H%M%S" -d '1 hour ago')
ファイルの拡張子を取得
$ for file in $(find <パス> -type f); do ext=${file##*.}; echo ${ext}; done
最終更新日時 < 30分前のファイルを削除
$ find . -type f -mmin +30 -print0 | xargs -0 --no-run-if-empty rm -f
shellでsendmail
# 関数化 send_mail() { __mail_to=$1 __mail_from=$2 __mail_subject=$3 __mail_message=$4 <send_mail格納先のパス>/sendmail -f "$__mail_from" -t "$__mail_to" << _EOB_ Content-Type: text/plain; charset=euc-jp Content-transfer-encoding: 7bit To: $__mail_to From: $__mail_from Subject: $__mail_subject $__mail_message _EOB_ }
# 送信 $ send_mail ${TO} ${FROM} "<subject>" "<メッセージ>"
# trap $ trap "send_mail ${TO} ${FROM} \"<subject>\" \"<メッセージ>\"" INT HUP TERM
拡張子を指定したfindの実行結果をtarにアーカイブ
$ find <検索対象パス> ( -name "*.png" -o -name "*.jpg" -o -name "*.jpeg" ) -print0 | tar -cvz -T - --null -f <アーカイブファイル名>
特定の拡張子を除外してアーカイブ
$ tar --exclude *.png --exclude *.jpg --exclude *.jpeg -cvf <アーカイブファイル名> <アーカイブ対象パス>
パイプラインで実行した際の終了ステータスを確認したい場合
$ find input/ -type f -print0 | xargs --no-run-if-empty -0 -I {} cp -p {} output/ $ for st in ${PIPESTATUS[@]}; do if [ $st -ne 0 ]; then echo "エラー発生"; fi
ファイルから指定した文字列を削除
$ sed -i -e 's/<削除したい文字列>//g' <対象のファイルパス>
apacheのログ出力をlog rotateに変更
$ sed -i \ -e "s/ErrorLog \"logs\/error_log\"/ErrorLog \"|\/usr\/sbin\/rotatelogs \/var\/log\/httpd\/error\.\%Y\%m\%d\.log 86400 540\"/g" -e "s/CustomLog \"logs\/access_log\" combined /CustomLog \"|\/usr\/sbin\/rotatelogs \/var\/log\/httpd\/access\.\%Y\%m\%d\.log 86400 540\" combined/g" \ /etc/httpd/conf/httpd.conf
ssl証明書内容をopensslで確認
# 証明書ファイルの内容を確認 $ openssl x509 -text -noout -in <サーバ証明書のパス> # 秘密鍵ファイルの内容を確認 $ openssl rsa -text -noout -in <秘密鍵のパス> # CSRファイルの内容を確認 $ openssl req -text -noout -in <CSRファイルのパス>
【Kubernetes】サイドカー方式でOAtuh2-proxyの認証処理を追加する
kubernetes上でのOAuth2 proxyコンテナの利用方法をメモ。
nginxの使い方について詳しく知りたい方は、この本がお勧めです。
手元にあれば、実装時にいちいち検索しなくて済むかもしれません。
OAuth2 proxy
ドキュメントはこちら。
構成
リバースプロキシにnginxを使用する方法です。
nginxのauth_requestでOAuth2 proxyに認証を依頼し、認証成功時にアプリケーションにプロキシされます。
認証方法はOIDC。
config
各configはkubernetesのconfigMapに登録します。
nginx.conf
user nginx; worker_processes 1; pid /var/run/nginx.pid daemon off; # dockerで起動する場合、フォアグラウンドで起動する必要がある events { worker_connections 2048; } http { include /etc/nginx/mime.types; default_type application/octet-stream; log_format main '$remote_addr - $remote_user [$time_local] "$request" ' '$status $body_bytes_sent "$http_referer" ' '"$http_user_agent" "$http_x_forwarded_for"'; access_log /dev/stdout main; error_log /dev/stderr error; sendfile on; # tcp_nopush on; # gzip on; keepalive_timeout 65; # oauth2 proxyの為に、HTTPヘッダー用のバッファ領域を増やす proxy_buffers 8 32k; proxy_buffer_size 32k; # "Request Header Or Cookie Too Large"の対応 large_client_header_buffers 8 32k; client_header_buffer_size 32k; server_token off; upstream backend { server 127.0.0.1:8080; # アプリケーションコンテナ } upstream oauth-proxy { server 127.0.0.1:4180; # OAuth2 proxyコンテナ } server { listen 80; return 301 https://$host$request_uri; } server { listen 403 ssl; ssl_certificate /etc/nginx/ssl/tls.crt; # 中間証明書がある場合は連結する ssl_certificate_key /etc/nginx/ssl/tls.key; ssl_protocols TLSv1.2; ssl_ciphers 'ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-CHACHA20-POLY1305:EC DHE-RSA-CHACHA20-POLY1305:ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-SHA384:ECDHE-RSA-AES256-SHA384:ECDHE-ECDSA-AES128-SHA256:ECDHE-RSA-AES128-SHA256'; ssl_session_cache builtin:1000 shared:SSL:10m; client_max_body_size 32m; client_body_buffer_size 64k; location /oauth2/ { proxy_pass http://oauth-proxy; proxy_set_header Host $host; proxy_set_header X-Real_IP $remote_addr; proxy_set_header X-Sheme $sheme; proxy_set_header X-Auth-Request-Redirect $request_uri; # 他ドメインに転送する場合 # proxy_set_header X-Auth-Request-Redirect $sheme://$host$request_uri; } # 認証用エンドポイント、「=」なことに注意 location = /oauth2/auth { proxy_pass http://oauth-proxy; proxy_set_header Host $host; proxy_set_header X-Real_IP $remote_addr; proxy_set_header X-Sheme $sheme; # nginxのauth_requestにbodyは不要 proxy_set_header Content-Length ""; proxy_pass_request_body off; } location / { satisfy any; auth_request /oauth2/auth; error_page 401 /oauth2/sign_in; proxy_set_header Host $host; proxy_set_header X-Real_IP $remote_addr; proxy_set_header X-Forwarded-For $remote_addr; proxy_set_header X-Forwarded-Proto https; auth_request_set X-User $user; auth_request_set X-Email $email; auth_request_set X-ID $empid; auth_request_set X-Roles $roles; auth_request_set $token $upstream_http_x_auth_request_access_token; proxy_set_header X-Access-Token $token; auth_request_set $auth_cookie_name_upstream_1 $upstream_cookie_auth_cookie_name_1; proxy_pass http://backend/; } location /proxy/ { proxy_set_header Host $host; proxy_set_header X-Real_IP $remote_addr; proxy_set_header X-Forwarded-For $remote_addr; proxy_set_header X-Forwarded-Proto https; proxy_pass http://oauth-proxy; } } }
oauth2_proxy.cfg
http_address = "0.0.0.0:4180" upstream = ["http://127.0.0.1:8080/"] # メールアドレスのドメインを指定、アスタリスクで全て許容 email_domain = ["*"] # 認可するロール permission_policies = [] oidc_issuer_url = "https://xxx.xxx.xxx/dex" cookie_secure = true # 認証方法を指定 provider = "oidc" # X-Forwarded-Access-Token pass_access_token = true # Authorization Bearer header pass_authorization_header = true reverse_proxy = true set_xauthrequest = true set_authorization_header = true skip_provider_button = true # ヘルスチェック用エンドポイントもこちらに記載する skip_auth_regex = [ "^(/|/forbidden|/health|/favicon.ico)$", "^(/|/forbidden|/health|/favicon.ico)$", "^(js|image|css|stylesheets)/.*$", ".(png|css|ico|js)$" ]
configMapとSecret
上記configはconfigMapに、tlsの鍵データとoauth2proxy用各パラメータはSecretに登録します。
cookie secretをpythonで生成する際に、byte数が(16, 24, 32)のいずれかである必要があります。
$ NAMESPACE=xxxxx $ alias k=kubectl # tls作成 $ k -n=${NAMESPACE} create secret tls pki-tls --key=secret/server.key --cert=secret/server.crt secret/pki-tls created # OAuth2 proxy 各パラメータをbase64エンコードして、マニフェストに記載する $ echo -n "<client-id>" | base64 $ echo -n "<client-secret>" | base64 # cookie secretの生成、byte型になっていたり改行が含まれているとエラーとなるので注意 $ python -c 'import os,base64; print(base64.urlsafe_b64encode(os.random(16)).decode(), end="")' | base64 $ cat secret.yaml apiVersion: v1 kind: Secret metadata: name: oauth2proxy namespace: xxx-xxx-xxx type: Opaque data: client-id: <base64 client-id> client-secret: <base64 client-secret> cookie-secret: <base64 cookie-secret> $ k apply -f secret.yaml secret/oauth2proxy created # configMap $ k create cm nginx-config --from-file=config/nginx.conf $ k create cm proxy-config --from-file=config/oauth2_proxy.cfg # 確認コマンド $ k -n=${NAMESPACE} describe secret kpi-tls $ k -n=${NAMESPACE} describe secret oauth2proxy # 要jqコマンドのインストール $ k -n=${NAMESPACE} get secret kpi-tls -o json | jq -r '.data."tls.crt"' $ k -n=${NAMESPACE} get cm nginx-config -o json | jq -r '.data."nginx.conf"' $ k -n=${NAMESPACE} get cm proxy-config -o json | jq -r '.data."oauth2_proxy.cfg"' # 24byteであることの確認 $ k -n=${NAMESPACE} get secret oauth2proxy -o json | jq -r '.data."cookie-secret"' | base64 -d | wc -c 24
マニフェストファイル
特に変わった使い方はしていません。
deployment.yaml
apiVersion: apps/v1 kind: Deployment metadata: name: test-app namespace: xxx-xxx-xxx spec: replicas: 2 strategy: type: RollingUpdate rollingUpdate: maxSurge: 1 maxUnavailable: 1 minReadySeconds: 10 template: spec: volumes: - name: secret-volume secret: secretName: pki-tls - name: nginx-volume configMap: name: nginx-config - name: oidc-volume configMap: name: proxy-config containers: - image: nginx name: nginx ports: - containerPort: 443 resources: limits: memory: 64Mi requests: cpu: 100m memory: 64Mi volumeMounts: - mountPath: /etc/nginx/ssl name: secret-volume readOnly: true - mountPath: /etc/nginx/nginx.conf subPath: nginx.conf name: nginx-volume readOnly: true - image: oauth2_proxy # PJで管理しているイメージです、公開されているものと同じはず args: ["-config=/etc/oauth2/oauth2_proxy.cfg"] ports: - containerPort: 4180 resources: limits: memory: 512Mi requests: cpu: 100m memory: 512Mi volumeMounts: - mountPath: /etc/oauth2/oauth2_proxy.cfg subPath: oauth2_proxy.cfg name: oidc-volume readOnly: true env: - name: OAUTH2_PROXY_CLIENT_ID valueFrom: secretKeyRef: name: oauth2proxy key: clinet-id - name: OAUTH2_PROXY_CLIENT_SECRET valueFrom: secretKeyRef: name: oauth2proxy key: cookie-secret - name: OAUTH2_PROXY_COOKIE_SECRET valueFrom: secretKeyRef: name: oauth2proxy key: clinet-id - name: OAUTH2_PROXY_HTTP_ADDRESS # ここで指定しないと、127.0.0.1で起動してしまう? value: "0.0.0.0:4180" - name: OAUTH2_PROXY_REDIRECT_URL value: <登録したリダイレクトURLを指定> - image: python-app # 詳細は割愛、実装したのはflaskアプリです name: python-app ports: - containerPort: 8080 resources: limits: cpu: 1000m memory: 2G lifecyle: preStop: exec: command: ["/bin/sh", "-c", "sleep 20"] readinessProbe: httpGet: path: /health port: 8080 initialDelaySeconds: 30 periodSeconds: 5 failureThreshold: 10 livenessProbe: httpGet: path: /health port: 8080 initialDelaySeconds: 60 periodSeconds: 10 timeoutSeconds: 10
apiVersion: networking.k8s.io/v1beta1 kind: Ingress metadata: name: test-ingress namespace: test-namespace annotations: ingress.zlab.co.jp/backend-config: '{"xxx-xxx-xxx": {"443": {"tls": true, "sni": "<ドメイン名を記載>"}}}' spec: tls: - secretName: pki-tls rules: - host: <ドメイン名を記載> http: paths: - backend: serviceName: testsvc servicePort: 443
service.yaml
apiVersion: v1 kind: Service metadata: name: testsvc labels: app: test-app namespace: xxx-xxx-xxx spec: selector: app: test-app ports: - protocol: TCP port: 443 targetPort: 443
【Kubernetes】kustomizeで環境毎のマニフェストを作成
kubernetesで環境毎の設定管理にkustomizeを使用したのでメモ。
ingressのannotationsにスラッシュが含まれていた為、patchesJson6902のプロパティ指定はどうやるのだろうと思ったのですが、エスケープに「~1」を用いる事で解決しました。
⬇️ これからkubernetesやDockerの経験を積みたい方にお勧めです!
Dockerfileや各種マニフェストの基本的な使い方が書かれています。
- 作者:山田 明憲
- 発売日: 2018/08/25
- メディア: 単行本(ソフトカバー)
kustomize
環境
macOS Catalina 10.15.7
# インストール $ brew install kustomize
ディレクトリはこんな感じ。
$ tree . ... ├── k8s │ ├── base | | ├── config │ | | └── nginx.conf # configMapに登録する設定ファイル │ | ├── deployment.yaml │ | ├── ingress.yaml │ | ├── kustomization.yaml │ | ├── pod_disruption_budget.yaml | | └── service.yaml │ └── overlays | ├── staging | | ├── kustomization.yaml | | ├── patch_deployment.yaml | | └── patch_ingress.yaml | └── prod | └── ... ...
base
ベースとなる各種yamlを配置。
kustomization.yaml
configMapGeneratorでconfigMapを生成する方法を記載。
json、yamlなどもこの方法で指定できる、、、はず。
apiVersion: kustomize.config.k8s.io/v1beta1 kind: Kustomization commonLabels: app: test-app resources: - deployment.yaml - ingress.yaml - service.yaml - pod_disruption_budget.yaml # configMapの設定、jsonなどの設定ファイルから取り込む場合 configMapGenerator: - name: nginx-config namespace: test-namespace # ネームスペース を個別に指定する場合 files: - config/nginx.conf # configMapの共通設定 generatorOptions: disableNameSuffixHash: true # 生成されるconfigMapのサフィックスからハッシュ値を取り除く場合 labels: app: test-app
deployment.yaml
普通の3層クライアントサーバーシステムっぽい設定。
apiVersion: apps/v1 kind: Deployment metadata: name: test-app spec: selector: matchLabels: app: test-app replicas: 2 strategy: type: RollingUpdate rollingUpdate: maxSurge: 1 maxUnavailable: 1 minReadySeconds: 10 template: metadata: labels: app: test-app spec: volumes: - name: nginx-volume configMap: name: nginx-config # volumeに割り当て containers: - name: nginx image: nginx:1.14.2 ports: - containerPort: 80 limits: memory: 64Mi requests: cpu: 100m memory: 64Mi volumeMounts: - mountPath: /etc/nginx/nginx.conf subPath: nginx.conf name: nginx-volume readOnly: true - name: hello-python # pythonコードは割愛 image: hello-python:latest imagePullPolicy: Never env: - name: TEST_ENV_VALUE value: 0 ports: - containerPort: 8080 limits: memory: 1G cpu: 1000m readinessProbe: httpGet: path: /health port: 8080 initialDelaySeconds: 30 periodSeconds: 5 failureThreshold: 10 livenessProbe: httpGet: path: /health port: 8080 initialDelaySeconds: 60 periodSeconds: 10 timeoutSeconds: 10 lifecycle: preStop: exec: command: ["/bin/sh", "-c", "sleep 20"]
pod_disruption_budget.yaml
停止状態として許容できる Pod 数を定義。
apiVersion: policy/v1beta1 kind: PodDisruptionBudget metadata: name: test-app spec: maxUnavailable: 1 selector: matchLabels: app: test-app
ingress.yaml
apiVersion: networking.k8s.io/v1beta1 kind: Ingress metadata: name: test-ingress namespace: test-namespace annotations: nginx.ingress.kubernetes.io/rewrite-target: / spec: rules: - host: http://test.example1.com http: paths: - backend: serviceName: testsvc servicePort: 80
service.yaml
apiVersion: v1 kind: Service metadata: name: testsvc labels: app: test-app namespace: test-namespace spec: selector: app: test-app ports: - protocol: TCP port: 80 targetPort: 80
overlays
環境差分を記載するyamlはこちらに配置。
基本はスラッシュ区切りでプロパティを指定する。
配列要素はインデックスを指定する事でアクセスできる。
kustomization.yaml
ベースとなるディレクトリとパッチするリソースをここで設定する。
ここではIngressとDeploymentを指定。
bases: - ../../base patchesJson6902: - target: group: networking.k8s.io version: v1beta1 kind: Ingress name: test-ingress patch: patch_ingress.yaml # 差分を記載したファイルを指定 - target: group: apps version: v1 kind: Deployment name: test-app patch: patch_deployment.yaml # こちらも同様
patch_ingress.yaml
ドメインとannotationsを変更する例。
annotationsのスラッシュは「~1」でエスケープ。
- op: replace path: /spec/rules/0/host value: http://test.example2.com - op: replace # スラッシュを「~1」に置き換える path: /metadata/annotations/nginx.ingress.kubernetes.io~1rewrite-target value: /another
patch_deployment.yaml
レプリカ数と環境変数を変更。
- op: replace path: /spec/replicas value: 3 - op: replace # 環境変数「TEST_ENV_VALUE」を変更 path: /spec/template/spec/containers/1/env/0/value value: 1
build
差分が反映されるか確認する。
$ cd k8s/base $ kustomize build ../overlays/staging apiVersion: v1 data: nginx.conf: | - .... kind: ConfigMap metadata: labels: app: test-app name: nginx-config namespace: test-namespace --- apiVersion: v1 ... # ビルド結果が出力されるので内容を確認する
【Node.js】Expressからcassandraにアクセスする
Node.jsとExpressでAPIを実装した際にcassandraに接続したので、やり方をメモ。
Markdown 記法に変えたので、若干書体が変わっています。
⬇️ 分かりやすい良書です😊
環境
Node.jsバージョンは14。
プロジェクトはexpress-generator で作成しました。
作成方法は割愛。
package.json
{ "name": "sample", "version": "1.0.0", "description": "sample project", "author": "sample", "private": true, "scripts": { "start": "node ./bin/www", "test": "./node_modules/.bin/jest", "lint": "./node_modules/.bin/eslint --ext .js ./", }, "dependencies": { "express": "^4.17.1", "cassandra-driver": "^4.2.0", "cookie-parser": "~1.4.4", "debug": "^4.3.1", "http-errors": "^1.8.0" }, "devDependencies": { ..... } }
cassandra-driver
実装
./bin/www
cassandraオブジェクトを毎回クローズするとたまにエラーが発生する為、グローバルに保持してアプリケーション終了時にクローズします。
やり方は色々あると思います。
他にはuidとgidの設定、グレースフルシャットダウンの処理があります。
#!/usr/bin/env node const app = require('../app'); const debug = require('debug')('sample:server'); const https = require('https'); const fs = require('fs'); const cassandra = require('../app-lib/cassandra/driver'); const user = 'app-user'; const group = 99999; const port = normalizePort(process.env.PORT || '403'); app.set('port', port); const options = { key: fs.readFileSync('/path/to/tls/private/xxxx.key'), cert: fs.readFileSync('/path/to/tls/certs/xxxx.crt'), ca: fs.readFileSync('/path/to/tls/certs/xxxx-ca.crt'), }; // ここでコネクションを取得 cassandra.connect() .then(() => { app.set('cassandra', cassandra); const server = https.createServer(options. app); function wireUpServer(server) { let connections = {}; server.on('connection', function(conn) { const key = conn.remoteAddress + ':' + conn.remotePort; connections[key] = conn; conn.on('close', function() { delete connections[key]; }); }); server.destroy = function(cb) { // 終了時にcassandraクローズ cassandra.close().catch(() => null); server.close(cb); for (const key in connections) { connections[key].destroy(); } }; } wireUpServer(server); const gracefulShutdown = function() { server.destroy(() => { process.exit(); }); }; process.on('SIGTERM', gracefulShutdown); process.on('SIGINT', gracefulShutdown); function onListening() { let addr = server.address(); let bind = typeof addr === 'string' ? 'pipe ' + addr : 'port ' + addr.port; debug('Listening on ' + bind); } server.listen(port, () => { try { process.setgid(group); process.setuid(user); } catch (err) { console.error(err); process.exit(1); } }); server.on('error', onError); server.on('listening', onListening); }) .catch(() => process.exit(1)); function normalizePort(val) { let port = parseInt(val, 10); if (isNaN(port)) { return val; } if (port >= 0) { return port; } return false; } function onError(error) { if (error.syscall !== 'listen') { throw error; } let bind = typeof port === 'string' ? 'Pipe ' + port : 'Port ' + port; switch (error.code) { case 'EACCES': console.error(bind + ' requires elevated privileges'); process.exit(1); break; case 'EADDRINUSE': console.error(bind + ' is already in use'); process.exit(1); break; default: throw error; } }
app.js
こちらは大して変更ありません。
'use strict'; const express = require('express'); const cookieParser = require('cookie-parser'); const indexRouter = require('./routes/index'); const app = express(); app.use(express.json()); app.use(express.urlencoded({ extended: false })); app.use(cookieParser()); app.use('/', indexRouter); /* catch 404 and forward to error handler */ app.use(function(err, req, res, next) { // eslint-disabled-line no-unused-vars res.status(404).json({message: 'Request url not found.'}); }); /* error handler */ app.use(function(err, req, res, next) { // eslint-disabled-line no-unused-vars res.locals.message = err.message; res.locals.error = req.app.get('env') === 'development' ? err : {}; res.status(err.status || 500).json({message: err.message}); }); module.exports = app;
./app-lib/cassandra/config.js
実際は外部から受け取れるようにするか、configを複数用意して環境毎に設定します。
'use strict'; module.exports = { config: { contactPoints: ['sample.cassandra1.co.jp', 'sample.cassandra2.co.jp', 'sample.cassandra3.co.jp'], keyspace: 'test_key_space', localDataCenter: 'dc1', protocolOptions: { port: 9042, }, socketOptions: { connectTimeout: 120000, }, }, };
./app-lib/cassandra/driver.js
cassandra-driverをラップ。
'use strict'; const cassandra = require('cassandra-driver'); const Config = require('./config'); // リトライ処理 function retryable(func, retryCount=5) { let promise = Promise.reject().catch(() => func()); for (let i = 0; i < retryCount; i++) { promise = promise.catch(() => func()); } return promise; } module.exports = { client: null; connect: () => { return new Promise((resolve, reject) => { try { // オブジェクトを上書きする場合はディープコピーが必要 const csConfig = JSON.parse(JSON.stringfy(Config.config)); // クレデンシャルの取得処理(今回は何もしません) csConfig.credentials.username = 'test-user'; csConfig.credentials.password = 'xxxxx'; this.clinet = new cassandra.Client(csConfig); resolve(this.clinet); } catch (e) { reject(e); } }); }, insert: (query, param=null, retryCount=5) { return new Promise((resolve, reject) => { retryable(() => this.client.execute(query, param, { prepare: true })), retryCount) .then(() => resolve) .catch(e => reject(e)); }); }, find: (query, param=null, retryCount=5) { return new Promise((resolve, reject) => { retryable(() => this.client.execute(query, param, { prepare: true })), retryCount) .then(result => resolve(result.rows)) .catch(e => reject(e)); }); }, close: () { return new Promise((resolve, reject) => { if (this.client) { this.client.shutdown() .then(()=> resolve) .catch(e => reject(e)); } else { resolve(); } }); }, };
./routes/index.js
エラーは補足してnextに渡す。
'use strict'; const express = require('express'); const router = express.Router(); const wrap = fn => (...args) => fn(...args).catch(args[2]); router.get('/', wrap( async(req, res, next) => { try { const pk = req.body.id; const cassandra = req.app.get('cassandra'); const result = await cassandra.find('SELECT * FROM test_table WHERE id = :id', {id: pk}); return res.status(200).json({result: result}); } catch (e) { next(e); } } )); module.exports = router;
【Python】DataStax Python Driverでcassandraにアクセスする
Pythonでcassandraの登録処理や取得処理を実装したのでメモ。
DataStaxのpython driverを使用しています。
テーブル設計なども行ったのですがパーテションキー=nodeという概念で、1pk=1nodeに多くても100レコード以内が望ましいそうです。
一意なID等なら問題ないのですが、それ以外の条件で取得する場合は設計に注意する必要があります。
環境
Pythonのバージョンは3.7。
cassandra-driverをインストールするにはCythonが必要です。
もしくは以下のオプションを指定。
cassandra-driver
この辺りはよく見るかも?
実装
要件で接続失敗時にリトライする必要があった為、そちらも記載。
逐次処理用のハンドラークラス。
ValueSequence
IN句を使用する場合、ValueSequenceを使用します。
Prepared Statement
何度も発行するクエリはPrepared Statementを利用すると通信効率で有利なようです。