From 37b262eefb6bb413e63a6abe876082dfdce672bb Mon Sep 17 00:00:00 2001 From: hupeh Date: Wed, 10 Dec 2025 01:00:42 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20monitor=20=E5=AE=9E=E7=8E=B0=20Plugin?= =?UTF-8?q?=20=E6=8E=A5=E5=8F=A3=EF=BC=8C=E4=BC=98=E5=8C=96=E5=85=B3?= =?UTF-8?q?=E9=97=AD=E6=B5=81=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - monitor 包导入 taskq,实现 Plugin 接口 - monitor 作为插件注册到 taskq.Configure() - 修复优雅关闭顺序:先关闭 SSE 连接,再关闭 HTTP 服务器,最后停止 taskq - 移除 main.go 中手动调用 cancel() 导致的阻塞问题 --- example/main.go | 32 ++++++++++++++------------------ x/monitor/monitor.go | 21 +++++++++++++++++++++ 2 files changed, 35 insertions(+), 18 deletions(-) diff --git a/example/main.go b/example/main.go index 09fea28..ba42fd3 100644 --- a/example/main.go +++ b/example/main.go @@ -89,20 +89,10 @@ func main() { Interval: 15 * time.Second, }) - // 配置 taskq - if err := taskq.Configure(taskq.Config{ - Redis: rdb, - Tasks: []*taskq.Task{emailTask, imageTask}, - Plugins: []taskq.Plugin{ins, met}, - }); err != nil { - log.Fatal("配置 taskq 失败:", err) - } - - // 创建监控服务 - servlet := taskq.Default() + // 创建监控服务插件 mon, err := monitor.New(monitor.Options{ Inspector: ins, - Queues: servlet.Queues(), + Queues: map[string]int{"email": 5, "image": 3}, RootPath: "/monitor", ReadOnly: false, }) @@ -110,6 +100,15 @@ func main() { log.Fatal("创建监控服务失败:", err) } + // 配置 taskq(monitor 也作为插件注册) + if err := taskq.Configure(taskq.Config{ + Redis: rdb, + Tasks: []*taskq.Task{emailTask, imageTask}, + Plugins: []taskq.Plugin{ins, met, mon}, + }); err != nil { + log.Fatal("配置 taskq 失败:", err) + } + // 创建可取消的 context ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -212,13 +211,10 @@ func main() { log.Println("收到关闭信号,正在优雅关停...") - // 1. 取消 context,停止任务发布 - cancel() - - // 2. 关闭监控服务(断开 SSE 连接) + // 1. 先关闭 monitor 的 SSE 连接,否则 HTTP 服务器无法优雅关闭 mon.Close() - // 3. 关闭 HTTP 服务器 + // 2. 关闭 HTTP 服务器 shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second) defer shutdownCancel() @@ -226,7 +222,7 @@ func main() { log.Printf("HTTP 服务器关闭错误: %v", err) } - // 4. 停止 taskq 服务器(会自动调用插件的 OnStop) + // 3. 停止 taskq 服务器(会自动调用所有插件的 Stop) taskq.Stop() log.Println("服务已安全关闭") diff --git a/x/monitor/monitor.go b/x/monitor/monitor.go index bc2bb35..355e6c3 100644 --- a/x/monitor/monitor.go +++ b/x/monitor/monitor.go @@ -13,6 +13,7 @@ import ( "sync" "time" + "code.tczkiot.com/wlw/taskq" "code.tczkiot.com/wlw/taskq/x/inspector" ) @@ -96,6 +97,26 @@ func (m *Monitor) Close() error { return nil } +// Name 返回插件名称 +func (m *Monitor) Name() string { + return "monitor" +} + +// Init 初始化插件(实现 Plugin 接口) +func (m *Monitor) Init(ctx *taskq.Context) error { + return nil +} + +// Start 启动插件(实现 Plugin 接口) +func (m *Monitor) Start(ctx *taskq.Context) error { + return nil +} + +// Stop 停止插件(实现 Plugin 接口) +func (m *Monitor) Stop() error { + return m.Close() +} + // setupRoutes 设置路由 func (m *Monitor) setupRoutes() { // API 路由