【Kubernetes】サイドカー方式でOAtuh2-proxyの認証処理を追加する

f:id:tm200:20210523114141p:plain f:id:tm200:20210423222508p:plain  

kubernetes上でのOAuth2 proxyコンテナの利用方法をメモ。
nginxの使い方について詳しく知りたい方は、この本がお勧めです。
手元にあれば、実装時にいちいち検索しなくて済むかもしれません。

OAuth2 proxy

ドキュメントはこちら。

oauth2-proxy.github.io

構成

リバースプロキシにnginxを使用する方法です。
nginxのauth_requestでOAuth2 proxyに認証を依頼し、認証成功時にアプリケーションにプロキシされます。
認証方法はOIDC。

openid.net

config

各configはkubernetesのconfigMapに登録します。

nginx.conf
user nginx;
worker_processes 1;
pid /var/run/nginx.pid
daemon off; # dockerで起動する場合、フォアグラウンドで起動する必要がある

events {
    worker_connections 2048;
}

http {
    include               /etc/nginx/mime.types;
    default_type      application/octet-stream;

    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                                  '$status $body_bytes_sent "$http_referer" '
                                  '"$http_user_agent" "$http_x_forwarded_for"';

    access_log   /dev/stdout main;
    error_log       /dev/stderr error;

    sendfile                      on;
    # tcp_nopush            on;
    # gzip                         on;
    keepalive_timeout    65;

    
    # oauth2 proxyの為に、HTTPヘッダー用のバッファ領域を増やす
    proxy_buffers           8 32k;
    proxy_buffer_size        32k;

    # "Request Header Or Cookie Too Large"の対応
    large_client_header_buffers    8 32k;
    client_header_buffer_size           32k;

    server_token off;

    upstream backend {
        server 127.0.0.1:8080;             # アプリケーションコンテナ
    }

    upstream oauth-proxy {
        server 127.0.0.1:4180;             # OAuth2 proxyコンテナ
    }

    server {
        listen 80;
        return 301 https://$host$request_uri;
    }

    server {
        listen 403 ssl;
        ssl_certificate                         /etc/nginx/ssl/tls.crt;                     # 中間証明書がある場合は連結する
        ssl_certificate_key                 /etc/nginx/ssl/tls.key;
        ssl_protocols TLSv1.2;
        ssl_ciphers 'ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-CHACHA20-POLY1305:EC
DHE-RSA-CHACHA20-POLY1305:ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-SHA384:ECDHE-RSA-AES256-SHA384:ECDHE-ECDSA-AES128-SHA256:ECDHE-RSA-AES128-SHA256';
        ssl_session_cache builtin:1000 shared:SSL:10m;

        client_max_body_size       32m;
        client_body_buffer_size    64k;

        location /oauth2/ {
            proxy_pass                http://oauth-proxy;
            proxy_set_header     Host                                          $host;
            proxy_set_header     X-Real_IP                                  $remote_addr;
            proxy_set_header     X-Sheme                                  $sheme;
            proxy_set_header     X-Auth-Request-Redirect      $request_uri;
            # 他ドメインに転送する場合
            # proxy_set_header     X-Auth-Request-Redirect      $sheme://$host$request_uri;
        }

        # 認証用エンドポイント、「=」なことに注意
        location = /oauth2/auth {
            proxy_pass                http://oauth-proxy;
            proxy_set_header     Host                                          $host;
            proxy_set_header     X-Real_IP                                  $remote_addr;
            proxy_set_header     X-Sheme                                  $sheme;
            # nginxのauth_requestにbodyは不要
            proxy_set_header     Content-Length                       "";
            proxy_pass_request_body                                         off;
        }

        location / {
            satisfy     any;
            auth_request           /oauth2/auth;
            error_page 401       /oauth2/sign_in;

            proxy_set_header     Host                                          $host;
            proxy_set_header     X-Real_IP                                  $remote_addr;
            proxy_set_header     X-Forwarded-For                    $remote_addr;
            proxy_set_header     X-Forwarded-Proto                 https;

            auth_request_set     X-User      $user;
            auth_request_set     X-Email     $email;
            auth_request_set     X-ID          $empid;
            auth_request_set     X-Roles     $roles;

            auth_request_set     $token                      $upstream_http_x_auth_request_access_token;
            proxy_set_header     X-Access-Token     $token;

            auth_request_set     $auth_cookie_name_upstream_1 $upstream_cookie_auth_cookie_name_1;

            proxy_pass http://backend/;
        }

        location /proxy/ {
            proxy_set_header     Host                                          $host;
            proxy_set_header     X-Real_IP                                  $remote_addr;
            proxy_set_header     X-Forwarded-For                    $remote_addr;
            proxy_set_header     X-Forwarded-Proto                 https;
            proxy_pass                http://oauth-proxy;
        }
    }
}
oauth2_proxy.cfg
http_address = "0.0.0.0:4180"

