tokio-cron-scheduler

tokio-cron-scheduler

Rust异步Cron调度库

tokio-cron-scheduler是一个Rust异步Cron调度库,基于Tokio构建。它提供Cron风格的任务调度,支持即时执行、固定间隔重复任务及任务数据持久化。该库支持PostgreSQL和Nats存储,具备灵活的时区设置和任务状态通知功能。适合需要在异步环境中进行精确任务调度的Rust项目。

tokio-cron-schedulerRust异步任务调度定时任务cronGithub开源项目

tokio-cron-scheduler

在异步 tokio 环境中使用类似 cron 的调度。 还可以立即调度任务或以固定时间间隔重复执行任务。 任务数据可以选择使用 PostgreSQL 或 Nats 进行持久化。

灵感来自 https://github.com/lholden/job_scheduler

使用方法

更多详细信息请参阅文档

请确保在 Cargo.toml 中添加 job_scheduler crate:

[dependencies] tokio-cron-scheduler = "*"

使用 cron 库的 Schedule 类型的 FromStr 实现来为作业创建调度。

调度格式如下:

秒 分 时 日 月 星期 年 * * * * * * *

时间以 UTC 指定,而非您的本地时区。请注意,年份可以省略。如果您想使用您的时区,请在作业创建调用后附加 _tz(例如 Job::new_async 对比 Job::new_async_tz)。

以逗号分隔的值(如 5,8,10)表示多个时间值。例如,调度 0 2,14,26 * * * * 将在每小时的第 2、14 和 26 分钟执行。

范围可以用破折号指定。调度 0 0 * 5-10 * * 将每小时执行一次,但仅在每月的 5 日至 10 日执行。

星期几可以用缩写或全名指定。调度 0 0 6 * * Sun,Sat 将在周日和周六的早上 6 点执行。

对于每个作业,您可以在作业开始、停止和移除时收到通知。由于这些通知是使用 tokio::spawn 调度的,如果任务快速完成,则无法保证这些通知的顺序。

一个简单的使用示例:

use std::time::Duration; use tokio_cron_scheduler::{Job, JobScheduler, JobSchedulerError}; #[tokio::main] async fn main() -> Result<(), JobSchedulerError> { let mut sched = JobScheduler::new().await?; // 添加基本 cron 作业 sched.add( Job::new("1/10 * * * * *", |_uuid, _l| { println!("我每 10 秒运行一次"); })? ).await?; // 添加异步作业 sched.add( Job::new_async("1/7 * * * * *", |uuid, mut l| { Box::pin(async move { println!("我每 7 秒异步运行一次"); // 查询此作业的下一次执行时间 let next_tick = l.next_tick_for_job(uuid).await; match next_tick { Ok(Some(ts)) => println!("7 秒作业的下一次运行时间是 {:?}", ts), _ => println!("无法获取 7 秒作业的下一次运行时间"), } }) })? ).await?; // 添加指定持续时间的一次性作业 sched.add( Job::new_one_shot(Duration::from_secs(18), |_uuid, _l| { println!("我只运行一次"); })? ).await?; // 创建指定持续时间的重复作业,使其可变以便之后编辑 let mut jj = Job::new_repeated(Duration::from_secs(8), |_uuid, _l| { println!("我每 8 秒重复运行一次"); })?; // 添加在作业开始/停止等时执行的操作 jj.on_start_notification_add(&sched, Box::new(|job_id, notification_id, type_of_notification| { Box::pin(async move { println!("作业 {:?} 已启动,通知 {:?} 已运行({:?})", job_id, notification_id, type_of_notification); }) })).await?; jj.on_stop_notification_add(&sched, Box::new(|job_id, notification_id, type_of_notification| { Box::pin(async move { println!("作业 {:?} 已完成,通知 {:?} 已运行({:?})", job_id, notification_id, type_of_notification); }) })).await?; jj.on_removed_notification_add(&sched, Box::new(|job_id, notification_id, type_of_notification| { Box::pin(async move { println!("作业 {:?} 已移除,通知 {:?} 已运行({:?})", job_id, notification_id, type_of_notification); }) })).await?; sched.add(jj).await?; // 必须启用 'signal' 功能 sched.shutdown_on_ctrl_c(); // 添加在关闭期间/之后运行的代码 sched.set_shutdown_handler(Box::new(|| { Box::pin(async move { println!("关闭完成"); }) })); // 启动调度器 sched.start().await?; // 等待作业运行 tokio::time::sleep(Duration::from_secs(100)).await; Ok(()) }

时区变更

