【Python】Sanicでoracle接続

f:id:tm200:20210530152852p:plain
f:id:tm200:20210530153108j:plain

pythonoracle接続を実装したのでメモ。
非同期処理用のフレームワークsanicを使用しました。

ドキュメント

sanic

Python v3.7~が必要です。
v3.6は2021年末にサポートが終了するので、バージョンアップを検討した方が良いと思います。 sanic.readthedocs.io

cx_Oracle

oracle接続用のc++モジュールがありました。
python bindしてあるので、呼び出すだけで接続できます。

cx-oracle.readthedocs.io

cx_Oracle_async

非同期処理版はこちら。
oracle cloudで使用しているのですが環境上ではインストールできなかった為、git hubソースからビルドしました。 pypi.org

実装

app.py
# -*- coding: utf-8 -*-
import os

from src.api.base import app

cpu_count = os.cpu_count()
workers = cpu_count - 1 if cpu_count > 1 else 1

if __name__ == '__main__':
    # ssl = {'cert': "./ssl/server.crt", 'key': "./ssl/server.key"} ssl通信の場合
    app.run(host="0.0.0.0", port=8000, workers=workers, access_log=True, debug=True)
database.py

DB接続用モジュール。

# -*- coding: utf-8 -*-
import os
import time

import cx_Oracle
import cx_Oracle_async

from sanic.log import logger
from functools import wraps


class SanicOracle:
    def __init__(self, app, dsn, user, password):
        @app.listener('before_server_start')
        async def setup_oracle_session_listener(_app, loop):
            await self.start(_app, loop, dsn, user, password)

        @app.listener('after_server_stop')
        async def teardown_oracle_session_listener(_app, loop):
            logger.info('closing oracle connection for [pid:{}]'.format(os.getpid()))
            await _app.db.close(force=True)

    async def start(self, _app, loop, dsn, user, password):
        oracle_pool = await cx_Oracle_async.create_pool(
            dsn=dsn,
            user=user,
            password=password,
            min=2,
            max=100,
            loop=loop
        )

        async def _fetch_all(query: str, args: tuple = None):
            result = []
            async with oracle_pool.acquire() as connection:
                async with connection.cursor() as cursor:
                    if args:
                        await cursor.execute(query, args)
                    else:
                        await cursor.execute(query)
                    for r in await cursor.fetchall():
                        result.append(r)
            return result

        async def _execute_query(query: str, args: list = None):
            async with oracle_pool.acquire() as connection:
                async with connection.cursor() as cursor:
                    await cursor.execute(query, args)
                    await connection.commit()

        setattr(oracle_pool, 'fetch_all', _fetch_all)
        setattr(oracle_pool, 'execute_query', _execute_query)

        _app.db = oracle_pool


def retry(f):
    """リトライ用"""
    _attempts = 5
    _delay = 1

    @wraps
    def wrapper(*args, **kwargs):
        _retry = 0
        while True:
            try:
                _retry += 1
                result = f(*args, **kwargs)
                return result
            except cx_Oracle.OperationalError as ex:
                if _retry >= _attempts:
                    error, = ex.args
                    logger.error(error.message)
                    raise ex
                time.sleep(_delay)
    return wrapper
src/api/base.py

APIの基本設定モジュール。

# -*- coding: utf-8 -*-
import os

from sanic import Sanic
from sanic.exceptions import NotFound, InvalidUsage, MethodNotSupported,\
    ServerError, Unauthorized
from sanic.response import text, json
from sanic.log import logger

from database import SanicOracle
from src.api.test import test_bp

app_name = 'test-app'


def create_app():
    sanic_app = Sanic(app_name)
    sanic_app.blueprint(test_bp)
    os.environ['TNS_ADMIN'] = '<Oracle Net Services構成ファイルの配置先>'
    dsn = 'xxxxxxxxxx_high'
    user = os.getenv('USER')
    password = os.getenv('PASS')
    SanicOracle(sanic_app, dsn, user, password)
    logger.info('api initialized.')

    @sanic_app.route('/')
    async def index(request):
        return text('ok')

    @sanic_app.middleware('response')
    async def prevent_xss(request, response):
        response.headers['x-xss-protection'] = '1; mode=block'

    return sanic_app


app = create_app()


@app.exception(InvalidUsage)
async def bad_request(request, exception):
    logger.error(exception)
    return json({'message': exception.args[0]}, status=400)


@app.exception(Unauthorized)
async def unauthorized(request, exception):
    logger.error(exception)
    return json({'message': exception.args[0]}, status=401)


@app.exception(NotFound)
async def not_found(request, exception):
    logger.error(exception)
    return json({'message': exception.args[0]}, status=404)


@app.exception(MethodNotSupported)
async def method_not_allowed(request, exception):
    logger.error(exception)
    return json({'message': exception.args[0]}, status=405)


@app.exception(ServerError)
async def internal_server_error(request, exception):
    logger.error(exception)
    return json({'message': exception.args[0]}, status=500)
src/api/test.py

blueprintの割り振り先。
実際はフロントエンドにvue.jsを使用している為、jsonレスポンスの返却のみとなります。

# -*- coding: utf-8 -*-
import traceback

from sanic import Blueprint
from sanic.exceptions import abort
from sanic.log import logger
from sanic.response import json


test_bp = Blueprint(__name__)
msg_500 = 'Internal sever error'


