// Package taskq 提供基于 Redis 的异步任务队列功能 // 使用 asynq 库作为底层实现,支持任务注册、发布、消费和重试机制 package taskq import ( "context" "errors" "log" "maps" "reflect" "sync/atomic" "time" "github.com/hibiken/asynq" "github.com/redis/go-redis/v9" ) // 全局状态变量 var ( started atomic.Bool // 服务器启动状态 exit chan chan struct{} // 优雅退出信号通道 done chan struct{} // 关闭完成信号通道 handlers map[string]asynq.Handler // 任务处理器映射表 queues map[string]int // 队列优先级配置 client atomic.Pointer[asynq.Client] // asynq 客户端实例 redisClient redis.UniversalClient // Redis 客户端实例 inspector *Inspector // 统计采集器实例 errorType = reflect.TypeOf((*error)(nil)).Elem() // error 类型反射 contextType = reflect.TypeOf((*context.Context)(nil)).Elem() // context.Context 类型反射 ) // Init 初始化 taskq 系统 // 创建必要的全局变量和映射表,必须在调用其他函数之前调用 func Init() { exit = make(chan chan struct{}) // 创建优雅退出通道 done = make(chan struct{}) // 创建关闭完成通道 handlers = make(map[string]asynq.Handler) // 创建任务处理器映射 queues = make(map[string]int) // 创建队列优先级映射 } // Register 注册任务处理器 // 使用泛型确保类型安全,通过反射验证处理器函数签名 // 处理器函数签名必须是:func(context.Context, T) error 或 func(context.Context) 或 func(T) error 或 func() func Register[T any](t *Task[T]) error { if t.Queue == "" { return errors.New("taskq: queue name cannot be empty") } if t.Priority < 0 || t.Priority > 255 { return errors.New("taskq: priority must be between 0 and 255") } if t.MaxRetries < 0 { return errors.New("taskq: retry count must be non-negative") } if t.Handler == nil { return errors.New("taskq: handler cannot be nil") } rv := reflect.ValueOf(t.Handler) if rv.Kind() != reflect.Func { return errors.New("taskq: handler must be a function") } rt := rv.Type() // 验证返回值:只能是 error 或无返回值 var returnError bool for i := range rt.NumOut() { if i == 0 && rt.Out(0).Implements(errorType) { returnError = true } else { return errors.New("taskq: handler function must return either error or nothing") } } // 验证参数:支持以下签名 // - func(context.Context, T) error // - func(context.Context) error // - func(T) error // - func() var inContext bool var inData bool var dataType reflect.Type numIn := rt.NumIn() if numIn > 2 { return errors.New("taskq: handler function can have at most 2 parameters") } for i := range numIn { fi := rt.In(i) if fi.Implements(contextType) { if i != 0 { return errors.New("taskq: context.Context must be the first parameter") } inContext = true } else if fi.Kind() == reflect.Struct { if inData { return errors.New("taskq: handler function can only have one data parameter") } inData = true dataType = fi } else { return errors.New("taskq: handler parameter must be context.Context or a struct") } } // 检查服务器是否已启动 if started.Load() { return errors.New("taskq: cannot register handler after server has started") } // 设置任务的反射信息 t.funcValue = rv t.dataType = dataType t.inputContext = inContext t.inputData = inData t.returnError = returnError // 注册到全局映射表 handlers[t.Name] = t queues[t.Queue] = t.Priority return nil } // SetRedis 设置 Redis 客户端 // 必须在启动服务器之前调用,用于配置任务队列的存储后端 func SetRedis(rdb redis.UniversalClient) error { if started.Load() { return errors.New("taskq: server is already running") } redisClient = rdb client.Store(asynq.NewClientFromRedisClient(rdb)) return nil } // StartOptions 启动选项 type StartOptions struct { // StatsInterval 统计采集间隔,默认 2 秒 StatsInterval time.Duration // StatsDBPath SQLite 数据库文件路径,默认 "./taskq_stats.db" StatsDBPath string } // Start 启动 taskq 服务器 // 开始监听任务队列并处理任务,包含健康检查和优雅退出机制 func Start(ctx context.Context, opts ...StartOptions) error { if !started.CompareAndSwap(false, true) { return errors.New("taskq: server is already running") } if redisClient == nil { started.Store(false) return errors.New("taskq: redis client not initialized, call SetRedis() first") } var opt StartOptions if len(opts) > 0 { opt = opts[0] } if err := startInspector(opt); err != nil { started.Store(false) return err } srv := createServer(ctx) go runServer(srv) go runMonitor(ctx, srv) return nil } // startInspector 启动统计采集器 func startInspector(opt StartOptions) error { ins, err := NewInspector(InspectorOptions{ Interval: opt.StatsInterval, DBPath: opt.StatsDBPath, }) if err != nil { return err } inspector = ins SetStatsDB(ins.GetStatsDB()) return nil } // createServer 创建 asynq 服务器 func createServer(ctx context.Context) *asynq.Server { return asynq.NewServerFromRedisClient(redisClient, asynq.Config{ Concurrency: 30, Queues: maps.Clone(queues), BaseContext: func() context.Context { return ctx }, LogLevel: asynq.WarnLevel, }) } // runServer 运行任务处理服务器 func runServer(srv *asynq.Server) { mux := asynq.NewServeMux() for name, handler := range handlers { mux.Handle(name, handler) } if err := srv.Run(mux); err != nil { log.Fatal(err) } } // runMonitor 运行监控协程,处理优雅退出和健康检查 func runMonitor(ctx context.Context, srv *asynq.Server) { defer close(done) defer started.Store(false) defer closeInspector() defer srv.Shutdown() ticker := time.NewTicker(time.Minute) defer ticker.Stop() for { select { case quit := <-exit: quit <- struct{}{} return case <-ctx.Done(): // ctx 取消时,排空 exit 通道中可能的信号 select { case quit := <-exit: quit <- struct{}{} default: } return case <-ticker.C: if err := srv.Ping(); err != nil { log.Println(err) return } } } } // closeInspector 关闭统计采集器 func closeInspector() { if inspector != nil { inspector.Close() inspector = nil } } // Stop 优雅停止 taskq 服务器 // 发送停止信号并等待服务器完全关闭 func Stop() { if !started.Load() { return } quit := make(chan struct{}) select { case exit <- quit: <-quit // 等待确认收到退出信号 default: // monitor 已经退出 } <-done // 等待 runMonitor 完全结束 }