您可以使用 JobBuilder API 创建使用特定时区的作业。 chrono-tz 不包含在依赖项中,因此如果您想轻松创建 Timezone 结构,需要将其添加到您的 Cargo.toml 中。

let job = JobBuilder::new() .with_timezone(chrono_tz::Africa::Johannesburg) .with_cron_job_type() .with_schedule("*/2 * * * * *") .unwrap() .with_run_async(Box::new( | uuid, mut l| { Box::pin(async move { info ! ("JHB 每 2 秒异步运行一次,id {:?}", uuid); let next_tick = l.next_tick_for_job(uuid).await; match next_tick { Ok(Some(ts)) => info !("JHB 2 秒作业的下一次运行时间是 {:?}", ts), _ => warn !("无法获取 2 秒作业的下一次运行时间"), } }) })) .build() .unwrap();

类似的库

  • job_scheduler 启发这个 crate 的项目
  • cron 我们使用的 cron 表达式解析器
  • schedule-rs 是一个类似的 Rust 库,实现了自己的 cron 表达式解析器

许可证

TokioCronScheduler 使用以下任一许可证

自定义存储

MetadataStore和NotificationStore特质可以被实现并在JobScheduler中使用。

默认提供了基于易失性哈希映射的SimpleMetadataStore和SimpleNotificationStore版本。使用Nats的持久版本由NatsMetadataStore和NatsNotificationStore提供。

贡献

除非您另有明确声明,否则您有意提交以包含在作品中的任何贡献,按照Apache-2.0许可证的定义,均应按上述方式双重许可,无任何附加条款或条件。

更多信息请参阅CONTRIBUTING文件。

特性

has_bytes

自0.7版本起

使Prost生成的数据结构可被需要获取数据结构字节的存储使用。Nats和Postgres存储依赖于启用此特性。

postgres_storage

自0.6版本起

添加Postgres元数据存储和通知存储(PostgresMetadataStore, PostgresNotificationStore)。使用Postgres数据库存储元数据和通知数据。

参见PostgreSQL文档

postgres_native_tls

自0.6版本起

使用postgres-native-tls crate作为PostgreSQL连接的TLS提供程序。

postgres_openssl

自0.6版本起

使用postgres-openssl crate作为PostgreSQL连接的TLS提供程序。

nats_storage

自0.6版本起

添加Nats元数据存储和通知存储(NatsMetadataStore, NatsNotificationStore)。使用Nats系统作为存储元数据和通知的方式。

参见Nats文档

signal

自0.5版本起

为调度器添加shutdown_on_signalshutdown_on_ctrl_c。 当接收到信号时,两者都会关闭系统(停止调度器并移除所有任务)。

由于这利用了Tokio的信号处理,因此仅在Unix系统上可用。

编写测试

在进行tokio::test时,记得在多线程上下文中运行,否则测试将在scheduler.add()上挂起。

例如:

#[cfg(test)] mod test { use tokio_cron_scheduler::{Job, JobScheduler}; use tracing::{info, Level}; use tracing_subscriber::FmtSubscriber; // 需要多线程来测试,否则会在scheduler.add()上挂起 #[tokio::test(flavor = "multi_thread", worker_threads = 2)] // #[tokio::test] async fn test_schedule() { let subscriber = FmtSubscriber::builder() .with_max_level(Level::TRACE) .finish(); tracing::subscriber::set_global_default(subscriber) .expect("设置默认订阅者失败"); info!("创建调度器"); let scheduler = JobScheduler::new().await.unwrap(); info!("添加任务"); scheduler .add( Job::new_async("*/1 * * * * *", |_, _| { Box::pin(async { info!("每秒运行"); }) }) .unwrap(), ) .await .expect("应该能够添加任务"); scheduler.start().await.unwrap(); tokio::time::sleep(core::time::Duration::from_secs(20)).await; } }

示例

simple

运行基于内存哈希映射的存储

cargo run --example simple --features="tracing-subscriber"

postgres

首先需要一个运行中的PostgreSQL实例:

docker run --rm -it -p 5432:5432 -e POSTGRES_USER="postgres" -e POSTGRES_PASSWORD="" -e POSTGRES_HOST_AUTH_METHOD="trust" postgres:14.1

然后运行示例:

POSTGRES_INIT_METADATA=true POSTGRES_INIT_NOTIFICATIONS=true cargo run --example postgres --features="postgres_storage tracing-subscriber"

nats

首先需要一个启用了Jetstream的运行中的Nats实例:

docker run --rm -it -p 4222:4222 -p 6222:6222 -p 7222:7222 -p 8222:8222 nats -js -DV

