Files
pipelinedb/examples/concurrent-processing/main.go
2025-09-30 15:05:56 +08:00

230 lines
5.7 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// 演示并发数据处理
package main
import (
"fmt"
"log"
"math/rand"
"os"
"sync"
"time"
"code.tczkiot.com/wlw/pipelinedb"
"code.tczkiot.com/wlw/pipelinedb/examples/common"
)
// 模拟不同类型的数据源
type DataSource struct {
name string
group string
interval time.Duration
}
func main() {
// 创建临时数据库文件
dbFile := "concurrent_example.db"
defer os.Remove(dbFile)
// 确保文件可以创建
if _, err := os.Create(dbFile); err != nil {
log.Fatalf("创建数据库文件失败: %v", err)
}
fmt.Println("🚀 并发处理示例")
fmt.Println("================")
// 配置数据库
fmt.Println("\n📂 步骤1: 配置数据库")
config := &pipelinedb.Config{
CacheSize: 200, // 增大缓存以支持并发
}
// 创建处理器
handler := common.NewExampleHandler("并发处理")
pdb, err := pipelinedb.Open(pipelinedb.Options{
Filename: dbFile,
Config: config,
Handler: handler,
})
if err != nil {
log.Fatalf("打开数据库失败: %v", err)
}
defer pdb.Stop()
fmt.Println("✅ 数据库已配置")
// 定义数据源
dataSources := []DataSource{
{"Web服务器", "访问日志", 100 * time.Millisecond},
{"API网关", "API调用", 150 * time.Millisecond},
{"数据库", "慢查询", 300 * time.Millisecond},
{"缓存系统", "缓存命中", 50 * time.Millisecond},
{"消息队列", "消息处理", 200 * time.Millisecond},
}
// 启动并发数据生产者
fmt.Println("\n🏭 步骤2: 启动并发数据生产者")
var wg sync.WaitGroup
stopChan := make(chan bool)
// 统计计数器
var totalRecords int64
var mu sync.Mutex
for i, source := range dataSources {
wg.Add(1)
go func(id int, src DataSource) {
defer wg.Done()
fmt.Printf("🔄 启动生产者 %d: %s (组: %s)\n", id+1, src.name, src.group)
count := 0
for {
select {
case <-stopChan:
fmt.Printf("🛑 生产者 %d (%s) 停止,共生产 %d 条记录\n",
id+1, src.name, count)
return
default:
// 生成模拟数据
data := fmt.Sprintf("[%s] 时间戳: %d, 随机值: %d",
src.name, time.Now().Unix(), rand.Intn(1000))
metadata := fmt.Sprintf(`{"source": "%s", "producer_id": %d, "sequence": %d}`,
src.name, id+1, count+1)
// 发送数据
recordID, err := pdb.AcceptData(src.group, []byte(data), metadata)
if err != nil {
fmt.Printf("❌ 生产者 %d 发送失败: %v\n", id+1, err)
continue
}
// 更新计数
mu.Lock()
totalRecords++
count++
mu.Unlock()
if count%10 == 0 {
fmt.Printf("📊 生产者 %d (%s) 已生产 %d 条记录最新ID: %d\n",
id+1, src.name, count, recordID)
}
// 按间隔休眠
time.Sleep(src.interval)
}
}
}(i, source)
}
// 启动统计监控
wg.Add(1)
go func() {
defer wg.Done()
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
for {
select {
case <-stopChan:
return
case <-ticker.C:
// 获取实时统计
stats, err := pdb.GetStats()
if err != nil {
fmt.Printf("❌ 获取统计失败: %v\n", err)
continue
}
fmt.Printf("\n📈 实时统计 (时间: %s)\n", time.Now().Format("15:04:05"))
fmt.Printf(" 数据库总记录: %d\n", stats.TotalRecords)
fmt.Printf(" 生产者总计数: %d\n", totalRecords)
for group, groupStats := range stats.GroupStats {
fmt.Printf(" [%s] 热:%d 温:%d 冷:%d 总:%d\n",
group, groupStats.HotRecords, groupStats.WarmRecords,
groupStats.ColdRecords, groupStats.TotalRecords)
}
}
}
}()
// 运行一段时间
fmt.Println("\n⏳ 步骤3: 运行并发处理")
fmt.Println("正在并发处理数据运行10秒...")
time.Sleep(10 * time.Second)
// 停止所有生产者
fmt.Println("\n🛑 步骤4: 停止生产者")
close(stopChan)
wg.Wait()
// 最终统计
fmt.Println("\n📊 步骤5: 最终统计")
finalStats, err := pdb.GetStats()
if err != nil {
log.Fatalf("获取最终统计失败: %v", err)
}
fmt.Printf("🎯 最终结果:\n")
fmt.Printf(" 总记录数: %d\n", finalStats.TotalRecords)
fmt.Printf(" 总组数: %d\n", len(finalStats.GroupStats))
fmt.Println("\n📋 各组详细统计:")
for group, groupStats := range finalStats.GroupStats {
fmt.Printf(" %s:\n", group)
fmt.Printf(" 热数据: %d (%.1f%%)\n",
groupStats.HotRecords, float64(groupStats.HotRecords)/float64(groupStats.TotalRecords)*100)
fmt.Printf(" 温数据: %d (%.1f%%)\n",
groupStats.WarmRecords, float64(groupStats.WarmRecords)/float64(groupStats.TotalRecords)*100)
fmt.Printf(" 冷数据: %d (%.1f%%)\n",
groupStats.ColdRecords, float64(groupStats.ColdRecords)/float64(groupStats.TotalRecords)*100)
fmt.Printf(" 总计: %d\n", groupStats.TotalRecords)
}
// 测试并发查询
fmt.Println("\n🔍 步骤6: 测试并发查询")
var queryWg sync.WaitGroup
for _, source := range dataSources {
queryWg.Add(1)
go func(group string) {
defer queryWg.Done()
pageReq := &pipelinedb.PageRequest{
Page: 1,
PageSize: 5,
}
response, err := pdb.GetRecordsByGroup(group, pageReq)
if err != nil {
fmt.Printf("❌ 查询组 %s 失败: %v\n", group, err)
return
}
fmt.Printf("🔍 组 [%s] 查询结果: %d/%d 条记录\n",
group, len(response.Records), response.TotalCount)
for i, record := range response.Records {
if i < 2 { // 只显示前2条
dataStr := string(record.Data)
if len(dataStr) > 50 {
dataStr = dataStr[:50]
}
fmt.Printf(" 📄 ID:%d [%s] %s...\n",
record.ID, record.Status, dataStr)
}
}
}(source.group)
}
queryWg.Wait()
fmt.Println("\n🎉 并发处理示例完成!")
fmt.Println("💡 提示: 这个示例展示了Pipeline Database在高并发场景下的稳定性")
}