diff --git a/.gitignore b/.gitignore index 7125ccf..26603fe 100644 --- a/.gitignore +++ b/.gitignore @@ -48,4 +48,10 @@ bin/ # Temporary files tmp/ -temp/ \ No newline at end of file +temp/ + +# SQLite database files +*.db +*.db-journal +*.db-shm +*.db-wal diff --git a/dashboard.html b/dashboard.html deleted file mode 100644 index 791aa4d..0000000 --- a/dashboard.html +++ /dev/null @@ -1,420 +0,0 @@ - - - - - - TaskQ 监控面板 - - - -
-
-

🚀 TaskQ 监控面板

-

实时监控异步任务队列状态

-
-
- -
加载中...
- - -
-
- - - - - - - \ No newline at end of file diff --git a/example/main.go b/example/main.go index 20a978c..e36b62f 100644 --- a/example/main.go +++ b/example/main.go @@ -5,6 +5,9 @@ import ( "fmt" "log" "net/http" + "os" + "os/signal" + "syscall" "time" "code.tczkiot.com/wlw/taskq" @@ -75,8 +78,8 @@ func main() { log.Fatal("注册图片任务失败:", err) } - // 创建监控处理器 - handler, err := taskq.NewInspectHandler(taskq.InspectOptions{ + // 创建监控 HTTP 处理器 + handler, err := taskq.NewHTTPHandler(taskq.HTTPHandlerOptions{ RootPath: "/monitor", ReadOnly: false, }) @@ -84,10 +87,16 @@ func main() { log.Fatal("创建监控处理器失败:", err) } - // 启动 taskq 服务器 - ctx := context.Background() + // 创建可取消的 context + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // 启动 taskq 服务器(包含统计采集器) go func() { - err := taskq.Start(ctx) + err := taskq.Start(ctx, taskq.StartOptions{ + StatsInterval: 2 * time.Second, + StatsDBPath: "./taskq_stats.db", + }) if err != nil { log.Fatal("启动 taskq 服务器失败:", err) } @@ -107,7 +116,7 @@ func main() { case <-ticker.C: taskCounter++ - // 发布邮件任务 + // 发布即时邮件任务 err := emailTask.Publish(ctx, EmailTask{ UserID: taskCounter, TemplateID: "welcome", @@ -118,7 +127,29 @@ func main() { log.Printf("发布邮件任务成功: 用户ID=%d", taskCounter) } - // 发布图片调整任务 + // 发布延迟任务(30秒后执行) + err = emailTask.Publish(ctx, EmailTask{ + UserID: taskCounter + 1000, + TemplateID: "reminder", + }, taskq.Delay(30*time.Second)) + if err != nil { + log.Printf("发布延迟邮件任务失败: %v", err) + } else { + log.Printf("发布延迟邮件任务成功: 用户ID=%d (30秒后执行)", taskCounter+1000) + } + + // 发布定点任务(1分钟后的整点执行) + scheduledTime := time.Now().Add(1 * time.Minute).Truncate(time.Minute) + err = imageTask.Publish(ctx, ImageResizeTask{ + SourceURL: fmt.Sprintf("https://example.com/scheduled%d.jpg", taskCounter), + }, taskq.DelayUntil(scheduledTime)) + if err != nil { + log.Printf("发布定点图片任务失败: %v", err) + } else { + log.Printf("发布定点图片任务成功: 任务ID=%d (在 %s 执行)", taskCounter, scheduledTime.Format("15:04:05")) + } + + // 发布即时图片任务 err = imageTask.Publish(ctx, ImageResizeTask{ SourceURL: fmt.Sprintf("https://example.com/image%d.jpg", taskCounter), }) @@ -131,7 +162,44 @@ func main() { } }() - // 启动 HTTP 服务器提供监控界面 - log.Printf("启动监控服务器在 http://localhost:8080") - log.Fatal(http.ListenAndServe(":8080", handler)) + // 创建 HTTP 服务器 + server := &http.Server{ + Addr: ":8081", + Handler: handler, + } + + // 启动 HTTP 服务器(非阻塞) + go func() { + log.Printf("启动监控服务器在 http://localhost:8081") + if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Fatal("HTTP 服务器错误:", err) + } + }() + + // 等待中断信号 + quit := make(chan os.Signal, 1) + signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) + <-quit + + log.Println("收到关闭信号,正在优雅关停...") + + // 1. 取消 context,停止任务发布 + cancel() + + // 2. 关闭监控 HTTP 处理器(会断开 SSE 连接) + handler.Close() + + // 3. 关闭 HTTP 服务器(设置 5 秒超时) + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer shutdownCancel() + + if err := server.Shutdown(shutdownCtx); err != nil { + log.Printf("HTTP 服务器关闭错误: %v", err) + } + + // 4. 停止 taskq 服务器(会等待完全关闭) + taskq.Stop() + + log.Println("服务已安全关闭") + // rdb.Close() 由 defer 执行 } diff --git a/go.mod b/go.mod index aff2429..a11431e 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/google/uuid v1.6.0 // indirect + github.com/mattn/go-sqlite3 v1.14.32 github.com/robfig/cron/v3 v3.0.1 // indirect github.com/spf13/cast v1.7.0 // indirect golang.org/x/sys v0.27.0 // indirect diff --git a/go.sum b/go.sum index b4a12a7..8415f28 100644 --- a/go.sum +++ b/go.sum @@ -18,6 +18,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/mattn/go-sqlite3 v1.14.32 h1:JD12Ag3oLy1zQA+BNn74xRgaBbdhbNIDYvQUEuuErjs= +github.com/mattn/go-sqlite3 v1.14.32/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E= github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= diff --git a/handler.go b/handler.go new file mode 100644 index 0000000..015343e --- /dev/null +++ b/handler.go @@ -0,0 +1,586 @@ +// Package taskq 提供基于 Redis 的异步任务队列功能 +// handler.go 文件包含 HTTP 监控服务处理器 +package taskq + +import ( + "embed" + "encoding/json" + "fmt" + "io/fs" + "net/http" + "sort" + "strconv" + "strings" + "sync" + "time" + + "github.com/hibiken/asynq" +) + +//go:embed ui/* +var uiFS embed.FS + +// HTTPHandlerOptions 配置监控服务的选项 +type HTTPHandlerOptions struct { + // RootPath 监控服务的根路径,默认为 "/monitor" + RootPath string + + // ReadOnly 是否只读模式,禁用所有修改操作,默认为 false + ReadOnly bool +} + +// HTTPHandler 监控服务的 HTTP 处理器 +type HTTPHandler struct { + router *http.ServeMux + rootPath string + readOnly bool + closeCh chan struct{} + closeOnce sync.Once +} + +// NewHTTPHandler 创建新的监控 HTTP 处理器 +func NewHTTPHandler(opts HTTPHandlerOptions) (*HTTPHandler, error) { + if redisClient == nil { + return nil, fmt.Errorf("taskq: redis client not initialized, call SetRedis() first") + } + + // 设置默认值 + if opts.RootPath == "" { + opts.RootPath = "/monitor" + } + + // 确保路径以 / 开头且不以 / 结尾 + if !strings.HasPrefix(opts.RootPath, "/") { + opts.RootPath = "/" + opts.RootPath + } + opts.RootPath = strings.TrimSuffix(opts.RootPath, "/") + + handler := &HTTPHandler{ + router: http.NewServeMux(), + rootPath: opts.RootPath, + readOnly: opts.ReadOnly, + closeCh: make(chan struct{}), + } + + handler.setupRoutes() + return handler, nil +} + +// ServeHTTP 实现 http.Handler 接口 +func (h *HTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + h.router.ServeHTTP(w, r) +} + +// RootPath 返回监控服务的根路径 +func (h *HTTPHandler) RootPath() string { + return h.rootPath +} + +// Close 关闭 HTTP 处理器 +func (h *HTTPHandler) Close() error { + h.closeOnce.Do(func() { + close(h.closeCh) + }) + return nil +} + +// setupRoutes 设置路由 +func (h *HTTPHandler) setupRoutes() { + // API 路由 + apiPath := h.rootPath + "/api/" + h.router.HandleFunc(apiPath+"queues", h.handleQueues) + h.router.HandleFunc(apiPath+"queues/", h.handleQueueDetail) + h.router.HandleFunc(apiPath+"tasks/", h.handleTasks) + h.router.HandleFunc(apiPath+"stats/", h.handleStats) + h.router.HandleFunc(apiPath+"sse", h.handleSSE) + + // 静态文件路由 + uiSubFS, _ := fs.Sub(uiFS, "ui") + fileServer := http.FileServer(http.FS(uiSubFS)) + h.router.Handle(h.rootPath+"/static/", http.StripPrefix(h.rootPath+"/static/", fileServer)) + + // 主页路由(包含 History API 的路由) + h.router.HandleFunc(h.rootPath+"/queues/", h.handleIndex) + h.router.HandleFunc(h.rootPath+"/", h.handleIndex) + h.router.HandleFunc(h.rootPath, h.handleIndex) +} + +// handleStats 处理队列统计数据请求 +// GET /api/stats/{queue}?start=1234567890&end=1234567899&limit=500 +// GET /api/stats/?start=1234567890&end=1234567899&limit=500 (查询所有队列汇总) +func (h *HTTPHandler) handleStats(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + // 从 URL 中提取队列名称(可选,为空则查询所有队列汇总) + path := strings.TrimPrefix(r.URL.Path, h.rootPath+"/api/stats/") + queueName := strings.TrimSuffix(path, "/") + + // 构建查询参数 + query := StatsQuery{ + Queue: queueName, + Limit: 500, + } + + // 解析 limit 参数 + if l := r.URL.Query().Get("limit"); l != "" { + if parsed, err := strconv.Atoi(l); err == nil && parsed > 0 && parsed <= 10000 { + query.Limit = parsed + } + } + + // 解析 start 参数(Unix 时间戳) + if s := r.URL.Query().Get("start"); s != "" { + if parsed, err := strconv.ParseInt(s, 10, 64); err == nil && parsed > 0 { + query.Start = parsed + } + } + + // 解析 end 参数(Unix 时间戳) + if e := r.URL.Query().Get("end"); e != "" { + if parsed, err := strconv.ParseInt(e, 10, 64); err == nil && parsed > 0 { + query.End = parsed + } + } + + stats, err := getQueueStatsWithQuery(query) + if err != nil { + http.Error(w, fmt.Sprintf("Failed to get stats: %v", err), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(stats) +} + +// handleQueues 处理队列列表请求 +func (h *HTTPHandler) handleQueues(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + var queueInfos []QueueInfo + + // 首先显示所有注册的队列(即使Redis中还没有任务) + for queueName, priority := range queues { + stats, err := inspector.GetQueueInfo(queueName) + if err != nil { + // 如果队列不存在,创建一个空的状态 + queueInfos = append(queueInfos, QueueInfo{ + Name: queueName, + Priority: priority, + }) + } else { + queueInfos = append(queueInfos, QueueInfo{ + Name: queueName, + Priority: priority, + Size: stats.Size, + Active: stats.Active, + Pending: stats.Pending, + Scheduled: stats.Scheduled, + Retry: stats.Retry, + Archived: stats.Archived, + Completed: stats.Completed, + Processed: stats.Processed, + Failed: stats.Failed, + Paused: stats.Paused, + MemoryUsage: stats.MemoryUsage, + Latency: stats.Latency.Milliseconds(), + }) + } + } + + // 按优先级排序 + sort.Slice(queueInfos, func(i, j int) bool { + return queueInfos[i].Priority > queueInfos[j].Priority + }) + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(queueInfos) +} + +// handleQueueDetail 处理队列详情请求和队列操作 +// GET /api/queues/{queue} - 获取队列详情 +// POST /api/queues/{queue}/pause - 暂停队列 +// POST /api/queues/{queue}/unpause - 恢复队列 +func (h *HTTPHandler) handleQueueDetail(w http.ResponseWriter, r *http.Request) { + // 从 URL 中提取队列名称 + path := strings.TrimPrefix(r.URL.Path, h.rootPath+"/api/queues/") + parts := strings.Split(path, "/") + if len(parts) == 0 || parts[0] == "" { + http.Error(w, "Queue name is required", http.StatusBadRequest) + return + } + queueName := parts[0] + + // 检查队列是否已注册 + if _, exists := queues[queueName]; !exists { + http.Error(w, "Queue not found", http.StatusNotFound) + return + } + + // 处理暂停/恢复请求 + if r.Method == http.MethodPost && len(parts) >= 2 { + if h.readOnly { + http.Error(w, "Operation not allowed in read-only mode", http.StatusForbidden) + return + } + action := parts[1] + switch action { + case "pause": + if err := inspector.PauseQueue(queueName); err != nil { + http.Error(w, fmt.Sprintf("Failed to pause queue: %v", err), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]string{"status": "paused"}) + return + case "unpause": + if err := inspector.UnpauseQueue(queueName); err != nil { + http.Error(w, fmt.Sprintf("Failed to unpause queue: %v", err), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]string{"status": "unpaused"}) + return + default: + http.Error(w, "Invalid action", http.StatusBadRequest) + return + } + } + + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + // 获取队列详细信息 + stats, err := inspector.GetQueueInfo(queueName) + if err != nil { + // 如果队列在 Redis 中不存在,返回空状态 + if strings.Contains(err.Error(), "queue not found") { + emptyStats := map[string]any{ + "queue": queueName, + "active": 0, + "pending": 0, + "retry": 0, + "archived": 0, + "completed": 0, + "processed": 0, + "failed": 0, + "paused": false, + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(emptyStats) + return + } + http.Error(w, fmt.Sprintf("Failed to get queue info: %v", err), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(stats) +} + +// TaskInfo 转换任务信息 +type TaskInfo struct { + ID string `json:"id"` + Type string `json:"type"` + Payload string `json:"payload"` + Queue string `json:"queue"` + Retried int `json:"retried"` + LastFailed string `json:"last_failed,omitempty"` + LastError string `json:"last_error,omitempty"` + NextProcess string `json:"next_process,omitempty"` + CompletedAt string `json:"completed_at,omitempty"` +} + +// handleTasks 处理任务列表请求和任务操作 +// GET /api/tasks/{queue}/{state} - 获取任务列表 +// POST /api/tasks/{queue}/archived/{taskId}/retry - 重试失败任务 +func (h *HTTPHandler) handleTasks(w http.ResponseWriter, r *http.Request) { + // 从 URL 中提取队列名称和任务状态 + path := strings.TrimPrefix(r.URL.Path, h.rootPath+"/api/tasks/") + parts := strings.Split(path, "/") + if len(parts) < 2 { + http.Error(w, "Queue name and task state are required", http.StatusBadRequest) + return + } + + queueName := parts[0] + taskState := parts[1] + + // 处理重试请求: POST /api/tasks/{queue}/archived/{taskId}/retry + if r.Method == http.MethodPost && len(parts) >= 4 && parts[1] == "archived" && parts[3] == "retry" { + if h.readOnly { + http.Error(w, "Operation not allowed in read-only mode", http.StatusForbidden) + return + } + taskID := parts[2] + h.handleRetryTask(w, r, queueName, taskID) + return + } + + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + // 检查队列是否已注册 + if _, exists := queues[queueName]; !exists { + http.Error(w, "Queue not found", http.StatusNotFound) + return + } + + // 解析分页参数 + page := 1 + pageSize := 20 + if p := r.URL.Query().Get("page"); p != "" { + if parsed, err := strconv.Atoi(p); err == nil && parsed > 0 { + page = parsed + } + } + if ps := r.URL.Query().Get("page_size"); ps != "" { + if parsed, err := strconv.Atoi(ps); err == nil && parsed > 0 && parsed <= 100 { + pageSize = parsed + } + } + + // 获取队列信息以获取任务总数 + var total int + queueInfo, queueErr := inspector.GetQueueInfo(queueName) + if queueErr == nil { + switch taskState { + case "active": + total = queueInfo.Active + case "pending": + total = queueInfo.Pending + case "scheduled": + total = queueInfo.Scheduled + case "retry": + total = queueInfo.Retry + case "archived": + total = queueInfo.Archived + case "completed": + total = queueInfo.Completed + } + } + + // 根据任务状态获取任务列表 + var tasks []*asynq.TaskInfo + var err error + + switch taskState { + case "active": + tasks, err = inspector.ListActiveTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page-1)) + case "pending": + tasks, err = inspector.ListPendingTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page-1)) + case "scheduled": + tasks, err = inspector.ListScheduledTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page-1)) + case "retry": + tasks, err = inspector.ListRetryTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page-1)) + case "archived": + tasks, err = inspector.ListArchivedTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page-1)) + case "completed": + tasks, err = inspector.ListCompletedTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page-1)) + default: + http.Error(w, "Invalid task state. Valid states: active, pending, scheduled, retry, archived, completed", http.StatusBadRequest) + return + } + + // 如果队列在 Redis 中不存在(没有任务),返回空列表而不是错误 + if err != nil { + if strings.Contains(err.Error(), "queue not found") { + tasks = []*asynq.TaskInfo{} + total = 0 + } else { + http.Error(w, fmt.Sprintf("Failed to get tasks: %v", err), http.StatusInternalServerError) + return + } + } + + var taskInfos []TaskInfo + for _, task := range tasks { + info := TaskInfo{ + ID: task.ID, + Type: task.Type, + Payload: string(task.Payload), + Queue: task.Queue, + Retried: task.Retried, + } + + if !task.LastFailedAt.IsZero() { + info.LastFailed = task.LastFailedAt.Format(time.RFC3339) + } + if task.LastErr != "" { + info.LastError = task.LastErr + } + if !task.NextProcessAt.IsZero() { + info.NextProcess = task.NextProcessAt.Format(time.RFC3339) + } + if !task.CompletedAt.IsZero() { + info.CompletedAt = task.CompletedAt.Format(time.RFC3339) + } + + taskInfos = append(taskInfos, info) + } + + response := map[string]any{ + "tasks": taskInfos, + "page": page, + "page_size": pageSize, + "total": total, + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) +} + +// handleRetryTask 重试失败任务 +func (h *HTTPHandler) handleRetryTask(w http.ResponseWriter, r *http.Request, queueName, taskID string) { + // 检查队列是否已注册 + if _, exists := queues[queueName]; !exists { + http.Error(w, "Queue not found", http.StatusNotFound) + return + } + + // 运行重试 + err := inspector.RunTask(queueName, taskID) + if err != nil { + http.Error(w, fmt.Sprintf("Failed to retry task: %v", err), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]string{"status": "ok"}) +} + +// handleIndex 处理主页请求,返回 SPA 入口页面 +func (h *HTTPHandler) handleIndex(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + // 读取 index.html 并替换模板变量 + indexHTML, err := uiFS.ReadFile("ui/index.html") + if err != nil { + http.Error(w, fmt.Sprintf("Failed to read index.html: %v", err), http.StatusInternalServerError) + return + } + + // 替换模板变量 + content := strings.ReplaceAll(string(indexHTML), "{{.RootPath}}", h.rootPath) + + w.Header().Set("Content-Type", "text/html; charset=utf-8") + w.Write([]byte(content)) +} + +// handleSSE 处理 Server-Sent Events 实时数据推送 +// 交叉推送两种数据: +// - stats: 统计图表数据(来自 SQLite,每 2 秒) +// - queues: 队列表格数据(来自 Redis,每 5 秒) +func (h *HTTPHandler) handleSSE(w http.ResponseWriter, r *http.Request) { + // 设置 SSE 响应头 + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("Access-Control-Allow-Origin", "*") + + // 获取 flusher + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "SSE not supported", http.StatusInternalServerError) + return + } + + // 两个定时器:统计数据频率高,队列数据频率低 + statsTicker := time.NewTicker(2 * time.Second) + queuesTicker := time.NewTicker(5 * time.Second) + defer statsTicker.Stop() + defer queuesTicker.Stop() + + // 监听客户端断开连接 + ctx := r.Context() + + // 立即发送一次数据 + h.sendQueuesEvent(w, flusher) + h.sendStatsEvent(w, flusher) + + for { + select { + case <-ctx.Done(): + return + case <-h.closeCh: + return + case <-statsTicker.C: + h.sendStatsEvent(w, flusher) + case <-queuesTicker.C: + h.sendQueuesEvent(w, flusher) + } + } +} + +// sendStatsEvent 发送统计图表数据(来自 SQLite) +func (h *HTTPHandler) sendStatsEvent(w http.ResponseWriter, flusher http.Flusher) { + // 获取最近的统计数据点(用于图表增量更新) + stats, err := getQueueStatsWithQuery(StatsQuery{Limit: 1}) + if err != nil || len(stats) == 0 { + return + } + + data, err := json.Marshal(stats[0]) + if err != nil { + return + } + + fmt.Fprintf(w, "event: stats\ndata: %s\n\n", data) + flusher.Flush() +} + +// sendQueuesEvent 发送队列表格数据(来自 Redis) +func (h *HTTPHandler) sendQueuesEvent(w http.ResponseWriter, flusher http.Flusher) { + var queueInfos []QueueInfo + for queueName, priority := range queues { + stats, err := inspector.GetQueueInfo(queueName) + if err != nil { + queueInfos = append(queueInfos, QueueInfo{ + Name: queueName, + Priority: priority, + }) + } else { + queueInfos = append(queueInfos, QueueInfo{ + Name: queueName, + Priority: priority, + Size: stats.Size, + Active: stats.Active, + Pending: stats.Pending, + Scheduled: stats.Scheduled, + Retry: stats.Retry, + Archived: stats.Archived, + Completed: stats.Completed, + Processed: stats.Processed, + Failed: stats.Failed, + Paused: stats.Paused, + MemoryUsage: stats.MemoryUsage, + Latency: stats.Latency.Milliseconds(), + }) + } + } + + // 按优先级排序 + sort.Slice(queueInfos, func(i, j int) bool { + return queueInfos[i].Priority > queueInfos[j].Priority + }) + + data, err := json.Marshal(queueInfos) + if err != nil { + return + } + + fmt.Fprintf(w, "event: queues\ndata: %s\n\n", data) + flusher.Flush() +} diff --git a/inspect.go b/inspect.go index 2f60238..b1643f0 100644 --- a/inspect.go +++ b/inspect.go @@ -1,332 +1,381 @@ // Package taskq 提供基于 Redis 的异步任务队列功能 -// inspect.go 文件包含任务队列的监控和检查功能 +// inspect.go 文件包含统计采集器和相关数据结构 package taskq import ( - _ "embed" - "encoding/json" + "database/sql" "fmt" - "html/template" - "net/http" - "sort" - "strconv" + "os" + "path/filepath" "strings" + "sync" "time" "github.com/hibiken/asynq" + _ "github.com/mattn/go-sqlite3" ) -//go:embed dashboard.html -var dashboardHTML string +// ==================== Inspector 统计采集器 ==================== -// InspectOptions 配置监控服务的选项 -type InspectOptions struct { - // RootPath 监控服务的根路径 - // 默认为 "/monitor" - RootPath string - - // ReadOnly 是否只读模式,禁用所有修改操作 - // 默认为 false - ReadOnly bool -} - -// HTTPHandler 监控服务的 HTTP 处理器 -type HTTPHandler struct { - router *http.ServeMux - rootPath string - readOnly bool +// Inspector 统计采集器,独立于 HTTP 服务运行 +type Inspector struct { inspector *asynq.Inspector + db *sql.DB + closeCh chan struct{} + closeOnce sync.Once + interval time.Duration } -// NewInspectHandler 创建新的监控处理器 -// 使用全局的 redisClient 创建 asynq.Inspector -func NewInspectHandler(opts InspectOptions) (*HTTPHandler, error) { +// InspectorOptions 配置统计采集器的选项 +type InspectorOptions struct { + // Interval 采集间隔,默认 2 秒 + Interval time.Duration + + // DBPath SQLite 数据库文件路径,默认为 "./taskq_stats.db" + DBPath string +} + +// NewInspector 创建新的统计采集器 +func NewInspector(opts InspectorOptions) (*Inspector, error) { if redisClient == nil { return nil, fmt.Errorf("taskq: redis client not initialized, call SetRedis() first") } - // 设置默认值 - if opts.RootPath == "" { - opts.RootPath = "/monitor" + if opts.Interval <= 0 { + opts.Interval = 2 * time.Second } - // 确保路径以 / 开头且不以 / 结尾 - if !strings.HasPrefix(opts.RootPath, "/") { - opts.RootPath = "/" + opts.RootPath - } - opts.RootPath = strings.TrimSuffix(opts.RootPath, "/") - - // 创建 asynq inspector - inspector := asynq.NewInspectorFromRedisClient(redisClient) - - handler := &HTTPHandler{ - router: http.NewServeMux(), - rootPath: opts.RootPath, - readOnly: opts.ReadOnly, - inspector: inspector, + if opts.DBPath == "" { + opts.DBPath = "./taskq_stats.db" } - handler.setupRoutes() - return handler, nil -} - -// ServeHTTP 实现 http.Handler 接口 -func (h *HTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - h.router.ServeHTTP(w, r) -} - -// RootPath 返回监控服务的根路径 -func (h *HTTPHandler) RootPath() string { - return h.rootPath -} - -// Close 关闭 inspector 连接 -func (h *HTTPHandler) Close() error { - return h.inspector.Close() -} - -// setupRoutes 设置路由 -func (h *HTTPHandler) setupRoutes() { - // API 路由 - apiPath := h.rootPath + "/api/" - h.router.HandleFunc(apiPath+"queues", h.handleQueues) - h.router.HandleFunc(apiPath+"queues/", h.handleQueueDetail) - h.router.HandleFunc(apiPath+"tasks/", h.handleTasks) - - // 主页路由 - h.router.HandleFunc(h.rootPath+"/", h.handleDashboard) - h.router.HandleFunc(h.rootPath, h.handleDashboard) -} - -// handleQueues 处理队列列表请求 -func (h *HTTPHandler) handleQueues(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodGet { - http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) - return - } - - // 获取所有队列信息 - asynqQueues, err := h.inspector.Queues() - if err != nil { - http.Error(w, fmt.Sprintf("Failed to get queues: %v", err), http.StatusInternalServerError) - return - } - - fmt.Println("Redis中的队列:", asynqQueues) - fmt.Println("注册的队列:", queues) - - // 获取每个队列的详细信息 - type QueueInfo struct { - Name string `json:"name"` - Priority int `json:"priority"` - Active int `json:"active"` - Pending int `json:"pending"` - Retry int `json:"retry"` - Archived int `json:"archived"` - } - - var queueInfos []QueueInfo - - // 首先显示所有注册的队列(即使Redis中还没有任务) - for queueName, priority := range queues { - stats, err := h.inspector.GetQueueInfo(queueName) - if err != nil { - // 如果队列不存在,创建一个空的状态 - queueInfos = append(queueInfos, QueueInfo{ - Name: queueName, - Priority: priority, - Active: 0, - Pending: 0, - Retry: 0, - Archived: 0, - }) - } else { - queueInfos = append(queueInfos, QueueInfo{ - Name: queueName, - Priority: priority, - Active: stats.Active, - Pending: stats.Pending, - Retry: stats.Retry, - Archived: stats.Archived, - }) + // 确保目录存在 + dir := filepath.Dir(opts.DBPath) + if dir != "" && dir != "." { + if err := os.MkdirAll(dir, 0755); err != nil { + return nil, fmt.Errorf("taskq: failed to create directory: %v", err) } } - // 按优先级排序 - sort.Slice(queueInfos, func(i, j int) bool { - return queueInfos[i].Priority > queueInfos[j].Priority + // 打开 SQLite 数据库 + db, err := sql.Open("sqlite3", opts.DBPath) + if err != nil { + return nil, fmt.Errorf("taskq: failed to open database: %v", err) + } + + // 初始化数据库表 + if err := initStatsDB(db); err != nil { + db.Close() + return nil, fmt.Errorf("taskq: failed to init database: %v", err) + } + + ins := &Inspector{ + inspector: asynq.NewInspectorFromRedisClient(redisClient), + db: db, + closeCh: make(chan struct{}), + interval: opts.Interval, + } + + // 启动后台统计采集 + go ins.startCollector() + + return ins, nil +} + +// initStatsDB 初始化数据库(Prometheus 风格:单表 + 标签) +// 设计思路: +// - 单表存储所有队列的统计数据,通过 queue 列区分 +// - 复合索引支持按时间和队列两个维度高效查询 +// - 类似 Prometheus 的 (timestamp, labels, value) 模型 +func initStatsDB(db *sql.DB) error { + _, err := db.Exec(` + CREATE TABLE IF NOT EXISTS metrics ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp INTEGER NOT NULL, + queue TEXT NOT NULL, + active INTEGER DEFAULT 0, + pending INTEGER DEFAULT 0, + scheduled INTEGER DEFAULT 0, + retry INTEGER DEFAULT 0, + archived INTEGER DEFAULT 0, + completed INTEGER DEFAULT 0, + succeeded INTEGER DEFAULT 0, + failed INTEGER DEFAULT 0 + ); + -- 按队列查询:WHERE queue = ? ORDER BY timestamp + CREATE INDEX IF NOT EXISTS idx_metrics_queue_time ON metrics(queue, timestamp DESC); + -- 按时间查询所有队列:WHERE timestamp BETWEEN ? AND ? + CREATE INDEX IF NOT EXISTS idx_metrics_time ON metrics(timestamp DESC); + -- 唯一约束:同一时间同一队列只有一条记录 + CREATE UNIQUE INDEX IF NOT EXISTS idx_metrics_unique ON metrics(timestamp, queue); + `) + return err +} + +// Close 关闭统计采集器 +func (ins *Inspector) Close() error { + ins.closeOnce.Do(func() { + close(ins.closeCh) }) - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(queueInfos) + if ins.db != nil { + ins.db.Close() + } + return ins.inspector.Close() } -// handleQueueDetail 处理队列详情请求 -func (h *HTTPHandler) handleQueueDetail(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodGet { - http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) - return +// startCollector 启动后台统计采集任务 +func (ins *Inspector) startCollector() { + ticker := time.NewTicker(ins.interval) + defer ticker.Stop() + + for { + select { + case <-ins.closeCh: + return + case <-ticker.C: + ins.collectStats() + } + } +} + +// collectStats 采集所有队列的统计数据 +func (ins *Inspector) collectStats() { + now := time.Now().Unix() + + for queueName := range queues { + stats, err := ins.inspector.GetQueueInfo(queueName) + if err != nil { + continue + } + + qs := QueueStats{ + Queue: queueName, + Timestamp: now, + Active: stats.Active, + Pending: stats.Pending, + Scheduled: stats.Scheduled, + Retry: stats.Retry, + Archived: stats.Archived, + Completed: stats.Completed, + Succeeded: stats.Processed - stats.Failed, + Failed: stats.Failed, + } + + ins.saveMetrics(qs) + } +} + +// saveMetrics 保存统计数据到 metrics 表 +func (ins *Inspector) saveMetrics(stats QueueStats) error { + if ins.db == nil { + return nil } - // 从 URL 中提取队列名称 - path := strings.TrimPrefix(r.URL.Path, h.rootPath+"/api/queues/") - parts := strings.Split(path, "/") - if len(parts) == 0 || parts[0] == "" { - http.Error(w, "Queue name is required", http.StatusBadRequest) - return - } - queueName := parts[0] + _, err := ins.db.Exec(` + INSERT OR REPLACE INTO metrics (timestamp, queue, active, pending, scheduled, retry, archived, completed, succeeded, failed) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + `, stats.Timestamp, stats.Queue, stats.Active, stats.Pending, stats.Scheduled, stats.Retry, stats.Archived, stats.Completed, stats.Succeeded, stats.Failed) - // 检查队列是否已注册 - if _, exists := queues[queueName]; !exists { - http.Error(w, "Queue not found", http.StatusNotFound) - return + return err +} + +// GetQueueInfo 获取队列信息 +func (ins *Inspector) GetQueueInfo(queueName string) (*asynq.QueueInfo, error) { + return ins.inspector.GetQueueInfo(queueName) +} + +// ListActiveTasks 获取活跃任务列表 +func (ins *Inspector) ListActiveTasks(queueName string, opts ...asynq.ListOption) ([]*asynq.TaskInfo, error) { + return ins.inspector.ListActiveTasks(queueName, opts...) +} + +// ListPendingTasks 获取等待任务列表 +func (ins *Inspector) ListPendingTasks(queueName string, opts ...asynq.ListOption) ([]*asynq.TaskInfo, error) { + return ins.inspector.ListPendingTasks(queueName, opts...) +} + +// ListScheduledTasks 获取计划任务列表 +func (ins *Inspector) ListScheduledTasks(queueName string, opts ...asynq.ListOption) ([]*asynq.TaskInfo, error) { + return ins.inspector.ListScheduledTasks(queueName, opts...) +} + +// ListRetryTasks 获取重试任务列表 +func (ins *Inspector) ListRetryTasks(queueName string, opts ...asynq.ListOption) ([]*asynq.TaskInfo, error) { + return ins.inspector.ListRetryTasks(queueName, opts...) +} + +// ListArchivedTasks 获取归档任务列表 +func (ins *Inspector) ListArchivedTasks(queueName string, opts ...asynq.ListOption) ([]*asynq.TaskInfo, error) { + return ins.inspector.ListArchivedTasks(queueName, opts...) +} + +// ListCompletedTasks 获取已完成任务列表 +func (ins *Inspector) ListCompletedTasks(queueName string, opts ...asynq.ListOption) ([]*asynq.TaskInfo, error) { + return ins.inspector.ListCompletedTasks(queueName, opts...) +} + +// RunTask 立即运行归档任务(重试失败任务) +func (ins *Inspector) RunTask(queueName, taskID string) error { + return ins.inspector.RunTask(queueName, taskID) +} + +// PauseQueue 暂停队列 +func (ins *Inspector) PauseQueue(queueName string) error { + return ins.inspector.PauseQueue(queueName) +} + +// UnpauseQueue 恢复队列 +func (ins *Inspector) UnpauseQueue(queueName string) error { + return ins.inspector.UnpauseQueue(queueName) +} + +// ==================== 统计数据结构 ==================== + +// QueueInfo 获取每个队列的详细信息 +type QueueInfo struct { + Name string `json:"name"` + Priority int `json:"priority"` + Size int `json:"size"` // 队列中任务总数 + Active int `json:"active"` // 活跃任务数 + Pending int `json:"pending"` // 等待任务数 + Scheduled int `json:"scheduled"` // 计划任务数 + Retry int `json:"retry"` // 重试任务数 + Archived int `json:"archived"` // 归档任务数 + Completed int `json:"completed"` // 已完成任务数 + Processed int `json:"processed"` // 累计处理数(今日) + Failed int `json:"failed"` // 累计失败数(今日) + Paused bool `json:"paused"` // 是否暂停 + MemoryUsage int64 `json:"memory_usage"` // 内存使用(字节) + Latency int64 `json:"latency"` // 延迟(毫秒) +} + +// QueueStats 队列统计数据点(用于存储历史数据) +type QueueStats struct { + Timestamp int64 `json:"t"` // Unix 时间戳(秒) + Queue string `json:"q,omitempty"` // 队列名称(汇总查询时为空) + Active int `json:"a"` // 活跃任务数 + Pending int `json:"p"` // 等待任务数 + Scheduled int `json:"s"` // 计划任务数 + Retry int `json:"r"` // 重试任务数 + Archived int `json:"ar"` // 归档任务数 + Completed int `json:"c"` // 已完成任务数 + Succeeded int `json:"su"` // 成功数 + Failed int `json:"f"` // 失败数 +} + +// ==================== 全局统计数据查询 ==================== + +var statsDB *sql.DB +var statsDBMu sync.RWMutex + +// SetStatsDB 设置全局统计数据库(供 HTTPHandler 使用) +func SetStatsDB(db *sql.DB) { + statsDBMu.Lock() + defer statsDBMu.Unlock() + statsDB = db +} + +// StatsQuery 统计查询参数 +type StatsQuery struct { + Queue string // 队列名称,为空则查询所有队列汇总 + Start int64 // 开始时间戳(秒),0 表示不限制 + End int64 // 结束时间戳(秒),0 表示不限制 + Limit int // 返回数量限制,默认 500 +} + +// getQueueStats 获取队列历史统计数据 +func getQueueStats(queueName string, limit int) ([]QueueStats, error) { + return getQueueStatsWithQuery(StatsQuery{ + Queue: queueName, + Limit: limit, + }) +} + +// getQueueStatsWithQuery 根据查询条件获取统计数据(Prometheus 风格单表查询) +// - 按队列查询:使用 idx_metrics_queue_time 索引 +// - 按时间汇总:使用 idx_metrics_time 索引 + GROUP BY +func getQueueStatsWithQuery(q StatsQuery) ([]QueueStats, error) { + statsDBMu.RLock() + db := statsDB + statsDBMu.RUnlock() + + if db == nil { + return nil, nil } - // 获取队列详细信息 - stats, err := h.inspector.GetQueueInfo(queueName) + if q.Limit <= 0 { + q.Limit = 500 + } + + var args []any + var whereClause string + var conditions []string + + // 构建 WHERE 条件 + if q.Queue != "" { + conditions = append(conditions, "queue = ?") + args = append(args, q.Queue) + } + if q.Start > 0 { + conditions = append(conditions, "timestamp >= ?") + args = append(args, q.Start) + } + if q.End > 0 { + conditions = append(conditions, "timestamp <= ?") + args = append(args, q.End) + } + + if len(conditions) > 0 { + whereClause = "WHERE " + strings.Join(conditions, " AND ") + } + + var query string + if q.Queue != "" { + // 查询单个队列 + query = fmt.Sprintf(` + SELECT timestamp, queue, active, pending, scheduled, retry, archived, completed, succeeded, failed + FROM metrics + %s + ORDER BY timestamp DESC + LIMIT ? + `, whereClause) + } else { + // 查询所有队列汇总(按时间 GROUP BY) + query = fmt.Sprintf(` + SELECT timestamp, '' as queue, SUM(active), SUM(pending), SUM(scheduled), SUM(retry), SUM(archived), SUM(completed), SUM(succeeded), SUM(failed) + FROM metrics + %s + GROUP BY timestamp + ORDER BY timestamp DESC + LIMIT ? + `, whereClause) + } + args = append(args, q.Limit) + + rows, err := db.Query(query, args...) if err != nil { - http.Error(w, fmt.Sprintf("Failed to get queue info: %v", err), http.StatusInternalServerError) - return + return nil, err + } + defer rows.Close() + + var statsList []QueueStats + for rows.Next() { + var s QueueStats + if err := rows.Scan(&s.Timestamp, &s.Queue, &s.Active, &s.Pending, &s.Scheduled, &s.Retry, &s.Archived, &s.Completed, &s.Succeeded, &s.Failed); err != nil { + continue + } + statsList = append(statsList, s) } - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(stats) + // 反转顺序,使时间从早到晚 + for i, j := 0, len(statsList)-1; i < j; i, j = i+1, j-1 { + statsList[i], statsList[j] = statsList[j], statsList[i] + } + + return statsList, nil } -// 转换任务信息 -type TaskInfo struct { - ID string `json:"id"` - Type string `json:"type"` - Payload string `json:"payload"` - Queue string `json:"queue"` - Retried int `json:"retried"` - LastFailed string `json:"last_failed,omitempty"` - LastError string `json:"last_error,omitempty"` - NextProcess string `json:"next_process,omitempty"` - CompletedAt string `json:"completed_at,omitempty"` -} - -// handleTasks 处理任务列表请求 -func (h *HTTPHandler) handleTasks(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodGet { - http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) - return - } - - // 从 URL 中提取队列名称和任务状态 - path := strings.TrimPrefix(r.URL.Path, h.rootPath+"/api/tasks/") - parts := strings.Split(path, "/") - if len(parts) < 2 { - http.Error(w, "Queue name and task state are required", http.StatusBadRequest) - return - } - - queueName := parts[0] - taskState := parts[1] - - // 检查队列是否已注册 - if _, exists := queues[queueName]; !exists { - http.Error(w, "Queue not found", http.StatusNotFound) - return - } - - // 解析分页参数 - page := 1 - pageSize := 20 - if p := r.URL.Query().Get("page"); p != "" { - if parsed, err := strconv.Atoi(p); err == nil && parsed > 0 { - page = parsed - } - } - if ps := r.URL.Query().Get("page_size"); ps != "" { - if parsed, err := strconv.Atoi(ps); err == nil && parsed > 0 && parsed <= 100 { - pageSize = parsed - } - } - - // 根据任务状态获取任务列表 - var tasks []*asynq.TaskInfo - var err error - - switch taskState { - case "active": - tasks, err = h.inspector.ListActiveTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page-1)) - case "pending": - tasks, err = h.inspector.ListPendingTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page-1)) - case "retry": - tasks, err = h.inspector.ListRetryTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page-1)) - case "archived": - tasks, err = h.inspector.ListArchivedTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page-1)) - case "completed": - tasks, err = h.inspector.ListCompletedTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page-1)) - default: - http.Error(w, "Invalid task state. Valid states: active, pending, retry, archived, completed", http.StatusBadRequest) - return - } - - if err != nil { - http.Error(w, fmt.Sprintf("Failed to get tasks: %v", err), http.StatusInternalServerError) - return - } - - var taskInfos []TaskInfo - for _, task := range tasks { - info := TaskInfo{ - ID: task.ID, - Type: task.Type, - Payload: string(task.Payload), - Queue: task.Queue, - Retried: task.Retried, - } - - if !task.LastFailedAt.IsZero() { - info.LastFailed = task.LastFailedAt.Format(time.RFC3339) - } - if task.LastErr != "" { - info.LastError = task.LastErr - } - if !task.NextProcessAt.IsZero() { - info.NextProcess = task.NextProcessAt.Format(time.RFC3339) - } - if !task.CompletedAt.IsZero() { - info.CompletedAt = task.CompletedAt.Format(time.RFC3339) - } - - taskInfos = append(taskInfos, info) - } - - response := map[string]any{ - "tasks": taskInfos, - "page": page, - "page_size": pageSize, - "total": len(taskInfos), - } - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(response) -} - -// handleDashboard 处理仪表板页面 -func (h *HTTPHandler) handleDashboard(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodGet { - http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) - return - } - - // 使用嵌入的 HTML 模板 - tmpl, err := template.New("dashboard").Parse(dashboardHTML) - if err != nil { - http.Error(w, fmt.Sprintf("Template error: %v", err), http.StatusInternalServerError) - return - } - - data := struct { - RootPath string - }{ - RootPath: h.rootPath, - } - - w.Header().Set("Content-Type", "text/html; charset=utf-8") - tmpl.Execute(w, data) +// GetStatsDB 返回 Inspector 的数据库连接(供外部设置给 HTTPHandler) +func (ins *Inspector) GetStatsDB() *sql.DB { + return ins.db } diff --git a/task.go b/task.go index 1f1acf2..7d56073 100644 --- a/task.go +++ b/task.go @@ -87,13 +87,21 @@ func (t *Task[T]) Publish(ctx context.Context, data T, options ...PublishOption) // 构建任务选项 opts := []asynq.Option{ asynq.Queue(t.Queue), // 设置队列名称 - asynq.Group(t.Group), // 设置任务组 asynq.MaxRetry(t.MaxRetries), // 设置最大重试次数 asynq.TaskID(xid.New().String()), // 生成唯一任务ID - asynq.Timeout(t.TTR), // 设置超时时间 asynq.Retention(time.Hour * 24), // 设置结果保留24小时 } + // 只有设置了 TTR 才添加超时选项,避免 TTR=0 导致立即超时 + if t.TTR > 0 { + opts = append(opts, asynq.Timeout(t.TTR)) + } + + // 只有设置了 Group 才添加分组选项 + if t.Group != "" { + opts = append(opts, asynq.Group(t.Group)) + } + // 应用用户自定义选项 for _, option := range options { if opt := option(); opt != nil { @@ -126,14 +134,15 @@ func (t *Task[T]) ProcessTask(ctx context.Context, tsk *asynq.Task) error { // 根据配置添加数据参数 if t.inputData { - // 创建数据类型的指针实例 - dataValue := reflect.New(t.dataType) + // 创建数据类型的指针实例用于反序列化 + dataPtr := reflect.New(t.dataType) // 反序列化任务载荷 - err := json.Unmarshal(tsk.Payload(), dataValue.Interface()) + err := json.Unmarshal(tsk.Payload(), dataPtr.Interface()) if err != nil { return err } - in = append(in, dataValue) + // 传递值类型而非指针,因为 Handler 期望的是值类型 + in = append(in, dataPtr.Elem()) } // 通过反射调用处理器函数 @@ -141,7 +150,10 @@ func (t *Task[T]) ProcessTask(ctx context.Context, tsk *asynq.Task) error { // 处理返回值 if t.returnError { - // Register 已确保返回类型为 error,无需类型断言 + // 当返回值为 nil 时,Interface() 返回 nil,不能直接类型断言 + if out[0].IsNil() { + return nil + } return out[0].Interface().(error) } return nil diff --git a/taskq.go b/taskq.go index 15fc51f..47bea26 100644 --- a/taskq.go +++ b/taskq.go @@ -19,10 +19,12 @@ import ( var ( started atomic.Bool // 服务器启动状态 exit chan chan struct{} // 优雅退出信号通道 + done chan struct{} // 关闭完成信号通道 handlers map[string]asynq.Handler // 任务处理器映射表 queues map[string]int // 队列优先级配置 client atomic.Pointer[asynq.Client] // asynq 客户端实例 redisClient redis.UniversalClient // Redis 客户端实例 + inspector *Inspector // 统计采集器实例 errorType = reflect.TypeOf((*error)(nil)).Elem() // error 类型反射 contextType = reflect.TypeOf((*context.Context)(nil)).Elem() // context.Context 类型反射 ) @@ -31,6 +33,7 @@ var ( // 创建必要的全局变量和映射表,必须在调用其他函数之前调用 func Init() { exit = make(chan chan struct{}) // 创建优雅退出通道 + done = make(chan struct{}) // 创建关闭完成通道 handlers = make(map[string]asynq.Handler) // 创建任务处理器映射 queues = make(map[string]int) // 创建队列优先级映射 } @@ -39,6 +42,19 @@ func Init() { // 使用泛型确保类型安全,通过反射验证处理器函数签名 // 处理器函数签名必须是:func(context.Context, T) error 或 func(context.Context) 或 func(T) error 或 func() func Register[T any](t *Task[T]) error { + if t.Queue == "" { + return errors.New("taskq: queue name cannot be empty") + } + if t.Priority < 0 || t.Priority > 255 { + return errors.New("taskq: priority must be between 0 and 255") + } + if t.MaxRetries < 0 { + return errors.New("taskq: retry count must be non-negative") + } + if t.Handler == nil { + return errors.New("taskq: handler cannot be nil") + } + rv := reflect.ValueOf(t.Handler) if rv.Kind() != reflect.Func { return errors.New("taskq: handler must be a function") @@ -56,28 +72,36 @@ func Register[T any](t *Task[T]) error { } } - // 验证参数:最多2个参数,第一个必须是 context.Context,第二个必须是结构体 + // 验证参数:支持以下签名 + // - func(context.Context, T) error + // - func(context.Context) error + // - func(T) error + // - func() var inContext bool var inData bool var dataType reflect.Type - for i := range rt.NumIn() { - if i == 0 { - fi := rt.In(i) - if !fi.Implements(contextType) { - return errors.New("taskq: handler function first parameter must be context.Context") + numIn := rt.NumIn() + + if numIn > 2 { + return errors.New("taskq: handler function can have at most 2 parameters") + } + + for i := range numIn { + fi := rt.In(i) + if fi.Implements(contextType) { + if i != 0 { + return errors.New("taskq: context.Context must be the first parameter") } inContext = true - continue + } else if fi.Kind() == reflect.Struct { + if inData { + return errors.New("taskq: handler function can only have one data parameter") + } + inData = true + dataType = fi + } else { + return errors.New("taskq: handler parameter must be context.Context or a struct") } - if i != 1 { - return errors.New("taskq: handler function can have at most 2 parameters") - } - fi := rt.In(i) - if fi.Kind() != reflect.Struct { - return errors.New("taskq: handler function second parameter must be a struct") - } - inData = true - dataType = fi } // 检查服务器是否已启动 @@ -112,73 +136,130 @@ func SetRedis(rdb redis.UniversalClient) error { return nil } +// StartOptions 启动选项 +type StartOptions struct { + // StatsInterval 统计采集间隔,默认 2 秒 + StatsInterval time.Duration + // StatsDBPath SQLite 数据库文件路径,默认 "./taskq_stats.db" + StatsDBPath string +} + // Start 启动 taskq 服务器 // 开始监听任务队列并处理任务,包含健康检查和优雅退出机制 -func Start(ctx context.Context) error { - // 原子操作确保只启动一次 +func Start(ctx context.Context, opts ...StartOptions) error { if !started.CompareAndSwap(false, true) { return errors.New("taskq: server is already running") } - // 检查 Redis 客户端是否已初始化 if redisClient == nil { + started.Store(false) return errors.New("taskq: redis client not initialized, call SetRedis() first") } - // 创建任务路由器 + var opt StartOptions + if len(opts) > 0 { + opt = opts[0] + } + + if err := startInspector(opt); err != nil { + started.Store(false) + return err + } + + srv := createServer(ctx) + go runServer(srv) + go runMonitor(ctx, srv) + + return nil +} + +// startInspector 启动统计采集器 +func startInspector(opt StartOptions) error { + ins, err := NewInspector(InspectorOptions{ + Interval: opt.StatsInterval, + DBPath: opt.StatsDBPath, + }) + if err != nil { + return err + } + inspector = ins + SetStatsDB(ins.GetStatsDB()) + return nil +} + +// createServer 创建 asynq 服务器 +func createServer(ctx context.Context) *asynq.Server { + return asynq.NewServerFromRedisClient(redisClient, asynq.Config{ + Concurrency: 30, + Queues: maps.Clone(queues), + BaseContext: func() context.Context { return ctx }, + LogLevel: asynq.WarnLevel, + }) +} + +// runServer 运行任务处理服务器 +func runServer(srv *asynq.Server) { mux := asynq.NewServeMux() for name, handler := range handlers { mux.Handle(name, handler) } + if err := srv.Run(mux); err != nil { + log.Fatal(err) + } +} - // 创建 asynq 服务器 - srv := asynq.NewServerFromRedisClient(redisClient, asynq.Config{ - Concurrency: 30, // 并发处理数 - Queues: maps.Clone(queues), // 队列配置 - BaseContext: func() context.Context { return ctx }, // 基础上下文 - LogLevel: asynq.DebugLevel, // 日志级别 - }) +// runMonitor 运行监控协程,处理优雅退出和健康检查 +func runMonitor(ctx context.Context, srv *asynq.Server) { + defer close(done) + defer started.Store(false) + defer closeInspector() + defer srv.Shutdown() - // 启动监控协程:处理优雅退出和健康检查 - ctx, cancel := context.WithCancel(ctx) - go func() { - defer cancel() + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() - ticker := time.NewTicker(time.Minute) // 每分钟健康检查 - defer ticker.Stop() - - for { + for { + select { + case quit := <-exit: + quit <- struct{}{} + return + case <-ctx.Done(): + // ctx 取消时,排空 exit 通道中可能的信号 select { - case <-ctx.Done(): + case quit := <-exit: + quit <- struct{}{} + default: + } + return + case <-ticker.C: + if err := srv.Ping(); err != nil { + log.Println(err) return - case exit := <-exit: // 收到退出信号 - srv.Stop() - exit <- struct{}{} - return - case <-ticker.C: // 定期健康检查 - err := srv.Ping() - if err != nil { - log.Println(err) - Stop() - } } } - }() + } +} - // 启动任务处理服务器 - go func() { - if err := srv.Run(mux); err != nil { - log.Fatal(err) - } - }() - - return nil +// closeInspector 关闭统计采集器 +func closeInspector() { + if inspector != nil { + inspector.Close() + inspector = nil + } } // Stop 优雅停止 taskq 服务器 // 发送停止信号并等待服务器完全关闭 func Stop() { + if !started.Load() { + return + } quit := make(chan struct{}) - exit <- quit // 发送退出信号 - <-quit // 等待确认退出 + select { + case exit <- quit: + <-quit // 等待确认收到退出信号 + default: + // monitor 已经退出 + } + <-done // 等待 runMonitor 完全结束 } diff --git a/ui/app.js b/ui/app.js new file mode 100644 index 0000000..7ba7995 --- /dev/null +++ b/ui/app.js @@ -0,0 +1,410 @@ +import { LitElement, html } from 'lit'; +import './components/time-range-picker.js'; +import './components/tasks-chart.js'; +import './components/queue-table.js'; +import './components/queue-modal.js'; + +class TaskqApp extends LitElement { + static properties = { + rootPath: { type: String, attribute: 'root-path' }, + queues: { type: Array, state: true }, + loading: { type: Boolean, state: true }, + modalOpen: { type: Boolean, state: true }, + currentQueue: { type: String, state: true }, + currentTab: { type: String, state: true }, + currentPage: { type: Number, state: true }, + // Time range state + duration: { type: String, state: true }, + endTime: { type: Number, state: true }, + isLiveMode: { type: Boolean, state: true }, + // Chart data + chartData: { type: Object, state: true } + }; + + constructor() { + super(); + this.rootPath = '/monitor'; + this.queues = []; + this.loading = true; + this.modalOpen = false; + this.currentQueue = ''; + this.currentTab = 'active'; + this.currentPage = 1; + this.duration = '1h'; + this.endTime = null; + this.isLiveMode = true; + this.chartData = { labels: [], timestamps: [], datasets: {} }; + this.eventSource = null; + } + + createRenderRoot() { + return this; + } + + connectedCallback() { + super.connectedCallback(); + this.initRoute(); + this.loadStatsForTimeRange(); + this.connectSSE(); + window.addEventListener('popstate', this.handlePopState.bind(this)); + } + + disconnectedCallback() { + super.disconnectedCallback(); + if (this.eventSource) { + this.eventSource.close(); + } + window.removeEventListener('popstate', this.handlePopState.bind(this)); + } + + initRoute() { + const path = window.location.pathname; + const relativePath = path.replace(this.rootPath, '').replace(/^\/+/, ''); + const match = relativePath.match(/^queues\/([^\/]+)\/([^\/]+)/); + if (match) { + this.currentQueue = decodeURIComponent(match[1]); + this.currentTab = match[2]; + const params = new URLSearchParams(window.location.search); + this.currentPage = parseInt(params.get('page')) || 1; + this.modalOpen = true; + } + } + + handlePopState(event) { + if (event.state && event.state.queue) { + this.currentQueue = event.state.queue; + this.currentTab = event.state.tab; + this.currentPage = event.state.page; + this.modalOpen = true; + } else { + this.modalOpen = false; + } + } + + connectSSE() { + if (this.eventSource) { + this.eventSource.close(); + } + + this.eventSource = new EventSource(`${this.rootPath}/api/sse`); + + this.eventSource.addEventListener('queues', (event) => { + this.queues = JSON.parse(event.data); + this.loading = false; + }); + + this.eventSource.addEventListener('stats', (event) => { + if (!this.isLiveMode) return; + const stats = JSON.parse(event.data); + + // 检查数据是否在当前时间范围内 + const durationSecs = this.parseDuration(this.duration); + const now = Math.floor(Date.now() / 1000); + const start = now - durationSecs; + + if (stats.t < start) return; // 数据不在时间范围内,忽略 + + this.appendStatsPoint(stats); + + // 移除超出时间范围的旧数据 + this.trimOldData(start); + }); + + this.eventSource.onerror = () => { + setTimeout(() => this.connectSSE(), 3000); + }; + } + + appendStatsPoint(stats) { + const date = new Date(stats.t * 1000); + const timeLabel = date.toLocaleTimeString('zh-CN', { + hour: '2-digit', minute: '2-digit', second: '2-digit' + }); + + // 检查是否已有相同时间戳的数据 + if (this.chartData.timestamps.length > 0 && + this.chartData.timestamps[this.chartData.timestamps.length - 1] === stats.t) { + return; + } + + const newData = { ...this.chartData }; + newData.labels = [...newData.labels, timeLabel]; + newData.timestamps = [...newData.timestamps, stats.t]; + newData.datasets = { + active: [...(newData.datasets.active || []), stats.a || 0], + pending: [...(newData.datasets.pending || []), stats.p || 0], + scheduled: [...(newData.datasets.scheduled || []), stats.s || 0], + retry: [...(newData.datasets.retry || []), stats.r || 0], + archived: [...(newData.datasets.archived || []), stats.ar || 0], + completed: [...(newData.datasets.completed || []), stats.c || 0], + succeeded: [...(newData.datasets.succeeded || []), stats.su || 0], + failed: [...(newData.datasets.failed || []), stats.f || 0] + }; + this.chartData = newData; + } + + trimOldData(startTimestamp) { + const timestamps = this.chartData.timestamps || []; + if (timestamps.length === 0) return; + + // 找到第一个在时间范围内的数据索引 + let trimIndex = 0; + while (trimIndex < timestamps.length && timestamps[trimIndex] < startTimestamp) { + trimIndex++; + } + + if (trimIndex === 0) return; // 没有需要移除的数据 + + const newData = { ...this.chartData }; + newData.labels = newData.labels.slice(trimIndex); + newData.timestamps = newData.timestamps.slice(trimIndex); + newData.datasets = { + active: (newData.datasets.active || []).slice(trimIndex), + pending: (newData.datasets.pending || []).slice(trimIndex), + scheduled: (newData.datasets.scheduled || []).slice(trimIndex), + retry: (newData.datasets.retry || []).slice(trimIndex), + archived: (newData.datasets.archived || []).slice(trimIndex), + completed: (newData.datasets.completed || []).slice(trimIndex), + succeeded: (newData.datasets.succeeded || []).slice(trimIndex), + failed: (newData.datasets.failed || []).slice(trimIndex) + }; + this.chartData = newData; + } + + async loadStatsForTimeRange() { + const durationSecs = this.parseDuration(this.duration); + const end = this.endTime !== null ? this.endTime : Math.floor(Date.now() / 1000); + const start = end - durationSecs; + + try { + const response = await fetch( + `${this.rootPath}/api/stats/?start=${start}&end=${end}&limit=10000` + ); + if (!response.ok) return; + + const stats = await response.json(); + + // 将 API 数据转为 map 便于查找 + const statsMap = new Map(); + if (stats && stats.length > 0) { + stats.forEach(s => { + statsMap.set(s.t, s); + }); + } + + // 计算采样间隔(根据时间范围动态调整) + const interval = this.getInterval(durationSecs); + + // 生成完整的时间序列 + const newData = { + labels: [], timestamps: [], datasets: { + active: [], pending: [], scheduled: [], retry: [], + archived: [], completed: [], succeeded: [], failed: [] + } + }; + + // 对齐到间隔 + const alignedStart = Math.floor(start / interval) * interval; + + for (let t = alignedStart; t <= end; t += interval) { + const date = new Date(t * 1000); + const timeLabel = this.formatTimeLabel(date, durationSecs); + + newData.labels.push(timeLabel); + newData.timestamps.push(t); + + // 查找该时间点附近的数据(允许一定误差) + const s = this.findNearestStats(statsMap, t, interval); + + newData.datasets.active.push(s ? (s.a || 0) : null); + newData.datasets.pending.push(s ? (s.p || 0) : null); + newData.datasets.scheduled.push(s ? (s.s || 0) : null); + newData.datasets.retry.push(s ? (s.r || 0) : null); + newData.datasets.archived.push(s ? (s.ar || 0) : null); + newData.datasets.completed.push(s ? (s.c || 0) : null); + newData.datasets.succeeded.push(s ? (s.su || 0) : null); + newData.datasets.failed.push(s ? (s.f || 0) : null); + } + + this.chartData = newData; + } catch (err) { + console.error('Failed to load stats:', err); + } + } + + // 根据时间范围计算采样间隔 + getInterval(durationSecs) { + if (durationSecs <= 300) return 2; // 5m -> 2s + if (durationSecs <= 900) return 5; // 15m -> 5s + if (durationSecs <= 1800) return 10; // 30m -> 10s + if (durationSecs <= 3600) return 20; // 1h -> 20s + if (durationSecs <= 10800) return 60; // 3h -> 1m + if (durationSecs <= 21600) return 120; // 6h -> 2m + if (durationSecs <= 43200) return 300; // 12h -> 5m + if (durationSecs <= 86400) return 600; // 1d -> 10m + if (durationSecs <= 259200) return 1800; // 3d -> 30m + return 3600; // 7d -> 1h + } + + // 格式化时间标签 + formatTimeLabel(date, durationSecs) { + if (durationSecs <= 86400) { + // 1天内只显示时间 + return date.toLocaleTimeString('zh-CN', { + hour: '2-digit', minute: '2-digit', second: '2-digit' + }); + } else { + // 超过1天显示日期和时间 + return date.toLocaleString('zh-CN', { + month: '2-digit', day: '2-digit', + hour: '2-digit', minute: '2-digit' + }); + } + } + + // 查找最接近的数据点 + findNearestStats(statsMap, timestamp, interval) { + // 精确匹配 + if (statsMap.has(timestamp)) { + return statsMap.get(timestamp); + } + // 在间隔范围内查找 + for (let offset = 1; offset < interval; offset++) { + if (statsMap.has(timestamp + offset)) { + return statsMap.get(timestamp + offset); + } + if (statsMap.has(timestamp - offset)) { + return statsMap.get(timestamp - offset); + } + } + return null; + } + + parseDuration(dur) { + const match = dur.match(/^(\d+)([mhd])$/); + if (!match) return 3600; + const value = parseInt(match[1]); + const unit = match[2]; + switch (unit) { + case 'm': return value * 60; + case 'h': return value * 3600; + case 'd': return value * 86400; + default: return 3600; + } + } + + handleTimeRangeChange(e) { + const { duration, endTime, isLiveMode } = e.detail; + this.duration = duration; + this.endTime = endTime; + this.isLiveMode = isLiveMode; + this.loadStatsForTimeRange(); + } + + handleTimeRangeSelect(e) { + const { start, end } = e.detail; + if (!start || !end) return; + + // 计算选择的时间范围对应的 duration + const durationSecs = end - start; + + // 设置为非实时模式,结束时间为选择的结束时间 + this.isLiveMode = false; + this.endTime = end; + + // 根据选择的秒数设置合适的 duration + if (durationSecs <= 300) this.duration = '5m'; + else if (durationSecs <= 900) this.duration = '15m'; + else if (durationSecs <= 1800) this.duration = '30m'; + else if (durationSecs <= 3600) this.duration = '1h'; + else if (durationSecs <= 10800) this.duration = '3h'; + else if (durationSecs <= 21600) this.duration = '6h'; + else if (durationSecs <= 43200) this.duration = '12h'; + else if (durationSecs <= 86400) this.duration = '1d'; + else if (durationSecs <= 259200) this.duration = '3d'; + else this.duration = '7d'; + + this.loadStatsForTimeRange(); + } + + handleQueueClick(e) { + const { queue } = e.detail; + this.currentQueue = queue; + this.currentTab = 'active'; + this.currentPage = 1; + this.modalOpen = true; + const url = `${this.rootPath}/queues/${queue}/${this.currentTab}?page=1`; + history.pushState({ queue, tab: this.currentTab, page: 1 }, '', url); + } + + handleModalClose() { + this.modalOpen = false; + history.pushState({}, '', `${this.rootPath}/`); + } + + handleTabChange(e) { + const { tab } = e.detail; + this.currentTab = tab; + this.currentPage = 1; + const url = `${this.rootPath}/queues/${this.currentQueue}/${tab}?page=1`; + history.pushState({ queue: this.currentQueue, tab, page: 1 }, '', url); + } + + handlePageChange(e) { + const { page } = e.detail; + this.currentPage = page; + const url = `${this.rootPath}/queues/${this.currentQueue}/${this.currentTab}?page=${page}`; + history.pushState({ queue: this.currentQueue, tab: this.currentTab, page }, '', url); + } + + render() { + return html` +
+
+ Tasks Overview + +
+
+ +
+
+ +
+ ${this.loading ? html` +
+
+
Loading...
+
+ ` : html` + + `} +
+ + + `; + } +} + +customElements.define('taskq-app', TaskqApp); diff --git a/ui/components/help-tooltip.js b/ui/components/help-tooltip.js new file mode 100644 index 0000000..4518e98 --- /dev/null +++ b/ui/components/help-tooltip.js @@ -0,0 +1,80 @@ +import { LitElement, html, css } from 'lit'; + +class HelpTooltip extends LitElement { + static properties = { + text: { type: String } + }; + + static styles = css` + :host { + display: inline-flex; + align-items: center; + justify-content: center; + position: relative; + } + + .icon { + display: inline-flex; + align-items: center; + justify-content: center; + width: 14px; + height: 14px; + border-radius: 50%; + background: #616161; + color: #9e9e9e; + font-size: 10px; + cursor: help; + } + + .icon:hover { + background: #757575; + color: #e0e0e0; + } + + .icon:hover + .tooltip { + display: block; + } + + .tooltip { + display: none; + position: absolute; + top: 100%; + left: 50%; + transform: translateX(-50%); + margin-top: 8px; + padding: 8px 12px; + background: #212121; + color: #e0e0e0; + font-size: 12px; + font-weight: normal; + border-radius: 4px; + white-space: nowrap; + z-index: 1000; + box-shadow: 0 2px 8px rgba(0,0,0,0.3); + } + + .tooltip::before { + content: ''; + position: absolute; + bottom: 100%; + left: 50%; + transform: translateX(-50%); + border: 6px solid transparent; + border-bottom-color: #212121; + } + `; + + constructor() { + super(); + this.text = ''; + } + + render() { + return html` + ? + ${this.text} + `; + } +} + +customElements.define('help-tooltip', HelpTooltip); diff --git a/ui/components/queue-modal.js b/ui/components/queue-modal.js new file mode 100644 index 0000000..0009a40 --- /dev/null +++ b/ui/components/queue-modal.js @@ -0,0 +1,869 @@ +import { LitElement, html, css } from 'lit'; +import { Chart, registerables } from 'chart.js'; +import './time-range-picker.js'; + +Chart.register(...registerables); + +// 十字准星 + 拖拽选择插件(与 tasks-chart 共用逻辑) +const crosshairPlugin = { + id: 'queueCrosshair', + afterEvent(chart, args) { + const event = args.event; + if (event.type === 'mousemove') { + chart.crosshair = { x: event.x, y: event.y }; + } else if (event.type === 'mouseout') { + chart.crosshair = null; + } + }, + afterDraw(chart) { + const ctx = chart.ctx; + const topY = chart.scales.y.top; + const bottomY = chart.scales.y.bottom; + const leftX = chart.scales.x.left; + const rightX = chart.scales.x.right; + + // 绘制选择区域 + if (chart.dragSelect && chart.dragSelect.startX !== null) { + const { startX, currentX } = chart.dragSelect; + const x1 = Math.max(leftX, Math.min(startX, currentX)); + const x2 = Math.min(rightX, Math.max(startX, currentX)); + + ctx.save(); + ctx.fillStyle = 'rgba(255, 193, 7, 0.2)'; + ctx.fillRect(x1, topY, x2 - x1, bottomY - topY); + ctx.restore(); + } + + // 绘制十字准星 + if (!chart.crosshair) return; + const { x, y } = chart.crosshair; + + if (x < leftX || x > rightX || y < topY || y > bottomY) return; + + ctx.save(); + ctx.setLineDash([4, 4]); + ctx.lineWidth = 1; + ctx.strokeStyle = 'rgba(255, 255, 255, 0.4)'; + + ctx.beginPath(); + ctx.moveTo(x, topY); + ctx.lineTo(x, bottomY); + ctx.stroke(); + + ctx.beginPath(); + ctx.moveTo(leftX, y); + ctx.lineTo(rightX, y); + ctx.stroke(); + + ctx.restore(); + } +}; +Chart.register(crosshairPlugin); + +class QueueModal extends LitElement { + static properties = { + open: { type: Boolean }, + queue: { type: String }, + tab: { type: String }, + page: { type: Number }, + rootPath: { type: String }, + tasks: { type: Array, state: true }, + total: { type: Number, state: true }, + loading: { type: Boolean, state: true }, + chartData: { type: Object, state: true }, + queueInfo: { type: Object, state: true }, + // Time range state + duration: { type: String, state: true }, + endTime: { type: Number, state: true }, + isLiveMode: { type: Boolean, state: true }, + timestamps: { type: Array, state: true } + }; + + static styles = css` + :host { + display: block; + } + + .modal { + display: none; + position: fixed; + z-index: 1000; + left: 0; + top: 0; + width: 100%; + height: 100%; + background-color: #424242; + } + + .modal.open { + display: block; + } + + .modal-content { + background-color: #424242; + width: 100%; + height: 100%; + overflow: hidden; + } + + .modal-header { + background: #515151; + padding: 16px 20px; + display: flex; + justify-content: space-between; + align-items: center; + border-bottom: 1px solid #616161; + } + + .modal-header h2 { + font-size: 1.1em; + font-weight: 500; + margin: 0; + } + + .close-btn { + color: #9e9e9e; + font-size: 24px; + cursor: pointer; + width: 32px; + height: 32px; + display: flex; + align-items: center; + justify-content: center; + border-radius: 4px; + background: transparent; + border: none; + } + + .close-btn:hover { + background: #616161; + color: #e0e0e0; + } + + .modal-body { + padding: 20px; + height: calc(100vh - 60px); + overflow-y: auto; + } + + .queue-chart-card { + margin-bottom: 15px; + background: #424242; + border-radius: 4px; + } + + .queue-chart-header { + display: flex; + justify-content: space-between; + align-items: center; + padding: 10px 15px; + border-bottom: 1px solid #515151; + } + + .queue-chart-title { + font-size: 0.9em; + color: #bdbdbd; + } + + .queue-chart-container { + height: 180px; + padding: 10px; + } + + .task-tabs { + display: flex; + gap: 5px; + margin-bottom: 20px; + background: #424242; + padding: 5px; + border-radius: 4px; + } + + .task-tab { + padding: 8px 16px; + cursor: pointer; + border: none; + background: transparent; + color: #9e9e9e; + border-radius: 4px; + font-size: 0.85em; + display: flex; + align-items: center; + gap: 6px; + } + + .task-tab:hover { + color: #e0e0e0; + } + + .task-tab.active { + background: #42a5f5; + color: #fff; + } + + .tab-count { + font-size: 0.8em; + padding: 2px 6px; + border-radius: 10px; + background: rgba(255, 255, 255, 0.15); + } + + .task-tab.active .tab-count { + background: rgba(255, 255, 255, 0.25); + } + + .task-list { + max-height: 400px; + overflow-y: auto; + } + + .task-item { + background: #5a5a5a; + border: 1px solid #616161; + border-radius: 4px; + padding: 12px; + margin-bottom: 8px; + } + + .task-item:hover { + border-color: #42a5f5; + } + + .task-header { + display: flex; + justify-content: space-between; + align-items: center; + margin-bottom: 8px; + } + + .task-type { + font-weight: 500; + color: #42a5f5; + } + + .task-info { + display: flex; + gap: 12px; + align-items: center; + font-size: 0.8em; + color: #9e9e9e; + } + + .task-id { + font-family: monospace; + } + + .task-retried { + color: #ffa726; + } + + .task-actions { + margin-top: 8px; + display: flex; + gap: 8px; + } + + .retry-btn { + padding: 4px 12px; + font-size: 0.8em; + background: #42a5f5; + color: #fff; + border: none; + border-radius: 4px; + cursor: pointer; + } + + .retry-btn:hover { + background: #1e88e5; + } + + .retry-btn:disabled { + opacity: 0.5; + cursor: not-allowed; + } + + .task-payload { + font-family: monospace; + font-size: 0.75em; + color: #9e9e9e; + background: #424242; + padding: 8px; + border-radius: 4px; + margin-top: 8px; + word-break: break-all; + } + + .task-meta { + display: flex; + gap: 12px; + margin-top: 8px; + font-size: 0.8em; + color: #9e9e9e; + } + + .task-error { + color: #ef5350; + background: rgba(239, 83, 80, 0.1); + padding: 6px 8px; + border-radius: 4px; + margin-top: 8px; + font-size: 0.8em; + } + + .pagination { + display: flex; + justify-content: center; + gap: 6px; + margin-top: 16px; + } + + .pagination button { + padding: 6px 12px; + border: 1px solid #616161; + background: #424242; + color: #e0e0e0; + cursor: pointer; + border-radius: 4px; + font-size: 0.85em; + } + + .pagination button:hover { + background: #5a5a5a; + } + + .pagination button.active { + background: #42a5f5; + border-color: #42a5f5; + } + + .pagination button:disabled { + opacity: 0.4; + cursor: not-allowed; + } + + .loading { + text-align: center; + padding: 60px; + color: #9e9e9e; + } + + .loading-spinner { + width: 40px; + height: 40px; + border: 3px solid #616161; + border-top-color: #42a5f5; + border-radius: 50%; + animation: spin 1s linear infinite; + margin: 0 auto 15px; + } + + @keyframes spin { + to { transform: rotate(360deg); } + } + + .empty-state { + text-align: center; + padding: 60px; + color: #9e9e9e; + } + + canvas { + width: 100% !important; + height: 100% !important; + } + `; + + static tabs = ['active', 'pending', 'scheduled', 'retry', 'archived', 'completed']; + + constructor() { + super(); + this.open = false; + this.queue = ''; + this.tab = 'active'; + this.page = 1; + this.rootPath = '/monitor'; + this.tasks = []; + this.total = 0; + this.loading = false; + this.chartData = { labels: [], datasets: {} }; + this.chart = null; + this.pageSize = 20; + this.queueInfo = null; + // Time range defaults + this.duration = '1h'; + this.endTime = null; + this.isLiveMode = true; + this.timestamps = []; + this.isDragging = false; + } + + updated(changedProperties) { + if (changedProperties.has('open') && this.open) { + this.loadQueueInfo(); + this.loadQueueHistory(); + this.loadTasks(); + } + if ((changedProperties.has('tab') || changedProperties.has('page')) && this.open) { + this.loadTasks(); + } + if (changedProperties.has('chartData') && this.chart) { + this.updateChart(); + } + } + + async loadQueueInfo() { + try { + const response = await fetch(`${this.rootPath}/api/queues/${this.queue}`); + if (!response.ok) return; + this.queueInfo = await response.json(); + } catch (err) { + console.error('Failed to load queue info:', err); + } + } + + parseDuration(dur) { + const match = dur.match(/^(\d+)([mhd])$/); + if (!match) return 3600; + const value = parseInt(match[1]); + const unit = match[2]; + switch (unit) { + case 'm': return value * 60; + case 'h': return value * 3600; + case 'd': return value * 86400; + default: return 3600; + } + } + + getInterval(durationSecs) { + if (durationSecs <= 300) return 2; + if (durationSecs <= 900) return 5; + if (durationSecs <= 1800) return 10; + if (durationSecs <= 3600) return 20; + if (durationSecs <= 10800) return 60; + if (durationSecs <= 21600) return 120; + if (durationSecs <= 43200) return 300; + if (durationSecs <= 86400) return 600; + if (durationSecs <= 259200) return 1800; + return 3600; + } + + formatTimeLabel(date, durationSecs) { + if (durationSecs <= 86400) { + return date.toLocaleTimeString('zh-CN', { + hour: '2-digit', minute: '2-digit', second: '2-digit' + }); + } else { + return date.toLocaleString('zh-CN', { + month: '2-digit', day: '2-digit', + hour: '2-digit', minute: '2-digit' + }); + } + } + + findNearestStats(statsMap, timestamp, interval) { + if (statsMap.has(timestamp)) return statsMap.get(timestamp); + for (let offset = 1; offset < interval; offset++) { + if (statsMap.has(timestamp + offset)) return statsMap.get(timestamp + offset); + if (statsMap.has(timestamp - offset)) return statsMap.get(timestamp - offset); + } + return null; + } + + async loadQueueHistory() { + const durationSecs = this.parseDuration(this.duration); + const end = this.endTime !== null ? this.endTime : Math.floor(Date.now() / 1000); + const start = end - durationSecs; + + try { + const response = await fetch( + `${this.rootPath}/api/stats/${this.queue}?start=${start}&end=${end}&limit=10000` + ); + if (!response.ok) return; + + const stats = await response.json(); + + // Build stats map + const statsMap = new Map(); + if (stats && stats.length > 0) { + stats.forEach(s => statsMap.set(s.t, s)); + } + + // Calculate interval + const interval = this.getInterval(durationSecs); + const alignedStart = Math.floor(start / interval) * interval; + + const newData = { + labels: [], datasets: { + active: [], pending: [], scheduled: [], retry: [], + archived: [], completed: [], succeeded: [], failed: [] + } + }; + const newTimestamps = []; + + for (let t = alignedStart; t <= end; t += interval) { + const date = new Date(t * 1000); + const timeLabel = this.formatTimeLabel(date, durationSecs); + + newData.labels.push(timeLabel); + newTimestamps.push(t); + + const s = this.findNearestStats(statsMap, t, interval); + newData.datasets.active.push(s ? (s.a || 0) : null); + newData.datasets.pending.push(s ? (s.p || 0) : null); + newData.datasets.scheduled.push(s ? (s.s || 0) : null); + newData.datasets.retry.push(s ? (s.r || 0) : null); + newData.datasets.archived.push(s ? (s.ar || 0) : null); + newData.datasets.completed.push(s ? (s.c || 0) : null); + newData.datasets.succeeded.push(s ? (s.su || 0) : null); + newData.datasets.failed.push(s ? (s.f || 0) : null); + } + + this.chartData = newData; + this.timestamps = newTimestamps; + + await this.updateComplete; + this.initChart(); + } catch (err) { + console.error('Failed to load queue history:', err); + } + } + + async loadTasks() { + this.loading = true; + try { + const response = await fetch( + `${this.rootPath}/api/tasks/${this.queue}/${this.tab}?page=${this.page}&page_size=${this.pageSize}` + ); + if (!response.ok) throw new Error(`HTTP ${response.status}`); + + const data = await response.json(); + this.tasks = data.tasks || []; + this.total = data.total || 0; + } catch (err) { + console.error('Failed to load tasks:', err); + this.tasks = []; + this.total = 0; + } finally { + this.loading = false; + } + } + + initChart() { + const canvas = this.shadowRoot.querySelector('canvas'); + if (!canvas) return; + + if (this.chart) { + this.chart.destroy(); + } + + const ctx = canvas.getContext('2d'); + this.chart = new Chart(ctx, { + type: 'line', + data: { + labels: [], + datasets: [ + { label: 'Active', data: [], borderColor: '#42a5f5', backgroundColor: 'transparent', fill: false, hidden: true }, + { label: 'Pending', data: [], borderColor: '#5c6bc0', backgroundColor: 'transparent', fill: false }, + { label: 'Scheduled', data: [], borderColor: '#ffa726', backgroundColor: 'transparent', fill: false, hidden: true }, + { label: 'Retry', data: [], borderColor: '#ef5350', backgroundColor: 'transparent', fill: false, hidden: true }, + { label: 'Archived', data: [], borderColor: '#ab47bc', backgroundColor: 'transparent', fill: false, hidden: true }, + { label: 'Completed', data: [], borderColor: '#66bb6a', backgroundColor: 'transparent', fill: false, hidden: true }, + { label: 'Succeeded', data: [], borderColor: '#81c784', backgroundColor: 'transparent', fill: false, borderDash: [5, 5], hidden: true }, + { label: 'Failed', data: [], borderColor: '#e57373', backgroundColor: 'transparent', fill: false, borderDash: [5, 5], hidden: true } + ] + }, + options: { + responsive: true, + maintainAspectRatio: false, + animation: { duration: 0 }, + clip: false, + interaction: { mode: 'index', intersect: false }, + hover: { mode: 'index', intersect: false }, + plugins: { + legend: { + position: 'bottom', + labels: { color: '#e0e0e0', padding: 10, usePointStyle: true, pointStyle: 'circle', font: { size: 11 } } + }, + tooltip: { enabled: true, backgroundColor: 'rgba(30, 30, 30, 0.9)', titleColor: '#e0e0e0', bodyColor: '#e0e0e0' } + }, + scales: { + x: { grid: { color: '#616161' }, ticks: { color: '#9e9e9e', maxTicksLimit: 8 } }, + y: { grid: { color: '#616161' }, ticks: { color: '#9e9e9e' }, beginAtZero: true } + }, + elements: { + point: { + radius: 0, + hoverRadius: 5, + hitRadius: 10, + hoverBackgroundColor: (ctx) => ctx.dataset.borderColor, + hoverBorderColor: (ctx) => ctx.dataset.borderColor, + hoverBorderWidth: 0 + }, + line: { tension: 0.3, borderWidth: 1.5, spanGaps: false } + } + } + }); + this.setupDragSelect(canvas); + this.updateChart(); + } + + setupDragSelect(canvas) { + canvas.addEventListener('mousedown', (e) => { + if (!this.chart) return; + const rect = canvas.getBoundingClientRect(); + const x = e.clientX - rect.left; + const leftX = this.chart.scales.x.left; + const rightX = this.chart.scales.x.right; + + if (x >= leftX && x <= rightX) { + this.isDragging = true; + this.chart.dragSelect = { startX: x, currentX: x }; + this.chart.update('none'); + } + }); + + canvas.addEventListener('mousemove', (e) => { + if (!this.isDragging || !this.chart) return; + const rect = canvas.getBoundingClientRect(); + const x = e.clientX - rect.left; + this.chart.dragSelect.currentX = x; + this.chart.update('none'); + }); + + canvas.addEventListener('mouseup', (e) => { + if (!this.isDragging || !this.chart) return; + this.isDragging = false; + + const { startX, currentX } = this.chart.dragSelect; + const leftX = this.chart.scales.x.left; + const rightX = this.chart.scales.x.right; + + const x1 = Math.max(leftX, Math.min(startX, currentX)); + const x2 = Math.min(rightX, Math.max(startX, currentX)); + + if (Math.abs(x2 - x1) > 10 && this.timestamps.length > 0) { + const startIndex = Math.round((x1 - leftX) / (rightX - leftX) * (this.timestamps.length - 1)); + const endIndex = Math.round((x2 - leftX) / (rightX - leftX) * (this.timestamps.length - 1)); + + if (startIndex >= 0 && endIndex < this.timestamps.length) { + const startTime = this.timestamps[startIndex]; + const endTime = this.timestamps[endIndex]; + this.handleTimeRangeSelect(startTime, endTime); + } + } + + this.chart.dragSelect = null; + this.chart.update('none'); + }); + + canvas.addEventListener('mouseleave', () => { + if (this.isDragging && this.chart) { + this.isDragging = false; + this.chart.dragSelect = null; + this.chart.update('none'); + } + }); + } + + handleTimeRangeSelect(start, end) { + const durationSecs = end - start; + + this.isLiveMode = false; + this.endTime = end; + + if (durationSecs <= 300) this.duration = '5m'; + else if (durationSecs <= 900) this.duration = '15m'; + else if (durationSecs <= 1800) this.duration = '30m'; + else if (durationSecs <= 3600) this.duration = '1h'; + else if (durationSecs <= 10800) this.duration = '3h'; + else if (durationSecs <= 21600) this.duration = '6h'; + else if (durationSecs <= 43200) this.duration = '12h'; + else if (durationSecs <= 86400) this.duration = '1d'; + else if (durationSecs <= 259200) this.duration = '3d'; + else this.duration = '7d'; + + this.loadQueueHistory(); + } + + handleChartTimeRangeChange(e) { + const { duration, endTime, isLiveMode } = e.detail; + this.duration = duration; + this.endTime = endTime; + this.isLiveMode = isLiveMode; + this.loadQueueHistory(); + } + + updateChart() { + if (!this.chart || !this.chartData) return; + + this.chart.data.labels = this.chartData.labels || []; + this.chart.data.datasets[0].data = this.chartData.datasets?.active || []; + this.chart.data.datasets[1].data = this.chartData.datasets?.pending || []; + this.chart.data.datasets[2].data = this.chartData.datasets?.scheduled || []; + this.chart.data.datasets[3].data = this.chartData.datasets?.retry || []; + this.chart.data.datasets[4].data = this.chartData.datasets?.archived || []; + this.chart.data.datasets[5].data = this.chartData.datasets?.completed || []; + this.chart.data.datasets[6].data = this.chartData.datasets?.succeeded || []; + this.chart.data.datasets[7].data = this.chartData.datasets?.failed || []; + this.chart.update('none'); + } + + handleClose() { + if (this.chart) { + this.chart.destroy(); + this.chart = null; + } + this.dispatchEvent(new CustomEvent('close')); + } + + handleTabClick(tab) { + this.dispatchEvent(new CustomEvent('tab-change', { detail: { tab } })); + } + + getTabCount(tab) { + if (!this.queueInfo) return 0; + switch (tab) { + case 'active': return this.queueInfo.Active || 0; + case 'pending': return this.queueInfo.Pending || 0; + case 'scheduled': return this.queueInfo.Scheduled || 0; + case 'retry': return this.queueInfo.Retry || 0; + case 'archived': return this.queueInfo.Archived || 0; + case 'completed': return this.queueInfo.Completed || 0; + default: return 0; + } + } + + handlePageClick(page) { + this.dispatchEvent(new CustomEvent('page-change', { detail: { page } })); + } + + async retryTask(taskId) { + try { + const response = await fetch( + `${this.rootPath}/api/tasks/${this.queue}/archived/${taskId}/retry`, + { method: 'POST' } + ); + if (!response.ok) throw new Error(`HTTP ${response.status}`); + // Reload tasks after retry + this.loadTasks(); + } catch (err) { + console.error('Failed to retry task:', err); + alert('Failed to retry task: ' + err.message); + } + } + + renderPagination() { + if (!this.total) return ''; + + const totalPages = Math.ceil(this.total / this.pageSize); + const startPage = Math.max(1, this.page - 2); + const endPage = Math.min(totalPages, this.page + 2); + + const pages = []; + for (let i = startPage; i <= endPage; i++) { + pages.push(i); + } + + return html` + + `; + } + + renderTasks() { + if (this.loading) { + return html` +
+
+
Loading...
+
+ `; + } + + if (!this.tasks || this.tasks.length === 0) { + return html`
No tasks
`; + } + + return html` +
+ ${this.tasks.map(task => html` +
+
+
${task.type}
+
+ ${task.retried > 0 ? html`Retried: ${task.retried}` : ''} + ${task.id} +
+
+ ${task.payload ? html`
${task.payload}
` : ''} +
+ ${task.next_process ? html`Next: ${new Date(task.next_process).toLocaleString()}` : ''} + ${task.completed_at ? html`Completed: ${new Date(task.completed_at).toLocaleString()}` : ''} +
+ ${task.last_error ? html`
${task.last_error}
` : ''} + ${this.tab === 'archived' ? html` +
+ +
+ ` : ''} +
+ `)} +
+ ${this.renderPagination()} + `; + } + + render() { + return html` + + `; + } +} + +customElements.define('queue-modal', QueueModal); diff --git a/ui/components/queue-table.js b/ui/components/queue-table.js new file mode 100644 index 0000000..d2ec001 --- /dev/null +++ b/ui/components/queue-table.js @@ -0,0 +1,233 @@ +import { LitElement, html, css } from 'lit'; +import './help-tooltip.js'; + +class QueueTable extends LitElement { + static properties = { + queues: { type: Array }, + rootPath: { type: String } + }; + + static styles = css` + :host { + display: block; + } + + table { + width: 100%; + border-collapse: collapse; + } + + th { + background: #424242; + padding: 14px 16px; + text-align: left; + font-weight: 500; + color: #bdbdbd; + font-size: 0.9em; + border-bottom: 1px solid #616161; + } + + .th-content { + display: flex; + align-items: center; + gap: 4px; + } + + td { + padding: 14px 16px; + border-bottom: 1px solid #616161; + } + + tr:last-child td { + border-bottom: none; + } + + tbody tr:hover { + background: #5a5a5a; + } + + .queue-name { + font-weight: 500; + color: #4fc3f7; + cursor: pointer; + } + + .queue-name:hover { + text-decoration: underline; + } + + .state-badge { + display: inline-block; + padding: 4px 10px; + border-radius: 4px; + font-size: 0.8em; + font-weight: 500; + background: #66bb6a; + color: #1b5e20; + } + + .state-badge.paused { + background: #ffb74d; + color: #e65100; + } + + .memory-value { + font-size: 0.9em; + color: #bdbdbd; + } + + .latency-value { + color: #bdbdbd; + } + + .action-btn { + background: transparent; + border: 1px solid #757575; + color: #bdbdbd; + padding: 4px 8px; + border-radius: 4px; + cursor: pointer; + font-size: 0.8em; + margin-right: 4px; + } + + .action-btn:hover { + background: #616161; + color: #e0e0e0; + } + + .action-btn.pause { + border-color: #ffb74d; + color: #ffb74d; + } + + .action-btn.pause:hover { + background: rgba(255, 183, 77, 0.2); + } + + .action-btn.resume { + border-color: #66bb6a; + color: #66bb6a; + } + + .action-btn.resume:hover { + background: rgba(102, 187, 106, 0.2); + } + + .empty-state { + text-align: center; + padding: 60px; + color: #9e9e9e; + } + `; + + constructor() { + super(); + this.queues = []; + this.rootPath = '/monitor'; + } + + handleQueueClick(queue) { + this.dispatchEvent(new CustomEvent('queue-click', { + detail: { queue: queue.name } + })); + } + + async togglePause(queue) { + const action = queue.paused ? 'unpause' : 'pause'; + try { + const response = await fetch( + `${this.rootPath}/api/queues/${queue.name}/${action}`, + { method: 'POST' } + ); + if (!response.ok) throw new Error(`HTTP ${response.status}`); + // 触发刷新事件 + this.dispatchEvent(new CustomEvent('queue-updated')); + } catch (err) { + console.error(`Failed to ${action} queue:`, err); + alert(`Failed to ${action} queue: ${err.message}`); + } + } + + formatMemory(bytes) { + if (!bytes || bytes === 0) return '0 B'; + if (bytes < 1024) return bytes + ' B'; + if (bytes < 1024 * 1024) return (bytes / 1024).toFixed(1) + ' KB'; + return (bytes / 1024 / 1024).toFixed(2) + ' MB'; + } + + renderTh(label, tooltip) { + if (!tooltip) { + return html`${label}`; + } + return html` + + + ${label} + + + + `; + } + + + + render() { + if (!this.queues || this.queues.length === 0) { + return html`
No queues
`; + } + + return html` + + + + + ${this.renderTh('State', 'run: 正常处理任务 | paused: 暂停处理新任务')} + ${this.renderTh('Active', '正在被 worker 处理的任务数')} + ${this.renderTh('Pending', '等待处理的任务数')} + ${this.renderTh('Scheduled', '定时/延迟任务,到达指定时间后进入 Pending')} + ${this.renderTh('Retry', '处理失败后等待重试的任务数')} + ${this.renderTh('Archived', '超过最大重试次数的失败任务')} + ${this.renderTh('Memory', '队列在 Redis 中占用的内存')} + ${this.renderTh('Latency', '最老 Pending 任务的等待时间,反映处理及时性')} + + + + + ${this.queues.map(queue => html` + + + + + + + + + + + + + `)} + +
QueueActions
+ this.handleQueueClick(queue)}> + ${queue.name} + + + + ${queue.paused ? 'paused' : 'run'} + + ${queue.active || 0}${queue.pending || 0}${queue.scheduled || 0}${queue.retry || 0}${queue.archived || 0}${this.formatMemory(queue.memory_usage)}${queue.latency || 0}ms + + +
+ `; + } +} + +customElements.define('queue-table', QueueTable); diff --git a/ui/components/tasks-chart.js b/ui/components/tasks-chart.js new file mode 100644 index 0000000..b11383c --- /dev/null +++ b/ui/components/tasks-chart.js @@ -0,0 +1,267 @@ +import { LitElement, html, css } from 'lit'; +import { Chart, registerables } from 'chart.js'; + +Chart.register(...registerables); + +// 十字准星 + 拖拽选择插件 +const crosshairPlugin = { + id: 'crosshair', + afterEvent(chart, args) { + const event = args.event; + if (event.type === 'mousemove') { + chart.crosshair = { x: event.x, y: event.y }; + } else if (event.type === 'mouseout') { + chart.crosshair = null; + } + }, + afterDraw(chart) { + const ctx = chart.ctx; + const topY = chart.scales.y.top; + const bottomY = chart.scales.y.bottom; + const leftX = chart.scales.x.left; + const rightX = chart.scales.x.right; + + // 绘制选择区域 + if (chart.dragSelect && chart.dragSelect.startX !== null) { + const { startX, currentX } = chart.dragSelect; + const x1 = Math.max(leftX, Math.min(startX, currentX)); + const x2 = Math.min(rightX, Math.max(startX, currentX)); + + ctx.save(); + ctx.fillStyle = 'rgba(255, 193, 7, 0.2)'; + ctx.fillRect(x1, topY, x2 - x1, bottomY - topY); + ctx.restore(); + } + + // 绘制十字准星 + if (!chart.crosshair) return; + const { x, y } = chart.crosshair; + + // 检查鼠标是否在图表区域内 + if (x < leftX || x > rightX || y < topY || y > bottomY) return; + + ctx.save(); + ctx.setLineDash([4, 4]); + ctx.lineWidth = 1; + ctx.strokeStyle = 'rgba(255, 255, 255, 0.4)'; + + // 垂直线 + ctx.beginPath(); + ctx.moveTo(x, topY); + ctx.lineTo(x, bottomY); + ctx.stroke(); + + // 水平线跟随鼠标 + ctx.beginPath(); + ctx.moveTo(leftX, y); + ctx.lineTo(rightX, y); + ctx.stroke(); + + ctx.restore(); + } +}; +Chart.register(crosshairPlugin); + +class TasksChart extends LitElement { + static properties = { + data: { type: Object }, + timestamps: { type: Array } + }; + + static styles = css` + :host { + display: block; + width: 100%; + height: 100%; + } + canvas { + width: 100% !important; + height: 100% !important; + } + `; + + constructor() { + super(); + this.data = { labels: [], datasets: {} }; + this.timestamps = []; + this.chart = null; + this.isDragging = false; + } + + firstUpdated() { + this.initChart(); + this.setupDragSelect(); + } + + setupDragSelect() { + const canvas = this.shadowRoot.querySelector('canvas'); + if (!canvas) return; + + canvas.addEventListener('mousedown', (e) => { + if (!this.chart) return; + const rect = canvas.getBoundingClientRect(); + const x = e.clientX - rect.left; + const leftX = this.chart.scales.x.left; + const rightX = this.chart.scales.x.right; + + if (x >= leftX && x <= rightX) { + this.isDragging = true; + this.chart.dragSelect = { startX: x, currentX: x }; + this.chart.update('none'); + } + }); + + canvas.addEventListener('mousemove', (e) => { + if (!this.isDragging || !this.chart) return; + const rect = canvas.getBoundingClientRect(); + const x = e.clientX - rect.left; + this.chart.dragSelect.currentX = x; + this.chart.update('none'); + }); + + canvas.addEventListener('mouseup', (e) => { + if (!this.isDragging || !this.chart) return; + this.isDragging = false; + + const { startX, currentX } = this.chart.dragSelect; + const leftX = this.chart.scales.x.left; + const rightX = this.chart.scales.x.right; + + // 计算选择的时间范围 + const x1 = Math.max(leftX, Math.min(startX, currentX)); + const x2 = Math.min(rightX, Math.max(startX, currentX)); + + // 最小选择宽度检查 + if (Math.abs(x2 - x1) > 10) { + const startIndex = Math.round((x1 - leftX) / (rightX - leftX) * (this.timestamps.length - 1)); + const endIndex = Math.round((x2 - leftX) / (rightX - leftX) * (this.timestamps.length - 1)); + + if (startIndex >= 0 && endIndex < this.timestamps.length) { + const startTime = this.timestamps[startIndex]; + const endTime = this.timestamps[endIndex]; + + this.dispatchEvent(new CustomEvent('time-range-select', { + detail: { start: startTime, end: endTime }, + bubbles: true, + composed: true + })); + } + } + + this.chart.dragSelect = null; + this.chart.update('none'); + }); + + canvas.addEventListener('mouseleave', () => { + if (this.isDragging && this.chart) { + this.isDragging = false; + this.chart.dragSelect = null; + this.chart.update('none'); + } + }); + } + + initChart() { + const canvas = this.shadowRoot.querySelector('canvas'); + if (!canvas) return; + + const ctx = canvas.getContext('2d'); + + this.chart = new Chart(ctx, { + type: 'line', + data: { + labels: [], + datasets: [ + { label: 'Active', data: [], borderColor: '#42a5f5', backgroundColor: 'transparent', fill: false, hidden: true }, + { label: 'Pending', data: [], borderColor: '#5c6bc0', backgroundColor: 'transparent', fill: false }, + { label: 'Scheduled', data: [], borderColor: '#ffa726', backgroundColor: 'transparent', fill: false, hidden: true }, + { label: 'Retry', data: [], borderColor: '#ef5350', backgroundColor: 'transparent', fill: false, hidden: true }, + { label: 'Archived', data: [], borderColor: '#ab47bc', backgroundColor: 'transparent', fill: false, hidden: true }, + { label: 'Completed', data: [], borderColor: '#66bb6a', backgroundColor: 'transparent', fill: false, hidden: true }, + { label: 'Succeeded', data: [], borderColor: '#81c784', backgroundColor: 'transparent', fill: false, borderDash: [5, 5], hidden: true }, + { label: 'Failed', data: [], borderColor: '#e57373', backgroundColor: 'transparent', fill: false, borderDash: [5, 5], hidden: true } + ] + }, + options: { + responsive: true, + maintainAspectRatio: false, + animation: { duration: 0 }, + clip: false, + interaction: { mode: 'index', intersect: false }, + hover: { mode: 'index', intersect: false }, + plugins: { + legend: { + position: 'bottom', + labels: { + color: '#e0e0e0', + padding: 15, + usePointStyle: true, + pointStyle: 'circle' + } + }, + tooltip: { + enabled: true, + backgroundColor: 'rgba(30, 30, 30, 0.9)', + titleColor: '#e0e0e0', + bodyColor: '#e0e0e0', + borderColor: '#616161', + borderWidth: 1, + padding: 10, + displayColors: true + } + }, + scales: { + x: { + grid: { color: '#616161' }, + ticks: { color: '#9e9e9e', maxTicksLimit: 10 } + }, + y: { + grid: { color: '#616161' }, + ticks: { color: '#9e9e9e' }, + beginAtZero: true + } + }, + elements: { + point: { + radius: 0, + hoverRadius: 5, + hitRadius: 10, + hoverBackgroundColor: (ctx) => ctx.dataset.borderColor, + hoverBorderColor: (ctx) => ctx.dataset.borderColor, + hoverBorderWidth: 0 + }, + line: { tension: 0.3, borderWidth: 1.5, spanGaps: false } + } + } + }); + + this.updateChart(); + } + + updated(changedProperties) { + if (changedProperties.has('data') && this.chart) { + this.updateChart(); + } + } + + updateChart() { + if (!this.chart || !this.data) return; + + this.chart.data.labels = this.data.labels || []; + this.chart.data.datasets[0].data = this.data.datasets?.active || []; + this.chart.data.datasets[1].data = this.data.datasets?.pending || []; + this.chart.data.datasets[2].data = this.data.datasets?.scheduled || []; + this.chart.data.datasets[3].data = this.data.datasets?.retry || []; + this.chart.data.datasets[4].data = this.data.datasets?.archived || []; + this.chart.data.datasets[5].data = this.data.datasets?.completed || []; + this.chart.data.datasets[6].data = this.data.datasets?.succeeded || []; + this.chart.data.datasets[7].data = this.data.datasets?.failed || []; + this.chart.update('none'); + } + + render() { + return html``; + } +} + +customElements.define('tasks-chart', TasksChart); diff --git a/ui/components/time-range-picker.js b/ui/components/time-range-picker.js new file mode 100644 index 0000000..673f7f3 --- /dev/null +++ b/ui/components/time-range-picker.js @@ -0,0 +1,165 @@ +import { LitElement, html, css } from 'lit'; + +class TimeRangePicker extends LitElement { + static properties = { + duration: { type: String }, + endTime: { type: Number }, + isLiveMode: { type: Boolean } + }; + + static durations = ['5m', '15m', '30m', '1h', '3h', '6h', '12h', '1d', '3d', '7d']; + + static styles = css` + :host { + display: flex; + gap: 10px; + align-items: center; + } + + .time-control { + display: flex; + align-items: center; + background: #424242; + border: 1px solid #616161; + border-radius: 4px; + overflow: hidden; + } + + button { + background: transparent; + border: none; + color: #9e9e9e; + padding: 6px 10px; + cursor: pointer; + font-size: 1em; + } + + button:hover { + background: #515151; + color: #e0e0e0; + } + + .value { + padding: 6px 12px; + color: #e0e0e0; + font-size: 0.85em; + min-width: 40px; + text-align: center; + } + + .end-value { + cursor: pointer; + } + + .end-value:hover { + background: #515151; + } + + .reset-btn:hover { + color: #ef5350; + } + `; + + constructor() { + super(); + this.duration = '1h'; + this.endTime = null; + this.isLiveMode = true; + } + + get durationIndex() { + return TimeRangePicker.durations.indexOf(this.duration); + } + + parseDuration(dur) { + const match = dur.match(/^(\d+)([mhd])$/); + if (!match) return 3600; + const value = parseInt(match[1]); + const unit = match[2]; + switch (unit) { + case 'm': return value * 60; + case 'h': return value * 3600; + case 'd': return value * 86400; + default: return 3600; + } + } + + adjustDuration(delta) { + const durations = TimeRangePicker.durations; + const newIndex = Math.max(0, Math.min(durations.length - 1, this.durationIndex + delta)); + this.duration = durations[newIndex]; + this.emitChange(); + } + + adjustEndTime(delta) { + const durationSecs = this.parseDuration(this.duration); + const step = durationSecs / 2; + + let newEndTime = this.endTime; + if (newEndTime === null) { + newEndTime = Math.floor(Date.now() / 1000); + } + newEndTime += delta * step; + + const now = Math.floor(Date.now() / 1000); + if (newEndTime >= now) { + this.resetToNow(); + return; + } + + this.endTime = newEndTime; + this.isLiveMode = false; + this.emitChange(); + } + + resetToNow() { + this.endTime = null; + this.isLiveMode = true; + this.emitChange(); + } + + emitChange() { + this.dispatchEvent(new CustomEvent('change', { + detail: { + duration: this.duration, + endTime: this.endTime, + isLiveMode: this.isLiveMode + } + })); + } + + formatEndTime() { + if (this.endTime === null) { + return 'now'; + } + const date = new Date(this.endTime * 1000); + return date.toLocaleString('zh-CN', { + year: 'numeric', + month: '2-digit', + day: '2-digit', + hour: '2-digit', + minute: '2-digit', + second: '2-digit' + }).replace(/\//g, '-'); + } + + render() { + return html` +
+ + ${this.duration} + +
+
+ + + ${this.formatEndTime()} + + + +
+ `; + } +} + +customElements.define('time-range-picker', TimeRangePicker); diff --git a/ui/index.html b/ui/index.html new file mode 100644 index 0000000..da4f48a --- /dev/null +++ b/ui/index.html @@ -0,0 +1,63 @@ + + + + + + TaskQ Monitor + + + + + + +
+

Browser Not Supported

+

TaskQ Monitor requires a modern browser with ES Module support.

+

Please upgrade to one of the following browsers:

+

+ Chrome 61+ | + Firefox 60+ | + Safari 11+ | + Edge 79+ +

+
+ + + + diff --git a/ui/styles.css b/ui/styles.css new file mode 100644 index 0000000..731e6c0 --- /dev/null +++ b/ui/styles.css @@ -0,0 +1,406 @@ +* { + margin: 0; + padding: 0; + box-sizing: border-box; +} + +body { + font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif; + background-color: #424242; + color: #e0e0e0; + min-height: 100vh; + padding: 20px; +} + +/* Chart Card */ +.chart-card { + background: #515151; + border-radius: 4px; + padding: 20px; + border: 1px solid #616161; + margin-bottom: 20px; +} + +.chart-header { + display: flex; + justify-content: space-between; + align-items: center; + margin-bottom: 15px; +} + +.chart-title { + font-size: 1.1em; + font-weight: 500; +} + +.chart-container { + height: 280px; + position: relative; +} + +/* Time Range Picker */ +.time-range-picker { + display: flex; + gap: 10px; + align-items: center; +} + +.time-control { + display: flex; + align-items: center; + background: #424242; + border: 1px solid #616161; + border-radius: 4px; + overflow: hidden; +} + +.time-control button { + background: transparent; + border: none; + color: #9e9e9e; + padding: 6px 10px; + cursor: pointer; + font-size: 1em; +} + +.time-control button:hover { + background: #515151; + color: #e0e0e0; +} + +.time-control .value { + padding: 6px 12px; + color: #e0e0e0; + font-size: 0.85em; + min-width: 40px; + text-align: center; +} + +.time-control .end-value { + cursor: pointer; +} + +.time-control .end-value:hover { + background: #515151; +} + +.time-control .reset-btn:hover { + color: #ef5350; +} + +/* Table Card */ +.table-card { + background: #515151; + border-radius: 4px; + border: 1px solid #616161; + overflow: hidden; +} + +.queues-table { + width: 100%; + border-collapse: collapse; +} + +.queues-table th { + background: #424242; + padding: 14px 16px; + text-align: left; + font-weight: 500; + color: #bdbdbd; + font-size: 0.9em; + border-bottom: 1px solid #616161; +} + +.queues-table td { + padding: 14px 16px; + border-bottom: 1px solid #616161; +} + +.queues-table tr:last-child td { + border-bottom: none; +} + +.queues-table tbody tr:hover { + background: #5a5a5a; +} + +.queue-name { + font-weight: 500; + color: #4fc3f7; + cursor: pointer; +} + +.queue-name:hover { + text-decoration: underline; +} + +.state-badge { + display: inline-block; + padding: 4px 10px; + border-radius: 4px; + font-size: 0.8em; + font-weight: 500; + background: #66bb6a; + color: #1b5e20; +} + +.state-badge.paused { + background: #ffb74d; + color: #e65100; +} + +.memory-bar { + width: 120px; + height: 6px; + background: #424242; + border-radius: 3px; + overflow: hidden; +} + +.memory-bar-fill { + height: 100%; + background: #42a5f5; + border-radius: 3px; +} + +.latency-value { + color: #bdbdbd; +} + +.action-btn { + background: transparent; + border: 1px solid #757575; + color: #bdbdbd; + padding: 4px 8px; + border-radius: 4px; + cursor: pointer; + font-size: 0.8em; +} + +.action-btn:hover { + background: #616161; + color: #e0e0e0; +} + +/* Loading & Empty State */ +.loading { + text-align: center; + padding: 60px; + color: #9e9e9e; +} + +.loading-spinner { + width: 40px; + height: 40px; + border: 3px solid #616161; + border-top-color: #42a5f5; + border-radius: 50%; + animation: spin 1s linear infinite; + margin: 0 auto 15px; +} + +@keyframes spin { + to { transform: rotate(360deg); } +} + +.empty-state { + text-align: center; + padding: 60px; + color: #9e9e9e; +} + +/* Modal */ +.modal { + display: none; + position: fixed; + z-index: 1000; + left: 0; + top: 0; + width: 100%; + height: 100%; + background-color: #424242; +} + +.modal.open { + display: block; +} + +.modal-content { + background-color: #424242; + width: 100%; + height: 100%; + overflow: hidden; +} + +.modal-header { + background: #515151; + padding: 16px 20px; + display: flex; + justify-content: space-between; + align-items: center; + border-bottom: 1px solid #616161; +} + +.modal-header h2 { + font-size: 1.1em; + font-weight: 500; +} + +.close-btn { + color: #9e9e9e; + font-size: 24px; + cursor: pointer; + width: 32px; + height: 32px; + display: flex; + align-items: center; + justify-content: center; + border-radius: 4px; + background: transparent; + border: none; +} + +.close-btn:hover { + background: #616161; + color: #e0e0e0; +} + +.modal-body { + padding: 20px; + height: calc(100vh - 60px); + overflow-y: auto; +} + +/* Task Tabs */ +.task-tabs { + display: flex; + gap: 5px; + margin-bottom: 20px; + background: #424242; + padding: 5px; + border-radius: 4px; +} + +.task-tab { + padding: 8px 16px; + cursor: pointer; + border: none; + background: transparent; + color: #9e9e9e; + border-radius: 4px; + font-size: 0.85em; +} + +.task-tab:hover { + color: #e0e0e0; +} + +.task-tab.active { + background: #42a5f5; + color: #fff; +} + +/* Task List */ +.task-list { + max-height: 400px; + overflow-y: auto; +} + +.task-item { + background: #5a5a5a; + border: 1px solid #616161; + border-radius: 4px; + padding: 12px; + margin-bottom: 8px; +} + +.task-item:hover { + border-color: #42a5f5; +} + +.task-header { + display: flex; + justify-content: space-between; + align-items: flex-start; + margin-bottom: 8px; +} + +.task-id { + font-family: monospace; + font-size: 0.8em; + color: #9e9e9e; +} + +.task-type { + font-weight: 500; + color: #42a5f5; +} + +.task-payload { + font-family: monospace; + font-size: 0.75em; + color: #9e9e9e; + background: #424242; + padding: 8px; + border-radius: 4px; + margin-top: 8px; + word-break: break-all; +} + +.task-meta { + display: flex; + gap: 12px; + margin-top: 8px; + font-size: 0.8em; + color: #9e9e9e; +} + +.task-error { + color: #ef5350; + background: rgba(239, 83, 80, 0.1); + padding: 6px 8px; + border-radius: 4px; + margin-top: 8px; + font-size: 0.8em; +} + +/* Pagination */ +.pagination { + display: flex; + justify-content: center; + gap: 6px; + margin-top: 16px; +} + +.pagination button { + padding: 6px 12px; + border: 1px solid #616161; + background: #424242; + color: #e0e0e0; + cursor: pointer; + border-radius: 4px; + font-size: 0.85em; +} + +.pagination button:hover { + background: #5a5a5a; +} + +.pagination button.active { + background: #42a5f5; + border-color: #42a5f5; +} + +.pagination button:disabled { + opacity: 0.4; + cursor: not-allowed; +} + +/* Queue Detail Chart */ +.queue-chart-container { + height: 200px; + margin-bottom: 15px; + background: #424242; + border-radius: 4px; + padding: 10px; +}