然后运行示例:

cargo run --example nats --features="nats_storage tracing-subscriber"

设计

任务活动

任务活动

创建任务

创建任务

创建通知

创建通知

删除任务

删除任务

删除通知

删除通知

编辑推荐精选

Keevx

Keevx

AI数字人视频创作平台

Keevx 一款开箱即用的AI数字人视频创作平台,广泛适用于电商广告、企业培训与社媒宣传,让全球企业与个人创作者无需拍摄剪辑,就能快速生成多语言、高质量的专业视频。

即梦AI

即梦AI

一站式AI创作平台

提供 AI 驱动的图片、视频生成及数字人等功能,助力创意创作

扣子-AI办公

扣子-AI办公

AI办公助手,复杂任务高效处理

AI办公助手,复杂任务高效处理。办公效率低?扣子空间AI助手支持播客生成、PPT制作、网页开发及报告写作,覆盖科研、商业、舆情等领域的专家Agent 7x24小时响应,生活工作无缝切换,提升50%效率!

TRAE编程

TRAE编程

AI辅助编程,代码自动修复

Trae是一种自适应的集成开发环境(IDE),通过自动化和多元协作改变开发流程。利用Trae,团队能够更快速、精确地编写和部署代码,从而提高编程效率和项目交付速度。Trae具备上下文感知和代码自动完成功能,是提升开发效率的理想工具。

AI工具TraeAI IDE协作生产力转型热门
蛙蛙写作

蛙蛙写作

AI小说写作助手,一站式润色、改写、扩写

蛙蛙写作—国内先进的AI写作平台,涵盖小说、学术、社交媒体等多场景。提供续写、改写、润色等功能,助力创作者高效优化写作流程。界面简洁,功能全面,适合各类写作者提升内容品质和工作效率。

AI辅助写作AI工具蛙蛙写作AI写作工具学术助手办公助手营销助手AI助手
问小白

问小白

全能AI智能助手,随时解答生活与工作的多样问题

问小白,由元石科技研发的AI智能助手,快速准确地解答各种生活和工作问题,包括但不限于搜索、规划和社交互动,帮助用户在日常生活中提高效率,轻松管理个人事务。

热门AI助手AI对话AI工具聊天机器人
Transly

Transly

实时语音翻译/同声传译工具

Transly是一个多场景的AI大语言模型驱动的同声传译、专业翻译助手,它拥有超精准的音频识别翻译能力,几乎零延迟的使用体验和支持多国语言可以让你带它走遍全球,无论你是留学生、商务人士、韩剧美剧爱好者,还是出国游玩、多国会议、跨国追星等等,都可以满足你所有需要同传的场景需求,线上线下通用,扫除语言障碍,让全世界的语言交流不再有国界。

讯飞智文

讯飞智文

一键生成PPT和Word,让学习生活更轻松

讯飞智文是一个利用 AI 技术的项目,能够帮助用户生成 PPT 以及各类文档。无论是商业领域的市场分析报告、年度目标制定,还是学生群体的职业生涯规划、实习避坑指南,亦或是活动策划、旅游攻略等内容,它都能提供支持,帮助用户精准表达,轻松呈现各种信息。

AI办公办公工具AI工具讯飞智文AI在线生成PPTAI撰写助手多语种文档生成AI自动配图热门
讯飞星火

讯飞星火

深度推理能力全新升级,全面对标OpenAI o1

科大讯飞的星火大模型,支持语言理解、知识问答和文本创作等多功能,适用于多种文件和业务场景,提升办公和日常生活的效率。讯飞星火是一个提供丰富智能服务的平台,涵盖科技资讯、图像创作、写作辅助、编程解答、科研文献解读等功能,能为不同需求的用户提供便捷高效的帮助,助力用户轻松获取信息、解决问题,满足多样化使用场景。

热门AI开发模型训练AI工具讯飞星火大模型智能问答内容创作多语种支持智慧生活
Spark-TTS

Spark-TTS

一种基于大语言模型的高效单流解耦语音令牌文本到语音合成模型

Spark-TTS 是一个基于 PyTorch 的开源文本到语音合成项目,由多个知名机构联合参与。该项目提供了高效的 LLM(大语言模型)驱动的语音合成方案,支持语音克隆和语音创建功能,可通过命令行界面(CLI)和 Web UI 两种方式使用。用户可以根据需求调整语音的性别、音高、速度等参数,生成高质量的语音。该项目适用于多种场景,如有声读物制作、智能语音助手开发等。

下拉加载更多