celery
Celery는 방대한 양의 메시지를 처리 할 수있는 간단하고 유연하며 안정적인 분산 시스템이다.
백그라운드 작업 / 주기적 작업 / 비동기 처리 / 남는 시간에 처리
Getting started
태스크 대기열(Task queues)은 스레드 또는 기계에서 작업을 분배하는 메커니즘으로 사용됩니다. 태스크 큐의 입력은 태스크(task)라고하는 작업 단위입니다. 전용 작업자(worker) 프로세스는 새 작업을 수행하기 위해 지속적으로 작업 대기열을 모니터링합니다.
샐러리는 메시지를 통해 통신하며, 대개 브로커(broker)를 사용하여 클라이언트(clients)와 작업자(workers)를 중재합니다. 클라이언트가 작업을 시작하기 위해 클라이언트가 대기열에 메시지를 추가하면 브로커는 해당 메시지를 작업자(worker)에게 전달합니다.
- 백그라운드 태스크를 추가하면 장고에서 직접 처리하는 것은 아니다. 태스크를 넘긴다.
- 브로커는 태스크를 위임받는다. 우리는 브로커로 AWS SQS를 사용한다.
- 브로커는 위임받은 태스크 내용을 확인해 워커에게 보낸다.
- 워커는 일을 받았을 때 장고랑 별개로 나중에 시간이 될 때 처리한다.
-
작업한 결과는 저장할수도 있고, 아닐 수도 있다. 저장하면 result backend에 들어간다.
First steps with Django
AWS IAM
IAM에서 SQSFull Access 권한을 가진 유저를 만든다.
설치
$ pip install celery
프로젝트 세팅
Cutom User Model
$ manage -d startapp member
# settings.py
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'member.apps.MemberConfig',
]
AUTH_USER_MODEL = 'member.User'
# member/models.py
from django.contrib.auth.models import AbstractUser
class User(AbstractUser):
pass
Custom Command 생성
# member/commands/createsu.py
from django.core.management import BaseCommand
from member.models import User
class Command(BaseCommand):
def handle(self, *args, **options):
User.objects.create_superuser(
username='hm',
password='...',
email='dev@abc.com',
)
$ manage -d createsu
$ manage -d runserver
http://localhost:8000/admin/ 접속 해 로그인이 정상적으로 되는지 확인한다.
Celery 설정
config/celery.py
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')
app = Celery('docker-nginx-node-proxy')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
# 작업을 등록해야 하는데 앱별로 분리되어 있을 경우,
# 검색해서 태스크를 로드한다.
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
config/init.py
from .celery import app as celery_app
__all__ = (
'celery_app'
)
Task 추가
member/models.py
class CeleryTest(models.Model):
request_at = models.DateTimeField()
created_at = models.DateTimeField(default=timezone.now)
$ ./manage.py makemigrations
$ ./manage.py migrate
member/tasks.py 생성
import time
from django.utils import timezone
from config.celery import app
from member.models import CeleryTest
# 셀러리의 테스크로 인식
@app.task
def celery_test():
request_at = timezone.now()
time.sleep(10)
CeleryTest.objects.create(request_at=request_at)
member/admin.py
from django.contrib import admin
from member.models import CeleryTest
class CeleryTestAdmin(admin.ModelAdmin):
readonly_fields = (
'created_at',
)
admin.site.register(CeleryTest)
실행
$ ./manage.py shell
>>> from member.tasks import celery_test
>>> celery_test()
http://localhost:8000/admin/member/celerytest/ 에서 잘 저장되어 있는지 확인한다.
Broker 설정
settings.py
# Celery
CELERY_BROKER_TRANSPORT = 'sqs'
CELERY_BROKER_URL = 'sqs://{aws_access_key_id}:{aws_secret_access_key}@'.format(
aws_access_key_id=quote(config['aws']['access_key_id'], safe=''),
aws_secret_access_key=quote(config['aws']['secret_access_key'], safe=''),
)
CELERY_BROKER_TRANSPORT_OPTIONS = {
'region': 'ap-northeast-2',
}
설치
$ pip install boto
$ pip install pycurl
# worker 프로세스 시작
$ celery -A config worker -l info
# boto 에러가 있어 에러 발생
$ pip uninstall boto
$ pip install git+git://github.com/michaelhenry/boto.git@sqs_fixed
실행
# tab1
$ celery -A config worker -l info
# tab2
$ ./manage.py shell
>>> from member.tasks import celery_test
>>> celery_test.delay()
결과
[2017-04-11 13:28:01,011: INFO/MainProcess] Received task: member.tasks.celery_test[b8b8ff6d-bed5-4384-a33b-12c87151e785]
[2017-04-11 13:28:02,014: INFO/MainProcess] Received task: member.tasks.celery_test[0766272a-269f-4031-8e6f-ecc0acb9957a]
[2017-04-11 13:28:03,049: INFO/MainProcess] Received task: member.tasks.celery_test[baa99294-b98b-4815-a32e-926831b4e4ac]
[2017-04-11 13:28:07,735: INFO/PoolWorker-2] Task member.tasks.celery_test[6952dd83-ce18-4be1-b9de-1f969f11240f] succeeded in 10.038976837997325s: None
[2017-04-11 13:28:08,214: INFO/PoolWorker-1] Task member.tasks.celery_test[853cdd38-b0ac-4bdb-b49a-a5f1679384b5] succeeded in 10.012917116997414s: None
[2017-04-11 13:28:09,037: INFO/PoolWorker-3] Task member.tasks.celery_test[655e1e64-712a-431f-9e3e-2f7083666a07] succeeded in 10.033461324012023s: None
[2017-04-11 13:28:10,071: INFO/PoolWorker-4] Task member.tasks.celery_test[6172e3b7-a3b4-4d84-ad22-6fcbb444fdac] succeeded in 10.045007570006419s: None
[2017-04-11 13:28:17,752: INFO/PoolWorker-2] Task member.tasks.celer
result backend 설정
$ pip install django-celery-results
# settings.py
CELERY_RESULT_BACKEND = 'django-db'
INSTALLED_APPS = [
# ...
'django_celery_results',
'member.apps.MemberConfig',
]
$ manage -d migrate
이제 result backend에 저장한 작업결과를 확인할 수 있다. http://localhost:8000/admin/django_celery_results/taskresult/ 접속해본다.
스케쥴링
주기적인 작업 실행을 장고의 데이터베이스를 참조해 실행한다.
설치
$ pip install django-celery-beat
# settings.py
INSTALLED_APPS = [
# ...
'django_celery_results',
'django_celery_beat',
'member.apps.MemberConfig',
]
$ manage -d migrate
# tab1
# 스케쥴링한 것을 실행
$ celery -A config beat -l info -S django
# tab2
# 셀러리 실행
$ celery -A config worker -l info
http://localhost:8000/admin/django_celery_beat/ 접속한다.
- periodic task : 기존에 있는 것을 지우고, 새로 생성한다.
- Name
- Task (registered)
- interval
버그
작업시간을 변경해도 자동으로 적용되지 않는 버그가 있으므로 fix된 버전으로 다시 설치한다.
$ pip uninstall celery
$ pip install git+https://github.com/celery/celery.git@b27c0f143b86989a5f655bcc9592221bbbba0f5f
배포
docker 빌드를 다시해 테스트 해본 후, 배포한다.