首页 » Python » python3 高级篇 » 正文

django 发布系统–celery

使用djcelery model api创建和更新定时任务

mark

也即是:

mark

在实际使用中我们采用分离式的设置,即把 celery 的配置和初始化与 tasks 代码分开。首先,要配置 django 中的 django.conf,也就是 settings.py。

1、首先安装django-celery,使用Redis作为Broker还需要安装celery-with-redis:

pip install django-celery
pip install celery-with-redis

2、setting.py配置

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'demo1',
    'djcelery',     # register the installed django-celery app
]


###settings.py文件末尾添加:

import djcelery
from celery.schedules import crontab
from datetime import timedelta

djcelery.setup_loader()     # let djcelery hook into Django and load task modules
# BUG FIX: CELERY_TIMEZONE was assigned twice (TIME_ZONE here, then
# 'Africa/Nairobi' a few lines below, silently overriding this one).
# Keep a single authoritative assignment tied to Django's TIME_ZONE.
CELERY_TIMEZONE = TIME_ZONE
BROKER_URL = 'redis://192.168.171.173:6379/6'       # redis broker: ip / port / db index
CELERY_RESULT_BACKEND = 'redis://192.168.171.173:6379/8'    # redis db for task results
# BUG FIX: the content type is 'application/json' — the original
# 'applications/json' is not a registered type and celery would reject
# every message sent with it.
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_IMPORTS = ['demo1.task',]        # task module inside the demo1 app (must exist)
CELERYD_MAX_TASKS_PER_CHILD = 3     # recycle each worker after 3 tasks to guard against memory leaks
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'      # store the periodic schedule in the DB

在发布系统中创建 celery.py 文件:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', '发布系统.settings')        ##发布系统是我创建项目的名字,假如是cmdb 那么就是cmdb.settings
app = Celery('发布系统')

app.config_from_object('django.conf:settings')

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

@app.task(bind=True)
def debug_task(self):
    print ('Request:  {0!x}'.format(self.request))

在demo1中创建 task.py 文件:

from __future__ import absolute_import, unicode_literals
import time
import requests
from celery import shared_task
from django.views.decorators.csrf import csrf_exempt, csrf_protect
from django.shortcuts import render, HttpResponse,redirect


@shared_task
def add(x, y):
    """Add *x* and *y* and return the result.

    Placeholder for real deployment work (e.g. code pushed via salt).
    """
    total = x + y
    return total

@shared_task
def mul(x, y):
    """Multiply *x* by *y* and return the product."""
    product = x * y
    return product

@shared_task
def xsum(numbers):
    """Print and return the sum of *numbers*.

    Fix: the original called sum(numbers) twice (once to print, once to
    return); compute it once.
    """
    total = sum(numbers)
    print (total)
    return total

路由:

from django.contrib import admin
# from django.urls import path
from demo1 import views
from django.conf.urls import url, include

urlpatterns = [
    url(r'^admin/', admin.site.urls),
    url(r'^fabu/', views.fabu),             # release/deploy page
    url(r'^celery/', views.celery_status),  # submit / query / revoke celery tasks
]

views.py (含celery)

from django.shortcuts import render, HttpResponse, redirect
from django.views import View
from demo1 import models
from demo1.task import add
from celery.result import AsyncResult
from 发布系统.celery import app

# Create your views here.

def fabu(request):
    """Release page: GET renders the form, POST kicks off a (stubbed) deploy."""
    if request.method == "GET":
        env = models.RecordEnv.objects.all()
        return render(request, 'fabu.html', locals())
    else:
        env = request.POST.get('env')       # environment name posted from the form
        app = request.POST.get('app')       # application name posted from the form
        obj_li = models.App.objects.filter(name=app, environment__name=env)

        # Placeholder deploy parameters — presumably these should be derived
        # from obj_li; TODO confirm against the real deploy flow.
        host_li = [{'id':'test', 'path':'/app/www/payment'},]
        package = 'svn://xxxxx'
        app_name = "payment"

        return HttpResponse("ok")


    # Step 1:
    # Fetch the host list and code location from the models,
    # loop over the hosts and push the code to each one.
    # On the automation platform (salt-master) download and package the code:
    #import os
    #import sys
    #path = os.getcwd() + r'/project_path/'
    # Get the code path, then via subprocess (linux commands) enter it:
    # <mkdir $app_name; cd $app_name; git clone XXX; tar zcf ...>

    # Step 2:
    # Push --> salt-stack state.sls
    # state.sls rules live in a yaml file;
    # trigger the push through the salt_api from python code.

    # Step 3:
    # Run the remote code --> cd path; start the program


