Pythonのスクリプトで集計処理などを書いていると、
時々、dictで扱うにはちょっと大きい、と思われる規模の集計が必要になる事があります。
このような時には、Redis等のKey-Value-Storeを使うことになりますが、
別にプロセスを立ち上げるとなると、実行手順が面倒になってしまいます。

そこで、RedisServerの起動・終了をPythonのスクリプトで行う事を考えました。

利用イメージとしては、
単体テスト用に作られているPurePython版Redis Serverのように、
扱えることを目指しました。

rediserver - Pure Python Redis server implementation | GitHub https://github.com/chekart/rediserver

実装

実装したクラスは次の通りです。

import subprocess
import time
import redis
import tempfile
import os
import shutil
from typing import Optional
import socket
from contextlib import closing


class RedisServer(object):
    def __init__(
            self,
            dump_path: str = None,
            redis_server_bin: str = None,
            port: int = None,
    ):
        """
        :param dump_path: dumpのバックアップファイルパス
        :param redis_server_bin: redis-serverのパス
        :param port: 利用するポート
        """
        self.dump_path: Optional[str] = dump_path
        self.redis_server_bin: Optional[str] = redis_server_bin
        self.port: Optional[int] = port
        self.proc: Optional[subprocess.Popen] = None
        self.tmpdir: Optional[tempfile.TemporaryDirectory] = None
        self.redis_cli: Optional[redis.Redis] = None

    def __enter__(self):
        self.start_server()
        return self

    def __exit__(self, a, b, c):
        self.terminate_server()

    def start_server(self):
        # redis-serverのパス特定
        if self.redis_server_bin is None:
            which_redis = subprocess.run(["which", "redis-server"], capture_output=True)
            self.redis_server_bin = which_redis.stdout.decode('utf-8').strip()

        # 空きポートを探す
        if self.port is None:
            with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
                sock.bind(('', 0))
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                self.port = sock.getsockname()[1]

        # backupからdbをrestore
        self.tmpdir = tempfile.TemporaryDirectory()
        if self.dump_path is not None and os.path.exists(self.dump_path):
            db_file = '{}/dump.rdb'.format(self.tmpdir.name)
            shutil.copyfile(self.dump_path, db_file)
            print('restore db from {}'.format(self.dump_path))

        # redis-serverの起動
        self.proc = subprocess.Popen([self.redis_server_bin, '--dir', self.tmpdir.name, '--port', str(self.port)])

        # 起動出来ていなければ開始失敗
        time.sleep(2)
        if self.proc.poll() is not None:
            raise Exception("fail to start redis-server")

        # ping出来なければ開始失敗
        self.redis_cli = redis.Redis(host='localhost', port=self.port, db=0)
        if not self.redis_cli.ping():
            self.terminate_server()
            raise Exception("fail to ping redis-server")

    def terminate_server(self):
        self.proc.kill()
        self.tmpdir.cleanup()

    def get_cli(self) -> redis.Redis:
        return self.redis_cli

    def backup(self) -> None:
        if self.redis_cli is not None and self.tmpdir is not None and self.dump_path is not None:
            self.redis_cli.save()
            db_file = '{}/dump.rdb'.format(self.tmpdir.name)
            shutil.copyfile(db_file, self.dump_path)

前提として、redis-server・redis-pyのインストールが必要です。

redis-serverは、homebrewを使うなら次のように。

$ brew install redis

redis-pyは、pipを使って次のように。

$ pip install redis

使い方

基本的な使い方は、次の例のようになります。

start_server()で起動、
terminate_server()で終了、
get_cli()でredis-pyのRedisオブジェクトが取得出来ます。

redis_sv = RedisServer()
redis_sv.start_server()
redis_cli = redis_sv.get_cli()
redis_cli.set('x', 1)
redis_cli.set('y', 2)
print(redis_cli.get('x').decode())
redis_sv.terminate_server()

with句を使って、次のように書くことも出来ます。

with RedisServer() as redis_sv:
    redis_cli = redis_sv.get_cli()
    redis_cli.set('x', 1)
    redis_cli.set('y', 2)
    print(redis_cli.get('x').decode())

次のように、
dump_pathパラメータを指定して、backup()を呼び出すと。
RedisのDBをファイルに出力して永続化することが出来ます。

with RedisServer(dump_path='backup_dump.rdb') as redis_sv:
    redis_cli = redis_sv.get_cli()
    redis_cli.set('x', 1)
    redis_cli.set('y', 2)
    redis_sv.backup()

with RedisServer(dump_path='backup_dump.rdb') as redis_sv:
    redis_cli = redis_sv.get_cli()
    print(redis_cli.get('x').decode())

利用例

最後に、このRedisServerクラスを使って、
シェイクスピアの「リア王」のWordCountをやってみます。

gutenbergからテキストを取得します。

$ curl -L http://www.gutenberg.org/cache/epub/2266/pg2266.txt > pg2266.txt

RedisのINCRを使って、出現した単語を数え上げていきます。

with RedisServer(dump_path='king_lear_dump.rdb') as redis_sv, open('./pg2266.txt') as fr:
    redis_cli = redis_sv.get_cli()
    while True:
        ln = fr.readline()
        if len(ln) == 0:
            break
        words = [w.strip(',').strip('.').lower() for w in ln.strip().split(' ')]
        for w in words:
            if len(w) > 0:
                redis_cli.incr(w, 1)
    for k in redis_cli.keys():
        print(k.decode(), redis_cli.get(k).decode())
    redis_sv.backup()

こういうクラスさえ作ってしまえば、
redis-serverコマンドへのパスさえ通っていれば、
もろもろの環境を気にせず、同じPythonスクリプトを動かせるので、便利ですね。