@test_bp.route('/get_sample')
async def get_sample(request):
    try:
        query = 'SELECT id, name, modified FROM ( SELECT id, name, modified FROM user ORDER BY modified DESC ) WHERE ROWNUM <= 200'
        rows = await request.app.db.fetch_all(query)
        result = []
        for row in rows:
            result.append({
                'id': row[0],
                'name': row[1],
                'modified': row[2]
        })
        return json({'result': result})
    except:
        logger.error(traceback.format_exc())
        abort(500, msg_500)

実行

$ python3.8 app.py > /var/log/api/api.log &

【shell】便利コマンド

f:id:tm200:20210529165647j:plain

忘れそうなので、業務で実装したバッチ処理などで利用したshellコマンドのメモ。
随時、追加して行きます。  

現在日時からの加減算

# 1時間後
$ echo $(date "+%Y%m%d %H%M%S" -d '1 hour')
# 1時間前
$ echo $(date "+%Y%m%d %H%M%S" -d '1 hour ago')

ファイルの拡張子を取得

$ for file in $(find <パス> -type f); do ext=${file##*.}; echo ${ext}; done

最終更新日時 < 30分前のファイルを削除

$ find . -type f -mmin +30 -print0 | xargs -0 --no-run-if-empty rm -f

shellでsendmail

# 関数化
send_mail() {
    __mail_to=$1
    __mail_from=$2
    __mail_subject=$3
    __mail_message=$4

    <send_mail格納先のパス>/sendmail -f "$__mail_from" -t "$__mail_to" << _EOB_
Content-Type: text/plain; charset=euc-jp
Content-transfer-encoding: 7bit
To: $__mail_to
From: $__mail_from
Subject: $__mail_subject

$__mail_message

_EOB_

}
# 送信
$ send_mail ${TO} ${FROM} "<subject>" "<メッセージ>"
# trap
$ trap "send_mail ${TO} ${FROM} \"<subject>\" \"<メッセージ>\"" INT HUP TERM

拡張子を指定したfindの実行結果をtarにアーカイブ

$ find <検索対象パス> ( -name "*.png" -o -name "*.jpg" -o -name "*.jpeg" ) -print0 | tar -cvz -T - --null -f <アーカイブファイル名>

特定の拡張子を除外してアーカイブ

$ tar --exclude *.png --exclude *.jpg --exclude *.jpeg -cvf <アーカイブファイル名> <アーカイブ対象パス>

パイプラインで実行した際の終了ステータスを確認したい場合

$ find input/ -type f -print0 | xargs --no-run-if-empty -0 -I {} cp -p {} output/

$ for st in ${PIPESTATUS[@]}; do if [ $st -ne 0 ]; then echo "エラー発生"; fi

ファイルから指定した文字列を削除

$ sed -i -e 's/<削除したい文字列>//g' <対象のファイルパス>

apacheのログ出力をlog rotateに変更

$ sed -i  \
   -e "s/ErrorLog \"logs\/error_log\"/ErrorLog \"|\/usr\/sbin\/rotatelogs \/var\/log\/httpd\/error\.\%Y\%m\%d\.log 86400 540\"/g"
   -e "s/CustomLog \"logs\/access_log\" combined /CustomLog \"|\/usr\/sbin\/rotatelogs \/var\/log\/httpd\/access\.\%Y\%m\%d\.log 86400 540\" combined/g" \
   /etc/httpd/conf/httpd.conf

ssl証明書内容をopensslで確認

# 証明書ファイルの内容を確認
$ openssl x509 -text -noout -in <サーバ証明書のパス>

# 秘密鍵ファイルの内容を確認
$ openssl rsa -text -noout -in <秘密鍵のパス>

# CSRファイルの内容を確認
$ openssl req -text -noout -in <CSRファイルのパス>

【Kubernetes】サイドカー方式でOAtuh2-proxyの認証処理を追加する

f:id:tm200:20210523114141p:plain f:id:tm200:20210423222508p:plain  

kubernetes上でのOAuth2 proxyコンテナの利用方法をメモ。
nginxの使い方について詳しく知りたい方は、この本がお勧めです。
手元にあれば、実装時にいちいち検索しなくて済むかもしれません。

OAuth2 proxy

ドキュメントはこちら。

oauth2-proxy.github.io

構成

リバースプロキシにnginxを使用する方法です。
nginxのauth_requestでOAuth2 proxyに認証を依頼し、認証成功時にアプリケーションにプロキシされます。
認証方法はOIDC。

openid.net

config

各configはkubernetesのconfigMapに登録します。

nginx.conf
user nginx;
worker_processes 1;
pid /var/run/nginx.pid
daemon off; # dockerで起動する場合、フォアグラウンドで起動する必要がある

events {
    worker_connections 2048;
}

http {
    include               /etc/nginx/mime.types;
    default_type      application/octet-stream;

    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                                  '$status $body_bytes_sent "$http_referer" '
                                  '"$http_user_agent" "$http_x_forwarded_for"';

    access_log   /dev/stdout main;
    error_log       /dev/stderr error;

    sendfile                      on;
    # tcp_nopush            on;
    # gzip                         on;
    keepalive_timeout    65;

    
    # oauth2 proxyの為に、HTTPヘッダー用のバッファ領域を増やす
    proxy_buffers           8 32k;
    proxy_buffer_size        32k;

    # "Request Header Or Cookie Too Large"の対応
    large_client_header_buffers    8 32k;
    client_header_buffer_size           32k;

    server_token off;

    upstream backend {
        server 127.0.0.1:8080;             # アプリケーションコンテナ
    }

    upstream oauth-proxy {
        server 127.0.0.1:4180;             # OAuth2 proxyコンテナ
    }

    server {
        listen 80;
        return 301 https://$host$request_uri;
    }

    server {
        listen 403 ssl;
        ssl_certificate                         /etc/nginx/ssl/tls.crt;                     # 中間証明書がある場合は連結する
        ssl_certificate_key                 /etc/nginx/ssl/tls.key;
        ssl_protocols TLSv1.2;
        ssl_ciphers 'ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-CHACHA20-POLY1305:EC
DHE-RSA-CHACHA20-POLY1305:ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-SHA384:ECDHE-RSA-AES256-SHA384:ECDHE-ECDSA-AES128-SHA256:ECDHE-RSA-AES128-SHA256';
        ssl_session_cache builtin:1000 shared:SSL:10m;

        client_max_body_size       32m;
        client_body_buffer_size    64k;

        location /oauth2/ {
            proxy_pass                http://oauth-proxy;
            proxy_set_header     Host                                          $host;
            proxy_set_header     X-Real_IP                                  $remote_addr;
            proxy_set_header     X-Sheme                                  $sheme;
            proxy_set_header     X-Auth-Request-Redirect      $request_uri;
            # 他ドメインに転送する場合
            # proxy_set_header     X-Auth-Request-Redirect      $sheme://$host$request_uri;
        }

        # 認証用エンドポイント、「=」なことに注意
        location = /oauth2/auth {
            proxy_pass                http://oauth-proxy;
            proxy_set_header     Host                                          $host;
            proxy_set_header     X-Real_IP                                  $remote_addr;
            proxy_set_header     X-Sheme                                  $sheme;
            # nginxのauth_requestにbodyは不要
            proxy_set_header     Content-Length                       "";
            proxy_pass_request_body                                         off;
        }

        location / {
            satisfy     any;
            auth_request           /oauth2/auth;
            error_page 401       /oauth2/sign_in;

            proxy_set_header     Host                                          $host;
            proxy_set_header     X-Real_IP                                  $remote_addr;
            proxy_set_header     X-Forwarded-For                    $remote_addr;
            proxy_set_header     X-Forwarded-Proto                 https;

            auth_request_set     X-User      $user;
            auth_request_set     X-Email     $email;
            auth_request_set     X-ID          $empid;
            auth_request_set     X-Roles     $roles;

            auth_request_set     $token                      $upstream_http_x_auth_request_access_token;
            proxy_set_header     X-Access-Token     $token;

            auth_request_set     $auth_cookie_name_upstream_1 $upstream_cookie_auth_cookie_name_1;

            proxy_pass http://backend/;
        }

        location /proxy/ {
            proxy_set_header     Host                                          $host;
            proxy_set_header     X-Real_IP                                  $remote_addr;
            proxy_set_header     X-Forwarded-For                    $remote_addr;
            proxy_set_header     X-Forwarded-Proto                 https;
            proxy_pass                http://oauth-proxy;
        }
    }
}
oauth2_proxy.cfg
http_address = "0.0.0.0:4180"

upstream = ["http://127.0.0.1:8080/"]

# メールアドレスのドメインを指定、アスタリスクで全て許容
email_domain = ["*"]

# 認可するロール
permission_policies = []

oidc_issuer_url = "https://xxx.xxx.xxx/dex"

cookie_secure = true

# 認証方法を指定
provider = "oidc"

# X-Forwarded-Access-Token
pass_access_token = true

# Authorization Bearer header
pass_authorization_header = true

reverse_proxy = true
set_xauthrequest = true
set_authorization_header = true

skip_provider_button = true

# ヘルスチェック用エンドポイントもこちらに記載する
skip_auth_regex = [
    "^(/|/forbidden|/health|/favicon.ico)$",
    "^(/|/forbidden|/health|/favicon.ico)$",
    "^(js|image|css|stylesheets)/.*$",
    ".(png|css|ico|js)$"
]

configMapとSecret

上記configはconfigMapに、tlsの鍵データとoauth2proxy用各パラメータはSecretに登録します。
cookie secretをpythonで生成する際に、byte数が(16, 24, 32)のいずれかである必要があります。

$ NAMESPACE=xxxxx
$ alias k=kubectl

# tls作成
$ k -n=${NAMESPACE} create secret tls pki-tls --key=secret/server.key --cert=secret/server.crt
secret/pki-tls created

# OAuth2 proxy 各パラメータをbase64エンコードして、マニフェストに記載する
$ echo -n "<client-id>" | base64

$ echo -n "<client-secret>" | base64

# cookie secretの生成、byte型になっていたり改行が含まれているとエラーとなるので注意
$ python -c 'import os,base64; print(base64.urlsafe_b64encode(os.random(16)).decode(), end="")' | base64

$ cat secret.yaml
apiVersion: v1
kind: Secret
metadata:
  name: oauth2proxy
  namespace: xxx-xxx-xxx
type: Opaque
data:
  client-id: <base64 client-id>
  client-secret: <base64 client-secret>
  cookie-secret: <base64 cookie-secret>

$ k apply -f secret.yaml
secret/oauth2proxy created

# configMap
$ k create cm nginx-config --from-file=config/nginx.conf
$ k create cm proxy-config --from-file=config/oauth2_proxy.cfg

# 確認コマンド
$ k -n=${NAMESPACE} describe secret kpi-tls

$ k -n=${NAMESPACE} describe secret oauth2proxy

# 要jqコマンドのインストール
$ k -n=${NAMESPACE} get secret kpi-tls -o json | jq -r '.data."tls.crt"'

$ k -n=${NAMESPACE} get cm nginx-config -o json | jq -r '.data."nginx.conf"'

$ k -n=${NAMESPACE} get cm proxy-config -o json | jq -r '.data."oauth2_proxy.cfg"'

# 24byteであることの確認
$ k -n=${NAMESPACE} get secret oauth2proxy -o json | jq -r '.data."cookie-secret"' | base64 -d | wc -c
24

マニフェストファイル

特に変わった使い方はしていません。
deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: test-app
  namespace: xxx-xxx-xxx
spec:
  replicas: 2
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 1
  minReadySeconds: 10
  template:
    spec:
      volumes:
        - name: secret-volume
          secret:
            secretName: pki-tls
        - name: nginx-volume
          configMap:
            name: nginx-config
        - name: oidc-volume
          configMap:
            name: proxy-config
      containers:
      - image: nginx
        name: nginx
        ports:
          - containerPort: 443
        resources:
          limits:
            memory: 64Mi
          requests:
            cpu: 100m
            memory: 64Mi
        volumeMounts:
          - mountPath: /etc/nginx/ssl
             name: secret-volume
             readOnly: true
          - mountPath: /etc/nginx/nginx.conf
             subPath: nginx.conf
             name: nginx-volume
             readOnly: true
      - image: oauth2_proxy  # PJで管理しているイメージです、公開されているものと同じはず
         args: ["-config=/etc/oauth2/oauth2_proxy.cfg"]
         ports:
           - containerPort: 4180
        resources:
          limits:
            memory: 512Mi
          requests:
            cpu: 100m
            memory: 512Mi
        volumeMounts:
          - mountPath: /etc/oauth2/oauth2_proxy.cfg
             subPath: oauth2_proxy.cfg
             name: oidc-volume
             readOnly: true
        env:
          - name: OAUTH2_PROXY_CLIENT_ID
             valueFrom:
               secretKeyRef:
                 name: oauth2proxy
                 key: clinet-id
          - name: OAUTH2_PROXY_CLIENT_SECRET
             valueFrom:
               secretKeyRef:
                 name: oauth2proxy
                 key: cookie-secret
          - name: OAUTH2_PROXY_COOKIE_SECRET
             valueFrom:
               secretKeyRef:
                 name: oauth2proxy
                 key: clinet-id
          - name: OAUTH2_PROXY_HTTP_ADDRESS  # ここで指定しないと、127.0.0.1で起動してしまう?
             value: "0.0.0.0:4180"
          - name: OAUTH2_PROXY_REDIRECT_URL
             value: <登録したリダイレクトURLを指定>
      - image: python-app  # 詳細は割愛、実装したのはflaskアプリです
         name: python-app
         ports:
           - containerPort: 8080
         resources:
           limits:
             cpu: 1000m
             memory: 2G
         lifecyle:
           preStop:
             exec:
               command: ["/bin/sh", "-c", "sleep 20"]
         readinessProbe:
           httpGet:
             path: /health
             port: 8080
           initialDelaySeconds: 30
           periodSeconds: 5
           failureThreshold: 10

         livenessProbe:
           httpGet:
             path: /health
             port: 8080
           initialDelaySeconds: 60
           periodSeconds: 10
           timeoutSeconds: 10

ingress.yaml

apiVersion: networking.k8s.io/v1beta1
kind: Ingress
metadata:
  name: test-ingress
  namespace: test-namespace
  annotations:
    ingress.zlab.co.jp/backend-config: '{"xxx-xxx-xxx": {"443": {"tls": true, "sni": "<ドメイン名を記載>"}}}'
spec:
  tls:
    - secretName: pki-tls
  rules:
  - host: <ドメイン名を記載>
     http:
       paths:
       - backend:
           serviceName: testsvc
           servicePort: 443

 service.yaml

apiVersion: v1
kind: Service
metadata:
  name: testsvc
  labels:
    app: test-app
  namespace: xxx-xxx-xxx
spec:
  selector:
    app: test-app
  ports:
  - protocol: TCP
     port: 443
     targetPort: 443

【Kubernetes】kustomizeで環境毎のマニフェストを作成

f:id:tm200:20210507170855p:plain

kubernetesで環境毎の設定管理にkustomizeを使用したのでメモ。
ingressのannotationsにスラッシュが含まれていた為、patchesJson6902のプロパティ指定はどうやるのだろうと思ったのですが、エスケープに「~1」を用いる事で解決しました。


⬇️ これからkubernetesやDockerの経験を積みたい方にお勧めです!
Dockerfileや各種マニフェストの基本的な使い方が書かれています。

Docker/Kubernetes 実践コンテナ開発入門

Docker/Kubernetes 実践コンテナ開発入門

  • 作者:山田 明憲
  • 発売日: 2018/08/25
  • メディア: 単行本(ソフトカバー)


kustomize

github.com

環境

macOS Catalina 10.15.7

# インストール
$ brew install kustomize

ディレクトリはこんな感じ。

$ tree
.
...
├── k8s
│    ├── base
|    |    ├── config
│    |    |    └── nginx.conf # configMapに登録する設定ファイル
│    |    ├── deployment.yaml
│    |    ├── ingress.yaml
│    |    ├── kustomization.yaml
│    |    ├── pod_disruption_budget.yaml
|    |    └── service.yaml
│    └── overlays
|        ├── staging
|        |   ├── kustomization.yaml
|        |   ├── patch_deployment.yaml
|        |   └── patch_ingress.yaml
|        └── prod
|             └── ...
...

base

ベースとなる各種yamlを配置。

kustomization.yaml

configMapGeneratorでconfigMapを生成する方法を記載。
jsonyamlなどもこの方法で指定できる、、、はず。

apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization

commonLabels:
  app: test-app

resources:
- deployment.yaml
- ingress.yaml
- service.yaml
- pod_disruption_budget.yaml

# configMapの設定、jsonなどの設定ファイルから取り込む場合
configMapGenerator:
- name: nginx-config
   namespace: test-namespace # ネームスペース を個別に指定する場合
   files:
     - config/nginx.conf

# configMapの共通設定
generatorOptions:
  disableNameSuffixHash: true # 生成されるconfigMapのサフィックスからハッシュ値を取り除く場合
  labels:
    app: test-app

deployment.yaml

普通の3層クライアントサーバーシステムっぽい設定。

apiVersion: apps/v1
kind: Deployment
metadata:
  name: test-app
spec:
  selector:
    matchLabels:
      app: test-app
  replicas: 2
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 1
  minReadySeconds: 10
  template:
    metadata:
      labels:
        app: test-app
    spec:
      volumes:
      - name: nginx-volume
        configMap:
          name: nginx-config # volumeに割り当て

      containers:
      - name: nginx
        image: nginx:1.14.2
        ports:
        - containerPort: 80
        limits:
          memory: 64Mi
        requests:
          cpu: 100m
          memory: 64Mi
        volumeMounts:
          - mountPath: /etc/nginx/nginx.conf
             subPath: nginx.conf
             name: nginx-volume
             readOnly: true
      - name: hello-python # pythonコードは割愛
        image: hello-python:latest
        imagePullPolicy: Never
        env:
          - name: TEST_ENV_VALUE
             value: 0
        ports:
        - containerPort: 8080
        limits:
          memory: 1G
          cpu: 1000m

        readinessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 5
          failureThreshold: 10

        livenessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 60
          periodSeconds: 10
          timeoutSeconds: 10

        lifecycle:
          preStop:
            exec:
              command: ["/bin/sh", "-c", "sleep 20"]

pod_disruption_budget.yaml

停止状態として許容できる Pod 数を定義。

apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
  name: test-app
spec:
  maxUnavailable: 1
  selector:
    matchLabels:
      app: test-app

ingress.yaml

apiVersion: networking.k8s.io/v1beta1
kind: Ingress
metadata:
  name: test-ingress
  namespace: test-namespace
  annotations:
    nginx.ingress.kubernetes.io/rewrite-target: /
spec:
  rules:
  - host: http://test.example1.com
     http:
       paths:
       - backend:
           serviceName: testsvc
           servicePort: 80

service.yaml

apiVersion: v1
kind: Service
metadata:
  name: testsvc
  labels:
    app: test-app
  namespace: test-namespace
spec:
  selector:
    app: test-app
  ports:
  - protocol: TCP
     port: 80
     targetPort: 80

overlays

環境差分を記載するyamlはこちらに配置。
基本はスラッシュ区切りでプロパティを指定する。
配列要素はインデックスを指定する事でアクセスできる。

kustomization.yaml

ベースとなるディレクトリとパッチするリソースをここで設定する。
ここではIngressとDeploymentを指定。

bases:
  - ../../base

patchesJson6902:
- target:
    group: networking.k8s.io
    version: v1beta1
    kind: Ingress
    name: test-ingress
  patch: patch_ingress.yaml # 差分を記載したファイルを指定
- target:
    group: apps
    version: v1
    kind: Deployment
    name: test-app
  patch: patch_deployment.yaml # こちらも同様

patch_ingress.yaml

ドメインとannotationsを変更する例。
annotationsのスラッシュは「~1」でエスケープ。

- op: replace
   path: /spec/rules/0/host
   value: http://test.example2.com
- op: replace
   # スラッシュを「~1」に置き換える
   path: /metadata/annotations/nginx.ingress.kubernetes.io~1rewrite-target
   value: /another

patch_deployment.yaml

レプリカ数と環境変数を変更。

- op: replace
   path: /spec/replicas
   value: 3
- op: replace
    # 環境変数「TEST_ENV_VALUE」を変更
   path: /spec/template/spec/containers/1/env/0/value
   value: 1

build

差分が反映されるか確認する。

$ cd k8s/base

$ kustomize build ../overlays/staging
apiVersion: v1
data:
  nginx.conf: | -
    ....
kind: ConfigMap
metadata:
  labels:
    app: test-app
  name: nginx-config
  namespace: test-namespace
---
apiVersion: v1
... 
# ビルド結果が出力されるので内容を確認する

【Node.js】Expressからcassandraにアクセスする

f:id:tm200:20210426210331p:plain

Node.jsとExpressでAPIを実装した際にcassandraに接続したので、やり方をメモ。
Markdown 記法に変えたので、若干書体が変わっています。

⬇️ 分かりやすい良書です😊

環境

Node.jsバージョンは14。
プロジェクトはexpress-generator で作成しました。
作成方法は割愛。

package.json
{
  "name": "sample",
  "version": "1.0.0",
  "description": "sample project",
  "author": "sample",
  "private": true,
  "scripts": {
    "start": "node ./bin/www",
    "test": "./node_modules/.bin/jest",
    "lint": "./node_modules/.bin/eslint --ext .js ./",
  },
  "dependencies": {
    "express": "^4.17.1",
    "cassandra-driver": "^4.2.0",
    "cookie-parser": "~1.4.4",
    "debug": "^4.3.1",
    "http-errors": "^1.8.0"
  },
  "devDependencies": {
    .....
  }
}

cassandra-driver

docs.datastax.com

実装

./bin/www

cassandraオブジェクトを毎回クローズするとたまにエラーが発生する為、グローバルに保持してアプリケーション終了時にクローズします。
やり方は色々あると思います。
他にはuidとgidの設定、グレースフルシャットダウンの処理があります。

#!/usr/bin/env node
const app = require('../app');
const debug = require('debug')('sample:server');
const https = require('https');
const fs = require('fs');
const cassandra = require('../app-lib/cassandra/driver');

const user = 'app-user';
const group = 99999;
const port = normalizePort(process.env.PORT || '403');
app.set('port', port);

const options = {
  key: fs.readFileSync('/path/to/tls/private/xxxx.key'),
  cert: fs.readFileSync('/path/to/tls/certs/xxxx.crt'),
  ca: fs.readFileSync('/path/to/tls/certs/xxxx-ca.crt'),
};

// ここでコネクションを取得
cassandra.connect()
  .then(() => {
    app.set('cassandra', cassandra);
    const server = https.createServer(options. app);

    function wireUpServer(server) {
      let connections = {};
      server.on('connection', function(conn) {
        const key = conn.remoteAddress + ':' + conn.remotePort;
        connections[key] = conn;
        conn.on('close', function() {
          delete connections[key];
        });
      });

      server.destroy = function(cb) {
        // 終了時にcassandraクローズ
        cassandra.close().catch(() => null);
        server.close(cb);
        for (const key in connections) {
          connections[key].destroy();
        }
      };
    }

    wireUpServer(server);

    const gracefulShutdown = function() {
      server.destroy(() => {
        process.exit();
      });
    };
    process.on('SIGTERM', gracefulShutdown);
    process.on('SIGINT', gracefulShutdown);

    function onListening() {
      let addr = server.address();
      let bind = typeof addr === 'string'
        ? 'pipe ' + addr
        : 'port ' + addr.port;
      debug('Listening on ' + bind);
    }
    
    server.listen(port, () => {
      try {
        process.setgid(group);
        process.setuid(user);
      } catch (err) {
        console.error(err);
        process.exit(1);
      }
    });

    server.on('error', onError);
    server.on('listening', onListening);
  })
  .catch(() => process.exit(1));

function normalizePort(val) {
  let port = parseInt(val, 10);

  if (isNaN(port)) {
    return val;
  }

  if (port >= 0) {
    return port;
  }

  return false;
}

function onError(error) {
  if (error.syscall !== 'listen') {
    throw error;
  }

  let bind = typeof port === 'string'
    ? 'Pipe ' + port
    : 'Port ' + port;

  switch (error.code) {
    case 'EACCES':
      console.error(bind + ' requires elevated privileges');
      process.exit(1);
      break;
    case 'EADDRINUSE':
      console.error(bind + ' is already in use');
      process.exit(1);
      break;
    default:
      throw error;
  }
}
app.js

こちらは大して変更ありません。

'use strict';
const express = require('express');
const cookieParser = require('cookie-parser');

const indexRouter = require('./routes/index');

const app = express();

app.use(express.json());
app.use(express.urlencoded({ extended: false }));
app.use(cookieParser());

app.use('/', indexRouter);

/* catch 404 and forward to error handler */
app.use(function(err, req, res, next) { // eslint-disabled-line no-unused-vars
  res.status(404).json({message: 'Request url not found.'});
});

/* error handler */
app.use(function(err, req, res, next) { // eslint-disabled-line no-unused-vars
  res.locals.message = err.message;
  res.locals.error = req.app.get('env') === 'development' ? err : {};

  res.status(err.status || 500).json({message: err.message});
});

module.exports = app;
./app-lib/cassandra/config.js

実際は外部から受け取れるようにするか、configを複数用意して環境毎に設定します。

'use strict';

module.exports = {
  config: {
    contactPoints: ['sample.cassandra1.co.jp', 'sample.cassandra2.co.jp', 'sample.cassandra3.co.jp'],
    keyspace: 'test_key_space',
    localDataCenter: 'dc1',
    protocolOptions: {
      port: 9042,
    },
    socketOptions: {
      connectTimeout: 120000,
    },
  },
};
./app-lib/cassandra/driver.js

cassandra-driverをラップ。

'use strict';
const cassandra = require('cassandra-driver');
const Config = require('./config');

// リトライ処理
function retryable(func, retryCount=5) {
  let promise = Promise.reject().catch(() => func());
  for (let i = 0; i < retryCount; i++) {
    promise = promise.catch(() => func());
  }
  return promise;
}

module.exports = {
  client: null;
  connect: () => {
    return new Promise((resolve, reject) => {
      try {
        // オブジェクトを上書きする場合はディープコピーが必要
        const csConfig = JSON.parse(JSON.stringfy(Config.config));
        // クレデンシャルの取得処理(今回は何もしません)
        csConfig.credentials.username = 'test-user';
        csConfig.credentials.password = 'xxxxx';
        this.clinet = new cassandra.Client(csConfig);
        resolve(this.clinet);
      } catch (e) {
        reject(e);
      }
    });
  },

  insert: (query, param=null, retryCount=5) {
    return new Promise((resolve, reject) => {
      retryable(() => this.client.execute(query, param, { prepare: true })), retryCount)
        .then(() => resolve)
        .catch(e => reject(e));
    });
  },

  find: (query, param=null, retryCount=5) {
    return new Promise((resolve, reject) => {
      retryable(() => this.client.execute(query, param, { prepare: true })), retryCount)
        .then(result => resolve(result.rows))
        .catch(e => reject(e));
    });
  },

  close: () {
    return new Promise((resolve, reject) => {
      if (this.client) {
        this.client.shutdown()
          .then(()=> resolve)
          .catch(e => reject(e));
      } else {
        resolve();
      }
    });
  },
};
./routes/index.js

