DatabricksのPythonからのファイル削除を高速化する
このエントリでは、
DatabricksのPython上で、ファイル削除を高速化する方法を紹介します。
Databricksでのファイル一覧の取得・削除の高速化は、
以下のKnowledge Baseに手法が書いてありますが、
この方法は、Scalaでは可能ですが、
PythonのNotebookでは実施できないようです。
How to list and delete files faster in Databricks | Databricks Knowledge Base
https://kb.databricks.com/data/list-delete-files-faster.html
上記の手法の説明を読んでみると、
ファイル一覧をSparkのDataframeに変換して、
Sparkの機能で並列に削除を実行していることが分かります。
Sparkの機能を用いなくても、
削除の並列実行は、Pythonのthreadingでもよいので、
以下のようなコードで削除の高速化が可能です。
# 削除対象のデータを作る
def create_data(path: str) -> None:
df = spark.createDataFrame([[i, "data-{}".format(i)] for i in range(1, 100)], ['id', 'data'])
df.write.mode("overwrite").partitionBy("id").parquet(path)
# 並列削除関数, parallelに最大並列数を指定
def rm_parallel(root: str, parallel: int = 40) -> None:
import threading
def rm_worker(path: str):
dbutils.fs.rm(path, recurse=True)
files = [f.path for f in dbutils.fs.ls(root)]
for idx in range(0, len(files), parallel):
ths = [threading.Thread(target=rm_worker, kwargs={'path': f}) for f in files[idx:idx + parallel]]
for t in ths:
t.start()
for t in ths:
t.join()
# メインの処理
import time
data_path = 'dbfs:///tmp/delete_data'
# 最大並列数 40で削除を実施
create_data(data_path)
start = time.time()
rm_parallel(data_path)
print(time.time() - start)
# 最大並列数 20で削除を実施
create_data(data_path)
start = time.time()
rm_parallel(data_path, parallel=20)
print(time.time() - start)
# 並列処理無しで、削除を実施
create_data(data_path)
start = time.time()
dbutils.fs.rm(data_path, recurse=True)
print(time.time() - start)
上記のコードで定義しているrm_parallel
関数で、
並列に削除を実行する処理を実装しています。
簡単に処理時間を比較してみると、以下のような結果になりました。
# このテストコードでは、
# サブディレクトリのサイズに偏りが無いので、
# 理想的過ぎる速度改善の効果が出ている感はありますが。
- 並列無し: 11.5秒
- 20並列: 2.9秒
- 40並列: 2.4秒
以上。