upstream = ["http://127.0.0.1:8080/"]

# メールアドレスのドメインを指定、アスタリスクで全て許容
email_domain = ["*"]

# 認可するロール
permission_policies = []

oidc_issuer_url = "https://xxx.xxx.xxx/dex"

cookie_secure = true

# 認証方法を指定
provider = "oidc"

# X-Forwarded-Access-Token
pass_access_token = true

# Authorization Bearer header
pass_authorization_header = true

reverse_proxy = true
set_xauthrequest = true
set_authorization_header = true

skip_provider_button = true

# ヘルスチェック用エンドポイントもこちらに記載する
skip_auth_regex = [
    "^(/|/forbidden|/health|/favicon.ico)$",
    "^(/|/forbidden|/health|/favicon.ico)$",
    "^(js|image|css|stylesheets)/.*$",
    ".(png|css|ico|js)$"
]

configMapとSecret

上記configはconfigMapに、tlsの鍵データとoauth2proxy用各パラメータはSecretに登録します。
cookie secretをpythonで生成する際に、byte数が(16, 24, 32)のいずれかである必要があります。

$ NAMESPACE=xxxxx
$ alias k=kubectl

# tls作成
$ k -n=${NAMESPACE} create secret tls pki-tls --key=secret/server.key --cert=secret/server.crt
secret/pki-tls created

# OAuth2 proxy 各パラメータをbase64エンコードして、マニフェストに記載する
$ echo -n "<client-id>" | base64

$ echo -n "<client-secret>" | base64

# cookie secretの生成、byte型になっていたり改行が含まれているとエラーとなるので注意
$ python -c 'import os,base64; print(base64.urlsafe_b64encode(os.random(16)).decode(), end="")' | base64

$ cat secret.yaml
apiVersion: v1
kind: Secret
metadata:
  name: oauth2proxy
  namespace: xxx-xxx-xxx
type: Opaque
data:
  client-id: <base64 client-id>
  client-secret: <base64 client-secret>
  cookie-secret: <base64 cookie-secret>

$ k apply -f secret.yaml
secret/oauth2proxy created

# configMap
$ k create cm nginx-config --from-file=config/nginx.conf
$ k create cm proxy-config --from-file=config/oauth2_proxy.cfg

# 確認コマンド
$ k -n=${NAMESPACE} describe secret kpi-tls

$ k -n=${NAMESPACE} describe secret oauth2proxy

# 要jqコマンドのインストール
$ k -n=${NAMESPACE} get secret kpi-tls -o json | jq -r '.data."tls.crt"'

$ k -n=${NAMESPACE} get cm nginx-config -o json | jq -r '.data."nginx.conf"'

$ k -n=${NAMESPACE} get cm proxy-config -o json | jq -r '.data."oauth2_proxy.cfg"'

# 24byteであることの確認
$ k -n=${NAMESPACE} get secret oauth2proxy -o json | jq -r '.data."cookie-secret"' | base64 -d | wc -c
24

マニフェストファイル

特に変わった使い方はしていません。
deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: test-app
  namespace: xxx-xxx-xxx
spec:
  replicas: 2
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 1
  minReadySeconds: 10
  template:
    spec:
      volumes:
        - name: secret-volume
          secret:
            secretName: pki-tls
        - name: nginx-volume
          configMap:
            name: nginx-config
        - name: oidc-volume
          configMap:
            name: proxy-config
      containers:
      - image: nginx
        name: nginx
        ports:
          - containerPort: 443
        resources:
          limits:
            memory: 64Mi
          requests:
            cpu: 100m
            memory: 64Mi
        volumeMounts:
          - mountPath: /etc/nginx/ssl
             name: secret-volume
             readOnly: true
          - mountPath: /etc/nginx/nginx.conf
             subPath: nginx.conf
             name: nginx-volume
             readOnly: true
      - image: oauth2_proxy  # PJで管理しているイメージです、公開されているものと同じはず
         args: ["-config=/etc/oauth2/oauth2_proxy.cfg"]
         ports:
           - containerPort: 4180
        resources:
          limits:
            memory: 512Mi
          requests:
            cpu: 100m
            memory: 512Mi
        volumeMounts:
          - mountPath: /etc/oauth2/oauth2_proxy.cfg
             subPath: oauth2_proxy.cfg
             name: oidc-volume
             readOnly: true
        env:
          - name: OAUTH2_PROXY_CLIENT_ID
             valueFrom:
               secretKeyRef:
                 name: oauth2proxy
                 key: clinet-id
          - name: OAUTH2_PROXY_CLIENT_SECRET
             valueFrom:
               secretKeyRef:
                 name: oauth2proxy
                 key: cookie-secret
          - name: OAUTH2_PROXY_COOKIE_SECRET
             valueFrom:
               secretKeyRef:
                 name: oauth2proxy
                 key: clinet-id
          - name: OAUTH2_PROXY_HTTP_ADDRESS  # ここで指定しないと、127.0.0.1で起動してしまう?
             value: "0.0.0.0:4180"
          - name: OAUTH2_PROXY_REDIRECT_URL
             value: <登録したリダイレクトURLを指定>
      - image: python-app  # 詳細は割愛、実装したのはflaskアプリです
         name: python-app
         ports:
           - containerPort: 8080
         resources:
           limits:
             cpu: 1000m
             memory: 2G
         lifecyle:
           preStop:
             exec:
               command: ["/bin/sh", "-c", "sleep 20"]
         readinessProbe:
           httpGet:
             path: /health
             port: 8080
           initialDelaySeconds: 30
           periodSeconds: 5
           failureThreshold: 10

         livenessProbe:
           httpGet:
             path: /health
             port: 8080
           initialDelaySeconds: 60
           periodSeconds: 10
           timeoutSeconds: 10

