Python

用語

Celery

djangoベースのジョブマネージャ。

RabbitMQ

AMQPサーバ。

ブローカ(broker)

Celeryがブローカとなりジョブ全体をまとめる。ジョブマネージャシステム全体で一つ。

スーパーバイザ(supervisor)

Celeryからジョブを受け取りワーカプロセスを動作させる。RemoteTask?を使用しない場合、Celery自体がスーパバイザとなる。

ワーカ(worker)

Celeryからジョブを依頼されて実処理を行うワーカプロセスを起動する。

ワーカプロセスの並列度はConcurrencyセッティングにより変更可能。

メモ

信頼性

CeleryはRDBMS(djangoが使用可能なものならなんでも)/memcache/TokyoTyrant?のいずれかを使用する。

永続化が必要なデータを書き込むため、ストレージの信頼性=Celeryの信頼性となる。ストレージが失われれば未実行のジョブも失われる。

RDBMSにおいてはブロックデバイスレベルで冗長化する必要があるため、DRBD+keepalivedなどのHAクラスタ構成が必要。

拡張性

ジョブの一貫性が必要なためデータストレージ単位で拡張する必要がある。

RDBMSを使う場合はid range partitioningが良いかも知れない。

ジョブの実行はRemoteTask?を利用して別のノードを利用できる。そのため、ワーカノードはほぼ制限なく増設可能。

RemoteTask?の実行はRESTProxyTask?クラスを使用し、通信にはHTTP、データシリアライズにはJSONを利用する。

タスク実装例

ログに引数を出力して、10秒待ってから 42 と返すサンプルアプリケーション(celeryのサイトにあるやつ)。

from celery.task import Task
from celery.registry import tasks

import time

class MyTask(Task):
  name="mydummy_task"

  def run(self, some_arg, **kwargs):
    logger=self.get_logger(**kwargs)
    logger.info("Did something: %s" % some_arg)
    time.sleep(10)
    return 42

tasks.register(MyTask)

タスク実行例

ipythonを起動してタスクを実行する。

debian:~/project/celerytest# ipython
In [1]: from tasks import MyTask
In [2]: res=MyTask.delay('hoge')

実行直後の状態はPENDING(実行待ち)となる。

In [3]: res.status
Out[3]: u'PENDING'

successful()の結果はFalse

In [4]: res.successful()
Out[4]: False

10秒待つと結果が返る。statusの値はDONEとなり、is_done()はTrueを返す。

In [5]: res.status
Out[5]: u'DONE'
In [6]: res.successful()
Out[6]: True

返ってきた値にアクセスするにはget()を使う。

In [7]: res.get()
Out[7]: 42

定期タスク実装例

PeriodicTask?を継承したクラスを作る。 name, run_everyプロパティは必須。run_everyにはtimedeltaオブジェクトを渡す。

from celery.task import PeriodicTask
from celery.registry import tasks
from datetime import timedelta

class UrlFetchPeriodicTask(PeriodicTask):
  '''
  5秒おきにwww.yahoo.co.jpをフェッチして返す(どこに?)
  '''
  name='converter.urlfetcher_periodic'
  run_every=timedelta(seconds=5)

  def run(self, **kwargs):
    url='http://www.yahoo.co.jp'
    resp=url_fetch(url)
    return resp

tasks.register(UrlFetchPeriodicTask)

リモートタスク実装例

djangoのviewで書いてみた。戻り値には以下の2つの値が必要。

  • status
  • retval

レスポンス形式はJSONを使う。

from django.http import HttpResponse
from anyjson import serialize

def processor(request):
  result={'status':'success', 'retval':'hogehoge'}
  return HttpResponse(serialize(result), mimetype='application/json')

リモートタスク実行例

RESTProxyTask?を使うことでリモートタスクを実行できる。

In [1]: from celery.task import RESTProxyTask
In [2]: res=RESTProxyTask.delay('http://127.0.0.1:8000/celerytest/processor')

取得した値の扱いは通常タスクと一緒。

投入後ジョブの監視

ジョブが発行されるとtask_idが返ってくるのでそれを利用して監視できる。

In [1]: task_a=RESTProxyTask.delay('http://127.0.0.1:8000/celerytest/processor')
In [2]: str(task_a)
Out[2]: '7f54d236-2ebd-4f96-8ea3-39e9c021eb55'

celery.result.AsyncResult?を利用する(他のモジュールはよく知らない)。

In [1]: from celery.result import AsyncResult
In [2]: result=AsyncResult(str(task_a))
In [3]: result
Out[3]: <AsyncResult: 7f54d236-2ebd-4f96-8ea3-39e9c021eb55>
In [4]: result.status
Out[4]: u'DONE'
In [5]: result.get()
Out[5]: u'hogehoge'

タスクの読み込みタイミング

celerydが起動したタイミングでタスクが読み込まれる。 tasks.pyを更新しても、celerydが再起動されない限り読み込まれない点に注意。

読み込まれたタスクは起動時のログにあるtasksパラメタで確認できる。

[2011-02-06 18:59:52,603: WARNING/MainProcess] Configuration ->
    . broker -> amqp://mybike@localhost:5672/myvhost
    . queues ->
        . celery -> exchange:celery (direct) binding:celery
    . concurrency -> 8
    . loader -> djcelery.loaders.DjangoLoader
    . logfile -> [stderr]@INFO
    . events -> OFF
    . beat -> OFF
    . tasks ->
	. bike.tasks.update_component_ride_summary
	. bike.tasks.update_frame_ride_summary
	. core.tasks.build_bike_images
	. core.tasks.build_thumbnail
	. core.tasks.build_wallpicture