エラーは補足してnextに渡す。

'use strict';
const express = require('express');
const router = express.Router();

const wrap = fn => (...args) => fn(...args).catch(args[2]);

router.get('/', wrap( async(req, res, next) => {
    try {
      const pk = req.body.id;
      const cassandra = req.app.get('cassandra');
      const result = await cassandra.find('SELECT * FROM test_table WHERE id = :id', {id: pk});
      return res.status(200).json({result: result});
    } catch (e) {
      next(e);
    }
  }
));

module.exports = router;

【Python】DataStax Python Driverでcassandraにアクセスする

 

                                           f:id:tm200:20210424173901p:plain

 

Pythonでcassandraの登録処理や取得処理を実装したのでメモ。

DataStaxのpython driverを使用しています。

テーブル設計なども行ったのですがパーテションキー=nodeという概念で、1pk=1nodeに多くても100レコード以内が望ましいそうです。

一意なID等なら問題ないのですが、それ以外の条件で取得する場合は設計に注意する必要があります。

 

環境

Pythonのバージョンは3.7。

cassandra-driverをインストールするにはCythonが必要です。

$ pip3 install Cython
// 先にCythonのインストールが必要
$ pip3 install cassandra-driver

もしくは以下のオプションを指定。

$ pip3 install cassandra-driver --install-option="--no-cython"

