// Package inspector 提供 taskq 的统计采集功能 package inspector import ( "context" "database/sql" "fmt" "os" "path/filepath" "strings" "sync" "time" "code.tczkiot.com/wlw/taskq" "github.com/hibiken/asynq" "github.com/redis/go-redis/v9" ) // Options 配置统计采集器的选项 type Options struct { // Interval 采集间隔,默认 2 秒 Interval time.Duration // DBPath SQLite 数据库文件路径,默认为 "./taskq_stats.db" DBPath string } // Inspector 统计采集器,独立于 HTTP 服务运行 // 实现 taskq.Plugin 接口 type Inspector struct { opts Options rdb redis.UniversalClient queues func() map[string]int inspector *asynq.Inspector db *sql.DB closeCh chan struct{} closeOnce sync.Once } // New 创建新的统计采集器 // 创建后需要通过 Servlet.Use() 注册 func New(opts Options) *Inspector { if opts.Interval <= 0 { opts.Interval = 2 * time.Second } if opts.DBPath == "" { opts.DBPath = "./taskq_stats.db" } return &Inspector{ opts: opts, closeCh: make(chan struct{}), } } // Name 返回插件名称 func (ins *Inspector) Name() string { return "inspector" } // Init 初始化插件,从 Context 获取 Redis 和 Queues func (ins *Inspector) Init(ctx *taskq.Context) error { ins.rdb = ctx.Redis() ins.queues = ctx.Queues return nil } // Start 启动采集器,初始化数据库并开始后台采集 func (ins *Inspector) Start(ctx *taskq.Context) error { // 确保目录存在 dir := filepath.Dir(ins.opts.DBPath) if dir != "" && dir != "." { if err := os.MkdirAll(dir, 0755); err != nil { return fmt.Errorf("inspector: failed to create directory: %v", err) } } // 打开 SQLite 数据库 db, err := sql.Open("sqlite3", ins.opts.DBPath) if err != nil { return fmt.Errorf("inspector: failed to open database: %v", err) } // 初始化数据库表 if err := initStatsDB(db); err != nil { db.Close() return fmt.Errorf("inspector: failed to init database: %v", err) } ins.db = db ins.inspector = asynq.NewInspectorFromRedisClient(ins.rdb) // 启动后台统计采集 go ins.startCollector(ctx) return nil } // Stop 停止采集器,关闭数据库连接 func (ins *Inspector) Stop() error { ins.closeOnce.Do(func() { close(ins.closeCh) }) if ins.db != nil { ins.db.Close() } if ins.inspector != nil { ins.inspector.Close() } return nil } // initStatsDB 初始化数据库 func initStatsDB(db *sql.DB) error { _, err := db.Exec(` CREATE TABLE IF NOT EXISTS metrics ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp INTEGER NOT NULL, queue TEXT NOT NULL, active INTEGER DEFAULT 0, pending INTEGER DEFAULT 0, scheduled INTEGER DEFAULT 0, retry INTEGER DEFAULT 0, archived INTEGER DEFAULT 0, completed INTEGER DEFAULT 0, succeeded INTEGER DEFAULT 0, failed INTEGER DEFAULT 0 ); CREATE INDEX IF NOT EXISTS idx_metrics_queue_time ON metrics(queue, timestamp DESC); CREATE INDEX IF NOT EXISTS idx_metrics_time ON metrics(timestamp DESC); CREATE UNIQUE INDEX IF NOT EXISTS idx_metrics_unique ON metrics(timestamp, queue); `) return err } // startCollector 启动后台统计采集任务 func (ins *Inspector) startCollector(ctx context.Context) { ticker := time.NewTicker(ins.opts.Interval) defer ticker.Stop() for { select { case <-ins.closeCh: return case <-ctx.Done(): return case <-ticker.C: ins.collectStats() } } } // collectStats 采集所有队列的统计数据 func (ins *Inspector) collectStats() { if ins.queues == nil || ins.inspector == nil { return } now := time.Now().Unix() queueList := ins.queues() for queueName := range queueList { info, err := ins.inspector.GetQueueInfo(queueName) if err != nil { continue } stats := Stats{ Queue: queueName, Timestamp: now, Active: info.Active, Pending: info.Pending, Scheduled: info.Scheduled, Retry: info.Retry, Archived: info.Archived, Completed: info.Completed, Succeeded: info.Processed - info.Failed, Failed: info.Failed, } ins.saveMetrics(stats) } } // saveMetrics 保存统计数据到 metrics 表 func (ins *Inspector) saveMetrics(stats Stats) error { if ins.db == nil { return nil } _, err := ins.db.Exec(` INSERT OR REPLACE INTO metrics (timestamp, queue, active, pending, scheduled, retry, archived, completed, succeeded, failed) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) `, stats.Timestamp, stats.Queue, stats.Active, stats.Pending, stats.Scheduled, stats.Retry, stats.Archived, stats.Completed, stats.Succeeded, stats.Failed) return err } // ==================== 数据类型 ==================== // StatsQuery 统计查询参数 type StatsQuery struct { Queue string // 队列名称,为空则查询所有队列汇总 Start int64 // 开始时间戳(秒),0 表示不限制 End int64 // 结束时间戳(秒),0 表示不限制 Limit int // 返回数量限制,默认 500 } // Stats 队列统计数据点 type Stats struct { Timestamp int64 `json:"t"` // Unix 时间戳(秒) Queue string `json:"q,omitempty"` // 队列名称 Active int `json:"a"` // 活跃任务数 Pending int `json:"p"` // 等待任务数 Scheduled int `json:"s"` // 计划任务数 Retry int `json:"r"` // 重试任务数 Archived int `json:"ar"` // 归档任务数 Completed int `json:"c"` // 已完成任务数 Succeeded int `json:"su"` // 成功数 Failed int `json:"f"` // 失败数 } // QueueInfo 队列信息 type QueueInfo struct { Name string `json:"name"` Priority int `json:"priority"` Size int `json:"size"` Active int `json:"active"` Pending int `json:"pending"` Scheduled int `json:"scheduled"` Retry int `json:"retry"` Archived int `json:"archived"` Completed int `json:"completed"` Processed int `json:"processed"` Failed int `json:"failed"` Paused bool `json:"paused"` MemoryUsage int64 `json:"memory_usage"` Latency time.Duration `json:"-"` LatencyMS int64 `json:"latency"` } // TaskInfo 任务信息 type TaskInfo struct { ID string `json:"id"` Type string `json:"type"` Payload []byte `json:"payload"` Queue string `json:"queue"` Retried int `json:"retried"` LastFailedAt time.Time `json:"last_failed_at,omitempty"` LastErr string `json:"last_error,omitempty"` NextProcessAt time.Time `json:"next_process_at,omitempty"` CompletedAt time.Time `json:"completed_at,omitempty"` } // ==================== 查询方法 ==================== // QueryStats 查询统计数据 func (ins *Inspector) QueryStats(q StatsQuery) ([]Stats, error) { if ins.db == nil { return nil, nil } limit := q.Limit if limit <= 0 { limit = 500 } var args []any var whereClause string var conditions []string if q.Queue != "" { conditions = append(conditions, "queue = ?") args = append(args, q.Queue) } if q.Start > 0 { conditions = append(conditions, "timestamp >= ?") args = append(args, q.Start) } if q.End > 0 { conditions = append(conditions, "timestamp <= ?") args = append(args, q.End) } if len(conditions) > 0 { whereClause = "WHERE " + strings.Join(conditions, " AND ") } var query string if q.Queue != "" { query = fmt.Sprintf(` SELECT timestamp, queue, active, pending, scheduled, retry, archived, completed, succeeded, failed FROM metrics %s ORDER BY timestamp DESC LIMIT ? `, whereClause) } else { query = fmt.Sprintf(` SELECT timestamp, '' as queue, SUM(active), SUM(pending), SUM(scheduled), SUM(retry), SUM(archived), SUM(completed), SUM(succeeded), SUM(failed) FROM metrics %s GROUP BY timestamp ORDER BY timestamp DESC LIMIT ? `, whereClause) } args = append(args, limit) rows, err := ins.db.Query(query, args...) if err != nil { return nil, err } defer rows.Close() var statsList []Stats for rows.Next() { var s Stats if err := rows.Scan(&s.Timestamp, &s.Queue, &s.Active, &s.Pending, &s.Scheduled, &s.Retry, &s.Archived, &s.Completed, &s.Succeeded, &s.Failed); err != nil { continue } statsList = append(statsList, s) } // 反转顺序,使时间从早到晚 for i, j := 0, len(statsList)-1; i < j; i, j = i+1, j-1 { statsList[i], statsList[j] = statsList[j], statsList[i] } return statsList, nil } // GetQueueInfo 获取队列信息 func (ins *Inspector) GetQueueInfo(queueName string) (*QueueInfo, error) { if ins.inspector == nil { return nil, fmt.Errorf("inspector: not started") } info, err := ins.inspector.GetQueueInfo(queueName) if err != nil { return nil, err } return &QueueInfo{ Name: info.Queue, Size: info.Size, Active: info.Active, Pending: info.Pending, Scheduled: info.Scheduled, Retry: info.Retry, Archived: info.Archived, Completed: info.Completed, Processed: info.Processed, Failed: info.Failed, Paused: info.Paused, MemoryUsage: info.MemoryUsage, Latency: info.Latency, LatencyMS: info.Latency.Milliseconds(), }, nil } // convertTaskInfo 将 asynq.TaskInfo 转换为 TaskInfo func convertTaskInfo(task *asynq.TaskInfo) *TaskInfo { return &TaskInfo{ ID: task.ID, Type: task.Type, Payload: task.Payload, Queue: task.Queue, Retried: task.Retried, LastFailedAt: task.LastFailedAt, LastErr: task.LastErr, NextProcessAt: task.NextProcessAt, CompletedAt: task.CompletedAt, } } // convertTaskList 批量转换任务列表 func convertTaskList(tasks []*asynq.TaskInfo) []*TaskInfo { result := make([]*TaskInfo, len(tasks)) for i, t := range tasks { result[i] = convertTaskInfo(t) } return result } // ListActiveTasks 获取活跃任务列表 func (ins *Inspector) ListActiveTasks(queueName string, pageSize, page int) ([]*TaskInfo, error) { if ins.inspector == nil { return nil, fmt.Errorf("inspector: not started") } tasks, err := ins.inspector.ListActiveTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page)) if err != nil { return nil, err } return convertTaskList(tasks), nil } // ListPendingTasks 获取等待任务列表 func (ins *Inspector) ListPendingTasks(queueName string, pageSize, page int) ([]*TaskInfo, error) { if ins.inspector == nil { return nil, fmt.Errorf("inspector: not started") } tasks, err := ins.inspector.ListPendingTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page)) if err != nil { return nil, err } return convertTaskList(tasks), nil } // ListScheduledTasks 获取计划任务列表 func (ins *Inspector) ListScheduledTasks(queueName string, pageSize, page int) ([]*TaskInfo, error) { if ins.inspector == nil { return nil, fmt.Errorf("inspector: not started") } tasks, err := ins.inspector.ListScheduledTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page)) if err != nil { return nil, err } return convertTaskList(tasks), nil } // ListRetryTasks 获取重试任务列表 func (ins *Inspector) ListRetryTasks(queueName string, pageSize, page int) ([]*TaskInfo, error) { if ins.inspector == nil { return nil, fmt.Errorf("inspector: not started") } tasks, err := ins.inspector.ListRetryTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page)) if err != nil { return nil, err } return convertTaskList(tasks), nil } // ListArchivedTasks 获取归档任务列表 func (ins *Inspector) ListArchivedTasks(queueName string, pageSize, page int) ([]*TaskInfo, error) { if ins.inspector == nil { return nil, fmt.Errorf("inspector: not started") } tasks, err := ins.inspector.ListArchivedTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page)) if err != nil { return nil, err } return convertTaskList(tasks), nil } // ListCompletedTasks 获取已完成任务列表 func (ins *Inspector) ListCompletedTasks(queueName string, pageSize, page int) ([]*TaskInfo, error) { if ins.inspector == nil { return nil, fmt.Errorf("inspector: not started") } tasks, err := ins.inspector.ListCompletedTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page)) if err != nil { return nil, err } return convertTaskList(tasks), nil } // RunTask 立即运行归档任务(重试失败任务) func (ins *Inspector) RunTask(queueName, taskID string) error { if ins.inspector == nil { return fmt.Errorf("inspector: not started") } return ins.inspector.RunTask(queueName, taskID) } // PauseQueue 暂停队列 func (ins *Inspector) PauseQueue(queueName string) error { if ins.inspector == nil { return fmt.Errorf("inspector: not started") } return ins.inspector.PauseQueue(queueName) } // UnpauseQueue 恢复队列 func (ins *Inspector) UnpauseQueue(queueName string) error { if ins.inspector == nil { return fmt.Errorf("inspector: not started") } return ins.inspector.UnpauseQueue(queueName) }