diff --git a/Makefile b/Makefile index 073e38e..47134ef 100644 --- a/Makefile +++ b/Makefile @@ -165,6 +165,11 @@ run-handler: ## 运行处理器示例 @echo "🎯 运行处理器示例..." @cd $(EXAMPLES_DIR)/external-handler && GOWORK=off $(GOCMD) run main.go +.PHONY: run-auto +run-auto: ## 运行自动处理示例 + @echo "🚀 运行自动处理示例..." + @cd $(EXAMPLES_DIR)/auto-processing && GOWORK=off $(GOCMD) run main.go + # ==================== 安全和质量 ==================== .PHONY: security diff --git a/examples/auto-processing/README.md b/examples/auto-processing/README.md new file mode 100644 index 0000000..ccaee11 --- /dev/null +++ b/examples/auto-processing/README.md @@ -0,0 +1,146 @@ +# 自动处理示例 + +这个示例展示了如何使用 Pipeline Database 的自动处理功能,实现数据的自动状态流转:**Hot → Warm → Cold**。 + +## 🎯 功能特性 + +### 📋 自动处理流程 +1. **数据插入** - 数据以 `Hot` 状态插入 +2. **自动预热** - 定时将 `Hot` 数据转为 `Warm` 状态 +3. **自动处理** - 定时将 `Warm` 数据转为 `Cold` 状态 +4. **组完成回调** - 组内所有数据处理完成后的回调 +5. **系统监控** - 实时监控 CPU 和内存使用情况 + +### 🔧 自定义处理器 +- **WillWarm** - Hot → Warm 转换时的处理逻辑 +- **WillCold** - Warm → Cold 转换时的处理逻辑 +- **OnComplete** - 组完成时的回调处理 + +## 🚀 运行示例 + +```bash +# 在项目根目录运行 +make run-auto + +# 或者直接运行 +cd examples/auto-processing +go run main.go +``` + +## 📊 示例输出 + +``` +🚀 Pipeline Database - 自动处理示例 +===================================== +📂 数据库文件: /tmp/auto_processing_xxx.db +⚙️ 配置: 预热间隔=2s, 处理间隔=3s, 批大小=5 + +🔄 启动自动处理... +📊 自动处理已启动,按 Ctrl+C 停止... +🔍 观察数据的自动状态流转: Hot → Warm → Cold + +📥 插入数据 - 组: orders, ID: 1, 数据: 订单数据_1 +📥 插入数据 - 组: users, ID: 1, 数据: 用户数据_2 +📥 插入数据 - 组: products, ID: 1, 数据: 产品数据_3 + +🔥 [AutoProcessor] 预热处理 - 组: orders, 数据: 订单数据_1 +✅ [AutoProcessor] 预热完成 - 组: orders, 处理后: WARM_订单数据_1 + +❄️ [AutoProcessor] 冷却处理 - 组: orders, 数据: WARM_订单数据_1 +✅ [AutoProcessor] 冷却完成 - 组: orders, 处理后: COLD_WARM_订单数据_1 + +📊 ===== 当前状态 ===== +📈 总记录数: 10, 总组数: 3 + 📋 orders: 🔥Hot:2 🌡️Warm:1 ❄️Cold:1 (总:4) + 📋 users: 🔥Hot:3 🌡️Warm:0 ❄️Cold:0 (总:3) + 📋 products: 🔥Hot:3 🌡️Warm:0 ❄️Cold:0 (总:3) +======================== + +💻 ===== 系统资源监控 ===== +🖥️ CPU 核心数: 14 +🧵 Goroutines: 8 +📊 内存使用: + 💾 已分配: 2.45 MB + 🏠 堆内存: 2.45 MB + 📚 栈内存: 0.06 MB + 🌐 系统内存: 12.34 MB +🗑️ 垃圾回收: + 🔄 GC 次数: 3 + ⏱️ 最近暂停: 0.12 ms + 📈 GC CPU 占比: 0.01% +📦 对象统计: + 🆕 分配对象数: 15432 + 🗑️ 释放对象数: 12890 + 📊 存活对象数: 2542 +============================ +``` + +## ⚙️ 配置说明 + +```go +config := &pipelinedb.Config{ + CacheSize: 50, // 页面缓存大小 + WarmInterval: 2 * time.Second, // 预热间隔:每2秒检查一次Hot数据 + ProcessInterval: 3 * time.Second, // 处理间隔:每3秒检查一次Warm数据 + BatchSize: 5, // 批处理大小:每次最多处理5条记录 +} +``` + +## 🔍 关键概念 + +### 数据状态流转 +- **Hot** - 新插入的数据,等待预热处理 +- **Warm** - 已预热的数据,等待最终处理 +- **Cold** - 已完成处理的数据,可以归档或清理 + +### 处理器接口 +```go +type Handler interface { + // Hot → Warm 转换时调用 + WillWarm(ctx context.Context, group string, data []byte) ([]byte, error) + + // Warm → Cold 转换时调用 + WillCold(ctx context.Context, group string, data []byte) ([]byte, error) + + // 组完成时调用 + OnComplete(ctx context.Context, group string) error +} +``` + +## 💻 系统监控功能 + +示例包含了完整的系统资源监控,每10秒显示一次: + +### 📊 监控指标 + +- **🖥️ CPU 信息** - CPU 核心数 +- **🧵 并发信息** - Goroutine 数量 +- **💾 内存使用** - 已分配、堆内存、栈内存、系统内存 +- **🗑️ 垃圾回收** - GC 次数、暂停时间、CPU 占比 +- **📦 对象统计** - 分配、释放、存活对象数 + +### 🔍 监控价值 + +- **性能分析** - 观察内存使用趋势 +- **资源优化** - 识别内存泄漏和性能瓶颈 +- **并发监控** - 跟踪 Goroutine 数量变化 +- **GC 调优** - 分析垃圾回收效率 + +## 🎯 使用场景 + +- **数据管道处理** - ETL 流程中的数据转换 +- **批量数据处理** - 定时批处理任务 +- **数据生命周期管理** - 数据的自动归档和清理 +- **事件驱动处理** - 基于数据状态的业务逻辑触发 +- **性能监控** - 实时监控系统资源使用情况 + +## 🛑 优雅关闭 + +示例支持优雅关闭,按 `Ctrl+C` 可以安全停止处理: + +1. 停止数据生产 +2. 等待当前处理完成 +3. 关闭数据库连接 +4. 清理临时文件 + +这确保了数据的完整性和一致性。 diff --git a/examples/auto-processing/go.mod b/examples/auto-processing/go.mod new file mode 100644 index 0000000..5ffc23c --- /dev/null +++ b/examples/auto-processing/go.mod @@ -0,0 +1,9 @@ +module auto-processing + +go 1.24.0 + +replace code.tczkiot.com/wlw/pipelinedb => ../.. + +require code.tczkiot.com/wlw/pipelinedb v0.0.0-00010101000000-000000000000 + +require github.com/google/btree v1.1.3 // indirect diff --git a/examples/auto-processing/go.sum b/examples/auto-processing/go.sum new file mode 100644 index 0000000..3839d06 --- /dev/null +++ b/examples/auto-processing/go.sum @@ -0,0 +1,2 @@ +github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg= +github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= diff --git a/examples/auto-processing/main.go b/examples/auto-processing/main.go new file mode 100644 index 0000000..8941fae --- /dev/null +++ b/examples/auto-processing/main.go @@ -0,0 +1,269 @@ +package main + +import ( + "context" + "fmt" + "log" + "os" + "os/signal" + "runtime" + "syscall" + "time" + + "code.tczkiot.com/wlw/pipelinedb" +) + +// CustomHandler 自定义数据处理器 +type CustomHandler struct { + name string +} + +// NewCustomHandler 创建自定义处理器 +func NewCustomHandler(name string) *CustomHandler { + return &CustomHandler{name: name} +} + +// WillWarm 预热处理回调 (Hot -> Warm) +func (h *CustomHandler) WillWarm(ctx context.Context, group string, data []byte) ([]byte, error) { + fmt.Printf("🔥 [%s] 预热处理 - 组: %s, 数据: %s\n", h.name, group, string(data)) + + // 模拟处理时间 + time.Sleep(100 * time.Millisecond) + + // 模拟处理逻辑 + if string(data) == "error_data" { + return nil, fmt.Errorf("处理失败: 无效数据") + } + + // 可以修改数据 + processedData := fmt.Sprintf("WARM_%s", string(data)) + fmt.Printf("✅ [%s] 预热完成 - 组: %s, 处理后: %s\n", h.name, group, processedData) + return []byte(processedData), nil +} + +// WillCold 冷却处理回调 (Warm -> Cold) +func (h *CustomHandler) WillCold(ctx context.Context, group string, data []byte) ([]byte, error) { + fmt.Printf("❄️ [%s] 冷却处理 - 组: %s, 数据: %s\n", h.name, group, string(data)) + + // 模拟更复杂的处理 + time.Sleep(200 * time.Millisecond) + + // 可以压缩或归档数据 + processedData := fmt.Sprintf("COLD_%s", string(data)) + fmt.Printf("✅ [%s] 冷却完成 - 组: %s, 处理后: %s\n", h.name, group, processedData) + return []byte(processedData), nil +} + +// OnComplete 组完成回调 +func (h *CustomHandler) OnComplete(ctx context.Context, group string) error { + fmt.Printf("🎉 [%s] 组完成回调 - 组: %s - 所有数据已处理完成\n", h.name, group) + return nil +} + +func main() { + fmt.Println("🚀 Pipeline Database - 自动处理示例") + fmt.Println("=====================================") + + // 创建临时数据库文件 + tmpFile, err := os.CreateTemp("", "auto_processing_*.db") + if err != nil { + log.Fatalf("创建临时文件失败: %v", err) + } + defer os.Remove(tmpFile.Name()) + tmpFile.Close() + + // 创建自定义处理器 + handler := NewCustomHandler("AutoProcessor") + + // 配置数据库 + config := &pipelinedb.Config{ + CacheSize: 50, // 缓存大小 + WarmInterval: 2 * time.Second, // 预热间隔:2秒 + ProcessInterval: 3 * time.Second, // 处理间隔:3秒 + BatchSize: 5, // 批处理大小 + } + + // 打开数据库 + pdb, err := pipelinedb.Open(pipelinedb.Options{ + Filename: tmpFile.Name(), + Handler: handler, + Config: config, + }) + if err != nil { + log.Fatalf("打开数据库失败: %v", err) + } + defer pdb.Stop() + + fmt.Printf("📂 数据库文件: %s\n", tmpFile.Name()) + fmt.Printf("⚙️ 配置: 预热间隔=%v, 处理间隔=%v, 批大小=%d\n\n", + config.WarmInterval, config.ProcessInterval, config.BatchSize) + + // 创建组数据事件通道 + groupEventCh := make(chan pipelinedb.GroupDataEvent, 100) + + // 启动自动处理 + fmt.Println("🔄 启动自动处理...") + err = pdb.Start(groupEventCh) + if err != nil { + log.Fatalf("启动自动处理失败: %v", err) + } + + // 设置优雅关闭 + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // 监听中断信号 + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + // 启动数据生产者 + go dataProducer(ctx, pdb) + + // 启动状态监控 + go statusMonitor(ctx, pdb) + + // 启动系统资源监控 + go systemMonitor(ctx) + + fmt.Println("📊 自动处理已启动,按 Ctrl+C 停止...") + fmt.Println("🔍 观察数据的自动状态流转: Hot → Warm → Cold") + fmt.Println() + + // 等待中断信号 + <-sigChan + fmt.Println("\n🛑 接收到停止信号,正在优雅关闭...") + cancel() + + // 等待一段时间让处理完成 + time.Sleep(2 * time.Second) + fmt.Println("✅ 自动处理示例完成!") +} + +// dataProducer 数据生产者,持续产生数据 +func dataProducer(ctx context.Context, pdb *pipelinedb.PipelineDB) { + groups := []string{"orders", "users", "products"} + dataTemplates := []string{ + "订单数据_%d", + "用户数据_%d", + "产品数据_%d", + "分析数据_%d", + } + + counter := 1 + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + // 选择组和数据模板 + group := groups[counter%len(groups)] + template := dataTemplates[counter%len(dataTemplates)] + data := fmt.Sprintf(template, counter) + + // 插入数据 + id, err := pdb.AcceptData(group, []byte(data), fmt.Sprintf("metadata_%d", counter)) + if err != nil { + fmt.Printf("❌ 插入数据失败: %v\n", err) + continue + } + + fmt.Printf("📥 插入数据 - 组: %s, ID: %d, 数据: %s\n", group, id, data) + counter++ + + // 每10条数据后暂停一下 + if counter%10 == 0 { + fmt.Println("⏸️ 暂停5秒,观察自动处理...") + time.Sleep(5 * time.Second) + } + } + } +} + +// statusMonitor 状态监控器,定期显示数据库状态 +func statusMonitor(ctx context.Context, pdb *pipelinedb.PipelineDB) { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + stats, err := pdb.GetStats() + if err != nil { + fmt.Printf("❌ 获取统计失败: %v\n", err) + continue + } + + fmt.Println("\n📊 ===== 当前状态 =====") + fmt.Printf("📈 总记录数: %d, 总组数: %d\n", stats.TotalRecords, len(stats.GroupStats)) + + for group, groupStats := range stats.GroupStats { + fmt.Printf(" 📋 %s: 🔥Hot:%d 🌡️Warm:%d ❄️Cold:%d (总:%d)\n", + group, groupStats.HotRecords, groupStats.WarmRecords, + groupStats.ColdRecords, groupStats.TotalRecords) + } + fmt.Println("========================") + } + } +} + +// systemMonitor 系统资源监控器,定期显示 CPU 和内存使用情况 +func systemMonitor(ctx context.Context) { + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + // 初始化时执行一次 GC + runtime.GC() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + // 获取内存统计 + var memStats runtime.MemStats + runtime.ReadMemStats(&memStats) + + // 获取 Goroutine 数量 + numGoroutines := runtime.NumGoroutine() + + // 获取 CPU 核心数 + numCPU := runtime.NumCPU() + + // 计算内存使用情况 + allocMB := float64(memStats.Alloc) / 1024 / 1024 + sysMB := float64(memStats.Sys) / 1024 / 1024 + heapMB := float64(memStats.HeapAlloc) / 1024 / 1024 + stackMB := float64(memStats.StackInuse) / 1024 / 1024 + + // 垃圾回收统计 + gcCount := memStats.NumGC + gcPauseMs := float64(memStats.PauseNs[(memStats.NumGC+255)%256]) / 1000000 + + fmt.Println("\n💻 ===== 系统资源监控 =====") + fmt.Printf("🖥️ CPU 核心数: %d\n", numCPU) + fmt.Printf("🧵 Goroutines: %d\n", numGoroutines) + fmt.Printf("📊 内存使用:\n") + fmt.Printf(" 💾 已分配: %.2f MB\n", allocMB) + fmt.Printf(" 🏠 堆内存: %.2f MB\n", heapMB) + fmt.Printf(" 📚 栈内存: %.2f MB\n", stackMB) + fmt.Printf(" 🌐 系统内存: %.2f MB\n", sysMB) + fmt.Printf("🗑️ 垃圾回收:\n") + fmt.Printf(" 🔄 GC 次数: %d\n", gcCount) + fmt.Printf(" ⏱️ 最近暂停: %.2f ms\n", gcPauseMs) + fmt.Printf(" 📈 GC CPU 占比: %.2f%%\n", memStats.GCCPUFraction*100) + + // 显示对象分配统计 + fmt.Printf("📦 对象统计:\n") + fmt.Printf(" 🆕 分配对象数: %d\n", memStats.Mallocs) + fmt.Printf(" 🗑️ 释放对象数: %d\n", memStats.Frees) + fmt.Printf(" 📊 存活对象数: %d\n", memStats.Mallocs-memStats.Frees) + + fmt.Println("============================") + } + } +} diff --git a/pipeline_db.go b/pipeline_db.go index f9ff2f0..1d06cbe 100644 --- a/pipeline_db.go +++ b/pipeline_db.go @@ -1393,11 +1393,18 @@ func (pdb *PipelineDB) performCooldown(record *DataRecord) error { // rebuildIndexes 重建所有索引(用于数据库重启后恢复) func (pdb *PipelineDB) rebuildIndexes() error { + // 如果没有根页面或总页数为0,跳过重建 + if pdb.header.RootPage == 0 || pdb.header.TotalPages <= 1 { + pdb.logger.Info("🔄 数据库为空,跳过索引重建") + return nil + } + pdb.logger.Info("🔄 重建索引...") // 遍历所有数据页重建索引 pdb.logger.Info("📊 开始扫描页面", "totalPages", pdb.header.TotalPages, "rootPage", pdb.header.RootPage) + recordCount := 0 // 从根页面开始扫描页链 for pageNo := pdb.header.RootPage; pageNo != 0 && pageNo < pdb.header.TotalPages; { // 读取页面 @@ -1417,9 +1424,13 @@ func (pdb *PipelineDB) rebuildIndexes() error { numSlots := binary.LittleEndian.Uint16(page[0:2]) // 正确的槽数量 freeOff := binary.LittleEndian.Uint16(page[2:4]) nextPage := binary.LittleEndian.Uint16(page[4:6]) - pdb.logger.Info("📄 扫描页面", "pageNo", pageNo, "numSlots", numSlots, "freeOff", freeOff, "nextPage", nextPage) + // 只在有数据时记录详细日志 + if numSlots > 0 { + pdb.logger.Info("📄 扫描页面", "pageNo", pageNo, "numSlots", numSlots, "freeOff", freeOff, "nextPage", nextPage) + } if numSlots == 0 { + pageNo = nextPage continue } @@ -1463,6 +1474,7 @@ func (pdb *PipelineDB) rebuildIndexes() error { // 添加到索引 idx := pdb.indexMgr.GetOrCreateIndex(record.Group) idx.Insert(int64(id), pageNo, slotNo) + recordCount++ } // 移动到下一个页面 @@ -1473,6 +1485,6 @@ func (pdb *PipelineDB) rebuildIndexes() error { pageNo = nextPageNo } - pdb.logger.Info("✅ 索引重建完成") + pdb.logger.Info("✅ 索引重建完成", "recordCount", recordCount) return nil }