cassandra-driver

 この辺りはよく見るかも?

docs.datastax.com

 

実装

要件で接続失敗時にリトライする必要があった為、そちらも記載。

# -*- coding: utf-8 -*-
import os
import time
from functools import wrap
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT
from cassandra.policies import DCAwareRoundRobinPolicy
from cassandra.query import dict_factory, SimpleStatement


def exec_cql(func):
"""リトライ処理"""
// configに定義するなど、お好きに
delay = int(os.getenv('DELAY', 10))
attempts = int(os.getenv('RETRY', 5))

@wrap(func)
def wrapper(*args, **kwargs):
retry = 0
while retry <= attempts:
try:
retry += 1
result = func(*args, **kwargs)
return result
except Exception as e:
if retry > attempts:
raise e
time.sleep(delay)
return wrapper


class CassandraClient:

def __init__(self, hosts, key_space, user, password,
port=9042, timeout=120, protocol_version=3, local_dc=None):
self._hosts = hosts
self._port = port
self._user = user
self._pass = password
self._key_space = key_space
self._timeout = timeout
self._protocol_version = protocol_version
self._local_dc = local_dc

self.cluster = None
self.session = None
self._connect()

def _connect():
auth_provider = PlainTextAuthProvider(username=self._user, password=self._pass)

