Files

230 lines
5.7 KiB
Go
Raw Permalink Normal View History

2025-09-30 15:05:56 +08:00
// 演示并发数据处理
package main
import (
"fmt"
"log"
"math/rand"
"os"
"sync"
"time"
"code.tczkiot.com/wlw/pipelinedb"
"code.tczkiot.com/wlw/pipelinedb/examples/common"
)
// 模拟不同类型的数据源
type DataSource struct {
name string
group string
interval time.Duration
}
func main() {
// 创建临时数据库文件
dbFile := "concurrent_example.db"
defer os.Remove(dbFile)
// 确保文件可以创建
if _, err := os.Create(dbFile); err != nil {
log.Fatalf("创建数据库文件失败: %v", err)
}
fmt.Println("🚀 并发处理示例")
fmt.Println("================")
// 配置数据库
fmt.Println("\n📂 步骤1: 配置数据库")
config := &pipelinedb.Config{
CacheSize: 200, // 增大缓存以支持并发
}
// 创建处理器
handler := common.NewExampleHandler("并发处理")
pdb, err := pipelinedb.Open(pipelinedb.Options{
Filename: dbFile,
Config: config,
Handler: handler,
})
if err != nil {
log.Fatalf("打开数据库失败: %v", err)
}
defer pdb.Stop()
fmt.Println("✅ 数据库已配置")
// 定义数据源
dataSources := []DataSource{
{"Web服务器", "访问日志", 100 * time.Millisecond},
{"API网关", "API调用", 150 * time.Millisecond},
{"数据库", "慢查询", 300 * time.Millisecond},
{"缓存系统", "缓存命中", 50 * time.Millisecond},
{"消息队列", "消息处理", 200 * time.Millisecond},
}
// 启动并发数据生产者
fmt.Println("\n🏭 步骤2: 启动并发数据生产者")
var wg sync.WaitGroup
stopChan := make(chan bool)
// 统计计数器
var totalRecords int64
var mu sync.Mutex
for i, source := range dataSources {
wg.Add(1)
go func(id int, src DataSource) {
defer wg.Done()
fmt.Printf("🔄 启动生产者 %d: %s (组: %s)\n", id+1, src.name, src.group)
count := 0
for {
select {
case <-stopChan:
fmt.Printf("🛑 生产者 %d (%s) 停止,共生产 %d 条记录\n",
id+1, src.name, count)
return
default:
// 生成模拟数据
data := fmt.Sprintf("[%s] 时间戳: %d, 随机值: %d",
src.name, time.Now().Unix(), rand.Intn(1000))
metadata := fmt.Sprintf(`{"source": "%s", "producer_id": %d, "sequence": %d}`,
src.name, id+1, count+1)
// 发送数据
recordID, err := pdb.AcceptData(src.group, []byte(data), metadata)
if err != nil {
fmt.Printf("❌ 生产者 %d 发送失败: %v\n", id+1, err)
continue
}
// 更新计数
mu.Lock()
totalRecords++
count++
mu.Unlock()
if count%10 == 0 {
fmt.Printf("📊 生产者 %d (%s) 已生产 %d 条记录最新ID: %d\n",
id+1, src.name, count, recordID)
}
// 按间隔休眠
time.Sleep(src.interval)
}
}
}(i, source)
}
// 启动统计监控
wg.Add(1)
go func() {
defer wg.Done()
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
for {
select {
case <-stopChan:
return
case <-ticker.C:
// 获取实时统计
stats, err := pdb.GetStats()
if err != nil {
fmt.Printf("❌ 获取统计失败: %v\n", err)
continue
}
fmt.Printf("\n📈 实时统计 (时间: %s)\n", time.Now().Format("15:04:05"))
fmt.Printf(" 数据库总记录: %d\n", stats.TotalRecords)
fmt.Printf(" 生产者总计数: %d\n", totalRecords)
for group, groupStats := range stats.GroupStats {
fmt.Printf(" [%s] 热:%d 温:%d 冷:%d 总:%d\n",
group, groupStats.HotRecords, groupStats.WarmRecords,
groupStats.ColdRecords, groupStats.TotalRecords)
}
}
}
}()
// 运行一段时间
fmt.Println("\n⏳ 步骤3: 运行并发处理")
fmt.Println("正在并发处理数据运行10秒...")
time.Sleep(10 * time.Second)
// 停止所有生产者
fmt.Println("\n🛑 步骤4: 停止生产者")
close(stopChan)
wg.Wait()
// 最终统计
fmt.Println("\n📊 步骤5: 最终统计")
finalStats, err := pdb.GetStats()
if err != nil {
log.Fatalf("获取最终统计失败: %v", err)
}
fmt.Printf("🎯 最终结果:\n")
fmt.Printf(" 总记录数: %d\n", finalStats.TotalRecords)
fmt.Printf(" 总组数: %d\n", len(finalStats.GroupStats))
fmt.Println("\n📋 各组详细统计:")
for group, groupStats := range finalStats.GroupStats {
fmt.Printf(" %s:\n", group)
fmt.Printf(" 热数据: %d (%.1f%%)\n",
groupStats.HotRecords, float64(groupStats.HotRecords)/float64(groupStats.TotalRecords)*100)
fmt.Printf(" 温数据: %d (%.1f%%)\n",
groupStats.WarmRecords, float64(groupStats.WarmRecords)/float64(groupStats.TotalRecords)*100)
fmt.Printf(" 冷数据: %d (%.1f%%)\n",
groupStats.ColdRecords, float64(groupStats.ColdRecords)/float64(groupStats.TotalRecords)*100)
fmt.Printf(" 总计: %d\n", groupStats.TotalRecords)
}
// 测试并发查询
fmt.Println("\n🔍 步骤6: 测试并发查询")
var queryWg sync.WaitGroup
for _, source := range dataSources {
queryWg.Add(1)
go func(group string) {
defer queryWg.Done()
pageReq := &pipelinedb.PageRequest{
Page: 1,
PageSize: 5,
}
response, err := pdb.GetRecordsByGroup(group, pageReq)
if err != nil {
fmt.Printf("❌ 查询组 %s 失败: %v\n", group, err)
return
}
fmt.Printf("🔍 组 [%s] 查询结果: %d/%d 条记录\n",
group, len(response.Records), response.TotalCount)
for i, record := range response.Records {
if i < 2 { // 只显示前2条
dataStr := string(record.Data)
if len(dataStr) > 50 {
dataStr = dataStr[:50]
}
fmt.Printf(" 📄 ID:%d [%s] %s...\n",
record.ID, record.Status, dataStr)
}
}
}(source.group)
}
queryWg.Wait()
fmt.Println("\n🎉 并发处理示例完成!")
fmt.Println("💡 提示: 这个示例展示了Pipeline Database在高并发场景下的稳定性")
}