CeleryでFastAPIのバックグラウンドジョブを動かす手順
このエントリでは、
CeleryでFastAPIのバックグラウンドジョブを動かす手順をまとめています。
また、動かすプロセスがいくつかあり面倒なので、
Supservisorでまとめて起動できるようにしています。
各プロセスは、次の役割を担います。
- FastAPI: ジョブを受け付ける
- Redis: ジョブのキューを管理する
- Celery: ジョブを起動する
各ツールのドキュメントは以下を参照してください。
FastAPI
https://fastapi.tiangolo.com/ja/
Celery
https://docs.celeryq.dev/
Supervisor: A Process Control System
http://supervisord.org/index.html
Flower
https://flower.readthedocs.io/en/latest/
Flowerはジョブの状態をブラウザで確認するために利用します。
アプリケーションのディレクトリを用意する
次のように、アプリケーションのディレクトリを用意します。
mkdir celery_study && cd $_
python -m venv venv
. venv/bin/activate
Redisを入れる
Redisをインストールします。
この例は、UbuntuLinuxの場合。
sudo apt-get install redis
起動できることを確認します。
/usr/bin/redis-server
FastAPIを動かす
FastAPIとgunicornをインストールします。
pip install fastapi
pip install gunicorn
pip install uvicorn
サンプルコードを用意します。
main.py
from fastapi import FastAPI
app = FastAPI()
@app.get("/")
def read_root():
return {"Hello": "World"}
gunicornを起動します。
gunicorn -k uvicorn.workers.UvicornWorker main:app
動作を確認します。
$ curl http://localhost:8000
{"Hello": "World"}
Celeryを動かす
Celeryをインストールします。
pip install celery[redis]
サンプルコードを用意します。
main.py
from fastapi import FastAPI
from tasks import celery, hello
app = FastAPI()
@app.get("/")
def read_root():
task_id = hello.delay(message="world")
return {"task_id": str(task_id)}
@app.get("/status")
def status(task_id: str):
result = celery.AsyncResult(task_id)
if not result.ready():
return {}
return {"result": str(result.get())}
tasks.py
from celery import Celery
celery = Celery("tasks",
broker="redis://localhost:6379",
result_backend="redis://localhost:6379")
@celery.task(name='hello')
def hello(message: str) -> str:
return f"Hello {message}"
redis, gunicorn, celeryを起動します(teminalを3つ開く感じで)。
redis-server
gunicorn -k uvicorn.workers.UvicornWorker main:app
celery -A tasks.celery worker --loglevel=INFO
動作を確認します。
1つ目のcurlがジョブの起動、2つ目のcurlがジョブの実行結果確認です。
$ curl http://localhost:8000
{"task_id":"c08bbfaf-cbf1-4fc0-a678-9831ef4ed807"}
$ curl http://localhost:8000/status?task_id=c08bbfaf-cbf1-4fc0-a678-9831ef4ed807
{"result":"Hello world"}
supervisorを動かす
redis, gunicorn, celeryと3つのプロセスを起動するのが面倒なので、
Supervisorでまとめて起動できるようにします。
pip install supervisor
pip install flower
Supervisorの設定ファイルを用意します。
ついでに、Flowerも起動するように設定を記載しておきます。
supervisord.conf
[supervisord]
nodaemon=true
[program:redis]
command=/usr/bin/redis-server
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
priority=0
[program:gunicorn]
directory=%(ENV_PWD)s
environment=PYTHONPATH=.
command=venv/bin/gunicorn -k uvicorn.workers.UvicornWorker main:app
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
[program:celery]
directory=%(ENV_PWD)s
environment=PYTHONPATH=.
command=venv/bin/celery -A tasks.celery worker --loglevel=INFO
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
[program:flower]
directory=%(ENV_PWD)s
environment=PYTHONPATH=.
command=venv/bin/celery --broker=redis://localhost:6379/ flower
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
Supervisorで各プロセスを起動します。
supervisord
curl http://localhost:8000
でJobを投入した後、http://localhost:5555/
にアクセスするとFlowerrでjobのステータスを確認できます。
以上。