if self._local_dc:
policy = DCAwareRoundRobinPolicy(local_dc=self._local_dc)
else:
policy = DCAwareRoundRobinPolicy()

profile = ExecutionProfile(
load_balancing_policy=policy
raw_factory=dict_factory // 戻り値が辞書になる
)
// contact_pointsはリスト型
self.cluster = Cluster(
contact_points=self._hosts, port=self._port, auth_provider=auth_provider,
execution_profiles={ EXEC_PROFILE_DEFAULT: profile}
)
self.session = self.cluster.connect(self._key_space)

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, traceback):
self.close()

@exec_cql
def execute_query(self, cql, param=None):
"""クエリ発行"""
return self.session.execute(cql, parameters=param, timeout=self._timeout)

@exec_cql
def paging(self, cql, param=None, fetch_size=100):
"""逐次処理がしたい場合"""
stmt = SimpleStatement(cql, fetch_size=fetch_size)
return self.session.execute_async(cql, parameters=param, timeout=self._timeout)

def close(self):
self.session.shutdown()
self.cluster.shutdown()

def make_insert_query(self, table: str, param: dict):
"""辞書型のパラメータからINSERTクエリを組み立てる"""
cql = 'INSERT INTO ' + table
cql += ' ('
cql += ', '.join(param)
cql += ') VALUES ('
cql += ', '.join(map(self._dict_value_pad, param))
cql += ')'
return cql


