Compare commits
2 Commits
f3a1b8060b
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 5f4ab7e7a1 | |||
| 38138a5236 |
207
README.md
207
README.md
@@ -1,47 +1,196 @@
|
||||
# TaskQ - Task Queue Management System
|
||||
# TaskQ - 基于 Redis 的异步任务队列系统
|
||||
|
||||
A Go-based task queue management system for efficient task processing and management.
|
||||
一个基于 Go 和 Redis 的异步任务队列管理系统,使用 asynq 库作为底层实现,支持任务注册、发布、消费和重试机制。
|
||||
|
||||
## Features
|
||||
## 特性
|
||||
|
||||
- Task queue management
|
||||
- Dashboard interface
|
||||
- Task inspection and monitoring
|
||||
- Concurrent task processing
|
||||
- 🚀 基于 Redis 的高性能任务队列
|
||||
- 📊 实时监控仪表板(Web UI)
|
||||
- 🔍 任务检查和调试工具
|
||||
- ⚡ 并发任务处理
|
||||
- 🔄 自动重试机制
|
||||
- 📈 Prometheus 指标集成
|
||||
- 🎯 灵活的任务优先级和分组
|
||||
- ⏰ 延迟任务和定时任务支持
|
||||
|
||||
## Installation
|
||||
## 快速开始
|
||||
|
||||
```bash
|
||||
go mod download
|
||||
### 基本使用
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"code.tczkiot.com/wlw/taskq"
|
||||
)
|
||||
|
||||
// 定义任务数据结构
|
||||
type EmailData struct {
|
||||
To string
|
||||
Subject string
|
||||
Body string
|
||||
}
|
||||
|
||||
// 定义任务处理器
|
||||
func sendEmail(ctx context.Context, data EmailData) error {
|
||||
log.Printf("发送邮件到 %s: %s", data.To, data.Subject)
|
||||
// 实际的邮件发送逻辑
|
||||
return nil
|
||||
}
|
||||
|
||||
func main() {
|
||||
// 配置 Redis 连接
|
||||
cfg := taskq.Config{
|
||||
Redis: &redis.Client{
|
||||
Addr: "localhost:6379",
|
||||
},
|
||||
}
|
||||
|
||||
// 注册任务
|
||||
emailTask := &taskq.Task{
|
||||
Name: "send-email",
|
||||
Queue: "email",
|
||||
MaxRetries: 3,
|
||||
TTR: 30 * time.Second,
|
||||
Handler: sendEmail,
|
||||
}
|
||||
|
||||
cfg.Tasks = []*taskq.Task{emailTask}
|
||||
|
||||
// 配置并启动
|
||||
taskq.Configure(cfg)
|
||||
if err := taskq.Init(context.Background()); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
// 发布任务
|
||||
data := EmailData{
|
||||
To: "user@example.com",
|
||||
Subject: "欢迎使用 TaskQ",
|
||||
Body: "这是一个测试邮件",
|
||||
}
|
||||
|
||||
if err := emailTask.Publish(context.Background(), data); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
// 启动服务器
|
||||
if err := taskq.Start(context.Background()); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## Usage
|
||||
## 项目结构
|
||||
|
||||
```bash
|
||||
go run .
|
||||
```
|
||||
taskq/
|
||||
├── taskq.go # 主包入口,提供包级别 API
|
||||
├── servlet.go # Servlet 核心实现,生命周期管理
|
||||
├── task.go # Task 结构体和任务处理逻辑
|
||||
├── plugin.go # 插件系统接口
|
||||
├── x/ # 扩展组件
|
||||
│ ├── inspector/ # 任务检查工具
|
||||
│ ├── metrics/ # Prometheus 指标
|
||||
│ └── monitor/ # Web 监控界面
|
||||
├── example/ # 示例代码
|
||||
└── Makefile # 构建脚本
|
||||
```
|
||||
|
||||
## Project Structure
|
||||
## API 文档
|
||||
|
||||
- `taskq.go` - Main application entry point
|
||||
- `task.go` - Task definition and management
|
||||
- `inspect.go` - Task inspection utilities
|
||||
- `dashboard.html` - Web dashboard interface
|
||||
- `example/` - Example implementations
|
||||
### 包级别 API
|
||||
|
||||
## Development
|
||||
```go
|
||||
// 配置默认 Servlet
|
||||
taskq.Configure(cfg)
|
||||
|
||||
```bash
|
||||
# Run the application
|
||||
go run .
|
||||
// 初始化(必须先调用)
|
||||
taskq.Init(ctx)
|
||||
|
||||
# Run tests
|
||||
go test ./...
|
||||
// 启动服务器
|
||||
taskq.Start(ctx)
|
||||
|
||||
# Build
|
||||
go build -o taskq
|
||||
// 停止服务器
|
||||
taskq.Stop()
|
||||
```
|
||||
|
||||
## License
|
||||
### Servlet 实例 API
|
||||
|
||||
MIT License
|
||||
```go
|
||||
// 创建新实例
|
||||
servlet := taskq.New()
|
||||
|
||||
// 配置
|
||||
servlet.Configure(cfg)
|
||||
|
||||
// 初始化
|
||||
servlet.Init(ctx)
|
||||
|
||||
// 启动
|
||||
servlet.Start(ctx)
|
||||
|
||||
// 停止
|
||||
servlet.Stop()
|
||||
```
|
||||
|
||||
### 任务发布选项
|
||||
|
||||
```go
|
||||
// 延迟执行
|
||||
emailTask.Publish(ctx, data, taskq.Delay(5*time.Minute))
|
||||
|
||||
// 指定时间执行
|
||||
emailTask.Publish(ctx, data, taskq.DelayUntil(time.Now().Add(time.Hour)))
|
||||
|
||||
// 自定义超时
|
||||
emailTask.Publish(ctx, data, taskq.TTR(10*time.Second))
|
||||
|
||||
// 结果保留时间
|
||||
emailTask.Publish(ctx, data, taskq.Retention(48*time.Hour))
|
||||
```
|
||||
|
||||
## 监控界面
|
||||
|
||||
访问 `http://localhost:8080` 查看任务监控界面,包括:
|
||||
|
||||
- 实时任务状态
|
||||
- 队列统计信息
|
||||
- 任务执行历史
|
||||
- 性能指标图表
|
||||
|
||||
## 配置选项
|
||||
|
||||
### Redis 配置
|
||||
|
||||
```go
|
||||
cfg := taskq.Config{
|
||||
Redis: &redis.Client{
|
||||
Addr: "localhost:6379",
|
||||
Password: "",
|
||||
DB: 0,
|
||||
},
|
||||
}
|
||||
```
|
||||
|
||||
### 任务配置
|
||||
|
||||
```go
|
||||
task := &taskq.Task{
|
||||
Name: "task-name", // 任务名称(唯一)
|
||||
Queue: "default", // 队列名称
|
||||
Group: "group-name", // 任务分组(可选)
|
||||
MaxRetries: 3, // 最大重试次数
|
||||
Priority: 1, // 优先级
|
||||
TTR: 30 * time.Second, // 超时时间
|
||||
Handler: handlerFunc, // 处理器函数
|
||||
}
|
||||
```
|
||||
|
||||
## 许可证
|
||||
|
||||
MIT License
|
||||
|
||||
103
x/inspector/inspector_test.go
Normal file
103
x/inspector/inspector_test.go
Normal file
@@ -0,0 +1,103 @@
|
||||
package inspector
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"code.tczkiot.com/wlw/taskq"
|
||||
"github.com/hibiken/asynq"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
func TestNewDefaultsAndName(t *testing.T) {
|
||||
ins := New(Options{})
|
||||
if ins == nil {
|
||||
t.Fatalf("New returned nil")
|
||||
}
|
||||
if ins.opts.Interval <= 0 {
|
||||
t.Fatalf("expected default Interval > 0")
|
||||
}
|
||||
if ins.opts.DBPath == "" {
|
||||
t.Fatalf("expected default DBPath set")
|
||||
}
|
||||
if ins.Name() != "inspector" {
|
||||
t.Fatalf("unexpected Name: %s", ins.Name())
|
||||
}
|
||||
}
|
||||
|
||||
func TestConvertTaskHelpers(t *testing.T) {
|
||||
// ensure convertTaskInfo/convertTaskList are callable
|
||||
_ = convertTaskInfo(&asynq.TaskInfo{})
|
||||
_ = convertTaskList([]*asynq.TaskInfo{})
|
||||
}
|
||||
|
||||
func TestGetQueueInfoWhenNotStarted(t *testing.T) {
|
||||
ins := New(Options{})
|
||||
if _, err := ins.GetQueueInfo("default"); err == nil {
|
||||
t.Fatalf("expected error when inspector not started")
|
||||
}
|
||||
}
|
||||
|
||||
func TestListActiveTasksWhenNotStarted(t *testing.T) {
|
||||
ins := New(Options{})
|
||||
if _, err := ins.ListActiveTasks("default", 10, 0); err == nil {
|
||||
t.Fatalf("expected error when inspector not started")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSaveMetrics_NoDB(t *testing.T) {
|
||||
ins := New(Options{})
|
||||
// db is nil by default; saveMetrics should return nil (no-op)
|
||||
s := Stats{Queue: "q", Timestamp: time.Now().Unix()}
|
||||
if err := ins.saveMetrics(s); err != nil {
|
||||
t.Fatalf("saveMetrics returned error with nil db: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// helper: create redis client for tests
|
||||
func makeTestRedis(t *testing.T) redis.UniversalClient {
|
||||
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379", DB: 15})
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
if err := rdb.Ping(ctx).Err(); err != nil {
|
||||
t.Skipf("redis not available: %v", err)
|
||||
}
|
||||
// flush test DB
|
||||
if err := rdb.FlushDB(context.Background()).Err(); err != nil {
|
||||
t.Fatalf("failed to flush redis: %v", err)
|
||||
}
|
||||
return rdb
|
||||
}
|
||||
|
||||
func TestPluginLifecycleWithServlet(t *testing.T) {
|
||||
rdb := makeTestRedis(t)
|
||||
// create plugin
|
||||
ins := New(Options{DBPath: ":memory:", Interval: time.Second})
|
||||
|
||||
// wire into a fresh servlet as default to use package-level helpers
|
||||
s := taskq.NewServlet()
|
||||
taskq.SetDefault(s)
|
||||
|
||||
cfg := taskq.Config{Redis: rdb, Tasks: []*taskq.Task{}, Plugins: []taskq.Plugin{ins}}
|
||||
if err := taskq.Configure(cfg); err != nil {
|
||||
t.Fatalf("Configure failed: %v", err)
|
||||
}
|
||||
|
||||
if err := taskq.Init(context.Background()); err != nil {
|
||||
t.Fatalf("Init failed: %v", err)
|
||||
}
|
||||
|
||||
if err := taskq.Start(context.Background()); err != nil {
|
||||
t.Fatalf("Start failed: %v", err)
|
||||
}
|
||||
|
||||
// request stop
|
||||
taskq.Stop()
|
||||
// wait for plugins and internal goroutines to shutdown
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
// close redis
|
||||
rdb.Close()
|
||||
}
|
||||
68
x/metrics/metrics_test.go
Normal file
68
x/metrics/metrics_test.go
Normal file
@@ -0,0 +1,68 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"code.tczkiot.com/wlw/taskq"
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
func TestNewDefaultsAndName(t *testing.T) {
|
||||
m := New(Options{})
|
||||
if m == nil {
|
||||
t.Fatalf("New returned nil")
|
||||
}
|
||||
if m.opts.Namespace == "" {
|
||||
t.Fatalf("expected default Namespace")
|
||||
}
|
||||
if m.Name() != "metrics" {
|
||||
t.Fatalf("unexpected Name: %s", m.Name())
|
||||
}
|
||||
}
|
||||
|
||||
func TestCollectNoInspectorNoQueues(t *testing.T) {
|
||||
m := New(Options{Interval: time.Millisecond})
|
||||
// ensure collect() is safe to call when inspector or queues are nil
|
||||
m.collect()
|
||||
}
|
||||
|
||||
func makeTestRedis(t *testing.T) redis.UniversalClient {
|
||||
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379", DB: 15})
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
if err := rdb.Ping(ctx).Err(); err != nil {
|
||||
t.Skipf("redis not available: %v", err)
|
||||
}
|
||||
if err := rdb.FlushDB(context.Background()).Err(); err != nil {
|
||||
t.Fatalf("failed to flush redis: %v", err)
|
||||
}
|
||||
return rdb
|
||||
}
|
||||
|
||||
func TestMetricsLifecycleWithServlet(t *testing.T) {
|
||||
rdb := makeTestRedis(t)
|
||||
m := New(Options{Interval: time.Second})
|
||||
|
||||
// register plugin via default servlet
|
||||
s := taskq.NewServlet()
|
||||
taskq.SetDefault(s)
|
||||
|
||||
cfg := taskq.Config{Redis: rdb, Tasks: []*taskq.Task{}, Plugins: []taskq.Plugin{m}}
|
||||
if err := taskq.Configure(cfg); err != nil {
|
||||
t.Fatalf("Configure failed: %v", err)
|
||||
}
|
||||
|
||||
if err := taskq.Init(context.Background()); err != nil {
|
||||
t.Fatalf("Init failed: %v", err)
|
||||
}
|
||||
|
||||
if err := taskq.Start(context.Background()); err != nil {
|
||||
t.Fatalf("Start failed: %v", err)
|
||||
}
|
||||
|
||||
taskq.Stop()
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
rdb.Close()
|
||||
}
|
||||
48
x/monitor/monitor_test.go
Normal file
48
x/monitor/monitor_test.go
Normal file
@@ -0,0 +1,48 @@
|
||||
package monitor
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"code.tczkiot.com/wlw/taskq/x/inspector"
|
||||
)
|
||||
|
||||
func TestNewValidatesOptions(t *testing.T) {
|
||||
_, err := New(Options{})
|
||||
if err == nil {
|
||||
t.Fatalf("expected error when options missing")
|
||||
}
|
||||
|
||||
// valid case
|
||||
ins := &inspector.Inspector{}
|
||||
m, err := New(Options{Inspector: ins, Queues: map[string]int{"default": 1}})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if m.RootPath() != "/monitor" {
|
||||
t.Fatalf("unexpected root path: %s", m.RootPath())
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleIndexServesUI(t *testing.T) {
|
||||
ins := &inspector.Inspector{}
|
||||
m, err := New(Options{Inspector: ins, Queues: map[string]int{"default": 1}, RootPath: "/my"})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, "/my/", nil)
|
||||
w := httptest.NewRecorder()
|
||||
m.ServeHTTP(w, req)
|
||||
|
||||
resp := w.Result()
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
t.Fatalf("expected 200 OK, got %d", resp.StatusCode)
|
||||
}
|
||||
// body should contain the root path replacement
|
||||
body := w.Body.String()
|
||||
if body == "" {
|
||||
t.Fatalf("expected non-empty index body")
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user