// Package metrics 提供 taskq 的 Prometheus 指标采集功能 package metrics import ( "context" "sync" "time" "code.tczkiot.com/wlw/taskq" "github.com/hibiken/asynq" "github.com/prometheus/client_golang/prometheus" "github.com/redis/go-redis/v9" ) // Options 配置指标采集器的选项 type Options struct { // Namespace 指标命名空间,默认为 "taskq" Namespace string // Interval 采集间隔,默认 15 秒 Interval time.Duration // Registry Prometheus 注册器,默认使用 prometheus.DefaultRegisterer Registry prometheus.Registerer } // Metrics Prometheus 指标采集器 // 实现 taskq.Plugin 接口 type Metrics struct { opts Options rdb redis.UniversalClient queues func() map[string]int inspector *asynq.Inspector closeCh chan struct{} closeOnce sync.Once // Prometheus 指标 queueSize *prometheus.GaugeVec activeTasks *prometheus.GaugeVec pendingTasks *prometheus.GaugeVec scheduledTasks *prometheus.GaugeVec retryTasks *prometheus.GaugeVec archivedTasks *prometheus.GaugeVec completedTasks *prometheus.GaugeVec processedTotal *prometheus.GaugeVec failedTotal *prometheus.GaugeVec latencySeconds *prometheus.GaugeVec memoryBytes *prometheus.GaugeVec paused *prometheus.GaugeVec } // New 创建新的指标采集器 func New(opts Options) *Metrics { if opts.Namespace == "" { opts.Namespace = "taskq" } if opts.Interval <= 0 { opts.Interval = 15 * time.Second } if opts.Registry == nil { opts.Registry = prometheus.DefaultRegisterer } m := &Metrics{ opts: opts, closeCh: make(chan struct{}), } // 初始化指标 m.initMetrics() return m } // initMetrics 初始化 Prometheus 指标 func (m *Metrics) initMetrics() { labels := []string{"queue"} m.queueSize = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: m.opts.Namespace, Name: "queue_size", Help: "Total number of tasks in the queue", }, labels) m.activeTasks = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: m.opts.Namespace, Name: "tasks_active", Help: "Number of currently active tasks", }, labels) m.pendingTasks = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: m.opts.Namespace, Name: "tasks_pending", Help: "Number of pending tasks", }, labels) m.scheduledTasks = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: m.opts.Namespace, Name: "tasks_scheduled", Help: "Number of scheduled tasks", }, labels) m.retryTasks = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: m.opts.Namespace, Name: "tasks_retry", Help: "Number of tasks in retry queue", }, labels) m.archivedTasks = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: m.opts.Namespace, Name: "tasks_archived", Help: "Number of archived (dead) tasks", }, labels) m.completedTasks = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: m.opts.Namespace, Name: "tasks_completed", Help: "Number of completed tasks (retained)", }, labels) m.processedTotal = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: m.opts.Namespace, Name: "tasks_processed_total", Help: "Total number of processed tasks (today)", }, labels) m.failedTotal = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: m.opts.Namespace, Name: "tasks_failed_total", Help: "Total number of failed tasks (today)", }, labels) m.latencySeconds = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: m.opts.Namespace, Name: "queue_latency_seconds", Help: "Queue latency in seconds", }, labels) m.memoryBytes = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: m.opts.Namespace, Name: "queue_memory_bytes", Help: "Memory usage of the queue in bytes", }, labels) m.paused = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: m.opts.Namespace, Name: "queue_paused", Help: "Whether the queue is paused (1) or not (0)", }, labels) } // Name 返回插件名称 func (m *Metrics) Name() string { return "metrics" } // Init 初始化插件,从 Context 获取 Redis 和 Queues func (m *Metrics) Init(ctx *taskq.Context) error { m.rdb = ctx.Redis() m.queues = ctx.Queues return nil } // Start 启动指标采集 func (m *Metrics) Start(ctx *taskq.Context) error { m.inspector = asynq.NewInspectorFromRedisClient(m.rdb) // 注册指标 collectors := []prometheus.Collector{ m.queueSize, m.activeTasks, m.pendingTasks, m.scheduledTasks, m.retryTasks, m.archivedTasks, m.completedTasks, m.processedTotal, m.failedTotal, m.latencySeconds, m.memoryBytes, m.paused, } for _, c := range collectors { if err := m.opts.Registry.Register(c); err != nil { // 如果已注册则忽略 if _, ok := err.(prometheus.AlreadyRegisteredError); !ok { return err } } } // 启动后台采集 go m.startCollector(ctx) return nil } // Stop 停止指标采集 func (m *Metrics) Stop() error { m.closeOnce.Do(func() { close(m.closeCh) }) if m.inspector != nil { m.inspector.Close() } return nil } // startCollector 启动后台指标采集 func (m *Metrics) startCollector(ctx context.Context) { // 立即采集一次 m.collect() ticker := time.NewTicker(m.opts.Interval) defer ticker.Stop() for { select { case <-m.closeCh: return case <-ctx.Done(): return case <-ticker.C: m.collect() } } } // collect 采集所有队列的指标 func (m *Metrics) collect() { if m.queues == nil || m.inspector == nil { return } queues := m.queues() for queueName := range queues { info, err := m.inspector.GetQueueInfo(queueName) if err != nil { continue } m.queueSize.WithLabelValues(queueName).Set(float64(info.Size)) m.activeTasks.WithLabelValues(queueName).Set(float64(info.Active)) m.pendingTasks.WithLabelValues(queueName).Set(float64(info.Pending)) m.scheduledTasks.WithLabelValues(queueName).Set(float64(info.Scheduled)) m.retryTasks.WithLabelValues(queueName).Set(float64(info.Retry)) m.archivedTasks.WithLabelValues(queueName).Set(float64(info.Archived)) m.completedTasks.WithLabelValues(queueName).Set(float64(info.Completed)) m.processedTotal.WithLabelValues(queueName).Set(float64(info.Processed)) m.failedTotal.WithLabelValues(queueName).Set(float64(info.Failed)) m.latencySeconds.WithLabelValues(queueName).Set(info.Latency.Seconds()) m.memoryBytes.WithLabelValues(queueName).Set(float64(info.MemoryUsage)) if info.Paused { m.paused.WithLabelValues(queueName).Set(1) } else { m.paused.WithLabelValues(queueName).Set(0) } } }