2025-12-09 19:58:18 +08:00
2025-12-10 00:53:30 +08:00
2025-12-09 14:33:07 +08:00
2025-12-09 14:33:07 +08:00
2025-12-10 00:53:30 +08:00
2025-12-10 00:53:30 +08:00
2025-12-10 00:53:30 +08:00

TaskQ - 基于 Redis 的异步任务队列系统

一个基于 Go 和 Redis 的异步任务队列管理系统,使用 asynq 库作为底层实现,支持任务注册、发布、消费和重试机制。

特性

  • 🚀 基于 Redis 的高性能任务队列
  • 📊 实时监控仪表板Web UI
  • 🔍 任务检查和调试工具
  • 并发任务处理
  • 🔄 自动重试机制
  • 📈 Prometheus 指标集成
  • 🎯 灵活的任务优先级和分组
  • 延迟任务和定时任务支持

快速开始

基本使用

package main

import (
    "context"
    "log"
    "time"

    "code.tczkiot.com/wlw/taskq"
)

// 定义任务数据结构
type EmailData struct {
    To      string
    Subject string
    Body    string
}

// 定义任务处理器
func sendEmail(ctx context.Context, data EmailData) error {
    log.Printf("发送邮件到 %s: %s", data.To, data.Subject)
    // 实际的邮件发送逻辑
    return nil
}

func main() {
    // 配置 Redis 连接
    cfg := taskq.Config{
        Redis: &redis.Client{
            Addr: "localhost:6379",
        },
    }

    // 注册任务
    emailTask := &taskq.Task{
        Name:       "send-email",
        Queue:      "email",
        MaxRetries: 3,
        TTR:        30 * time.Second,
        Handler:    sendEmail,
    }

    cfg.Tasks = []*taskq.Task{emailTask}

    // 配置并启动
    taskq.Configure(cfg)
    if err := taskq.Init(context.Background()); err != nil {
        log.Fatal(err)
    }

    // 发布任务
    data := EmailData{
        To:      "user@example.com",
        Subject: "欢迎使用 TaskQ",
        Body:    "这是一个测试邮件",
    }

    if err := emailTask.Publish(context.Background(), data); err != nil {
        log.Fatal(err)
    }

    // 启动服务器
    if err := taskq.Start(context.Background()); err != nil {
        log.Fatal(err)
    }
}

项目结构

taskq/
├── taskq.go          # 主包入口,提供包级别 API
├── servlet.go        # Servlet 核心实现,生命周期管理
├── task.go           # Task 结构体和任务处理逻辑
├── plugin.go         # 插件系统接口
├── x/                # 扩展组件
│   ├── inspector/    # 任务检查工具
│   ├── metrics/      # Prometheus 指标
│   └── monitor/      # Web 监控界面
├── example/          # 示例代码
└── Makefile          # 构建脚本

API 文档

包级别 API

// 配置默认 Servlet
taskq.Configure(cfg)

// 初始化(必须先调用)
taskq.Init(ctx)

// 启动服务器
taskq.Start(ctx)

// 停止服务器
taskq.Stop()

Servlet 实例 API

// 创建新实例
servlet := taskq.New()

// 配置
servlet.Configure(cfg)

// 初始化
servlet.Init(ctx)

// 启动
servlet.Start(ctx)

// 停止
servlet.Stop()

任务发布选项

// 延迟执行
emailTask.Publish(ctx, data, taskq.Delay(5*time.Minute))

// 指定时间执行
emailTask.Publish(ctx, data, taskq.DelayUntil(time.Now().Add(time.Hour)))

// 自定义超时
emailTask.Publish(ctx, data, taskq.TTR(10*time.Second))

// 结果保留时间
emailTask.Publish(ctx, data, taskq.Retention(48*time.Hour))

监控界面

访问 http://localhost:8080 查看任务监控界面,包括:

  • 实时任务状态
  • 队列统计信息
  • 任务执行历史
  • 性能指标图表

配置选项

Redis 配置

cfg := taskq.Config{
    Redis: &redis.Client{
        Addr:     "localhost:6379",
        Password: "",
        DB:       0,
    },
}

任务配置

task := &taskq.Task{
    Name:       "task-name",        // 任务名称(唯一)
    Queue:      "default",          // 队列名称
    Group:      "group-name",       // 任务分组(可选)
    MaxRetries: 3,                  // 最大重试次数
    Priority:   1,                  // 优先级
    TTR:        30 * time.Second,   // 超时时间
    Handler:    handlerFunc,        // 处理器函数
}

许可证

MIT License

Description
No description provided
Readme MIT 151 KiB
Languages
Go 55.5%
JavaScript 38.2%
CSS 4.6%
HTML 1.3%
Makefile 0.4%