feat: 优化监控仪表盘 UI

- 添加 appbar 导航栏,支持 Chart/Queues 视图切换
- appbar 切换使用 history API,支持浏览器前进/后退
- 图表视图占满整个可视区域
- queue-modal 共享 appbar 样式
- 修复 queue tab count 字段名大小写问题
- tooltip 跟随鼠标显示在右下方,移除箭头
- 图表 canvas 鼠标样式改为准星
- pause/resume 队列后刷新列表
- example 添加 flag 配置参数
This commit is contained in:
2025-12-10 00:53:30 +08:00
parent 1f9f1cab53
commit 326f2a371c
19 changed files with 1626 additions and 909 deletions

View File

@@ -2,6 +2,7 @@ package main
import (
"context"
"flag"
"fmt"
"log"
"net/http"
@@ -11,7 +12,12 @@ import (
"time"
"code.tczkiot.com/wlw/taskq"
"code.tczkiot.com/wlw/taskq/x/inspector"
"code.tczkiot.com/wlw/taskq/x/metrics"
"code.tczkiot.com/wlw/taskq/x/monitor"
_ "github.com/mattn/go-sqlite3"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/redis/go-redis/v9"
)
@@ -28,83 +34,99 @@ type ImageResizeTask struct {
// 定义任务处理器
func handleEmailTask(ctx context.Context, t EmailTask) error {
log.Printf("处理邮件任务: 用户ID=%d, 模板ID=%s", t.UserID, t.TemplateID)
// 模拟邮件发送逻辑
return nil
}
func handleImageResizeTask(ctx context.Context, t ImageResizeTask) error {
log.Printf("处理图片调整任务: 源URL=%s", t.SourceURL)
// 模拟图片调整逻辑
return nil
}
var (
redisAddr = flag.String("redis", "127.0.0.1:6379", "Redis 地址")
redisDB = flag.Int("redis-db", 1, "Redis 数据库")
httpAddr = flag.String("http", ":8081", "HTTP 服务地址")
dbPath = flag.String("db", "./taskq_stats.db", "SQLite 数据库路径")
)
func main() {
flag.Parse()
// 创建 Redis 客户端
rdb := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
DB: 1,
Addr: *redisAddr,
DB: *redisDB,
})
defer rdb.Close()
// 初始化 taskq
taskq.SetRedis(rdb)
taskq.Init()
// 创建邮件任务
emailTask := &taskq.Task[EmailTask]{
emailTask := &taskq.Task{
Queue: "email",
Name: "email:deliver",
MaxRetries: 3,
Priority: 5,
TTR: 0,
Handler: handleEmailTask,
}
// 创建图片调整任务
imageTask := &taskq.Task[ImageResizeTask]{
imageTask := &taskq.Task{
Queue: "image",
Name: "image:resize",
MaxRetries: 3,
Priority: 3,
TTR: 0,
Handler: handleImageResizeTask,
}
// 注册任务
if err := taskq.Register(emailTask); err != nil {
log.Fatal("注册邮件任务失败:", err)
}
if err := taskq.Register(imageTask); err != nil {
log.Fatal("注册图片任务失败:", err)
// 创建 Inspector 插件(用于监控仪表盘)
ins := inspector.New(inspector.Options{
Interval: 2 * time.Second,
DBPath: *dbPath,
})
// 创建 Metrics 插件(用于 Prometheus
met := metrics.New(metrics.Options{
Namespace: "taskq",
Interval: 15 * time.Second,
})
// 配置 taskq
if err := taskq.Configure(taskq.Config{
Redis: rdb,
Tasks: []*taskq.Task{emailTask, imageTask},
Plugins: []taskq.Plugin{ins, met},
}); err != nil {
log.Fatal("配置 taskq 失败:", err)
}
// 创建监控 HTTP 处理器
handler, err := taskq.NewHTTPHandler(taskq.HTTPHandlerOptions{
RootPath: "/monitor",
ReadOnly: false,
// 创建监控服务
servlet := taskq.Default()
mon, err := monitor.New(monitor.Options{
Inspector: ins,
Queues: servlet.Queues(),
RootPath: "/monitor",
ReadOnly: false,
})
if err != nil {
log.Fatal("创建监控处理器失败:", err)
log.Fatal("创建监控服务失败:", err)
}
// 创建可取消的 context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 启动 taskq 服务器(包含统计采集器
go func() {
err := taskq.Start(ctx, taskq.StartOptions{
StatsInterval: 2 * time.Second,
StatsDBPath: "./taskq_stats.db",
})
if err != nil {
log.Fatal("启动 taskq 服务器失败:", err)
}
}()
// 初始化 taskq(初始化所有插件
if err := taskq.Init(ctx); err != nil {
log.Fatal("初始化 taskq 失败:", err)
}
// 启动 taskq 服务器(启动所有插件)
if err := taskq.Start(ctx); err != nil {
log.Fatal("启动 taskq 服务器失败:", err)
}
// 定时发布任务
go func() {
ticker := time.NewTicker(5 * time.Second) // 每5秒发布一次任务
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
taskCounter := 0
@@ -162,15 +184,22 @@ func main() {
}
}()
// 创建 HTTP 路由
mux := http.NewServeMux()
mux.Handle("/monitor/", mon)
mux.Handle("/metrics", promhttp.Handler())
// 创建 HTTP 服务器
server := &http.Server{
Addr: ":8081",
Handler: handler,
Addr: *httpAddr,
Handler: mux,
}
// 启动 HTTP 服务器(非阻塞)
// 启动 HTTP 服务器
go func() {
log.Printf("启动监控服务器在 http://localhost:8081")
log.Printf("启动服务器在 http://localhost%s", *httpAddr)
log.Printf(" - 监控仪表盘: http://localhost%s/monitor", *httpAddr)
log.Printf(" - Prometheus: http://localhost%s/metrics", *httpAddr)
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatal("HTTP 服务器错误:", err)
}
@@ -186,10 +215,10 @@ func main() {
// 1. 取消 context停止任务发布
cancel()
// 2. 关闭监控 HTTP 处理器(会断开 SSE 连接)
handler.Close()
// 2. 关闭监控服务(断开 SSE 连接)
mon.Close()
// 3. 关闭 HTTP 服务器(设置 5 秒超时)
// 3. 关闭 HTTP 服务器
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer shutdownCancel()
@@ -197,9 +226,8 @@ func main() {
log.Printf("HTTP 服务器关闭错误: %v", err)
}
// 4. 停止 taskq 服务器(会等待完全关闭
// 4. 停止 taskq 服务器(会自动调用插件的 OnStop
taskq.Stop()
log.Println("服务已安全关闭")
// rdb.Close() 由 defer 执行
}

16
go.mod
View File

@@ -4,18 +4,28 @@ go 1.25.4
require (
github.com/hibiken/asynq v0.25.1
github.com/prometheus/client_golang v1.23.2
github.com/redis/go-redis/v9 v9.7.0
github.com/rs/xid v1.6.0
)
require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.66.1 // indirect
github.com/prometheus/procfs v0.16.1 // indirect
go.yaml.in/yaml/v2 v2.4.2 // indirect
)
require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/mattn/go-sqlite3 v1.14.32
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/spf13/cast v1.7.0 // indirect
golang.org/x/sys v0.27.0 // indirect
golang.org/x/sys v0.35.0 // indirect
golang.org/x/time v0.8.0 // indirect
google.golang.org/protobuf v1.35.2 // indirect
google.golang.org/protobuf v1.36.8 // indirect
)

49
go.sum
View File

@@ -1,40 +1,69 @@
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hibiken/asynq v0.25.1 h1:phj028N0nm15n8O2ims+IvJ2gz4k2auvermngh9JhTw=
github.com/hibiken/asynq v0.25.1/go.mod h1:pazWNOLBu0FEynQRBvHA26qdIKRSmfdIfUm4HdsLmXg=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/mattn/go-sqlite3 v1.14.32 h1:JD12Ag3oLy1zQA+BNn74xRgaBbdhbNIDYvQUEuuErjs=
github.com/mattn/go-sqlite3 v1.14.32/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o=
github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg=
github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk=
github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE=
github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9ZoGs=
github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA=
github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg=
github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is=
github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E=
github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/rs/xid v1.6.0 h1:fV591PaemRlL6JfRxGDEPl69wICngIQ3shQtzfy2gxU=
github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0=
github.com/spf13/cast v1.7.0 h1:ntdiHjuueXFgm5nzDRdOS4yfT43P5Fnud6DH50rz/7w=
github.com/spf13/cast v1.7.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s=
golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI=
go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU=
golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg=
golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io=
google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc=
google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -1,381 +0,0 @@
// Package taskq 提供基于 Redis 的异步任务队列功能
// inspect.go 文件包含统计采集器和相关数据结构
package taskq
import (
"database/sql"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/hibiken/asynq"
_ "github.com/mattn/go-sqlite3"
)
// ==================== Inspector 统计采集器 ====================
// Inspector 统计采集器,独立于 HTTP 服务运行
type Inspector struct {
inspector *asynq.Inspector
db *sql.DB
closeCh chan struct{}
closeOnce sync.Once
interval time.Duration
}
// InspectorOptions 配置统计采集器的选项
type InspectorOptions struct {
// Interval 采集间隔,默认 2 秒
Interval time.Duration
// DBPath SQLite 数据库文件路径,默认为 "./taskq_stats.db"
DBPath string
}
// NewInspector 创建新的统计采集器
func NewInspector(opts InspectorOptions) (*Inspector, error) {
if redisClient == nil {
return nil, fmt.Errorf("taskq: redis client not initialized, call SetRedis() first")
}
if opts.Interval <= 0 {
opts.Interval = 2 * time.Second
}
if opts.DBPath == "" {
opts.DBPath = "./taskq_stats.db"
}
// 确保目录存在
dir := filepath.Dir(opts.DBPath)
if dir != "" && dir != "." {
if err := os.MkdirAll(dir, 0755); err != nil {
return nil, fmt.Errorf("taskq: failed to create directory: %v", err)
}
}
// 打开 SQLite 数据库
db, err := sql.Open("sqlite3", opts.DBPath)
if err != nil {
return nil, fmt.Errorf("taskq: failed to open database: %v", err)
}
// 初始化数据库表
if err := initStatsDB(db); err != nil {
db.Close()
return nil, fmt.Errorf("taskq: failed to init database: %v", err)
}
ins := &Inspector{
inspector: asynq.NewInspectorFromRedisClient(redisClient),
db: db,
closeCh: make(chan struct{}),
interval: opts.Interval,
}
// 启动后台统计采集
go ins.startCollector()
return ins, nil
}
// initStatsDB 初始化数据库Prometheus 风格:单表 + 标签)
// 设计思路:
// - 单表存储所有队列的统计数据,通过 queue 列区分
// - 复合索引支持按时间和队列两个维度高效查询
// - 类似 Prometheus 的 (timestamp, labels, value) 模型
func initStatsDB(db *sql.DB) error {
_, err := db.Exec(`
CREATE TABLE IF NOT EXISTS metrics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp INTEGER NOT NULL,
queue TEXT NOT NULL,
active INTEGER DEFAULT 0,
pending INTEGER DEFAULT 0,
scheduled INTEGER DEFAULT 0,
retry INTEGER DEFAULT 0,
archived INTEGER DEFAULT 0,
completed INTEGER DEFAULT 0,
succeeded INTEGER DEFAULT 0,
failed INTEGER DEFAULT 0
);
-- 按队列查询WHERE queue = ? ORDER BY timestamp
CREATE INDEX IF NOT EXISTS idx_metrics_queue_time ON metrics(queue, timestamp DESC);
-- 按时间查询所有队列WHERE timestamp BETWEEN ? AND ?
CREATE INDEX IF NOT EXISTS idx_metrics_time ON metrics(timestamp DESC);
-- 唯一约束:同一时间同一队列只有一条记录
CREATE UNIQUE INDEX IF NOT EXISTS idx_metrics_unique ON metrics(timestamp, queue);
`)
return err
}
// Close 关闭统计采集器
func (ins *Inspector) Close() error {
ins.closeOnce.Do(func() {
close(ins.closeCh)
})
if ins.db != nil {
ins.db.Close()
}
return ins.inspector.Close()
}
// startCollector 启动后台统计采集任务
func (ins *Inspector) startCollector() {
ticker := time.NewTicker(ins.interval)
defer ticker.Stop()
for {
select {
case <-ins.closeCh:
return
case <-ticker.C:
ins.collectStats()
}
}
}
// collectStats 采集所有队列的统计数据
func (ins *Inspector) collectStats() {
now := time.Now().Unix()
for queueName := range queues {
stats, err := ins.inspector.GetQueueInfo(queueName)
if err != nil {
continue
}
qs := QueueStats{
Queue: queueName,
Timestamp: now,
Active: stats.Active,
Pending: stats.Pending,
Scheduled: stats.Scheduled,
Retry: stats.Retry,
Archived: stats.Archived,
Completed: stats.Completed,
Succeeded: stats.Processed - stats.Failed,
Failed: stats.Failed,
}
ins.saveMetrics(qs)
}
}
// saveMetrics 保存统计数据到 metrics 表
func (ins *Inspector) saveMetrics(stats QueueStats) error {
if ins.db == nil {
return nil
}
_, err := ins.db.Exec(`
INSERT OR REPLACE INTO metrics (timestamp, queue, active, pending, scheduled, retry, archived, completed, succeeded, failed)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`, stats.Timestamp, stats.Queue, stats.Active, stats.Pending, stats.Scheduled, stats.Retry, stats.Archived, stats.Completed, stats.Succeeded, stats.Failed)
return err
}
// GetQueueInfo 获取队列信息
func (ins *Inspector) GetQueueInfo(queueName string) (*asynq.QueueInfo, error) {
return ins.inspector.GetQueueInfo(queueName)
}
// ListActiveTasks 获取活跃任务列表
func (ins *Inspector) ListActiveTasks(queueName string, opts ...asynq.ListOption) ([]*asynq.TaskInfo, error) {
return ins.inspector.ListActiveTasks(queueName, opts...)
}
// ListPendingTasks 获取等待任务列表
func (ins *Inspector) ListPendingTasks(queueName string, opts ...asynq.ListOption) ([]*asynq.TaskInfo, error) {
return ins.inspector.ListPendingTasks(queueName, opts...)
}
// ListScheduledTasks 获取计划任务列表
func (ins *Inspector) ListScheduledTasks(queueName string, opts ...asynq.ListOption) ([]*asynq.TaskInfo, error) {
return ins.inspector.ListScheduledTasks(queueName, opts...)
}
// ListRetryTasks 获取重试任务列表
func (ins *Inspector) ListRetryTasks(queueName string, opts ...asynq.ListOption) ([]*asynq.TaskInfo, error) {
return ins.inspector.ListRetryTasks(queueName, opts...)
}
// ListArchivedTasks 获取归档任务列表
func (ins *Inspector) ListArchivedTasks(queueName string, opts ...asynq.ListOption) ([]*asynq.TaskInfo, error) {
return ins.inspector.ListArchivedTasks(queueName, opts...)
}
// ListCompletedTasks 获取已完成任务列表
func (ins *Inspector) ListCompletedTasks(queueName string, opts ...asynq.ListOption) ([]*asynq.TaskInfo, error) {
return ins.inspector.ListCompletedTasks(queueName, opts...)
}
// RunTask 立即运行归档任务(重试失败任务)
func (ins *Inspector) RunTask(queueName, taskID string) error {
return ins.inspector.RunTask(queueName, taskID)
}
// PauseQueue 暂停队列
func (ins *Inspector) PauseQueue(queueName string) error {
return ins.inspector.PauseQueue(queueName)
}
// UnpauseQueue 恢复队列
func (ins *Inspector) UnpauseQueue(queueName string) error {
return ins.inspector.UnpauseQueue(queueName)
}
// ==================== 统计数据结构 ====================
// QueueInfo 获取每个队列的详细信息
type QueueInfo struct {
Name string `json:"name"`
Priority int `json:"priority"`
Size int `json:"size"` // 队列中任务总数
Active int `json:"active"` // 活跃任务数
Pending int `json:"pending"` // 等待任务数
Scheduled int `json:"scheduled"` // 计划任务数
Retry int `json:"retry"` // 重试任务数
Archived int `json:"archived"` // 归档任务数
Completed int `json:"completed"` // 已完成任务数
Processed int `json:"processed"` // 累计处理数(今日)
Failed int `json:"failed"` // 累计失败数(今日)
Paused bool `json:"paused"` // 是否暂停
MemoryUsage int64 `json:"memory_usage"` // 内存使用(字节)
Latency int64 `json:"latency"` // 延迟(毫秒)
}
// QueueStats 队列统计数据点(用于存储历史数据)
type QueueStats struct {
Timestamp int64 `json:"t"` // Unix 时间戳(秒)
Queue string `json:"q,omitempty"` // 队列名称(汇总查询时为空)
Active int `json:"a"` // 活跃任务数
Pending int `json:"p"` // 等待任务数
Scheduled int `json:"s"` // 计划任务数
Retry int `json:"r"` // 重试任务数
Archived int `json:"ar"` // 归档任务数
Completed int `json:"c"` // 已完成任务数
Succeeded int `json:"su"` // 成功数
Failed int `json:"f"` // 失败数
}
// ==================== 全局统计数据查询 ====================
var statsDB *sql.DB
var statsDBMu sync.RWMutex
// SetStatsDB 设置全局统计数据库(供 HTTPHandler 使用)
func SetStatsDB(db *sql.DB) {
statsDBMu.Lock()
defer statsDBMu.Unlock()
statsDB = db
}
// StatsQuery 统计查询参数
type StatsQuery struct {
Queue string // 队列名称,为空则查询所有队列汇总
Start int64 // 开始时间戳0 表示不限制
End int64 // 结束时间戳0 表示不限制
Limit int // 返回数量限制,默认 500
}
// getQueueStats 获取队列历史统计数据
func getQueueStats(queueName string, limit int) ([]QueueStats, error) {
return getQueueStatsWithQuery(StatsQuery{
Queue: queueName,
Limit: limit,
})
}
// getQueueStatsWithQuery 根据查询条件获取统计数据Prometheus 风格单表查询)
// - 按队列查询:使用 idx_metrics_queue_time 索引
// - 按时间汇总:使用 idx_metrics_time 索引 + GROUP BY
func getQueueStatsWithQuery(q StatsQuery) ([]QueueStats, error) {
statsDBMu.RLock()
db := statsDB
statsDBMu.RUnlock()
if db == nil {
return nil, nil
}
if q.Limit <= 0 {
q.Limit = 500
}
var args []any
var whereClause string
var conditions []string
// 构建 WHERE 条件
if q.Queue != "" {
conditions = append(conditions, "queue = ?")
args = append(args, q.Queue)
}
if q.Start > 0 {
conditions = append(conditions, "timestamp >= ?")
args = append(args, q.Start)
}
if q.End > 0 {
conditions = append(conditions, "timestamp <= ?")
args = append(args, q.End)
}
if len(conditions) > 0 {
whereClause = "WHERE " + strings.Join(conditions, " AND ")
}
var query string
if q.Queue != "" {
// 查询单个队列
query = fmt.Sprintf(`
SELECT timestamp, queue, active, pending, scheduled, retry, archived, completed, succeeded, failed
FROM metrics
%s
ORDER BY timestamp DESC
LIMIT ?
`, whereClause)
} else {
// 查询所有队列汇总(按时间 GROUP BY
query = fmt.Sprintf(`
SELECT timestamp, '' as queue, SUM(active), SUM(pending), SUM(scheduled), SUM(retry), SUM(archived), SUM(completed), SUM(succeeded), SUM(failed)
FROM metrics
%s
GROUP BY timestamp
ORDER BY timestamp DESC
LIMIT ?
`, whereClause)
}
args = append(args, q.Limit)
rows, err := db.Query(query, args...)
if err != nil {
return nil, err
}
defer rows.Close()
var statsList []QueueStats
for rows.Next() {
var s QueueStats
if err := rows.Scan(&s.Timestamp, &s.Queue, &s.Active, &s.Pending, &s.Scheduled, &s.Retry, &s.Archived, &s.Completed, &s.Succeeded, &s.Failed); err != nil {
continue
}
statsList = append(statsList, s)
}
// 反转顺序,使时间从早到晚
for i, j := 0, len(statsList)-1; i < j; i, j = i+1, j-1 {
statsList[i], statsList[j] = statsList[j], statsList[i]
}
return statsList, nil
}
// GetStatsDB 返回 Inspector 的数据库连接(供外部设置给 HTTPHandler
func (ins *Inspector) GetStatsDB() *sql.DB {
return ins.db
}

41
plugin.go Normal file
View File

@@ -0,0 +1,41 @@
package taskq
import (
"context"
"github.com/redis/go-redis/v9"
)
// Context 插件上下文,提供对 Servlet 资源的访问
type Context struct {
context.Context
servlet *Servlet
}
// Redis 返回 Redis 客户端
func (ctx *Context) Redis() redis.UniversalClient {
return ctx.servlet.redisClient
}
// Queues 返回队列优先级配置
func (ctx *Context) Queues() map[string]int {
return ctx.servlet.Queues()
}
// Plugin 定义插件接口,用于扩展 Servlet 的生命周期
type Plugin interface {
// Name 返回插件名称,用于日志和调试
Name() string
// Init 初始化插件,在 Servlet.Start 之前调用
// ctx 提供对 Servlet 资源的访问
Init(ctx *Context) error
// Start 启动插件,在 Servlet.Start 时调用
// ctx 提供对 Servlet 资源的访问,内嵌的 context.Context 会在 Stop 时取消
Start(ctx *Context) error
// Stop 停止插件,在 Servlet.Stop 时调用
// 按注册的逆序调用
Stop() error
}

294
servlet.go Normal file
View File

@@ -0,0 +1,294 @@
// Package taskq 提供基于 Redis 的异步任务队列功能
// servlet.go 文件包含 Servlet 结构体,封装了任务队列的完整生命周期管理
package taskq
import (
"context"
"errors"
"log"
"maps"
"reflect"
"sync"
"github.com/hibiken/asynq"
"github.com/redis/go-redis/v9"
)
// Config 配置 Servlet 的选项
type Config struct {
Redis redis.UniversalClient
Tasks []*Task
Plugins []Plugin
}
// Servlet 封装了 taskq 的完整生命周期管理
//
// 生命周期:
// 1. 配置阶段:调用 Configure 配置 Servlet
// 2. 初始化阶段:调用 Init 初始化插件
// 3. 运行阶段:调用 Start 启动服务器
// 4. 停止阶段:调用 Stop 优雅关闭
type Servlet struct {
mu sync.RWMutex
handlers map[string]asynq.Handler
queues map[string]int
client *asynq.Client
redisClient redis.UniversalClient
plugins []Plugin
exit chan chan struct{}
}
// NewServlet 创建一个新的 Servlet 实例
func NewServlet() *Servlet {
return &Servlet{
handlers: make(map[string]asynq.Handler),
queues: make(map[string]int),
exit: make(chan chan struct{}),
}
}
// Configure 配置 Servlet
func (s *Servlet) Configure(cfg Config) error {
s.mu.Lock()
defer s.mu.Unlock()
if cfg.Redis == nil {
return errors.New("taskq: redis client is required")
}
s.redisClient = cfg.Redis
s.client = asynq.NewClientFromRedisClient(cfg.Redis)
// 注册任务
for _, t := range cfg.Tasks {
if err := s.registerTask(t); err != nil {
return err
}
}
s.plugins = cfg.Plugins
return nil
}
// registerTask 注册单个任务(内部方法,调用时已持有锁)
func (s *Servlet) registerTask(t *Task) error {
if t.Queue == "" {
return errors.New("taskq: queue name cannot be empty")
}
if t.Priority < 0 || t.Priority > 255 {
return errors.New("taskq: priority must be between 0 and 255")
}
if t.MaxRetries < 0 {
return errors.New("taskq: retry count must be non-negative")
}
if t.Handler == nil {
return errors.New("taskq: handler cannot be nil")
}
rv := reflect.ValueOf(t.Handler)
if rv.Kind() != reflect.Func {
return errors.New("taskq: handler must be a function")
}
rt := rv.Type()
// 验证返回值:只能是 error 或无返回值
var returnError bool
for i := range rt.NumOut() {
if i == 0 && rt.Out(0).Implements(errorType) {
returnError = true
} else {
return errors.New("taskq: handler function must return either error or nothing")
}
}
// 验证参数:支持以下签名
// - func(context.Context, T) error
// - func(context.Context) error
// - func(T) error
// - func()
var inContext bool
var inData bool
var dataType reflect.Type
numIn := rt.NumIn()
if numIn > 2 {
return errors.New("taskq: handler function can have at most 2 parameters")
}
for i := range numIn {
fi := rt.In(i)
if fi.Implements(contextType) {
if i != 0 {
return errors.New("taskq: context.Context must be the first parameter")
}
inContext = true
} else if fi.Kind() == reflect.Struct {
if inData {
return errors.New("taskq: handler function can only have one data parameter")
}
inData = true
dataType = fi
} else {
return errors.New("taskq: handler parameter must be context.Context or a struct")
}
}
// 设置任务的反射信息
t.funcValue = rv
t.dataType = dataType
t.inputContext = inContext
t.inputData = inData
t.returnError = returnError
t.servlet = s
s.handlers[t.Name] = t
s.queues[t.Queue] = t.Priority
return nil
}
// Init 初始化所有插件
func (s *Servlet) Init(ctx context.Context) error {
return s.initPlugins(ctx)
}
// initPlugins 初始化所有插件
func (s *Servlet) initPlugins(ctx context.Context) error {
s.mu.RLock()
plugins := s.plugins
s.mu.RUnlock()
pctx := &Context{
Context: ctx,
servlet: s,
}
for _, p := range plugins {
if err := p.Init(pctx); err != nil {
return err
}
}
return nil
}
// Start 启动 taskq 服务器
func (s *Servlet) Start(ctx context.Context) error {
s.mu.Lock()
rdb := s.redisClient
qs := maps.Clone(s.queues)
s.mu.Unlock()
localCtx, cancel := context.WithCancel(ctx)
srv := asynq.NewServerFromRedisClient(rdb, asynq.Config{
Concurrency: 30,
Queues: qs,
BaseContext: func() context.Context { return localCtx },
LogLevel: asynq.WarnLevel,
})
// 启动插件
if err := s.startPlugins(localCtx); err != nil {
cancel()
return err
}
go s.runServer(localCtx, srv)
go s.runMonitor(localCtx, srv, cancel)
return nil
}
// startPlugins 启动所有插件
func (s *Servlet) startPlugins(ctx context.Context) error {
s.mu.RLock()
plugins := s.plugins
s.mu.RUnlock()
pctx := &Context{
Context: ctx,
servlet: s,
}
for _, p := range plugins {
if err := p.Start(pctx); err != nil {
return err
}
}
return nil
}
// runServer 运行任务处理服务器
func (s *Servlet) runServer(_ context.Context, srv *asynq.Server) {
mux := asynq.NewServeMux()
s.mu.RLock()
for name, handler := range s.handlers {
mux.Handle(name, handler)
}
s.mu.RUnlock()
if err := srv.Run(mux); err != nil {
log.Printf("taskq: server error: %v", err)
}
}
func (s *Servlet) runMonitor(ctx context.Context, srv *asynq.Server, cancel context.CancelFunc) {
var exit chan struct{}
select {
case <-ctx.Done():
case exit = <-s.exit:
}
srv.Shutdown()
s.stopPlugins()
cancel()
if exit != nil {
exit <- struct{}{}
}
}
// stopPlugins 停止所有插件(按注册的逆序)
func (s *Servlet) stopPlugins() {
s.mu.RLock()
plugins := s.plugins
s.mu.RUnlock()
for i := len(plugins) - 1; i >= 0; i-- {
if err := plugins[i].Stop(); err != nil {
log.Printf("taskq: plugin %s stop error: %v", plugins[i].Name(), err)
}
}
}
// Stop 优雅停止 taskq 服务器
// 发送停止信号并等待服务器完全关闭
// 可安全地多次调用
func (s *Servlet) Stop() {
exit := make(chan struct{})
s.exit <- exit
<-exit
}
// Client 返回 asynq 客户端
func (s *Servlet) Client() *asynq.Client {
s.mu.RLock()
defer s.mu.RUnlock()
return s.client
}
// RedisClient 返回 Redis 客户端
func (s *Servlet) RedisClient() redis.UniversalClient {
s.mu.RLock()
defer s.mu.RUnlock()
return s.redisClient
}
// Queues 返回队列优先级配置的副本
func (s *Servlet) Queues() map[string]int {
s.mu.RLock()
defer s.mu.RUnlock()
return maps.Clone(s.queues)
}

91
task.go
View File

@@ -13,9 +13,8 @@ import (
"github.com/rs/xid"
)
// Task 定义泛型任务结构
// T 表示任务数据的类型,必须是结构体
type Task[T any] struct {
// Task 定义任务结构
type Task struct {
// 公开字段:用户配置
Queue string // 任务队列名称
Group string // 任务分组
@@ -31,49 +30,17 @@ type Task[T any] struct {
inputContext bool // 是否需要 context.Context 参数
inputData bool // 是否需要数据参数
returnError bool // 是否返回 error
}
// PublishOption 任务发布选项函数类型
// 用于配置任务发布时的各种选项
type PublishOption func() asynq.Option
// Delay 设置任务延迟执行时间
// 参数 d 表示延迟多长时间后执行
func Delay(d time.Duration) PublishOption {
return func() asynq.Option {
return asynq.ProcessIn(d)
}
}
// DelayUntil 设置任务在指定时间执行
// 参数 t 表示任务执行的具体时间点
func DelayUntil(t time.Time) PublishOption {
return func() asynq.Option {
return asynq.ProcessAt(t)
}
}
// TTR 设置任务超时时间
// 覆盖任务默认的超时时间配置
func TTR(d time.Duration) PublishOption {
return func() asynq.Option {
return asynq.Timeout(d)
}
}
// Retention 设置任务结果保留时间
// 任务执行完成后,结果在 Redis 中保留的时间
func Retention(d time.Duration) PublishOption {
return func() asynq.Option {
return asynq.Retention(d)
}
servlet *Servlet // 所属的 Servlet 实例
}
// Publish 发布任务到队列
// 将任务数据序列化后发送到 Redis 队列中等待处理
func (t *Task[T]) Publish(ctx context.Context, data T, options ...PublishOption) error {
// 获取 asynq 客户端
c := client.Load()
func (t *Task) Publish(ctx context.Context, data any, options ...PublishOption) error {
var c *asynq.Client
if t.servlet != nil {
c = t.servlet.Client()
} else if defaultServlet != nil {
c = defaultServlet.Client()
}
if c == nil {
return errors.New("taskq: client not initialized, call SetRedis() first")
}
@@ -124,7 +91,7 @@ func (t *Task[T]) Publish(ctx context.Context, data T, options ...PublishOption)
// ProcessTask 处理任务的核心方法
// 由 asynq 服务器调用,根据任务配置动态调用处理器函数
func (t *Task[T]) ProcessTask(ctx context.Context, tsk *asynq.Task) error {
func (t *Task) ProcessTask(ctx context.Context, tsk *asynq.Task) error {
var in []reflect.Value
// 根据配置添加 context.Context 参数
@@ -158,3 +125,39 @@ func (t *Task[T]) ProcessTask(ctx context.Context, tsk *asynq.Task) error {
}
return nil
}
// PublishOption 任务发布选项函数类型
// 用于配置任务发布时的各种选项
type PublishOption func() asynq.Option
// Delay 设置任务延迟执行时间
// 参数 d 表示延迟多长时间后执行
func Delay(d time.Duration) PublishOption {
return func() asynq.Option {
return asynq.ProcessIn(d)
}
}
// DelayUntil 设置任务在指定时间执行
// 参数 t 表示任务执行的具体时间点
func DelayUntil(t time.Time) PublishOption {
return func() asynq.Option {
return asynq.ProcessAt(t)
}
}
// TTR 设置任务超时时间
// 覆盖任务默认的超时时间配置
func TTR(d time.Duration) PublishOption {
return func() asynq.Option {
return asynq.Timeout(d)
}
}
// Retention 设置任务结果保留时间
// 任务执行完成后,结果在 Redis 中保留的时间
func Retention(d time.Duration) PublishOption {
return func() asynq.Option {
return asynq.Retention(d)
}
}

266
taskq.go
View File

@@ -1,265 +1,51 @@
// Package taskq 提供基于 Redis 的异步任务队列功能
// 使用 asynq 库作为底层实现,支持任务注册、发布、消费和重试机制
//
// 本包提供两种使用方式:
// 1. 包级别函数:使用全局默认 Servlet适合单实例场景
// 2. Servlet 实例方法:支持多实例复用,适合需要隔离的场景
package taskq
import (
"context"
"errors"
"log"
"maps"
"reflect"
"sync/atomic"
"time"
"github.com/hibiken/asynq"
"github.com/redis/go-redis/v9"
)
// 全局状态变量
// 全局默认 Servlet 实例
var defaultServlet = NewServlet()
// 类型反射常量
var (
started atomic.Bool // 服务器启动状态
exit chan chan struct{} // 优雅退出信号通道
done chan struct{} // 关闭完成信号通道
handlers map[string]asynq.Handler // 任务处理器映射表
queues map[string]int // 队列优先级配置
client atomic.Pointer[asynq.Client] // asynq 客户端实例
redisClient redis.UniversalClient // Redis 客户端实例
inspector *Inspector // 统计采集器实例
errorType = reflect.TypeOf((*error)(nil)).Elem() // error 类型反射
contextType = reflect.TypeOf((*context.Context)(nil)).Elem() // context.Context 类型反射
)
// Init 初始化 taskq 系统
// 创建必要的全局变量和映射表,必须在调用其他函数之前调用
func Init() {
exit = make(chan chan struct{}) // 创建优雅退出通道
done = make(chan struct{}) // 创建关闭完成通道
handlers = make(map[string]asynq.Handler) // 创建任务处理器映射
queues = make(map[string]int) // 创建队列优先级映射
// Default 返回默认的 Servlet 实例
func Default() *Servlet {
return defaultServlet
}
// Register 注册任务处理器
// 使用泛型确保类型安全,通过反射验证处理器函数签名
// 处理器函数签名必须是func(context.Context, T) error 或 func(context.Context) 或 func(T) error 或 func()
func Register[T any](t *Task[T]) error {
if t.Queue == "" {
return errors.New("taskq: queue name cannot be empty")
}
if t.Priority < 0 || t.Priority > 255 {
return errors.New("taskq: priority must be between 0 and 255")
}
if t.MaxRetries < 0 {
return errors.New("taskq: retry count must be non-negative")
}
if t.Handler == nil {
return errors.New("taskq: handler cannot be nil")
}
rv := reflect.ValueOf(t.Handler)
if rv.Kind() != reflect.Func {
return errors.New("taskq: handler must be a function")
}
rt := rv.Type()
// 验证返回值:只能是 error 或无返回值
var returnError bool
for i := range rt.NumOut() {
if i == 0 && rt.Out(0).Implements(errorType) {
returnError = true
} else {
return errors.New("taskq: handler function must return either error or nothing")
}
}
// 验证参数:支持以下签名
// - func(context.Context, T) error
// - func(context.Context) error
// - func(T) error
// - func()
var inContext bool
var inData bool
var dataType reflect.Type
numIn := rt.NumIn()
if numIn > 2 {
return errors.New("taskq: handler function can have at most 2 parameters")
}
for i := range numIn {
fi := rt.In(i)
if fi.Implements(contextType) {
if i != 0 {
return errors.New("taskq: context.Context must be the first parameter")
}
inContext = true
} else if fi.Kind() == reflect.Struct {
if inData {
return errors.New("taskq: handler function can only have one data parameter")
}
inData = true
dataType = fi
} else {
return errors.New("taskq: handler parameter must be context.Context or a struct")
}
}
// 检查服务器是否已启动
if started.Load() {
return errors.New("taskq: cannot register handler after server has started")
}
// 设置任务的反射信息
t.funcValue = rv
t.dataType = dataType
t.inputContext = inContext
t.inputData = inData
t.returnError = returnError
// 注册到全局映射表
handlers[t.Name] = t
queues[t.Queue] = t.Priority
return nil
// Configure 配置默认 Servlet
// 必须在 Init 之前调用
func Configure(cfg Config) error {
return defaultServlet.Configure(cfg)
}
// SetRedis 设置 Redis 客户端
// 必须在启动服务器之前调用,用于配置任务队列的存储后端
func SetRedis(rdb redis.UniversalClient) error {
if started.Load() {
return errors.New("taskq: server is already running")
}
redisClient = rdb
client.Store(asynq.NewClientFromRedisClient(rdb))
return nil
// Init 初始化默认 Servlet 和所有插件
// 必须在 Start 之前调用,且只能调用一次
func Init(ctx context.Context) error {
return defaultServlet.Init(ctx)
}
// StartOptions 启动选项
type StartOptions struct {
// StatsInterval 统计采集间隔,默认 2 秒
StatsInterval time.Duration
// StatsDBPath SQLite 数据库文件路径,默认 "./taskq_stats.db"
StatsDBPath string
// Start 启动 taskq 服务器(包级别函数,使用默认 Servlet
// 开始监听任务队列并处理任务
// 必须在 Init 之后调用
func Start(ctx context.Context) error {
return defaultServlet.Start(ctx)
}
// Start 启动 taskq 服务器
// 开始监听任务队列并处理任务,包含健康检查和优雅退出机制
func Start(ctx context.Context, opts ...StartOptions) error {
if !started.CompareAndSwap(false, true) {
return errors.New("taskq: server is already running")
}
if redisClient == nil {
started.Store(false)
return errors.New("taskq: redis client not initialized, call SetRedis() first")
}
var opt StartOptions
if len(opts) > 0 {
opt = opts[0]
}
if err := startInspector(opt); err != nil {
started.Store(false)
return err
}
srv := createServer(ctx)
go runServer(srv)
go runMonitor(ctx, srv)
return nil
}
// startInspector 启动统计采集器
func startInspector(opt StartOptions) error {
ins, err := NewInspector(InspectorOptions{
Interval: opt.StatsInterval,
DBPath: opt.StatsDBPath,
})
if err != nil {
return err
}
inspector = ins
SetStatsDB(ins.GetStatsDB())
return nil
}
// createServer 创建 asynq 服务器
func createServer(ctx context.Context) *asynq.Server {
return asynq.NewServerFromRedisClient(redisClient, asynq.Config{
Concurrency: 30,
Queues: maps.Clone(queues),
BaseContext: func() context.Context { return ctx },
LogLevel: asynq.WarnLevel,
})
}
// runServer 运行任务处理服务器
func runServer(srv *asynq.Server) {
mux := asynq.NewServeMux()
for name, handler := range handlers {
mux.Handle(name, handler)
}
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}
// runMonitor 运行监控协程,处理优雅退出和健康检查
func runMonitor(ctx context.Context, srv *asynq.Server) {
defer close(done)
defer started.Store(false)
defer closeInspector()
defer srv.Shutdown()
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
select {
case quit := <-exit:
quit <- struct{}{}
return
case <-ctx.Done():
// ctx 取消时,排空 exit 通道中可能的信号
select {
case quit := <-exit:
quit <- struct{}{}
default:
}
return
case <-ticker.C:
if err := srv.Ping(); err != nil {
log.Println(err)
return
}
}
}
}
// closeInspector 关闭统计采集器
func closeInspector() {
if inspector != nil {
inspector.Close()
inspector = nil
}
}
// Stop 优雅停止 taskq 服务器
// Stop 优雅停止 taskq 服务器(包级别函数,使用默认 Servlet
// 发送停止信号并等待服务器完全关闭
func Stop() {
if !started.Load() {
return
}
quit := make(chan struct{})
select {
case exit <- quit:
<-quit // 等待确认收到退出信号
default:
// monitor 已经退出
}
<-done // 等待 runMonitor 完全结束
defaultServlet.Stop()
}

484
x/inspector/inspector.go Normal file
View File

@@ -0,0 +1,484 @@
// Package inspector 提供 taskq 的统计采集功能
package inspector
import (
"context"
"database/sql"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"time"
"code.tczkiot.com/wlw/taskq"
"github.com/hibiken/asynq"
"github.com/redis/go-redis/v9"
)
// Options 配置统计采集器的选项
type Options struct {
// Interval 采集间隔,默认 2 秒
Interval time.Duration
// DBPath SQLite 数据库文件路径,默认为 "./taskq_stats.db"
DBPath string
}
// Inspector 统计采集器,独立于 HTTP 服务运行
// 实现 taskq.Plugin 接口
type Inspector struct {
opts Options
rdb redis.UniversalClient
queues func() map[string]int
inspector *asynq.Inspector
db *sql.DB
closeCh chan struct{}
closeOnce sync.Once
}
// New 创建新的统计采集器
// 创建后需要通过 Servlet.Use() 注册
func New(opts Options) *Inspector {
if opts.Interval <= 0 {
opts.Interval = 2 * time.Second
}
if opts.DBPath == "" {
opts.DBPath = "./taskq_stats.db"
}
return &Inspector{
opts: opts,
closeCh: make(chan struct{}),
}
}
// Name 返回插件名称
func (ins *Inspector) Name() string {
return "inspector"
}
// Init 初始化插件,从 Context 获取 Redis 和 Queues
func (ins *Inspector) Init(ctx *taskq.Context) error {
ins.rdb = ctx.Redis()
ins.queues = ctx.Queues
return nil
}
// Start 启动采集器,初始化数据库并开始后台采集
func (ins *Inspector) Start(ctx *taskq.Context) error {
// 确保目录存在
dir := filepath.Dir(ins.opts.DBPath)
if dir != "" && dir != "." {
if err := os.MkdirAll(dir, 0755); err != nil {
return fmt.Errorf("inspector: failed to create directory: %v", err)
}
}
// 打开 SQLite 数据库
db, err := sql.Open("sqlite3", ins.opts.DBPath)
if err != nil {
return fmt.Errorf("inspector: failed to open database: %v", err)
}
// 初始化数据库表
if err := initStatsDB(db); err != nil {
db.Close()
return fmt.Errorf("inspector: failed to init database: %v", err)
}
ins.db = db
ins.inspector = asynq.NewInspectorFromRedisClient(ins.rdb)
// 启动后台统计采集
go ins.startCollector(ctx)
return nil
}
// Stop 停止采集器,关闭数据库连接
func (ins *Inspector) Stop() error {
ins.closeOnce.Do(func() {
close(ins.closeCh)
})
if ins.db != nil {
ins.db.Close()
}
if ins.inspector != nil {
ins.inspector.Close()
}
return nil
}
// initStatsDB 初始化数据库
func initStatsDB(db *sql.DB) error {
_, err := db.Exec(`
CREATE TABLE IF NOT EXISTS metrics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp INTEGER NOT NULL,
queue TEXT NOT NULL,
active INTEGER DEFAULT 0,
pending INTEGER DEFAULT 0,
scheduled INTEGER DEFAULT 0,
retry INTEGER DEFAULT 0,
archived INTEGER DEFAULT 0,
completed INTEGER DEFAULT 0,
succeeded INTEGER DEFAULT 0,
failed INTEGER DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_metrics_queue_time ON metrics(queue, timestamp DESC);
CREATE INDEX IF NOT EXISTS idx_metrics_time ON metrics(timestamp DESC);
CREATE UNIQUE INDEX IF NOT EXISTS idx_metrics_unique ON metrics(timestamp, queue);
`)
return err
}
// startCollector 启动后台统计采集任务
func (ins *Inspector) startCollector(ctx context.Context) {
ticker := time.NewTicker(ins.opts.Interval)
defer ticker.Stop()
for {
select {
case <-ins.closeCh:
return
case <-ctx.Done():
return
case <-ticker.C:
ins.collectStats()
}
}
}
// collectStats 采集所有队列的统计数据
func (ins *Inspector) collectStats() {
if ins.queues == nil || ins.inspector == nil {
return
}
now := time.Now().Unix()
queueList := ins.queues()
for queueName := range queueList {
info, err := ins.inspector.GetQueueInfo(queueName)
if err != nil {
continue
}
stats := Stats{
Queue: queueName,
Timestamp: now,
Active: info.Active,
Pending: info.Pending,
Scheduled: info.Scheduled,
Retry: info.Retry,
Archived: info.Archived,
Completed: info.Completed,
Succeeded: info.Processed - info.Failed,
Failed: info.Failed,
}
ins.saveMetrics(stats)
}
}
// saveMetrics 保存统计数据到 metrics 表
func (ins *Inspector) saveMetrics(stats Stats) error {
if ins.db == nil {
return nil
}
_, err := ins.db.Exec(`
INSERT OR REPLACE INTO metrics (timestamp, queue, active, pending, scheduled, retry, archived, completed, succeeded, failed)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`, stats.Timestamp, stats.Queue, stats.Active, stats.Pending, stats.Scheduled, stats.Retry, stats.Archived, stats.Completed, stats.Succeeded, stats.Failed)
return err
}
// ==================== 数据类型 ====================
// StatsQuery 统计查询参数
type StatsQuery struct {
Queue string // 队列名称,为空则查询所有队列汇总
Start int64 // 开始时间戳0 表示不限制
End int64 // 结束时间戳0 表示不限制
Limit int // 返回数量限制,默认 500
}
// Stats 队列统计数据点
type Stats struct {
Timestamp int64 `json:"t"` // Unix 时间戳(秒)
Queue string `json:"q,omitempty"` // 队列名称
Active int `json:"a"` // 活跃任务数
Pending int `json:"p"` // 等待任务数
Scheduled int `json:"s"` // 计划任务数
Retry int `json:"r"` // 重试任务数
Archived int `json:"ar"` // 归档任务数
Completed int `json:"c"` // 已完成任务数
Succeeded int `json:"su"` // 成功数
Failed int `json:"f"` // 失败数
}
// QueueInfo 队列信息
type QueueInfo struct {
Name string `json:"name"`
Priority int `json:"priority"`
Size int `json:"size"`
Active int `json:"active"`
Pending int `json:"pending"`
Scheduled int `json:"scheduled"`
Retry int `json:"retry"`
Archived int `json:"archived"`
Completed int `json:"completed"`
Processed int `json:"processed"`
Failed int `json:"failed"`
Paused bool `json:"paused"`
MemoryUsage int64 `json:"memory_usage"`
Latency time.Duration `json:"-"`
LatencyMS int64 `json:"latency"`
}
// TaskInfo 任务信息
type TaskInfo struct {
ID string `json:"id"`
Type string `json:"type"`
Payload []byte `json:"payload"`
Queue string `json:"queue"`
Retried int `json:"retried"`
LastFailedAt time.Time `json:"last_failed_at,omitempty"`
LastErr string `json:"last_error,omitempty"`
NextProcessAt time.Time `json:"next_process_at,omitempty"`
CompletedAt time.Time `json:"completed_at,omitempty"`
}
// ==================== 查询方法 ====================
// QueryStats 查询统计数据
func (ins *Inspector) QueryStats(q StatsQuery) ([]Stats, error) {
if ins.db == nil {
return nil, nil
}
limit := q.Limit
if limit <= 0 {
limit = 500
}
var args []any
var whereClause string
var conditions []string
if q.Queue != "" {
conditions = append(conditions, "queue = ?")
args = append(args, q.Queue)
}
if q.Start > 0 {
conditions = append(conditions, "timestamp >= ?")
args = append(args, q.Start)
}
if q.End > 0 {
conditions = append(conditions, "timestamp <= ?")
args = append(args, q.End)
}
if len(conditions) > 0 {
whereClause = "WHERE " + strings.Join(conditions, " AND ")
}
var query string
if q.Queue != "" {
query = fmt.Sprintf(`
SELECT timestamp, queue, active, pending, scheduled, retry, archived, completed, succeeded, failed
FROM metrics
%s
ORDER BY timestamp DESC
LIMIT ?
`, whereClause)
} else {
query = fmt.Sprintf(`
SELECT timestamp, '' as queue, SUM(active), SUM(pending), SUM(scheduled), SUM(retry), SUM(archived), SUM(completed), SUM(succeeded), SUM(failed)
FROM metrics
%s
GROUP BY timestamp
ORDER BY timestamp DESC
LIMIT ?
`, whereClause)
}
args = append(args, limit)
rows, err := ins.db.Query(query, args...)
if err != nil {
return nil, err
}
defer rows.Close()
var statsList []Stats
for rows.Next() {
var s Stats
if err := rows.Scan(&s.Timestamp, &s.Queue, &s.Active, &s.Pending, &s.Scheduled, &s.Retry, &s.Archived, &s.Completed, &s.Succeeded, &s.Failed); err != nil {
continue
}
statsList = append(statsList, s)
}
// 反转顺序,使时间从早到晚
for i, j := 0, len(statsList)-1; i < j; i, j = i+1, j-1 {
statsList[i], statsList[j] = statsList[j], statsList[i]
}
return statsList, nil
}
// GetQueueInfo 获取队列信息
func (ins *Inspector) GetQueueInfo(queueName string) (*QueueInfo, error) {
if ins.inspector == nil {
return nil, fmt.Errorf("inspector: not started")
}
info, err := ins.inspector.GetQueueInfo(queueName)
if err != nil {
return nil, err
}
return &QueueInfo{
Name: info.Queue,
Size: info.Size,
Active: info.Active,
Pending: info.Pending,
Scheduled: info.Scheduled,
Retry: info.Retry,
Archived: info.Archived,
Completed: info.Completed,
Processed: info.Processed,
Failed: info.Failed,
Paused: info.Paused,
MemoryUsage: info.MemoryUsage,
Latency: info.Latency,
LatencyMS: info.Latency.Milliseconds(),
}, nil
}
// convertTaskInfo 将 asynq.TaskInfo 转换为 TaskInfo
func convertTaskInfo(task *asynq.TaskInfo) *TaskInfo {
return &TaskInfo{
ID: task.ID,
Type: task.Type,
Payload: task.Payload,
Queue: task.Queue,
Retried: task.Retried,
LastFailedAt: task.LastFailedAt,
LastErr: task.LastErr,
NextProcessAt: task.NextProcessAt,
CompletedAt: task.CompletedAt,
}
}
// convertTaskList 批量转换任务列表
func convertTaskList(tasks []*asynq.TaskInfo) []*TaskInfo {
result := make([]*TaskInfo, len(tasks))
for i, t := range tasks {
result[i] = convertTaskInfo(t)
}
return result
}
// ListActiveTasks 获取活跃任务列表
func (ins *Inspector) ListActiveTasks(queueName string, pageSize, page int) ([]*TaskInfo, error) {
if ins.inspector == nil {
return nil, fmt.Errorf("inspector: not started")
}
tasks, err := ins.inspector.ListActiveTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page))
if err != nil {
return nil, err
}
return convertTaskList(tasks), nil
}
// ListPendingTasks 获取等待任务列表
func (ins *Inspector) ListPendingTasks(queueName string, pageSize, page int) ([]*TaskInfo, error) {
if ins.inspector == nil {
return nil, fmt.Errorf("inspector: not started")
}
tasks, err := ins.inspector.ListPendingTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page))
if err != nil {
return nil, err
}
return convertTaskList(tasks), nil
}
// ListScheduledTasks 获取计划任务列表
func (ins *Inspector) ListScheduledTasks(queueName string, pageSize, page int) ([]*TaskInfo, error) {
if ins.inspector == nil {
return nil, fmt.Errorf("inspector: not started")
}
tasks, err := ins.inspector.ListScheduledTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page))
if err != nil {
return nil, err
}
return convertTaskList(tasks), nil
}
// ListRetryTasks 获取重试任务列表
func (ins *Inspector) ListRetryTasks(queueName string, pageSize, page int) ([]*TaskInfo, error) {
if ins.inspector == nil {
return nil, fmt.Errorf("inspector: not started")
}
tasks, err := ins.inspector.ListRetryTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page))
if err != nil {
return nil, err
}
return convertTaskList(tasks), nil
}
// ListArchivedTasks 获取归档任务列表
func (ins *Inspector) ListArchivedTasks(queueName string, pageSize, page int) ([]*TaskInfo, error) {
if ins.inspector == nil {
return nil, fmt.Errorf("inspector: not started")
}
tasks, err := ins.inspector.ListArchivedTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page))
if err != nil {
return nil, err
}
return convertTaskList(tasks), nil
}
// ListCompletedTasks 获取已完成任务列表
func (ins *Inspector) ListCompletedTasks(queueName string, pageSize, page int) ([]*TaskInfo, error) {
if ins.inspector == nil {
return nil, fmt.Errorf("inspector: not started")
}
tasks, err := ins.inspector.ListCompletedTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page))
if err != nil {
return nil, err
}
return convertTaskList(tasks), nil
}
// RunTask 立即运行归档任务(重试失败任务)
func (ins *Inspector) RunTask(queueName, taskID string) error {
if ins.inspector == nil {
return fmt.Errorf("inspector: not started")
}
return ins.inspector.RunTask(queueName, taskID)
}
// PauseQueue 暂停队列
func (ins *Inspector) PauseQueue(queueName string) error {
if ins.inspector == nil {
return fmt.Errorf("inspector: not started")
}
return ins.inspector.PauseQueue(queueName)
}
// UnpauseQueue 恢复队列
func (ins *Inspector) UnpauseQueue(queueName string) error {
if ins.inspector == nil {
return fmt.Errorf("inspector: not started")
}
return ins.inspector.UnpauseQueue(queueName)
}

