このエントリでは、
DatabricksのPython上で、ファイル削除を高速化する方法を紹介します。

Databricksでのファイル一覧の取得・削除の高速化は、
以下のKnowledge Baseに手法が書いてありますが、
この方法は、Scalaでは可能ですが、
PythonのNotebookでは実施できないようです。

How to list and delete files faster in Databricks | Databricks Knowledge Base
https://kb.databricks.com/data/list-delete-files-faster.html

上記の手法の説明を読んでみると、
ファイル一覧をSparkのDataframeに変換して、
Sparkの機能で並列に削除を実行していることが分かります。

Sparkの機能を用いなくても、
削除の並列実行は、Pythonのthreadingでもよいので、
以下のようなコードで削除の高速化が可能です。

# 削除対象のデータを作る
def create_data(path: str) -> None:
  df = spark.createDataFrame([[i, "data-{}".format(i)] for i in range(1, 100)], ['id', 'data'])
  df.write.mode("overwrite").partitionBy("id").parquet(path)

# 並列削除関数, parallelに最大並列数を指定
def rm_parallel(root: str, parallel: int = 40) -> None:
  import threading

  def rm_worker(path: str):
    dbutils.fs.rm(path, recurse=True)

  files = [f.path for f in dbutils.fs.ls(root)]
  for idx in range(0, len(files), parallel):
    ths = [threading.Thread(target=rm_worker, kwargs={'path': f}) for f in files[idx:idx + parallel]]
    for t in ths:
      t.start()
    for t in ths:
      t.join()

# メインの処理
import time
data_path = 'dbfs:///tmp/delete_data'

# 最大並列数 40で削除を実施
create_data(data_path)
start = time.time()
rm_parallel(data_path)
print(time.time() - start)

# 最大並列数 20で削除を実施
create_data(data_path)
start = time.time()
rm_parallel(data_path, parallel=20)
print(time.time() - start)

# 並列処理無しで、削除を実施
create_data(data_path)
start = time.time()
dbutils.fs.rm(data_path, recurse=True)
print(time.time() - start)

上記のコードで定義しているrm_parallel関数で、
並列に削除を実行する処理を実装しています。

簡単に処理時間を比較してみると、以下のような結果になりました。
# このテストコードでは、
# サブディレクトリのサイズに偏りが無いので、
# 理想的過ぎる速度改善の効果が出ている感はありますが。

  • 並列無し: 11.5秒
  • 20並列: 2.9秒
  • 40並列: 2.4秒

以上。