このエントリでは、
motoを使って、PySparkでS3にアクセスする単体テストを書く手順をまとめます。
単体テストはpytestを使います。

以下のGitHub Issueを参考にしています。

Accessing Mocked S3 Bucket via Pyspark? | spulec/moto | GitHub
https://github.com/spulec/moto/issues/1543

Pyspark環境のセットアップ

まずは、PySparkを動かすための環境を作ります。

作業用のディレクトリを作ります。

mkdir study && cd $_

venvでPythonの仮想環境を作ります。

python -m venv venv
. venv/bin/activate

ApacheSparkをダウンロード、展開します。
以下のサイトから、使用するVersionを選びます。
(ここでは、Spark3.3.0, Hadoop3.3を使います)

Apache Spark
https://spark.apache.org/

mkdir .apache-spark
wget https://dlcdn.apache.org/spark/spark-3.3.0/spark-3.3.0-bin-hadoop3.tgz
tar zxf spark-3.3.0-bin-hadoop3.tgz -C .apache-spark
rm spark-3.3.0-bin-hadoop3.tgz

環境変数SPARK_HOMEを指定します。

export SPARK_HOME=$(pwd)/.apache-spark/spark-3.3.0-bin-hadoop3

SPARKのライブラリをPythonの仮想環境に追加します。

ls $SPARK_HOME/python/lib/*.zip > venv/lib/python3.10/site-packages/pyspark.pth

セットアップ確認のため、適当なPySparkを利用したコードを用意します。

target.py

from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([["val1"]], ["col1"])
    df.show()

実行すると、以下のように表示されます。

$ python target.py
※途中省略※
+----+
|col1|
+----+
|val1|
+----+

pytest, motoのセットアップ

pytest, motoをセットアップします。

pip install --upgrade pip
pip install pytest
pip install "moto[server]"
pip install mock

motoはServerModeを利用するのでmoto[server]を指定します。

Non-Python SDK’s / Server Mode | Moto
http://docs.getmoto.org/en/latest/docs/server_mode.html

テスト対象のコード

テスト対象のコードを追加します。
ここでは、output_csvという関数をテスト対象とします。

output.csvは、
入力として指定したDataFrameを、S3にCSV形式で出力します。

target.py

import urllib.parse
import boto3
from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame


def output_csv(df: DataFrame, s3uri: str, s3uri_tmp: str) -> None:
    o_out = urllib.parse.urlparse(s3uri)
    out_bucket = o_out.hostname
    out_object = o_out.path[1:]

    o_tmp = urllib.parse.urlparse(s3uri_tmp)
    tmp_bucket = o_tmp.hostname
    tmp_prefix = o_tmp.path[1:]

    df.coalesce(1).write.format('csv').mode('overwrite').save(s3uri_tmp)

    s3cli = boto3.client("s3")
    resp = s3cli.list_objects_v2(
        Bucket=tmp_bucket,
        Prefix=tmp_prefix,
    )
    keys = [c['Key']
            for c in resp['Contents']
            if str(c['Key']).endswith('.csv')]
    s3cli.copy_object(
        CopySource={"Bucket": tmp_bucket, "Key": keys[0]},
        Bucket=out_bucket, Key=out_object
    )


if __name__ == "__main__":
    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([["val1"]], ["col1"])
    output_csv(df, "s3://bucket-a/output.csv", "s3://bucket-a/tmp")

テストコード

テストコードを追加します。

moto_svでmotoをservermodeで起動し、
sparkで、motoに接続したSparkSessionを作ります。
テスト関数に渡された fixture spark を使うと、
s3 schemaでmotoのservermodeで実行しているS3にアクセス出来ます。

また、boto3からもmotoのservermodeにアクセスするために、 boto3patchでboto3 clientのendpointを変更しています。

import pytest
import boto3
from moto.server import ThreadedMotoServer
from pyspark.sql import SparkSession
from contextlib import closing
import socket
from mock import patch
from target import output_csv


@pytest.fixture(scope="session")
def moto_sv():
    """
    motoをservermodeで起動する
    """
    with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
        sock.bind(('', 0))
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        port = sock.getsockname()[1]

    server = ThreadedMotoServer(port=port)
    server.start()
    yield f"http://localhost:{port}"
    server.stop()


@pytest.fixture(scope="session")
def spark(moto_sv) -> SparkSession:
    """
    servermodeで起動したmotoのs3にアクセスできるspark sessionを作る
    """
    spark = SparkSession.builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.3") \
        .config("spec.deps.repositories", ["https://repos.spark-packages.org"]) \
        .getOrCreate()
    hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
    hadoop_conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    hadoop_conf.set("fs.s3a.access.key", "dummy")
    hadoop_conf.set("fs.s3a.secret.key", "dummy")
    hadoop_conf.set("fs.s3a.endpoint", moto_sv)
    hadoop_conf.set("fs.s3a.connection.ssl.enabled", "false")
    return spark


@pytest.fixture
def boto3patch(moto_sv):
    """
    boto3を利用する時、servermodeのmotoをendpointに指定する
    """
    def boto3_client(*args, **kwargs):
        from boto3 import _get_default_session
        kwargs["endpoint_url"] = moto_sv
        return _get_default_session().client(*args, **kwargs)
    with patch("boto3.client", boto3_client):
        yield


def test_output_csv(boto3patch, spark):
    # prepare bucket
    s3cli = boto3.client("s3")
    s3cli.create_bucket(Bucket="test", CreateBucketConfiguration={'LocationConstraint': "test"})

    # create input
    df = spark.createDataFrame([["val1"]], ["col1"])

    # call
    output_csv(df, "s3://test/out.csv", "s3://test/tmp")

    # validate
    obj = s3cli.get_object(Bucket="test", Key="out.csv")
    assert obj["Body"].read().decode("utf-8").strip().split("\n") == ["val1"]

テストは、以下のコマンドで実行できます。

pytest tests/test_target.py 

テストを並列に実行する場合は、servermodeのS3の状態に気をつけてください。

以上。。