Celeryの使い方を理解するためのハンズオン資料です。
単純なリクエストエンドポイントを叩いて、ロジックの実行をX秒待たせる
X秒間レスポンス待ちが発生することを確認する
構成イメージは以下の通り

今回は、動作の説明のために、djangoのserverを シングルスレッド かつ 1プロセス での動作を行っている。
- 以下のブランチをチェックアウトする
git checkout -b step1 oring/step1- 環境を立ち上げる
$docker-compose build
$docker-compose up
- ブラウザで
localhost:8000/handsonにアクセスし、ボタンを押してみる。この時、chromeであればデベロッパーツールでレスポンスをチェックできるようにしておく。
- Buttonをクリックして、レスポンスが返るまで10秒以上かかることを確認する
views.pyにて、ボタンを押した際のエンドポイントがTaskStep1 クラスで定義されており、このクラス内でレスポンスを返す前に10秒sleepしている処理があることがわかる。
def post(self, request, *args, **kwargs):
# レスポンスを10秒待つ
time.sleep(10)
return redirect(reverse("base"))
ボタンを押したあと、別tabでlocalhost:8000/handsonにアクセスしようとするとレスポンスが返ってこないことも確認できる。
処理時間がかかるような処理をフロントで待ち続けないように & フロントエンドサーバのリソースを食いつぶさないためにも、別コンテナでの非同期処理を導入する
celery + redisの登場
構成イメージは以下の通り

