package taskq import ( "context" "encoding/json" "errors" "fmt" "log" "reflect" "time" "github.com/hibiken/asynq" "github.com/rs/xid" ) // Task 定义任务结构 type Task struct { // 公开字段:用户配置 Queue string // 任务队列名称 Group string // 任务分组 Name string // 任务名称,唯一标识 MaxRetries int // 最大重试次数 Priority int // 任务优先级(数值越大优先级越高) TTR time.Duration // 任务超时时间(Time-To-Run) Handler any // 处理器函数 // 私有字段:运行时反射信息 funcValue reflect.Value // 处理器函数的反射值 dataType reflect.Type // 数据类型的反射信息 inputContext bool // 是否需要 context.Context 参数 inputData bool // 是否需要数据参数 returnError bool // 是否返回 error servlet *Servlet // 所属的 Servlet 实例 } // Publish 发布任务到队列 func (t *Task) Publish(ctx context.Context, data any, options ...PublishOption) error { var c *asynq.Client if t.servlet != nil { c = t.servlet.Client() } else if defaultServlet != nil { c = defaultServlet.Client() } if c == nil { return errors.New("taskq: client not initialized, call SetRedis() first") } // 序列化任务数据为 JSON payload, err := json.Marshal(data) if err != nil { return fmt.Errorf("taskq: failed to marshal task data: %w", err) } // 构建任务选项 opts := []asynq.Option{ asynq.Queue(t.Queue), // 设置队列名称 asynq.MaxRetry(t.MaxRetries), // 设置最大重试次数 asynq.TaskID(xid.New().String()), // 生成唯一任务ID asynq.Retention(time.Hour * 24), // 设置结果保留24小时 } // 只有设置了 TTR 才添加超时选项,避免 TTR=0 导致立即超时 if t.TTR > 0 { opts = append(opts, asynq.Timeout(t.TTR)) } // 只有设置了 Group 才添加分组选项 if t.Group != "" { opts = append(opts, asynq.Group(t.Group)) } // 应用用户自定义选项 for _, option := range options { if opt := option(); opt != nil { opts = append(opts, opt) } } // 发布任务到队列 info, err := c.EnqueueContext( ctx, asynq.NewTask(t.Name, payload), opts..., ) // 记录任务发布信息(用于调试) log.Println(info) return err } // ProcessTask 处理任务的核心方法 // 由 asynq 服务器调用,根据任务配置动态调用处理器函数 func (t *Task) ProcessTask(ctx context.Context, tsk *asynq.Task) error { var in []reflect.Value // 根据配置添加 context.Context 参数 if t.inputContext { in = append(in, reflect.ValueOf(ctx)) } // 根据配置添加数据参数 if t.inputData { // 创建数据类型的指针实例用于反序列化 dataPtr := reflect.New(t.dataType) // 反序列化任务载荷 err := json.Unmarshal(tsk.Payload(), dataPtr.Interface()) if err != nil { return err } // 传递值类型而非指针,因为 Handler 期望的是值类型 in = append(in, dataPtr.Elem()) } // 通过反射调用处理器函数 out := t.funcValue.Call(in) // 处理返回值 if t.returnError { // 当返回值为 nil 时,Interface() 返回 nil,不能直接类型断言 if out[0].IsNil() { return nil } return out[0].Interface().(error) } return nil } // PublishOption 任务发布选项函数类型 // 用于配置任务发布时的各种选项 type PublishOption func() asynq.Option // Delay 设置任务延迟执行时间 // 参数 d 表示延迟多长时间后执行 func Delay(d time.Duration) PublishOption { return func() asynq.Option { return asynq.ProcessIn(d) } } // DelayUntil 设置任务在指定时间执行 // 参数 t 表示任务执行的具体时间点 func DelayUntil(t time.Time) PublishOption { return func() asynq.Option { return asynq.ProcessAt(t) } } // TTR 设置任务超时时间 // 覆盖任务默认的超时时间配置 func TTR(d time.Duration) PublishOption { return func() asynq.Option { return asynq.Timeout(d) } } // Retention 设置任务结果保留时间 // 任务执行完成后,结果在 Redis 中保留的时间 func Retention(d time.Duration) PublishOption { return func() asynq.Option { return asynq.Retention(d) } }