motoを使ってPySparkでS3にアクセスする単体テストを書く
このエントリでは、
motoを使って、PySparkでS3にアクセスする単体テストを書く手順をまとめます。
単体テストはpytestを使います。
以下のGitHub Issueを参考にしています。
Accessing Mocked S3 Bucket via Pyspark? | spulec/moto | GitHub
https://github.com/spulec/moto/issues/1543
Pyspark環境のセットアップ
まずは、PySparkを動かすための環境を作ります。
作業用のディレクトリを作ります。
mkdir study && cd $_
venvでPythonの仮想環境を作ります。
python -m venv venv
. venv/bin/activate
ApacheSparkをダウンロード、展開します。
以下のサイトから、使用するVersionを選びます。
(ここでは、Spark3.3.0, Hadoop3.3を使います)
Apache Spark
https://spark.apache.org/
mkdir .apache-spark
wget https://dlcdn.apache.org/spark/spark-3.3.0/spark-3.3.0-bin-hadoop3.tgz
tar zxf spark-3.3.0-bin-hadoop3.tgz -C .apache-spark
rm spark-3.3.0-bin-hadoop3.tgz
環境変数SPARK_HOMEを指定します。
export SPARK_HOME=$(pwd)/.apache-spark/spark-3.3.0-bin-hadoop3
SPARKのライブラリをPythonの仮想環境に追加します。
ls $SPARK_HOME/python/lib/*.zip > venv/lib/python3.10/site-packages/pyspark.pth
セットアップ確認のため、適当なPySparkを利用したコードを用意します。
target.py
from pyspark.sql import SparkSession
if __name__ == "__main__":
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([["val1"]], ["col1"])
df.show()
実行すると、以下のように表示されます。
$ python target.py
※途中省略※
+----+
|col1|
+----+
|val1|
+----+
pytest, motoのセットアップ
pytest, motoをセットアップします。
pip install --upgrade pip
pip install pytest
pip install "moto[server]"
pip install mock
motoはServerModeを利用するのでmoto[server]
を指定します。
Non-Python SDK’s / Server Mode | Moto
http://docs.getmoto.org/en/latest/docs/server_mode.html
テスト対象のコード
テスト対象のコードを追加します。
ここでは、output_csvという関数をテスト対象とします。
output.csvは、
入力として指定したDataFrameを、S3にCSV形式で出力します。
target.py
import urllib.parse
import boto3
from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame
def output_csv(df: DataFrame, s3uri: str, s3uri_tmp: str) -> None:
o_out = urllib.parse.urlparse(s3uri)
out_bucket = o_out.hostname
out_object = o_out.path[1:]
o_tmp = urllib.parse.urlparse(s3uri_tmp)
tmp_bucket = o_tmp.hostname
tmp_prefix = o_tmp.path[1:]
df.coalesce(1).write.format('csv').mode('overwrite').save(s3uri_tmp)
s3cli = boto3.client("s3")
resp = s3cli.list_objects_v2(
Bucket=tmp_bucket,
Prefix=tmp_prefix,
)
keys = [c['Key']
for c in resp['Contents']
if str(c['Key']).endswith('.csv')]
s3cli.copy_object(
CopySource={"Bucket": tmp_bucket, "Key": keys[0]},
Bucket=out_bucket, Key=out_object
)
if __name__ == "__main__":
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([["val1"]], ["col1"])
output_csv(df, "s3://bucket-a/output.csv", "s3://bucket-a/tmp")
テストコード
テストコードを追加します。
moto_sv
でmotoをservermodeで起動し、
spark
で、motoに接続したSparkSessionを作ります。
テスト関数に渡された fixture spark
を使うと、
s3
schemaでmotoのservermodeで実行しているS3にアクセス出来ます。
また、boto3からもmotoのservermodeにアクセスするために、
boto3patch
でboto3 clientのendpointを変更しています。
import pytest
import boto3
from moto.server import ThreadedMotoServer
from pyspark.sql import SparkSession
from contextlib import closing
import socket
from mock import patch
from target import output_csv
@pytest.fixture(scope="session")
def moto_sv():
"""
motoをservermodeで起動する
"""
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
sock.bind(('', 0))
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
port = sock.getsockname()[1]
server = ThreadedMotoServer(port=port)
server.start()
yield f"http://localhost:{port}"
server.stop()
@pytest.fixture(scope="session")
def spark(moto_sv) -> SparkSession:
"""
servermodeで起動したmotoのs3にアクセスできるspark sessionを作る
"""
spark = SparkSession.builder \
.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.3") \
.config("spec.deps.repositories", ["https://repos.spark-packages.org"]) \
.getOrCreate()
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("fs.s3a.access.key", "dummy")
hadoop_conf.set("fs.s3a.secret.key", "dummy")
hadoop_conf.set("fs.s3a.endpoint", moto_sv)
hadoop_conf.set("fs.s3a.connection.ssl.enabled", "false")
return spark
@pytest.fixture
def boto3patch(moto_sv):
"""
boto3を利用する時、servermodeのmotoをendpointに指定する
"""
def boto3_client(*args, **kwargs):
from boto3 import _get_default_session
kwargs["endpoint_url"] = moto_sv
return _get_default_session().client(*args, **kwargs)
with patch("boto3.client", boto3_client):
yield
def test_output_csv(boto3patch, spark):
# prepare bucket
s3cli = boto3.client("s3")
s3cli.create_bucket(Bucket="test", CreateBucketConfiguration={'LocationConstraint': "test"})
# create input
df = spark.createDataFrame([["val1"]], ["col1"])
# call
output_csv(df, "s3://test/out.csv", "s3://test/tmp")
# validate
obj = s3cli.get_object(Bucket="test", Key="out.csv")
assert obj["Body"].read().decode("utf-8").strip().split("\n") == ["val1"]
テストは、以下のコマンドで実行できます。
pytest tests/test_target.py
テストを並列に実行する場合は、servermodeのS3の状態に気をつけてください。
以上。。