#IT明星不是梦#【从0开始Python开发实战】Django集成Celery,实现异步任务调度系统

目录:

1.  Django集成Celery

2.  声明异步任务

3.  封装工具 task_util.py

4.  单元测试 test_task_util.py

5.  创建异步任务

6.  常见问题和解决方法

Celery是一个灵活可靠的分布式系统,用于异步任务调度,主要有3部分组成:消息中间件broker,任务执行单元worker,执行结果存储task result store。Celery使用第三方消息中间件Redis,RabbitMQ等。

系统通常将一些耗时的操作任务提交给 Celery去异步执行,典型架构示意图如下 本文详细介绍 Django集成 Celery配置方法和功能测试

时序图如下:

示例代码: https://github.com/rickding/HelloPython/tree/master/hello_ celery

├── __init__.py

├── settings.py

├── celery.py

├── tasks.py

├── util

│   └── task_util.py

├── tests

│   └── test_task_util.py

一, Django集成Celery

代码文件

功能 要点

Django集成Celery

requirements.txt

安装Celery, Redis和工具包:

celery == 4.2.1

flower == 0.9.2

redis == 3.2.0

eventlet == 0.24.1

celery.py

配置 Celery,依赖的消息中间件broker和后端backend地址配置在settings.py中集中维护。

__init__.py

配置项目加载celery.app

声明异步任务

tasks.py

声明Celery可调度的任务 @shared_task

封装工具task_util

task_util.py

异步任务创建和分发

单元测试

test_task_util.py

测试异步任务创建和分发功能

创建异步任务

views.py

增加REST接口/chk/job

1.  新建 Django项目,运行: django-admin startproject hello_celery

2.  进到目录 hello_ celery ,增加应用: python manage.py startapp app

项目的目录文件结构如下:

3.  安装 Celery和依赖包,pip install celery >= 4.2.1,如果不是新建项目,注意版本兼容问题。

celery == 4.2.1
flower == 0.9.2
redis == 3.2.0
eventlet == 0.24.1

4.  增加 celery.py,配置信息:

from __future__ import absolute_import , unicode_literals
import os
from celery import Celery , platforms
from django.conf import settings

# set the default Django settings module for the 'celery' program.
os.environ.setdefault( 'DJANGO_SETTINGS_MODULE' , 'hello_celery.settings' )

app = Celery(
     'hello_celery' ,
     include =[ 'hello_celery.tasks' ] ,
     broker =settings.CELERY_BROKER ,
     backend =settings.CELERY_BACKEND
)

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object( 'django.conf:settings' , namespace = 'CELERY' )

app.conf.update(
     CELERY_ACKS_LATE = True,
     CELERY_ACCEPT_CONTENT =[ 'pickle' , 'json' ] ,
     CELERYD_FORCE_EXECV = True,
     CELERYD_MAX_TASKS_PER_CHILD = 500 ,
     BROKER_HEARTBEAT = 0 ,
)

# Optional configuration, see the application user guide.
app.conf.update(
     CELERY_TASK_RESULT_EXPIRES = 3600 ,   # celery任务执行结果的超时时间,即结果在backend里的保存时间,单位s
)

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

platforms.C_FORCE_ROOT = True

5.  打开 settings.py,配置BROKER和BACKEND地址:

CELERY_BROKER = 'redis://127.0.0.1:6379/2'
CELERY_BACKEND = 'redis://127.0.0.1:6379/3'

6.  打开 __init__.py,增加代码:

from __future__ import absolute_import , unicode_literals
from .celery import app as celery_app

__all__ = [ 'celery_app' ]

二, 增加 tasks.py,声明异步任务

from __future__ import absolute_import , unicode_literals
import logging
import json

from celery import shared_task
from hello_celery.util.task_util import dispatch_task

log = logging.getLogger(__name__)