def celery_status(request):
    """Submit add() tasks to celery (optionally scheduled for a future time),
    cancel/terminate a task by id, and query a task's status/result.

    GET params: x, y (operands), year/month/day/hour/minute or after
    (schedule), cancel / stop (task id to revoke).
    POST param: id (task id whose status to report).
    """
    import datetime
    import json
    if request.method == 'GET':
        if request.GET.get('x') and request.GET.get('y'):
            ## run immediately:
            ##ret = add.delay(int(request.GET.get('x')), int(request.GET.get('y')))
            if request.GET.get('after'):
                # "run N minutes from now": celery expects a UTC eta
                ctime = datetime.datetime.now()
                utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp())
                s1 = datetime.timedelta(seconds=int(request.GET.get('after'))*60)
                ctime_x = utc_ctime + s1
                ## pass ctime_x as apply_async(eta=...) to schedule it

            year = request.GET.get('year')      # scheduled run time from the form
            month = request.GET.get('month')
            day = request.GET.get('day')
            hour = request.GET.get('hour')
            minute = request.GET.get('minute')

            if year and month and day and hour and minute:
                ctime = datetime.datetime(year=int(year), month=int(month), day=int(day), hour=int(hour), minute=int(minute))
                # convert the local wall-clock time to UTC for celery
                ctime_x = datetime.datetime.utcfromtimestamp(ctime.timestamp())

                if ctime_x:
                    # hand the task to celery, to run at eta
                    ret = add.apply_async(args=[int(request.GET.get('x')), int(request.GET.get('y'))], eta=ctime_x)
                    num = ret.id

        # BUG FIX: 'async' is a reserved keyword since Python 3.7, so the
        # original code was a SyntaxError; renamed to task_result.
        if request.GET.get('cancel'):       # task has not really started: a plain revoke is enough
            task_result = AsyncResult(id=request.GET.get('cancel'), app=app)
            # BUG FIX: cancel/stop had their terminate flags swapped —
            # revoke() without terminate only prevents a *pending* task
            # from running, which is exactly the "cancel" case.
            task_result.revoke()
            cancel_tag = '取消成功'

        if request.GET.get('stop'):     # task has been running for a while: must terminate it
            task_result = AsyncResult(id=request.GET.get('stop'), app=app)
            task_result.revoke(terminate=True)
            stop_tag = '终止成功'
        return render(request, 'celery.html', locals())

    else:
        ret = request.POST.get('id', '')
        data = ""
        if ret:
            task_result = AsyncResult(id=ret, app=app)
            if task_result.successful():
                data = "执行成功,数据是: " + str(task_result.get())
                task_result.forget()
            elif task_result.failed():
                data = '执行失败'
            # BUG FIX: the status names were garbled ('PBNDING'/'RBTPY'/
            # 'STARTBD') and could never match; celery's real states are
            # PENDING / RETRY / STARTED.
            elif task_result.status == 'PENDING':
                data = "等待被执行"
            elif task_result.status == 'RETRY':
                data = '任务异常正常重试'
            elif task_result.status == 'STARTED':
                data = "任务正在执行"
            else:
                data = "未知"
        return render(request, 'celery.html', locals())

我们如果使用了 celery,就需要启动 2 个进程:一个是 django 服务,一个是 celery worker。

启动celery(在Terminal窗口执行)

celery -A 发布系统 worker --pool=solo -l info

mark

mark

mark

查询是否执行成功:

mark

赞 (2)

发表评论