Files
taskq/task.go
hupeh 326f2a371c feat: 优化监控仪表盘 UI
- 添加 appbar 导航栏,支持 Chart/Queues 视图切换
- appbar 切换使用 history API,支持浏览器前进/后退
- 图表视图占满整个可视区域
- queue-modal 共享 appbar 样式
- 修复 queue tab count 字段名大小写问题
- tooltip 跟随鼠标显示在右下方,移除箭头
- 图表 canvas 鼠标样式改为准星
- pause/resume 队列后刷新列表
- example 添加 flag 配置参数
2025-12-10 00:53:30 +08:00

164 lines
4.2 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package taskq
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"reflect"
"time"
"github.com/hibiken/asynq"
"github.com/rs/xid"
)
// Task 定义任务结构
type Task struct {
// 公开字段:用户配置
Queue string // 任务队列名称
Group string // 任务分组
Name string // 任务名称,唯一标识
MaxRetries int // 最大重试次数
Priority int // 任务优先级(数值越大优先级越高)
TTR time.Duration // 任务超时时间Time-To-Run
Handler any // 处理器函数
// 私有字段:运行时反射信息
funcValue reflect.Value // 处理器函数的反射值
dataType reflect.Type // 数据类型的反射信息
inputContext bool // 是否需要 context.Context 参数
inputData bool // 是否需要数据参数
returnError bool // 是否返回 error
servlet *Servlet // 所属的 Servlet 实例
}
// Publish 发布任务到队列
func (t *Task) Publish(ctx context.Context, data any, options ...PublishOption) error {
var c *asynq.Client
if t.servlet != nil {
c = t.servlet.Client()
} else if defaultServlet != nil {
c = defaultServlet.Client()
}
if c == nil {
return errors.New("taskq: client not initialized, call SetRedis() first")
}
// 序列化任务数据为 JSON
payload, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("taskq: failed to marshal task data: %w", err)
}
// 构建任务选项
opts := []asynq.Option{
asynq.Queue(t.Queue), // 设置队列名称
asynq.MaxRetry(t.MaxRetries), // 设置最大重试次数
asynq.TaskID(xid.New().String()), // 生成唯一任务ID
asynq.Retention(time.Hour * 24), // 设置结果保留24小时
}
// 只有设置了 TTR 才添加超时选项,避免 TTR=0 导致立即超时
if t.TTR > 0 {
opts = append(opts, asynq.Timeout(t.TTR))
}
// 只有设置了 Group 才添加分组选项
if t.Group != "" {
opts = append(opts, asynq.Group(t.Group))
}
// 应用用户自定义选项
for _, option := range options {
if opt := option(); opt != nil {
opts = append(opts, opt)
}
}
// 发布任务到队列
info, err := c.EnqueueContext(
ctx,
asynq.NewTask(t.Name, payload),
opts...,
)
// 记录任务发布信息(用于调试)
log.Println(info)
return err
}
// ProcessTask 处理任务的核心方法
// 由 asynq 服务器调用,根据任务配置动态调用处理器函数
func (t *Task) ProcessTask(ctx context.Context, tsk *asynq.Task) error {
var in []reflect.Value
// 根据配置添加 context.Context 参数
if t.inputContext {
in = append(in, reflect.ValueOf(ctx))
}
// 根据配置添加数据参数
if t.inputData {
// 创建数据类型的指针实例用于反序列化
dataPtr := reflect.New(t.dataType)
// 反序列化任务载荷
err := json.Unmarshal(tsk.Payload(), dataPtr.Interface())
if err != nil {
return err
}
// 传递值类型而非指针,因为 Handler 期望的是值类型
in = append(in, dataPtr.Elem())
}
// 通过反射调用处理器函数
out := t.funcValue.Call(in)
// 处理返回值
if t.returnError {
// 当返回值为 nil 时Interface() 返回 nil不能直接类型断言
if out[0].IsNil() {
return nil
}
return out[0].Interface().(error)
}
return nil
}
// PublishOption 任务发布选项函数类型
// 用于配置任务发布时的各种选项
type PublishOption func() asynq.Option
// Delay 设置任务延迟执行时间
// 参数 d 表示延迟多长时间后执行
func Delay(d time.Duration) PublishOption {
return func() asynq.Option {
return asynq.ProcessIn(d)
}
}
// DelayUntil 设置任务在指定时间执行
// 参数 t 表示任务执行的具体时间点
func DelayUntil(t time.Time) PublishOption {
return func() asynq.Option {
return asynq.ProcessAt(t)
}
}
// TTR 设置任务超时时间
// 覆盖任务默认的超时时间配置
func TTR(d time.Duration) PublishOption {
return func() asynq.Option {
return asynq.Timeout(d)
}
}
// Retention 设置任务结果保留时间
// 任务执行完成后,结果在 Redis 中保留的时间
func Retention(d time.Duration) PublishOption {
return func() asynq.Option {
return asynq.Retention(d)
}
}