pysparkで、MAXMIND GeoLite2 Databasesを使用してIPアドレスから国を判定する手順
このエントリでは、pysparkで、
MAXMINDのGeoLite2 Databasesを使用してIPアドレスから国を判定する手順を説明します。
MAXMIND GeoLite2 Databases
https://dev.maxmind.com/geoip/geoip2/geolite2/
Databaseは上記のURLから、MAXMINDのアカウントを作成してダウンロードします。
ここでは「GeoLite2-Country-CSV」というCSV形式のデータを利用します。
GeoLite2のデータベースには、
次のように、IPアドレスのネットワークと国コードの対応情報が入っています。
# 「geoname_id」がどの国に対応しているかは、
# 「GeoLite2-Country-Locations-ja.csv」で確認できます。
$ head -5 GeoLite2-Country-Blocks-IPv4.csv
network,geoname_id,registered_country_geoname_id,represented_country_geoname_id,is_anonymous_proxy,is_satellite_provider
1.0.0.0/24,2077456,2077456,,0,0
1.0.1.0/24,1814991,1814991,,0,0
1.0.2.0/23,1814991,1814991,,0,0
1.0.4.0/22,2077456,2077456,,0,0
$ head -5 GeoLite2-Country-Locations-ja.csv
geoname_id,locale_code,continent_code,continent_name,country_iso_code,country_name,is_in_european_union
49518,ja,AF,"アフリカ",RW,"ルワンダ共和国",0
51537,ja,AF,"アフリカ",SO,"ソマリア",0
69543,ja,AS,"アジア",YE,"イエメン共和国",0
99237,ja,AS,"アジア",IQ,"イラク共和国",0
これらの情報を使い、次の流れでIPアドレスから国を判定させます。
- IPアドレス範囲と国を対応づけるDataFrameを作成する
- 対象データに、対応づけDataFrameを結合して国を判定する
IPアドレス範囲と国を対応づけるDataFrameを作成する
GeoLite2-Country-Blocks-IPv4.csv と GeoLite2-Country-Locations-ja.csv を
「geoname_id」を使って結合すると、ネットワークと国コード・国名の対応表を作ることが出来ます。
但し、ネットーワークが 1.0.0.0/24
のような表現となっており、
IPアドレスの値と結合を考えると、扱いにくいので、Long型の値に変換します。
この処理を書いたサンプルコードを以下に示します。
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import LongType
import os
spark = SparkSession.builder.appName('local').getOrCreate()
cwd = os.getcwd()
# MAXMIND GeoLite2 CSVファイルの読み込み
df_geo_block = spark.read.format("csv").option("header", "true") \
.load("{}/GeoLite2-Country-Blocks-IPv4.csv".format(cwd))
df_geo_name = spark.read.format("csv").option("header", "true") \
.load("{}/GeoLite2-Country-Locations-ja.csv".format(cwd))
# Blocks, Locations の結合
df_geoip = df_geo_block.join(df_geo_name, "geoname_id") \
.select("network", "country_iso_code", "country_name")
# IPアドレスをLongに変換する処理の定義
ip_to_long = F.split(F.col("ip"), "\.")[0].astype(LongType()) * 256 * 256 * 256 \
+ F.split(F.col("ip"), "\.")[1].astype(LongType()) * 256 * 256 \
+ F.split(F.col("ip"), "\.")[2].astype(LongType()) * 256 \
+ F.split(F.col("ip"), "\.")[3].astype(LongType())
# IPアドレスの範囲と国コードを対応づける DataFrameを作成
df_georange = df_geoip.withColumn("ip", F.split(F.col("network"), "/")[0]) \
.withColumn("s", ip_to_long) \
.withColumn("e", F.col("s")
+ F.pow(2, F.lit(32) - F.split(F.col("network"), "/")[1].astype(LongType()))
.astype(LongType())
- 1) \
.select("s", "e", "country_iso_code", "country_name")
# IPアドレスの範囲と国コードを対応づける DataFrameを保存
df_georange.write.mode("overwrite").parquet("{}/geolite2-iprange-country-ja.csv".format(cwd))
df_georange.show()
これを実行すると、以下のようなDataFrameが作成されます。
「s」「e」カラムが、IPアドレスの範囲を示しています。
$ python geolite2_prepare.py
+--------+--------+----------------+--------------+
| s| e|country_iso_code| country_name|
+--------+--------+----------------+--------------+
|16777216|16777471| AU|オーストラリア|
|16777472|16777727| CN| 中国|
対象データに、対応づけDataFrameを結合して国を判定する
ここまで準備が出来たら、
後はIPアドレスの値と、前項で作成したDataFrameを結合するだけです。
以下にサンプルコードを示します。
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import LongType
import os
spark = SparkSession.builder.appName('local').getOrCreate()
cwd = os.getcwd()
# IPアドレスの範囲と国コードを対応づける DataFrameを読み込み
df_georange = spark.read.parquet("{}/geolite2-iprange-country-ja.csv".format(cwd))
# IPアドレスをLongに変換する処理の定義
ip_to_long = F.split(F.col("ip"), "\.")[0].astype(LongType()) * 256 * 256 * 256 \
+ F.split(F.col("ip"), "\.")[1].astype(LongType()) * 256 * 256 \
+ F.split(F.col("ip"), "\.")[2].astype(LongType()) * 256 \
+ F.split(F.col("ip"), "\.")[3].astype(LongType())
# IPアドレスのサンプルデータを準備
df_access_log = spark.createDataFrame([
["user_a", "1.2.3.4"],
["user_b", "192.168.0.1"],
], 'user ip'.split(' '))
# IPアドレスから国を判定
df_access_log.withColumn("ip_long", ip_to_long) \
.alias("base") \
.join(df_georange.alias("geoip"),
F.col("base.ip_long").between(F.col("geoip.s"), F.col("geoip.e")),
"left") \
.select("user", "ip", "country_iso_code", "country_name").show()
実行結果は、以下のようになります。
$ python geolite2_sample.py
+------+-----------+----------------+------------+
| user| ip|country_iso_code|country_name|
+------+-----------+----------------+------------+
|user_a| 1.2.3.4| RU| ロシア|
|user_b|192.168.0.1| null| null|
+------+-----------+----------------+------------+