@shared_task
def task (param_str):
    log.info( 'task starts: %s, %s' % ( type (param_str) , param_str))

    param_dict = None
    try :
        param_dict = json.loads(param_str)
     except Exception as e:
        log.warning( 'Exception when parse param: %s' % str (e))

    log.info( 'parsed param: {}, {}' .format( type (param_dict) , param_dict))
     return 'finished'

正确配置后,运行命令: celery -A hello_celery worker -l info -P eventlet,注意Win10环境中需要增加eventlet,Celery成功启动信息:

三, 封装工具 task_util.py

封装两个有用的工具函数,分别用于分发(创建)异步任务和获取任务信息:

import logging
import json

log = logging.getLogger(__name__)

def dispatch_task (task_func , param_dict):
    param_json = json.dumps(param_dict)

     try :
         return task_func.apply_async(
            [param_json] ,
             retry = True,
             retry_policy ={
                 'max_retries' : 1 ,
                 'interval_start' : 0 ,
                 'interval_step' : 0.2 ,
                 'interval_max' : 0.2 ,
             } ,
         )
     except Exception as ex:
        log.info(ex)
         raise


def get_task_status (task_func , task_id):
    t = task_func.AsyncResult(task_id)
    status = t.state
    progress = 0

     if status == u'SUCCESS' :
        progress = 100
     elif status == u'FAILURE' :
        progress = 0
     elif status == 'PROGRESS' :
        progress = t.info[ 'progress' ]

     return { 'status' : status , 'progress' : progress}

四, 单元测试 test_task_util.py

增加测试函数,创建一个任务并获取信息:

import logging

from django.test import TestCase

from hello_celery.tasks import task
from hello_celery.util.task_util import dispatch_task , get_task_status

log = logging.getLogger(__name__)


class TasksTest(TestCase):
     def test_get_task_status ( self ):
        t = dispatch_task(task , { 'msg' : 'test_task' })
         self .assertIsNotNone(t)

        ret = get_task_status(task , t.id)
        log.info( 'task status: %s,%s, %s' % (ret , t.id , str (task)))
         self .assertIsNotNone(ret.get( 'status' ))

运行 python manage.py test,同时Celery将执行测试函数创建的任务:

五, 创建异步任务

1.  views.py中增加请求处理函数 ,创建一个异步执行的任务:

from django.http import JsonResponse
from hello_celery.tasks import do_task


def chk_job (req):
    param_dict = {
         'url' : req.get_raw_uri() ,
         'path' : req.get_full_path() ,
     }
    job = do_task(param_dict)
     return JsonResponse({ 'code' : 0 , 'msg' : 'success' , 'job' : job.task_id})

2.  urls.py中配置路由

from django.urls import path
from app.views import chk_ job

urlpatterns = [
    path( '' , chk_ job , name = 'chk' ) ,
]

3.  运行命令启动服务: python manage.py runserver 0.0.0.0:8001

4.  REST接口创建异步任务示例

 

六, 常见问题和解决方法

启动 Celery: celery -A hello_celery worker -l info, 运行出错:

Unrecoverable error: VersionMismatch('Redis transport requires redis-py versions 3.2.0 or later. You have 2.10.6',)

解决:指定 Redis使用3.2.0或更高pip install redis>=3.2.0

原因: Redis版本兼容问题。

启动 Celery: celery -A hello_celery worker -l info, 运行出错:

Task handler raised error: ValueError('not enough values to unpack (expected 3, got 0)',)

解决:安装 eventlet: pip install eventlet ,然后更新命令行: celery -A hello_celery worker -l info -P eventlet

原因: C elery4.x在win10上需要安装 eventlet包

启动 Celery Flower任务管理工具: celery flower --broker=redis://127.0.0.1:6379/2,运行出错

ATTRIBUTEERROR: MODULE ‘TORNADO.WEB  HAS NO ATTRIBUTE 'ASYNCHRONOUS’

解决:安装 tornado5.1.1版本: pip uninstall -y tornado; pip install tornado==5.1.1

原因: tornado6.0开始使用coroutine并删除了tornado.web.asynchronous,5.1版本能正常调用。

我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章