admin管理员组

文章数量:1561464

🍀 前言

博客地址:

  • CSDN:https://blog.csdn/powerbiubiu

👋 简介

Schedule是Python中的一个轻量级进程调度程序库,用于安排任务以指定的时间间隔定期运行

📖 正文

1 Schedule的介绍

1.1 安装

pip install schedule

1.2 语法

Schedule.every(interval).[timeframe].do(function)
参数:

  • interval:任意整数,表示数量;
  • timeframe:日期类型,入:时分秒;
  • function:需要执行的函数。

2 Schedule的使用

2.1 基本使用
from datetime import datetime

import schedule
import time

def job(name: str) -> None:
    now_time = datetime.now()
    print(f"{now_time} - Hello {name}")

if __name__ == '__main__':
    name = 'Python'
    schedule.every(2).seconds.do(job, name)
    while True:
        schedule.run_pending()  # 运行所有可以运行的任务
        time.sleep(1)


通过代码和执行结果可以看出,通过调度器,每两秒执行一次job方法,并传入参数。

2.2 任务调度
# 每2秒执行一次任务
schedule.every(2).seconds.do(job, name)
# 每10分钟执行一次任务
schedule.every(10).minutes.do(job, name)
# 每1小时执行一次任务
schedule.every(1).hour.do(job, name)
# 每天9:30分的时候执行一次任务
schedule.every().day.at("9:30").do(job, name)
# 每1-5天的时候执行一次任务
schedule.every(1).to(5).days.do(job, name)
# 每周一执行一次任务
schedule.every().monday.do(job, name)
# 每周三的9:30分钟执行一次任务
schedule.every().wednesday.at("9:30").do(job, name)

3 使用装饰器

3.1 装饰器介绍

@repeat(job, *args, **kwargs)
参数:

  • job:任务定时;
  • *args:参数
  • **kwargs:参数
3.2 装饰器调度

通过装饰器,我们可以指定任务的定时以及被装饰方法的参数。实际上装饰器实现了job.do(function,*args,**kwargs),就等于every(2).seconds.do(job, name),只是通过装饰器把every(2).seconds当参数传入了而已。

from schedule import repeat, every, run_pending

@repeat(every(2).seconds, 'World')
def job(name: str) -> None:
    now_time = datetime.now()
    print(f"{now_time} - Hello {name}")
    
if __name__ == '__main__':
    while True:
        run_pending()  # 运行所有可以运行的任务
        time.sleep(1)

4 取消定时器

import schedule
i = 0

def job(name: str) -> None:
    global i
    i += 1
    now_time = datetime.now()
    print(f"{now_time} - Hello {name}")
    if i == 5:
        schedule.clear()
        exit(0)

if __name__ == '__main__':
    name = 'Python'
    schedule.every(2).seconds.do(job, name)
    while True:
        schedule.run_pending() 
        time.sleep(1)

通过clear()方法可以清除定时任务,在job方法中,模拟执行5次后停止任务执行,这里通过exit(0)退出python程序,不然调度任务不执行了,但python程序还在执行。

5 运行任务到某时刻

通过until设置任务执行截至时间,任务运行则通过程序启动时开始执行。

from datetime import datetime, timedelta, time
import schedule


def job(name: str) -> None:
    now_time = datetime.now()
    print(f"{now_time} - Hello {name}")


if __name__ == '__main__':
    # 今天14:45停止
    schedule.every().second.until('14:45').do(job,name)
    # 2024-12-31 23:59:59停止
    schedule.every().second.until('2024-12-31 23:59:59').do(job,name)
    # 8小时后停止
    schedule.every().second.until(timedelta(hours=8)).do(job,name)
    # 今天23:59:59停止
    schedule.every().second.until(time(23, 59, 59)).do(job,name)
    # 2024-12-31 23:59:59停止
    schedule.every().second.until(datetime(2024, 12, 31, 23, 59, 59)).do(job,name)
    while True:
        schedule.run_pending()

✏ 总结

schedule的任务调度是串行的,如果各个任务之间时间不冲突,是没问题的;如果时间有冲突的话,则会串行的执行命令,可以通过多线程或多进程的方法避免该问题。

💖 欢迎关注我的公众号

本文标签: 高效上手模块轻松Python