package main import ( "context" "fmt" "log" "os" "os/signal" "runtime" "syscall" "time" "code.tczkiot.com/wlw/pipelinedb" ) // CustomHandler 自定义数据处理器 type CustomHandler struct { name string } // NewCustomHandler 创建自定义处理器 func NewCustomHandler(name string) *CustomHandler { return &CustomHandler{name: name} } // WillWarm 预热处理回调 (Hot -> Warm) func (h *CustomHandler) WillWarm(ctx context.Context, group string, data []byte) ([]byte, error) { fmt.Printf("🔥 [%s] 预热处理 - 组: %s, 数据: %s\n", h.name, group, string(data)) // 模拟处理时间 time.Sleep(100 * time.Millisecond) // 模拟处理逻辑 if string(data) == "error_data" { return nil, fmt.Errorf("处理失败: 无效数据") } // 可以修改数据 processedData := fmt.Sprintf("WARM_%s", string(data)) fmt.Printf("✅ [%s] 预热完成 - 组: %s, 处理后: %s\n", h.name, group, processedData) return []byte(processedData), nil } // WillCold 冷却处理回调 (Warm -> Cold) func (h *CustomHandler) WillCold(ctx context.Context, group string, data []byte) ([]byte, error) { fmt.Printf("❄️ [%s] 冷却处理 - 组: %s, 数据: %s\n", h.name, group, string(data)) // 模拟更复杂的处理 time.Sleep(200 * time.Millisecond) // 可以压缩或归档数据 processedData := fmt.Sprintf("COLD_%s", string(data)) fmt.Printf("✅ [%s] 冷却完成 - 组: %s, 处理后: %s\n", h.name, group, processedData) return []byte(processedData), nil } // OnComplete 组完成回调 func (h *CustomHandler) OnComplete(ctx context.Context, group string) error { fmt.Printf("🎉 [%s] 组完成回调 - 组: %s - 所有数据已处理完成\n", h.name, group) return nil } func main() { fmt.Println("🚀 Pipeline Database - 自动处理示例") fmt.Println("=====================================") // 创建临时数据库文件 tmpFile, err := os.CreateTemp("", "auto_processing_*.db") if err != nil { log.Fatalf("创建临时文件失败: %v", err) } defer os.Remove(tmpFile.Name()) tmpFile.Close() // 创建自定义处理器 handler := NewCustomHandler("AutoProcessor") // 配置数据库 config := &pipelinedb.Config{ CacheSize: 50, // 缓存大小 WarmInterval: 2 * time.Second, // 预热间隔:2秒 ProcessInterval: 3 * time.Second, // 处理间隔:3秒 BatchSize: 5, // 批处理大小 } // 打开数据库 pdb, err := pipelinedb.Open(pipelinedb.Options{ Filename: tmpFile.Name(), Handler: handler, Config: config, }) if err != nil { log.Fatalf("打开数据库失败: %v", err) } defer pdb.Stop() fmt.Printf("📂 数据库文件: %s\n", tmpFile.Name()) fmt.Printf("⚙️ 配置: 预热间隔=%v, 处理间隔=%v, 批大小=%d\n\n", config.WarmInterval, config.ProcessInterval, config.BatchSize) // 创建组数据事件通道 groupEventCh := make(chan pipelinedb.GroupDataEvent, 100) // 启动自动处理 fmt.Println("🔄 启动自动处理...") err = pdb.Start(groupEventCh) if err != nil { log.Fatalf("启动自动处理失败: %v", err) } // 设置优雅关闭 ctx, cancel := context.WithCancel(context.Background()) defer cancel() // 监听中断信号 sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) // 启动数据生产者 go dataProducer(ctx, pdb) // 启动状态监控 go statusMonitor(ctx, pdb) // 启动系统资源监控 go systemMonitor(ctx) fmt.Println("📊 自动处理已启动,按 Ctrl+C 停止...") fmt.Println("🔍 观察数据的自动状态流转: Hot → Warm → Cold") fmt.Println() // 等待中断信号 <-sigChan fmt.Println("\n🛑 接收到停止信号,正在优雅关闭...") cancel() // 等待一段时间让处理完成 time.Sleep(2 * time.Second) fmt.Println("✅ 自动处理示例完成!") } // dataProducer 数据生产者,持续产生数据 func dataProducer(ctx context.Context, pdb *pipelinedb.PipelineDB) { groups := []string{"orders", "users", "products"} dataTemplates := []string{ "订单数据_%d", "用户数据_%d", "产品数据_%d", "分析数据_%d", } counter := 1 ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: // 选择组和数据模板 group := groups[counter%len(groups)] template := dataTemplates[counter%len(dataTemplates)] data := fmt.Sprintf(template, counter) // 插入数据 id, err := pdb.AcceptData(group, []byte(data), fmt.Sprintf("metadata_%d", counter)) if err != nil { fmt.Printf("❌ 插入数据失败: %v\n", err) continue } fmt.Printf("📥 插入数据 - 组: %s, ID: %d, 数据: %s\n", group, id, data) counter++ // 每10条数据后暂停一下 if counter%10 == 0 { fmt.Println("⏸️ 暂停5秒,观察自动处理...") time.Sleep(5 * time.Second) } } } } // statusMonitor 状态监控器,定期显示数据库状态 func statusMonitor(ctx context.Context, pdb *pipelinedb.PipelineDB) { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: stats, err := pdb.GetStats() if err != nil { fmt.Printf("❌ 获取统计失败: %v\n", err) continue } fmt.Println("\n📊 ===== 当前状态 =====") fmt.Printf("📈 总记录数: %d, 总组数: %d\n", stats.TotalRecords, len(stats.GroupStats)) for group, groupStats := range stats.GroupStats { fmt.Printf(" 📋 %s: 🔥Hot:%d 🌡️Warm:%d ❄️Cold:%d (总:%d)\n", group, groupStats.HotRecords, groupStats.WarmRecords, groupStats.ColdRecords, groupStats.TotalRecords) } fmt.Println("========================") } } } // systemMonitor 系统资源监控器,定期显示 CPU 和内存使用情况 func systemMonitor(ctx context.Context) { ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() // 初始化时执行一次 GC runtime.GC() for { select { case <-ctx.Done(): return case <-ticker.C: // 获取内存统计 var memStats runtime.MemStats runtime.ReadMemStats(&memStats) // 获取 Goroutine 数量 numGoroutines := runtime.NumGoroutine() // 获取 CPU 核心数 numCPU := runtime.NumCPU() // 计算内存使用情况 allocMB := float64(memStats.Alloc) / 1024 / 1024 sysMB := float64(memStats.Sys) / 1024 / 1024 heapMB := float64(memStats.HeapAlloc) / 1024 / 1024 stackMB := float64(memStats.StackInuse) / 1024 / 1024 // 垃圾回收统计 gcCount := memStats.NumGC gcPauseMs := float64(memStats.PauseNs[(memStats.NumGC+255)%256]) / 1000000 fmt.Println("\n💻 ===== 系统资源监控 =====") fmt.Printf("🖥️ CPU 核心数: %d\n", numCPU) fmt.Printf("🧵 Goroutines: %d\n", numGoroutines) fmt.Printf("📊 内存使用:\n") fmt.Printf(" 💾 已分配: %.2f MB\n", allocMB) fmt.Printf(" 🏠 堆内存: %.2f MB\n", heapMB) fmt.Printf(" 📚 栈内存: %.2f MB\n", stackMB) fmt.Printf(" 🌐 系统内存: %.2f MB\n", sysMB) fmt.Printf("🗑️ 垃圾回收:\n") fmt.Printf(" 🔄 GC 次数: %d\n", gcCount) fmt.Printf(" ⏱️ 最近暂停: %.2f ms\n", gcPauseMs) fmt.Printf(" 📈 GC CPU 占比: %.2f%%\n", memStats.GCCPUFraction*100) // 显示对象分配统计 fmt.Printf("📦 对象统计:\n") fmt.Printf(" 🆕 分配对象数: %d\n", memStats.Mallocs) fmt.Printf(" 🗑️ 释放对象数: %d\n", memStats.Frees) fmt.Printf(" 📊 存活对象数: %d\n", memStats.Mallocs-memStats.Frees) fmt.Println("============================") } } }