268
x/metrics/metrics.go Normal file
View File

@@ -0,0 +1,268 @@
// Package metrics 提供 taskq 的 Prometheus 指标采集功能
package metrics
import (
"context"
"sync"
"time"
"code.tczkiot.com/wlw/taskq"
"github.com/hibiken/asynq"
"github.com/prometheus/client_golang/prometheus"
"github.com/redis/go-redis/v9"
)
// Options 配置指标采集器的选项
type Options struct {
// Namespace 指标命名空间,默认为 "taskq"
Namespace string
// Interval 采集间隔,默认 15 秒
Interval time.Duration
// Registry Prometheus 注册器,默认使用 prometheus.DefaultRegisterer
Registry prometheus.Registerer
}
// Metrics Prometheus 指标采集器
// 实现 taskq.Plugin 接口
type Metrics struct {
opts Options
rdb redis.UniversalClient
queues func() map[string]int
inspector *asynq.Inspector
closeCh chan struct{}
closeOnce sync.Once
// Prometheus 指标
queueSize *prometheus.GaugeVec
activeTasks *prometheus.GaugeVec
pendingTasks *prometheus.GaugeVec
scheduledTasks *prometheus.GaugeVec
retryTasks *prometheus.GaugeVec
archivedTasks *prometheus.GaugeVec
completedTasks *prometheus.GaugeVec
processedTotal *prometheus.GaugeVec
failedTotal *prometheus.GaugeVec
latencySeconds *prometheus.GaugeVec
memoryBytes *prometheus.GaugeVec
paused *prometheus.GaugeVec
}
// New 创建新的指标采集器
func New(opts Options) *Metrics {
if opts.Namespace == "" {
opts.Namespace = "taskq"
}
if opts.Interval <= 0 {
opts.Interval = 15 * time.Second
}
if opts.Registry == nil {
opts.Registry = prometheus.DefaultRegisterer
}
m := &Metrics{
opts: opts,
closeCh: make(chan struct{}),
}
// 初始化指标
m.initMetrics()
return m
}
// initMetrics 初始化 Prometheus 指标
func (m *Metrics) initMetrics() {
labels := []string{"queue"}
m.queueSize = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: m.opts.Namespace,
Name: "queue_size",
Help: "Total number of tasks in the queue",
}, labels)
m.activeTasks = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: m.opts.Namespace,
Name: "tasks_active",
Help: "Number of currently active tasks",
}, labels)
m.pendingTasks = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: m.opts.Namespace,
Name: "tasks_pending",
Help: "Number of pending tasks",
}, labels)
m.scheduledTasks = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: m.opts.Namespace,
Name: "tasks_scheduled",
Help: "Number of scheduled tasks",
}, labels)
m.retryTasks = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: m.opts.Namespace,
Name: "tasks_retry",
Help: "Number of tasks in retry queue",
}, labels)
m.archivedTasks = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: m.opts.Namespace,
Name: "tasks_archived",
Help: "Number of archived (dead) tasks",
}, labels)
m.completedTasks = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: m.opts.Namespace,
Name: "tasks_completed",
Help: "Number of completed tasks (retained)",
}, labels)
m.processedTotal = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: m.opts.Namespace,
Name: "tasks_processed_total",
Help: "Total number of processed tasks (today)",
}, labels)
m.failedTotal = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: m.opts.Namespace,
Name: "tasks_failed_total",
Help: "Total number of failed tasks (today)",
}, labels)
m.latencySeconds = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: m.opts.Namespace,
Name: "queue_latency_seconds",
Help: "Queue latency in seconds",
}, labels)
m.memoryBytes = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: m.opts.Namespace,
Name: "queue_memory_bytes",
Help: "Memory usage of the queue in bytes",
}, labels)
m.paused = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: m.opts.Namespace,
Name: "queue_paused",
Help: "Whether the queue is paused (1) or not (0)",
}, labels)
}
// Name 返回插件名称
func (m *Metrics) Name() string {
return "metrics"
}
// Init 初始化插件,从 Context 获取 Redis 和 Queues
func (m *Metrics) Init(ctx *taskq.Context) error {
m.rdb = ctx.Redis()
m.queues = ctx.Queues
return nil
}
// Start 启动指标采集
func (m *Metrics) Start(ctx *taskq.Context) error {
m.inspector = asynq.NewInspectorFromRedisClient(m.rdb)
// 注册指标
collectors := []prometheus.Collector{
m.queueSize,
m.activeTasks,
m.pendingTasks,
m.scheduledTasks,
m.retryTasks,
m.archivedTasks,
m.completedTasks,
m.processedTotal,
m.failedTotal,
m.latencySeconds,
m.memoryBytes,
m.paused,
}
for _, c := range collectors {
if err := m.opts.Registry.Register(c); err != nil {
// 如果已注册则忽略
if _, ok := err.(prometheus.AlreadyRegisteredError); !ok {
return err
}
}
}
// 启动后台采集
go m.startCollector(ctx)
return nil
}
// Stop 停止指标采集
func (m *Metrics) Stop() error {
m.closeOnce.Do(func() {
close(m.closeCh)
})
if m.inspector != nil {
m.inspector.Close()
}
return nil
}
// startCollector 启动后台指标采集
func (m *Metrics) startCollector(ctx context.Context) {
// 立即采集一次
m.collect()
ticker := time.NewTicker(m.opts.Interval)
defer ticker.Stop()
for {
select {
case <-m.closeCh:
return
case <-ctx.Done():
return
case <-ticker.C:
m.collect()
}
}
}
// collect 采集所有队列的指标
func (m *Metrics) collect() {
if m.queues == nil || m.inspector == nil {
return
}
queues := m.queues()
for queueName := range queues {
info, err := m.inspector.GetQueueInfo(queueName)
if err != nil {
continue
}
m.queueSize.WithLabelValues(queueName).Set(float64(info.Size))
m.activeTasks.WithLabelValues(queueName).Set(float64(info.Active))
m.pendingTasks.WithLabelValues(queueName).Set(float64(info.Pending))
m.scheduledTasks.WithLabelValues(queueName).Set(float64(info.Scheduled))
m.retryTasks.WithLabelValues(queueName).Set(float64(info.Retry))
m.archivedTasks.WithLabelValues(queueName).Set(float64(info.Archived))
m.completedTasks.WithLabelValues(queueName).Set(float64(info.Completed))
m.processedTotal.WithLabelValues(queueName).Set(float64(info.Processed))
m.failedTotal.WithLabelValues(queueName).Set(float64(info.Failed))
m.latencySeconds.WithLabelValues(queueName).Set(info.Latency.Seconds())
m.memoryBytes.WithLabelValues(queueName).Set(float64(info.MemoryUsage))
if info.Paused {
m.paused.WithLabelValues(queueName).Set(1)
} else {
m.paused.WithLabelValues(queueName).Set(0)
}
}
}

