// 演示如何使用外部处理器进行数据处理 package main import ( "context" "fmt" "log" "os" "strings" "time" "code.tczkiot.com/wlw/pipelinedb" ) // EmailProcessor 邮件处理器示例 type EmailProcessor struct { name string processed int } func NewEmailProcessor(name string) *EmailProcessor { return &EmailProcessor{ name: name, processed: 0, } } // WillWarm 预热阶段:验证邮件格式 func (ep *EmailProcessor) WillWarm(ctx context.Context, group string, data []byte) ([]byte, error) { fmt.Printf("🔥 [%s] 预热处理 - 组: %s\n", ep.name, group) email := string(data) // 简单的邮件格式验证 if !strings.Contains(email, "@") { return nil, fmt.Errorf("无效的邮件格式: %s", email) } // 标准化邮件地址(转为小写) normalizedEmail := strings.ToLower(strings.TrimSpace(email)) fmt.Printf(" 📧 邮件验证通过: %s -> %s\n", email, normalizedEmail) // 模拟处理时间 time.Sleep(10 * time.Millisecond) return []byte(normalizedEmail), nil } // WillCold 冷却阶段:发送邮件通知 func (ep *EmailProcessor) WillCold(ctx context.Context, group string, data []byte) ([]byte, error) { fmt.Printf("❄️ [%s] 冷却处理 - 组: %s\n", ep.name, group) email := string(data) // 模拟发送邮件 fmt.Printf(" 📮 发送邮件到: %s\n", email) fmt.Printf(" 📝 邮件内容: 您的数据已成功处理完成\n") // 模拟发送时间 time.Sleep(50 * time.Millisecond) ep.processed++ fmt.Printf(" ✅ 邮件发送成功 (已处理 %d 封邮件)\n", ep.processed) return data, nil } // OnComplete 完成回调:统计处理结果 func (ep *EmailProcessor) OnComplete(ctx context.Context, group string) error { fmt.Printf("🎉 [%s] 组处理完成 - 组: %s\n", ep.name, group) fmt.Printf(" 📊 本组共处理邮件: %d 封\n", ep.processed) // 重置计数器 ep.processed = 0 return nil } func main() { // 创建临时数据库文件 dbFile := "handler_example.db" defer os.Remove(dbFile) // 确保文件可以创建 if _, err := os.Create(dbFile); err != nil { log.Fatalf("创建数据库文件失败: %v", err) } fmt.Println("🚀 外部处理器示例") fmt.Println("==================") // 创建邮件处理器 emailHandler := NewEmailProcessor("邮件处理器") // 打开数据库并配置处理器 fmt.Println("\n📂 步骤1: 配置数据库和处理器") config := &pipelinedb.Config{ CacheSize: 50, } pdb, err := pipelinedb.Open(pipelinedb.Options{ Filename: dbFile, Config: config, Handler: emailHandler, }) if err != nil { log.Fatalf("打开数据库失败: %v", err) } defer pdb.Stop() fmt.Printf("✅ 数据库已配置外部处理器: %s\n", emailHandler.name) // 接收邮件数据 fmt.Println("\n📥 步骤2: 接收邮件数据") emails := []string{ "user1@example.com", "USER2@EXAMPLE.COM", // 测试大小写转换 "user3@test.org", "invalid-email", // 测试无效邮件 "admin@company.net", } for i, email := range emails { fmt.Printf("\n📧 处理邮件 %d: %s\n", i+1, email) recordID, err := pdb.AcceptData("邮件处理", []byte(email), fmt.Sprintf(`{"batch": %d, "source": "web_form"}`, i+1)) if err != nil { fmt.Printf(" ❌ 接收失败: %v\n", err) continue } fmt.Printf(" ✅ 记录ID: %d\n", recordID) } // 等待处理完成 fmt.Println("\n⏳ 步骤3: 等待处理完成") fmt.Println("正在处理数据,请稍候...") // 给处理器一些时间来处理数据 time.Sleep(2 * time.Second) // 查看处理结果 fmt.Println("\n🔍 步骤4: 查看处理结果") pageReq := &pipelinedb.PageRequest{ Page: 1, PageSize: 20, } response, err := pdb.GetRecordsByGroup("邮件处理", pageReq) if err != nil { log.Fatalf("查询记录失败: %v", err) } fmt.Printf("📊 处理结果统计:\n") fmt.Printf(" 总记录数: %d\n", response.TotalCount) statusCount := make(map[pipelinedb.DataStatus]int) for _, record := range response.Records { statusCount[record.Status]++ } fmt.Printf(" 状态分布:\n") fmt.Printf(" 热数据 (hot): %d\n", statusCount[pipelinedb.StatusHot]) fmt.Printf(" 温数据 (warm): %d\n", statusCount[pipelinedb.StatusWarm]) fmt.Printf(" 冷数据 (cold): %d\n", statusCount[pipelinedb.StatusCold]) // 显示详细记录 fmt.Println("\n📋 详细记录:") for _, record := range response.Records { fmt.Printf(" ID:%d [%s] %s\n", record.ID, record.Status, string(record.Data)) } fmt.Println("\n🎉 外部处理器示例完成!") fmt.Println("💡 提示: 查看上面的日志,可以看到数据如何通过预热->冷却的完整流程") }