feat: monitor 实现 Plugin 接口,优化关闭流程
- monitor 包导入 taskq,实现 Plugin 接口 - monitor 作为插件注册到 taskq.Configure() - 修复优雅关闭顺序:先关闭 SSE 连接,再关闭 HTTP 服务器,最后停止 taskq - 移除 main.go 中手动调用 cancel() 导致的阻塞问题
This commit is contained in:
@@ -89,20 +89,10 @@ func main() {
|
||||
Interval: 15 * time.Second,
|
||||
})
|
||||
|
||||
// 配置 taskq
|
||||
if err := taskq.Configure(taskq.Config{
|
||||
Redis: rdb,
|
||||
Tasks: []*taskq.Task{emailTask, imageTask},
|
||||
Plugins: []taskq.Plugin{ins, met},
|
||||
}); err != nil {
|
||||
log.Fatal("配置 taskq 失败:", err)
|
||||
}
|
||||
|
||||
// 创建监控服务
|
||||
servlet := taskq.Default()
|
||||
// 创建监控服务插件
|
||||
mon, err := monitor.New(monitor.Options{
|
||||
Inspector: ins,
|
||||
Queues: servlet.Queues(),
|
||||
Queues: map[string]int{"email": 5, "image": 3},
|
||||
RootPath: "/monitor",
|
||||
ReadOnly: false,
|
||||
})
|
||||
@@ -110,6 +100,15 @@ func main() {
|
||||
log.Fatal("创建监控服务失败:", err)
|
||||
}
|
||||
|
||||
// 配置 taskq(monitor 也作为插件注册)
|
||||
if err := taskq.Configure(taskq.Config{
|
||||
Redis: rdb,
|
||||
Tasks: []*taskq.Task{emailTask, imageTask},
|
||||
Plugins: []taskq.Plugin{ins, met, mon},
|
||||
}); err != nil {
|
||||
log.Fatal("配置 taskq 失败:", err)
|
||||
}
|
||||
|
||||
// 创建可取消的 context
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
@@ -212,13 +211,10 @@ func main() {
|
||||
|
||||
log.Println("收到关闭信号,正在优雅关停...")
|
||||
|
||||
// 1. 取消 context,停止任务发布
|
||||
cancel()
|
||||
|
||||
// 2. 关闭监控服务(断开 SSE 连接)
|
||||
// 1. 先关闭 monitor 的 SSE 连接,否则 HTTP 服务器无法优雅关闭
|
||||
mon.Close()
|
||||
|
||||
// 3. 关闭 HTTP 服务器
|
||||
// 2. 关闭 HTTP 服务器
|
||||
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer shutdownCancel()
|
||||
|
||||
@@ -226,7 +222,7 @@ func main() {
|
||||
log.Printf("HTTP 服务器关闭错误: %v", err)
|
||||
}
|
||||
|
||||
// 4. 停止 taskq 服务器(会自动调用插件的 OnStop)
|
||||
// 3. 停止 taskq 服务器(会自动调用所有插件的 Stop)
|
||||
taskq.Stop()
|
||||
|
||||
log.Println("服务已安全关闭")
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"code.tczkiot.com/wlw/taskq"
|
||||
"code.tczkiot.com/wlw/taskq/x/inspector"
|
||||
)
|
||||
|
||||
@@ -96,6 +97,26 @@ func (m *Monitor) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Name 返回插件名称
|
||||
func (m *Monitor) Name() string {
|
||||
return "monitor"
|
||||
}
|
||||
|
||||
// Init 初始化插件(实现 Plugin 接口)
|
||||
func (m *Monitor) Init(ctx *taskq.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Start 启动插件(实现 Plugin 接口)
|
||||
func (m *Monitor) Start(ctx *taskq.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop 停止插件(实现 Plugin 接口)
|
||||
func (m *Monitor) Stop() error {
|
||||
return m.Close()
|
||||
}
|
||||
|
||||
// setupRoutes 设置路由
|
||||
func (m *Monitor) setupRoutes() {
|
||||
// API 路由
|
||||
|
||||
Reference in New Issue
Block a user