【Node.js】Expressからcassandraにアクセスする

f:id:tm200:20210426210331p:plain

Node.jsとExpressでAPIを実装した際にcassandraに接続したので、やり方をメモ。
Markdown 記法に変えたので、若干書体が変わっています。

⬇️ 分かりやすい良書です😊

環境

Node.jsバージョンは14。
プロジェクトはexpress-generator で作成しました。
作成方法は割愛。

package.json
{
  "name": "sample",
  "version": "1.0.0",
  "description": "sample project",
  "author": "sample",
  "private": true,
  "scripts": {
    "start": "node ./bin/www",
    "test": "./node_modules/.bin/jest",
    "lint": "./node_modules/.bin/eslint --ext .js ./",
  },
  "dependencies": {
    "express": "^4.17.1",
    "cassandra-driver": "^4.2.0",
    "cookie-parser": "~1.4.4",
    "debug": "^4.3.1",
    "http-errors": "^1.8.0"
  },
  "devDependencies": {
    .....
  }
}

cassandra-driver

docs.datastax.com

実装

./bin/www

cassandraオブジェクトを毎回クローズするとたまにエラーが発生する為、グローバルに保持してアプリケーション終了時にクローズします。
やり方は色々あると思います。
他にはuidとgidの設定、グレースフルシャットダウンの処理があります。

#!/usr/bin/env node
const app = require('../app');
const debug = require('debug')('sample:server');
const https = require('https');
const fs = require('fs');
const cassandra = require('../app-lib/cassandra/driver');

const user = 'app-user';
const group = 99999;
const port = normalizePort(process.env.PORT || '403');
app.set('port', port);

const options = {
  key: fs.readFileSync('/path/to/tls/private/xxxx.key'),
  cert: fs.readFileSync('/path/to/tls/certs/xxxx.crt'),
  ca: fs.readFileSync('/path/to/tls/certs/xxxx-ca.crt'),
};

// ここでコネクションを取得
cassandra.connect()
  .then(() => {
    app.set('cassandra', cassandra);
    const server = https.createServer(options. app);

    function wireUpServer(server) {
      let connections = {};
      server.on('connection', function(conn) {
        const key = conn.remoteAddress + ':' + conn.remotePort;
        connections[key] = conn;
        conn.on('close', function() {
          delete connections[key];
        });
      });

      server.destroy = function(cb) {
        // 終了時にcassandraクローズ
        cassandra.close().catch(() => null);
        server.close(cb);
        for (const key in connections) {
          connections[key].destroy();
        }
      };
    }

    wireUpServer(server);

    const gracefulShutdown = function() {
      server.destroy(() => {
        process.exit();
      });
    };
    process.on('SIGTERM', gracefulShutdown);
    process.on('SIGINT', gracefulShutdown);

    function onListening() {
      let addr = server.address();
      let bind = typeof addr === 'string'
        ? 'pipe ' + addr
        : 'port ' + addr.port;
      debug('Listening on ' + bind);
    }
    
    server.listen(port, () => {
      try {
        process.setgid(group);
        process.setuid(user);
      } catch (err) {
        console.error(err);
        process.exit(1);
      }
    });

    server.on('error', onError);
    server.on('listening', onListening);
  })
  .catch(() => process.exit(1));

function normalizePort(val) {
  let port = parseInt(val, 10);

  if (isNaN(port)) {
    return val;
  }

  if (port >= 0) {
    return port;
  }

  return false;
}

function onError(error) {
  if (error.syscall !== 'listen') {
    throw error;
  }

  let bind = typeof port === 'string'
    ? 'Pipe ' + port
    : 'Port ' + port;

  switch (error.code) {
    case 'EACCES':
      console.error(bind + ' requires elevated privileges');
      process.exit(1);
      break;
    case 'EADDRINUSE':
      console.error(bind + ' is already in use');
      process.exit(1);
      break;
    default:
      throw error;
  }
}
app.js

こちらは大して変更ありません。

'use strict';
const express = require('express');
const cookieParser = require('cookie-parser');

const indexRouter = require('./routes/index');

const app = express();

app.use(express.json());
app.use(express.urlencoded({ extended: false }));
app.use(cookieParser());

app.use('/', indexRouter);

/* catch 404 and forward to error handler */
app.use(function(err, req, res, next) { // eslint-disabled-line no-unused-vars
  res.status(404).json({message: 'Request url not found.'});
});

/* error handler */
app.use(function(err, req, res, next) { // eslint-disabled-line no-unused-vars
  res.locals.message = err.message;
  res.locals.error = req.app.get('env') === 'development' ? err : {};

  res.status(err.status || 500).json({message: err.message});
});

module.exports = app;
./app-lib/cassandra/config.js

実際は外部から受け取れるようにするか、configを複数用意して環境毎に設定します。

'use strict';

module.exports = {
  config: {
    contactPoints: ['sample.cassandra1.co.jp', 'sample.cassandra2.co.jp', 'sample.cassandra3.co.jp'],
    keyspace: 'test_key_space',
    localDataCenter: 'dc1',
    protocolOptions: {
      port: 9042,
    },
    socketOptions: {
      connectTimeout: 120000,
    },
  },
};
./app-lib/cassandra/driver.js

cassandra-driverをラップ。

'use strict';
const cassandra = require('cassandra-driver');
const Config = require('./config');

// リトライ処理
function retryable(func, retryCount=5) {
  let promise = Promise.reject().catch(() => func());
  for (let i = 0; i < retryCount; i++) {
    promise = promise.catch(() => func());
  }
  return promise;
}

module.exports = {
  client: null;
  connect: () => {
    return new Promise((resolve, reject) => {
      try {
        // オブジェクトを上書きする場合はディープコピーが必要
        const csConfig = JSON.parse(JSON.stringfy(Config.config));
        // クレデンシャルの取得処理(今回は何もしません)
        csConfig.credentials.username = 'test-user';
        csConfig.credentials.password = 'xxxxx';
        this.clinet = new cassandra.Client(csConfig);
        resolve(this.clinet);
      } catch (e) {
        reject(e);
      }
    });
  },

  insert: (query, param=null, retryCount=5) {
    return new Promise((resolve, reject) => {
      retryable(() => this.client.execute(query, param, { prepare: true })), retryCount)
        .then(() => resolve)
        .catch(e => reject(e));
    });
  },

  find: (query, param=null, retryCount=5) {
    return new Promise((resolve, reject) => {
      retryable(() => this.client.execute(query, param, { prepare: true })), retryCount)
        .then(result => resolve(result.rows))
        .catch(e => reject(e));
    });
  },

  close: () {
    return new Promise((resolve, reject) => {
      if (this.client) {
        this.client.shutdown()
          .then(()=> resolve)
          .catch(e => reject(e));
      } else {
        resolve();
      }
    });
  },
};
./routes/index.js

エラーは補足してnextに渡す。

'use strict';
const express = require('express');
const router = express.Router();

const wrap = fn => (...args) => fn(...args).catch(args[2]);

router.get('/', wrap( async(req, res, next) => {
    try {
      const pk = req.body.id;
      const cassandra = req.app.get('cassandra');
      const result = await cassandra.find('SELECT * FROM test_table WHERE id = :id', {id: pk});
      return res.status(200).json({result: result});
    } catch (e) {
      next(e);
    }
  }
));

module.exports = router;