ingress.yaml

apiVersion: networking.k8s.io/v1beta1
kind: Ingress
metadata:
  name: test-ingress
  namespace: test-namespace
  annotations:
    ingress.zlab.co.jp/backend-config: '{"xxx-xxx-xxx": {"443": {"tls": true, "sni": "<ドメイン名を記載>"}}}'
spec:
  tls:
    - secretName: pki-tls
  rules:
  - host: <ドメイン名を記載>
     http:
       paths:
       - backend:
           serviceName: testsvc
           servicePort: 443

 service.yaml

apiVersion: v1
kind: Service
metadata:
  name: testsvc
  labels:
    app: test-app
  namespace: xxx-xxx-xxx
spec:
  selector:
    app: test-app
  ports:
  - protocol: TCP
     port: 443
     targetPort: 443

【Kubernetes】kustomizeで環境毎のマニフェストを作成

f:id:tm200:20210507170855p:plain

kubernetesで環境毎の設定管理にkustomizeを使用したのでメモ。
ingressのannotationsにスラッシュが含まれていた為、patchesJson6902のプロパティ指定はどうやるのだろうと思ったのですが、エスケープに「~1」を用いる事で解決しました。


⬇️ これからkubernetesやDockerの経験を積みたい方にお勧めです!
Dockerfileや各種マニフェストの基本的な使い方が書かれています。

Docker/Kubernetes 実践コンテナ開発入門

Docker/Kubernetes 実践コンテナ開発入門

  • 作者:山田 明憲
  • 発売日: 2018/08/25
  • メディア: 単行本(ソフトカバー)


kustomize

github.com

環境

macOS Catalina 10.15.7

# インストール
$ brew install kustomize

ディレクトリはこんな感じ。

$ tree
.
...
├── k8s
│    ├── base
|    |    ├── config
│    |    |    └── nginx.conf # configMapに登録する設定ファイル
│    |    ├── deployment.yaml
│    |    ├── ingress.yaml
│    |    ├── kustomization.yaml
│    |    ├── pod_disruption_budget.yaml
|    |    └── service.yaml
│    └── overlays
|        ├── staging
|        |   ├── kustomization.yaml
|        |   ├── patch_deployment.yaml
|        |   └── patch_ingress.yaml
|        └── prod
|             └── ...
...

base

ベースとなる各種yamlを配置。

kustomization.yaml

configMapGeneratorでconfigMapを生成する方法を記載。
jsonyamlなどもこの方法で指定できる、、、はず。

apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization

commonLabels:
  app: test-app

resources:
- deployment.yaml
- ingress.yaml
- service.yaml
- pod_disruption_budget.yaml

# configMapの設定、jsonなどの設定ファイルから取り込む場合
configMapGenerator:
- name: nginx-config
   namespace: test-namespace # ネームスペース を個別に指定する場合
   files:
     - config/nginx.conf

# configMapの共通設定
generatorOptions:
  disableNameSuffixHash: true # 生成されるconfigMapのサフィックスからハッシュ値を取り除く場合
  labels:
    app: test-app

deployment.yaml

普通の3層クライアントサーバーシステムっぽい設定。

apiVersion: apps/v1
kind: Deployment
metadata:
  name: test-app
spec:
  selector:
    matchLabels:
      app: test-app
  replicas: 2
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 1
  minReadySeconds: 10
  template:
    metadata:
      labels:
        app: test-app
    spec:
      volumes:
      - name: nginx-volume
        configMap:
          name: nginx-config # volumeに割り当て

      containers:
      - name: nginx
        image: nginx:1.14.2
        ports:
        - containerPort: 80
        limits:
          memory: 64Mi
        requests:
          cpu: 100m
          memory: 64Mi
        volumeMounts:
          - mountPath: /etc/nginx/nginx.conf
             subPath: nginx.conf
             name: nginx-volume
             readOnly: true
      - name: hello-python # pythonコードは割愛
        image: hello-python:latest
        imagePullPolicy: Never
        env:
          - name: TEST_ENV_VALUE
             value: 0
        ports:
        - containerPort: 8080
        limits:
          memory: 1G
          cpu: 1000m

        readinessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 5
          failureThreshold: 10

        livenessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 60
          periodSeconds: 10
          timeoutSeconds: 10

        lifecycle:
          preStop:
            exec:
              command: ["/bin/sh", "-c", "sleep 20"]

