Files
taskq/x/monitor/monitor.go
hupeh 279e6417df feat: monitor 实现 Plugin 接口,优化关闭流程
- monitor 包导入 taskq,实现 Plugin 接口
- monitor 作为插件注册到 taskq.Configure()
- 修复优雅关闭顺序:先关闭 SSE 连接,再关闭 HTTP 服务器,最后停止 taskq
- 移除 main.go 中手动调用 cancel() 导致的阻塞问题
2025-12-10 01:00:42 +08:00

627 lines
17 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package monitor 提供 taskq 的 HTTP 监控服务
package monitor
import (
"embed"
"encoding/json"
"fmt"
"io/fs"
"net/http"
"sort"
"strconv"
"strings"
"sync"
"time"
"code.tczkiot.com/wlw/taskq"
"code.tczkiot.com/wlw/taskq/x/inspector"
)
// uiFS holds the files matched by the ui/* pattern, embedded into the binary
// at build time. It backs both the static file route and the SPA index page.
//
//go:embed ui/*
var uiFS embed.FS
// Options configures the monitor service created by New.
type Options struct {
// Inspector is the inspector instance used for all queue and task
// queries and actions (required).
Inspector *inspector.Inspector
// Queues maps each queue name to its priority (required).
Queues map[string]int
// RootPath is the URL prefix the monitor is served under. New defaults it
// to "/monitor" when empty.
RootPath string
// ReadOnly enables read-only mode, which disables every mutating
// operation. The zero value is false.
ReadOnly bool
}
// Monitor is the HTTP handler of the monitoring service.
type Monitor struct {
// router dispatches every request received by ServeHTTP.
router *http.ServeMux
// rootPath is the normalized URL prefix all routes are registered under.
rootPath string
// readOnly makes the pause, unpause and retry endpoints answer 403.
readOnly bool
// closeCh is closed by Close; SSE handlers select on it and return.
closeCh chan struct{}
// closeOnce ensures closeCh is closed only once.
closeOnce sync.Once
// inspector is the backend used to query and control queues and tasks.
inspector *inspector.Inspector
// queues maps registered queue names to their priority.
queues map[string]int
}
// New builds a Monitor from opts and registers all of its routes.
//
// It returns an error when opts.Inspector or opts.Queues is nil. An empty
// opts.RootPath falls back to "/monitor"; the path then gets a leading "/"
// if it lacks one, and a single trailing "/" is stripped.
func New(opts Options) (*Monitor, error) {
	switch {
	case opts.Inspector == nil:
		return nil, fmt.Errorf("monitor: inspector is required")
	case opts.Queues == nil:
		return nil, fmt.Errorf("monitor: queues is required")
	}

	// Normalize the root path: default, leading slash, no trailing slash.
	root := opts.RootPath
	if root == "" {
		root = "/monitor"
	}
	if !strings.HasPrefix(root, "/") {
		root = "/" + root
	}
	root = strings.TrimSuffix(root, "/")

	mon := &Monitor{
		router:    http.NewServeMux(),
		rootPath:  root,
		readOnly:  opts.ReadOnly,
		closeCh:   make(chan struct{}),
		inspector: opts.Inspector,
		queues:    opts.Queues,
	}
	mon.setupRoutes()
	return mon, nil
}
// ServeHTTP implements http.Handler by delegating the request to the
// monitor's internal router.
func (m *Monitor) ServeHTTP(w http.ResponseWriter, r *http.Request) {
m.router.ServeHTTP(w, r)
}
// RootPath returns the root path the monitor's routes are registered under.
func (m *Monitor) RootPath() string {
return m.rootPath
}
// Close shuts the monitor down by closing closeCh, which makes running SSE
// handlers return. The close is wrapped in closeOnce, so calling Close more
// than once is safe. It always returns nil.
func (m *Monitor) Close() error {
m.closeOnce.Do(func() {
close(m.closeCh)
})
return nil
}
// Name returns the plugin name, which is always "monitor".
func (m *Monitor) Name() string {
return "monitor"
}
// Init initializes the plugin (implements the Plugin interface).
// It performs no work: ctx is unused and the result is always nil.
func (m *Monitor) Init(ctx *taskq.Context) error {
return nil
}
// Start starts the plugin (implements the Plugin interface).
// It performs no work: ctx is unused and the result is always nil.
func (m *Monitor) Start(ctx *taskq.Context) error {
return nil
}
// Stop stops the plugin (implements the Plugin interface) by calling Close
// and returning its result.
func (m *Monitor) Stop() error {
return m.Close()
}
// setupRoutes registers every route on m.router, all prefixed with
// m.rootPath:
//
//   - <root>/api/...     JSON API endpoints and the SSE stream
//   - <root>/static/...  embedded UI assets
//   - <root>, <root>/ and <root>/queues/...  the SPA entry page
func (m *Monitor) setupRoutes() {
	// API routes.
	apiPath := m.rootPath + "/api/"
	m.router.HandleFunc(apiPath+"queues", m.handleQueues)
	m.router.HandleFunc(apiPath+"queues/", m.handleQueueDetail)
	m.router.HandleFunc(apiPath+"tasks/", m.handleTasks)
	m.router.HandleFunc(apiPath+"stats/", m.handleStats)
	m.router.HandleFunc(apiPath+"sse", m.handleSSE)

	// Static file route. fs.Sub only rejects an invalid directory name and
	// "ui" is a constant, valid path, so the error is deliberately ignored.
	uiSubFS, _ := fs.Sub(uiFS, "ui")
	staticPath := m.rootPath + "/static/"
	fileServer := http.FileServer(http.FS(uiSubFS))
	m.router.Handle(staticPath, http.StripPrefix(staticPath, fileServer))

	// Index routes, including the client-side (History API) routes.
	m.router.HandleFunc(m.rootPath+"/queues/", m.handleIndex)
	m.router.HandleFunc(m.rootPath+"/", m.handleIndex)
	// A RootPath of "/" is normalized to "" by New. ServeMux panics on an
	// empty pattern, and the "/" pattern above already covers that case.
	if m.rootPath != "" {
		m.router.HandleFunc(m.rootPath, m.handleIndex)
	}
}
// handleStats serves queue statistics as JSON. Only GET is accepted.
//
// The queue name is whatever follows <root>/api/stats/ in the URL (one
// trailing "/" removed) and may be empty. Optional query parameters:
//
//   - limit: 1..10000, defaults to 500
//   - start: positive integer (Unix timestamp per the original comments)
//   - end:   positive integer (Unix timestamp per the original comments)
//
// Parameters that fail to parse or fall outside those bounds are ignored.
func (m *Monitor) handleStats(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	name := strings.TrimPrefix(r.URL.Path, m.rootPath+"/api/stats/")
	name = strings.TrimSuffix(name, "/")

	params := r.URL.Query()
	query := inspector.StatsQuery{Queue: name, Limit: 500}

	if raw := params.Get("limit"); raw != "" {
		n, err := strconv.Atoi(raw)
		if err == nil && n > 0 && n <= 10000 {
			query.Limit = n
		}
	}

	// positive reads key as a strictly positive int64; ok is false when the
	// parameter is absent, malformed or not positive.
	positive := func(key string) (int64, bool) {
		raw := params.Get(key)
		if raw == "" {
			return 0, false
		}
		v, err := strconv.ParseInt(raw, 10, 64)
		return v, err == nil && v > 0
	}
	if v, ok := positive("start"); ok {
		query.Start = v
	}
	if v, ok := positive("end"); ok {
		query.End = v
	}

	result, err := m.inspector.QueryStats(query)
	if err != nil {
		http.Error(w, fmt.Sprintf("Failed to get stats: %v", err), http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(result)
}
// queueInfoJSON is the JSON representation of one queue, used by the queue
// list endpoint and the "queues" SSE event. Name and Priority come from the
// registered queue map; the remaining fields are copied from the inspector's
// queue info and stay at their zero value when that lookup fails.
type queueInfoJSON struct {
Name string `json:"name"`
Priority int `json:"priority"`
Size int `json:"size"`
Active int `json:"active"`
Pending int `json:"pending"`
Scheduled int `json:"scheduled"`
Retry int `json:"retry"`
Archived int `json:"archived"`
Completed int `json:"completed"`
Processed int `json:"processed"`
Failed int `json:"failed"`
Paused bool `json:"paused"`
MemoryUsage int64 `json:"memory_usage"`
// Latency is filled from the inspector's LatencyMS field.
Latency int64 `json:"latency"`
}
// handleQueues serves the list of registered queues as a JSON array. Only
// GET is accepted.
//
// Every registered queue is listed, even when the inspector cannot return
// information for it (for example because it holds no tasks yet); such
// queues are reported with zeroed counters. The result is ordered by
// descending priority, with ties broken by queue name so the order is
// stable across requests despite random map iteration.
func (m *Monitor) handleQueues(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	// Allocate a non-nil slice so an empty queue set encodes as [] and not
	// as null.
	queueInfos := make([]queueInfoJSON, 0, len(m.queues))
	for queueName, priority := range m.queues {
		entry := queueInfoJSON{Name: queueName, Priority: priority}
		// On lookup failure the entry keeps its zeroed counters.
		if info, err := m.inspector.GetQueueInfo(queueName); err == nil {
			entry.Size = info.Size
			entry.Active = info.Active
			entry.Pending = info.Pending
			entry.Scheduled = info.Scheduled
			entry.Retry = info.Retry
			entry.Archived = info.Archived
			entry.Completed = info.Completed
			entry.Processed = info.Processed
			entry.Failed = info.Failed
			entry.Paused = info.Paused
			entry.MemoryUsage = info.MemoryUsage
			entry.Latency = info.LatencyMS
		}
		queueInfos = append(queueInfos, entry)
	}

	// Highest priority first; name as tie-breaker for a deterministic order.
	sort.Slice(queueInfos, func(i, j int) bool {
		if queueInfos[i].Priority != queueInfos[j].Priority {
			return queueInfos[i].Priority > queueInfos[j].Priority
		}
		return queueInfos[i].Name < queueInfos[j].Name
	})

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(queueInfos)
}
// handleQueueDetail serves a single queue's details and its pause/unpause
// actions.
//
//   - GET  <root>/api/queues/{queue}          returns the queue info as JSON
//   - POST <root>/api/queues/{queue}/pause    pauses the queue
//   - POST <root>/api/queues/{queue}/unpause  resumes the queue
//
// It answers 400 for a missing queue name or unknown action, 404 for an
// unregistered queue, 403 for an action in read-only mode and 405 for any
// other method.
func (m *Monitor) handleQueueDetail(w http.ResponseWriter, r *http.Request) {
	rest := strings.TrimPrefix(r.URL.Path, m.rootPath+"/api/queues/")
	segments := strings.Split(rest, "/")
	// strings.Split always yields at least one element here.
	name := segments[0]
	if name == "" {
		http.Error(w, "Queue name is required", http.StatusBadRequest)
		return
	}

	if _, registered := m.queues[name]; !registered {
		http.Error(w, "Queue not found", http.StatusNotFound)
		return
	}

	// respondJSON writes v as the JSON response body.
	respondJSON := func(v any) {
		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(v)
	}

	// Pause / unpause actions.
	if r.Method == http.MethodPost && len(segments) >= 2 {
		if m.readOnly {
			http.Error(w, "Operation not allowed in read-only mode", http.StatusForbidden)
			return
		}
		switch segments[1] {
		case "pause":
			if err := m.inspector.PauseQueue(name); err != nil {
				http.Error(w, fmt.Sprintf("Failed to pause queue: %v", err), http.StatusInternalServerError)
				return
			}
			respondJSON(map[string]string{"status": "paused"})
		case "unpause":
			if err := m.inspector.UnpauseQueue(name); err != nil {
				http.Error(w, fmt.Sprintf("Failed to unpause queue: %v", err), http.StatusInternalServerError)
				return
			}
			respondJSON(map[string]string{"status": "unpaused"})
		default:
			http.Error(w, "Invalid action", http.StatusBadRequest)
		}
		return
	}

	if r.Method != http.MethodGet {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	info, err := m.inspector.GetQueueInfo(name)
	switch {
	case err == nil:
		respondJSON(info)
	case strings.Contains(err.Error(), "queue not found"):
		// A registered queue that the backend does not know yet is reported
		// as an empty queue rather than as an error.
		// NOTE(review): this map has no "scheduled" or "size" key; confirm
		// the UI does not rely on them for empty queues.
		respondJSON(map[string]any{
			"queue":     name,
			"active":    0,
			"pending":   0,
			"retry":     0,
			"archived":  0,
			"completed": 0,
			"processed": 0,
			"failed":    0,
			"paused":    false,
		})
	default:
		http.Error(w, fmt.Sprintf("Failed to get queue info: %v", err), http.StatusInternalServerError)
	}
}
// taskInfoJSON is the JSON representation of one task in the task list
// response. Payload is the raw task payload converted to a string. The
// timestamp fields are RFC 3339 strings and, like LastError, are omitted
// from the output when empty.
type taskInfoJSON struct {
ID string `json:"id"`
Type string `json:"type"`
Payload string `json:"payload"`
Queue string `json:"queue"`
Retried int `json:"retried"`
LastFailed string `json:"last_failed,omitempty"`
LastError string `json:"last_error,omitempty"`
NextProcess string `json:"next_process,omitempty"`
CompletedAt string `json:"completed_at,omitempty"`
}
// handleTasks serves paginated task lists and dispatches the retry action.
//
//   - GET  <root>/api/tasks/{queue}/{state}?page=N&page_size=M
//   - POST <root>/api/tasks/{queue}/archived/{taskID}/retry
//
// Valid states are active, pending, scheduled, retry, archived and
// completed. page defaults to 1 and page_size to 20 (maximum 100); values
// that fail to parse or are out of range are ignored. The JSON response
// holds "tasks" (always an array, never null), "page", "page_size" and
// "total". total is taken from the queue info and stays 0 when that lookup
// fails or the queue does not exist in the backend yet.
func (m *Monitor) handleTasks(w http.ResponseWriter, r *http.Request) {
	// Extract the queue name and task state from the URL.
	path := strings.TrimPrefix(r.URL.Path, m.rootPath+"/api/tasks/")
	parts := strings.Split(path, "/")
	if len(parts) < 2 {
		http.Error(w, "Queue name and task state are required", http.StatusBadRequest)
		return
	}
	queueName, taskState := parts[0], parts[1]

	// Retry request: POST /api/tasks/{queue}/archived/{taskId}/retry
	if r.Method == http.MethodPost && len(parts) >= 4 && taskState == "archived" && parts[3] == "retry" {
		if m.readOnly {
			http.Error(w, "Operation not allowed in read-only mode", http.StatusForbidden)
			return
		}
		m.handleRetryTask(w, r, queueName, parts[2])
		return
	}

	if r.Method != http.MethodGet {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	// Only registered queues may be listed.
	if _, exists := m.queues[queueName]; !exists {
		http.Error(w, "Queue not found", http.StatusNotFound)
		return
	}

	// Pagination parameters.
	page, pageSize := 1, 20
	params := r.URL.Query()
	if p := params.Get("page"); p != "" {
		if parsed, err := strconv.Atoi(p); err == nil && parsed > 0 {
			page = parsed
		}
	}
	if ps := params.Get("page_size"); ps != "" {
		if parsed, err := strconv.Atoi(ps); err == nil && parsed > 0 && parsed <= 100 {
			pageSize = parsed
		}
	}

	// Fetch the requested page. An unknown state is rejected here, before
	// any backend call is made. The inspector is given a zero-based page.
	var (
		tasks []*inspector.TaskInfo
		err   error
	)
	switch taskState {
	case "active":
		tasks, err = m.inspector.ListActiveTasks(queueName, pageSize, page-1)
	case "pending":
		tasks, err = m.inspector.ListPendingTasks(queueName, pageSize, page-1)
	case "scheduled":
		tasks, err = m.inspector.ListScheduledTasks(queueName, pageSize, page-1)
	case "retry":
		tasks, err = m.inspector.ListRetryTasks(queueName, pageSize, page-1)
	case "archived":
		tasks, err = m.inspector.ListArchivedTasks(queueName, pageSize, page-1)
	case "completed":
		tasks, err = m.inspector.ListCompletedTasks(queueName, pageSize, page-1)
	default:
		http.Error(w, "Invalid task state. Valid states: active, pending, scheduled, retry, archived, completed", http.StatusBadRequest)
		return
	}

	total := 0
	if err != nil {
		// A queue missing from the backend (no tasks yet) yields an empty
		// list instead of an error.
		// NOTE(review): matching on the error text is brittle; prefer a
		// sentinel error if the inspector package exposes one.
		if !strings.Contains(err.Error(), "queue not found") {
			http.Error(w, fmt.Sprintf("Failed to get tasks: %v", err), http.StatusInternalServerError)
			return
		}
		tasks = nil
	} else if queueInfo, queueErr := m.inspector.GetQueueInfo(queueName); queueErr == nil {
		// The total for the requested state comes from the queue counters.
		switch taskState {
		case "active":
			total = queueInfo.Active
		case "pending":
			total = queueInfo.Pending
		case "scheduled":
			total = queueInfo.Scheduled
		case "retry":
			total = queueInfo.Retry
		case "archived":
			total = queueInfo.Archived
		case "completed":
			total = queueInfo.Completed
		}
	}

	// Allocate a non-nil slice so an empty page encodes as [] and not null.
	taskInfos := make([]taskInfoJSON, 0, len(tasks))
	for _, task := range tasks {
		info := taskInfoJSON{
			ID:        task.ID,
			Type:      task.Type,
			Payload:   string(task.Payload),
			Queue:     task.Queue,
			Retried:   task.Retried,
			LastError: task.LastErr,
		}
		// Zero timestamps are left empty so omitempty drops them.
		if !task.LastFailedAt.IsZero() {
			info.LastFailed = task.LastFailedAt.Format(time.RFC3339)
		}
		if !task.NextProcessAt.IsZero() {
			info.NextProcess = task.NextProcessAt.Format(time.RFC3339)
		}
		if !task.CompletedAt.IsZero() {
			info.CompletedAt = task.CompletedAt.Format(time.RFC3339)
		}
		taskInfos = append(taskInfos, info)
	}

	response := map[string]any{
		"tasks":     taskInfos,
		"page":      page,
		"page_size": pageSize,
		"total":     total,
	}
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(response)
}
// handleRetryTask re-runs the task taskID of queue queueName through the
// inspector. It answers 404 when the queue is not registered, 500 when the
// inspector reports an error, and {"status":"ok"} on success. The request r
// is not inspected.
func (m *Monitor) handleRetryTask(w http.ResponseWriter, r *http.Request, queueName, taskID string) {
	_, registered := m.queues[queueName]
	if !registered {
		http.Error(w, "Queue not found", http.StatusNotFound)
		return
	}

	if runErr := m.inspector.RunTask(queueName, taskID); runErr != nil {
		msg := fmt.Sprintf("Failed to retry task: %v", runErr)
		http.Error(w, msg, http.StatusInternalServerError)
		return
	}

	result := map[string]string{"status": "ok"}
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(result)
}
// handleIndex serves the SPA entry page. Only GET is accepted.
//
// It reads ui/index.html from the embedded file system, substitutes every
// "{{.RootPath}}" placeholder with the monitor's root path and writes the
// result as HTML.
func (m *Monitor) handleIndex(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	raw, readErr := uiFS.ReadFile("ui/index.html")
	if readErr != nil {
		msg := fmt.Sprintf("Failed to read index.html: %v", readErr)
		http.Error(w, msg, http.StatusInternalServerError)
		return
	}

	// Plain text substitution; the page is not run through html/template.
	page := strings.NewReplacer("{{.RootPath}}", m.rootPath).Replace(string(raw))

	w.Header().Set("Content-Type", "text/html; charset=utf-8")
	w.Write([]byte(page))
}
// handleSSE streams live data to the client as Server-Sent Events. Only GET
// is accepted.
//
// A "queues" event and a "stats" event are sent immediately; afterwards
// "stats" is pushed every 2 seconds and "queues" every 5 seconds. The
// handler returns when the client disconnects (request context done) or the
// monitor is closed (closeCh closed).
func (m *Monitor) handleSSE(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	// Check for streaming support before touching the headers, so the error
	// response does not carry the SSE and CORS headers.
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "SSE not supported", http.StatusInternalServerError)
		return
	}

	// SSE response headers.
	header := w.Header()
	header.Set("Content-Type", "text/event-stream")
	header.Set("Cache-Control", "no-cache")
	header.Set("Connection", "keep-alive")
	header.Set("Access-Control-Allow-Origin", "*")

	// Two tickers: statistics refresh often, the queue table less so.
	statsTicker := time.NewTicker(2 * time.Second)
	queuesTicker := time.NewTicker(5 * time.Second)
	defer statsTicker.Stop()
	defer queuesTicker.Stop()

	// Done when the client disconnects.
	ctx := r.Context()

	// Send one snapshot right away instead of waiting for the first tick.
	m.sendQueuesEvent(w, flusher)
	m.sendStatsEvent(w, flusher)

	for {
		select {
		case <-ctx.Done():
			return
		case <-m.closeCh:
			return
		case <-statsTicker.C:
			m.sendStatsEvent(w, flusher)
		case <-queuesTicker.C:
			m.sendQueuesEvent(w, flusher)
		}
	}
}
// sendStatsEvent pushes one "stats" SSE event carrying the first record
// returned by a stats query with Limit 1. Nothing is written when the query
// fails, returns no records, or the record cannot be marshalled.
func (m *Monitor) sendStatsEvent(w http.ResponseWriter, flusher http.Flusher) {
	records, queryErr := m.inspector.QueryStats(inspector.StatsQuery{Limit: 1})
	if queryErr != nil {
		return
	}
	if len(records) == 0 {
		return
	}

	payload, marshalErr := json.Marshal(records[0])
	if marshalErr != nil {
		return
	}

	// One SSE frame: event name, data line, blank line terminator.
	fmt.Fprintf(w, "event: stats\ndata: %s\n\n", payload)
	flusher.Flush()
}
// sendQueuesEvent pushes one "queues" SSE event carrying the state of every
// registered queue as a JSON array.
//
// Queues the inspector cannot report on are included with zeroed counters.
// The array is ordered by descending priority, with ties broken by queue
// name so consecutive events list the queues in the same order despite
// random map iteration. Nothing is written when marshalling fails.
func (m *Monitor) sendQueuesEvent(w http.ResponseWriter, flusher http.Flusher) {
	// Allocate a non-nil slice so an empty queue set is sent as [] and not
	// as null.
	queueInfos := make([]queueInfoJSON, 0, len(m.queues))
	for queueName, priority := range m.queues {
		entry := queueInfoJSON{Name: queueName, Priority: priority}
		// On lookup failure the entry keeps its zeroed counters.
		if info, err := m.inspector.GetQueueInfo(queueName); err == nil {
			entry.Size = info.Size
			entry.Active = info.Active
			entry.Pending = info.Pending
			entry.Scheduled = info.Scheduled
			entry.Retry = info.Retry
			entry.Archived = info.Archived
			entry.Completed = info.Completed
			entry.Processed = info.Processed
			entry.Failed = info.Failed
			entry.Paused = info.Paused
			entry.MemoryUsage = info.MemoryUsage
			entry.Latency = info.LatencyMS
		}
		queueInfos = append(queueInfos, entry)
	}

	// Highest priority first; name as tie-breaker for a deterministic order.
	sort.Slice(queueInfos, func(i, j int) bool {
		if queueInfos[i].Priority != queueInfos[j].Priority {
			return queueInfos[i].Priority > queueInfos[j].Priority
		}
		return queueInfos[i].Name < queueInfos[j].Name
	})

	data, err := json.Marshal(queueInfos)
	if err != nil {
		return
	}

	// One SSE frame: event name, data line, blank line terminator.
	fmt.Fprintf(w, "event: queues\ndata: %s\n\n", data)
	flusher.Flush()
}