feat: 优化监控仪表盘 UI

- 添加 appbar 导航栏,支持 Chart/Queues 视图切换
- appbar 切换使用 history API,支持浏览器前进/后退
- 图表视图占满整个可视区域
- queue-modal 共享 appbar 样式
- 修复 queue tab count 字段名大小写问题
- tooltip 跟随鼠标显示在右下方,移除箭头
- 图表 canvas 鼠标样式改为准星
- pause/resume 队列后刷新列表
- example 添加 flag 配置参数
This commit is contained in:
2025-12-10 00:53:30 +08:00
parent 42cb0fa4c2
commit d2d59746b2
19 changed files with 1626 additions and 909 deletions

605
x/monitor/monitor.go Normal file
View File

@@ -0,0 +1,605 @@
// Package monitor 提供 taskq 的 HTTP 监控服务
package monitor
import (
"embed"
"encoding/json"
"fmt"
"io/fs"
"net/http"
"sort"
"strconv"
"strings"
"sync"
"time"
"code.tczkiot.com/wlw/taskq/x/inspector"
)
// uiFS holds the files matched by the ui/* pattern below, embedded into the
// binary at build time. The handlers in this file read ui/index.html from it
// and serve the ui subtree as static assets.
//
//go:embed ui/*
var uiFS embed.FS
// Options carries the settings accepted by New when building a Monitor.
type Options struct {
	// Inspector is the inspector instance the monitor queries (required).
	Inspector *inspector.Inspector

	// Queues maps each queue name to its priority (required).
	Queues map[string]int

	// RootPath is the URL prefix the monitor is served under.
	// An empty value is replaced by "/monitor".
	RootPath string

	// ReadOnly turns on read-only mode, which disables every mutating
	// operation. The default is false.
	ReadOnly bool
}
// Monitor is the HTTP handler that serves the monitoring UI and its API.
type Monitor struct {
	// router dispatches incoming requests to the handlers registered by setupRoutes.
	router *http.ServeMux

	// rootPath is the normalized URL prefix computed by New.
	rootPath string

	// readOnly makes the handlers reject pause/unpause and retry requests.
	readOnly bool

	// closeCh is closed by Close; the SSE loop selects on it to stop streaming.
	closeCh chan struct{}

	// closeOnce ensures closeCh is closed at most once.
	closeOnce sync.Once

	// inspector is the data source for queue, task and stats queries.
	inspector *inspector.Inspector

	// queues maps each registered queue name to its priority.
	queues map[string]int
}
// New builds a Monitor from opts and registers its HTTP routes.
//
// It returns an error when opts.Inspector or opts.Queues is nil. An empty
// opts.RootPath falls back to "/monitor"; the path is then normalized so that
// it starts with "/" and carries no trailing "/".
func New(opts Options) (*Monitor, error) {
	switch {
	case opts.Inspector == nil:
		return nil, fmt.Errorf("monitor: inspector is required")
	case opts.Queues == nil:
		return nil, fmt.Errorf("monitor: queues is required")
	}

	// Apply the default, then normalize the prefix.
	root := opts.RootPath
	if root == "" {
		root = "/monitor"
	}
	if !strings.HasPrefix(root, "/") {
		root = "/" + root
	}
	root = strings.TrimSuffix(root, "/")

	mon := &Monitor{
		router:    http.NewServeMux(),
		rootPath:  root,
		readOnly:  opts.ReadOnly,
		closeCh:   make(chan struct{}),
		inspector: opts.Inspector,
		queues:    opts.Queues,
	}
	mon.setupRoutes()
	return mon, nil
}
// ServeHTTP implements http.Handler by handing the request to the monitor's
// internal ServeMux.
func (m *Monitor) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	mux := m.router
	mux.ServeHTTP(w, r)
}
// RootPath reports the URL prefix the monitor is served under, as stored at
// construction time.
func (m *Monitor) RootPath() string {
	return m.rootPath
}
// Close shuts the monitor down by closing its close channel, which ends any
// running SSE loop. Repeated calls are safe because the close is guarded by a
// sync.Once. The returned error is always nil.
func (m *Monitor) Close() error {
	m.closeOnce.Do(func() { close(m.closeCh) })
	return nil
}
// setupRoutes registers the API, static-asset and page routes on m.router.
// Every pattern is prefixed with m.rootPath.
func (m *Monitor) setupRoutes() {
	// API routes.
	apiPath := m.rootPath + "/api/"
	m.router.HandleFunc(apiPath+"queues", m.handleQueues)
	m.router.HandleFunc(apiPath+"queues/", m.handleQueueDetail)
	m.router.HandleFunc(apiPath+"tasks/", m.handleTasks)
	m.router.HandleFunc(apiPath+"stats/", m.handleStats)
	m.router.HandleFunc(apiPath+"sse", m.handleSSE)

	// Static assets, served from the embedded ui directory.
	// fs.Sub only fails for an invalid directory name; "ui" is a valid
	// constant, so the error is deliberately ignored.
	uiSubFS, _ := fs.Sub(uiFS, "ui")
	fileServer := http.FileServer(http.FS(uiSubFS))
	staticPrefix := m.rootPath + "/static/"
	m.router.Handle(staticPrefix, http.StripPrefix(staticPrefix, fileServer))

	// Page routes. The queues subtree is included so that URLs produced by
	// the front end's History API navigation resolve to the index page.
	m.router.HandleFunc(m.rootPath+"/queues/", m.handleIndex)
	m.router.HandleFunc(m.rootPath+"/", m.handleIndex)

	// New reduces a RootPath of "/" to the empty string, and ServeMux panics
	// when asked to register an empty pattern. In that case the "/" pattern
	// registered above already covers the root, so the bare-prefix route is
	// only added when there is a prefix.
	if m.rootPath != "" {
		m.router.HandleFunc(m.rootPath, m.handleIndex)
	}
}
// handleStats answers GET requests for queue statistics.
//
// The queue name is whatever follows <rootPath>/api/stats/ in the URL (with a
// trailing "/" removed); it may be empty, which the original comment describes
// as "all queues aggregated". Optional query parameters:
//
//	limit  number of entries, accepted when 0 < limit <= 10000 (default 500)
//	start  positive integer, described in the original comment as a Unix timestamp
//	end    positive integer, described in the original comment as a Unix timestamp
//
// Parameters that do not parse or fall outside those bounds are ignored.
// The result of the inspector's QueryStats is written as JSON.
func (m *Monitor) handleStats(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	queueName := strings.TrimSuffix(strings.TrimPrefix(r.URL.Path, m.rootPath+"/api/stats/"), "/")
	query := inspector.StatsQuery{Queue: queueName, Limit: 500}

	params := r.URL.Query()
	if raw := params.Get("limit"); raw != "" {
		if n, err := strconv.Atoi(raw); err == nil && n > 0 && n <= 10000 {
			query.Limit = n
		}
	}

	// positiveInt64 reads a query parameter as a strictly positive int64.
	positiveInt64 := func(key string) (int64, bool) {
		raw := params.Get(key)
		if raw == "" {
			return 0, false
		}
		value, err := strconv.ParseInt(raw, 10, 64)
		return value, err == nil && value > 0
	}
	if start, ok := positiveInt64("start"); ok {
		query.Start = start
	}
	if end, ok := positiveInt64("end"); ok {
		query.End = end
	}

	stats, err := m.inspector.QueryStats(query)
	if err != nil {
		http.Error(w, fmt.Sprintf("Failed to get stats: %v", err), http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(stats)
}
// queueInfoJSON is the JSON shape of one queue row sent to the front end.
// Field order is significant: it determines the key order of the encoded
// object.
type queueInfoJSON struct {
	// Identity: the registered queue name and its configured priority.
	Name     string `json:"name"`
	Priority int    `json:"priority"`

	// Counters copied from the inspector's queue info.
	Size      int `json:"size"`
	Active    int `json:"active"`
	Pending   int `json:"pending"`
	Scheduled int `json:"scheduled"`
	Retry     int `json:"retry"`
	Archived  int `json:"archived"`
	Completed int `json:"completed"`
	Processed int `json:"processed"`
	Failed    int `json:"failed"`

	// State and resource figures; Latency is filled from the inspector's
	// LatencyMS field.
	Paused      bool  `json:"paused"`
	MemoryUsage int64 `json:"memory_usage"`
	Latency     int64 `json:"latency"`
}
// handleQueues answers GET requests with the list of registered queues.
//
// Every queue in m.queues is listed, even when the inspector cannot return
// information for it (for example because it has no tasks yet); such a queue
// is reported with only its name and priority. The list is ordered by
// descending priority, with ties broken by queue name so that the order is
// stable across requests. The response body is always a JSON array, never
// null.
func (m *Monitor) handleQueues(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	// A non-nil slice makes an empty result encode as [] rather than null.
	queueInfos := make([]queueInfoJSON, 0, len(m.queues))
	for queueName, priority := range m.queues {
		entry := queueInfoJSON{Name: queueName, Priority: priority}
		// Best effort: on any inspector error the queue is still listed,
		// with zero counters.
		if info, err := m.inspector.GetQueueInfo(queueName); err == nil {
			entry.Size = info.Size
			entry.Active = info.Active
			entry.Pending = info.Pending
			entry.Scheduled = info.Scheduled
			entry.Retry = info.Retry
			entry.Archived = info.Archived
			entry.Completed = info.Completed
			entry.Processed = info.Processed
			entry.Failed = info.Failed
			entry.Paused = info.Paused
			entry.MemoryUsage = info.MemoryUsage
			entry.Latency = info.LatencyMS
		}
		queueInfos = append(queueInfos, entry)
	}

	// Map iteration order is random and sort.Slice is not stable, so queues
	// sharing a priority need an explicit tie-breaker.
	sort.Slice(queueInfos, func(i, j int) bool {
		if queueInfos[i].Priority != queueInfos[j].Priority {
			return queueInfos[i].Priority > queueInfos[j].Priority
		}
		return queueInfos[i].Name < queueInfos[j].Name
	})

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(queueInfos)
}
// handleQueueDetail serves a single queue's details and its pause/unpause
// actions.
//
// Routes, relative to <rootPath>/api/queues/:
//
//	GET  {queue}          queue information from the inspector
//	POST {queue}/pause    pause the queue
//	POST {queue}/unpause  resume the queue
//
// Requests for a queue that is not registered in m.queues get 404. POST
// actions are refused with 403 in read-only mode. When the inspector reports
// "queue not found" for a registered queue, a zeroed placeholder is returned
// instead of an error.
func (m *Monitor) handleQueueDetail(w http.ResponseWriter, r *http.Request) {
	segments := strings.Split(strings.TrimPrefix(r.URL.Path, m.rootPath+"/api/queues/"), "/")
	if len(segments) == 0 || segments[0] == "" {
		http.Error(w, "Queue name is required", http.StatusBadRequest)
		return
	}

	queueName := segments[0]
	if _, registered := m.queues[queueName]; !registered {
		http.Error(w, "Queue not found", http.StatusNotFound)
		return
	}

	// Pause / unpause actions.
	if r.Method == http.MethodPost && len(segments) >= 2 {
		if m.readOnly {
			http.Error(w, "Operation not allowed in read-only mode", http.StatusForbidden)
			return
		}

		writeStatus := func(status string) {
			w.Header().Set("Content-Type", "application/json")
			json.NewEncoder(w).Encode(map[string]string{"status": status})
		}

		switch segments[1] {
		case "pause":
			err := m.inspector.PauseQueue(queueName)
			if err != nil {
				http.Error(w, fmt.Sprintf("Failed to pause queue: %v", err), http.StatusInternalServerError)
				return
			}
			writeStatus("paused")
		case "unpause":
			err := m.inspector.UnpauseQueue(queueName)
			if err != nil {
				http.Error(w, fmt.Sprintf("Failed to unpause queue: %v", err), http.StatusInternalServerError)
				return
			}
			writeStatus("unpaused")
		default:
			http.Error(w, "Invalid action", http.StatusBadRequest)
		}
		return
	}

	if r.Method != http.MethodGet {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	// Queue details.
	info, err := m.inspector.GetQueueInfo(queueName)
	switch {
	case err == nil:
		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(info)
	case strings.Contains(err.Error(), "queue not found"):
		// The inspector does not know the queue yet: answer with an empty state.
		placeholder := map[string]any{
			"queue":     queueName,
			"active":    0,
			"pending":   0,
			"retry":     0,
			"archived":  0,
			"completed": 0,
			"processed": 0,
			"failed":    0,
			"paused":    false,
		}
		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(placeholder)
	default:
		http.Error(w, fmt.Sprintf("Failed to get queue info: %v", err), http.StatusInternalServerError)
	}
}
// taskInfoJSON is the JSON shape of one task row sent to the front end.
// Field order is significant: it determines the key order of the encoded
// object.
type taskInfoJSON struct {
	// Always present.
	ID      string `json:"id"`
	Type    string `json:"type"`
	Payload string `json:"payload"`
	Queue   string `json:"queue"`
	Retried int    `json:"retried"`

	// Optional; left out of the JSON when empty.
	LastFailed  string `json:"last_failed,omitempty"`
	LastError   string `json:"last_error,omitempty"`
	NextProcess string `json:"next_process,omitempty"`
	CompletedAt string `json:"completed_at,omitempty"`
}
// handleTasks serves the paginated task list of one queue and dispatches
// retry requests.
//
// Routes, relative to <rootPath>/api/tasks/:
//
//	GET  {queue}/{state}                  task list for the given state
//	POST {queue}/archived/{taskID}/retry  re-run an archived task (403 in read-only mode)
//
// Valid states are active, pending, scheduled, retry, archived and completed.
// Query parameters "page" (default 1) and "page_size" (default 20, at most
// 100) control pagination; values that do not parse or fall outside those
// bounds are ignored. The response object holds "tasks", "page", "page_size"
// and "total"; "tasks" is always a JSON array, never null.
func (m *Monitor) handleTasks(w http.ResponseWriter, r *http.Request) {
	// Split the remainder of the URL into queue name, task state and extras.
	path := strings.TrimPrefix(r.URL.Path, m.rootPath+"/api/tasks/")
	parts := strings.Split(path, "/")
	if len(parts) < 2 {
		http.Error(w, "Queue name and task state are required", http.StatusBadRequest)
		return
	}
	queueName, taskState := parts[0], parts[1]

	// Retry request: POST /api/tasks/{queue}/archived/{taskId}/retry
	if r.Method == http.MethodPost && len(parts) >= 4 && taskState == "archived" && parts[3] == "retry" {
		if m.readOnly {
			http.Error(w, "Operation not allowed in read-only mode", http.StatusForbidden)
			return
		}
		m.handleRetryTask(w, r, queueName, parts[2])
		return
	}

	if r.Method != http.MethodGet {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	// Only registered queues are served.
	if _, exists := m.queues[queueName]; !exists {
		http.Error(w, "Queue not found", http.StatusNotFound)
		return
	}

	// Reject unknown states before querying the inspector, so that a bad
	// request does not cost a backend lookup.
	switch taskState {
	case "active", "pending", "scheduled", "retry", "archived", "completed":
	default:
		http.Error(w, "Invalid task state. Valid states: active, pending, scheduled, retry, archived, completed", http.StatusBadRequest)
		return
	}

	// Pagination parameters.
	page, pageSize := 1, 20
	params := r.URL.Query()
	if p := params.Get("page"); p != "" {
		if parsed, err := strconv.Atoi(p); err == nil && parsed > 0 {
			page = parsed
		}
	}
	if ps := params.Get("page_size"); ps != "" {
		if parsed, err := strconv.Atoi(ps); err == nil && parsed > 0 && parsed <= 100 {
			pageSize = parsed
		}
	}

	// The total for the requested state comes from the queue info; when that
	// lookup fails the total stays 0.
	var total int
	if queueInfo, infoErr := m.inspector.GetQueueInfo(queueName); infoErr == nil {
		switch taskState {
		case "active":
			total = queueInfo.Active
		case "pending":
			total = queueInfo.Pending
		case "scheduled":
			total = queueInfo.Scheduled
		case "retry":
			total = queueInfo.Retry
		case "archived":
			total = queueInfo.Archived
		case "completed":
			total = queueInfo.Completed
		}
	}

	// Fetch the requested page; the inspector is passed a zero-based page number.
	var (
		tasks []*inspector.TaskInfo
		err   error
	)
	switch taskState {
	case "active":
		tasks, err = m.inspector.ListActiveTasks(queueName, pageSize, page-1)
	case "pending":
		tasks, err = m.inspector.ListPendingTasks(queueName, pageSize, page-1)
	case "scheduled":
		tasks, err = m.inspector.ListScheduledTasks(queueName, pageSize, page-1)
	case "retry":
		tasks, err = m.inspector.ListRetryTasks(queueName, pageSize, page-1)
	case "archived":
		tasks, err = m.inspector.ListArchivedTasks(queueName, pageSize, page-1)
	case "completed":
		tasks, err = m.inspector.ListCompletedTasks(queueName, pageSize, page-1)
	}
	if err != nil {
		// A queue the inspector does not know yet (no tasks) yields an empty
		// list rather than an error.
		if !strings.Contains(err.Error(), "queue not found") {
			http.Error(w, fmt.Sprintf("Failed to get tasks: %v", err), http.StatusInternalServerError)
			return
		}
		tasks, total = nil, 0
	}

	// A non-nil slice makes an empty page encode as [] rather than null.
	taskInfos := make([]taskInfoJSON, 0, len(tasks))
	for _, task := range tasks {
		info := taskInfoJSON{
			ID:        task.ID,
			Type:      task.Type,
			Payload:   string(task.Payload),
			Queue:     task.Queue,
			Retried:   task.Retried,
			LastError: task.LastErr,
		}
		// Zero timestamps are left empty so omitempty drops them.
		if !task.LastFailedAt.IsZero() {
			info.LastFailed = task.LastFailedAt.Format(time.RFC3339)
		}
		if !task.NextProcessAt.IsZero() {
			info.NextProcess = task.NextProcessAt.Format(time.RFC3339)
		}
		if !task.CompletedAt.IsZero() {
			info.CompletedAt = task.CompletedAt.Format(time.RFC3339)
		}
		taskInfos = append(taskInfos, info)
	}

	response := map[string]any{
		"tasks":     taskInfos,
		"page":      page,
		"page_size": pageSize,
		"total":     total,
	}
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(response)
}
// handleRetryTask asks the inspector to run the given task again.
//
// It answers 404 when queueName is not registered in m.queues, 500 when the
// inspector's RunTask call fails, and {"status":"ok"} otherwise.
func (m *Monitor) handleRetryTask(w http.ResponseWriter, r *http.Request, queueName, taskID string) {
	if _, registered := m.queues[queueName]; !registered {
		http.Error(w, "Queue not found", http.StatusNotFound)
		return
	}

	if runErr := m.inspector.RunTask(queueName, taskID); runErr != nil {
		http.Error(w, fmt.Sprintf("Failed to retry task: %v", runErr), http.StatusInternalServerError)
		return
	}

	result := map[string]string{"status": "ok"}
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(result)
}
// handleIndex answers GET requests with the single-page application's entry
// page: the embedded ui/index.html with every "{{.RootPath}}" placeholder
// replaced by the monitor's root path.
func (m *Monitor) handleIndex(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	page, err := uiFS.ReadFile("ui/index.html")
	if err != nil {
		http.Error(w, fmt.Sprintf("Failed to read index.html: %v", err), http.StatusInternalServerError)
		return
	}

	// Substitute the placeholder so the page can build URLs under the prefix.
	rendered := strings.ReplaceAll(string(page), "{{.RootPath}}", m.rootPath)

	w.Header().Set("Content-Type", "text/html; charset=utf-8")
	w.Write([]byte(rendered))
}
// handleSSE streams live data to the client as Server-Sent Events.
//
// Only GET is accepted, matching the other read handlers. The handler sends
// one "queues" event and one "stats" event immediately, then a "stats" event
// every 2 seconds and a "queues" event every 5 seconds. The stream ends when
// the request context is cancelled (client gone) or when Close is called on
// the monitor.
func (m *Monitor) handleSSE(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	// Check for streaming support before touching the headers, so that the
	// error response does not carry event-stream headers.
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "SSE not supported", http.StatusInternalServerError)
		return
	}

	// SSE response headers.
	header := w.Header()
	header.Set("Content-Type", "text/event-stream")
	header.Set("Cache-Control", "no-cache")
	header.Set("Connection", "keep-alive")
	header.Set("Access-Control-Allow-Origin", "*")

	// Two tickers: chart stats refresh often, the queue table less often.
	statsTicker := time.NewTicker(2 * time.Second)
	defer statsTicker.Stop()
	queuesTicker := time.NewTicker(5 * time.Second)
	defer queuesTicker.Stop()

	// Cancelled when the client disconnects.
	ctx := r.Context()

	// Push an initial snapshot right away instead of waiting for a tick.
	m.sendQueuesEvent(w, flusher)
	m.sendStatsEvent(w, flusher)

	for {
		select {
		case <-ctx.Done():
			return
		case <-m.closeCh:
			return
		case <-statsTicker.C:
			m.sendStatsEvent(w, flusher)
		case <-queuesTicker.C:
			m.sendQueuesEvent(w, flusher)
		}
	}
}
// sendStatsEvent writes one "stats" SSE event carrying the first entry
// returned by the inspector's QueryStats with Limit 1, then flushes.
//
// Nothing is written when the query fails, returns no entries, or the entry
// cannot be marshalled to JSON.
func (m *Monitor) sendStatsEvent(w http.ResponseWriter, flusher http.Flusher) {
	entries, err := m.inspector.QueryStats(inspector.StatsQuery{Limit: 1})
	if err != nil {
		return
	}
	if len(entries) == 0 {
		return
	}

	payload, err := json.Marshal(entries[0])
	if err != nil {
		return
	}

	fmt.Fprintf(w, "event: stats\ndata: %s\n\n", payload)
	flusher.Flush()
}
// sendQueuesEvent writes one "queues" SSE event carrying the queue table,
// then flushes.
//
// Every queue in m.queues is included; a queue the inspector cannot describe
// is reported with only its name and priority. Rows are ordered by descending
// priority, with ties broken by queue name so that successive events keep the
// same row order. The payload is always a JSON array, never null. Nothing is
// written when the payload cannot be marshalled.
func (m *Monitor) sendQueuesEvent(w http.ResponseWriter, flusher http.Flusher) {
	// A non-nil slice makes an empty table encode as [] rather than null.
	queueInfos := make([]queueInfoJSON, 0, len(m.queues))
	for queueName, priority := range m.queues {
		entry := queueInfoJSON{Name: queueName, Priority: priority}
		// Best effort: on any inspector error the queue is still listed,
		// with zero counters.
		if info, err := m.inspector.GetQueueInfo(queueName); err == nil {
			entry.Size = info.Size
			entry.Active = info.Active
			entry.Pending = info.Pending
			entry.Scheduled = info.Scheduled
			entry.Retry = info.Retry
			entry.Archived = info.Archived
			entry.Completed = info.Completed
			entry.Processed = info.Processed
			entry.Failed = info.Failed
			entry.Paused = info.Paused
			entry.MemoryUsage = info.MemoryUsage
			entry.Latency = info.LatencyMS
		}
		queueInfos = append(queueInfos, entry)
	}

	// Map iteration order is random and sort.Slice is not stable; without a
	// tie-breaker, equal-priority rows would reshuffle on every push.
	sort.Slice(queueInfos, func(i, j int) bool {
		if queueInfos[i].Priority != queueInfos[j].Priority {
			return queueInfos[i].Priority > queueInfos[j].Priority
		}
		return queueInfos[i].Name < queueInfos[j].Name
	})

	data, err := json.Marshal(queueInfos)
	if err != nil {
		return
	}
	fmt.Fprintf(w, "event: queues\ndata: %s\n\n", data)
	flusher.Flush()
}