pod_disruption_budget.yaml

停止状態として許容できる Pod 数を定義。

apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
  name: test-app
spec:
  maxUnavailable: 1
  selector:
    matchLabels:
      app: test-app

ingress.yaml

apiVersion: networking.k8s.io/v1beta1
kind: Ingress
metadata:
  name: test-ingress
  namespace: test-namespace
  annotations:
    nginx.ingress.kubernetes.io/rewrite-target: /
spec:
  rules:
  - host: http://test.example1.com
     http:
       paths:
       - backend:
           serviceName: testsvc
           servicePort: 80

service.yaml

apiVersion: v1
kind: Service
metadata:
  name: testsvc
  labels:
    app: test-app
  namespace: test-namespace
spec:
  selector:
    app: test-app
  ports:
  - protocol: TCP
     port: 80
     targetPort: 80

overlays

環境差分を記載するyamlはこちらに配置。
基本はスラッシュ区切りでプロパティを指定する。
配列要素はインデックスを指定する事でアクセスできる。

kustomization.yaml

ベースとなるディレクトリとパッチするリソースをここで設定する。
ここではIngressとDeploymentを指定。

bases:
  - ../../base

patchesJson6902:
- target:
    group: networking.k8s.io
    version: v1beta1
    kind: Ingress
    name: test-ingress
  patch: patch_ingress.yaml # 差分を記載したファイルを指定
- target:
    group: apps
    version: v1
    kind: Deployment
    name: test-app
  patch: patch_deployment.yaml # こちらも同様

patch_ingress.yaml

ドメインとannotationsを変更する例。
annotationsのスラッシュは「~1」でエスケープ。

- op: replace
   path: /spec/rules/0/host
   value: http://test.example2.com
- op: replace
   # スラッシュを「~1」に置き換える
   path: /metadata/annotations/nginx.ingress.kubernetes.io~1rewrite-target
   value: /another

patch_deployment.yaml

レプリカ数と環境変数を変更。

- op: replace
   path: /spec/replicas
   value: 3
- op: replace
    # 環境変数「TEST_ENV_VALUE」を変更
   path: /spec/template/spec/containers/1/env/0/value
   value: 1

build

差分が反映されるか確認する。

$ cd k8s/base

$ kustomize build ../overlays/staging
apiVersion: v1
data:
  nginx.conf: | -
    ....
kind: ConfigMap
metadata:
  labels:
    app: test-app
  name: nginx-config
  namespace: test-namespace
---
apiVersion: v1
... 
# ビルド結果が出力されるので内容を確認する

【Node.js】Expressからcassandraにアクセスする

f:id:tm200:20210426210331p:plain

Node.jsとExpressでAPIを実装した際にcassandraに接続したので、やり方をメモ。
Markdown 記法に変えたので、若干書体が変わっています。

⬇️ 分かりやすい良書です😊

環境

Node.jsバージョンは14。
プロジェクトはexpress-generator で作成しました。
作成方法は割愛。

package.json
{
  "name": "sample",
  "version": "1.0.0",
  "description": "sample project",
  "author": "sample",
  "private": true,
  "scripts": {
    "start": "node ./bin/www",
    "test": "./node_modules/.bin/jest",
    "lint": "./node_modules/.bin/eslint --ext .js ./",
  },
  "dependencies": {
    "express": "^4.17.1",
    "cassandra-driver": "^4.2.0",
    "cookie-parser": "~1.4.4",
    "debug": "^4.3.1",
    "http-errors": "^1.8.0"
  },
  "devDependencies": {
    .....
  }
}

cassandra-driver

docs.datastax.com

実装

./bin/www

cassandraオブジェクトを毎回クローズするとたまにエラーが発生する為、グローバルに保持してアプリケーション終了時にクローズします。
やり方は色々あると思います。
他にはuidとgidの設定、グレースフルシャットダウンの処理があります。

#!/usr/bin/env node
const app = require('../app');
const debug = require('debug')('sample:server');
const https = require('https');
const fs = require('fs');
const cassandra = require('../app-lib/cassandra/driver');

const user = 'app-user';
const group = 99999;
const port = normalizePort(process.env.PORT || '403');
app.set('port', port);

const options = {
  key: fs.readFileSync('/path/to/tls/private/xxxx.key'),
  cert: fs.readFileSync('/path/to/tls/certs/xxxx.crt'),
  ca: fs.readFileSync('/path/to/tls/certs/xxxx-ca.crt'),
};

