国外做蛋糕的网站,百度有做企业网站吗,免费模板网站都有什么,天价域名排名100在之前的文章我们已经学习了Celery和APScheduler的基本使用#xff0c;下面让我们来了解一下如何在Django中使用Celery和APScheduler
Celery
1.前提工作
python 3.7
pip install celery
pip install eventlet
#5.0版本以下
pip install importlib-metadata4.8.3#xff08…在之前的文章我们已经学习了Celery和APScheduler的基本使用下面让我们来了解一下如何在Django中使用Celery和APScheduler
Celery
1.前提工作
python 3.7
pip install celery
pip install eventlet
#5.0版本以下
pip install importlib-metadata4.8.3python3.7下可能会出现报错
2.项目结构 3.异步任务
#tasks.py
from ..main import celery_app
# 装饰器将send_sms_code装饰为异步任务,并设置别名
celery_app.task(namesend_sms_code)
def send_sms_code(mobile, sms_code):print(向手机号{}发送验证码{}.format(mobile,sms_code))
# config.py
broker_url redis://127.0.0.1:6379/7
#main.py
# celery启动⽂件
from celery import Celery
import os#配置环境
os.environ.setdefault(DJANGO_SETTINGS_MODULE, djangoProject7.settings)
# 创建celery实例
celery_app Celery(test)
# 加载celery配置
celery_app.config_from_object(celery_tasks.config)
# ⾃动注册celery任务
celery_app.autodiscover_tasks([celery_tasks.sms])
命令启动
与manage.py平级,执行命令celery -A celery_tasks.main worker -l info -P eventlet 编写视图函数和路由
#views.py
from django.shortcuts import render,HttpResponse
from celery_tasks.sms import tasks
# Create your views here.
def index(request):tasks.send_sms_code(13417361123,123456)return HttpResponse(111)#urls.py
from django.contrib import admin
from django.urls import path
from app01 import views
urlpatterns [path(admin/, admin.site.urls),path(index/,views.index)
]启动项目然后访问视图即可
4.定时任务
代码只需要改config.py即可
# config.py
broker_url redis://127.0.0.1:6379/7
from celery_tasks.main import celery_app
from celery.schedules import crontab#设置定时任务
from datetime import timedelta# 设置定时任务
celery_app.conf.beat_schedule {test_task: {task: send_sms_code,# schedule: crontab(hour11, minute28),# 每天的11点28分执行一次任务schedule: timedelta(seconds1), # 每秒执行一次任务args: (13417366781,1111), # 这里是传递给任务的参数元组形式}
}
然后在终端启动分别执行两条命令(开两个终端执行)
celery --appcelery_tasks.main worker -P eventlet -l INFO
celery -A celery_tasks.main beat APScheduler
1.前提工作
pip install django-apscheduler 2.配置
在settings.py中加入
INSTALLED_APPS (# ...django_apscheduler,
)
apscheduler存在数据库依赖所以得配置一下数据库信息
DATABASES {default: {ENGINE: django.db.backends.sqlite3,NAME: test,USER: root,PASSWORD: 547710,HOST: localhost,PORT: 3306}
}然后运行python manage.py migrate接着会在数据库中生成两张表
django_apscheduler_djangojob 表保存注册的任务以及下次执行的时间
django_apscheduler_djangojobexecution 保存每次任务执行的时间和结果和任务状态 3.使用
#views.py
from django_apscheduler.jobstores import DjangoJobStore, register_jobfrom apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetimescheduler BlockingScheduler() # 创建调度器
scheduler.add_jobstore(DjangoJobStore(), default)#添加定时任务方式一
register_job(scheduler, interval, seconds5, idfunc, replace_existingTrue, misfire_grace_time120)
def job():print(datetime.now().strftime(%Y-%m-%d %H:%M:%S))# 添加定时任务方式二
def job1():print(datetime.now().strftime(%Y-%m-%d %H:%M:%S))
scheduler.add_job(job1,interval,seconds5,idmy_job, # 任务的唯一标识replace_existingTrue,
)scheduler.start() 4.启动
在终端运行python manage.py runserver,效果如下 数据库表记录有时间差8小时这个去配置时区即可
django_apscheduler_djangojob django_apscheduler_djangojobexecution