# TaskQ - 基于 Redis 的异步任务队列系统 一个基于 Go 和 Redis 的异步任务队列管理系统,使用 asynq 库作为底层实现,支持任务注册、发布、消费和重试机制。 ## 特性 - 🚀 基于 Redis 的高性能任务队列 - 📊 实时监控仪表板(Web UI) - 🔍 任务检查和调试工具 - ⚡ 并发任务处理 - 🔄 自动重试机制 - 📈 Prometheus 指标集成 - 🎯 灵活的任务优先级和分组 - ⏰ 延迟任务和定时任务支持 ## 快速开始 ### 基本使用 ```go package main import ( "context" "log" "time" "code.tczkiot.com/wlw/taskq" ) // 定义任务数据结构 type EmailData struct { To string Subject string Body string } // 定义任务处理器 func sendEmail(ctx context.Context, data EmailData) error { log.Printf("发送邮件到 %s: %s", data.To, data.Subject) // 实际的邮件发送逻辑 return nil } func main() { // 配置 Redis 连接 cfg := taskq.Config{ Redis: &redis.Client{ Addr: "localhost:6379", }, } // 注册任务 emailTask := &taskq.Task{ Name: "send-email", Queue: "email", MaxRetries: 3, TTR: 30 * time.Second, Handler: sendEmail, } cfg.Tasks = []*taskq.Task{emailTask} // 配置并启动 taskq.Configure(cfg) if err := taskq.Init(context.Background()); err != nil { log.Fatal(err) } // 发布任务 data := EmailData{ To: "user@example.com", Subject: "欢迎使用 TaskQ", Body: "这是一个测试邮件", } if err := emailTask.Publish(context.Background(), data); err != nil { log.Fatal(err) } // 启动服务器 if err := taskq.Start(context.Background()); err != nil { log.Fatal(err) } } ``` ## 项目结构 ``` taskq/ ├── taskq.go # 主包入口,提供包级别 API ├── servlet.go # Servlet 核心实现,生命周期管理 ├── task.go # Task 结构体和任务处理逻辑 ├── plugin.go # 插件系统接口 ├── x/ # 扩展组件 │ ├── inspector/ # 任务检查工具 │ ├── metrics/ # Prometheus 指标 │ └── monitor/ # Web 监控界面 ├── example/ # 示例代码 └── Makefile # 构建脚本 ``` ## API 文档 ### 包级别 API ```go // 配置默认 Servlet taskq.Configure(cfg) // 初始化(必须先调用) taskq.Init(ctx) // 启动服务器 taskq.Start(ctx) // 停止服务器 taskq.Stop() ``` ### Servlet 实例 API ```go // 创建新实例 servlet := taskq.New() // 配置 servlet.Configure(cfg) // 初始化 servlet.Init(ctx) // 启动 servlet.Start(ctx) // 停止 servlet.Stop() ``` ### 任务发布选项 ```go // 延迟执行 emailTask.Publish(ctx, data, taskq.Delay(5*time.Minute)) // 指定时间执行 emailTask.Publish(ctx, data, taskq.DelayUntil(time.Now().Add(time.Hour))) // 自定义超时 emailTask.Publish(ctx, data, taskq.TTR(10*time.Second)) // 结果保留时间 emailTask.Publish(ctx, data, taskq.Retention(48*time.Hour)) ``` ## 监控界面 访问 `http://localhost:8080` 查看任务监控界面,包括: - 实时任务状态 - 队列统计信息 - 任务执行历史 - 性能指标图表 ## 配置选项 ### Redis 配置 ```go cfg := taskq.Config{ Redis: &redis.Client{ Addr: "localhost:6379", Password: "", DB: 0, }, } ``` ### 任务配置 ```go task := &taskq.Task{ Name: "task-name", // 任务名称(唯一) Queue: "default", // 队列名称 Group: "group-name", // 任务分组(可选) MaxRetries: 3, // 最大重试次数 Priority: 1, // 优先级 TTR: 30 * time.Second, // 超时时间 Handler: handlerFunc, // 处理器函数 } ``` ## 许可证 MIT License