From 326f2a371c1b2165cc06a2da4272ecb098fa3f76 Mon Sep 17 00:00:00 2001 From: hupeh Date: Wed, 10 Dec 2025 00:53:30 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BC=98=E5=8C=96=E7=9B=91=E6=8E=A7?= =?UTF-8?q?=E4=BB=AA=E8=A1=A8=E7=9B=98=20UI?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 添加 appbar 导航栏,支持 Chart/Queues 视图切换 - appbar 切换使用 history API,支持浏览器前进/后退 - 图表视图占满整个可视区域 - queue-modal 共享 appbar 样式 - 修复 queue tab count 字段名大小写问题 - tooltip 跟随鼠标显示在右下方,移除箭头 - 图表 canvas 鼠标样式改为准星 - pause/resume 队列后刷新列表 - example 添加 flag 配置参数 --- example/main.go | 114 +++-- go.mod | 16 +- go.sum | 49 +- inspect.go | 381 -------------- plugin.go | 41 ++ servlet.go | 294 +++++++++++ task.go | 91 ++-- taskq.go | 266 +--------- x/inspector/inspector.go | 484 ++++++++++++++++++ x/metrics/metrics.go | 268 ++++++++++ handler.go => x/monitor/monitor.go | 279 +++++----- {ui => x/monitor/ui}/app.js | 112 ++-- .../monitor/ui}/components/help-tooltip.js | 0 .../monitor/ui}/components/queue-modal.js | 50 +- .../monitor/ui}/components/queue-table.js | 0 .../monitor/ui}/components/tasks-chart.js | 17 +- .../ui}/components/time-range-picker.js | 0 {ui => x/monitor/ui}/index.html | 0 {ui => x/monitor/ui}/styles.css | 73 ++- 19 files changed, 1626 insertions(+), 909 deletions(-) delete mode 100644 inspect.go create mode 100644 plugin.go create mode 100644 servlet.go create mode 100644 x/inspector/inspector.go create mode 100644 x/metrics/metrics.go rename handler.go => x/monitor/monitor.go (63%) rename {ui => x/monitor/ui}/app.js (79%) rename {ui => x/monitor/ui}/components/help-tooltip.js (100%) rename {ui => x/monitor/ui}/components/queue-modal.js (96%) rename {ui => x/monitor/ui}/components/queue-table.js (100%) rename {ui => x/monitor/ui}/components/tasks-chart.js (95%) rename {ui => x/monitor/ui}/components/time-range-picker.js (100%) rename {ui => x/monitor/ui}/index.html (100%) rename {ui => x/monitor/ui}/styles.css (84%) diff --git a/example/main.go b/example/main.go index e36b62f..09fea28 100644 --- a/example/main.go +++ b/example/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "flag" "fmt" "log" "net/http" @@ -11,7 +12,12 @@ import ( "time" "code.tczkiot.com/wlw/taskq" + "code.tczkiot.com/wlw/taskq/x/inspector" + "code.tczkiot.com/wlw/taskq/x/metrics" + "code.tczkiot.com/wlw/taskq/x/monitor" + _ "github.com/mattn/go-sqlite3" + "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/redis/go-redis/v9" ) @@ -28,83 +34,99 @@ type ImageResizeTask struct { // 定义任务处理器 func handleEmailTask(ctx context.Context, t EmailTask) error { log.Printf("处理邮件任务: 用户ID=%d, 模板ID=%s", t.UserID, t.TemplateID) - // 模拟邮件发送逻辑 return nil } func handleImageResizeTask(ctx context.Context, t ImageResizeTask) error { log.Printf("处理图片调整任务: 源URL=%s", t.SourceURL) - // 模拟图片调整逻辑 return nil } +var ( + redisAddr = flag.String("redis", "127.0.0.1:6379", "Redis 地址") + redisDB = flag.Int("redis-db", 1, "Redis 数据库") + httpAddr = flag.String("http", ":8081", "HTTP 服务地址") + dbPath = flag.String("db", "./taskq_stats.db", "SQLite 数据库路径") +) + func main() { + flag.Parse() + // 创建 Redis 客户端 rdb := redis.NewClient(&redis.Options{ - Addr: "127.0.0.1:6379", - DB: 1, + Addr: *redisAddr, + DB: *redisDB, }) defer rdb.Close() - // 初始化 taskq - taskq.SetRedis(rdb) - taskq.Init() - // 创建邮件任务 - emailTask := &taskq.Task[EmailTask]{ + emailTask := &taskq.Task{ Queue: "email", Name: "email:deliver", MaxRetries: 3, Priority: 5, - TTR: 0, Handler: handleEmailTask, } // 创建图片调整任务 - imageTask := &taskq.Task[ImageResizeTask]{ + imageTask := &taskq.Task{ Queue: "image", Name: "image:resize", MaxRetries: 3, Priority: 3, - TTR: 0, Handler: handleImageResizeTask, } - // 注册任务 - if err := taskq.Register(emailTask); err != nil { - log.Fatal("注册邮件任务失败:", err) - } - if err := taskq.Register(imageTask); err != nil { - log.Fatal("注册图片任务失败:", err) + // 创建 Inspector 插件(用于监控仪表盘) + ins := inspector.New(inspector.Options{ + Interval: 2 * time.Second, + DBPath: *dbPath, + }) + + // 创建 Metrics 插件(用于 Prometheus) + met := metrics.New(metrics.Options{ + Namespace: "taskq", + Interval: 15 * time.Second, + }) + + // 配置 taskq + if err := taskq.Configure(taskq.Config{ + Redis: rdb, + Tasks: []*taskq.Task{emailTask, imageTask}, + Plugins: []taskq.Plugin{ins, met}, + }); err != nil { + log.Fatal("配置 taskq 失败:", err) } - // 创建监控 HTTP 处理器 - handler, err := taskq.NewHTTPHandler(taskq.HTTPHandlerOptions{ - RootPath: "/monitor", - ReadOnly: false, + // 创建监控服务 + servlet := taskq.Default() + mon, err := monitor.New(monitor.Options{ + Inspector: ins, + Queues: servlet.Queues(), + RootPath: "/monitor", + ReadOnly: false, }) if err != nil { - log.Fatal("创建监控处理器失败:", err) + log.Fatal("创建监控服务失败:", err) } // 创建可取消的 context ctx, cancel := context.WithCancel(context.Background()) defer cancel() - // 启动 taskq 服务器(包含统计采集器) - go func() { - err := taskq.Start(ctx, taskq.StartOptions{ - StatsInterval: 2 * time.Second, - StatsDBPath: "./taskq_stats.db", - }) - if err != nil { - log.Fatal("启动 taskq 服务器失败:", err) - } - }() + // 初始化 taskq(初始化所有插件) + if err := taskq.Init(ctx); err != nil { + log.Fatal("初始化 taskq 失败:", err) + } + + // 启动 taskq 服务器(启动所有插件) + if err := taskq.Start(ctx); err != nil { + log.Fatal("启动 taskq 服务器失败:", err) + } // 定时发布任务 go func() { - ticker := time.NewTicker(5 * time.Second) // 每5秒发布一次任务 + ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() taskCounter := 0 @@ -162,15 +184,22 @@ func main() { } }() + // 创建 HTTP 路由 + mux := http.NewServeMux() + mux.Handle("/monitor/", mon) + mux.Handle("/metrics", promhttp.Handler()) + // 创建 HTTP 服务器 server := &http.Server{ - Addr: ":8081", - Handler: handler, + Addr: *httpAddr, + Handler: mux, } - // 启动 HTTP 服务器(非阻塞) + // 启动 HTTP 服务器 go func() { - log.Printf("启动监控服务器在 http://localhost:8081") + log.Printf("启动服务器在 http://localhost%s", *httpAddr) + log.Printf(" - 监控仪表盘: http://localhost%s/monitor", *httpAddr) + log.Printf(" - Prometheus: http://localhost%s/metrics", *httpAddr) if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { log.Fatal("HTTP 服务器错误:", err) } @@ -186,10 +215,10 @@ func main() { // 1. 取消 context,停止任务发布 cancel() - // 2. 关闭监控 HTTP 处理器(会断开 SSE 连接) - handler.Close() + // 2. 关闭监控服务(断开 SSE 连接) + mon.Close() - // 3. 关闭 HTTP 服务器(设置 5 秒超时) + // 3. 关闭 HTTP 服务器 shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second) defer shutdownCancel() @@ -197,9 +226,8 @@ func main() { log.Printf("HTTP 服务器关闭错误: %v", err) } - // 4. 停止 taskq 服务器(会等待完全关闭) + // 4. 停止 taskq 服务器(会自动调用插件的 OnStop) taskq.Stop() log.Println("服务已安全关闭") - // rdb.Close() 由 defer 执行 } diff --git a/go.mod b/go.mod index a11431e..c5b25a6 100644 --- a/go.mod +++ b/go.mod @@ -4,18 +4,28 @@ go 1.25.4 require ( github.com/hibiken/asynq v0.25.1 + github.com/prometheus/client_golang v1.23.2 github.com/redis/go-redis/v9 v9.7.0 github.com/rs/xid v1.6.0 ) require ( - github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/beorn7/perks v1.0.1 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/prometheus/client_model v0.6.2 // indirect + github.com/prometheus/common v0.66.1 // indirect + github.com/prometheus/procfs v0.16.1 // indirect + go.yaml.in/yaml/v2 v2.4.2 // indirect +) + +require ( + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/google/uuid v1.6.0 // indirect github.com/mattn/go-sqlite3 v1.14.32 github.com/robfig/cron/v3 v3.0.1 // indirect github.com/spf13/cast v1.7.0 // indirect - golang.org/x/sys v0.27.0 // indirect + golang.org/x/sys v0.35.0 // indirect golang.org/x/time v0.8.0 // indirect - google.golang.org/protobuf v1.35.2 // indirect + google.golang.org/protobuf v1.36.8 // indirect ) diff --git a/go.sum b/go.sum index 8415f28..23bf899 100644 --- a/go.sum +++ b/go.sum @@ -1,40 +1,69 @@ +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= -github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= -github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hibiken/asynq v0.25.1 h1:phj028N0nm15n8O2ims+IvJ2gz4k2auvermngh9JhTw= github.com/hibiken/asynq v0.25.1/go.mod h1:pazWNOLBu0FEynQRBvHA26qdIKRSmfdIfUm4HdsLmXg= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/mattn/go-sqlite3 v1.14.32 h1:JD12Ag3oLy1zQA+BNn74xRgaBbdhbNIDYvQUEuuErjs= github.com/mattn/go-sqlite3 v1.14.32/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o= +github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg= +github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= +github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= +github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9ZoGs= +github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA= +github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= +github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E= github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= -github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= -github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/rs/xid v1.6.0 h1:fV591PaemRlL6JfRxGDEPl69wICngIQ3shQtzfy2gxU= github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= github.com/spf13/cast v1.7.0 h1:ntdiHjuueXFgm5nzDRdOS4yfT43P5Fnud6DH50rz/7w= github.com/spf13/cast v1.7.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= -golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= -golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI= +go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU= +golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= +golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg= golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= -google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io= -google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc= +google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/inspect.go b/inspect.go deleted file mode 100644 index b1643f0..0000000 --- a/inspect.go +++ /dev/null @@ -1,381 +0,0 @@ -// Package taskq 提供基于 Redis 的异步任务队列功能 -// inspect.go 文件包含统计采集器和相关数据结构 -package taskq - -import ( - "database/sql" - "fmt" - "os" - "path/filepath" - "strings" - "sync" - "time" - - "github.com/hibiken/asynq" - _ "github.com/mattn/go-sqlite3" -) - -// ==================== Inspector 统计采集器 ==================== - -// Inspector 统计采集器,独立于 HTTP 服务运行 -type Inspector struct { - inspector *asynq.Inspector - db *sql.DB - closeCh chan struct{} - closeOnce sync.Once - interval time.Duration -} - -// InspectorOptions 配置统计采集器的选项 -type InspectorOptions struct { - // Interval 采集间隔,默认 2 秒 - Interval time.Duration - - // DBPath SQLite 数据库文件路径,默认为 "./taskq_stats.db" - DBPath string -} - -// NewInspector 创建新的统计采集器 -func NewInspector(opts InspectorOptions) (*Inspector, error) { - if redisClient == nil { - return nil, fmt.Errorf("taskq: redis client not initialized, call SetRedis() first") - } - - if opts.Interval <= 0 { - opts.Interval = 2 * time.Second - } - - if opts.DBPath == "" { - opts.DBPath = "./taskq_stats.db" - } - - // 确保目录存在 - dir := filepath.Dir(opts.DBPath) - if dir != "" && dir != "." { - if err := os.MkdirAll(dir, 0755); err != nil { - return nil, fmt.Errorf("taskq: failed to create directory: %v", err) - } - } - - // 打开 SQLite 数据库 - db, err := sql.Open("sqlite3", opts.DBPath) - if err != nil { - return nil, fmt.Errorf("taskq: failed to open database: %v", err) - } - - // 初始化数据库表 - if err := initStatsDB(db); err != nil { - db.Close() - return nil, fmt.Errorf("taskq: failed to init database: %v", err) - } - - ins := &Inspector{ - inspector: asynq.NewInspectorFromRedisClient(redisClient), - db: db, - closeCh: make(chan struct{}), - interval: opts.Interval, - } - - // 启动后台统计采集 - go ins.startCollector() - - return ins, nil -} - -// initStatsDB 初始化数据库(Prometheus 风格:单表 + 标签) -// 设计思路: -// - 单表存储所有队列的统计数据,通过 queue 列区分 -// - 复合索引支持按时间和队列两个维度高效查询 -// - 类似 Prometheus 的 (timestamp, labels, value) 模型 -func initStatsDB(db *sql.DB) error { - _, err := db.Exec(` - CREATE TABLE IF NOT EXISTS metrics ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - timestamp INTEGER NOT NULL, - queue TEXT NOT NULL, - active INTEGER DEFAULT 0, - pending INTEGER DEFAULT 0, - scheduled INTEGER DEFAULT 0, - retry INTEGER DEFAULT 0, - archived INTEGER DEFAULT 0, - completed INTEGER DEFAULT 0, - succeeded INTEGER DEFAULT 0, - failed INTEGER DEFAULT 0 - ); - -- 按队列查询:WHERE queue = ? ORDER BY timestamp - CREATE INDEX IF NOT EXISTS idx_metrics_queue_time ON metrics(queue, timestamp DESC); - -- 按时间查询所有队列:WHERE timestamp BETWEEN ? AND ? - CREATE INDEX IF NOT EXISTS idx_metrics_time ON metrics(timestamp DESC); - -- 唯一约束:同一时间同一队列只有一条记录 - CREATE UNIQUE INDEX IF NOT EXISTS idx_metrics_unique ON metrics(timestamp, queue); - `) - return err -} - -// Close 关闭统计采集器 -func (ins *Inspector) Close() error { - ins.closeOnce.Do(func() { - close(ins.closeCh) - }) - if ins.db != nil { - ins.db.Close() - } - return ins.inspector.Close() -} - -// startCollector 启动后台统计采集任务 -func (ins *Inspector) startCollector() { - ticker := time.NewTicker(ins.interval) - defer ticker.Stop() - - for { - select { - case <-ins.closeCh: - return - case <-ticker.C: - ins.collectStats() - } - } -} - -// collectStats 采集所有队列的统计数据 -func (ins *Inspector) collectStats() { - now := time.Now().Unix() - - for queueName := range queues { - stats, err := ins.inspector.GetQueueInfo(queueName) - if err != nil { - continue - } - - qs := QueueStats{ - Queue: queueName, - Timestamp: now, - Active: stats.Active, - Pending: stats.Pending, - Scheduled: stats.Scheduled, - Retry: stats.Retry, - Archived: stats.Archived, - Completed: stats.Completed, - Succeeded: stats.Processed - stats.Failed, - Failed: stats.Failed, - } - - ins.saveMetrics(qs) - } -} - -// saveMetrics 保存统计数据到 metrics 表 -func (ins *Inspector) saveMetrics(stats QueueStats) error { - if ins.db == nil { - return nil - } - - _, err := ins.db.Exec(` - INSERT OR REPLACE INTO metrics (timestamp, queue, active, pending, scheduled, retry, archived, completed, succeeded, failed) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - `, stats.Timestamp, stats.Queue, stats.Active, stats.Pending, stats.Scheduled, stats.Retry, stats.Archived, stats.Completed, stats.Succeeded, stats.Failed) - - return err -} - -// GetQueueInfo 获取队列信息 -func (ins *Inspector) GetQueueInfo(queueName string) (*asynq.QueueInfo, error) { - return ins.inspector.GetQueueInfo(queueName) -} - -// ListActiveTasks 获取活跃任务列表 -func (ins *Inspector) ListActiveTasks(queueName string, opts ...asynq.ListOption) ([]*asynq.TaskInfo, error) { - return ins.inspector.ListActiveTasks(queueName, opts...) -} - -// ListPendingTasks 获取等待任务列表 -func (ins *Inspector) ListPendingTasks(queueName string, opts ...asynq.ListOption) ([]*asynq.TaskInfo, error) { - return ins.inspector.ListPendingTasks(queueName, opts...) -} - -// ListScheduledTasks 获取计划任务列表 -func (ins *Inspector) ListScheduledTasks(queueName string, opts ...asynq.ListOption) ([]*asynq.TaskInfo, error) { - return ins.inspector.ListScheduledTasks(queueName, opts...) -} - -// ListRetryTasks 获取重试任务列表 -func (ins *Inspector) ListRetryTasks(queueName string, opts ...asynq.ListOption) ([]*asynq.TaskInfo, error) { - return ins.inspector.ListRetryTasks(queueName, opts...) -} - -// ListArchivedTasks 获取归档任务列表 -func (ins *Inspector) ListArchivedTasks(queueName string, opts ...asynq.ListOption) ([]*asynq.TaskInfo, error) { - return ins.inspector.ListArchivedTasks(queueName, opts...) -} - -// ListCompletedTasks 获取已完成任务列表 -func (ins *Inspector) ListCompletedTasks(queueName string, opts ...asynq.ListOption) ([]*asynq.TaskInfo, error) { - return ins.inspector.ListCompletedTasks(queueName, opts...) -} - -// RunTask 立即运行归档任务(重试失败任务) -func (ins *Inspector) RunTask(queueName, taskID string) error { - return ins.inspector.RunTask(queueName, taskID) -} - -// PauseQueue 暂停队列 -func (ins *Inspector) PauseQueue(queueName string) error { - return ins.inspector.PauseQueue(queueName) -} - -// UnpauseQueue 恢复队列 -func (ins *Inspector) UnpauseQueue(queueName string) error { - return ins.inspector.UnpauseQueue(queueName) -} - -// ==================== 统计数据结构 ==================== - -// QueueInfo 获取每个队列的详细信息 -type QueueInfo struct { - Name string `json:"name"` - Priority int `json:"priority"` - Size int `json:"size"` // 队列中任务总数 - Active int `json:"active"` // 活跃任务数 - Pending int `json:"pending"` // 等待任务数 - Scheduled int `json:"scheduled"` // 计划任务数 - Retry int `json:"retry"` // 重试任务数 - Archived int `json:"archived"` // 归档任务数 - Completed int `json:"completed"` // 已完成任务数 - Processed int `json:"processed"` // 累计处理数(今日) - Failed int `json:"failed"` // 累计失败数(今日) - Paused bool `json:"paused"` // 是否暂停 - MemoryUsage int64 `json:"memory_usage"` // 内存使用(字节) - Latency int64 `json:"latency"` // 延迟(毫秒) -} - -// QueueStats 队列统计数据点(用于存储历史数据) -type QueueStats struct { - Timestamp int64 `json:"t"` // Unix 时间戳(秒) - Queue string `json:"q,omitempty"` // 队列名称(汇总查询时为空) - Active int `json:"a"` // 活跃任务数 - Pending int `json:"p"` // 等待任务数 - Scheduled int `json:"s"` // 计划任务数 - Retry int `json:"r"` // 重试任务数 - Archived int `json:"ar"` // 归档任务数 - Completed int `json:"c"` // 已完成任务数 - Succeeded int `json:"su"` // 成功数 - Failed int `json:"f"` // 失败数 -} - -// ==================== 全局统计数据查询 ==================== - -var statsDB *sql.DB -var statsDBMu sync.RWMutex - -// SetStatsDB 设置全局统计数据库(供 HTTPHandler 使用) -func SetStatsDB(db *sql.DB) { - statsDBMu.Lock() - defer statsDBMu.Unlock() - statsDB = db -} - -// StatsQuery 统计查询参数 -type StatsQuery struct { - Queue string // 队列名称,为空则查询所有队列汇总 - Start int64 // 开始时间戳(秒),0 表示不限制 - End int64 // 结束时间戳(秒),0 表示不限制 - Limit int // 返回数量限制,默认 500 -} - -// getQueueStats 获取队列历史统计数据 -func getQueueStats(queueName string, limit int) ([]QueueStats, error) { - return getQueueStatsWithQuery(StatsQuery{ - Queue: queueName, - Limit: limit, - }) -} - -// getQueueStatsWithQuery 根据查询条件获取统计数据(Prometheus 风格单表查询) -// - 按队列查询:使用 idx_metrics_queue_time 索引 -// - 按时间汇总:使用 idx_metrics_time 索引 + GROUP BY -func getQueueStatsWithQuery(q StatsQuery) ([]QueueStats, error) { - statsDBMu.RLock() - db := statsDB - statsDBMu.RUnlock() - - if db == nil { - return nil, nil - } - - if q.Limit <= 0 { - q.Limit = 500 - } - - var args []any - var whereClause string - var conditions []string - - // 构建 WHERE 条件 - if q.Queue != "" { - conditions = append(conditions, "queue = ?") - args = append(args, q.Queue) - } - if q.Start > 0 { - conditions = append(conditions, "timestamp >= ?") - args = append(args, q.Start) - } - if q.End > 0 { - conditions = append(conditions, "timestamp <= ?") - args = append(args, q.End) - } - - if len(conditions) > 0 { - whereClause = "WHERE " + strings.Join(conditions, " AND ") - } - - var query string - if q.Queue != "" { - // 查询单个队列 - query = fmt.Sprintf(` - SELECT timestamp, queue, active, pending, scheduled, retry, archived, completed, succeeded, failed - FROM metrics - %s - ORDER BY timestamp DESC - LIMIT ? - `, whereClause) - } else { - // 查询所有队列汇总(按时间 GROUP BY) - query = fmt.Sprintf(` - SELECT timestamp, '' as queue, SUM(active), SUM(pending), SUM(scheduled), SUM(retry), SUM(archived), SUM(completed), SUM(succeeded), SUM(failed) - FROM metrics - %s - GROUP BY timestamp - ORDER BY timestamp DESC - LIMIT ? - `, whereClause) - } - args = append(args, q.Limit) - - rows, err := db.Query(query, args...) - if err != nil { - return nil, err - } - defer rows.Close() - - var statsList []QueueStats - for rows.Next() { - var s QueueStats - if err := rows.Scan(&s.Timestamp, &s.Queue, &s.Active, &s.Pending, &s.Scheduled, &s.Retry, &s.Archived, &s.Completed, &s.Succeeded, &s.Failed); err != nil { - continue - } - statsList = append(statsList, s) - } - - // 反转顺序,使时间从早到晚 - for i, j := 0, len(statsList)-1; i < j; i, j = i+1, j-1 { - statsList[i], statsList[j] = statsList[j], statsList[i] - } - - return statsList, nil -} - -// GetStatsDB 返回 Inspector 的数据库连接(供外部设置给 HTTPHandler) -func (ins *Inspector) GetStatsDB() *sql.DB { - return ins.db -} diff --git a/plugin.go b/plugin.go new file mode 100644 index 0000000..d3fd2a8 --- /dev/null +++ b/plugin.go @@ -0,0 +1,41 @@ +package taskq + +import ( + "context" + + "github.com/redis/go-redis/v9" +) + +// Context 插件上下文,提供对 Servlet 资源的访问 +type Context struct { + context.Context + servlet *Servlet +} + +// Redis 返回 Redis 客户端 +func (ctx *Context) Redis() redis.UniversalClient { + return ctx.servlet.redisClient +} + +// Queues 返回队列优先级配置 +func (ctx *Context) Queues() map[string]int { + return ctx.servlet.Queues() +} + +// Plugin 定义插件接口,用于扩展 Servlet 的生命周期 +type Plugin interface { + // Name 返回插件名称,用于日志和调试 + Name() string + + // Init 初始化插件,在 Servlet.Start 之前调用 + // ctx 提供对 Servlet 资源的访问 + Init(ctx *Context) error + + // Start 启动插件,在 Servlet.Start 时调用 + // ctx 提供对 Servlet 资源的访问,内嵌的 context.Context 会在 Stop 时取消 + Start(ctx *Context) error + + // Stop 停止插件,在 Servlet.Stop 时调用 + // 按注册的逆序调用 + Stop() error +} diff --git a/servlet.go b/servlet.go new file mode 100644 index 0000000..39287ff --- /dev/null +++ b/servlet.go @@ -0,0 +1,294 @@ +// Package taskq 提供基于 Redis 的异步任务队列功能 +// servlet.go 文件包含 Servlet 结构体,封装了任务队列的完整生命周期管理 +package taskq + +import ( + "context" + "errors" + "log" + "maps" + "reflect" + "sync" + + "github.com/hibiken/asynq" + "github.com/redis/go-redis/v9" +) + +// Config 配置 Servlet 的选项 +type Config struct { + Redis redis.UniversalClient + Tasks []*Task + Plugins []Plugin +} + +// Servlet 封装了 taskq 的完整生命周期管理 +// +// 生命周期: +// 1. 配置阶段:调用 Configure 配置 Servlet +// 2. 初始化阶段:调用 Init 初始化插件 +// 3. 运行阶段:调用 Start 启动服务器 +// 4. 停止阶段:调用 Stop 优雅关闭 +type Servlet struct { + mu sync.RWMutex + handlers map[string]asynq.Handler + queues map[string]int + client *asynq.Client + redisClient redis.UniversalClient + plugins []Plugin + exit chan chan struct{} +} + +// NewServlet 创建一个新的 Servlet 实例 +func NewServlet() *Servlet { + return &Servlet{ + handlers: make(map[string]asynq.Handler), + queues: make(map[string]int), + exit: make(chan chan struct{}), + } +} + +// Configure 配置 Servlet +func (s *Servlet) Configure(cfg Config) error { + s.mu.Lock() + defer s.mu.Unlock() + + if cfg.Redis == nil { + return errors.New("taskq: redis client is required") + } + + s.redisClient = cfg.Redis + s.client = asynq.NewClientFromRedisClient(cfg.Redis) + + // 注册任务 + for _, t := range cfg.Tasks { + if err := s.registerTask(t); err != nil { + return err + } + } + + s.plugins = cfg.Plugins + + return nil +} + +// registerTask 注册单个任务(内部方法,调用时已持有锁) +func (s *Servlet) registerTask(t *Task) error { + if t.Queue == "" { + return errors.New("taskq: queue name cannot be empty") + } + if t.Priority < 0 || t.Priority > 255 { + return errors.New("taskq: priority must be between 0 and 255") + } + if t.MaxRetries < 0 { + return errors.New("taskq: retry count must be non-negative") + } + if t.Handler == nil { + return errors.New("taskq: handler cannot be nil") + } + + rv := reflect.ValueOf(t.Handler) + if rv.Kind() != reflect.Func { + return errors.New("taskq: handler must be a function") + } + + rt := rv.Type() + + // 验证返回值:只能是 error 或无返回值 + var returnError bool + for i := range rt.NumOut() { + if i == 0 && rt.Out(0).Implements(errorType) { + returnError = true + } else { + return errors.New("taskq: handler function must return either error or nothing") + } + } + + // 验证参数:支持以下签名 + // - func(context.Context, T) error + // - func(context.Context) error + // - func(T) error + // - func() + var inContext bool + var inData bool + var dataType reflect.Type + numIn := rt.NumIn() + + if numIn > 2 { + return errors.New("taskq: handler function can have at most 2 parameters") + } + + for i := range numIn { + fi := rt.In(i) + if fi.Implements(contextType) { + if i != 0 { + return errors.New("taskq: context.Context must be the first parameter") + } + inContext = true + } else if fi.Kind() == reflect.Struct { + if inData { + return errors.New("taskq: handler function can only have one data parameter") + } + inData = true + dataType = fi + } else { + return errors.New("taskq: handler parameter must be context.Context or a struct") + } + } + + // 设置任务的反射信息 + t.funcValue = rv + t.dataType = dataType + t.inputContext = inContext + t.inputData = inData + t.returnError = returnError + t.servlet = s + + s.handlers[t.Name] = t + s.queues[t.Queue] = t.Priority + return nil +} + +// Init 初始化所有插件 +func (s *Servlet) Init(ctx context.Context) error { + return s.initPlugins(ctx) +} + +// initPlugins 初始化所有插件 +func (s *Servlet) initPlugins(ctx context.Context) error { + s.mu.RLock() + plugins := s.plugins + s.mu.RUnlock() + + pctx := &Context{ + Context: ctx, + servlet: s, + } + + for _, p := range plugins { + if err := p.Init(pctx); err != nil { + return err + } + } + return nil +} + +// Start 启动 taskq 服务器 +func (s *Servlet) Start(ctx context.Context) error { + s.mu.Lock() + rdb := s.redisClient + qs := maps.Clone(s.queues) + s.mu.Unlock() + + localCtx, cancel := context.WithCancel(ctx) + + srv := asynq.NewServerFromRedisClient(rdb, asynq.Config{ + Concurrency: 30, + Queues: qs, + BaseContext: func() context.Context { return localCtx }, + LogLevel: asynq.WarnLevel, + }) + + // 启动插件 + if err := s.startPlugins(localCtx); err != nil { + cancel() + return err + } + + go s.runServer(localCtx, srv) + go s.runMonitor(localCtx, srv, cancel) + + return nil +} + +// startPlugins 启动所有插件 +func (s *Servlet) startPlugins(ctx context.Context) error { + s.mu.RLock() + plugins := s.plugins + s.mu.RUnlock() + + pctx := &Context{ + Context: ctx, + servlet: s, + } + + for _, p := range plugins { + if err := p.Start(pctx); err != nil { + return err + } + } + return nil +} + +// runServer 运行任务处理服务器 +func (s *Servlet) runServer(_ context.Context, srv *asynq.Server) { + mux := asynq.NewServeMux() + s.mu.RLock() + for name, handler := range s.handlers { + mux.Handle(name, handler) + } + s.mu.RUnlock() + + if err := srv.Run(mux); err != nil { + log.Printf("taskq: server error: %v", err) + } +} + +func (s *Servlet) runMonitor(ctx context.Context, srv *asynq.Server, cancel context.CancelFunc) { + var exit chan struct{} + + select { + case <-ctx.Done(): + case exit = <-s.exit: + } + + srv.Shutdown() + s.stopPlugins() + cancel() + + if exit != nil { + exit <- struct{}{} + } +} + +// stopPlugins 停止所有插件(按注册的逆序) +func (s *Servlet) stopPlugins() { + s.mu.RLock() + plugins := s.plugins + s.mu.RUnlock() + + for i := len(plugins) - 1; i >= 0; i-- { + if err := plugins[i].Stop(); err != nil { + log.Printf("taskq: plugin %s stop error: %v", plugins[i].Name(), err) + } + } +} + +// Stop 优雅停止 taskq 服务器 +// 发送停止信号并等待服务器完全关闭 +// 可安全地多次调用 +func (s *Servlet) Stop() { + exit := make(chan struct{}) + s.exit <- exit + <-exit +} + +// Client 返回 asynq 客户端 +func (s *Servlet) Client() *asynq.Client { + s.mu.RLock() + defer s.mu.RUnlock() + return s.client +} + +// RedisClient 返回 Redis 客户端 +func (s *Servlet) RedisClient() redis.UniversalClient { + s.mu.RLock() + defer s.mu.RUnlock() + return s.redisClient +} + +// Queues 返回队列优先级配置的副本 +func (s *Servlet) Queues() map[string]int { + s.mu.RLock() + defer s.mu.RUnlock() + return maps.Clone(s.queues) +} diff --git a/task.go b/task.go index 7d56073..21eb469 100644 --- a/task.go +++ b/task.go @@ -13,9 +13,8 @@ import ( "github.com/rs/xid" ) -// Task 定义泛型任务结构 -// T 表示任务数据的类型,必须是结构体 -type Task[T any] struct { +// Task 定义任务结构 +type Task struct { // 公开字段:用户配置 Queue string // 任务队列名称 Group string // 任务分组 @@ -31,49 +30,17 @@ type Task[T any] struct { inputContext bool // 是否需要 context.Context 参数 inputData bool // 是否需要数据参数 returnError bool // 是否返回 error -} - -// PublishOption 任务发布选项函数类型 -// 用于配置任务发布时的各种选项 -type PublishOption func() asynq.Option - -// Delay 设置任务延迟执行时间 -// 参数 d 表示延迟多长时间后执行 -func Delay(d time.Duration) PublishOption { - return func() asynq.Option { - return asynq.ProcessIn(d) - } -} - -// DelayUntil 设置任务在指定时间执行 -// 参数 t 表示任务执行的具体时间点 -func DelayUntil(t time.Time) PublishOption { - return func() asynq.Option { - return asynq.ProcessAt(t) - } -} - -// TTR 设置任务超时时间 -// 覆盖任务默认的超时时间配置 -func TTR(d time.Duration) PublishOption { - return func() asynq.Option { - return asynq.Timeout(d) - } -} - -// Retention 设置任务结果保留时间 -// 任务执行完成后,结果在 Redis 中保留的时间 -func Retention(d time.Duration) PublishOption { - return func() asynq.Option { - return asynq.Retention(d) - } + servlet *Servlet // 所属的 Servlet 实例 } // Publish 发布任务到队列 -// 将任务数据序列化后发送到 Redis 队列中等待处理 -func (t *Task[T]) Publish(ctx context.Context, data T, options ...PublishOption) error { - // 获取 asynq 客户端 - c := client.Load() +func (t *Task) Publish(ctx context.Context, data any, options ...PublishOption) error { + var c *asynq.Client + if t.servlet != nil { + c = t.servlet.Client() + } else if defaultServlet != nil { + c = defaultServlet.Client() + } if c == nil { return errors.New("taskq: client not initialized, call SetRedis() first") } @@ -124,7 +91,7 @@ func (t *Task[T]) Publish(ctx context.Context, data T, options ...PublishOption) // ProcessTask 处理任务的核心方法 // 由 asynq 服务器调用,根据任务配置动态调用处理器函数 -func (t *Task[T]) ProcessTask(ctx context.Context, tsk *asynq.Task) error { +func (t *Task) ProcessTask(ctx context.Context, tsk *asynq.Task) error { var in []reflect.Value // 根据配置添加 context.Context 参数 @@ -158,3 +125,39 @@ func (t *Task[T]) ProcessTask(ctx context.Context, tsk *asynq.Task) error { } return nil } + +// PublishOption 任务发布选项函数类型 +// 用于配置任务发布时的各种选项 +type PublishOption func() asynq.Option + +// Delay 设置任务延迟执行时间 +// 参数 d 表示延迟多长时间后执行 +func Delay(d time.Duration) PublishOption { + return func() asynq.Option { + return asynq.ProcessIn(d) + } +} + +// DelayUntil 设置任务在指定时间执行 +// 参数 t 表示任务执行的具体时间点 +func DelayUntil(t time.Time) PublishOption { + return func() asynq.Option { + return asynq.ProcessAt(t) + } +} + +// TTR 设置任务超时时间 +// 覆盖任务默认的超时时间配置 +func TTR(d time.Duration) PublishOption { + return func() asynq.Option { + return asynq.Timeout(d) + } +} + +// Retention 设置任务结果保留时间 +// 任务执行完成后,结果在 Redis 中保留的时间 +func Retention(d time.Duration) PublishOption { + return func() asynq.Option { + return asynq.Retention(d) + } +} diff --git a/taskq.go b/taskq.go index 47bea26..8a5f3c9 100644 --- a/taskq.go +++ b/taskq.go @@ -1,265 +1,51 @@ // Package taskq 提供基于 Redis 的异步任务队列功能 // 使用 asynq 库作为底层实现,支持任务注册、发布、消费和重试机制 +// +// 本包提供两种使用方式: +// 1. 包级别函数:使用全局默认 Servlet,适合单实例场景 +// 2. Servlet 实例方法:支持多实例复用,适合需要隔离的场景 package taskq import ( "context" - "errors" - "log" - "maps" "reflect" - "sync/atomic" - "time" - - "github.com/hibiken/asynq" - "github.com/redis/go-redis/v9" ) -// 全局状态变量 +// 全局默认 Servlet 实例 +var defaultServlet = NewServlet() + +// 类型反射常量 var ( - started atomic.Bool // 服务器启动状态 - exit chan chan struct{} // 优雅退出信号通道 - done chan struct{} // 关闭完成信号通道 - handlers map[string]asynq.Handler // 任务处理器映射表 - queues map[string]int // 队列优先级配置 - client atomic.Pointer[asynq.Client] // asynq 客户端实例 - redisClient redis.UniversalClient // Redis 客户端实例 - inspector *Inspector // 统计采集器实例 errorType = reflect.TypeOf((*error)(nil)).Elem() // error 类型反射 contextType = reflect.TypeOf((*context.Context)(nil)).Elem() // context.Context 类型反射 ) -// Init 初始化 taskq 系统 -// 创建必要的全局变量和映射表,必须在调用其他函数之前调用 -func Init() { - exit = make(chan chan struct{}) // 创建优雅退出通道 - done = make(chan struct{}) // 创建关闭完成通道 - handlers = make(map[string]asynq.Handler) // 创建任务处理器映射 - queues = make(map[string]int) // 创建队列优先级映射 +// Default 返回默认的 Servlet 实例 +func Default() *Servlet { + return defaultServlet } -// Register 注册任务处理器 -// 使用泛型确保类型安全,通过反射验证处理器函数签名 -// 处理器函数签名必须是:func(context.Context, T) error 或 func(context.Context) 或 func(T) error 或 func() -func Register[T any](t *Task[T]) error { - if t.Queue == "" { - return errors.New("taskq: queue name cannot be empty") - } - if t.Priority < 0 || t.Priority > 255 { - return errors.New("taskq: priority must be between 0 and 255") - } - if t.MaxRetries < 0 { - return errors.New("taskq: retry count must be non-negative") - } - if t.Handler == nil { - return errors.New("taskq: handler cannot be nil") - } - - rv := reflect.ValueOf(t.Handler) - if rv.Kind() != reflect.Func { - return errors.New("taskq: handler must be a function") - } - - rt := rv.Type() - - // 验证返回值:只能是 error 或无返回值 - var returnError bool - for i := range rt.NumOut() { - if i == 0 && rt.Out(0).Implements(errorType) { - returnError = true - } else { - return errors.New("taskq: handler function must return either error or nothing") - } - } - - // 验证参数:支持以下签名 - // - func(context.Context, T) error - // - func(context.Context) error - // - func(T) error - // - func() - var inContext bool - var inData bool - var dataType reflect.Type - numIn := rt.NumIn() - - if numIn > 2 { - return errors.New("taskq: handler function can have at most 2 parameters") - } - - for i := range numIn { - fi := rt.In(i) - if fi.Implements(contextType) { - if i != 0 { - return errors.New("taskq: context.Context must be the first parameter") - } - inContext = true - } else if fi.Kind() == reflect.Struct { - if inData { - return errors.New("taskq: handler function can only have one data parameter") - } - inData = true - dataType = fi - } else { - return errors.New("taskq: handler parameter must be context.Context or a struct") - } - } - - // 检查服务器是否已启动 - if started.Load() { - return errors.New("taskq: cannot register handler after server has started") - } - - // 设置任务的反射信息 - t.funcValue = rv - t.dataType = dataType - t.inputContext = inContext - t.inputData = inData - t.returnError = returnError - - // 注册到全局映射表 - handlers[t.Name] = t - queues[t.Queue] = t.Priority - - return nil +// Configure 配置默认 Servlet +// 必须在 Init 之前调用 +func Configure(cfg Config) error { + return defaultServlet.Configure(cfg) } -// SetRedis 设置 Redis 客户端 -// 必须在启动服务器之前调用,用于配置任务队列的存储后端 -func SetRedis(rdb redis.UniversalClient) error { - if started.Load() { - return errors.New("taskq: server is already running") - } - - redisClient = rdb - client.Store(asynq.NewClientFromRedisClient(rdb)) - - return nil +// Init 初始化默认 Servlet 和所有插件 +// 必须在 Start 之前调用,且只能调用一次 +func Init(ctx context.Context) error { + return defaultServlet.Init(ctx) } -// StartOptions 启动选项 -type StartOptions struct { - // StatsInterval 统计采集间隔,默认 2 秒 - StatsInterval time.Duration - // StatsDBPath SQLite 数据库文件路径,默认 "./taskq_stats.db" - StatsDBPath string +// Start 启动 taskq 服务器(包级别函数,使用默认 Servlet) +// 开始监听任务队列并处理任务 +// 必须在 Init 之后调用 +func Start(ctx context.Context) error { + return defaultServlet.Start(ctx) } -// Start 启动 taskq 服务器 -// 开始监听任务队列并处理任务,包含健康检查和优雅退出机制 -func Start(ctx context.Context, opts ...StartOptions) error { - if !started.CompareAndSwap(false, true) { - return errors.New("taskq: server is already running") - } - - if redisClient == nil { - started.Store(false) - return errors.New("taskq: redis client not initialized, call SetRedis() first") - } - - var opt StartOptions - if len(opts) > 0 { - opt = opts[0] - } - - if err := startInspector(opt); err != nil { - started.Store(false) - return err - } - - srv := createServer(ctx) - go runServer(srv) - go runMonitor(ctx, srv) - - return nil -} - -// startInspector 启动统计采集器 -func startInspector(opt StartOptions) error { - ins, err := NewInspector(InspectorOptions{ - Interval: opt.StatsInterval, - DBPath: opt.StatsDBPath, - }) - if err != nil { - return err - } - inspector = ins - SetStatsDB(ins.GetStatsDB()) - return nil -} - -// createServer 创建 asynq 服务器 -func createServer(ctx context.Context) *asynq.Server { - return asynq.NewServerFromRedisClient(redisClient, asynq.Config{ - Concurrency: 30, - Queues: maps.Clone(queues), - BaseContext: func() context.Context { return ctx }, - LogLevel: asynq.WarnLevel, - }) -} - -// runServer 运行任务处理服务器 -func runServer(srv *asynq.Server) { - mux := asynq.NewServeMux() - for name, handler := range handlers { - mux.Handle(name, handler) - } - if err := srv.Run(mux); err != nil { - log.Fatal(err) - } -} - -// runMonitor 运行监控协程,处理优雅退出和健康检查 -func runMonitor(ctx context.Context, srv *asynq.Server) { - defer close(done) - defer started.Store(false) - defer closeInspector() - defer srv.Shutdown() - - ticker := time.NewTicker(time.Minute) - defer ticker.Stop() - - for { - select { - case quit := <-exit: - quit <- struct{}{} - return - case <-ctx.Done(): - // ctx 取消时,排空 exit 通道中可能的信号 - select { - case quit := <-exit: - quit <- struct{}{} - default: - } - return - case <-ticker.C: - if err := srv.Ping(); err != nil { - log.Println(err) - return - } - } - } -} - -// closeInspector 关闭统计采集器 -func closeInspector() { - if inspector != nil { - inspector.Close() - inspector = nil - } -} - -// Stop 优雅停止 taskq 服务器 +// Stop 优雅停止 taskq 服务器(包级别函数,使用默认 Servlet) // 发送停止信号并等待服务器完全关闭 func Stop() { - if !started.Load() { - return - } - quit := make(chan struct{}) - select { - case exit <- quit: - <-quit // 等待确认收到退出信号 - default: - // monitor 已经退出 - } - <-done // 等待 runMonitor 完全结束 + defaultServlet.Stop() } diff --git a/x/inspector/inspector.go b/x/inspector/inspector.go new file mode 100644 index 0000000..21a9c57 --- /dev/null +++ b/x/inspector/inspector.go @@ -0,0 +1,484 @@ +// Package inspector 提供 taskq 的统计采集功能 +package inspector + +import ( + "context" + "database/sql" + "fmt" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "code.tczkiot.com/wlw/taskq" + "github.com/hibiken/asynq" + "github.com/redis/go-redis/v9" +) + +// Options 配置统计采集器的选项 +type Options struct { + // Interval 采集间隔,默认 2 秒 + Interval time.Duration + + // DBPath SQLite 数据库文件路径,默认为 "./taskq_stats.db" + DBPath string +} + +// Inspector 统计采集器,独立于 HTTP 服务运行 +// 实现 taskq.Plugin 接口 +type Inspector struct { + opts Options + + rdb redis.UniversalClient + queues func() map[string]int + + inspector *asynq.Inspector + db *sql.DB + closeCh chan struct{} + closeOnce sync.Once +} + +// New 创建新的统计采集器 +// 创建后需要通过 Servlet.Use() 注册 +func New(opts Options) *Inspector { + if opts.Interval <= 0 { + opts.Interval = 2 * time.Second + } + + if opts.DBPath == "" { + opts.DBPath = "./taskq_stats.db" + } + + return &Inspector{ + opts: opts, + closeCh: make(chan struct{}), + } +} + +// Name 返回插件名称 +func (ins *Inspector) Name() string { + return "inspector" +} + +// Init 初始化插件,从 Context 获取 Redis 和 Queues +func (ins *Inspector) Init(ctx *taskq.Context) error { + ins.rdb = ctx.Redis() + ins.queues = ctx.Queues + return nil +} + +// Start 启动采集器,初始化数据库并开始后台采集 +func (ins *Inspector) Start(ctx *taskq.Context) error { + // 确保目录存在 + dir := filepath.Dir(ins.opts.DBPath) + if dir != "" && dir != "." { + if err := os.MkdirAll(dir, 0755); err != nil { + return fmt.Errorf("inspector: failed to create directory: %v", err) + } + } + + // 打开 SQLite 数据库 + db, err := sql.Open("sqlite3", ins.opts.DBPath) + if err != nil { + return fmt.Errorf("inspector: failed to open database: %v", err) + } + + // 初始化数据库表 + if err := initStatsDB(db); err != nil { + db.Close() + return fmt.Errorf("inspector: failed to init database: %v", err) + } + + ins.db = db + ins.inspector = asynq.NewInspectorFromRedisClient(ins.rdb) + + // 启动后台统计采集 + go ins.startCollector(ctx) + + return nil +} + +// Stop 停止采集器,关闭数据库连接 +func (ins *Inspector) Stop() error { + ins.closeOnce.Do(func() { + close(ins.closeCh) + }) + + if ins.db != nil { + ins.db.Close() + } + + if ins.inspector != nil { + ins.inspector.Close() + } + + return nil +} + +// initStatsDB 初始化数据库 +func initStatsDB(db *sql.DB) error { + _, err := db.Exec(` + CREATE TABLE IF NOT EXISTS metrics ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp INTEGER NOT NULL, + queue TEXT NOT NULL, + active INTEGER DEFAULT 0, + pending INTEGER DEFAULT 0, + scheduled INTEGER DEFAULT 0, + retry INTEGER DEFAULT 0, + archived INTEGER DEFAULT 0, + completed INTEGER DEFAULT 0, + succeeded INTEGER DEFAULT 0, + failed INTEGER DEFAULT 0 + ); + CREATE INDEX IF NOT EXISTS idx_metrics_queue_time ON metrics(queue, timestamp DESC); + CREATE INDEX IF NOT EXISTS idx_metrics_time ON metrics(timestamp DESC); + CREATE UNIQUE INDEX IF NOT EXISTS idx_metrics_unique ON metrics(timestamp, queue); + `) + return err +} + +// startCollector 启动后台统计采集任务 +func (ins *Inspector) startCollector(ctx context.Context) { + ticker := time.NewTicker(ins.opts.Interval) + defer ticker.Stop() + + for { + select { + case <-ins.closeCh: + return + case <-ctx.Done(): + return + case <-ticker.C: + ins.collectStats() + } + } +} + +// collectStats 采集所有队列的统计数据 +func (ins *Inspector) collectStats() { + if ins.queues == nil || ins.inspector == nil { + return + } + + now := time.Now().Unix() + queueList := ins.queues() + + for queueName := range queueList { + info, err := ins.inspector.GetQueueInfo(queueName) + if err != nil { + continue + } + + stats := Stats{ + Queue: queueName, + Timestamp: now, + Active: info.Active, + Pending: info.Pending, + Scheduled: info.Scheduled, + Retry: info.Retry, + Archived: info.Archived, + Completed: info.Completed, + Succeeded: info.Processed - info.Failed, + Failed: info.Failed, + } + + ins.saveMetrics(stats) + } +} + +// saveMetrics 保存统计数据到 metrics 表 +func (ins *Inspector) saveMetrics(stats Stats) error { + if ins.db == nil { + return nil + } + + _, err := ins.db.Exec(` + INSERT OR REPLACE INTO metrics (timestamp, queue, active, pending, scheduled, retry, archived, completed, succeeded, failed) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + `, stats.Timestamp, stats.Queue, stats.Active, stats.Pending, stats.Scheduled, stats.Retry, stats.Archived, stats.Completed, stats.Succeeded, stats.Failed) + + return err +} + +// ==================== 数据类型 ==================== + +// StatsQuery 统计查询参数 +type StatsQuery struct { + Queue string // 队列名称,为空则查询所有队列汇总 + Start int64 // 开始时间戳(秒),0 表示不限制 + End int64 // 结束时间戳(秒),0 表示不限制 + Limit int // 返回数量限制,默认 500 +} + +// Stats 队列统计数据点 +type Stats struct { + Timestamp int64 `json:"t"` // Unix 时间戳(秒) + Queue string `json:"q,omitempty"` // 队列名称 + Active int `json:"a"` // 活跃任务数 + Pending int `json:"p"` // 等待任务数 + Scheduled int `json:"s"` // 计划任务数 + Retry int `json:"r"` // 重试任务数 + Archived int `json:"ar"` // 归档任务数 + Completed int `json:"c"` // 已完成任务数 + Succeeded int `json:"su"` // 成功数 + Failed int `json:"f"` // 失败数 +} + +// QueueInfo 队列信息 +type QueueInfo struct { + Name string `json:"name"` + Priority int `json:"priority"` + Size int `json:"size"` + Active int `json:"active"` + Pending int `json:"pending"` + Scheduled int `json:"scheduled"` + Retry int `json:"retry"` + Archived int `json:"archived"` + Completed int `json:"completed"` + Processed int `json:"processed"` + Failed int `json:"failed"` + Paused bool `json:"paused"` + MemoryUsage int64 `json:"memory_usage"` + Latency time.Duration `json:"-"` + LatencyMS int64 `json:"latency"` +} + +// TaskInfo 任务信息 +type TaskInfo struct { + ID string `json:"id"` + Type string `json:"type"` + Payload []byte `json:"payload"` + Queue string `json:"queue"` + Retried int `json:"retried"` + LastFailedAt time.Time `json:"last_failed_at,omitempty"` + LastErr string `json:"last_error,omitempty"` + NextProcessAt time.Time `json:"next_process_at,omitempty"` + CompletedAt time.Time `json:"completed_at,omitempty"` +} + +// ==================== 查询方法 ==================== + +// QueryStats 查询统计数据 +func (ins *Inspector) QueryStats(q StatsQuery) ([]Stats, error) { + if ins.db == nil { + return nil, nil + } + + limit := q.Limit + if limit <= 0 { + limit = 500 + } + + var args []any + var whereClause string + var conditions []string + + if q.Queue != "" { + conditions = append(conditions, "queue = ?") + args = append(args, q.Queue) + } + if q.Start > 0 { + conditions = append(conditions, "timestamp >= ?") + args = append(args, q.Start) + } + if q.End > 0 { + conditions = append(conditions, "timestamp <= ?") + args = append(args, q.End) + } + + if len(conditions) > 0 { + whereClause = "WHERE " + strings.Join(conditions, " AND ") + } + + var query string + if q.Queue != "" { + query = fmt.Sprintf(` + SELECT timestamp, queue, active, pending, scheduled, retry, archived, completed, succeeded, failed + FROM metrics + %s + ORDER BY timestamp DESC + LIMIT ? + `, whereClause) + } else { + query = fmt.Sprintf(` + SELECT timestamp, '' as queue, SUM(active), SUM(pending), SUM(scheduled), SUM(retry), SUM(archived), SUM(completed), SUM(succeeded), SUM(failed) + FROM metrics + %s + GROUP BY timestamp + ORDER BY timestamp DESC + LIMIT ? + `, whereClause) + } + args = append(args, limit) + + rows, err := ins.db.Query(query, args...) + if err != nil { + return nil, err + } + defer rows.Close() + + var statsList []Stats + for rows.Next() { + var s Stats + if err := rows.Scan(&s.Timestamp, &s.Queue, &s.Active, &s.Pending, &s.Scheduled, &s.Retry, &s.Archived, &s.Completed, &s.Succeeded, &s.Failed); err != nil { + continue + } + statsList = append(statsList, s) + } + + // 反转顺序,使时间从早到晚 + for i, j := 0, len(statsList)-1; i < j; i, j = i+1, j-1 { + statsList[i], statsList[j] = statsList[j], statsList[i] + } + + return statsList, nil +} + +// GetQueueInfo 获取队列信息 +func (ins *Inspector) GetQueueInfo(queueName string) (*QueueInfo, error) { + if ins.inspector == nil { + return nil, fmt.Errorf("inspector: not started") + } + info, err := ins.inspector.GetQueueInfo(queueName) + if err != nil { + return nil, err + } + return &QueueInfo{ + Name: info.Queue, + Size: info.Size, + Active: info.Active, + Pending: info.Pending, + Scheduled: info.Scheduled, + Retry: info.Retry, + Archived: info.Archived, + Completed: info.Completed, + Processed: info.Processed, + Failed: info.Failed, + Paused: info.Paused, + MemoryUsage: info.MemoryUsage, + Latency: info.Latency, + LatencyMS: info.Latency.Milliseconds(), + }, nil +} + +// convertTaskInfo 将 asynq.TaskInfo 转换为 TaskInfo +func convertTaskInfo(task *asynq.TaskInfo) *TaskInfo { + return &TaskInfo{ + ID: task.ID, + Type: task.Type, + Payload: task.Payload, + Queue: task.Queue, + Retried: task.Retried, + LastFailedAt: task.LastFailedAt, + LastErr: task.LastErr, + NextProcessAt: task.NextProcessAt, + CompletedAt: task.CompletedAt, + } +} + +// convertTaskList 批量转换任务列表 +func convertTaskList(tasks []*asynq.TaskInfo) []*TaskInfo { + result := make([]*TaskInfo, len(tasks)) + for i, t := range tasks { + result[i] = convertTaskInfo(t) + } + return result +} + +// ListActiveTasks 获取活跃任务列表 +func (ins *Inspector) ListActiveTasks(queueName string, pageSize, page int) ([]*TaskInfo, error) { + if ins.inspector == nil { + return nil, fmt.Errorf("inspector: not started") + } + tasks, err := ins.inspector.ListActiveTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page)) + if err != nil { + return nil, err + } + return convertTaskList(tasks), nil +} + +// ListPendingTasks 获取等待任务列表 +func (ins *Inspector) ListPendingTasks(queueName string, pageSize, page int) ([]*TaskInfo, error) { + if ins.inspector == nil { + return nil, fmt.Errorf("inspector: not started") + } + tasks, err := ins.inspector.ListPendingTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page)) + if err != nil { + return nil, err + } + return convertTaskList(tasks), nil +} + +// ListScheduledTasks 获取计划任务列表 +func (ins *Inspector) ListScheduledTasks(queueName string, pageSize, page int) ([]*TaskInfo, error) { + if ins.inspector == nil { + return nil, fmt.Errorf("inspector: not started") + } + tasks, err := ins.inspector.ListScheduledTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page)) + if err != nil { + return nil, err + } + return convertTaskList(tasks), nil +} + +// ListRetryTasks 获取重试任务列表 +func (ins *Inspector) ListRetryTasks(queueName string, pageSize, page int) ([]*TaskInfo, error) { + if ins.inspector == nil { + return nil, fmt.Errorf("inspector: not started") + } + tasks, err := ins.inspector.ListRetryTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page)) + if err != nil { + return nil, err + } + return convertTaskList(tasks), nil +} + +// ListArchivedTasks 获取归档任务列表 +func (ins *Inspector) ListArchivedTasks(queueName string, pageSize, page int) ([]*TaskInfo, error) { + if ins.inspector == nil { + return nil, fmt.Errorf("inspector: not started") + } + tasks, err := ins.inspector.ListArchivedTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page)) + if err != nil { + return nil, err + } + return convertTaskList(tasks), nil +} + +// ListCompletedTasks 获取已完成任务列表 +func (ins *Inspector) ListCompletedTasks(queueName string, pageSize, page int) ([]*TaskInfo, error) { + if ins.inspector == nil { + return nil, fmt.Errorf("inspector: not started") + } + tasks, err := ins.inspector.ListCompletedTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page)) + if err != nil { + return nil, err + } + return convertTaskList(tasks), nil +} + +// RunTask 立即运行归档任务(重试失败任务) +func (ins *Inspector) RunTask(queueName, taskID string) error { + if ins.inspector == nil { + return fmt.Errorf("inspector: not started") + } + return ins.inspector.RunTask(queueName, taskID) +} + +// PauseQueue 暂停队列 +func (ins *Inspector) PauseQueue(queueName string) error { + if ins.inspector == nil { + return fmt.Errorf("inspector: not started") + } + return ins.inspector.PauseQueue(queueName) +} + +// UnpauseQueue 恢复队列 +func (ins *Inspector) UnpauseQueue(queueName string) error { + if ins.inspector == nil { + return fmt.Errorf("inspector: not started") + } + return ins.inspector.UnpauseQueue(queueName) +} diff --git a/x/metrics/metrics.go b/x/metrics/metrics.go new file mode 100644 index 0000000..2fe84a9 --- /dev/null +++ b/x/metrics/metrics.go @@ -0,0 +1,268 @@ +// Package metrics 提供 taskq 的 Prometheus 指标采集功能 +package metrics + +import ( + "context" + "sync" + "time" + + "code.tczkiot.com/wlw/taskq" + "github.com/hibiken/asynq" + "github.com/prometheus/client_golang/prometheus" + "github.com/redis/go-redis/v9" +) + +// Options 配置指标采集器的选项 +type Options struct { + // Namespace 指标命名空间,默认为 "taskq" + Namespace string + + // Interval 采集间隔,默认 15 秒 + Interval time.Duration + + // Registry Prometheus 注册器,默认使用 prometheus.DefaultRegisterer + Registry prometheus.Registerer +} + +// Metrics Prometheus 指标采集器 +// 实现 taskq.Plugin 接口 +type Metrics struct { + opts Options + + rdb redis.UniversalClient + queues func() map[string]int + + inspector *asynq.Inspector + closeCh chan struct{} + closeOnce sync.Once + + // Prometheus 指标 + queueSize *prometheus.GaugeVec + activeTasks *prometheus.GaugeVec + pendingTasks *prometheus.GaugeVec + scheduledTasks *prometheus.GaugeVec + retryTasks *prometheus.GaugeVec + archivedTasks *prometheus.GaugeVec + completedTasks *prometheus.GaugeVec + processedTotal *prometheus.GaugeVec + failedTotal *prometheus.GaugeVec + latencySeconds *prometheus.GaugeVec + memoryBytes *prometheus.GaugeVec + paused *prometheus.GaugeVec +} + +// New 创建新的指标采集器 +func New(opts Options) *Metrics { + if opts.Namespace == "" { + opts.Namespace = "taskq" + } + + if opts.Interval <= 0 { + opts.Interval = 15 * time.Second + } + + if opts.Registry == nil { + opts.Registry = prometheus.DefaultRegisterer + } + + m := &Metrics{ + opts: opts, + closeCh: make(chan struct{}), + } + + // 初始化指标 + m.initMetrics() + + return m +} + +// initMetrics 初始化 Prometheus 指标 +func (m *Metrics) initMetrics() { + labels := []string{"queue"} + + m.queueSize = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: m.opts.Namespace, + Name: "queue_size", + Help: "Total number of tasks in the queue", + }, labels) + + m.activeTasks = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: m.opts.Namespace, + Name: "tasks_active", + Help: "Number of currently active tasks", + }, labels) + + m.pendingTasks = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: m.opts.Namespace, + Name: "tasks_pending", + Help: "Number of pending tasks", + }, labels) + + m.scheduledTasks = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: m.opts.Namespace, + Name: "tasks_scheduled", + Help: "Number of scheduled tasks", + }, labels) + + m.retryTasks = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: m.opts.Namespace, + Name: "tasks_retry", + Help: "Number of tasks in retry queue", + }, labels) + + m.archivedTasks = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: m.opts.Namespace, + Name: "tasks_archived", + Help: "Number of archived (dead) tasks", + }, labels) + + m.completedTasks = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: m.opts.Namespace, + Name: "tasks_completed", + Help: "Number of completed tasks (retained)", + }, labels) + + m.processedTotal = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: m.opts.Namespace, + Name: "tasks_processed_total", + Help: "Total number of processed tasks (today)", + }, labels) + + m.failedTotal = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: m.opts.Namespace, + Name: "tasks_failed_total", + Help: "Total number of failed tasks (today)", + }, labels) + + m.latencySeconds = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: m.opts.Namespace, + Name: "queue_latency_seconds", + Help: "Queue latency in seconds", + }, labels) + + m.memoryBytes = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: m.opts.Namespace, + Name: "queue_memory_bytes", + Help: "Memory usage of the queue in bytes", + }, labels) + + m.paused = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: m.opts.Namespace, + Name: "queue_paused", + Help: "Whether the queue is paused (1) or not (0)", + }, labels) +} + +// Name 返回插件名称 +func (m *Metrics) Name() string { + return "metrics" +} + +// Init 初始化插件,从 Context 获取 Redis 和 Queues +func (m *Metrics) Init(ctx *taskq.Context) error { + m.rdb = ctx.Redis() + m.queues = ctx.Queues + return nil +} + +// Start 启动指标采集 +func (m *Metrics) Start(ctx *taskq.Context) error { + m.inspector = asynq.NewInspectorFromRedisClient(m.rdb) + + // 注册指标 + collectors := []prometheus.Collector{ + m.queueSize, + m.activeTasks, + m.pendingTasks, + m.scheduledTasks, + m.retryTasks, + m.archivedTasks, + m.completedTasks, + m.processedTotal, + m.failedTotal, + m.latencySeconds, + m.memoryBytes, + m.paused, + } + + for _, c := range collectors { + if err := m.opts.Registry.Register(c); err != nil { + // 如果已注册则忽略 + if _, ok := err.(prometheus.AlreadyRegisteredError); !ok { + return err + } + } + } + + // 启动后台采集 + go m.startCollector(ctx) + + return nil +} + +// Stop 停止指标采集 +func (m *Metrics) Stop() error { + m.closeOnce.Do(func() { + close(m.closeCh) + }) + + if m.inspector != nil { + m.inspector.Close() + } + + return nil +} + +// startCollector 启动后台指标采集 +func (m *Metrics) startCollector(ctx context.Context) { + // 立即采集一次 + m.collect() + + ticker := time.NewTicker(m.opts.Interval) + defer ticker.Stop() + + for { + select { + case <-m.closeCh: + return + case <-ctx.Done(): + return + case <-ticker.C: + m.collect() + } + } +} + +// collect 采集所有队列的指标 +func (m *Metrics) collect() { + if m.queues == nil || m.inspector == nil { + return + } + + queues := m.queues() + + for queueName := range queues { + info, err := m.inspector.GetQueueInfo(queueName) + if err != nil { + continue + } + + m.queueSize.WithLabelValues(queueName).Set(float64(info.Size)) + m.activeTasks.WithLabelValues(queueName).Set(float64(info.Active)) + m.pendingTasks.WithLabelValues(queueName).Set(float64(info.Pending)) + m.scheduledTasks.WithLabelValues(queueName).Set(float64(info.Scheduled)) + m.retryTasks.WithLabelValues(queueName).Set(float64(info.Retry)) + m.archivedTasks.WithLabelValues(queueName).Set(float64(info.Archived)) + m.completedTasks.WithLabelValues(queueName).Set(float64(info.Completed)) + m.processedTotal.WithLabelValues(queueName).Set(float64(info.Processed)) + m.failedTotal.WithLabelValues(queueName).Set(float64(info.Failed)) + m.latencySeconds.WithLabelValues(queueName).Set(info.Latency.Seconds()) + m.memoryBytes.WithLabelValues(queueName).Set(float64(info.MemoryUsage)) + + if info.Paused { + m.paused.WithLabelValues(queueName).Set(1) + } else { + m.paused.WithLabelValues(queueName).Set(0) + } + } +} diff --git a/handler.go b/x/monitor/monitor.go similarity index 63% rename from handler.go rename to x/monitor/monitor.go index 015343e..bc2bb35 100644 --- a/handler.go +++ b/x/monitor/monitor.go @@ -1,6 +1,5 @@ -// Package taskq 提供基于 Redis 的异步任务队列功能 -// handler.go 文件包含 HTTP 监控服务处理器 -package taskq +// Package monitor 提供 taskq 的 HTTP 监控服务 +package monitor import ( "embed" @@ -14,14 +13,20 @@ import ( "sync" "time" - "github.com/hibiken/asynq" + "code.tczkiot.com/wlw/taskq/x/inspector" ) //go:embed ui/* var uiFS embed.FS -// HTTPHandlerOptions 配置监控服务的选项 -type HTTPHandlerOptions struct { +// Options 配置监控服务的选项 +type Options struct { + // Inspector 检查器实例(必需) + Inspector *inspector.Inspector + + // Queues 队列优先级映射(必需) + Queues map[string]int + // RootPath 监控服务的根路径,默认为 "/monitor" RootPath string @@ -29,19 +34,24 @@ type HTTPHandlerOptions struct { ReadOnly bool } -// HTTPHandler 监控服务的 HTTP 处理器 -type HTTPHandler struct { +// Monitor 监控服务的 HTTP 处理器 +type Monitor struct { router *http.ServeMux rootPath string readOnly bool closeCh chan struct{} closeOnce sync.Once + inspector *inspector.Inspector + queues map[string]int } -// NewHTTPHandler 创建新的监控 HTTP 处理器 -func NewHTTPHandler(opts HTTPHandlerOptions) (*HTTPHandler, error) { - if redisClient == nil { - return nil, fmt.Errorf("taskq: redis client not initialized, call SetRedis() first") +// New 创建新的监控服务 +func New(opts Options) (*Monitor, error) { + if opts.Inspector == nil { + return nil, fmt.Errorf("monitor: inspector is required") + } + if opts.Queues == nil { + return nil, fmt.Errorf("monitor: queues is required") } // 设置默认值 @@ -55,71 +65,71 @@ func NewHTTPHandler(opts HTTPHandlerOptions) (*HTTPHandler, error) { } opts.RootPath = strings.TrimSuffix(opts.RootPath, "/") - handler := &HTTPHandler{ - router: http.NewServeMux(), - rootPath: opts.RootPath, - readOnly: opts.ReadOnly, - closeCh: make(chan struct{}), + m := &Monitor{ + router: http.NewServeMux(), + rootPath: opts.RootPath, + readOnly: opts.ReadOnly, + closeCh: make(chan struct{}), + inspector: opts.Inspector, + queues: opts.Queues, } - handler.setupRoutes() - return handler, nil + m.setupRoutes() + return m, nil } // ServeHTTP 实现 http.Handler 接口 -func (h *HTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - h.router.ServeHTTP(w, r) +func (m *Monitor) ServeHTTP(w http.ResponseWriter, r *http.Request) { + m.router.ServeHTTP(w, r) } // RootPath 返回监控服务的根路径 -func (h *HTTPHandler) RootPath() string { - return h.rootPath +func (m *Monitor) RootPath() string { + return m.rootPath } -// Close 关闭 HTTP 处理器 -func (h *HTTPHandler) Close() error { - h.closeOnce.Do(func() { - close(h.closeCh) +// Close 关闭监控服务 +func (m *Monitor) Close() error { + m.closeOnce.Do(func() { + close(m.closeCh) }) return nil } // setupRoutes 设置路由 -func (h *HTTPHandler) setupRoutes() { +func (m *Monitor) setupRoutes() { // API 路由 - apiPath := h.rootPath + "/api/" - h.router.HandleFunc(apiPath+"queues", h.handleQueues) - h.router.HandleFunc(apiPath+"queues/", h.handleQueueDetail) - h.router.HandleFunc(apiPath+"tasks/", h.handleTasks) - h.router.HandleFunc(apiPath+"stats/", h.handleStats) - h.router.HandleFunc(apiPath+"sse", h.handleSSE) + apiPath := m.rootPath + "/api/" + m.router.HandleFunc(apiPath+"queues", m.handleQueues) + m.router.HandleFunc(apiPath+"queues/", m.handleQueueDetail) + m.router.HandleFunc(apiPath+"tasks/", m.handleTasks) + m.router.HandleFunc(apiPath+"stats/", m.handleStats) + m.router.HandleFunc(apiPath+"sse", m.handleSSE) // 静态文件路由 uiSubFS, _ := fs.Sub(uiFS, "ui") fileServer := http.FileServer(http.FS(uiSubFS)) - h.router.Handle(h.rootPath+"/static/", http.StripPrefix(h.rootPath+"/static/", fileServer)) + m.router.Handle(m.rootPath+"/static/", http.StripPrefix(m.rootPath+"/static/", fileServer)) // 主页路由(包含 History API 的路由) - h.router.HandleFunc(h.rootPath+"/queues/", h.handleIndex) - h.router.HandleFunc(h.rootPath+"/", h.handleIndex) - h.router.HandleFunc(h.rootPath, h.handleIndex) + m.router.HandleFunc(m.rootPath+"/queues/", m.handleIndex) + m.router.HandleFunc(m.rootPath+"/", m.handleIndex) + m.router.HandleFunc(m.rootPath, m.handleIndex) } // handleStats 处理队列统计数据请求 -// GET /api/stats/{queue}?start=1234567890&end=1234567899&limit=500 -// GET /api/stats/?start=1234567890&end=1234567899&limit=500 (查询所有队列汇总) -func (h *HTTPHandler) handleStats(w http.ResponseWriter, r *http.Request) { +func (m *Monitor) handleStats(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } // 从 URL 中提取队列名称(可选,为空则查询所有队列汇总) - path := strings.TrimPrefix(r.URL.Path, h.rootPath+"/api/stats/") + path := strings.TrimPrefix(r.URL.Path, m.rootPath+"/api/stats/") queueName := strings.TrimSuffix(path, "/") // 构建查询参数 - query := StatsQuery{ + query := inspector.StatsQuery{ Queue: queueName, Limit: 500, } @@ -145,7 +155,7 @@ func (h *HTTPHandler) handleStats(w http.ResponseWriter, r *http.Request) { } } - stats, err := getQueueStatsWithQuery(query) + stats, err := m.inspector.QueryStats(query) if err != nil { http.Error(w, fmt.Sprintf("Failed to get stats: %v", err), http.StatusInternalServerError) return @@ -155,40 +165,58 @@ func (h *HTTPHandler) handleStats(w http.ResponseWriter, r *http.Request) { json.NewEncoder(w).Encode(stats) } +// queueInfoJSON 用于 JSON 输出的队列信息 +type queueInfoJSON struct { + Name string `json:"name"` + Priority int `json:"priority"` + Size int `json:"size"` + Active int `json:"active"` + Pending int `json:"pending"` + Scheduled int `json:"scheduled"` + Retry int `json:"retry"` + Archived int `json:"archived"` + Completed int `json:"completed"` + Processed int `json:"processed"` + Failed int `json:"failed"` + Paused bool `json:"paused"` + MemoryUsage int64 `json:"memory_usage"` + Latency int64 `json:"latency"` +} + // handleQueues 处理队列列表请求 -func (h *HTTPHandler) handleQueues(w http.ResponseWriter, r *http.Request) { +func (m *Monitor) handleQueues(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } - var queueInfos []QueueInfo + var queueInfos []queueInfoJSON // 首先显示所有注册的队列(即使Redis中还没有任务) - for queueName, priority := range queues { - stats, err := inspector.GetQueueInfo(queueName) + for queueName, priority := range m.queues { + info, err := m.inspector.GetQueueInfo(queueName) if err != nil { // 如果队列不存在,创建一个空的状态 - queueInfos = append(queueInfos, QueueInfo{ + queueInfos = append(queueInfos, queueInfoJSON{ Name: queueName, Priority: priority, }) } else { - queueInfos = append(queueInfos, QueueInfo{ + queueInfos = append(queueInfos, queueInfoJSON{ Name: queueName, Priority: priority, - Size: stats.Size, - Active: stats.Active, - Pending: stats.Pending, - Scheduled: stats.Scheduled, - Retry: stats.Retry, - Archived: stats.Archived, - Completed: stats.Completed, - Processed: stats.Processed, - Failed: stats.Failed, - Paused: stats.Paused, - MemoryUsage: stats.MemoryUsage, - Latency: stats.Latency.Milliseconds(), + Size: info.Size, + Active: info.Active, + Pending: info.Pending, + Scheduled: info.Scheduled, + Retry: info.Retry, + Archived: info.Archived, + Completed: info.Completed, + Processed: info.Processed, + Failed: info.Failed, + Paused: info.Paused, + MemoryUsage: info.MemoryUsage, + Latency: info.LatencyMS, }) } } @@ -203,12 +231,9 @@ func (h *HTTPHandler) handleQueues(w http.ResponseWriter, r *http.Request) { } // handleQueueDetail 处理队列详情请求和队列操作 -// GET /api/queues/{queue} - 获取队列详情 -// POST /api/queues/{queue}/pause - 暂停队列 -// POST /api/queues/{queue}/unpause - 恢复队列 -func (h *HTTPHandler) handleQueueDetail(w http.ResponseWriter, r *http.Request) { +func (m *Monitor) handleQueueDetail(w http.ResponseWriter, r *http.Request) { // 从 URL 中提取队列名称 - path := strings.TrimPrefix(r.URL.Path, h.rootPath+"/api/queues/") + path := strings.TrimPrefix(r.URL.Path, m.rootPath+"/api/queues/") parts := strings.Split(path, "/") if len(parts) == 0 || parts[0] == "" { http.Error(w, "Queue name is required", http.StatusBadRequest) @@ -217,21 +242,21 @@ func (h *HTTPHandler) handleQueueDetail(w http.ResponseWriter, r *http.Request) queueName := parts[0] // 检查队列是否已注册 - if _, exists := queues[queueName]; !exists { + if _, exists := m.queues[queueName]; !exists { http.Error(w, "Queue not found", http.StatusNotFound) return } // 处理暂停/恢复请求 if r.Method == http.MethodPost && len(parts) >= 2 { - if h.readOnly { + if m.readOnly { http.Error(w, "Operation not allowed in read-only mode", http.StatusForbidden) return } action := parts[1] switch action { case "pause": - if err := inspector.PauseQueue(queueName); err != nil { + if err := m.inspector.PauseQueue(queueName); err != nil { http.Error(w, fmt.Sprintf("Failed to pause queue: %v", err), http.StatusInternalServerError) return } @@ -239,7 +264,7 @@ func (h *HTTPHandler) handleQueueDetail(w http.ResponseWriter, r *http.Request) json.NewEncoder(w).Encode(map[string]string{"status": "paused"}) return case "unpause": - if err := inspector.UnpauseQueue(queueName); err != nil { + if err := m.inspector.UnpauseQueue(queueName); err != nil { http.Error(w, fmt.Sprintf("Failed to unpause queue: %v", err), http.StatusInternalServerError) return } @@ -258,7 +283,7 @@ func (h *HTTPHandler) handleQueueDetail(w http.ResponseWriter, r *http.Request) } // 获取队列详细信息 - stats, err := inspector.GetQueueInfo(queueName) + info, err := m.inspector.GetQueueInfo(queueName) if err != nil { // 如果队列在 Redis 中不存在,返回空状态 if strings.Contains(err.Error(), "queue not found") { @@ -282,11 +307,11 @@ func (h *HTTPHandler) handleQueueDetail(w http.ResponseWriter, r *http.Request) } w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(stats) + json.NewEncoder(w).Encode(info) } -// TaskInfo 转换任务信息 -type TaskInfo struct { +// taskInfoJSON 转换任务信息用于 JSON 输出 +type taskInfoJSON struct { ID string `json:"id"` Type string `json:"type"` Payload string `json:"payload"` @@ -299,11 +324,9 @@ type TaskInfo struct { } // handleTasks 处理任务列表请求和任务操作 -// GET /api/tasks/{queue}/{state} - 获取任务列表 -// POST /api/tasks/{queue}/archived/{taskId}/retry - 重试失败任务 -func (h *HTTPHandler) handleTasks(w http.ResponseWriter, r *http.Request) { +func (m *Monitor) handleTasks(w http.ResponseWriter, r *http.Request) { // 从 URL 中提取队列名称和任务状态 - path := strings.TrimPrefix(r.URL.Path, h.rootPath+"/api/tasks/") + path := strings.TrimPrefix(r.URL.Path, m.rootPath+"/api/tasks/") parts := strings.Split(path, "/") if len(parts) < 2 { http.Error(w, "Queue name and task state are required", http.StatusBadRequest) @@ -315,12 +338,12 @@ func (h *HTTPHandler) handleTasks(w http.ResponseWriter, r *http.Request) { // 处理重试请求: POST /api/tasks/{queue}/archived/{taskId}/retry if r.Method == http.MethodPost && len(parts) >= 4 && parts[1] == "archived" && parts[3] == "retry" { - if h.readOnly { + if m.readOnly { http.Error(w, "Operation not allowed in read-only mode", http.StatusForbidden) return } taskID := parts[2] - h.handleRetryTask(w, r, queueName, taskID) + m.handleRetryTask(w, r, queueName, taskID) return } @@ -330,7 +353,7 @@ func (h *HTTPHandler) handleTasks(w http.ResponseWriter, r *http.Request) { } // 检查队列是否已注册 - if _, exists := queues[queueName]; !exists { + if _, exists := m.queues[queueName]; !exists { http.Error(w, "Queue not found", http.StatusNotFound) return } @@ -351,7 +374,7 @@ func (h *HTTPHandler) handleTasks(w http.ResponseWriter, r *http.Request) { // 获取队列信息以获取任务总数 var total int - queueInfo, queueErr := inspector.GetQueueInfo(queueName) + queueInfo, queueErr := m.inspector.GetQueueInfo(queueName) if queueErr == nil { switch taskState { case "active": @@ -370,22 +393,22 @@ func (h *HTTPHandler) handleTasks(w http.ResponseWriter, r *http.Request) { } // 根据任务状态获取任务列表 - var tasks []*asynq.TaskInfo + var tasks []*inspector.TaskInfo var err error switch taskState { case "active": - tasks, err = inspector.ListActiveTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page-1)) + tasks, err = m.inspector.ListActiveTasks(queueName, pageSize, page-1) case "pending": - tasks, err = inspector.ListPendingTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page-1)) + tasks, err = m.inspector.ListPendingTasks(queueName, pageSize, page-1) case "scheduled": - tasks, err = inspector.ListScheduledTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page-1)) + tasks, err = m.inspector.ListScheduledTasks(queueName, pageSize, page-1) case "retry": - tasks, err = inspector.ListRetryTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page-1)) + tasks, err = m.inspector.ListRetryTasks(queueName, pageSize, page-1) case "archived": - tasks, err = inspector.ListArchivedTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page-1)) + tasks, err = m.inspector.ListArchivedTasks(queueName, pageSize, page-1) case "completed": - tasks, err = inspector.ListCompletedTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page-1)) + tasks, err = m.inspector.ListCompletedTasks(queueName, pageSize, page-1) default: http.Error(w, "Invalid task state. Valid states: active, pending, scheduled, retry, archived, completed", http.StatusBadRequest) return @@ -394,7 +417,7 @@ func (h *HTTPHandler) handleTasks(w http.ResponseWriter, r *http.Request) { // 如果队列在 Redis 中不存在(没有任务),返回空列表而不是错误 if err != nil { if strings.Contains(err.Error(), "queue not found") { - tasks = []*asynq.TaskInfo{} + tasks = []*inspector.TaskInfo{} total = 0 } else { http.Error(w, fmt.Sprintf("Failed to get tasks: %v", err), http.StatusInternalServerError) @@ -402,9 +425,9 @@ func (h *HTTPHandler) handleTasks(w http.ResponseWriter, r *http.Request) { } } - var taskInfos []TaskInfo + var taskInfos []taskInfoJSON for _, task := range tasks { - info := TaskInfo{ + info := taskInfoJSON{ ID: task.ID, Type: task.Type, Payload: string(task.Payload), @@ -440,15 +463,15 @@ func (h *HTTPHandler) handleTasks(w http.ResponseWriter, r *http.Request) { } // handleRetryTask 重试失败任务 -func (h *HTTPHandler) handleRetryTask(w http.ResponseWriter, r *http.Request, queueName, taskID string) { +func (m *Monitor) handleRetryTask(w http.ResponseWriter, r *http.Request, queueName, taskID string) { // 检查队列是否已注册 - if _, exists := queues[queueName]; !exists { + if _, exists := m.queues[queueName]; !exists { http.Error(w, "Queue not found", http.StatusNotFound) return } // 运行重试 - err := inspector.RunTask(queueName, taskID) + err := m.inspector.RunTask(queueName, taskID) if err != nil { http.Error(w, fmt.Sprintf("Failed to retry task: %v", err), http.StatusInternalServerError) return @@ -459,7 +482,7 @@ func (h *HTTPHandler) handleRetryTask(w http.ResponseWriter, r *http.Request, qu } // handleIndex 处理主页请求,返回 SPA 入口页面 -func (h *HTTPHandler) handleIndex(w http.ResponseWriter, r *http.Request) { +func (m *Monitor) handleIndex(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return @@ -473,17 +496,14 @@ func (h *HTTPHandler) handleIndex(w http.ResponseWriter, r *http.Request) { } // 替换模板变量 - content := strings.ReplaceAll(string(indexHTML), "{{.RootPath}}", h.rootPath) + content := strings.ReplaceAll(string(indexHTML), "{{.RootPath}}", m.rootPath) w.Header().Set("Content-Type", "text/html; charset=utf-8") w.Write([]byte(content)) } // handleSSE 处理 Server-Sent Events 实时数据推送 -// 交叉推送两种数据: -// - stats: 统计图表数据(来自 SQLite,每 2 秒) -// - queues: 队列表格数据(来自 Redis,每 5 秒) -func (h *HTTPHandler) handleSSE(w http.ResponseWriter, r *http.Request) { +func (m *Monitor) handleSSE(w http.ResponseWriter, r *http.Request) { // 设置 SSE 响应头 w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") @@ -507,27 +527,26 @@ func (h *HTTPHandler) handleSSE(w http.ResponseWriter, r *http.Request) { ctx := r.Context() // 立即发送一次数据 - h.sendQueuesEvent(w, flusher) - h.sendStatsEvent(w, flusher) + m.sendQueuesEvent(w, flusher) + m.sendStatsEvent(w, flusher) for { select { case <-ctx.Done(): return - case <-h.closeCh: + case <-m.closeCh: return case <-statsTicker.C: - h.sendStatsEvent(w, flusher) + m.sendStatsEvent(w, flusher) case <-queuesTicker.C: - h.sendQueuesEvent(w, flusher) + m.sendQueuesEvent(w, flusher) } } } -// sendStatsEvent 发送统计图表数据(来自 SQLite) -func (h *HTTPHandler) sendStatsEvent(w http.ResponseWriter, flusher http.Flusher) { - // 获取最近的统计数据点(用于图表增量更新) - stats, err := getQueueStatsWithQuery(StatsQuery{Limit: 1}) +// sendStatsEvent 发送统计图表数据 +func (m *Monitor) sendStatsEvent(w http.ResponseWriter, flusher http.Flusher) { + stats, err := m.inspector.QueryStats(inspector.StatsQuery{Limit: 1}) if err != nil || len(stats) == 0 { return } @@ -541,32 +560,32 @@ func (h *HTTPHandler) sendStatsEvent(w http.ResponseWriter, flusher http.Flusher flusher.Flush() } -// sendQueuesEvent 发送队列表格数据(来自 Redis) -func (h *HTTPHandler) sendQueuesEvent(w http.ResponseWriter, flusher http.Flusher) { - var queueInfos []QueueInfo - for queueName, priority := range queues { - stats, err := inspector.GetQueueInfo(queueName) +// sendQueuesEvent 发送队列表格数据 +func (m *Monitor) sendQueuesEvent(w http.ResponseWriter, flusher http.Flusher) { + var queueInfos []queueInfoJSON + for queueName, priority := range m.queues { + info, err := m.inspector.GetQueueInfo(queueName) if err != nil { - queueInfos = append(queueInfos, QueueInfo{ + queueInfos = append(queueInfos, queueInfoJSON{ Name: queueName, Priority: priority, }) } else { - queueInfos = append(queueInfos, QueueInfo{ + queueInfos = append(queueInfos, queueInfoJSON{ Name: queueName, Priority: priority, - Size: stats.Size, - Active: stats.Active, - Pending: stats.Pending, - Scheduled: stats.Scheduled, - Retry: stats.Retry, - Archived: stats.Archived, - Completed: stats.Completed, - Processed: stats.Processed, - Failed: stats.Failed, - Paused: stats.Paused, - MemoryUsage: stats.MemoryUsage, - Latency: stats.Latency.Milliseconds(), + Size: info.Size, + Active: info.Active, + Pending: info.Pending, + Scheduled: info.Scheduled, + Retry: info.Retry, + Archived: info.Archived, + Completed: info.Completed, + Processed: info.Processed, + Failed: info.Failed, + Paused: info.Paused, + MemoryUsage: info.MemoryUsage, + Latency: info.LatencyMS, }) } } diff --git a/ui/app.js b/x/monitor/ui/app.js similarity index 79% rename from ui/app.js rename to x/monitor/ui/app.js index 7ba7995..5f36d99 100644 --- a/ui/app.js +++ b/x/monitor/ui/app.js @@ -18,7 +18,9 @@ class TaskqApp extends LitElement { endTime: { type: Number, state: true }, isLiveMode: { type: Boolean, state: true }, // Chart data - chartData: { type: Object, state: true } + chartData: { type: Object, state: true }, + // View mode: 'chart' or 'table' + viewMode: { type: String, state: true } }; constructor() { @@ -35,6 +37,7 @@ class TaskqApp extends LitElement { this.isLiveMode = true; this.chartData = { labels: [], timestamps: [], datasets: {} }; this.eventSource = null; + this.viewMode = 'chart'; } createRenderRoot() { @@ -60,8 +63,17 @@ class TaskqApp extends LitElement { initRoute() { const path = window.location.pathname; const relativePath = path.replace(this.rootPath, '').replace(/^\/+/, ''); + + // 检查是否是 queues 视图 + if (relativePath === 'queues' || relativePath === 'queues/') { + this.viewMode = 'table'; + return; + } + + // 检查是否是具体队列详情 const match = relativePath.match(/^queues\/([^\/]+)\/([^\/]+)/); if (match) { + this.viewMode = 'table'; this.currentQueue = decodeURIComponent(match[1]); this.currentTab = match[2]; const params = new URLSearchParams(window.location.search); @@ -72,10 +84,14 @@ class TaskqApp extends LitElement { handlePopState(event) { if (event.state && event.state.queue) { + this.viewMode = 'table'; this.currentQueue = event.state.queue; this.currentTab = event.state.tab; this.currentPage = event.state.page; this.modalOpen = true; + } else if (event.state && event.state.view) { + this.viewMode = event.state.view; + this.modalOpen = false; } else { this.modalOpen = false; } @@ -339,7 +355,7 @@ class TaskqApp extends LitElement { handleModalClose() { this.modalOpen = false; - history.pushState({}, '', `${this.rootPath}/`); + history.pushState({ view: 'table' }, '', `${this.rootPath}/queues`); } handleTabChange(e) { @@ -357,41 +373,75 @@ class TaskqApp extends LitElement { history.pushState({ queue: this.currentQueue, tab: this.currentTab, page }, '', url); } + handleViewChange(mode) { + this.viewMode = mode; + const url = mode === 'table' ? `${this.rootPath}/queues` : `${this.rootPath}/`; + history.pushState({ view: mode }, '', url); + } + + async handleQueueUpdated() { + try { + const response = await fetch(`${this.rootPath}/api/queues`); + if (response.ok) { + this.queues = await response.json(); + } + } catch (err) { + console.error('Failed to refresh queues:', err); + } + } + render() { return html` -
-
- Tasks Overview - -
-
- +
+
TaskQ Monitor
+
+ +
-
- ${this.loading ? html` -
-
-
Loading...
+ ${this.viewMode === 'chart' ? html` +
+
+ Tasks Overview +
- ` : html` - - `} -
+
+ +
+
+ ` : html` +
+ ${this.loading ? html` +
+
+
Loading...
+
+ ` : html` + + `} +
+ `}