// Package monitor 提供 taskq 的 HTTP 监控服务 package monitor import ( "embed" "encoding/json" "fmt" "io/fs" "net/http" "sort" "strconv" "strings" "sync" "time" "code.tczkiot.com/wlw/taskq" "code.tczkiot.com/wlw/taskq/x/inspector" ) //go:embed ui/* var uiFS embed.FS // Options 配置监控服务的选项 type Options struct { // Inspector 检查器实例(必需) Inspector *inspector.Inspector // Queues 队列优先级映射(必需) Queues map[string]int // RootPath 监控服务的根路径,默认为 "/monitor" RootPath string // ReadOnly 是否只读模式,禁用所有修改操作,默认为 false ReadOnly bool } // Monitor 监控服务的 HTTP 处理器 type Monitor struct { router *http.ServeMux rootPath string readOnly bool closeCh chan struct{} closeOnce sync.Once inspector *inspector.Inspector queues map[string]int } // New 创建新的监控服务 func New(opts Options) (*Monitor, error) { if opts.Inspector == nil { return nil, fmt.Errorf("monitor: inspector is required") } if opts.Queues == nil { return nil, fmt.Errorf("monitor: queues is required") } // 设置默认值 if opts.RootPath == "" { opts.RootPath = "/monitor" } // 确保路径以 / 开头且不以 / 结尾 if !strings.HasPrefix(opts.RootPath, "/") { opts.RootPath = "/" + opts.RootPath } opts.RootPath = strings.TrimSuffix(opts.RootPath, "/") m := &Monitor{ router: http.NewServeMux(), rootPath: opts.RootPath, readOnly: opts.ReadOnly, closeCh: make(chan struct{}), inspector: opts.Inspector, queues: opts.Queues, } m.setupRoutes() return m, nil } // ServeHTTP 实现 http.Handler 接口 func (m *Monitor) ServeHTTP(w http.ResponseWriter, r *http.Request) { m.router.ServeHTTP(w, r) } // RootPath 返回监控服务的根路径 func (m *Monitor) RootPath() string { return m.rootPath } // Close 关闭监控服务 func (m *Monitor) Close() error { m.closeOnce.Do(func() { close(m.closeCh) }) return nil } // Name 返回插件名称 func (m *Monitor) Name() string { return "monitor" } // Init 初始化插件(实现 Plugin 接口) func (m *Monitor) Init(ctx *taskq.Context) error { return nil } // Start 启动插件(实现 Plugin 接口) func (m *Monitor) Start(ctx *taskq.Context) error { return nil } // Stop 停止插件(实现 Plugin 接口) func (m *Monitor) Stop() error { return m.Close() } // setupRoutes 设置路由 func (m *Monitor) setupRoutes() { // API 路由 apiPath := m.rootPath + "/api/" m.router.HandleFunc(apiPath+"queues", m.handleQueues) m.router.HandleFunc(apiPath+"queues/", m.handleQueueDetail) m.router.HandleFunc(apiPath+"tasks/", m.handleTasks) m.router.HandleFunc(apiPath+"stats/", m.handleStats) m.router.HandleFunc(apiPath+"sse", m.handleSSE) // 静态文件路由 uiSubFS, _ := fs.Sub(uiFS, "ui") fileServer := http.FileServer(http.FS(uiSubFS)) m.router.Handle(m.rootPath+"/static/", http.StripPrefix(m.rootPath+"/static/", fileServer)) // 主页路由(包含 History API 的路由) m.router.HandleFunc(m.rootPath+"/queues/", m.handleIndex) m.router.HandleFunc(m.rootPath+"/", m.handleIndex) m.router.HandleFunc(m.rootPath, m.handleIndex) } // handleStats 处理队列统计数据请求 func (m *Monitor) handleStats(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } // 从 URL 中提取队列名称(可选,为空则查询所有队列汇总) path := strings.TrimPrefix(r.URL.Path, m.rootPath+"/api/stats/") queueName := strings.TrimSuffix(path, "/") // 构建查询参数 query := inspector.StatsQuery{ Queue: queueName, Limit: 500, } // 解析 limit 参数 if l := r.URL.Query().Get("limit"); l != "" { if parsed, err := strconv.Atoi(l); err == nil && parsed > 0 && parsed <= 10000 { query.Limit = parsed } } // 解析 start 参数(Unix 时间戳) if s := r.URL.Query().Get("start"); s != "" { if parsed, err := strconv.ParseInt(s, 10, 64); err == nil && parsed > 0 { query.Start = parsed } } // 解析 end 参数(Unix 时间戳) if e := r.URL.Query().Get("end"); e != "" { if parsed, err := strconv.ParseInt(e, 10, 64); err == nil && parsed > 0 { query.End = parsed } } stats, err := m.inspector.QueryStats(query) if err != nil { http.Error(w, fmt.Sprintf("Failed to get stats: %v", err), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(stats) } // queueInfoJSON 用于 JSON 输出的队列信息 type queueInfoJSON struct { Name string `json:"name"` Priority int `json:"priority"` Size int `json:"size"` Active int `json:"active"` Pending int `json:"pending"` Scheduled int `json:"scheduled"` Retry int `json:"retry"` Archived int `json:"archived"` Completed int `json:"completed"` Processed int `json:"processed"` Failed int `json:"failed"` Paused bool `json:"paused"` MemoryUsage int64 `json:"memory_usage"` Latency int64 `json:"latency"` } // handleQueues 处理队列列表请求 func (m *Monitor) handleQueues(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } var queueInfos []queueInfoJSON // 首先显示所有注册的队列(即使Redis中还没有任务) for queueName, priority := range m.queues { info, err := m.inspector.GetQueueInfo(queueName) if err != nil { // 如果队列不存在,创建一个空的状态 queueInfos = append(queueInfos, queueInfoJSON{ Name: queueName, Priority: priority, }) } else { queueInfos = append(queueInfos, queueInfoJSON{ Name: queueName, Priority: priority, Size: info.Size, Active: info.Active, Pending: info.Pending, Scheduled: info.Scheduled, Retry: info.Retry, Archived: info.Archived, Completed: info.Completed, Processed: info.Processed, Failed: info.Failed, Paused: info.Paused, MemoryUsage: info.MemoryUsage, Latency: info.LatencyMS, }) } } // 按优先级排序 sort.Slice(queueInfos, func(i, j int) bool { return queueInfos[i].Priority > queueInfos[j].Priority }) w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(queueInfos) } // handleQueueDetail 处理队列详情请求和队列操作 func (m *Monitor) handleQueueDetail(w http.ResponseWriter, r *http.Request) { // 从 URL 中提取队列名称 path := strings.TrimPrefix(r.URL.Path, m.rootPath+"/api/queues/") parts := strings.Split(path, "/") if len(parts) == 0 || parts[0] == "" { http.Error(w, "Queue name is required", http.StatusBadRequest) return } queueName := parts[0] // 检查队列是否已注册 if _, exists := m.queues[queueName]; !exists { http.Error(w, "Queue not found", http.StatusNotFound) return } // 处理暂停/恢复请求 if r.Method == http.MethodPost && len(parts) >= 2 { if m.readOnly { http.Error(w, "Operation not allowed in read-only mode", http.StatusForbidden) return } action := parts[1] switch action { case "pause": if err := m.inspector.PauseQueue(queueName); err != nil { http.Error(w, fmt.Sprintf("Failed to pause queue: %v", err), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(map[string]string{"status": "paused"}) return case "unpause": if err := m.inspector.UnpauseQueue(queueName); err != nil { http.Error(w, fmt.Sprintf("Failed to unpause queue: %v", err), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(map[string]string{"status": "unpaused"}) return default: http.Error(w, "Invalid action", http.StatusBadRequest) return } } if r.Method != http.MethodGet { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } // 获取队列详细信息 info, err := m.inspector.GetQueueInfo(queueName) if err != nil { // 如果队列在 Redis 中不存在,返回空状态 if strings.Contains(err.Error(), "queue not found") { emptyStats := map[string]any{ "queue": queueName, "active": 0, "pending": 0, "retry": 0, "archived": 0, "completed": 0, "processed": 0, "failed": 0, "paused": false, } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(emptyStats) return } http.Error(w, fmt.Sprintf("Failed to get queue info: %v", err), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(info) } // taskInfoJSON 转换任务信息用于 JSON 输出 type taskInfoJSON struct { ID string `json:"id"` Type string `json:"type"` Payload string `json:"payload"` Queue string `json:"queue"` Retried int `json:"retried"` LastFailed string `json:"last_failed,omitempty"` LastError string `json:"last_error,omitempty"` NextProcess string `json:"next_process,omitempty"` CompletedAt string `json:"completed_at,omitempty"` } // handleTasks 处理任务列表请求和任务操作 func (m *Monitor) handleTasks(w http.ResponseWriter, r *http.Request) { // 从 URL 中提取队列名称和任务状态 path := strings.TrimPrefix(r.URL.Path, m.rootPath+"/api/tasks/") parts := strings.Split(path, "/") if len(parts) < 2 { http.Error(w, "Queue name and task state are required", http.StatusBadRequest) return } queueName := parts[0] taskState := parts[1] // 处理重试请求: POST /api/tasks/{queue}/archived/{taskId}/retry if r.Method == http.MethodPost && len(parts) >= 4 && parts[1] == "archived" && parts[3] == "retry" { if m.readOnly { http.Error(w, "Operation not allowed in read-only mode", http.StatusForbidden) return } taskID := parts[2] m.handleRetryTask(w, r, queueName, taskID) return } if r.Method != http.MethodGet { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } // 检查队列是否已注册 if _, exists := m.queues[queueName]; !exists { http.Error(w, "Queue not found", http.StatusNotFound) return } // 解析分页参数 page := 1 pageSize := 20 if p := r.URL.Query().Get("page"); p != "" { if parsed, err := strconv.Atoi(p); err == nil && parsed > 0 { page = parsed } } if ps := r.URL.Query().Get("page_size"); ps != "" { if parsed, err := strconv.Atoi(ps); err == nil && parsed > 0 && parsed <= 100 { pageSize = parsed } } // 获取队列信息以获取任务总数 var total int queueInfo, queueErr := m.inspector.GetQueueInfo(queueName) if queueErr == nil { switch taskState { case "active": total = queueInfo.Active case "pending": total = queueInfo.Pending case "scheduled": total = queueInfo.Scheduled case "retry": total = queueInfo.Retry case "archived": total = queueInfo.Archived case "completed": total = queueInfo.Completed } } // 根据任务状态获取任务列表 var tasks []*inspector.TaskInfo var err error switch taskState { case "active": tasks, err = m.inspector.ListActiveTasks(queueName, pageSize, page-1) case "pending": tasks, err = m.inspector.ListPendingTasks(queueName, pageSize, page-1) case "scheduled": tasks, err = m.inspector.ListScheduledTasks(queueName, pageSize, page-1) case "retry": tasks, err = m.inspector.ListRetryTasks(queueName, pageSize, page-1) case "archived": tasks, err = m.inspector.ListArchivedTasks(queueName, pageSize, page-1) case "completed": tasks, err = m.inspector.ListCompletedTasks(queueName, pageSize, page-1) default: http.Error(w, "Invalid task state. Valid states: active, pending, scheduled, retry, archived, completed", http.StatusBadRequest) return } // 如果队列在 Redis 中不存在(没有任务),返回空列表而不是错误 if err != nil { if strings.Contains(err.Error(), "queue not found") { tasks = []*inspector.TaskInfo{} total = 0 } else { http.Error(w, fmt.Sprintf("Failed to get tasks: %v", err), http.StatusInternalServerError) return } } var taskInfos []taskInfoJSON for _, task := range tasks { info := taskInfoJSON{ ID: task.ID, Type: task.Type, Payload: string(task.Payload), Queue: task.Queue, Retried: task.Retried, } if !task.LastFailedAt.IsZero() { info.LastFailed = task.LastFailedAt.Format(time.RFC3339) } if task.LastErr != "" { info.LastError = task.LastErr } if !task.NextProcessAt.IsZero() { info.NextProcess = task.NextProcessAt.Format(time.RFC3339) } if !task.CompletedAt.IsZero() { info.CompletedAt = task.CompletedAt.Format(time.RFC3339) } taskInfos = append(taskInfos, info) } response := map[string]any{ "tasks": taskInfos, "page": page, "page_size": pageSize, "total": total, } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(response) } // handleRetryTask 重试失败任务 func (m *Monitor) handleRetryTask(w http.ResponseWriter, r *http.Request, queueName, taskID string) { // 检查队列是否已注册 if _, exists := m.queues[queueName]; !exists { http.Error(w, "Queue not found", http.StatusNotFound) return } // 运行重试 err := m.inspector.RunTask(queueName, taskID) if err != nil { http.Error(w, fmt.Sprintf("Failed to retry task: %v", err), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(map[string]string{"status": "ok"}) } // handleIndex 处理主页请求,返回 SPA 入口页面 func (m *Monitor) handleIndex(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } // 读取 index.html 并替换模板变量 indexHTML, err := uiFS.ReadFile("ui/index.html") if err != nil { http.Error(w, fmt.Sprintf("Failed to read index.html: %v", err), http.StatusInternalServerError) return } // 替换模板变量 content := strings.ReplaceAll(string(indexHTML), "{{.RootPath}}", m.rootPath) w.Header().Set("Content-Type", "text/html; charset=utf-8") w.Write([]byte(content)) } // handleSSE 处理 Server-Sent Events 实时数据推送 func (m *Monitor) handleSSE(w http.ResponseWriter, r *http.Request) { // 设置 SSE 响应头 w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.Header().Set("Access-Control-Allow-Origin", "*") // 获取 flusher flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "SSE not supported", http.StatusInternalServerError) return } // 两个定时器:统计数据频率高,队列数据频率低 statsTicker := time.NewTicker(2 * time.Second) queuesTicker := time.NewTicker(5 * time.Second) defer statsTicker.Stop() defer queuesTicker.Stop() // 监听客户端断开连接 ctx := r.Context() // 立即发送一次数据 m.sendQueuesEvent(w, flusher) m.sendStatsEvent(w, flusher) for { select { case <-ctx.Done(): return case <-m.closeCh: return case <-statsTicker.C: m.sendStatsEvent(w, flusher) case <-queuesTicker.C: m.sendQueuesEvent(w, flusher) } } } // sendStatsEvent 发送统计图表数据 func (m *Monitor) sendStatsEvent(w http.ResponseWriter, flusher http.Flusher) { stats, err := m.inspector.QueryStats(inspector.StatsQuery{Limit: 1}) if err != nil || len(stats) == 0 { return } data, err := json.Marshal(stats[0]) if err != nil { return } fmt.Fprintf(w, "event: stats\ndata: %s\n\n", data) flusher.Flush() } // sendQueuesEvent 发送队列表格数据 func (m *Monitor) sendQueuesEvent(w http.ResponseWriter, flusher http.Flusher) { var queueInfos []queueInfoJSON for queueName, priority := range m.queues { info, err := m.inspector.GetQueueInfo(queueName) if err != nil { queueInfos = append(queueInfos, queueInfoJSON{ Name: queueName, Priority: priority, }) } else { queueInfos = append(queueInfos, queueInfoJSON{ Name: queueName, Priority: priority, Size: info.Size, Active: info.Active, Pending: info.Pending, Scheduled: info.Scheduled, Retry: info.Retry, Archived: info.Archived, Completed: info.Completed, Processed: info.Processed, Failed: info.Failed, Paused: info.Paused, MemoryUsage: info.MemoryUsage, Latency: info.LatencyMS, }) } } // 按优先级排序 sort.Slice(queueInfos, func(i, j int) bool { return queueInfos[i].Priority > queueInfos[j].Priority }) data, err := json.Marshal(queueInfos) if err != nil { return } fmt.Fprintf(w, "event: queues\ndata: %s\n\n", data) flusher.Flush() }