// ここでコネクションを取得
cassandra.connect()
  .then(() => {
    app.set('cassandra', cassandra);
    const server = https.createServer(options. app);

    function wireUpServer(server) {
      let connections = {};
      server.on('connection', function(conn) {
        const key = conn.remoteAddress + ':' + conn.remotePort;
        connections[key] = conn;
        conn.on('close', function() {
          delete connections[key];
        });
      });

      server.destroy = function(cb) {
        // 終了時にcassandraクローズ
        cassandra.close().catch(() => null);
        server.close(cb);
        for (const key in connections) {
          connections[key].destroy();
        }
      };
    }

    wireUpServer(server);

    const gracefulShutdown = function() {
      server.destroy(() => {
        process.exit();
      });
    };
    process.on('SIGTERM', gracefulShutdown);
    process.on('SIGINT', gracefulShutdown);

    function onListening() {
      let addr = server.address();
      let bind = typeof addr === 'string'
        ? 'pipe ' + addr
        : 'port ' + addr.port;
      debug('Listening on ' + bind);
    }
    
    server.listen(port, () => {
      try {
        process.setgid(group);
        process.setuid(user);
      } catch (err) {
        console.error(err);
        process.exit(1);
      }
    });

    server.on('error', onError);
    server.on('listening', onListening);
  })
  .catch(() => process.exit(1));

function normalizePort(val) {
  let port = parseInt(val, 10);

  if (isNaN(port)) {
    return val;
  }

  if (port >= 0) {
    return port;
  }

  return false;
}

function onError(error) {
  if (error.syscall !== 'listen') {
    throw error;
  }

  let bind = typeof port === 'string'
    ? 'Pipe ' + port
    : 'Port ' + port;

  switch (error.code) {
    case 'EACCES':
      console.error(bind + ' requires elevated privileges');
      process.exit(1);
      break;
    case 'EADDRINUSE':
      console.error(bind + ' is already in use');
      process.exit(1);
      break;
    default:
      throw error;
  }
}
app.js

こちらは大して変更ありません。

'use strict';
const express = require('express');
const cookieParser = require('cookie-parser');

const indexRouter = require('./routes/index');

const app = express();

app.use(express.json());
app.use(express.urlencoded({ extended: false }));
app.use(cookieParser());

app.use('/', indexRouter);

/* catch 404 and forward to error handler */
app.use(function(err, req, res, next) { // eslint-disabled-line no-unused-vars
  res.status(404).json({message: 'Request url not found.'});
});

/* error handler */
app.use(function(err, req, res, next) { // eslint-disabled-line no-unused-vars
  res.locals.message = err.message;
  res.locals.error = req.app.get('env') === 'development' ? err : {};

  res.status(err.status || 500).json({message: err.message});
});

module.exports = app;
./app-lib/cassandra/config.js

実際は外部から受け取れるようにするか、configを複数用意して環境毎に設定します。

'use strict';

module.exports = {
  config: {
    contactPoints: ['sample.cassandra1.co.jp', 'sample.cassandra2.co.jp', 'sample.cassandra3.co.jp'],
    keyspace: 'test_key_space',
    localDataCenter: 'dc1',
    protocolOptions: {
      port: 9042,
    },
    socketOptions: {
      connectTimeout: 120000,
    },
  },
};
./app-lib/cassandra/driver.js

cassandra-driverをラップ。

'use strict';
const cassandra = require('cassandra-driver');
const Config = require('./config');

// リトライ処理
function retryable(func, retryCount=5) {
  let promise = Promise.reject().catch(() => func());
  for (let i = 0; i < retryCount; i++) {
    promise = promise.catch(() => func());
  }
  return promise;
}

module.exports = {
  client: null;
  connect: () => {
    return new Promise((resolve, reject) => {
      try {
        // オブジェクトを上書きする場合はディープコピーが必要
        const csConfig = JSON.parse(JSON.stringfy(Config.config));
        // クレデンシャルの取得処理(今回は何もしません)
        csConfig.credentials.username = 'test-user';
        csConfig.credentials.password = 'xxxxx';
        this.clinet = new cassandra.Client(csConfig);
        resolve(this.clinet);
      } catch (e) {
        reject(e);
      }
    });
  },

  insert: (query, param=null, retryCount=5) {
    return new Promise((resolve, reject) => {
      retryable(() => this.client.execute(query, param, { prepare: true })), retryCount)
        .then(() => resolve)
        .catch(e => reject(e));
    });
  },

  find: (query, param=null, retryCount=5) {
    return new Promise((resolve, reject) => {
      retryable(() => this.client.execute(query, param, { prepare: true })), retryCount)
        .then(result => resolve(result.rows))
        .catch(e => reject(e));
    });
  },

  close: () {
    return new Promise((resolve, reject) => {
      if (this.client) {
        this.client.shutdown()
          .then(()=> resolve)
          .catch(e => reject(e));
      } else {
        resolve();
      }
    });
  },
};
./routes/index.js

エラーは補足してnextに渡す。