def make_insert_query(self, table: str, param: dict, keys: list):
"""辞書型のパラメータからUPDATEクエリを組み立てる"""
i = 0
j = 0
cql = f'UPDAT {table} SET '
for k in param:
if k not in param:
if i > 0:
cql += ', '
cql += f'{k} = %({k})s'
i += 1
cql += ' WHERE '
for pk in keys:
if j > 0:
cql += ' AND '
cql += f'{pk} = %({pk})s'
j += 1
return cql

@staticmethod
def dict_value_pad(key: str):
return f'%({key})s'


if __name__ == '__main__':
hosts = ['xxxx.xxxx1.jp','xxxx.xxxx2.jp','xxxx.xxxx3.jp']
key_space = 'test_space'
user = 'test-useer'
password = 'xxxxx'
with CassandraClient(hosts, key_space, user, password) as client:
result = client.execute_query('SELECT * FROM sample WHERE id=%s', ('1',))
print(result)

 

逐次処理用のハンドラークラス。

from threading import Event

class
PagedResultHandlar(object):
def __init__(self, future, item):
self.future = future
self.item = item // 保持したい値があれば
self.error = None
self.finished_event = Event()
self.future.add.callbacks(callback=self.handle_page, errorback=self.handle_error)

def handle_page(self, rows):
for row in rows:
// 取得した値にゴニョゴニョする
print(row)
if self.future.has_more_pages:
self.future.start_fetching_next_page()
else:
self.finished_event.set()