Celery/RabbitMQの関係

Celery/RabbitMQ間は常時接続。プロトコルはAMQP。 RabbitMQが死ぬとcelerydはConnection refusedのエラーを返す。RabbitMQ未接続の状態でアプリケーション側からタスクを呼び出すとexceptionが飛んでくる。

タスクの流れ

  1. タスク実行する
  2. celeryがステータス、キューIDをModelに格納→AMQPでRabbitMQに通知
  3. RabbitMQがWorker起動
  4. Workerが結果を返す
  5. RabbitMQがceleryに通知
  6. celeryがステータスをModelに格納

古いタスクの削除

celerystatsオプションによりこれまでのタスクの実行統計を表示し、expiredタスクを削除する。

$ ./manage.py celerystats
* Gathering statistics...
Total processing time by task type:
	celery.delete_expired_task_meta: 0.858625650406 secs. (for a total of 12 executed.)
	converter.tasks.UrlFetchTask: 15.7969090939 secs. (for a total of 1 executed.)
	celery.task.rest.RESTProxyTask: 132.508999109 secs. (for a total of 18 executed.)
	urlfetcher.tasks.UrlFetchTask: 706.054086447 secs. (for a total of 59 executed.)
	kiwi.tasks.MyTask: 178.916535139 secs. (for a total of 37 executed.)
	imgconvert.converter.tasks.UrlFetchTask: 554.16418767 secs. (for a total of 262 executed.)
Total task processing time: 1588.29934311 secs.
Total tasks processed: 389

キューの確認(list_queues)

# /some/where/rabbitmqctl list_queues -p hogeproject name messages consumers
Listing queues ...
celery  383     1
...done.

タスクの削除(discard_all)

RabbitMQからキューを削除する。

from celery.task import discard_all
discard_all()

ロック処理

この辺を参考にする:http://celeryproject.org/cookbook/tasks.html

carrotのシリアライザ変更

デフォルトではJSONになっていて、これではシリアライズ出来ないオブジェクトもある。

from carrot import seralization
serialization.set_default_serializer("pickle")

djb daemontoolsから実行する

要点まとめ。

  • workdirを設定する
  • detachしない(フォアグラウンド実行する)
  • 2>&1をつけて、すべての出力を標準出力へ
#!/bin/sh
WORKDIR=/home/www/hoge-prj/hoge/
CONCURRENCY=4

cd ${WORKDIR}
exec setuidgid www \
        /usr/local/bin/python manage.py celeryd \
                --statistics \
                --workdir=${WORKDIR} \
                --concurrency=${CONCURRENCY} \
                --loglevel=INFO \
                --traceback \
                --settings=settings_production 2>&1

トラブルシューティング

タスクが実行されない

チェックリスト。

  • タスクがどこまで通知されているか
    1. Taskクラスからタスクの投入
    2. celery経由でAMQPサーバへディスパッチ
    3. workerがAMQPサーバから取得してジョブ実行
  • タスクを単品で実行出来るか確認(interactive consoleから)
  • --workdirの設定をする

タスクの投入とAMQPサーバへの登録確認

タスククラスのdelayメソッドを呼び出すことで、celeryを経由してキュー登録が行われます。

hoge.tasks.UpdateIndex.delay(hogehoge)

Broker(celery)がキューを登録するとINFOレベルのログを出力します。 以下の例ではhoge.tasks.UpdateIndex?というのがタスククラス、 1a883464-bcb2-4769-8612-fa9bbceb0cdbがキューのIDになります。

[INFO/MainProcess] Got task from broker: hoge.tasks.UpdateIndex[1a883464-bcb2-4769-8612-fa9bbceb0cdb]

workerによるジョブ実行

workerはタスクを実行します。 タスククラスでは、成功、失敗時ともに何かしらのreturnを行うようにしておくことで、 実行時のログに出力されます。 以下の例ではprocessed 1000が実行結果出力です。

[INFO/MainProcess] Task hoge.tasks.UpdateIndex[c0cbb30d-775a-44ad-a9e9-8e5940630fb3] processed: 1000

タスクの単品実行

celeryのタスクはinteractive consoleからも実行できます。

端末を2枚ひらいて次のようにするとわかりやすいです。

  • コンソール1でceleryの実行状態を監視(foreground実行)
  • コンソール2でタスクの投入

delay()を呼びだすとキューに放りこまれてしまうのでapply()を使用します (ちなみにdelayメソッドは、apply_asyncメソッドの別名です)。

$ python
> from hoge.tasks import UpdateIndex
> UpdateIndex.apply_sync(args=(), kwargs={})

Unknown task ignored

タスククラスにnameプロパティが不足している。

class MyTask(Task):
  name="mytask" # これ必須

MySQLのデッドロック

MySQLを利用している場合、MySQLのデッドロックが検知されるとログに記録が残る。

@400000004af619d22500f044 OperationalError: (1213, 'Deadlock found when trying to get lock; try restarting transaction')

mysqldの隔離レベルをread commitedに変更する。

[mysqld]
transaction-isolation = READ-COMMITTED

トップ   編集 凍結 差分 バックアップ 添付 複製 名前変更 リロード   新規 一覧 単語検索 最終更新   ヘルプ   最終更新のRSS
Last-modified: 2011-02-06 (日) 19:01:47 (2359d)