celery

Celery는 방대한 양의 메시지를 처리 할 수있는 간단하고 유연하며 안정적인 분산 시스템이다.

백그라운드 작업 / 주기적 작업 / 비동기 처리 / 남는 시간에 처리

Getting started

태스크 대기열(Task queues)은 스레드 또는 기계에서 작업을 분배하는 메커니즘으로 사용됩니다. 태스크 큐의 입력은 태스크(task)라고하는 작업 단위입니다. 전용 작업자(worker) 프로세스는 새 작업을 수행하기 위해 지속적으로 작업 대기열을 모니터링합니다.

샐러리는 메시지를 통해 통신하며, 대개 브로커(broker)를 사용하여 클라이언트(clients)와 작업자(workers)를 중재합니다. 클라이언트가 작업을 시작하기 위해 클라이언트가 대기열에 메시지를 추가하면 브로커는 해당 메시지를 작업자(worker)에게 전달합니다.

  • 백그라운드 태스크를 추가하면 장고에서 직접 처리하는 것은 아니다. 태스크를 넘긴다.
  • 브로커는 태스크를 위임받는다. 우리는 브로커로 AWS SQS를 사용한다.
  • 브로커는 위임받은 태스크 내용을 확인해 워커에게 보낸다.
  • 워커는 일을 받았을 때 장고랑 별개로 나중에 시간이 될 때 처리한다.
  • 작업한 결과는 저장할수도 있고, 아닐 수도 있다. 저장하면 result backend에 들어간다.

First steps with Django

AWS IAM

IAM에서 SQSFull Access 권한을 가진 유저를 만든다.

설치

$ pip install celery

프로젝트 세팅

Cutom User Model

$ manage -d startapp member
# settings.py
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',

    'member.apps.MemberConfig',
]


AUTH_USER_MODEL = 'member.User'
# member/models.py
from django.contrib.auth.models import AbstractUser


class User(AbstractUser):
    pass

Custom Command 생성

# member/commands/createsu.py
from django.core.management import BaseCommand

from member.models import User


class Command(BaseCommand):
    def handle(self, *args, **options):
        User.objects.create_superuser(
            username='hm',
            password='...',
            email='dev@abc.com',
        )
$ manage -d createsu
$ manage -d runserver

http://localhost:8000/admin/ 접속 해 로그인이 정상적으로 되는지 확인한다.

Celery 설정

config/celery.py

import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')

app = Celery('docker-nginx-node-proxy')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
# 작업을 등록해야 하는데 앱별로 분리되어 있을 경우,
# 검색해서 태스크를 로드한다.
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

config/init.py

from .celery import app as celery_app

__all__ = (
    'celery_app'
)

Task 추가

member/models.py

class CeleryTest(models.Model):
    request_at = models.DateTimeField()
    created_at = models.DateTimeField(default=timezone.now)
$ ./manage.py makemigrations
$ ./manage.py migrate

member/tasks.py 생성

import time

from django.utils import timezone

from config.celery import app
from member.models import CeleryTest


# 셀러리의 테스크로 인식
@app.task
def celery_test():
    request_at = timezone.now()
    time.sleep(10)
    CeleryTest.objects.create(request_at=request_at)

member/admin.py

from django.contrib import admin

from member.models import CeleryTest

class CeleryTestAdmin(admin.ModelAdmin):
    readonly_fields = (
        'created_at',
    )

admin.site.register(CeleryTest)

실행

$ ./manage.py shell
>>> from member.tasks import celery_test
>>> celery_test()

http://localhost:8000/admin/member/celerytest/ 에서 잘 저장되어 있는지 확인한다.

Broker 설정

settings.py

# Celery
CELERY_BROKER_TRANSPORT = 'sqs'
CELERY_BROKER_URL = 'sqs://{aws_access_key_id}:{aws_secret_access_key}@'.format(
    aws_access_key_id=quote(config['aws']['access_key_id'], safe=''),
    aws_secret_access_key=quote(config['aws']['secret_access_key'], safe=''),
)
CELERY_BROKER_TRANSPORT_OPTIONS = {
    'region': 'ap-northeast-2',
}

설치

$ pip install boto
$ pip install pycurl
# worker 프로세스 시작
$ celery -A config worker -l info

# boto 에러가 있어 에러 발생
$ pip uninstall boto
$ pip install git+git://github.com/michaelhenry/boto.git@sqs_fixed

실행

# tab1
$ celery -A config worker -l info

# tab2
$ ./manage.py shell
>>> from member.tasks import celery_test
>>> celery_test.delay()

결과

[2017-04-11 13:28:01,011: INFO/MainProcess] Received task: member.tasks.celery_test[b8b8ff6d-bed5-4384-a33b-12c87151e785]
[2017-04-11 13:28:02,014: INFO/MainProcess] Received task: member.tasks.celery_test[0766272a-269f-4031-8e6f-ecc0acb9957a]
[2017-04-11 13:28:03,049: INFO/MainProcess] Received task: member.tasks.celery_test[baa99294-b98b-4815-a32e-926831b4e4ac]
[2017-04-11 13:28:07,735: INFO/PoolWorker-2] Task member.tasks.celery_test[6952dd83-ce18-4be1-b9de-1f969f11240f] succeeded in 10.038976837997325s: None
[2017-04-11 13:28:08,214: INFO/PoolWorker-1] Task member.tasks.celery_test[853cdd38-b0ac-4bdb-b49a-a5f1679384b5] succeeded in 10.012917116997414s: None
[2017-04-11 13:28:09,037: INFO/PoolWorker-3] Task member.tasks.celery_test[655e1e64-712a-431f-9e3e-2f7083666a07] succeeded in 10.033461324012023s: None
[2017-04-11 13:28:10,071: INFO/PoolWorker-4] Task member.tasks.celery_test[6172e3b7-a3b4-4d84-ad22-6fcbb444fdac] succeeded in 10.045007570006419s: None
[2017-04-11 13:28:17,752: INFO/PoolWorker-2] Task member.tasks.celer

result backend 설정

$ pip install django-celery-results
# settings.py
CELERY_RESULT_BACKEND = 'django-db'

INSTALLED_APPS = [
    # ...
    
    'django_celery_results',

    'member.apps.MemberConfig',
]
$ manage -d migrate

이제 result backend에 저장한 작업결과를 확인할 수 있다. http://localhost:8000/admin/django_celery_results/taskresult/ 접속해본다.

스케쥴링

주기적인 작업 실행을 장고의 데이터베이스를 참조해 실행한다.

설치

$ pip install django-celery-beat
# settings.py
INSTALLED_APPS = [
    # ...
    
    'django_celery_results',
    'django_celery_beat',

    'member.apps.MemberConfig',
]
$ manage -d migrate
# tab1
# 스케쥴링한 것을 실행
$ celery -A config beat -l info -S django

# tab2
# 셀러리 실행
$ celery -A config worker -l info

http://localhost:8000/admin/django_celery_beat/ 접속한다.

  • periodic task : 기존에 있는 것을 지우고, 새로 생성한다.
    • Name
    • Task (registered)
    • interval

버그

작업시간을 변경해도 자동으로 적용되지 않는 버그가 있으므로 fix된 버전으로 다시 설치한다.

$ pip uninstall celery
$ pip install git+https://github.com/celery/celery.git@b27c0f143b86989a5f655bcc9592221bbbba0f5f

배포

docker 빌드를 다시해 테스트 해본 후, 배포한다.