Files
taskq/example/main.go
hupeh 42cb0fa4c2 feat: 添加监控仪表盘
- 新增 Lit.js 组件化 UI (ui/ 目录)
  - tasks-chart: 带十字准星和拖拽选择的图表
  - queue-table: 队列列表,支持暂停/恢复
  - queue-modal: 队列详情弹窗,支持任务重试
  - time-range-picker: Prometheus 风格时间选择器
  - help-tooltip: 可复用的提示组件

- HTTPHandler 功能
  - SSE 实时推送 (stats + queues)
  - 队列暂停/恢复 API
  - 任务重试 API
  - 时间范围查询 API

- Inspector 改进
  - Prometheus 风格单表存储
  - 集成到 Start/Stop 生命周期
  - 新增 PauseQueue/UnpauseQueue/RunTask 方法

- 代码重构
  - Start 函数拆分为小函数
  - 优雅关闭流程优化

- 其他
  - 忽略 SQLite 数据库文件
  - example 添加延迟/定点任务示例
2025-12-09 19:58:18 +08:00

206 lines
5.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"context"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"code.tczkiot.com/wlw/taskq"
"github.com/redis/go-redis/v9"
)
// 定义任务数据结构
type EmailTask struct {
UserID int `json:"user_id"`
TemplateID string `json:"template_id"`
}
type ImageResizeTask struct {
SourceURL string `json:"source_url"`
}
// 定义任务处理器
func handleEmailTask(ctx context.Context, t EmailTask) error {
log.Printf("处理邮件任务: 用户ID=%d, 模板ID=%s", t.UserID, t.TemplateID)
// 模拟邮件发送逻辑
return nil
}
func handleImageResizeTask(ctx context.Context, t ImageResizeTask) error {
log.Printf("处理图片调整任务: 源URL=%s", t.SourceURL)
// 模拟图片调整逻辑
return nil
}
func main() {
// 创建 Redis 客户端
rdb := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
DB: 1,
})
defer rdb.Close()
// 初始化 taskq
taskq.SetRedis(rdb)
taskq.Init()
// 创建邮件任务
emailTask := &taskq.Task[EmailTask]{
Queue: "email",
Name: "email:deliver",
MaxRetries: 3,
Priority: 5,
TTR: 0,
Handler: handleEmailTask,
}
// 创建图片调整任务
imageTask := &taskq.Task[ImageResizeTask]{
Queue: "image",
Name: "image:resize",
MaxRetries: 3,
Priority: 3,
TTR: 0,
Handler: handleImageResizeTask,
}
// 注册任务
if err := taskq.Register(emailTask); err != nil {
log.Fatal("注册邮件任务失败:", err)
}
if err := taskq.Register(imageTask); err != nil {
log.Fatal("注册图片任务失败:", err)
}
// 创建监控 HTTP 处理器
handler, err := taskq.NewHTTPHandler(taskq.HTTPHandlerOptions{
RootPath: "/monitor",
ReadOnly: false,
})
if err != nil {
log.Fatal("创建监控处理器失败:", err)
}
// 创建可取消的 context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 启动 taskq 服务器(包含统计采集器)
go func() {
err := taskq.Start(ctx, taskq.StartOptions{
StatsInterval: 2 * time.Second,
StatsDBPath: "./taskq_stats.db",
})
if err != nil {
log.Fatal("启动 taskq 服务器失败:", err)
}
}()
// 定时发布任务
go func() {
ticker := time.NewTicker(5 * time.Second) // 每5秒发布一次任务
defer ticker.Stop()
taskCounter := 0
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
taskCounter++
// 发布即时邮件任务
err := emailTask.Publish(ctx, EmailTask{
UserID: taskCounter,
TemplateID: "welcome",
})
if err != nil {
log.Printf("发布邮件任务失败: %v", err)
} else {
log.Printf("发布邮件任务成功: 用户ID=%d", taskCounter)
}
// 发布延迟任务30秒后执行
err = emailTask.Publish(ctx, EmailTask{
UserID: taskCounter + 1000,
TemplateID: "reminder",
}, taskq.Delay(30*time.Second))
if err != nil {
log.Printf("发布延迟邮件任务失败: %v", err)
} else {
log.Printf("发布延迟邮件任务成功: 用户ID=%d (30秒后执行)", taskCounter+1000)
}
// 发布定点任务1分钟后的整点执行
scheduledTime := time.Now().Add(1 * time.Minute).Truncate(time.Minute)
err = imageTask.Publish(ctx, ImageResizeTask{
SourceURL: fmt.Sprintf("https://example.com/scheduled%d.jpg", taskCounter),
}, taskq.DelayUntil(scheduledTime))
if err != nil {
log.Printf("发布定点图片任务失败: %v", err)
} else {
log.Printf("发布定点图片任务成功: 任务ID=%d (在 %s 执行)", taskCounter, scheduledTime.Format("15:04:05"))
}
// 发布即时图片任务
err = imageTask.Publish(ctx, ImageResizeTask{
SourceURL: fmt.Sprintf("https://example.com/image%d.jpg", taskCounter),
})
if err != nil {
log.Printf("发布图片任务失败: %v", err)
} else {
log.Printf("发布图片任务成功: 任务ID=%d", taskCounter)
}
}
}
}()
// 创建 HTTP 服务器
server := &http.Server{
Addr: ":8081",
Handler: handler,
}
// 启动 HTTP 服务器(非阻塞)
go func() {
log.Printf("启动监控服务器在 http://localhost:8081")
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatal("HTTP 服务器错误:", err)
}
}()
// 等待中断信号
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
log.Println("收到关闭信号,正在优雅关停...")
// 1. 取消 context停止任务发布
cancel()
// 2. 关闭监控 HTTP 处理器(会断开 SSE 连接)
handler.Close()
// 3. 关闭 HTTP 服务器(设置 5 秒超时)
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer shutdownCancel()
if err := server.Shutdown(shutdownCtx); err != nil {
log.Printf("HTTP 服务器关闭错误: %v", err)
}
// 4. 停止 taskq 服务器(会等待完全关闭)
taskq.Stop()
log.Println("服务已安全关闭")
// rdb.Close() 由 defer 执行
}