'use strict';
const express = require('express');
const router = express.Router();

const wrap = fn => (...args) => fn(...args).catch(args[2]);

router.get('/', wrap( async(req, res, next) => {
    try {
      const pk = req.body.id;
      const cassandra = req.app.get('cassandra');
      const result = await cassandra.find('SELECT * FROM test_table WHERE id = :id', {id: pk});
      return res.status(200).json({result: result});
    } catch (e) {
      next(e);
    }
  }
));

module.exports = router;

【Python】DataStax Python Driverでcassandraにアクセスする

 

                                           f:id:tm200:20210424173901p:plain

 

Pythonでcassandraの登録処理や取得処理を実装したのでメモ。

DataStaxのpython driverを使用しています。

テーブル設計なども行ったのですがパーテションキー=nodeという概念で、1pk=1nodeに多くても100レコード以内が望ましいそうです。

一意なID等なら問題ないのですが、それ以外の条件で取得する場合は設計に注意する必要があります。

 

環境

Pythonのバージョンは3.7。

cassandra-driverをインストールするにはCythonが必要です。

$ pip3 install Cython
// 先にCythonのインストールが必要
$ pip3 install cassandra-driver

もしくは以下のオプションを指定。

$ pip3 install cassandra-driver --install-option="--no-cython"

cassandra-driver

 この辺りはよく見るかも?

docs.datastax.com

 

実装

要件で接続失敗時にリトライする必要があった為、そちらも記載。

# -*- coding: utf-8 -*-
import os
import time
from functools import wrap
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT
from cassandra.policies import DCAwareRoundRobinPolicy
from cassandra.query import dict_factory, SimpleStatement


def exec_cql(func):
"""リトライ処理"""
// configに定義するなど、お好きに
delay = int(os.getenv('DELAY', 10))
attempts = int(os.getenv('RETRY', 5))

@wrap(func)
def wrapper(*args, **kwargs):
retry = 0
while retry <= attempts:
try:
retry += 1
result = func(*args, **kwargs)
return result
except Exception as e:
if retry > attempts:
raise e
time.sleep(delay)
return wrapper


class CassandraClient:

def __init__(self, hosts, key_space, user, password,
port=9042, timeout=120, protocol_version=3, local_dc=None):
self._hosts = hosts
self._port = port
self._user = user
self._pass = password
self._key_space = key_space
self._timeout = timeout
self._protocol_version = protocol_version
self._local_dc = local_dc

self.cluster = None
self.session = None
self._connect()

def _connect():
auth_provider = PlainTextAuthProvider(username=self._user, password=self._pass)

if self._local_dc:
policy = DCAwareRoundRobinPolicy(local_dc=self._local_dc)
else:
policy = DCAwareRoundRobinPolicy()

profile = ExecutionProfile(
load_balancing_policy=policy
raw_factory=dict_factory // 戻り値が辞書になる
)
// contact_pointsはリスト型
self.cluster = Cluster(
contact_points=self._hosts, port=self._port, auth_provider=auth_provider,
execution_profiles={ EXEC_PROFILE_DEFAULT: profile}
)
self.session = self.cluster.connect(self._key_space)

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, traceback):
self.close()

@exec_cql
def execute_query(self, cql, param=None):
"""クエリ発行"""
return self.session.execute(cql, parameters=param, timeout=self._timeout)

@exec_cql
def paging(self, cql, param=None, fetch_size=100):
"""逐次処理がしたい場合"""
stmt = SimpleStatement(cql, fetch_size=fetch_size)
return self.session.execute_async(cql, parameters=param, timeout=self._timeout)

def close(self):
self.session.shutdown()
self.cluster.shutdown()

def make_insert_query(self, table: str, param: dict):
"""辞書型のパラメータからINSERTクエリを組み立てる"""
cql = 'INSERT INTO ' + table
cql += ' ('
cql += ', '.join(param)
cql += ') VALUES ('
cql += ', '.join(map(self._dict_value_pad, param))
cql += ')'
return cql


def make_insert_query(self, table: str, param: dict, keys: list):
"""辞書型のパラメータからUPDATEクエリを組み立てる"""
i = 0
j = 0
cql = f'UPDAT {table} SET '
for k in param:
if k not in param:
if i > 0:
cql += ', '
cql += f'{k} = %({k})s'
i += 1
cql += ' WHERE '
for pk in keys:
if j > 0:
cql += ' AND '
cql += f'{pk} = %({pk})s'
j += 1
return cql

@staticmethod
def dict_value_pad(key: str):
return f'%({key})s'


if __name__ == '__main__':
hosts = ['xxxx.xxxx1.jp','xxxx.xxxx2.jp','xxxx.xxxx3.jp']
key_space = 'test_space'
user = 'test-useer'
password = 'xxxxx'
with CassandraClient(hosts, key_space, user, password) as client:
result = client.execute_query('SELECT * FROM sample WHERE id=%s', ('1',))
print(result)

 

