PythonからRedis serverを立ち上げてアクセスする
Pythonのスクリプトで集計処理などを書いていると、
時々、dictで扱うにはちょっと大きい、と思われる規模の集計が必要になる事があります。
このような時には、Redis等のKey-Value-Storeを使うことになりますが、
別にプロセスを立ち上げるとなると、実行手順が面倒になってしまいます。
そこで、RedisServerの起動・終了をPythonのスクリプトで行う事を考えました。
利用イメージとしては、
単体テスト用に作られているPurePython版Redis Serverのように、
扱えることを目指しました。
rediserver - Pure Python Redis server implementation | GitHub https://github.com/chekart/rediserver
実装
実装したクラスは次の通りです。
import subprocess
import time
import redis
import tempfile
import os
import shutil
from typing import Optional
import socket
from contextlib import closing
class RedisServer(object):
def __init__(
self,
dump_path: str = None,
redis_server_bin: str = None,
port: int = None,
):
"""
:param dump_path: dumpのバックアップファイルパス
:param redis_server_bin: redis-serverのパス
:param port: 利用するポート
"""
self.dump_path: Optional[str] = dump_path
self.redis_server_bin: Optional[str] = redis_server_bin
self.port: Optional[int] = port
self.proc: Optional[subprocess.Popen] = None
self.tmpdir: Optional[tempfile.TemporaryDirectory] = None
self.redis_cli: Optional[redis.Redis] = None
def __enter__(self):
self.start_server()
return self
def __exit__(self, a, b, c):
self.terminate_server()
def start_server(self):
# redis-serverのパス特定
if self.redis_server_bin is None:
which_redis = subprocess.run(["which", "redis-server"], capture_output=True)
self.redis_server_bin = which_redis.stdout.decode('utf-8').strip()
# 空きポートを探す
if self.port is None:
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
sock.bind(('', 0))
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.port = sock.getsockname()[1]
# backupからdbをrestore
self.tmpdir = tempfile.TemporaryDirectory()
if self.dump_path is not None and os.path.exists(self.dump_path):
db_file = '{}/dump.rdb'.format(self.tmpdir.name)
shutil.copyfile(self.dump_path, db_file)
print('restore db from {}'.format(self.dump_path))
# redis-serverの起動
self.proc = subprocess.Popen([self.redis_server_bin, '--dir', self.tmpdir.name, '--port', str(self.port)])
# 起動出来ていなければ開始失敗
time.sleep(2)
if self.proc.poll() is not None:
raise Exception("fail to start redis-server")
# ping出来なければ開始失敗
self.redis_cli = redis.Redis(host='localhost', port=self.port, db=0)
if not self.redis_cli.ping():
self.terminate_server()
raise Exception("fail to ping redis-server")
def terminate_server(self):
self.proc.kill()
self.tmpdir.cleanup()
def get_cli(self) -> redis.Redis:
return self.redis_cli
def backup(self) -> None:
if self.redis_cli is not None and self.tmpdir is not None and self.dump_path is not None:
self.redis_cli.save()
db_file = '{}/dump.rdb'.format(self.tmpdir.name)
shutil.copyfile(db_file, self.dump_path)
前提として、redis-server・redis-pyのインストールが必要です。
redis-serverは、homebrewを使うなら次のように。
$ brew install redis
redis-pyは、pipを使って次のように。
$ pip install redis
使い方
基本的な使い方は、次の例のようになります。
start_server()で起動、
terminate_server()で終了、
get_cli()でredis-pyのRedisオブジェクトが取得出来ます。
redis_sv = RedisServer()
redis_sv.start_server()
redis_cli = redis_sv.get_cli()
redis_cli.set('x', 1)
redis_cli.set('y', 2)
print(redis_cli.get('x').decode())
redis_sv.terminate_server()
with句を使って、次のように書くことも出来ます。
with RedisServer() as redis_sv:
redis_cli = redis_sv.get_cli()
redis_cli.set('x', 1)
redis_cli.set('y', 2)
print(redis_cli.get('x').decode())
次のように、
dump_pathパラメータを指定して、backup()を呼び出すと。
RedisのDBをファイルに出力して永続化することが出来ます。
with RedisServer(dump_path='backup_dump.rdb') as redis_sv:
redis_cli = redis_sv.get_cli()
redis_cli.set('x', 1)
redis_cli.set('y', 2)
redis_sv.backup()
with RedisServer(dump_path='backup_dump.rdb') as redis_sv:
redis_cli = redis_sv.get_cli()
print(redis_cli.get('x').decode())
利用例
最後に、このRedisServerクラスを使って、
シェイクスピアの「リア王」のWordCountをやってみます。
gutenbergからテキストを取得します。
$ curl -L http://www.gutenberg.org/cache/epub/2266/pg2266.txt > pg2266.txt
RedisのINCRを使って、出現した単語を数え上げていきます。
with RedisServer(dump_path='king_lear_dump.rdb') as redis_sv, open('./pg2266.txt') as fr:
redis_cli = redis_sv.get_cli()
while True:
ln = fr.readline()
if len(ln) == 0:
break
words = [w.strip(',').strip('.').lower() for w in ln.strip().split(' ')]
for w in words:
if len(w) > 0:
redis_cli.incr(w, 1)
for k in redis_cli.keys():
print(k.decode(), redis_cli.get(k).decode())
redis_sv.backup()
こういうクラスさえ作ってしまえば、
redis-serverコマンドへのパスさえ通っていれば、
もろもろの環境を気にせず、同じPythonスクリプトを動かせるので、便利ですね。