このエントリでは、pysparkで、
MAXMINDのGeoLite2 Databasesを使用してIPアドレスから国を判定する手順を説明します。

MAXMIND GeoLite2 Databases
https://dev.maxmind.com/geoip/geoip2/geolite2/

Databaseは上記のURLから、MAXMINDのアカウントを作成してダウンロードします。
ここでは「GeoLite2-Country-CSV」というCSV形式のデータを利用します。

GeoLite2のデータベースには、
次のように、IPアドレスのネットワークと国コードの対応情報が入っています。
# 「geoname_id」がどの国に対応しているかは、
# 「GeoLite2-Country-Locations-ja.csv」で確認できます。

$ head -5 GeoLite2-Country-Blocks-IPv4.csv 
network,geoname_id,registered_country_geoname_id,represented_country_geoname_id,is_anonymous_proxy,is_satellite_provider
1.0.0.0/24,2077456,2077456,,0,0
1.0.1.0/24,1814991,1814991,,0,0
1.0.2.0/23,1814991,1814991,,0,0
1.0.4.0/22,2077456,2077456,,0,0

$ head -5 GeoLite2-Country-Locations-ja.csv
geoname_id,locale_code,continent_code,continent_name,country_iso_code,country_name,is_in_european_union
49518,ja,AF,"アフリカ",RW,"ルワンダ共和国",0
51537,ja,AF,"アフリカ",SO,"ソマリア",0
69543,ja,AS,"アジア",YE,"イエメン共和国",0
99237,ja,AS,"アジア",IQ,"イラク共和国",0

これらの情報を使い、次の流れでIPアドレスから国を判定させます。

  • IPアドレス範囲と国を対応づけるDataFrameを作成する
  • 対象データに、対応づけDataFrameを結合して国を判定する

IPアドレス範囲と国を対応づけるDataFrameを作成する

GeoLite2-Country-Blocks-IPv4.csv と GeoLite2-Country-Locations-ja.csv を
「geoname_id」を使って結合すると、ネットワークと国コード・国名の対応表を作ることが出来ます。

但し、ネットーワークが 1.0.0.0/24 のような表現となっており、
IPアドレスの値と結合を考えると、扱いにくいので、Long型の値に変換します。

この処理を書いたサンプルコードを以下に示します。

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import LongType
import os

spark = SparkSession.builder.appName('local').getOrCreate()
cwd = os.getcwd()

# MAXMIND GeoLite2 CSVファイルの読み込み
df_geo_block = spark.read.format("csv").option("header", "true") \
  .load("{}/GeoLite2-Country-Blocks-IPv4.csv".format(cwd))
df_geo_name = spark.read.format("csv").option("header", "true") \
  .load("{}/GeoLite2-Country-Locations-ja.csv".format(cwd))

# Blocks, Locations の結合
df_geoip = df_geo_block.join(df_geo_name, "geoname_id") \
  .select("network", "country_iso_code", "country_name")

# IPアドレスをLongに変換する処理の定義
ip_to_long = F.split(F.col("ip"), "\.")[0].astype(LongType()) * 256 * 256 * 256 \
             + F.split(F.col("ip"), "\.")[1].astype(LongType()) * 256 * 256 \
             + F.split(F.col("ip"), "\.")[2].astype(LongType()) * 256 \
             + F.split(F.col("ip"), "\.")[3].astype(LongType())

# IPアドレスの範囲と国コードを対応づける DataFrameを作成
df_georange = df_geoip.withColumn("ip", F.split(F.col("network"), "/")[0]) \
  .withColumn("s", ip_to_long) \
  .withColumn("e", F.col("s")
              + F.pow(2, F.lit(32) - F.split(F.col("network"), "/")[1].astype(LongType()))
              .astype(LongType())
              - 1) \
  .select("s", "e", "country_iso_code", "country_name")

# IPアドレスの範囲と国コードを対応づける DataFrameを保存
df_georange.write.mode("overwrite").parquet("{}/geolite2-iprange-country-ja.csv".format(cwd))

df_georange.show()

これを実行すると、以下のようなDataFrameが作成されます。
「s」「e」カラムが、IPアドレスの範囲を示しています。

$ python geolite2_prepare.py 
+--------+--------+----------------+--------------+                             
|       s|       e|country_iso_code|  country_name|
+--------+--------+----------------+--------------+
|16777216|16777471|              AU|オーストラリア|
|16777472|16777727|              CN|          中国|

対象データに、対応づけDataFrameを結合して国を判定する

ここまで準備が出来たら、
後はIPアドレスの値と、前項で作成したDataFrameを結合するだけです。

以下にサンプルコードを示します。

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import LongType
import os

spark = SparkSession.builder.appName('local').getOrCreate()
cwd = os.getcwd()

# IPアドレスの範囲と国コードを対応づける DataFrameを読み込み
df_georange = spark.read.parquet("{}/geolite2-iprange-country-ja.csv".format(cwd))

# IPアドレスをLongに変換する処理の定義
ip_to_long = F.split(F.col("ip"), "\.")[0].astype(LongType()) * 256 * 256 * 256 \
             + F.split(F.col("ip"), "\.")[1].astype(LongType()) * 256 * 256 \
             + F.split(F.col("ip"), "\.")[2].astype(LongType()) * 256 \
             + F.split(F.col("ip"), "\.")[3].astype(LongType())

# IPアドレスのサンプルデータを準備
df_access_log = spark.createDataFrame([
  ["user_a", "1.2.3.4"],
  ["user_b", "192.168.0.1"],
], 'user ip'.split(' '))

# IPアドレスから国を判定
df_access_log.withColumn("ip_long", ip_to_long) \
  .alias("base") \
  .join(df_georange.alias("geoip"),
        F.col("base.ip_long").between(F.col("geoip.s"), F.col("geoip.e")),
        "left") \
  .select("user", "ip", "country_iso_code", "country_name").show()

実行結果は、以下のようになります。

$ python geolite2_sample.py 
+------+-----------+----------------+------------+                              
|  user|         ip|country_iso_code|country_name|
+------+-----------+----------------+------------+
|user_a|    1.2.3.4|              RU|      ロシア|
|user_b|192.168.0.1|            null|        null|
+------+-----------+----------------+------------+