Pythonでcassandraの登録処理や取得処理を実装したのでメモ。
DataStaxのpython driverを使用しています。
テーブル設計なども行ったのですがパーテションキー=nodeという概念で、1pk=1nodeに多くても100レコード以内が望ましいそうです。
一意なID等なら問題ないのですが、それ以外の条件で取得する場合は設計に注意する必要があります。
環境
Pythonのバージョンは3.7。
cassandra-driverをインストールするにはCythonが必要です。
$ pip3 install Cython
// 先にCythonのインストールが必要
$ pip3 install cassandra-driver
もしくは以下のオプションを指定。
$ pip3 install cassandra-driver --install-option="--no-cython"
cassandra-driver
この辺りはよく見るかも?
docs.datastax.com
実装
要件で接続失敗時にリトライする必要があった為、そちらも記載。
# -*- coding: utf-8 -*-
import os
import time
from functools import wrap
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT
from cassandra.policies import DCAwareRoundRobinPolicy
from cassandra.query import dict_factory, SimpleStatement
def exec_cql(func):
"""リトライ処理"""
// configに定義するなど、お好きに
delay = int(os.getenv('DELAY', 10))
attempts = int(os.getenv('RETRY', 5))
@wrap(func)
def wrapper(*args, **kwargs):
retry = 0
while retry <= attempts:
try:
retry += 1
result = func(*args, **kwargs)
return result
except Exception as e:
if retry > attempts:
raise e
time.sleep(delay)
return wrapper
class CassandraClient:
def __init__(self, hosts, key_space, user, password,
port=9042, timeout=120, protocol_version=3, local_dc=None):
self._hosts = hosts
self._port = port
self._user = user
self._pass = password
self._key_space = key_space
self._timeout = timeout
self._protocol_version = protocol_version
self._local_dc = local_dc
self.cluster = None
self.session = None
self._connect()
def _connect():
auth_provider = PlainTextAuthProvider(username=self._user, password=self._pass)
if self._local_dc:
policy = DCAwareRoundRobinPolicy(local_dc=self._local_dc)
else:
policy = DCAwareRoundRobinPolicy()
profile = ExecutionProfile(
load_balancing_policy=policy
raw_factory=dict_factory // 戻り値が辞書になる
)
// contact_pointsはリスト型
self.cluster = Cluster(
contact_points=self._hosts, port=self._port, auth_provider=auth_provider,
execution_profiles={ EXEC_PROFILE_DEFAULT: profile}
)
self.session = self.cluster.connect(self._key_space)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close()
@exec_cql
def execute_query(self, cql, param=None):
"""クエリ発行"""
return self.session.execute(cql, parameters=param, timeout=self._timeout)
@exec_cql
def paging(self, cql, param=None, fetch_size=100):
"""逐次処理がしたい場合"""
stmt = SimpleStatement(cql, fetch_size=fetch_size)
return self.session.execute_async(cql, parameters=param, timeout=self._timeout)
def close(self):
self.session.shutdown()
self.cluster.shutdown()
def make_insert_query(self, table: str, param: dict):
"""辞書型のパラメータからINSERTクエリを組み立てる"""
cql = 'INSERT INTO ' + table
cql += ' ('
cql += ', '.join(param)
cql += ') VALUES ('
cql += ', '.join(map(self._dict_value_pad, param))
cql += ')'
return cql
def make_insert_query(self, table: str, param: dict, keys: list):
"""辞書型のパラメータからUPDATEクエリを組み立てる"""
i = 0
j = 0
cql = f'UPDAT {table} SET '
for k in param:
if k not in param:
if i > 0:
cql += ', '
cql += f'{k} = %({k})s'
i += 1
cql += ' WHERE '
for pk in keys:
if j > 0:
cql += ' AND '
cql += f'{pk} = %({pk})s'
j += 1
return cql
@staticmethod
def dict_value_pad(key: str):
return f'%({key})s'
if __name__ == '__main__':
hosts = ['xxxx.xxxx1.jp','xxxx.xxxx2.jp','xxxx.xxxx3.jp']
key_space = 'test_space'
user = 'test-useer'
password = 'xxxxx'
with CassandraClient(hosts, key_space, user, password) as client:
result = client.execute_query('SELECT * FROM sample WHERE id=%s', ('1',))
print(result)
逐次処理用のハンドラークラス。
from threading import Event
class PagedResultHandlar(object):
def __init__(self, future, item):
self.future = future
self.item = item // 保持したい値があれば
self.error = None
self.finished_event = Event()
self.future.add.callbacks(callback=self.handle_page, errorback=self.handle_error)
def handle_page(self, rows):
for row in rows:
// 取得した値にゴニョゴニョする
print(row)
if self.future.has_more_pages:
self.future.start_fetching_next_page()
else:
self.finished_event.set()
def handle_error(self, exec):
self.error = exec
self.finished_event.set()
// 使用例
with CassandraClient(['xxxx.xxxx1.jp'], 'test_space', 'test-useer', 'xxxxx') as client:
future = client.paging('SELECT * FROM sample WHERE date=%s', param=('20210101',))
handlar = PagedResultHandlar(future, '渡したいパラメータ')
handlar.finished_event.wait()
if handlar.error:
raise handlar.error
ValueSequence
IN句を使用する場合、ValueSequenceを使用します。
with CassandraClient(['xxxx.xxxx1.jp'], 'test_space', 'test-useer', 'xxxxx') as client:
query = 'SELECT * FROM sample WHERE id in %s'
result = client.execute_query(query, param=(ValueSequence['id1', 'id2', 'id3'],))
print(result.current_rows)
Prepared Statement
何度も発行するクエリはPrepared Statementを利用すると通信効率で有利なようです。
with CassandraClient(['xxxx.xxxx1.jp'], 'test_space', 'test-useer', 'xxxxx') as client:
// バインド部が`?`となる
query = client.session.prepare('SELECT * FROM sample WHERE id in ?')
result = client.execute_query(query, param=(ValueSequence['id1', 'id2', 'id3'],))
print(result.current_rows)