逐次処理用のハンドラークラス。

from threading import Event

class
PagedResultHandlar(object):
def __init__(self, future, item):
self.future = future
self.item = item // 保持したい値があれば
self.error = None
self.finished_event = Event()
self.future.add.callbacks(callback=self.handle_page, errorback=self.handle_error)

def handle_page(self, rows):
for row in rows:
// 取得した値にゴニョゴニョする
print(row)
if self.future.has_more_pages:
self.future.start_fetching_next_page()
else:
self.finished_event.set()

def
handle_error(self, exec):
self.error = exec
self.finished_event.set()


// 使用例

with CassandraClient(['xxxx.xxxx1.jp'], 'test_space', 'test-useer', 'xxxxx') as client:
future = client.paging('SELECT * FROM sample WHERE date=%s', param=('20210101',))
handlar = PagedResultHandlar(future, '渡したいパラメータ')
handlar.finished_event.wait()
if handlar.error:
raise handlar.error

ValueSequence

IN句を使用する場合、ValueSequenceを使用します。

with CassandraClient(['xxxx.xxxx1.jp'], 'test_space', 'test-useer', 'xxxxx') as client:
        query = 'SELECT * FROM sample WHERE id in %s'
result = client.execute_query(query, param=(ValueSequence['id1', 'id2', 'id3'],))
print(result.current_rows)

Prepared Statement

何度も発行するクエリはPrepared Statementを利用すると通信効率で有利なようです。

with CassandraClient(['xxxx.xxxx1.jp'], 'test_space', 'test-useer', 'xxxxx') as client:
        // バインド部が`?`となる
        query = client.session.prepare('SELECT * FROM sample WHERE id in ?')
result = client.execute_query(query, param=(ValueSequence['id1', 'id2', 'id3'],))
print(result.current_rows)

【Python】PyWebHdfsでHDFSを操作する

f:id:tm200:20210423221904j:plain

PythonHDFSにファイルアップロードする処理を実装したのでメモ。

単純な処理ならcurlで十分なのですが、色々やりたい時は便利そうです。

事前にKerberos認証済みの想定です。

 

 ⬇️ まだ読めてないです😎

環境

やはりPythonのバージョンは3.7。

$ pip3 install pywebhdfs==0.4.1 requests-kerberos==0.12.0

最初にローカルでテストコードを実行しようとして、インストール出来なかったんですよね。

Mac osのバージョンによってはARCHFLAGSの宣言とか必要かも?

$ export ARCHFLAGS="-arch x86_64"
// 先にインストールが必要かも?
$ pip3 install pbr==5.5.1

PyWebHdfs

pythonhosted.org

実装

PyWebHdfsClientモジュールを継承して使いやすくしています。

# -*- coding: utf-8 -*-
from requests_kerberos import HTTPKerberosAuth, OPTIONAL
from pywebhdfs.webhdfs import PyWebHdfsClient


class HttpFsClient(PyWebHdfsClient):

def __init__(self, base_uri_pattern, dir_permision='750', permision='640', overwrite=False,
reprication=3, block_size=134217728, stream=False, debug=False, verify=False, auth=None):
self.dir_permision = dir_permision
self.permision = permision
self.overwrite = overwrite
self.replication = replication
self.block_size = block_size
self.debug = debug
if auth is None:
auth = HTTPKerberosAuth(mutal_authentication=OPTIONAL)

self.webhdfs_kwargs = {
'base_uri_pattern': base_uri_pattern,
'timeout': 120,
'
requests_extra_opts': {
'
verify': verify,
'
auth': auth,
'
stream': stream
}
}
PyWebHdfsClient.__init__(
self, **self.webhdfs_kwargs)

def put_file(self, hdfs_path, src_file):
with open(src_file, 'rb') as file_data:
kwargs = {
'overwrite': self.overwrite,
'permision': self.permision,
'blocksize': self.block_size,
'reprication': self.replication
}
return PyWebHdfsClient.create_file(self, hdfs_path, file_data=file_data, **kwargs)

def create_dir(self, hdfs_path):
if not self.exists_file_dir(hdfs_path):
self.make_dir(hdfs_path)


if __name__ == '__main__':
client = HttpFsClient('https://xxxx.xxx:4443/webhdfs/v1/')
client.create_dir('projects/sample/test')
client.put_file('projects/sample/test/xxx.tsv')
// 継承しているので、元のモジュールと同じように使える
print(client.list_dir('projects/sample/test'))

 

【Python】PyHiveでHiveQL

f:id:tm200:20210423222055p:plain

 

業務でPyHiveを使用したのでメモ。(忘れる自信しかない😂 )

環境によっては認証周りが変わりそうです。

 

 ⬇️ クリーンアーキテクチャの事が分かりやすく書かれていて、お勧めです🐱

環境

Pythonのバージョンは3.7。

