【Node.js】Expressからcassandraにアクセスする
Node.jsとExpressでAPIを実装した際にcassandraに接続したので、やり方をメモ。
Markdown 記法に変えたので、若干書体が変わっています。
⬇️ 分かりやすい良書です😊
環境
Node.jsバージョンは14。
プロジェクトはexpress-generator で作成しました。
作成方法は割愛。
package.json
{ "name": "sample", "version": "1.0.0", "description": "sample project", "author": "sample", "private": true, "scripts": { "start": "node ./bin/www", "test": "./node_modules/.bin/jest", "lint": "./node_modules/.bin/eslint --ext .js ./", }, "dependencies": { "express": "^4.17.1", "cassandra-driver": "^4.2.0", "cookie-parser": "~1.4.4", "debug": "^4.3.1", "http-errors": "^1.8.0" }, "devDependencies": { ..... } }
cassandra-driver
実装
./bin/www
cassandraオブジェクトを毎回クローズするとたまにエラーが発生する為、グローバルに保持してアプリケーション終了時にクローズします。
やり方は色々あると思います。
他にはuidとgidの設定、グレースフルシャットダウンの処理があります。
#!/usr/bin/env node const app = require('../app'); const debug = require('debug')('sample:server'); const https = require('https'); const fs = require('fs'); const cassandra = require('../app-lib/cassandra/driver'); const user = 'app-user'; const group = 99999; const port = normalizePort(process.env.PORT || '403'); app.set('port', port); const options = { key: fs.readFileSync('/path/to/tls/private/xxxx.key'), cert: fs.readFileSync('/path/to/tls/certs/xxxx.crt'), ca: fs.readFileSync('/path/to/tls/certs/xxxx-ca.crt'), }; // ここでコネクションを取得 cassandra.connect() .then(() => { app.set('cassandra', cassandra); const server = https.createServer(options. app); function wireUpServer(server) { let connections = {}; server.on('connection', function(conn) { const key = conn.remoteAddress + ':' + conn.remotePort; connections[key] = conn; conn.on('close', function() { delete connections[key]; }); }); server.destroy = function(cb) { // 終了時にcassandraクローズ cassandra.close().catch(() => null); server.close(cb); for (const key in connections) { connections[key].destroy(); } }; } wireUpServer(server); const gracefulShutdown = function() { server.destroy(() => { process.exit(); }); }; process.on('SIGTERM', gracefulShutdown); process.on('SIGINT', gracefulShutdown); function onListening() { let addr = server.address(); let bind = typeof addr === 'string' ? 'pipe ' + addr : 'port ' + addr.port; debug('Listening on ' + bind); } server.listen(port, () => { try { process.setgid(group); process.setuid(user); } catch (err) { console.error(err); process.exit(1); } }); server.on('error', onError); server.on('listening', onListening); }) .catch(() => process.exit(1)); function normalizePort(val) { let port = parseInt(val, 10); if (isNaN(port)) { return val; } if (port >= 0) { return port; } return false; } function onError(error) { if (error.syscall !== 'listen') { throw error; } let bind = typeof port === 'string' ? 'Pipe ' + port : 'Port ' + port; switch (error.code) { case 'EACCES': console.error(bind + ' requires elevated privileges'); process.exit(1); break; case 'EADDRINUSE': console.error(bind + ' is already in use'); process.exit(1); break; default: throw error; } }
app.js
こちらは大して変更ありません。
'use strict'; const express = require('express'); const cookieParser = require('cookie-parser'); const indexRouter = require('./routes/index'); const app = express(); app.use(express.json()); app.use(express.urlencoded({ extended: false })); app.use(cookieParser()); app.use('/', indexRouter); /* catch 404 and forward to error handler */ app.use(function(err, req, res, next) { // eslint-disabled-line no-unused-vars res.status(404).json({message: 'Request url not found.'}); }); /* error handler */ app.use(function(err, req, res, next) { // eslint-disabled-line no-unused-vars res.locals.message = err.message; res.locals.error = req.app.get('env') === 'development' ? err : {}; res.status(err.status || 500).json({message: err.message}); }); module.exports = app;
./app-lib/cassandra/config.js
実際は外部から受け取れるようにするか、configを複数用意して環境毎に設定します。
'use strict'; module.exports = { config: { contactPoints: ['sample.cassandra1.co.jp', 'sample.cassandra2.co.jp', 'sample.cassandra3.co.jp'], keyspace: 'test_key_space', localDataCenter: 'dc1', protocolOptions: { port: 9042, }, socketOptions: { connectTimeout: 120000, }, }, };
./app-lib/cassandra/driver.js
cassandra-driverをラップ。
'use strict'; const cassandra = require('cassandra-driver'); const Config = require('./config'); // リトライ処理 function retryable(func, retryCount=5) { let promise = Promise.reject().catch(() => func()); for (let i = 0; i < retryCount; i++) { promise = promise.catch(() => func()); } return promise; } module.exports = { client: null; connect: () => { return new Promise((resolve, reject) => { try { // オブジェクトを上書きする場合はディープコピーが必要 const csConfig = JSON.parse(JSON.stringfy(Config.config)); // クレデンシャルの取得処理(今回は何もしません) csConfig.credentials.username = 'test-user'; csConfig.credentials.password = 'xxxxx'; this.clinet = new cassandra.Client(csConfig); resolve(this.clinet); } catch (e) { reject(e); } }); }, insert: (query, param=null, retryCount=5) { return new Promise((resolve, reject) => { retryable(() => this.client.execute(query, param, { prepare: true })), retryCount) .then(() => resolve) .catch(e => reject(e)); }); }, find: (query, param=null, retryCount=5) { return new Promise((resolve, reject) => { retryable(() => this.client.execute(query, param, { prepare: true })), retryCount) .then(result => resolve(result.rows)) .catch(e => reject(e)); }); }, close: () { return new Promise((resolve, reject) => { if (this.client) { this.client.shutdown() .then(()=> resolve) .catch(e => reject(e)); } else { resolve(); } }); }, };
./routes/index.js
エラーは補足してnextに渡す。
'use strict'; const express = require('express'); const router = express.Router(); const wrap = fn => (...args) => fn(...args).catch(args[2]); router.get('/', wrap( async(req, res, next) => { try { const pk = req.body.id; const cassandra = req.app.get('cassandra'); const result = await cassandra.find('SELECT * FROM test_table WHERE id = :id', {id: pk}); return res.status(200).json({result: result}); } catch (e) { next(e); } } )); module.exports = router;