def
handle_error(self, exec):
self.error = exec
self.finished_event.set()


// 使用例

with CassandraClient(['xxxx.xxxx1.jp'], 'test_space', 'test-useer', 'xxxxx') as client:
future = client.paging('SELECT * FROM sample WHERE date=%s', param=('20210101',))
handlar = PagedResultHandlar(future, '渡したいパラメータ')
handlar.finished_event.wait()
if handlar.error:
raise handlar.error

ValueSequence

IN句を使用する場合、ValueSequenceを使用します。

with CassandraClient(['xxxx.xxxx1.jp'], 'test_space', 'test-useer', 'xxxxx') as client:
        query = 'SELECT * FROM sample WHERE id in %s'
result = client.execute_query(query, param=(ValueSequence['id1', 'id2', 'id3'],))
print(result.current_rows)

Prepared Statement

何度も発行するクエリはPrepared Statementを利用すると通信効率で有利なようです。

with CassandraClient(['xxxx.xxxx1.jp'], 'test_space', 'test-useer', 'xxxxx') as client:
        // バインド部が`?`となる
        query = client.session.prepare('SELECT * FROM sample WHERE id in ?')
result = client.execute_query(query, param=(ValueSequence['id1', 'id2', 'id3'],))
print(result.current_rows)

【Python】PyWebHdfsでHDFSを操作する

