Files
taskq/example/main.go
hupeh 37b262eefb feat: monitor 实现 Plugin 接口,优化关闭流程
- monitor 包导入 taskq,实现 Plugin 接口
- monitor 作为插件注册到 taskq.Configure()
- 修复优雅关闭顺序:先关闭 SSE 连接,再关闭 HTTP 服务器,最后停止 taskq
- 移除 main.go 中手动调用 cancel() 导致的阻塞问题
2025-12-10 01:00:42 +08:00

230 lines
5.8 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"context"
"flag"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"code.tczkiot.com/wlw/taskq"
"code.tczkiot.com/wlw/taskq/x/inspector"
"code.tczkiot.com/wlw/taskq/x/metrics"
"code.tczkiot.com/wlw/taskq/x/monitor"
_ "github.com/mattn/go-sqlite3"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/redis/go-redis/v9"
)
// 定义任务数据结构
type EmailTask struct {
UserID int `json:"user_id"`
TemplateID string `json:"template_id"`
}
type ImageResizeTask struct {
SourceURL string `json:"source_url"`
}
// 定义任务处理器
func handleEmailTask(ctx context.Context, t EmailTask) error {
log.Printf("处理邮件任务: 用户ID=%d, 模板ID=%s", t.UserID, t.TemplateID)
return nil
}
func handleImageResizeTask(ctx context.Context, t ImageResizeTask) error {
log.Printf("处理图片调整任务: 源URL=%s", t.SourceURL)
return nil
}
var (
redisAddr = flag.String("redis", "127.0.0.1:6379", "Redis 地址")
redisDB = flag.Int("redis-db", 1, "Redis 数据库")
httpAddr = flag.String("http", ":8081", "HTTP 服务地址")
dbPath = flag.String("db", "./taskq_stats.db", "SQLite 数据库路径")
)
func main() {
flag.Parse()
// 创建 Redis 客户端
rdb := redis.NewClient(&redis.Options{
Addr: *redisAddr,
DB: *redisDB,
})
defer rdb.Close()
// 创建邮件任务
emailTask := &taskq.Task{
Queue: "email",
Name: "email:deliver",
MaxRetries: 3,
Priority: 5,
Handler: handleEmailTask,
}
// 创建图片调整任务
imageTask := &taskq.Task{
Queue: "image",
Name: "image:resize",
MaxRetries: 3,
Priority: 3,
Handler: handleImageResizeTask,
}
// 创建 Inspector 插件(用于监控仪表盘)
ins := inspector.New(inspector.Options{
Interval: 2 * time.Second,
DBPath: *dbPath,
})
// 创建 Metrics 插件(用于 Prometheus
met := metrics.New(metrics.Options{
Namespace: "taskq",
Interval: 15 * time.Second,
})
// 创建监控服务插件
mon, err := monitor.New(monitor.Options{
Inspector: ins,
Queues: map[string]int{"email": 5, "image": 3},
RootPath: "/monitor",
ReadOnly: false,
})
if err != nil {
log.Fatal("创建监控服务失败:", err)
}
// 配置 taskqmonitor 也作为插件注册)
if err := taskq.Configure(taskq.Config{
Redis: rdb,
Tasks: []*taskq.Task{emailTask, imageTask},
Plugins: []taskq.Plugin{ins, met, mon},
}); err != nil {
log.Fatal("配置 taskq 失败:", err)
}
// 创建可取消的 context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 初始化 taskq初始化所有插件
if err := taskq.Init(ctx); err != nil {
log.Fatal("初始化 taskq 失败:", err)
}
// 启动 taskq 服务器(启动所有插件)
if err := taskq.Start(ctx); err != nil {
log.Fatal("启动 taskq 服务器失败:", err)
}
// 定时发布任务
go func() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
taskCounter := 0
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
taskCounter++
// 发布即时邮件任务
err := emailTask.Publish(ctx, EmailTask{
UserID: taskCounter,
TemplateID: "welcome",
})
if err != nil {
log.Printf("发布邮件任务失败: %v", err)
} else {
log.Printf("发布邮件任务成功: 用户ID=%d", taskCounter)
}
// 发布延迟任务30秒后执行
err = emailTask.Publish(ctx, EmailTask{
UserID: taskCounter + 1000,
TemplateID: "reminder",
}, taskq.Delay(30*time.Second))
if err != nil {
log.Printf("发布延迟邮件任务失败: %v", err)
} else {
log.Printf("发布延迟邮件任务成功: 用户ID=%d (30秒后执行)", taskCounter+1000)
}
// 发布定点任务1分钟后的整点执行
scheduledTime := time.Now().Add(1 * time.Minute).Truncate(time.Minute)
err = imageTask.Publish(ctx, ImageResizeTask{
SourceURL: fmt.Sprintf("https://example.com/scheduled%d.jpg", taskCounter),
}, taskq.DelayUntil(scheduledTime))
if err != nil {
log.Printf("发布定点图片任务失败: %v", err)
} else {
log.Printf("发布定点图片任务成功: 任务ID=%d (在 %s 执行)", taskCounter, scheduledTime.Format("15:04:05"))
}
// 发布即时图片任务
err = imageTask.Publish(ctx, ImageResizeTask{
SourceURL: fmt.Sprintf("https://example.com/image%d.jpg", taskCounter),
})
if err != nil {
log.Printf("发布图片任务失败: %v", err)
} else {
log.Printf("发布图片任务成功: 任务ID=%d", taskCounter)
}
}
}
}()
// 创建 HTTP 路由
mux := http.NewServeMux()
mux.Handle("/monitor/", mon)
mux.Handle("/metrics", promhttp.Handler())
// 创建 HTTP 服务器
server := &http.Server{
Addr: *httpAddr,
Handler: mux,
}
// 启动 HTTP 服务器
go func() {
log.Printf("启动服务器在 http://localhost%s", *httpAddr)
log.Printf(" - 监控仪表盘: http://localhost%s/monitor", *httpAddr)
log.Printf(" - Prometheus: http://localhost%s/metrics", *httpAddr)
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatal("HTTP 服务器错误:", err)
}
}()
// 等待中断信号
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
log.Println("收到关闭信号,正在优雅关停...")
// 1. 先关闭 monitor 的 SSE 连接,否则 HTTP 服务器无法优雅关闭
mon.Close()
// 2. 关闭 HTTP 服务器
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer shutdownCancel()
if err := server.Shutdown(shutdownCtx); err != nil {
log.Printf("HTTP 服务器关闭错误: %v", err)
}
// 3. 停止 taskq 服务器(会自动调用所有插件的 Stop
taskq.Stop()
log.Println("服务已安全关闭")
}