commit 39c6a65b87e393752480765e60947b1d2ccc7a51 Author: hupeh Date: Tue Dec 9 14:31:02 2025 +0800 初始提交:添加任务队列管理系统 diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..e7bd9b5 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,15 @@ +root = true + +[*] +charset = utf-8 +end_of_line = lf +insert_final_newline = true +trim_trailing_whitespace = true + +[*.go] +indent_style = tab +indent_size = 4 + +[*.{md,yml,yaml,html}] +indent_style = space +indent_size = 2 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7125ccf --- /dev/null +++ b/.gitignore @@ -0,0 +1,51 @@ +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Dependency directories (remove the comment below to include it) +# vendor/ + +# Go workspace file +go.work + +# IDE and editor files +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# OS generated files +.DS_Store +.DS_Store? +._* +.Spotlight-V100 +.Trashes +ehthumbs.db +Thumbs.db + +# Log files +*.log + +# Environment files +.env +.env.local +.env.*.local + +# Build output +dist/ +build/ +bin/ + +# Temporary files +tmp/ +temp/ \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..f49dca1 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2025 TaskQ + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..265c02e --- /dev/null +++ b/Makefile @@ -0,0 +1,39 @@ +# Makefile for TaskQ + +.PHONY: test clean fmt lint install + +# Default target +all: fmt lint test + +# Run tests +test: + @echo "Running tests..." + go test -v ./... + +# Clean +clean: + @echo "Cleaning..." + go clean + +# Format code +fmt: + @echo "Formatting code..." + go fmt ./... + goimports -w . + +# Run linter +lint: + @echo "Running linter..." + golint ./... + +# Install dependencies +install: + @echo "Installing dependencies..." + go mod download + go mod tidy + +# Development setup +dev-setup: install + @echo "Setting up development environment..." + go install golang.org/x/tools/cmd/goimports@latest + go install golang.org/x/lint/golint@latest \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..0da485d --- /dev/null +++ b/README.md @@ -0,0 +1,47 @@ +# TaskQ - Task Queue Management System + +A Go-based task queue management system for efficient task processing and management. + +## Features + +- Task queue management +- Dashboard interface +- Task inspection and monitoring +- Concurrent task processing + +## Installation + +```bash +go mod download +``` + +## Usage + +```bash +go run . +``` + +## Project Structure + +- `taskq.go` - Main application entry point +- `task.go` - Task definition and management +- `inspect.go` - Task inspection utilities +- `dashboard.html` - Web dashboard interface +- `example/` - Example implementations + +## Development + +```bash +# Run the application +go run . + +# Run tests +go test ./... + +# Build +go build -o taskq +``` + +## License + +MIT License \ No newline at end of file diff --git a/dashboard.html b/dashboard.html new file mode 100644 index 0000000..791aa4d --- /dev/null +++ b/dashboard.html @@ -0,0 +1,420 @@ + + + + + + TaskQ 监控面板 + + + +
+
+

🚀 TaskQ 监控面板

+

实时监控异步任务队列状态