f:id:tm200:20210423221904j:plain

PythonHDFSにファイルアップロードする処理を実装したのでメモ。

単純な処理ならcurlで十分なのですが、色々やりたい時は便利そうです。

事前にKerberos認証済みの想定です。

 

 ⬇️ まだ読めてないです😎

環境

やはりPythonのバージョンは3.7。

$ pip3 install pywebhdfs==0.4.1 requests-kerberos==0.12.0

最初にローカルでテストコードを実行しようとして、インストール出来なかったんですよね。

Mac osのバージョンによってはARCHFLAGSの宣言とか必要かも?

$ export ARCHFLAGS="-arch x86_64"
// 先にインストールが必要かも?
$ pip3 install pbr==5.5.1

PyWebHdfs

pythonhosted.org

実装

PyWebHdfsClientモジュールを継承して使いやすくしています。

# -*- coding: utf-8 -*-
from requests_kerberos import HTTPKerberosAuth, OPTIONAL
from pywebhdfs.webhdfs import PyWebHdfsClient


class HttpFsClient(PyWebHdfsClient):

def __init__(self, base_uri_pattern, dir_permision='750', permision='640', overwrite=False,
reprication=3, block_size=134217728, stream=False, debug=False, verify=False, auth=None):
self.dir_permision = dir_permision
self.permision = permision
self.overwrite = overwrite
self.replication = replication
self.block_size = block_size
self.debug = debug
if auth is None:
auth = HTTPKerberosAuth(mutal_authentication=OPTIONAL)

self.webhdfs_kwargs = {
'base_uri_pattern': base_uri_pattern,
'timeout': 120,
'
requests_extra_opts': {
'
verify': verify,
'
auth': auth,
'
stream': stream
}
}
PyWebHdfsClient.__init__(
self, **self.webhdfs_kwargs)

def put_file(self, hdfs_path, src_file):
with open(src_file, 'rb') as file_data:
kwargs = {
'overwrite': self.overwrite,
'permision': self.permision,
'blocksize': self.block_size,
'reprication': self.replication
}
return PyWebHdfsClient.create_file(self, hdfs_path, file_data=file_data, **kwargs)

def create_dir(self, hdfs_path):
if not self.exists_file_dir(hdfs_path):
self.make_dir(hdfs_path)


if __name__ == '__main__':
client = HttpFsClient('https://xxxx.xxx:4443/webhdfs/v1/')
client.create_dir('projects/sample/test')
client.put_file('projects/sample/test/xxx.tsv')
// 継承しているので、元のモジュールと同じように使える
print(client.list_dir('projects/sample/test'))