$ pip3 install PyHive==0.6.3 sasl==0.2.1 thrift==0.13.0 thrift-sasl==0.4.2

 

SASL

www.cyrusimap.org

 

Hive

 

Hiveの高速化はこちらが参考になります。

qiita.com

 

実装

最近、with ステートメントがスマートでお気に入りです。  

configrationはconfに切り出して管理すれば、環境毎に設定を分けたりしやすくなります。

# -*- encoding: utf-8 -*-
import os
import sasl
import thrift
import thrift-sasl
from thrift import transport
from pyhive import hive


class HiveConnector:

def __init__(self, host, port, db, service='hive', auth='GSSAPI', queue='unfunded'):
self.host = host
self.port = int(port)
self.db = db
self.service = service
self.auth = auth
self.queue = queue
self.max_buf_size = 256 * 1024 * 1024

socket = thrift.transport.TSocket.TSocket(self.host, self.port)
thrift_transport = thrift_sasl.TSaslClientTransport(
self._sasl_factory, self.auth, socket)
self.connection = hive.Connection(
thrift_transport=thrift_transport,
database=self.db,
configuration={ // オプションはここで指定、別ファイルに切り出しても良さそう
'tez.queue.name': self.queue,
'hive.exec.scratchdir': '/user/{ユーザー名}/tmp',
'hive.exec.dynamic.partition': 'true',
'hive.exec.dynamic.partition.mode': 'nonstrict',
'hive.optimize.sort.dynamic.partition': 'false',
'hive.vectorized.execution.enabled': 'true',
'hive.vectorized.execution.reduce.enabled': 'true',
'hive.cbo.enabled': 'true',
'hive.compute.query.using.stats': 'true',
'hive.stats.fetch.column.stats': 'true',
'hive.stats.fetch.partition.stats': 'true'
}
)

def _sasl_factory(self):

sasl_client = sasl.Client()
sasl_client.setAttr('host', self.host)
sasl_client.setAttr('service', self.service)
sasl_client.setAttr('maxbufsize', self.max_buf_size)
sasl_client.init()
return sasl_client

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, traceback):
self.close()

   // 更新系
def execute(self, query, param=None):
with self.connection.cursor() as cur:
cur.execute(query, parameters=param)
self.connection.commit()

   // 1件取得
def fetchone(self, query, param=None):
with self.connection.cursor() as cur:
cur.execute(query, parameters=param)
return cur.fetchone()

   // 全件取得
def fetchall(self, query, param=None):
with self.connection.cursor() as cur:
cur.execute(query, parameters=param)
return cur.fetchall()

   // 逐次取得
def fetchmany(self, query, param=None, size=1000):
with self.connection.cursor() as cur:
cur.execute(query, parameters=param)
          with True:
rows = cur.fetchmany()
              if not rows:
                  break
for row in rows:
                  yield row

def close(self):
self.connection.close()


if __name__ == '__main__':
host = 'https://xxxx.xxx'
with HiveConnector(host, 10000, 'test_db') as hc:
query = 'SELECT * FROM access_logs WHERE log_date=%s'
result = hc.fetchall(query, ('20210101',))
print(result)

 

【Python】boto3でS3のファイルをSSE-C暗号化

                                            f:id:tm200:20210423222228j:plain
 

音声ファイルをS3に格納する際にSSE-C方式で暗号化したので備忘録。

botoの語源はアマゾンカワイルカだとか🐬

 

coin-look.pages.dev

公式

boto3.amazonaws.com

環境

Pythonのバージョンは3.7。

3.6は2021年末にサポートが切れるようです。

pip3 install boto3==1.17.22

実装

# -*- coding: utf-8 -*-
import hashlib
import base64

from boto3 import Session


def main():
// クレデンシャルはべた書きNG!
access_key_id = 'xxxxx'
secret_access_key = 'xxxxx'
sse_c_secret_key = ({秘密鍵の文字列
}).encode('utf-8')
// MD5で鍵をハッシュ化
digest = hashlib.md5(sse_c_secret_key).digest()

// HTTPヘッダの設定
sse_c_key_base64 = base64.b64encode(sse_c_secret_key).decode()
sse_c_key_md5 = base64.b64encode(digest).decode()
sse_c_header = {
'
SSECustomerAlgorithm': 'AES256',
'SSECustomerKey': sse_c_key_base64,
'SSECustomerKeyMD5': sse_c_key_md5
}
session = Session(
aws_access_key_id=access_key_id,
aws_secret_access_key=secret_access_key,
region_name='xxxxx',
)

con = session.resource('s3', endpoint_url='https://xxxx.xxx')
bucket = con.Bucket('bucket_name')

// アップロード
bucket.upload_file('{アップロードするファイルのパス}', '{S3のパス}', ExtraArgs=sse_c_header)

// ダウンロード

bucket.download_file('{S3のパス}', '{出力先のパス}', ExtraArgs=sse_c_header)


if __name__ == '__main__':
main()