吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 1450|回复: 8
收起左侧

[Python 原创] Django使用APSchedule实现简单定时任务

[复制链接]
liuhaizhang 发表于 2023-11-3 10:00

一、环境依赖

系统:windows10

python: python==3.9.0

djnago==3.2.0

APScheduler==3.10.1

二、django中的配置

1、创建utils包,在包里面创建schedulers包

utils/schedulers/task.py

#1、设置 Django 环境,就可以导入项目的模型类这些了
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "项目根目录名.settings")
import django
django.setup()
#2、一些需要的模块
from datetime import datetime,timedelta,date

#3、django项目中模型类

NOW_DATETIME = datetime.strftime(datetime.now(),'%Y-%m-%d %H:%M:%S')
NOW_DATE = date.today().strftime('%Y-%m-%d')

def example_interval():
    '''
    每隔一段固定时间就执行一次
    :return:
    '''
    print('interval',NOW_DATETIME)

def example_cron():
    '''
    在每天的固定时间执行
    :return:
    '''
    print('cron,凌晨开始执行的定时任务')

def example_date():
    '''
    在指定日期执行一次,就执行一次
    :return:
    '''
    print(f'date,指定日期执行一次:{NOW_DATETIME}')

utils/schedulers/scheduler.py

# 2、导入所需的调度器类和触发器类
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
from pytz import timezone
from threading import RLock
from django.conf import settings
from datetime import datetime,timedelta
lock = RLock()

#3、导入定时任务
from .task import example_interval   #例子,时间间隔,每隔一段时间执行
from .task import example_cron  #指定时间执行,在指定时间点执行
from .task import example_date #指定日期执行,执行一次

class __SchedulerManage(BackgroundScheduler):
    _instance = None
    def __new__(cls, *args, **kwargs):
        if cls._instance:
            return cls._instance
        with lock:
            if cls._instance:
               return cls._instance
            cls._instance = super().__new__(cls)
            return cls._instance

    def __init__(self):
        super().__init__()
        # 1、设置时区
        self.timezone = timezone(settings.TIME_ZONE)
        # 2、使用内存存储定时任务信息
        jobstore_redis = RedisJobStore(host='localhost', port=6379, db=0, password='redis密码')
        jobstore_memory = MemoryJobStore()
        self.add_jobstore(jobstore_memory)
        # 3、添加任务
        self.add_task()

    def add_task(self):
        '''
        自定义的功能: 用来添加定时任务的
        :return:
        '''
        '1、三种触发器的例子'
        #每隔一段固定时间段执行一次,1小时执行一次,设置开始时间是启动时间后的3分钟
        self.add_job(example_interval, trigger=IntervalTrigger(hours=1,start_date=datetime.now()+timedelta(minutes=3)), id='example_interval', replace_existing=True)
        #设置每天的11:03:10 执行一次
        self.add_job(example_cron,trigger=CronTrigger(hour=11,minute=3,second=10),id='example_cron',replace_existing=True)
        #设置在2023-08-10 11:03:01执行一次,只执行一次
        self.add_job(example_date,trigger=DateTrigger(run_date=datetime(2023,8,10,11,3,1)),id='example_date',replace_existing=True)

#也可以在实例化时设置时区:__SchedulerManage(timezone=timezone('Asia/Shanghai'))
scheduler_ = __SchedulerManage()
if __name__ == '__main__':
    #启动 scheduler_.start()  或者 scheduler_() 两种方式都ok
    scheduler_()

utils/schedulers/__init__.py

2、项目配置文件settings.py

####配置定时任务
#启动定时任务
from utils.schedulers import scheduler_
scheduler_.start()

免费评分

参与人数 1吾爱币 +7 热心值 +1 收起 理由
苏紫方璇 + 7 + 1 欢迎分析讨论交流,吾爱破解论坛有你更精彩!

查看全部评分

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

mygaryge 发表于 2023-11-3 14:07
感谢楼主分享
diantai929 发表于 2023-11-3 14:19
xuexiba 发表于 2023-11-3 17:48
sghades 发表于 2023-11-5 12:18
长期驻留内存吗?
leo0634641107 发表于 2023-11-5 15:14
你这是想执行什么任务,例如?
 楼主| liuhaizhang 发表于 2023-11-6 17:24
1、这个是一个简单的实现定时任务的功能,是走内存,效率比较低;如果整个系统需要使用的定时任务少且不频繁时,可以使用这个配置。大的项目系统或定时任务多且频繁建议使用celery来实现。
2、具体写什么,看你的系统业务需要什么:
     我写的系统中,关于医疗养老的,需要每天给每个老人创建一些检测的项目,药物的服用,三餐的记录等数据,就使用到了定时任务,在每天凌晨的时候创建这些记录提供给当天记录使用。
     由于这个是内部使用的系统,不涉及高并发这些东西的,一个养老院也就上千人吧,性能要求也不高。定时任务就几个,一天执行一次,还是在凌晨执行的,使用这种简单的配置就可以了。
3、在utils/schedulers/task.py 模块中导入了django环境,可以执行django中的orm等东西的。
4、如果是对性能要求高,或者定时任务执行的比较频繁的,定时任务很多的系统,建议使用celery来实现定时任务,后续补上相关的帖子
 楼主| liuhaizhang 发表于 2023-11-6 17:32
可以直接设置内存或使用redis数据库来存定时任务:
模块:utils/schedulers/scheduler.py
def __init__(self,):
        # 2、使用内存存储定时任务信息
        jobstore_redis = RedisJobStore(host='localhost', port=6379, db=0, password='redis密码')
        jobstore_memory = MemoryJobStore()
       # 把  jobstore_memory  替换成  jobstore_redis 就是使用redis来存储定时任务
        self.add_jobstore(jobstore_memory)
Cx330Hz 发表于 2023-11-19 14:33
这个是会常驻内存吗
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - LCG - LSG ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2025-1-11 05:51

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表