diff --git a/example/main.go b/example/main.go index 09fea28..ba42fd3 100644 --- a/example/main.go +++ b/example/main.go @@ -89,20 +89,10 @@ func main() { Interval: 15 * time.Second, }) - // 配置 taskq - if err := taskq.Configure(taskq.Config{ - Redis: rdb, - Tasks: []*taskq.Task{emailTask, imageTask}, - Plugins: []taskq.Plugin{ins, met}, - }); err != nil { - log.Fatal("配置 taskq 失败:", err) - } - - // 创建监控服务 - servlet := taskq.Default() + // 创建监控服务插件 mon, err := monitor.New(monitor.Options{ Inspector: ins, - Queues: servlet.Queues(), + Queues: map[string]int{"email": 5, "image": 3}, RootPath: "/monitor", ReadOnly: false, }) @@ -110,6 +100,15 @@ func main() { log.Fatal("创建监控服务失败:", err) } + // 配置 taskq(monitor 也作为插件注册) + if err := taskq.Configure(taskq.Config{ + Redis: rdb, + Tasks: []*taskq.Task{emailTask, imageTask}, + Plugins: []taskq.Plugin{ins, met, mon}, + }); err != nil { + log.Fatal("配置 taskq 失败:", err) + } + // 创建可取消的 context ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -212,13 +211,10 @@ func main() { log.Println("收到关闭信号,正在优雅关停...") - // 1. 取消 context,停止任务发布 - cancel() - - // 2. 关闭监控服务(断开 SSE 连接) + // 1. 先关闭 monitor 的 SSE 连接,否则 HTTP 服务器无法优雅关闭 mon.Close() - // 3. 关闭 HTTP 服务器 + // 2. 关闭 HTTP 服务器 shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second) defer shutdownCancel() @@ -226,7 +222,7 @@ func main() { log.Printf("HTTP 服务器关闭错误: %v", err) } - // 4. 停止 taskq 服务器(会自动调用插件的 OnStop) + // 3. 停止 taskq 服务器(会自动调用所有插件的 Stop) taskq.Stop() log.Println("服务已安全关闭") diff --git a/x/monitor/monitor.go b/x/monitor/monitor.go index bc2bb35..355e6c3 100644 --- a/x/monitor/monitor.go +++ b/x/monitor/monitor.go @@ -13,6 +13,7 @@ import ( "sync" "time" + "code.tczkiot.com/wlw/taskq" "code.tczkiot.com/wlw/taskq/x/inspector" ) @@ -96,6 +97,26 @@ func (m *Monitor) Close() error { return nil } +// Name 返回插件名称 +func (m *Monitor) Name() string { + return "monitor" +} + +// Init 初始化插件(实现 Plugin 接口) +func (m *Monitor) Init(ctx *taskq.Context) error { + return nil +} + +// Start 启动插件(实现 Plugin 接口) +func (m *Monitor) Start(ctx *taskq.Context) error { + return nil +} + +// Stop 停止插件(实现 Plugin 接口) +func (m *Monitor) Stop() error { + return m.Close() +} + // setupRoutes 设置路由 func (m *Monitor) setupRoutes() { // API 路由