View File

@@ -1,6 +1,5 @@
// Package taskq 提供基于 Redis 的异步任务队列功能
// handler.go 文件包含 HTTP 监控服务处理器
package taskq
// Package monitor 提供 taskq 的 HTTP 监控服务
package monitor
import (
"embed"
@@ -14,14 +13,20 @@ import (
"sync"
"time"
"github.com/hibiken/asynq"
"code.tczkiot.com/wlw/taskq/x/inspector"
)
//go:embed ui/*
var uiFS embed.FS
// HTTPHandlerOptions 配置监控服务的选项
type HTTPHandlerOptions struct {
// Options 配置监控服务的选项
type Options struct {
// Inspector 检查器实例(必需)
Inspector *inspector.Inspector
// Queues 队列优先级映射(必需)
Queues map[string]int
// RootPath 监控服务的根路径,默认为 "/monitor"
RootPath string
@@ -29,19 +34,24 @@ type HTTPHandlerOptions struct {
ReadOnly bool
}
// HTTPHandler 监控服务的 HTTP 处理器
type HTTPHandler struct {
// Monitor 监控服务的 HTTP 处理器
type Monitor struct {
router *http.ServeMux
rootPath string
readOnly bool
closeCh chan struct{}
closeOnce sync.Once
inspector *inspector.Inspector
queues map[string]int
}
// NewHTTPHandler 创建新的监控 HTTP 处理器
func NewHTTPHandler(opts HTTPHandlerOptions) (*HTTPHandler, error) {
if redisClient == nil {
return nil, fmt.Errorf("taskq: redis client not initialized, call SetRedis() first")
// New 创建新的监控服务
func New(opts Options) (*Monitor, error) {
if opts.Inspector == nil {
return nil, fmt.Errorf("monitor: inspector is required")
}
if opts.Queues == nil {
return nil, fmt.Errorf("monitor: queues is required")
}
// 设置默认值
@@ -55,71 +65,71 @@ func NewHTTPHandler(opts HTTPHandlerOptions) (*HTTPHandler, error) {
}
opts.RootPath = strings.TrimSuffix(opts.RootPath, "/")
handler := &HTTPHandler{
router: http.NewServeMux(),
rootPath: opts.RootPath,
readOnly: opts.ReadOnly,
closeCh: make(chan struct{}),
m := &Monitor{
router: http.NewServeMux(),
rootPath: opts.RootPath,
readOnly: opts.ReadOnly,
closeCh: make(chan struct{}),
inspector: opts.Inspector,
queues: opts.Queues,
}
handler.setupRoutes()
return handler, nil
m.setupRoutes()
return m, nil
}
// ServeHTTP 实现 http.Handler 接口
func (h *HTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.router.ServeHTTP(w, r)
func (m *Monitor) ServeHTTP(w http.ResponseWriter, r *http.Request) {
m.router.ServeHTTP(w, r)
}
// RootPath 返回监控服务的根路径
func (h *HTTPHandler) RootPath() string {
return h.rootPath
func (m *Monitor) RootPath() string {
return m.rootPath
}
// Close 关闭 HTTP 处理器
func (h *HTTPHandler) Close() error {
h.closeOnce.Do(func() {
close(h.closeCh)
// Close 关闭监控服务
func (m *Monitor) Close() error {
m.closeOnce.Do(func() {
close(m.closeCh)
})
return nil
}
// setupRoutes 设置路由
func (h *HTTPHandler) setupRoutes() {
func (m *Monitor) setupRoutes() {
// API 路由
apiPath := h.rootPath + "/api/"
h.router.HandleFunc(apiPath+"queues", h.handleQueues)
h.router.HandleFunc(apiPath+"queues/", h.handleQueueDetail)
h.router.HandleFunc(apiPath+"tasks/", h.handleTasks)
h.router.HandleFunc(apiPath+"stats/", h.handleStats)
h.router.HandleFunc(apiPath+"sse", h.handleSSE)
apiPath := m.rootPath + "/api/"
m.router.HandleFunc(apiPath+"queues", m.handleQueues)
m.router.HandleFunc(apiPath+"queues/", m.handleQueueDetail)
m.router.HandleFunc(apiPath+"tasks/", m.handleTasks)
m.router.HandleFunc(apiPath+"stats/", m.handleStats)
m.router.HandleFunc(apiPath+"sse", m.handleSSE)
// 静态文件路由
uiSubFS, _ := fs.Sub(uiFS, "ui")
fileServer := http.FileServer(http.FS(uiSubFS))
h.router.Handle(h.rootPath+"/static/", http.StripPrefix(h.rootPath+"/static/", fileServer))
m.router.Handle(m.rootPath+"/static/", http.StripPrefix(m.rootPath+"/static/", fileServer))
// 主页路由(包含 History API 的路由)
h.router.HandleFunc(h.rootPath+"/queues/", h.handleIndex)
h.router.HandleFunc(h.rootPath+"/", h.handleIndex)
h.router.HandleFunc(h.rootPath, h.handleIndex)
m.router.HandleFunc(m.rootPath+"/queues/", m.handleIndex)
m.router.HandleFunc(m.rootPath+"/", m.handleIndex)
m.router.HandleFunc(m.rootPath, m.handleIndex)
}
// handleStats 处理队列统计数据请求
// GET /api/stats/{queue}?start=1234567890&end=1234567899&limit=500
// GET /api/stats/?start=1234567890&end=1234567899&limit=500 (查询所有队列汇总)
func (h *HTTPHandler) handleStats(w http.ResponseWriter, r *http.Request) {
func (m *Monitor) handleStats(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
// 从 URL 中提取队列名称(可选,为空则查询所有队列汇总)
path := strings.TrimPrefix(r.URL.Path, h.rootPath+"/api/stats/")
path := strings.TrimPrefix(r.URL.Path, m.rootPath+"/api/stats/")
queueName := strings.TrimSuffix(path, "/")
// 构建查询参数
query := StatsQuery{
query := inspector.StatsQuery{
Queue: queueName,
Limit: 500,
}
@@ -145,7 +155,7 @@ func (h *HTTPHandler) handleStats(w http.ResponseWriter, r *http.Request) {
}
}
stats, err := getQueueStatsWithQuery(query)
stats, err := m.inspector.QueryStats(query)
if err != nil {
http.Error(w, fmt.Sprintf("Failed to get stats: %v", err), http.StatusInternalServerError)
return
@@ -155,40 +165,58 @@ func (h *HTTPHandler) handleStats(w http.ResponseWriter, r *http.Request) {
json.NewEncoder(w).Encode(stats)
}
// queueInfoJSON 用于 JSON 输出的队列信息
type queueInfoJSON struct {
Name string `json:"name"`
Priority int `json:"priority"`
Size int `json:"size"`
Active int `json:"active"`
Pending int `json:"pending"`
Scheduled int `json:"scheduled"`
Retry int `json:"retry"`
Archived int `json:"archived"`
Completed int `json:"completed"`
Processed int `json:"processed"`
Failed int `json:"failed"`
Paused bool `json:"paused"`
MemoryUsage int64 `json:"memory_usage"`
Latency int64 `json:"latency"`
}
// handleQueues 处理队列列表请求
func (h *HTTPHandler) handleQueues(w http.ResponseWriter, r *http.Request) {
func (m *Monitor) handleQueues(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
var queueInfos []QueueInfo
var queueInfos []queueInfoJSON
// 首先显示所有注册的队列即使Redis中还没有任务
for queueName, priority := range queues {
stats, err := inspector.GetQueueInfo(queueName)
for queueName, priority := range m.queues {
info, err := m.inspector.GetQueueInfo(queueName)
if err != nil {
// 如果队列不存在,创建一个空的状态
queueInfos = append(queueInfos, QueueInfo{
queueInfos = append(queueInfos, queueInfoJSON{
Name: queueName,
Priority: priority,
})
} else {
queueInfos = append(queueInfos, QueueInfo{
queueInfos = append(queueInfos, queueInfoJSON{
Name: queueName,
Priority: priority,
Size: stats.Size,
Active: stats.Active,
Pending: stats.Pending,
Scheduled: stats.Scheduled,
Retry: stats.Retry,
Archived: stats.Archived,
Completed: stats.Completed,
Processed: stats.Processed,
Failed: stats.Failed,
Paused: stats.Paused,
MemoryUsage: stats.MemoryUsage,
Latency: stats.Latency.Milliseconds(),
Size: info.Size,
Active: info.Active,
Pending: info.Pending,
Scheduled: info.Scheduled,
Retry: info.Retry,
Archived: info.Archived,
Completed: info.Completed,
Processed: info.Processed,
Failed: info.Failed,
Paused: info.Paused,
MemoryUsage: info.MemoryUsage,
Latency: info.LatencyMS,
})
}
}
@@ -203,12 +231,9 @@ func (h *HTTPHandler) handleQueues(w http.ResponseWriter, r *http.Request) {
}
// handleQueueDetail 处理队列详情请求和队列操作
// GET /api/queues/{queue} - 获取队列详情
// POST /api/queues/{queue}/pause - 暂停队列
// POST /api/queues/{queue}/unpause - 恢复队列
func (h *HTTPHandler) handleQueueDetail(w http.ResponseWriter, r *http.Request) {
func (m *Monitor) handleQueueDetail(w http.ResponseWriter, r *http.Request) {
// 从 URL 中提取队列名称
path := strings.TrimPrefix(r.URL.Path, h.rootPath+"/api/queues/")
path := strings.TrimPrefix(r.URL.Path, m.rootPath+"/api/queues/")
parts := strings.Split(path, "/")
if len(parts) == 0 || parts[0] == "" {
http.Error(w, "Queue name is required", http.StatusBadRequest)
@@ -217,21 +242,21 @@ func (h *HTTPHandler) handleQueueDetail(w http.ResponseWriter, r *http.Request)
queueName := parts[0]
// 检查队列是否已注册
if _, exists := queues[queueName]; !exists {
if _, exists := m.queues[queueName]; !exists {
http.Error(w, "Queue not found", http.StatusNotFound)
return
}
// 处理暂停/恢复请求
if r.Method == http.MethodPost && len(parts) >= 2 {
if h.readOnly {
if m.readOnly {
http.Error(w, "Operation not allowed in read-only mode", http.StatusForbidden)
return
}
action := parts[1]
switch action {
case "pause":
if err := inspector.PauseQueue(queueName); err != nil {
if err := m.inspector.PauseQueue(queueName); err != nil {
http.Error(w, fmt.Sprintf("Failed to pause queue: %v", err), http.StatusInternalServerError)
return
}
@@ -239,7 +264,7 @@ func (h *HTTPHandler) handleQueueDetail(w http.ResponseWriter, r *http.Request)
json.NewEncoder(w).Encode(map[string]string{"status": "paused"})
return
case "unpause":
if err := inspector.UnpauseQueue(queueName); err != nil {
if err := m.inspector.UnpauseQueue(queueName); err != nil {
http.Error(w, fmt.Sprintf("Failed to unpause queue: %v", err), http.StatusInternalServerError)
return
}
@@ -258,7 +283,7 @@ func (h *HTTPHandler) handleQueueDetail(w http.ResponseWriter, r *http.Request)
}
// 获取队列详细信息
stats, err := inspector.GetQueueInfo(queueName)
info, err := m.inspector.GetQueueInfo(queueName)
if err != nil {
// 如果队列在 Redis 中不存在,返回空状态
if strings.Contains(err.Error(), "queue not found") {
@@ -282,11 +307,11 @@ func (h *HTTPHandler) handleQueueDetail(w http.ResponseWriter, r *http.Request)
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(stats)
json.NewEncoder(w).Encode(info)
}
// TaskInfo 转换任务信息
type TaskInfo struct {
// taskInfoJSON 转换任务信息用于 JSON 输出
type taskInfoJSON struct {
ID string `json:"id"`
Type string `json:"type"`
Payload string `json:"payload"`
@@ -299,11 +324,9 @@ type TaskInfo struct {
}
// handleTasks 处理任务列表请求和任务操作
// GET /api/tasks/{queue}/{state} - 获取任务列表
// POST /api/tasks/{queue}/archived/{taskId}/retry - 重试失败任务
func (h *HTTPHandler) handleTasks(w http.ResponseWriter, r *http.Request) {
func (m *Monitor) handleTasks(w http.ResponseWriter, r *http.Request) {
// 从 URL 中提取队列名称和任务状态
path := strings.TrimPrefix(r.URL.Path, h.rootPath+"/api/tasks/")
path := strings.TrimPrefix(r.URL.Path, m.rootPath+"/api/tasks/")
parts := strings.Split(path, "/")
if len(parts) < 2 {
http.Error(w, "Queue name and task state are required", http.StatusBadRequest)
@@ -315,12 +338,12 @@ func (h *HTTPHandler) handleTasks(w http.ResponseWriter, r *http.Request) {
// 处理重试请求: POST /api/tasks/{queue}/archived/{taskId}/retry
if r.Method == http.MethodPost && len(parts) >= 4 && parts[1] == "archived" && parts[3] == "retry" {
if h.readOnly {
if m.readOnly {
http.Error(w, "Operation not allowed in read-only mode", http.StatusForbidden)
return
}
taskID := parts[2]
h.handleRetryTask(w, r, queueName, taskID)
m.handleRetryTask(w, r, queueName, taskID)
return
}
@@ -330,7 +353,7 @@ func (h *HTTPHandler) handleTasks(w http.ResponseWriter, r *http.Request) {
}
// 检查队列是否已注册
if _, exists := queues[queueName]; !exists {
if _, exists := m.queues[queueName]; !exists {
http.Error(w, "Queue not found", http.StatusNotFound)
return
}
@@ -351,7 +374,7 @@ func (h *HTTPHandler) handleTasks(w http.ResponseWriter, r *http.Request) {
// 获取队列信息以获取任务总数
var total int
queueInfo, queueErr := inspector.GetQueueInfo(queueName)
queueInfo, queueErr := m.inspector.GetQueueInfo(queueName)
if queueErr == nil {
switch taskState {
case "active":
@@ -370,22 +393,22 @@ func (h *HTTPHandler) handleTasks(w http.ResponseWriter, r *http.Request) {
}
// 根据任务状态获取任务列表
var tasks []*asynq.TaskInfo
var tasks []*inspector.TaskInfo
var err error
switch taskState {
case "active":
tasks, err = inspector.ListActiveTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page-1))
tasks, err = m.inspector.ListActiveTasks(queueName, pageSize, page-1)
case "pending":
tasks, err = inspector.ListPendingTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page-1))
tasks, err = m.inspector.ListPendingTasks(queueName, pageSize, page-1)
case "scheduled":
tasks, err = inspector.ListScheduledTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page-1))
tasks, err = m.inspector.ListScheduledTasks(queueName, pageSize, page-1)
case "retry":
tasks, err = inspector.ListRetryTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page-1))
tasks, err = m.inspector.ListRetryTasks(queueName, pageSize, page-1)
case "archived":
tasks, err = inspector.ListArchivedTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page-1))
tasks, err = m.inspector.ListArchivedTasks(queueName, pageSize, page-1)
case "completed":
tasks, err = inspector.ListCompletedTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page-1))
tasks, err = m.inspector.ListCompletedTasks(queueName, pageSize, page-1)
default:
http.Error(w, "Invalid task state. Valid states: active, pending, scheduled, retry, archived, completed", http.StatusBadRequest)
return
@@ -394,7 +417,7 @@ func (h *HTTPHandler) handleTasks(w http.ResponseWriter, r *http.Request) {
// 如果队列在 Redis 中不存在(没有任务),返回空列表而不是错误
if err != nil {
if strings.Contains(err.Error(), "queue not found") {
tasks = []*asynq.TaskInfo{}
tasks = []*inspector.TaskInfo{}
total = 0
} else {
http.Error(w, fmt.Sprintf("Failed to get tasks: %v", err), http.StatusInternalServerError)
@@ -402,9 +425,9 @@ func (h *HTTPHandler) handleTasks(w http.ResponseWriter, r *http.Request) {
}
}
var taskInfos []TaskInfo
var taskInfos []taskInfoJSON
for _, task := range tasks {
info := TaskInfo{
info := taskInfoJSON{
ID: task.ID,
Type: task.Type,
Payload: string(task.Payload),
@@ -440,15 +463,15 @@ func (h *HTTPHandler) handleTasks(w http.ResponseWriter, r *http.Request) {
}
// handleRetryTask 重试失败任务
func (h *HTTPHandler) handleRetryTask(w http.ResponseWriter, r *http.Request, queueName, taskID string) {
func (m *Monitor) handleRetryTask(w http.ResponseWriter, r *http.Request, queueName, taskID string) {
// 检查队列是否已注册
if _, exists := queues[queueName]; !exists {
if _, exists := m.queues[queueName]; !exists {
http.Error(w, "Queue not found", http.StatusNotFound)
return
}
// 运行重试
err := inspector.RunTask(queueName, taskID)
err := m.inspector.RunTask(queueName, taskID)
if err != nil {
http.Error(w, fmt.Sprintf("Failed to retry task: %v", err), http.StatusInternalServerError)
return
@@ -459,7 +482,7 @@ func (h *HTTPHandler) handleRetryTask(w http.ResponseWriter, r *http.Request, qu
}
// handleIndex 处理主页请求,返回 SPA 入口页面
func (h *HTTPHandler) handleIndex(w http.ResponseWriter, r *http.Request) {
func (m *Monitor) handleIndex(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
@@ -473,17 +496,14 @@ func (h *HTTPHandler) handleIndex(w http.ResponseWriter, r *http.Request) {
}
// 替换模板变量
content := strings.ReplaceAll(string(indexHTML), "{{.RootPath}}", h.rootPath)
content := strings.ReplaceAll(string(indexHTML), "{{.RootPath}}", m.rootPath)
w.Header().Set("Content-Type", "text/html; charset=utf-8")
w.Write([]byte(content))
}
// handleSSE 处理 Server-Sent Events 实时数据推送
// 交叉推送两种数据:
// - stats: 统计图表数据(来自 SQLite每 2 秒)
// - queues: 队列表格数据(来自 Redis每 5 秒)
func (h *HTTPHandler) handleSSE(w http.ResponseWriter, r *http.Request) {
func (m *Monitor) handleSSE(w http.ResponseWriter, r *http.Request) {
// 设置 SSE 响应头
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
@@ -507,27 +527,26 @@ func (h *HTTPHandler) handleSSE(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
// 立即发送一次数据
h.sendQueuesEvent(w, flusher)
h.sendStatsEvent(w, flusher)
m.sendQueuesEvent(w, flusher)
m.sendStatsEvent(w, flusher)
for {
select {
case <-ctx.Done():
return
case <-h.closeCh:
case <-m.closeCh:
return
case <-statsTicker.C:
h.sendStatsEvent(w, flusher)
m.sendStatsEvent(w, flusher)
case <-queuesTicker.C:
h.sendQueuesEvent(w, flusher)
m.sendQueuesEvent(w, flusher)
}
}
}
// sendStatsEvent 发送统计图表数据(来自 SQLite
func (h *HTTPHandler) sendStatsEvent(w http.ResponseWriter, flusher http.Flusher) {
// 获取最近的统计数据点(用于图表增量更新)
stats, err := getQueueStatsWithQuery(StatsQuery{Limit: 1})
// sendStatsEvent 发送统计图表数据
func (m *Monitor) sendStatsEvent(w http.ResponseWriter, flusher http.Flusher) {
stats, err := m.inspector.QueryStats(inspector.StatsQuery{Limit: 1})
if err != nil || len(stats) == 0 {
return
}
@@ -541,32 +560,32 @@ func (h *HTTPHandler) sendStatsEvent(w http.ResponseWriter, flusher http.Flusher
flusher.Flush()
}
// sendQueuesEvent 发送队列表格数据(来自 Redis
func (h *HTTPHandler) sendQueuesEvent(w http.ResponseWriter, flusher http.Flusher) {
var queueInfos []QueueInfo
for queueName, priority := range queues {
stats, err := inspector.GetQueueInfo(queueName)
// sendQueuesEvent 发送队列表格数据
func (m *Monitor) sendQueuesEvent(w http.ResponseWriter, flusher http.Flusher) {
var queueInfos []queueInfoJSON
for queueName, priority := range m.queues {
info, err := m.inspector.GetQueueInfo(queueName)
if err != nil {
queueInfos = append(queueInfos, QueueInfo{
queueInfos = append(queueInfos, queueInfoJSON{
Name: queueName,
Priority: priority,
})
} else {
queueInfos = append(queueInfos, QueueInfo{
queueInfos = append(queueInfos, queueInfoJSON{
Name: queueName,
Priority: priority,
Size: stats.Size,
Active: stats.Active,
Pending: stats.Pending,
Scheduled: stats.Scheduled,
Retry: stats.Retry,
Archived: stats.Archived,
Completed: stats.Completed,
Processed: stats.Processed,
Failed: stats.Failed,
Paused: stats.Paused,
MemoryUsage: stats.MemoryUsage,
Latency: stats.Latency.Milliseconds(),
Size: info.Size,
Active: info.Active,
Pending: info.Pending,
Scheduled: info.Scheduled,
Retry: info.Retry,
Archived: info.Archived,
Completed: info.Completed,
Processed: info.Processed,
Failed: info.Failed,
Paused: info.Paused,
MemoryUsage: info.MemoryUsage,
Latency: info.LatencyMS,
})
}
}

View File

@@ -18,7 +18,9 @@ class TaskqApp extends LitElement {
endTime: { type: Number, state: true },
isLiveMode: { type: Boolean, state: true },
// Chart data
chartData: { type: Object, state: true }
chartData: { type: Object, state: true },
// View mode: 'chart' or 'table'
viewMode: { type: String, state: true }
};
constructor() {
@@ -35,6 +37,7 @@ class TaskqApp extends LitElement {
this.isLiveMode = true;
this.chartData = { labels: [], timestamps: [], datasets: {} };
this.eventSource = null;
this.viewMode = 'chart';
}
createRenderRoot() {
@@ -60,8 +63,17 @@ class TaskqApp extends LitElement {
initRoute() {
const path = window.location.pathname;
const relativePath = path.replace(this.rootPath, '').replace(/^\/+/, '');
// 检查是否是 queues 视图
if (relativePath === 'queues' || relativePath === 'queues/') {
this.viewMode = 'table';
return;
}
// 检查是否是具体队列详情
const match = relativePath.match(/^queues\/([^\/]+)\/([^\/]+)/);
if (match) {
this.viewMode = 'table';
this.currentQueue = decodeURIComponent(match[1]);
this.currentTab = match[2];
const params = new URLSearchParams(window.location.search);
@@ -72,10 +84,14 @@ class TaskqApp extends LitElement {
handlePopState(event) {
if (event.state && event.state.queue) {
this.viewMode = 'table';
this.currentQueue = event.state.queue;
this.currentTab = event.state.tab;
this.currentPage = event.state.page;
this.modalOpen = true;
} else if (event.state && event.state.view) {
this.viewMode = event.state.view;
this.modalOpen = false;
} else {
this.modalOpen = false;
}
@@ -339,7 +355,7 @@ class TaskqApp extends LitElement {
handleModalClose() {
this.modalOpen = false;
history.pushState({}, '', `${this.rootPath}/`);
history.pushState({ view: 'table' }, '', `${this.rootPath}/queues`);
}
handleTabChange(e) {
@@ -357,41 +373,75 @@ class TaskqApp extends LitElement {
history.pushState({ queue: this.currentQueue, tab: this.currentTab, page }, '', url);
}
handleViewChange(mode) {
this.viewMode = mode;
const url = mode === 'table' ? `${this.rootPath}/queues` : `${this.rootPath}/`;
history.pushState({ view: mode }, '', url);
}
async handleQueueUpdated() {
try {
const response = await fetch(`${this.rootPath}/api/queues`);
if (response.ok) {
this.queues = await response.json();
}
} catch (err) {
console.error('Failed to refresh queues:', err);
}
}
render() {
return html`
<div class="chart-card">
<div class="chart-header">
<span class="chart-title">Tasks Overview</span>
<time-range-picker
.duration=${this.duration}
.endTime=${this.endTime}
.isLiveMode=${this.isLiveMode}
@change=${this.handleTimeRangeChange}
></time-range-picker>
</div>
<div class="chart-container">
<tasks-chart
.data=${this.chartData}
.timestamps=${this.chartData.timestamps || []}
@time-range-select=${this.handleTimeRangeSelect}
></tasks-chart>
<div class="appbar">
<div class="appbar-title">TaskQ Monitor</div>
<div class="appbar-tabs">
<button
class="appbar-tab ${this.viewMode === 'chart' ? 'active' : ''}"
@click=${() => this.handleViewChange('chart')}
>Chart</button>
<button
class="appbar-tab ${this.viewMode === 'table' ? 'active' : ''}"
@click=${() => this.handleViewChange('table')}
>Queues</button>
</div>
</div>
<div class="table-card">
${this.loading ? html`
<div class="loading">
<div class="loading-spinner"></div>
<div>Loading...</div>
${this.viewMode === 'chart' ? html`
<div class="chart-card chart-fullheight">
<div class="chart-header">
<span class="chart-title">Tasks Overview</span>
<time-range-picker
.duration=${this.duration}
.endTime=${this.endTime}
.isLiveMode=${this.isLiveMode}
@change=${this.handleTimeRangeChange}
></time-range-picker>
</div>
` : html`
<queue-table
.queues=${this.queues}
.rootPath=${this.rootPath}
@queue-click=${this.handleQueueClick}
></queue-table>
`}
</div>
<div class="chart-container-large">
<tasks-chart
.data=${this.chartData}
.timestamps=${this.chartData.timestamps || []}
@time-range-select=${this.handleTimeRangeSelect}
></tasks-chart>
</div>
</div>
` : html`
<div class="table-card">
${this.loading ? html`
<div class="loading">
<div class="loading-spinner"></div>
<div>Loading...</div>
</div>
` : html`
<queue-table
.queues=${this.queues}
.rootPath=${this.rootPath}
@queue-click=${this.handleQueueClick}
@queue-updated=${this.handleQueueUpdated}
></queue-table>
`}
</div>
`}
<queue-modal
.open=${this.modalOpen}

View File

@@ -1,9 +1,21 @@
import { LitElement, html, css } from 'lit';
import { Chart, registerables } from 'chart.js';
import { Chart, registerables, Tooltip } from 'chart.js';
import './time-range-picker.js';
Chart.register(...registerables);
// 自定义 tooltip positioner显示在鼠标右下方
if (!Tooltip.positioners.cursor) {
Tooltip.positioners.cursor = function (elements, eventPosition, tooltip) {
return {
x: eventPosition.x + 20,
y: eventPosition.y + 15,
xAlign: 'left',
yAlign: 'top'
};
};
}
// 十字准星 + 拖拽选择插件(与 tasks-chart 共用逻辑)
const crosshairPlugin = {
id: 'queueCrosshair',
@@ -106,19 +118,20 @@ class QueueModal extends LitElement {
overflow: hidden;
}
.modal-header {
background: #515151;
padding: 16px 20px;
.appbar {
display: flex;
justify-content: space-between;
align-items: center;
background: #333;
padding: 0 20px;
height: 56px;
border-bottom: 1px solid #616161;
}
.modal-header h2 {
font-size: 1.1em;
font-weight: 500;
margin: 0;
.appbar-title {
font-size: 1.2em;
font-weight: 600;
color: #e0e0e0;
}
.close-btn {
@@ -136,13 +149,13 @@ class QueueModal extends LitElement {
}
.close-btn:hover {
background: #616161;
background: #424242;
color: #e0e0e0;
}
.modal-body {
padding: 20px;
height: calc(100vh - 60px);
height: calc(100vh - 56px);
overflow-y: auto;
}
@@ -370,6 +383,7 @@ class QueueModal extends LitElement {
canvas {
width: 100% !important;
height: 100% !important;
cursor: crosshair;
}
`;
@@ -584,7 +598,7 @@ class QueueModal extends LitElement {
position: 'bottom',
labels: { color: '#e0e0e0', padding: 10, usePointStyle: true, pointStyle: 'circle', font: { size: 11 } }
},
tooltip: { enabled: true, backgroundColor: 'rgba(30, 30, 30, 0.9)', titleColor: '#e0e0e0', bodyColor: '#e0e0e0' }
tooltip: { enabled: true, backgroundColor: 'rgba(30, 30, 30, 0.9)', titleColor: '#e0e0e0', bodyColor: '#e0e0e0', position: 'cursor', caretSize: 0 }
},
scales: {
x: { grid: { color: '#616161' }, ticks: { color: '#9e9e9e', maxTicksLimit: 8 } },
@@ -722,15 +736,7 @@ class QueueModal extends LitElement {
getTabCount(tab) {
if (!this.queueInfo) return 0;
switch (tab) {
case 'active': return this.queueInfo.Active || 0;
case 'pending': return this.queueInfo.Pending || 0;
case 'scheduled': return this.queueInfo.Scheduled || 0;
case 'retry': return this.queueInfo.Retry || 0;
case 'archived': return this.queueInfo.Archived || 0;
case 'completed': return this.queueInfo.Completed || 0;
default: return 0;
}
return this.queueInfo[tab] || 0;
}
handlePageClick(page) {
@@ -828,8 +834,8 @@ class QueueModal extends LitElement {
return html`
<div class="modal ${this.open ? 'open' : ''}">
<div class="modal-content">
<div class="modal-header">
<h2>Queue: ${this.queue}</h2>
<div class="appbar">
<div class="appbar-title">Queue: ${this.queue}</div>
<button class="close-btn" @click=${this.handleClose}>&times;</button>
</div>
<div class="modal-body">

View File

@@ -1,8 +1,18 @@
import { LitElement, html, css } from 'lit';
import { Chart, registerables } from 'chart.js';
import { Chart, registerables, Tooltip } from 'chart.js';
Chart.register(...registerables);
// 自定义 tooltip positioner显示在鼠标右下方
Tooltip.positioners.cursor = function (elements, eventPosition, tooltip) {
return {
x: eventPosition.x + 20,
y: eventPosition.y + 15,
xAlign: 'left',
yAlign: 'top'
};
};
// 十字准星 + 拖拽选择插件
const crosshairPlugin = {
id: 'crosshair',
@@ -77,6 +87,7 @@ class TasksChart extends LitElement {
canvas {
width: 100% !important;
height: 100% !important;
cursor: crosshair;
}
`;
@@ -207,7 +218,9 @@ class TasksChart extends LitElement {
borderColor: '#616161',
borderWidth: 1,
padding: 10,
displayColors: true
displayColors: true,
position: 'cursor',
caretSize: 0
}
},
scales: {

View File

@@ -9,7 +9,53 @@ body {
background-color: #424242;
color: #e0e0e0;
min-height: 100vh;
padding: 20px;
padding: 0;
}
/* AppBar */
.appbar {
display: flex;
justify-content: space-between;
align-items: center;
background: #333;
padding: 0 20px;
height: 56px;
border-bottom: 1px solid #616161;
position: sticky;
top: 0;
z-index: 100;
}
.appbar-title {
font-size: 1.2em;
font-weight: 600;
color: #e0e0e0;
}
.appbar-tabs {
display: flex;
gap: 4px;
}
.appbar-tab {
background: transparent;
border: none;
color: #9e9e9e;
padding: 8px 16px;
cursor: pointer;
font-size: 0.9em;
border-radius: 4px;
transition: background 0.2s, color 0.2s;
}
.appbar-tab:hover {
background: #424242;
color: #e0e0e0;
}
.appbar-tab.active {
background: #42a5f5;
color: #fff;
}
/* Chart Card */
@@ -18,7 +64,19 @@ body {
border-radius: 4px;
padding: 20px;
border: 1px solid #616161;
margin-bottom: 20px;
margin: 20px;
}
.chart-fullheight {
margin: 0;
border-radius: 0;
border-left: none;
border-right: none;
border-bottom: none;
height: calc(100vh - 56px);
display: flex;
flex-direction: column;
overflow: hidden;
}
.chart-header {
@@ -38,6 +96,12 @@ body {
position: relative;
}
.chart-container-large {
flex: 1;
position: relative;
overflow: hidden;
}
/* Time Range Picker */
.time-range-picker {
display: flex;
@@ -94,6 +158,7 @@ body {
border-radius: 4px;
border: 1px solid #616161;
overflow: hidden;
margin: 20px;
}
.queues-table {
@@ -200,7 +265,9 @@ body {
}
@keyframes spin {
to { transform: rotate(360deg); }
to {
transform: rotate(360deg);
}
}
.empty-state {