Files
taskq/README.md

197 lines
4.0 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# TaskQ - 基于 Redis 的异步任务队列系统
一个基于 Go 和 Redis 的异步任务队列管理系统,使用 asynq 库作为底层实现,支持任务注册、发布、消费和重试机制。
## 特性
- 🚀 基于 Redis 的高性能任务队列
- 📊 实时监控仪表板Web UI
- 🔍 任务检查和调试工具
- ⚡ 并发任务处理
- 🔄 自动重试机制
- 📈 Prometheus 指标集成
- 🎯 灵活的任务优先级和分组
- ⏰ 延迟任务和定时任务支持
## 快速开始
### 基本使用
```go
package main
import (
"context"
"log"
"time"
"code.tczkiot.com/wlw/taskq"
)
// 定义任务数据结构
type EmailData struct {
To string
Subject string
Body string
}
// 定义任务处理器
func sendEmail(ctx context.Context, data EmailData) error {
log.Printf("发送邮件到 %s: %s", data.To, data.Subject)
// 实际的邮件发送逻辑
return nil
}
func main() {
// 配置 Redis 连接
cfg := taskq.Config{
Redis: &redis.Client{
Addr: "localhost:6379",
},
}
// 注册任务
emailTask := &taskq.Task{
Name: "send-email",
Queue: "email",
MaxRetries: 3,
TTR: 30 * time.Second,
Handler: sendEmail,
}
cfg.Tasks = []*taskq.Task{emailTask}
// 配置并启动
taskq.Configure(cfg)
if err := taskq.Init(context.Background()); err != nil {
log.Fatal(err)
}
// 发布任务
data := EmailData{
To: "user@example.com",
Subject: "欢迎使用 TaskQ",
Body: "这是一个测试邮件",
}
if err := emailTask.Publish(context.Background(), data); err != nil {
log.Fatal(err)
}
// 启动服务器
if err := taskq.Start(context.Background()); err != nil {
log.Fatal(err)
}
}
```
## 项目结构
```
taskq/
├── taskq.go # 主包入口,提供包级别 API
├── servlet.go # Servlet 核心实现,生命周期管理
├── task.go # Task 结构体和任务处理逻辑
├── plugin.go # 插件系统接口
├── x/ # 扩展组件
│ ├── inspector/ # 任务检查工具
│ ├── metrics/ # Prometheus 指标
│ └── monitor/ # Web 监控界面
├── example/ # 示例代码
└── Makefile # 构建脚本
```
## API 文档
### 包级别 API
```go
// 配置默认 Servlet
taskq.Configure(cfg)
// 初始化(必须先调用)
taskq.Init(ctx)
// 启动服务器
taskq.Start(ctx)
// 停止服务器
taskq.Stop()
```
### Servlet 实例 API
```go
// 创建新实例
servlet := taskq.New()
// 配置
servlet.Configure(cfg)
// 初始化
servlet.Init(ctx)
// 启动
servlet.Start(ctx)
// 停止
servlet.Stop()
```
### 任务发布选项
```go
// 延迟执行
emailTask.Publish(ctx, data, taskq.Delay(5*time.Minute))
// 指定时间执行
emailTask.Publish(ctx, data, taskq.DelayUntil(time.Now().Add(time.Hour)))
// 自定义超时
emailTask.Publish(ctx, data, taskq.TTR(10*time.Second))
// 结果保留时间
emailTask.Publish(ctx, data, taskq.Retention(48*time.Hour))
```
## 监控界面
访问 `http://localhost:8080` 查看任务监控界面,包括:
- 实时任务状态
- 队列统计信息
- 任务执行历史
- 性能指标图表
## 配置选项
### Redis 配置
```go
cfg := taskq.Config{
Redis: &redis.Client{
Addr: "localhost:6379",
Password: "",
DB: 0,
},
}
```
### 任务配置
```go
task := &taskq.Task{
Name: "task-name", // 任务名称(唯一)
Queue: "default", // 队列名称
Group: "group-name", // 任务分组(可选)
MaxRetries: 3, // 最大重试次数
Priority: 1, // 优先级
TTR: 30 * time.Second, // 超时时间
Handler: handlerFunc, // 处理器函数
}
```
## 许可证
MIT License