IntelliJ IDEAを用いたAWS GlueのJob開発環境(Python版)の構築メモ
IntelliJ IDEA(or PyCharm)を用いた、
ローカルPCでのAWS GlueのJobの開発環境の構築メモです。
環境の作り方は、以下の公式ドキュメントに記載があるのですが、
ある程度pysparkの知識が無いと、
公式の説明だけで環境構築するのは無理があると思います。
Developing Locally with Python | AWS Glue | AWS Documentation https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-libraries.html#develop-local-python
前提
前提として、JavaとPythonはセットアップ済みとします。
ここでは、以下のVersionを利用しています。
- Java 1.8.0_281
- Python 3.7.9
ちなみにOSは、macOS Big Surです。
必要なライブラリのダウンロード
まずは必要なライブラリのダウンロードから始めます。
AWS Glueの開発環境用ディレクトリを作ります。
mkdir ~/.aws-glue && cd $_
Apache Mavenをダウンロード&展開します。
wget https://aws-glue-etl-artifacts.s3.amazonaws.com/glue-common/apache-maven-3.6.0-bin.tar.gz
tar zxf apache-maven-3.6.0-bin.tar.gz
Apache Sparkをダウンロード&展開します。
wget https://aws-glue-etl-artifacts.s3.amazonaws.com/glue-1.0/spark-2.4.3-bin-hadoop2.8.tgz
tar zxf spark-2.4.3-bin-hadoop2.8.tgz
AWS Glueのライブラリをcloneします。(ここではGlue 1.0以上向けを利用します)
git clone git@github.com:awslabs/aws-glue-libs.git -b glue-1.0
SPARKの環境変数の設定、Mavenへのパスを通しておきます。
export PATH=`pwd`/apache-maven-3.6.0/bin:$PATH
export SPARK_HOME=`pwd`/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8
AWS Glueに必要なjarをダウンロードします。
sh aws-glue-libs/bin/glue-setup.sh
Sparkの環境変数を設定
Sparkを起動する際の環境変数を設定しておきます。
設定ファイルを準備(テンプレートをコピー)します。
cp $SPARK_HOME/conf/spark-env.sh.template $SPARK_HOME/conf/spark-env.sh
AWS GlueのjarにCLASSPATHを通します。
$SPARK_HOME/conf/spark-env.sh
の末尾に以下を追記します。
export SPARK_DIST_CLASSPATH=$(printf %s: $HOME/.aws-glue/aws-glue-libs/jarsv1/*.jar)
複数のversionのJavaがインストールされている場合は、
以下の内容も追記して、versinoを固定しておきます。
この記載方法はmacOSの場合なので、環境に合わせて書き換えて下さい。
export JAVA_HOME=`/usr/libexec/java_home -v 1.8.0`
(2021.06.03 追記)
開発用にGlueJobのログレベルをDEBUGにしておきます。
設定ファイルを準備(テンプレートをコピー)します。
cp $SPARK_HOME/conf/log4j.properties.template $SPARK_HOME/conf/log4j.properties
$SPARK_HOME/conf/log4j.properties
の末尾に以下を追記します。
log4j.logger.com.amazonaws.services.glue.log.GlueLogger=DEBUG
(2021.06.03 追記 ここまで)
Python仮想環境の作成
AWS Glue開発環境用に、Pythonの仮想環境を作っておきます。
/Library/Frameworks/Python.framework/Versions/3.7/bin/python3.7
は、
macOSの場合なので、環境にあわせてPython3.7の実行ファイルを指定して下さい。
/Library/Frameworks/Python.framework/Versions/3.7/bin/python3.7 -m venv venv
source venv/bin/activate
仮想環境にPySpark、AWS Glueのライブラリへのパスを追加します。
ls $SPARK_HOME/python/lib/*.zip > venv/lib/python3.7/site-packages/pyspark.pth
echo $HOME/.aws-glue/aws-glue-libs > venv/lib/python3.7/site-packages/awsglue.pth
(2021.06.02 追記)
AWS Glueで提供されているライブラリをインストールします。
docker pull amazon/aws-glue-libs:glue_libs_1.0.0_image_01
docker run -it amazon/aws-glue-libs:glue_libs_1.0.0_image_01 pip freeze | grep -v pyspark > requirements.txt
pip install -r requirements.txt
Linux(x86)環境では無い場合は、以下のようにして代替します。
cat requirements.txt | sed -e 's/.* @ http.*libs\/\([^-]*\)-\([0-9.]*[0-9]\).*/\1==\2/' > requirements-nonlinux.txt
pip install -r requirements-nonlinux.txt
# 以下のページに提供ライブラリが書いてるのですが。
# 実環境では、記載の無いライブラリも入っているので、
# dockerイメージからリストを作っています
# https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-python-libraries.html
(2021.06.02 追記 ここまで)
IntelliJ IDEA(or PyCharm)でコード補完出来るように、
対応するversionの、stubも追加しておきます。
pip install pyspark-stubs==2.4.0.post9 --no-deps
AWS Glueの開発環境の動作確認
前項までで、AWS Glueの開発環境の作成は完了したので、
pyspark-shellを動かしてみます。
SPARKの環境変数を指定、Python仮想環境を有効化します。
export SPARK_HOME=$HOME/.aws-glue/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8
source $HOME/.aws-glue/venv/bin/activate
以下のコマンドで、pyspark-shellが起動します。
$HOME/.aws-glue/aws-glue-libs/bin/gluepyspark
以下のような入力すると、GlueContextが利用できます。
>>> from awsglue.context import GlueContext
>>> glueContext = GlueContext(sc)
IntelliJ IDEA(or PyCharm)の設定
作成したAWS Glueの開発環境を利用するよう、IntelliJ IDEAを設定します。
IntelliJ IDEAでプロジェクトを作成します。
File → Project Structure → ProjectSDK → New から 「Python SDK」を選択します。
Virtualenv Environment を選び、
Existing environmentで、$HOME/.aws-glue/venv/bin/python
を指定します。
# PyCharmの場合はメニューの位置が少し異なっていて、
# File → Project Structure ではなく、Preferences → Project 配下です。
Run → Edit Configuration → Python Test → Unittests を選択します。
Environment Variablesを選択し、
環境変数「SPARK_HOME」に$HOME/.aws-glue/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8
を指定します。
ローカル環境でのJobのテスト
ローカル環境でAWS GlueのJobをテストしてみます。
例えば、次のように、
Jobのコードsample.py
と、
それに対するテストコードtests/test_sample.py
を作成します。
sample.py
from awsglue.context import GlueContext
from pyspark.sql import SparkSession
from awsglue import DynamicFrame
from awsglue.transforms import Filter
def transform_process(dyf: DynamicFrame) -> DynamicFrame:
return Filter.apply(frame=dyf, f=lambda x: x["id"] > 2)
if __name__ == '__main__':
spark = SparkSession.builder.getOrCreate()
glueContext: GlueContext = GlueContext(spark.sparkContext)
logger = glueContext.get_logger()
logger.info("---- start sample")
# テーブルからデータ取得
dyf = glueContext.create_dynamic_frame.from_catalog(database='sampledb', table_name='sampletbl')
# データの加工
dyf_filtered = transform_process(dyf)
# データの出力: 実際には dyf_filtered.write(...
logger.info("---- end sample")
tests/test_sample.py
import unittest
from sample import transform_process
from awsglue.context import GlueContext
from pyspark.sql import SparkSession
from awsglue import DynamicFrame
from pyspark.sql import functions as F
class TestSample(unittest.TestCase):
def test_transform_process(self):
spark = SparkSession.builder.getOrCreate()
glueContext: GlueContext = GlueContext(spark.sparkContext)
df = spark.createDataFrame([(1, "one"), (2, "two"), (3, "three"), (4, "four")], ["id", "body"])
dyf = DynamicFrame.fromDF(df, glueContext, "dyf")
dyf_filtered = transform_process(dyf)
dyf_filtered.show()
assert dyf_filtered.toDF().where(F.col('id') <= 2).count() == 0
assert dyf_filtered.toDF().where(F.col('id') > 2).count() > 0
この例では、
ETL処理で主に単体テストをしたいものは、変換処理になると思うので、
変換処理をtransform_process
という関数にしています。
(この例はid > 2
の条件でフィルターしているだけですが)
実際のJobでは、
Glue Catalogからデータを取得して、変換処理を呼び出し、出力という流れに対して、
テストコードでは、
入力データは手動で作成して、変換処理を呼び出し、その返却値を検証
という流れでテストをしています。
このテストコードは、IntelliJのUIから実行出来ます。
もし、次のようなエラーが出た場合は、
Sparkから呼ばれるPythonが正しくない可能性があります。
TypeError: an integer is required (got type bytes)
呼び出すPythonを決めるため、次のように環境変数を指定します。
環境変数「SPARK_HOME」を設定したときと同様に、
export PYSPARK_PYTHON=$HOME/.aws-glue/venv/bin/python
export PYSPARK_DRIVER_PYTHON=$HOME/.aws-glue/venv/bin/python
以上。