【Python】PyHiveでHiveQL

f:id:tm200:20210423222055p:plain

 

業務でPyHiveを使用したのでメモ。(忘れる自信しかない😂 )

環境によっては認証周りが変わりそうです。

 

 ⬇️ クリーンアーキテクチャの事が分かりやすく書かれていて、お勧めです🐱

環境

Pythonのバージョンは3.7。

$ pip3 install PyHive==0.6.3 sasl==0.2.1 thrift==0.13.0 thrift-sasl==0.4.2

 

SASL

www.cyrusimap.org

 

Hive

 

Hiveの高速化はこちらが参考になります。

qiita.com

 

実装

最近、with ステートメントがスマートでお気に入りです。  

configurationはconfに切り出して管理すれば、環境毎に設定を分けたりしやすくなります。

# -*- encoding: utf-8 -*-
import os

import sasl
import thrift
import thrift_sasl  # NOTE: the importable module is "thrift_sasl"; "thrift-sasl" is only the PyPI package name and is a SyntaxError in an import statement
from pyhive import hive
from thrift import transport


class HiveConnector:
    """Context-manager wrapper around a SASL-authenticated PyHive connection.

    Opens a Thrift/SASL transport to HiveServer2 on construction and exposes
    small convenience helpers for the common query patterns. Intended use::

        with HiveConnector(host, 10000, 'my_db') as hc:
            rows = hc.fetchall('SELECT ...')
    """

    def __init__(self, host, port, db, service='hive', auth='GSSAPI', queue='unfunded'):
        """Open a connection to HiveServer2.

        Args:
            host: HiveServer2 hostname (bare host, no URL scheme).
            port: HiveServer2 Thrift port (commonly 10000).
            db: Default database for the session.
            service: SASL service name (default 'hive').
            auth: SASL mechanism (default 'GSSAPI', i.e. Kerberos).
            queue: Tez/YARN queue name for the session.
        """
        self.host = host
        self.port = int(port)
        self.db = db
        self.service = service
        self.auth = auth
        self.queue = queue
        self.max_buf_size = 256 * 1024 * 1024  # 256 MiB SASL receive buffer

        socket = thrift.transport.TSocket.TSocket(self.host, self.port)
        thrift_transport = thrift_sasl.TSaslClientTransport(
            self._sasl_factory, self.auth, socket)
        self.connection = hive.Connection(
            thrift_transport=thrift_transport,
            database=self.db,
            # Session-level Hive settings. Could be moved out to a config
            # file to make per-environment tuning easier.
            configuration={
                'tez.queue.name': self.queue,
                'hive.exec.scratchdir': '/user/{ユーザー名}/tmp',
                'hive.exec.dynamic.partition': 'true',
                'hive.exec.dynamic.partition.mode': 'nonstrict',
                'hive.optimize.sort.dynamic.partition': 'false',
                'hive.vectorized.execution.enabled': 'true',
                'hive.vectorized.execution.reduce.enabled': 'true',
                'hive.cbo.enabled': 'true',
                'hive.compute.query.using.stats': 'true',
                'hive.stats.fetch.column.stats': 'true',
                'hive.stats.fetch.partition.stats': 'true'
            }
        )

    def _sasl_factory(self):
        """Build a fresh SASL client; passed as a factory to the transport."""
        sasl_client = sasl.Client()
        sasl_client.setAttr('host', self.host)
        sasl_client.setAttr('service', self.service)
        sasl_client.setAttr('maxbufsize', self.max_buf_size)
        sasl_client.init()
        return sasl_client

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Always release the connection, even when the body raised.
        self.close()

    def execute(self, query, param=None):
        """Run a DML/DDL statement (no result set expected)."""
        with self.connection.cursor() as cur:
            cur.execute(query, parameters=param)
        self.connection.commit()

    def fetchone(self, query, param=None):
        """Run *query* and return the first row (or None)."""
        with self.connection.cursor() as cur:
            cur.execute(query, parameters=param)
            return cur.fetchone()

    def fetchall(self, query, param=None):
        """Run *query* and return every row as a list."""
        with self.connection.cursor() as cur:
            cur.execute(query, parameters=param)
            return cur.fetchall()

    def fetchmany(self, query, param=None, size=1000):
        """Run *query* and yield rows one by one, fetching *size* at a time.

        Generator form keeps memory bounded for large result sets.
        """
        with self.connection.cursor() as cur:
            cur.execute(query, parameters=param)
            while True:  # was `with True:` — a SyntaxError in the original
                # Pass `size` through; the original ignored it and used the
                # driver default batch size.
                rows = cur.fetchmany(size)
                if not rows:
                    break
                for row in rows:
                    yield row

    def close(self):
        """Close the underlying Hive connection."""
        self.connection.close()


if __name__ == '__main__':
    # TSocket expects a bare hostname — a URL with a scheme
    # ('https://xxxx.xxx') would fail to resolve.
    host = 'xxxx.xxx'
    with HiveConnector(host, 10000, 'test_db') as hc:
        # %s placeholders are filled from the parameters tuple by PyHive.
        query = 'SELECT * FROM access_logs WHERE log_date=%s'
        result = hc.fetchall(query, ('20210101',))
        print(result)