このエントリでは、
CeleryでFastAPIのバックグラウンドジョブを動かす手順をまとめています。

また、動かすプロセスがいくつかあり面倒なので、
Supservisorでまとめて起動できるようにしています。

各プロセスは、次の役割を担います。

  • FastAPI: ジョブを受け付ける
  • Redis: ジョブのキューを管理する
  • Celery: ジョブを起動する

各ツールのドキュメントは以下を参照してください。

FastAPI
https://fastapi.tiangolo.com/ja/

Celery
https://docs.celeryq.dev/

Supervisor: A Process Control System
http://supervisord.org/index.html

Flower
https://flower.readthedocs.io/en/latest/

Flowerはジョブの状態をブラウザで確認するために利用します。

アプリケーションのディレクトリを用意する

次のように、アプリケーションのディレクトリを用意します。

mkdir celery_study && cd $_
python -m venv venv
. venv/bin/activate

Redisを入れる

Redisをインストールします。
この例は、UbuntuLinuxの場合。

sudo apt-get install redis

起動できることを確認します。

/usr/bin/redis-server

FastAPIを動かす

FastAPIとgunicornをインストールします。

pip install fastapi
pip install gunicorn
pip install uvicorn

サンプルコードを用意します。

main.py

from fastapi import FastAPI

app = FastAPI()

@app.get("/")
def read_root():
    return {"Hello": "World"}

gunicornを起動します。

gunicorn -k uvicorn.workers.UvicornWorker main:app

動作を確認します。

$ curl http://localhost:8000
{"Hello": "World"}

Celeryを動かす

Celeryをインストールします。

pip install celery[redis]

サンプルコードを用意します。

main.py

from fastapi import FastAPI
from tasks import celery, hello

app = FastAPI()

@app.get("/")
def read_root():
    task_id = hello.delay(message="world")
    return {"task_id": str(task_id)}

@app.get("/status")
def status(task_id: str):
    result = celery.AsyncResult(task_id)
    if not result.ready():
        return {}
    return {"result": str(result.get())}

tasks.py

from celery import Celery

celery = Celery("tasks",
                broker="redis://localhost:6379",
                result_backend="redis://localhost:6379")

@celery.task(name='hello')
def hello(message: str) -> str:
    return f"Hello {message}"

redis, gunicorn, celeryを起動します(teminalを3つ開く感じで)。

redis-server
gunicorn -k uvicorn.workers.UvicornWorker main:app
celery -A tasks.celery worker --loglevel=INFO

動作を確認します。
1つ目のcurlがジョブの起動、2つ目のcurlがジョブの実行結果確認です。

$ curl http://localhost:8000
{"task_id":"c08bbfaf-cbf1-4fc0-a678-9831ef4ed807"}
$ curl http://localhost:8000/status?task_id=c08bbfaf-cbf1-4fc0-a678-9831ef4ed807
{"result":"Hello world"}

supervisorを動かす

redis, gunicorn, celeryと3つのプロセスを起動するのが面倒なので、
Supervisorでまとめて起動できるようにします。

pip install supervisor
pip install flower

Supervisorの設定ファイルを用意します。
ついでに、Flowerも起動するように設定を記載しておきます。

supervisord.conf

[supervisord]
nodaemon=true
[program:redis]
command=/usr/bin/redis-server
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
priority=0
[program:gunicorn]
directory=%(ENV_PWD)s
environment=PYTHONPATH=.
command=venv/bin/gunicorn -k uvicorn.workers.UvicornWorker main:app
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
[program:celery]
directory=%(ENV_PWD)s
environment=PYTHONPATH=.
command=venv/bin/celery -A tasks.celery worker --loglevel=INFO
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
[program:flower]
directory=%(ENV_PWD)s
environment=PYTHONPATH=.
command=venv/bin/celery --broker=redis://localhost:6379/ flower
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0

Supervisorで各プロセスを起動します。

supervisord

curl http://localhost:8000でJobを投入した後、
http://localhost:5555/にアクセスするとFlowerrでjobのステータスを確認できます。

以上。