【Python】DataStax Python Driverでcassandraにアクセスする

 

                                           f:id:tm200:20210424173901p:plain

 

Pythonでcassandraの登録処理や取得処理を実装したのでメモ。

DataStaxのpython driverを使用しています。

テーブル設計なども行ったのですがパーテションキー=nodeという概念で、1pk=1nodeに多くても100レコード以内が望ましいそうです。

一意なID等なら問題ないのですが、それ以外の条件で取得する場合は設計に注意する必要があります。

 

環境

Pythonのバージョンは3.7。

cassandra-driverをインストールするにはCythonが必要です。

$ pip3 install Cython
// 先にCythonのインストールが必要
$ pip3 install cassandra-driver

もしくは以下のオプションを指定。

$ pip3 install cassandra-driver --install-option="--no-cython"

cassandra-driver

 この辺りはよく見るかも?

docs.datastax.com

 

実装

要件で接続失敗時にリトライする必要があった為、そちらも記載。

# -*- coding: utf-8 -*-
import os
import time
from functools import wrap
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT
from cassandra.policies import DCAwareRoundRobinPolicy
from cassandra.query import dict_factory, SimpleStatement


def exec_cql(func):
"""リトライ処理"""
// configに定義するなど、お好きに
delay = int(os.getenv('DELAY', 10))
attempts = int(os.getenv('RETRY', 5))

@wrap(func)
def wrapper(*args, **kwargs):
retry = 0
while retry <= attempts:
try:
retry += 1
result = func(*args, **kwargs)
return result
except Exception as e:
if retry > attempts:
raise e
time.sleep(delay)
return wrapper


class CassandraClient:

def __init__(self, hosts, key_space, user, password,
port=9042, timeout=120, protocol_version=3, local_dc=None):
self._hosts = hosts
self._port = port
self._user = user
self._pass = password
self._key_space = key_space
self._timeout = timeout
self._protocol_version = protocol_version
self._local_dc = local_dc

self.cluster = None
self.session = None
self._connect()

def _connect():
auth_provider = PlainTextAuthProvider(username=self._user, password=self._pass)

if self._local_dc:
policy = DCAwareRoundRobinPolicy(local_dc=self._local_dc)
else:
policy = DCAwareRoundRobinPolicy()

profile = ExecutionProfile(
load_balancing_policy=policy
raw_factory=dict_factory // 戻り値が辞書になる
)
// contact_pointsはリスト型
self.cluster = Cluster(
contact_points=self._hosts, port=self._port, auth_provider=auth_provider,
execution_profiles={ EXEC_PROFILE_DEFAULT: profile}
)
self.session = self.cluster.connect(self._key_space)

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, traceback):
self.close()

@exec_cql
def execute_query(self, cql, param=None):
"""クエリ発行"""
return self.session.execute(cql, parameters=param, timeout=self._timeout)

@exec_cql
def paging(self, cql, param=None, fetch_size=100):
"""逐次処理がしたい場合"""
stmt = SimpleStatement(cql, fetch_size=fetch_size)
return self.session.execute_async(cql, parameters=param, timeout=self._timeout)

def close(self):
self.session.shutdown()
self.cluster.shutdown()

def make_insert_query(self, table: str, param: dict):
"""辞書型のパラメータからINSERTクエリを組み立てる"""
cql = 'INSERT INTO ' + table
cql += ' ('
cql += ', '.join(param)
cql += ') VALUES ('
cql += ', '.join(map(self._dict_value_pad, param))
cql += ')'
return cql


def make_insert_query(self, table: str, param: dict, keys: list):
"""辞書型のパラメータからUPDATEクエリを組み立てる"""
i = 0
j = 0
cql = f'UPDAT {table} SET '
for k in param:
if k not in param:
if i > 0:
cql += ', '
cql += f'{k} = %({k})s'
i += 1
cql += ' WHERE '
for pk in keys:
if j > 0:
cql += ' AND '
cql += f'{pk} = %({pk})s'
j += 1
return cql

@staticmethod
def dict_value_pad(key: str):
return f'%({key})s'


if __name__ == '__main__':
hosts = ['xxxx.xxxx1.jp','xxxx.xxxx2.jp','xxxx.xxxx3.jp']
key_space = 'test_space'
user = 'test-useer'
password = 'xxxxx'
with CassandraClient(hosts, key_space, user, password) as client:
result = client.execute_query('SELECT * FROM sample WHERE id=%s', ('1',))
print(result)

 

逐次処理用のハンドラークラス。

from threading import Event

class
PagedResultHandlar(object):
def __init__(self, future, item):
self.future = future
self.item = item // 保持したい値があれば
self.error = None
self.finished_event = Event()
self.future.add.callbacks(callback=self.handle_page, errorback=self.handle_error)

def handle_page(self, rows):
for row in rows:
// 取得した値にゴニョゴニョする
print(row)
if self.future.has_more_pages:
self.future.start_fetching_next_page()
else:
self.finished_event.set()

def
handle_error(self, exec):
self.error = exec
self.finished_event.set()


// 使用例

with CassandraClient(['xxxx.xxxx1.jp'], 'test_space', 'test-useer', 'xxxxx') as client:
future = client.paging('SELECT * FROM sample WHERE date=%s', param=('20210101',))
handlar = PagedResultHandlar(future, '渡したいパラメータ')
handlar.finished_event.wait()
if handlar.error:
raise handlar.error

ValueSequence

IN句を使用する場合、ValueSequenceを使用します。

with CassandraClient(['xxxx.xxxx1.jp'], 'test_space', 'test-useer', 'xxxxx') as client:
        query = 'SELECT * FROM sample WHERE id in %s'
result = client.execute_query(query, param=(ValueSequence['id1', 'id2', 'id3'],))
print(result.current_rows)

Prepared Statement

何度も発行するクエリはPrepared Statementを利用すると通信効率で有利なようです。

with CassandraClient(['xxxx.xxxx1.jp'], 'test_space', 'test-useer', 'xxxxx') as client:
        // バインド部が`?`となる
        query = client.session.prepare('SELECT * FROM sample WHERE id in ?')
result = client.execute_query(query, param=(ValueSequence['id1', 'id2', 'id3'],))
print(result.current_rows)