+
+
+ +
加载中...
+ + +
+
+ + + + + + + \ No newline at end of file diff --git a/example/main.go b/example/main.go new file mode 100644 index 0000000..20a978c --- /dev/null +++ b/example/main.go @@ -0,0 +1,137 @@ +package main + +import ( + "context" + "fmt" + "log" + "net/http" + "time" + + "code.tczkiot.com/wlw/taskq" + + "github.com/redis/go-redis/v9" +) + +// 定义任务数据结构 +type EmailTask struct { + UserID int `json:"user_id"` + TemplateID string `json:"template_id"` +} + +type ImageResizeTask struct { + SourceURL string `json:"source_url"` +} + +// 定义任务处理器 +func handleEmailTask(ctx context.Context, t EmailTask) error { + log.Printf("处理邮件任务: 用户ID=%d, 模板ID=%s", t.UserID, t.TemplateID) + // 模拟邮件发送逻辑 + return nil +} + +func handleImageResizeTask(ctx context.Context, t ImageResizeTask) error { + log.Printf("处理图片调整任务: 源URL=%s", t.SourceURL) + // 模拟图片调整逻辑 + return nil +} + +func main() { + // 创建 Redis 客户端 + rdb := redis.NewClient(&redis.Options{ + Addr: "127.0.0.1:6379", + DB: 1, + }) + defer rdb.Close() + + // 初始化 taskq + taskq.SetRedis(rdb) + taskq.Init() + + // 创建邮件任务 + emailTask := &taskq.Task[EmailTask]{ + Queue: "email", + Name: "email:deliver", + MaxRetries: 3, + Priority: 5, + TTR: 0, + Handler: handleEmailTask, + } + + // 创建图片调整任务 + imageTask := &taskq.Task[ImageResizeTask]{ + Queue: "image", + Name: "image:resize", + MaxRetries: 3, + Priority: 3, + TTR: 0, + Handler: handleImageResizeTask, + } + + // 注册任务 + if err := taskq.Register(emailTask); err != nil { + log.Fatal("注册邮件任务失败:", err) + } + if err := taskq.Register(imageTask); err != nil { + log.Fatal("注册图片任务失败:", err) + } + + // 创建监控处理器 + handler, err := taskq.NewInspectHandler(taskq.InspectOptions{ + RootPath: "/monitor", + ReadOnly: false, + }) + if err != nil { + log.Fatal("创建监控处理器失败:", err) + } + + // 启动 taskq 服务器 + ctx := context.Background() + go func() { + err := taskq.Start(ctx) + if err != nil { + log.Fatal("启动 taskq 服务器失败:", err) + } + }() + + // 定时发布任务 + go func() { + ticker := time.NewTicker(5 * time.Second) // 每5秒发布一次任务 + defer ticker.Stop() + + taskCounter := 0 + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + taskCounter++ + + // 发布邮件任务 + err := emailTask.Publish(ctx, EmailTask{ + UserID: taskCounter, + TemplateID: "welcome", + }) + if err != nil { + log.Printf("发布邮件任务失败: %v", err) + } else { + log.Printf("发布邮件任务成功: 用户ID=%d", taskCounter) + } + + // 发布图片调整任务 + err = imageTask.Publish(ctx, ImageResizeTask{ + SourceURL: fmt.Sprintf("https://example.com/image%d.jpg", taskCounter), + }) + if err != nil { + log.Printf("发布图片任务失败: %v", err) + } else { + log.Printf("发布图片任务成功: 任务ID=%d", taskCounter) + } + } + } + }() + + // 启动 HTTP 服务器提供监控界面 + log.Printf("启动监控服务器在 http://localhost:8080") + log.Fatal(http.ListenAndServe(":8080", handler)) +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..aff2429 --- /dev/null +++ b/go.mod @@ -0,0 +1,20 @@ +module code.tczkiot.com/wlw/taskq + +go 1.25.4 + +require ( + github.com/hibiken/asynq v0.25.1 + github.com/redis/go-redis/v9 v9.7.0 + github.com/rs/xid v1.6.0 +) + +require ( + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect + github.com/spf13/cast v1.7.0 // indirect + golang.org/x/sys v0.27.0 // indirect + golang.org/x/time v0.8.0 // indirect + google.golang.org/protobuf v1.35.2 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..b4a12a7 --- /dev/null +++ b/go.sum @@ -0,0 +1,38 @@ +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= +github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hibiken/asynq v0.25.1 h1:phj028N0nm15n8O2ims+IvJ2gz4k2auvermngh9JhTw= +github.com/hibiken/asynq v0.25.1/go.mod h1:pazWNOLBu0FEynQRBvHA26qdIKRSmfdIfUm4HdsLmXg= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E= +github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= +github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/rs/xid v1.6.0 h1:fV591PaemRlL6JfRxGDEPl69wICngIQ3shQtzfy2gxU= +github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= +github.com/spf13/cast v1.7.0 h1:ntdiHjuueXFgm5nzDRdOS4yfT43P5Fnud6DH50rz/7w= +github.com/spf13/cast v1.7.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= +golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg= +golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io= +google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= diff --git a/inspect.go b/inspect.go new file mode 100644 index 0000000..2f60238 --- /dev/null +++ b/inspect.go @@ -0,0 +1,332 @@ +// Package taskq 提供基于 Redis 的异步任务队列功能 +// inspect.go 文件包含任务队列的监控和检查功能 +package taskq + +import ( + _ "embed" + "encoding/json" + "fmt" + "html/template" + "net/http" + "sort" + "strconv" + "strings" + "time" + + "github.com/hibiken/asynq" +) + +//go:embed dashboard.html +var dashboardHTML string + +// InspectOptions 配置监控服务的选项 +type InspectOptions struct { + // RootPath 监控服务的根路径 + // 默认为 "/monitor" + RootPath string + + // ReadOnly 是否只读模式,禁用所有修改操作 + // 默认为 false + ReadOnly bool +} + +// HTTPHandler 监控服务的 HTTP 处理器 +type HTTPHandler struct { + router *http.ServeMux + rootPath string + readOnly bool + inspector *asynq.Inspector +} + +// NewInspectHandler 创建新的监控处理器 +// 使用全局的 redisClient 创建 asynq.Inspector +func NewInspectHandler(opts InspectOptions) (*HTTPHandler, error) { + if redisClient == nil { + return nil, fmt.Errorf("taskq: redis client not initialized, call SetRedis() first") + } + + // 设置默认值 + if opts.RootPath == "" { + opts.RootPath = "/monitor" + } + + // 确保路径以 / 开头且不以 / 结尾 + if !strings.HasPrefix(opts.RootPath, "/") { + opts.RootPath = "/" + opts.RootPath + } + opts.RootPath = strings.TrimSuffix(opts.RootPath, "/") + + // 创建 asynq inspector + inspector := asynq.NewInspectorFromRedisClient(redisClient) + + handler := &HTTPHandler{ + router: http.NewServeMux(), + rootPath: opts.RootPath, + readOnly: opts.ReadOnly, + inspector: inspector, + } + + handler.setupRoutes() + return handler, nil +} + +// ServeHTTP 实现 http.Handler 接口 +func (h *HTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + h.router.ServeHTTP(w, r) +} + +// RootPath 返回监控服务的根路径 +func (h *HTTPHandler) RootPath() string { + return h.rootPath +} + +// Close 关闭 inspector 连接 +func (h *HTTPHandler) Close() error { + return h.inspector.Close() +} + +// setupRoutes 设置路由 +func (h *HTTPHandler) setupRoutes() { + // API 路由 + apiPath := h.rootPath + "/api/" + h.router.HandleFunc(apiPath+"queues", h.handleQueues) + h.router.HandleFunc(apiPath+"queues/", h.handleQueueDetail) + h.router.HandleFunc(apiPath+"tasks/", h.handleTasks) + + // 主页路由 + h.router.HandleFunc(h.rootPath+"/", h.handleDashboard) + h.router.HandleFunc(h.rootPath, h.handleDashboard) +} + +// handleQueues 处理队列列表请求 +func (h *HTTPHandler) handleQueues(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + // 获取所有队列信息 + asynqQueues, err := h.inspector.Queues() + if err != nil { + http.Error(w, fmt.Sprintf("Failed to get queues: %v", err), http.StatusInternalServerError) + return + } + + fmt.Println("Redis中的队列:", asynqQueues) + fmt.Println("注册的队列:", queues) + + // 获取每个队列的详细信息 + type QueueInfo struct { + Name string `json:"name"` + Priority int `json:"priority"` + Active int `json:"active"` + Pending int `json:"pending"` + Retry int `json:"retry"` + Archived int `json:"archived"` + } + + var queueInfos []QueueInfo + + // 首先显示所有注册的队列(即使Redis中还没有任务) + for queueName, priority := range queues { + stats, err := h.inspector.GetQueueInfo(queueName) + if err != nil { + // 如果队列不存在,创建一个空的状态 + queueInfos = append(queueInfos, QueueInfo{ + Name: queueName, + Priority: priority, + Active: 0, + Pending: 0, + Retry: 0, + Archived: 0, + }) + } else { + queueInfos = append(queueInfos, QueueInfo{ + Name: queueName, + Priority: priority, + Active: stats.Active, + Pending: stats.Pending, + Retry: stats.Retry, + Archived: stats.Archived, + }) + } + } + + // 按优先级排序 + sort.Slice(queueInfos, func(i, j int) bool { + return queueInfos[i].Priority > queueInfos[j].Priority + }) + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(queueInfos) +} + +// handleQueueDetail 处理队列详情请求 +func (h *HTTPHandler) handleQueueDetail(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + // 从 URL 中提取队列名称 + path := strings.TrimPrefix(r.URL.Path, h.rootPath+"/api/queues/") + parts := strings.Split(path, "/") + if len(parts) == 0 || parts[0] == "" { + http.Error(w, "Queue name is required", http.StatusBadRequest) + return + } + queueName := parts[0] + + // 检查队列是否已注册 + if _, exists := queues[queueName]; !exists { + http.Error(w, "Queue not found", http.StatusNotFound) + return + } + + // 获取队列详细信息 + stats, err := h.inspector.GetQueueInfo(queueName) + if err != nil { + http.Error(w, fmt.Sprintf("Failed to get queue info: %v", err), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(stats) +} + +// 转换任务信息 +type TaskInfo struct { + ID string `json:"id"` + Type string `json:"type"` + Payload string `json:"payload"` + Queue string `json:"queue"` + Retried int `json:"retried"` + LastFailed string `json:"last_failed,omitempty"` + LastError string `json:"last_error,omitempty"` + NextProcess string `json:"next_process,omitempty"` + CompletedAt string `json:"completed_at,omitempty"` +} + +// handleTasks 处理任务列表请求 +func (h *HTTPHandler) handleTasks(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + // 从 URL 中提取队列名称和任务状态 + path := strings.TrimPrefix(r.URL.Path, h.rootPath+"/api/tasks/") + parts := strings.Split(path, "/") + if len(parts) < 2 { + http.Error(w, "Queue name and task state are required", http.StatusBadRequest) + return + } + + queueName := parts[0] + taskState := parts[1] + + // 检查队列是否已注册 + if _, exists := queues[queueName]; !exists { + http.Error(w, "Queue not found", http.StatusNotFound) + return + } + + // 解析分页参数 + page := 1 + pageSize := 20 + if p := r.URL.Query().Get("page"); p != "" { + if parsed, err := strconv.Atoi(p); err == nil && parsed > 0 { + page = parsed + } + } + if ps := r.URL.Query().Get("page_size"); ps != "" { + if parsed, err := strconv.Atoi(ps); err == nil && parsed > 0 && parsed <= 100 { + pageSize = parsed + } + } + + // 根据任务状态获取任务列表 + var tasks []*asynq.TaskInfo + var err error + + switch taskState { + case "active": + tasks, err = h.inspector.ListActiveTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page-1)) + case "pending": + tasks, err = h.inspector.ListPendingTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page-1)) + case "retry": + tasks, err = h.inspector.ListRetryTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page-1)) + case "archived": + tasks, err = h.inspector.ListArchivedTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page-1)) + case "completed": + tasks, err = h.inspector.ListCompletedTasks(queueName, asynq.PageSize(pageSize), asynq.Page(page-1)) + default: + http.Error(w, "Invalid task state. Valid states: active, pending, retry, archived, completed", http.StatusBadRequest) + return + } + + if err != nil { + http.Error(w, fmt.Sprintf("Failed to get tasks: %v", err), http.StatusInternalServerError) + return + } + + var taskInfos []TaskInfo + for _, task := range tasks { + info := TaskInfo{ + ID: task.ID, + Type: task.Type, + Payload: string(task.Payload), + Queue: task.Queue, + Retried: task.Retried, + } + + if !task.LastFailedAt.IsZero() { + info.LastFailed = task.LastFailedAt.Format(time.RFC3339) + } + if task.LastErr != "" { + info.LastError = task.LastErr + } + if !task.NextProcessAt.IsZero() { + info.NextProcess = task.NextProcessAt.Format(time.RFC3339) + } + if !task.CompletedAt.IsZero() { + info.CompletedAt = task.CompletedAt.Format(time.RFC3339) + } + + taskInfos = append(taskInfos, info) + } + + response := map[string]any{ + "tasks": taskInfos, + "page": page, + "page_size": pageSize, + "total": len(taskInfos), + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) +} + +// handleDashboard 处理仪表板页面 +func (h *HTTPHandler) handleDashboard(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + // 使用嵌入的 HTML 模板 + tmpl, err := template.New("dashboard").Parse(dashboardHTML) + if err != nil { + http.Error(w, fmt.Sprintf("Template error: %v", err), http.StatusInternalServerError) + return + } + + data := struct { + RootPath string + }{ + RootPath: h.rootPath, + } + + w.Header().Set("Content-Type", "text/html; charset=utf-8") + tmpl.Execute(w, data) +} diff --git a/task.go b/task.go new file mode 100644 index 0000000..1f1acf2 --- /dev/null +++ b/task.go @@ -0,0 +1,148 @@ +package taskq + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "log" + "reflect" + "time" + + "github.com/hibiken/asynq" + "github.com/rs/xid" +) + +// Task 定义泛型任务结构 +// T 表示任务数据的类型,必须是结构体 +type Task[T any] struct { + // 公开字段:用户配置 + Queue string // 任务队列名称 + Group string // 任务分组 + Name string // 任务名称,唯一标识 + MaxRetries int // 最大重试次数 + Priority int // 任务优先级(数值越大优先级越高) + TTR time.Duration // 任务超时时间(Time-To-Run) + Handler any // 处理器函数 + + // 私有字段:运行时反射信息 + funcValue reflect.Value // 处理器函数的反射值 + dataType reflect.Type // 数据类型的反射信息 + inputContext bool // 是否需要 context.Context 参数 + inputData bool // 是否需要数据参数 + returnError bool // 是否返回 error +} + +// PublishOption 任务发布选项函数类型 +// 用于配置任务发布时的各种选项 +type PublishOption func() asynq.Option + +// Delay 设置任务延迟执行时间 +// 参数 d 表示延迟多长时间后执行 +func Delay(d time.Duration) PublishOption { + return func() asynq.Option { + return asynq.ProcessIn(d) + } +} + +// DelayUntil 设置任务在指定时间执行 +// 参数 t 表示任务执行的具体时间点 +func DelayUntil(t time.Time) PublishOption { + return func() asynq.Option { + return asynq.ProcessAt(t) + } +} + +// TTR 设置任务超时时间 +// 覆盖任务默认的超时时间配置 +func TTR(d time.Duration) PublishOption { + return func() asynq.Option { + return asynq.Timeout(d) + } +} + +// Retention 设置任务结果保留时间 +// 任务执行完成后,结果在 Redis 中保留的时间 +func Retention(d time.Duration) PublishOption { + return func() asynq.Option { + return asynq.Retention(d) + } +} + +// Publish 发布任务到队列 +// 将任务数据序列化后发送到 Redis 队列中等待处理 +func (t *Task[T]) Publish(ctx context.Context, data T, options ...PublishOption) error { + // 获取 asynq 客户端 + c := client.Load() + if c == nil { + return errors.New("taskq: client not initialized, call SetRedis() first") + } + + // 序列化任务数据为 JSON + payload, err := json.Marshal(data) + if err != nil { + return fmt.Errorf("taskq: failed to marshal task data: %w", err) + } + + // 构建任务选项 + opts := []asynq.Option{ + asynq.Queue(t.Queue), // 设置队列名称 + asynq.Group(t.Group), // 设置任务组 + asynq.MaxRetry(t.MaxRetries), // 设置最大重试次数 + asynq.TaskID(xid.New().String()), // 生成唯一任务ID + asynq.Timeout(t.TTR), // 设置超时时间 + asynq.Retention(time.Hour * 24), // 设置结果保留24小时 + } + + // 应用用户自定义选项 + for _, option := range options { + if opt := option(); opt != nil { + opts = append(opts, opt) + } + } + + // 发布任务到队列 + info, err := c.EnqueueContext( + ctx, + asynq.NewTask(t.Name, payload), + opts..., + ) + + // 记录任务发布信息(用于调试) + log.Println(info) + + return err +} + +// ProcessTask 处理任务的核心方法 +// 由 asynq 服务器调用,根据任务配置动态调用处理器函数 +func (t *Task[T]) ProcessTask(ctx context.Context, tsk *asynq.Task) error { + var in []reflect.Value + + // 根据配置添加 context.Context 参数 + if t.inputContext { + in = append(in, reflect.ValueOf(ctx)) + } + + // 根据配置添加数据参数 + if t.inputData { + // 创建数据类型的指针实例 + dataValue := reflect.New(t.dataType) + // 反序列化任务载荷 + err := json.Unmarshal(tsk.Payload(), dataValue.Interface()) + if err != nil { + return err + } + in = append(in, dataValue) + } + + // 通过反射调用处理器函数 + out := t.funcValue.Call(in) + + // 处理返回值 + if t.returnError { + // Register 已确保返回类型为 error,无需类型断言 + return out[0].Interface().(error) + } + return nil +} diff --git a/taskq.go b/taskq.go new file mode 100644 index 0000000..15fc51f --- /dev/null +++ b/taskq.go @@ -0,0 +1,184 @@ +// Package taskq 提供基于 Redis 的异步任务队列功能 +// 使用 asynq 库作为底层实现,支持任务注册、发布、消费和重试机制 +package taskq + +import ( + "context" + "errors" + "log" + "maps" + "reflect" + "sync/atomic" + "time" + + "github.com/hibiken/asynq" + "github.com/redis/go-redis/v9" +) + +// 全局状态变量 +var ( + started atomic.Bool // 服务器启动状态 + exit chan chan struct{} // 优雅退出信号通道 + handlers map[string]asynq.Handler // 任务处理器映射表 + queues map[string]int // 队列优先级配置 + client atomic.Pointer[asynq.Client] // asynq 客户端实例 + redisClient redis.UniversalClient // Redis 客户端实例 + errorType = reflect.TypeOf((*error)(nil)).Elem() // error 类型反射 + contextType = reflect.TypeOf((*context.Context)(nil)).Elem() // context.Context 类型反射 +) + +// Init 初始化 taskq 系统 +// 创建必要的全局变量和映射表,必须在调用其他函数之前调用 +func Init() { + exit = make(chan chan struct{}) // 创建优雅退出通道 + handlers = make(map[string]asynq.Handler) // 创建任务处理器映射 + queues = make(map[string]int) // 创建队列优先级映射 +} + +// Register 注册任务处理器 +// 使用泛型确保类型安全,通过反射验证处理器函数签名 +// 处理器函数签名必须是:func(context.Context, T) error 或 func(context.Context) 或 func(T) error 或 func() +func Register[T any](t *Task[T]) error { + rv := reflect.ValueOf(t.Handler) + if rv.Kind() != reflect.Func { + return errors.New("taskq: handler must be a function") + } + + rt := rv.Type() + + // 验证返回值:只能是 error 或无返回值 + var returnError bool + for i := range rt.NumOut() { + if i == 0 && rt.Out(0).Implements(errorType) { + returnError = true + } else { + return errors.New("taskq: handler function must return either error or nothing") + } + } + + // 验证参数:最多2个参数,第一个必须是 context.Context,第二个必须是结构体 + var inContext bool + var inData bool + var dataType reflect.Type + for i := range rt.NumIn() { + if i == 0 { + fi := rt.In(i) + if !fi.Implements(contextType) { + return errors.New("taskq: handler function first parameter must be context.Context") + } + inContext = true + continue + } + if i != 1 { + return errors.New("taskq: handler function can have at most 2 parameters") + } + fi := rt.In(i) + if fi.Kind() != reflect.Struct { + return errors.New("taskq: handler function second parameter must be a struct") + } + inData = true + dataType = fi + } + + // 检查服务器是否已启动 + if started.Load() { + return errors.New("taskq: cannot register handler after server has started") + } + + // 设置任务的反射信息 + t.funcValue = rv + t.dataType = dataType + t.inputContext = inContext + t.inputData = inData + t.returnError = returnError + + // 注册到全局映射表 + handlers[t.Name] = t + queues[t.Queue] = t.Priority + + return nil +} + +// SetRedis 设置 Redis 客户端 +// 必须在启动服务器之前调用,用于配置任务队列的存储后端 +func SetRedis(rdb redis.UniversalClient) error { + if started.Load() { + return errors.New("taskq: server is already running") + } + + redisClient = rdb + client.Store(asynq.NewClientFromRedisClient(rdb)) + + return nil +} + +// Start 启动 taskq 服务器 +// 开始监听任务队列并处理任务,包含健康检查和优雅退出机制 +func Start(ctx context.Context) error { + // 原子操作确保只启动一次 + if !started.CompareAndSwap(false, true) { + return errors.New("taskq: server is already running") + } + + // 检查 Redis 客户端是否已初始化 + if redisClient == nil { + return errors.New("taskq: redis client not initialized, call SetRedis() first") + } + + // 创建任务路由器 + mux := asynq.NewServeMux() + for name, handler := range handlers { + mux.Handle(name, handler) + } + + // 创建 asynq 服务器 + srv := asynq.NewServerFromRedisClient(redisClient, asynq.Config{ + Concurrency: 30, // 并发处理数 + Queues: maps.Clone(queues), // 队列配置 + BaseContext: func() context.Context { return ctx }, // 基础上下文 + LogLevel: asynq.DebugLevel, // 日志级别 + }) + + // 启动监控协程:处理优雅退出和健康检查 + ctx, cancel := context.WithCancel(ctx) + go func() { + defer cancel() + + ticker := time.NewTicker(time.Minute) // 每分钟健康检查 + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case exit := <-exit: // 收到退出信号 + srv.Stop() + exit <- struct{}{} + return + case <-ticker.C: // 定期健康检查 + err := srv.Ping() + if err != nil { + log.Println(err) + Stop() + } + } + } + }() + + // 启动任务处理服务器 + go func() { + if err := srv.Run(mux); err != nil { + log.Fatal(err) + } + }() + + return nil +} + +// Stop 优雅停止 taskq 服务器 +// 发送停止信号并等待服务器完全关闭 +func Stop() { + quit := make(chan struct{}) + exit <- quit // 发送退出信号 + <-quit // 等待确认退出 +}