taskを実行するbackendサーバに、djangoに組み込むことが用意なCeleryを利用している。
非同期処理を実行するbackendサーバにtaskの実行命令をpushするために、brokerと呼ばれるメッセージのqueueingシステムが必要になる。今回はRedisを利用している。
backendサーバでのtask実行結果を格納する必要がある。今回はRedisを利用しているが、MySQL等も選択可能である。
また、今回も、動作の説明のために、djangoのserverを シングルスレッド かつ 1プロセス での動作を行っている。
以下PR参照 #2
- 以下のブランチをチェックアウトする
git checkout -b step2 oring/step2- 環境を立ち上げる
$docker-compose build
$docker-compose up
- ブラウザで
localhost:8000/handsonにアクセスし、ボタンを押してみる。この時、chromeであればデベロッパーツールでレスポンスをチェックできるようにしておく。
-
Buttonをクリックして、レスポンスが即時返ることを確認する
-
docker logを確認すると、10秒遅れて非同期処理が終わった旨のメッセージが流れていることが確認できる
handson-backend | [2020-12-10 01:42:51,129: WARNING/ForkPoolWorker-1] hello world
handson-backend | [2020-12-10 01:42:51,138: INFO/ForkPoolWorker-1] Task app.tasks.hello[6cee6185-7bfb-48b4-9c54-3a083a9d0836] succeeded in 10.011956540000028s: None
- 非同期処理の導入手順
- Redisをbrokerとして利用するため、redis serviceを用意する。 docker-composeを以下のように修正。
また、djangoにおいて、brokerとして認識させる。 appのsettings.pyに以下設定を追加。redis: image: redis:6.0.9CELERY_BROKER_URL = "redis://redis:6379/1"- Celeryの利用にあたっては
- taskロジック自体はdjangoに組み込まれているため、handson-webapp serviceと同じく、djangoのimageをbuildしたcelery serviceを用意する。 docker-composeを以下のように修正。
celery: build: context: ./handson-webapp args: BUILD_TAG: 3.7- 非同期処理を実行するプロセスをdaemonとして立ち上げるためには、celeryの機能を使って以下コマンドを叩く。 docker-composeを以下のように修正。
concurrencyは並列処理数。今回はお試しなので 1。command: celery -A app worker --concurrency=1 -l info - 実行結果の格納先情報を記載する。 appのsettings.pyに以下設定を追加。
CELERY_RESULT_BACKEND = 'redis://redis:6379' - 非同期スクリプトの実装方法
- celery用のdecolaterを事前に読み込むために、
app.__init__.pyに以下内容を記述する
from __future__ import absolute_import, unicode_literals # This will make sure the app is always imported when # Django starts so that shared_task will use this app. from .celery import app as celery_app __all__ = ('app',)- appディレクトリ配下に、celery.py, tasks.pyを用意する
- celery.pyは以下の内容で記述。アプリケーションによって、変更と書かれている場所を修正すること。
# celery.py from __future__ import absolute_import, unicode_literals import os from celery import Celery # set the default Django settings module for the 'celery' program. os.environ.setdefault( 'DJANGO_SETTINGS_MODULE', 'app.settings' # 変更1: アプリケーションのsettingsを指定 ) app = Celery('app') # 変更2: appに変更 # Using a string here means the worker doesn't have to serialize # the configuration object to child processes. # - namespace='CELERY' means all celery-related configuration keys # should have a `CELERY_` prefix. app.config_from_object( 'django.conf:settings', namespace='CELERY' ) # Load task modules from all registered Django app configs. app.autodiscover_tasks() @app.task(bind=True) def debug_task(self): print('Request: {0!r}'.format(self.request))- tasks.pyには、非同期で実行させたいpythonコードを好きに書く。この時、
@shared_taskデコレータを指定することで非同期スクリプトであると明示する必要がある。
from __future__ import absolute_import, unicode_literals from celery import shared_task import time @shared_task def hello(proj_id): time.sleep(10) print('hello')- 上記のtasks.pyをviews.pyから呼び出す。この時、
hoge.delay(args)と書くことで非同期処理が実行される。
def post(self, request, *args, **kwargs): hello.delay('hello world') # 非同期処理を呼び出す場合はdeleyメソッドを叩く return redirect(reverse("base")) - celery用のdecolaterを事前に読み込むために、
非同期処理の状態を確認してみる
以下を参照 #3
- 以下のブランチをチェックアウトする
git checkout -b step3 oring/step3- 環境を立ち上げる
$docker-compose build
$docker-compose up
- ブラウザで
localhost:8000/handsonにアクセスし、ボタンを押してみる。この時、chromeであればデベロッパーツールでレスポンスをチェックできるようにしておく。
- Task2をクリックして、task一覧にSTARTEDの状態のタスクが存在することを確認する。表示されない場合はリロード。
5. タスクが完了すると、SUCCESSになることを確認する。確認する際は画面をリロード。
- taskの実行状態および結果は、settings.pyで指定した
CELERY_RESULT_BACKENDに格納される。今回はRedisを指定。 redisに格納されているkeyを全て取得し、valueの値をlist表示しているだけ。 - celeryは、デフォルトの状態だとタスクが
STARTEDになったことを検知しないで、PENDINGままになってしまう。そこで、settings.pyに以下設定を入れることで対処する。
CELERY_TASK_TRACK_STARTED = True
- 結果に含まれる
task_idを使えば、任意のタスクの実行状態を確認することも可能である。
非同期処理を停止してみる
以下を参照 #4
- 以下のブランチをチェックアウトする
git checkout -b step4 oring/step4- 環境を立ち上げる
$docker-compose build
$docker-compose up
- ブラウザで
localhost:8000/handsonにアクセスし、ボタンを押してみる。この時、chromeであればデベロッパーツールでレスポンスをチェックできるようにしておく。
- Task2をクリックして、task一覧にSTARTEDの状態のタスクが存在することを確認する。表示されない場合はリロード。
5. タスクが完了する前に、StopAllTasksボタンを押し、タスク一覧の中のタスクがREVOKEになることを確認する。表示されない場合はリロード。

- celeryのクラスである
AsyncResultを用いると、task_idを指定して以下メソッドでタスクを停止させることができる。この時、terminateを指定しないと再起動時にタスクが再実行される(らしい?)ため、きちんと止めておく。
AsyncResult(task_id).revoke(terminate=True)
Task2を複数回クリックしたらどうなるか?
docker-compose downでコンテナを完全に落とした場合、タスクのキュー情報はどうなっているか?


