// 高并发压力测试示例 package main import ( "fmt" "log" "math/rand" "os" "runtime" "sync" "sync/atomic" "time" "code.tczkiot.com/wlw/pipelinedb" "code.tczkiot.com/wlw/pipelinedb/examples/common" ) // 性能指标 type PerformanceMetrics struct { TotalOperations int64 SuccessfulWrites int64 FailedWrites int64 SuccessfulReads int64 FailedReads int64 StartTime time.Time EndTime time.Time } // 工作负载配置 type WorkloadConfig struct { NumWriters int // 写入goroutine数量 NumReaders int // 读取goroutine数量 WritesPerSecond int // 每秒写入次数 ReadsPerSecond int // 每秒读取次数 TestDuration time.Duration // 测试持续时间 DataSize int // 每条记录的数据大小 NumGroups int // 数据组数量 } func main() { // 设置随机种子 rand.Seed(time.Now().UnixNano()) // 创建临时数据库文件 dbFile := "high_concurrency_test.db" defer os.Remove(dbFile) // 确保文件可以创建 if _, err := os.Create(dbFile); err != nil { log.Fatalf("创建数据库文件失败: %v", err) } fmt.Println("🚀 高并发压力测试") fmt.Println("==================") fmt.Printf("🖥️ 系统信息: %d CPU核心, %s\n", runtime.NumCPU(), runtime.Version()) // 配置工作负载 config := WorkloadConfig{ NumWriters: 50, // 50个写入goroutine NumReaders: 20, // 20个读取goroutine WritesPerSecond: 1000, // 每秒1000次写入 ReadsPerSecond: 500, // 每秒500次读取 TestDuration: 30 * time.Second, // 运行30秒 DataSize: 256, // 256字节数据 NumGroups: 10, // 10个数据组 } fmt.Printf("📊 测试配置:\n") fmt.Printf(" 写入goroutine: %d\n", config.NumWriters) fmt.Printf(" 读取goroutine: %d\n", config.NumReaders) fmt.Printf(" 目标写入QPS: %d\n", config.WritesPerSecond) fmt.Printf(" 目标读取QPS: %d\n", config.ReadsPerSecond) fmt.Printf(" 测试时长: %v\n", config.TestDuration) fmt.Printf(" 数据大小: %d bytes\n", config.DataSize) fmt.Printf(" 数据组数: %d\n", config.NumGroups) // 配置数据库 fmt.Println("\n📂 步骤1: 初始化数据库") dbConfig := &pipelinedb.Config{ CacheSize: 1000, // 大缓存支持高并发 WarmInterval: 5 * time.Second, // 较长的预热间隔 ProcessInterval: 10 * time.Second, // 较长的处理间隔 BatchSize: 100, // 大批次处理 } // 创建处理器 handler := common.NewLoggingHandler() pdb, err := pipelinedb.Open(pipelinedb.Options{ Filename: dbFile, Config: dbConfig, Handler: handler, }) if err != nil { log.Fatalf("打开数据库失败: %v", err) } defer pdb.Stop() fmt.Println("✅ 数据库已初始化") // 性能指标 metrics := &PerformanceMetrics{ StartTime: time.Now(), } // 控制通道 stopChan := make(chan struct{}) var wg sync.WaitGroup // 启动写入goroutine fmt.Println("\n🏭 步骤2: 启动写入压力测试") writeInterval := time.Duration(int64(time.Second) / int64(config.WritesPerSecond/config.NumWriters)) for i := 0; i < config.NumWriters; i++ { wg.Add(1) go func(writerID int) { defer wg.Done() ticker := time.NewTicker(writeInterval) defer ticker.Stop() localWrites := 0 for { select { case <-stopChan: fmt.Printf("📝 写入器 %d 停止,完成 %d 次写入\n", writerID, localWrites) return case <-ticker.C: // 生成随机数据 groupID := rand.Intn(config.NumGroups) groupName := fmt.Sprintf("group_%d", groupID) data := make([]byte, config.DataSize) for j := range data { data[j] = byte(rand.Intn(256)) } metadata := fmt.Sprintf(`{"writer_id": %d, "timestamp": %d, "sequence": %d}`, writerID, time.Now().UnixNano(), localWrites) // 执行写入 _, err := pdb.AcceptData(groupName, data, metadata) atomic.AddInt64(&metrics.TotalOperations, 1) if err != nil { atomic.AddInt64(&metrics.FailedWrites, 1) } else { atomic.AddInt64(&metrics.SuccessfulWrites, 1) localWrites++ } } } }(i) } // 启动读取goroutine fmt.Println("🔍 启动读取压力测试") readInterval := time.Duration(int64(time.Second) / int64(config.ReadsPerSecond/config.NumReaders)) for i := 0; i < config.NumReaders; i++ { wg.Add(1) go func(readerID int) { defer wg.Done() ticker := time.NewTicker(readInterval) defer ticker.Stop() localReads := 0 for { select { case <-stopChan: fmt.Printf("📖 读取器 %d 停止,完成 %d 次读取\n", readerID, localReads) return case <-ticker.C: // 随机选择组进行查询 groupID := rand.Intn(config.NumGroups) groupName := fmt.Sprintf("group_%d", groupID) pageReq := &pipelinedb.PageRequest{ Page: 1, PageSize: 10, } // 执行读取 _, err := pdb.GetRecordsByGroup(groupName, pageReq) atomic.AddInt64(&metrics.TotalOperations, 1) if err != nil { atomic.AddInt64(&metrics.FailedReads, 1) } else { atomic.AddInt64(&metrics.SuccessfulReads, 1) localReads++ } } } }(i) } // 启动实时监控 wg.Add(1) go func() { defer wg.Done() ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() lastWrites := int64(0) lastReads := int64(0) lastTime := time.Now() for { select { case <-stopChan: return case <-ticker.C: currentWrites := atomic.LoadInt64(&metrics.SuccessfulWrites) currentReads := atomic.LoadInt64(&metrics.SuccessfulReads) currentTime := time.Now() elapsed := currentTime.Sub(lastTime).Seconds() writeQPS := float64(currentWrites-lastWrites) / elapsed readQPS := float64(currentReads-lastReads) / elapsed fmt.Printf("\n📊 实时性能 (时间: %s)\n", currentTime.Format("15:04:05")) fmt.Printf(" 写入QPS: %.1f (目标: %d)\n", writeQPS, config.WritesPerSecond) fmt.Printf(" 读取QPS: %.1f (目标: %d)\n", readQPS, config.ReadsPerSecond) fmt.Printf(" 总写入: %d (失败: %d)\n", currentWrites, atomic.LoadInt64(&metrics.FailedWrites)) fmt.Printf(" 总读取: %d (失败: %d)\n", currentReads, atomic.LoadInt64(&metrics.FailedReads)) // 获取数据库统计 stats, err := pdb.GetStats() if err == nil { fmt.Printf(" 数据库记录: %d\n", stats.TotalRecords) fmt.Printf(" 活跃组数: %d\n", len(stats.GroupStats)) } lastWrites = currentWrites lastReads = currentReads lastTime = currentTime } } }() // 运行测试 fmt.Printf("\n⏳ 步骤3: 运行压力测试 (%v)\n", config.TestDuration) time.Sleep(config.TestDuration) // 停止所有goroutine fmt.Println("\n🛑 步骤4: 停止压力测试") close(stopChan) wg.Wait() metrics.EndTime = time.Now() // 最终性能报告 fmt.Println("\n📈 步骤5: 性能报告") totalDuration := metrics.EndTime.Sub(metrics.StartTime) fmt.Printf("🎯 测试结果:\n") fmt.Printf(" 测试时长: %v\n", totalDuration) fmt.Printf(" 总操作数: %d\n", metrics.TotalOperations) fmt.Printf(" 平均QPS: %.1f\n", float64(metrics.TotalOperations)/totalDuration.Seconds()) fmt.Printf("\n📝 写入性能:\n") fmt.Printf(" 成功写入: %d\n", metrics.SuccessfulWrites) fmt.Printf(" 失败写入: %d\n", metrics.FailedWrites) fmt.Printf(" 写入成功率: %.2f%%\n", float64(metrics.SuccessfulWrites)/float64(metrics.SuccessfulWrites+metrics.FailedWrites)*100) fmt.Printf(" 平均写入QPS: %.1f\n", float64(metrics.SuccessfulWrites)/totalDuration.Seconds()) fmt.Printf("\n📖 读取性能:\n") fmt.Printf(" 成功读取: %d\n", metrics.SuccessfulReads) fmt.Printf(" 失败读取: %d\n", metrics.FailedReads) fmt.Printf(" 读取成功率: %.2f%%\n", float64(metrics.SuccessfulReads)/float64(metrics.SuccessfulReads+metrics.FailedReads)*100) fmt.Printf(" 平均读取QPS: %.1f\n", float64(metrics.SuccessfulReads)/totalDuration.Seconds()) // 数据库最终状态 finalStats, err := pdb.GetStats() if err == nil { fmt.Printf("\n💾 数据库状态:\n") fmt.Printf(" 总记录数: %d\n", finalStats.TotalRecords) fmt.Printf(" 总组数: %d\n", len(finalStats.GroupStats)) fmt.Printf(" 热数据: %d\n", finalStats.HotRecords) fmt.Printf(" 温数据: %d\n", finalStats.WarmRecords) fmt.Printf(" 冷数据: %d\n", finalStats.ColdRecords) fmt.Printf("\n📊 各组分布:\n") for group, groupStats := range finalStats.GroupStats { fmt.Printf(" %s: %d 条记录\n", group, groupStats.TotalRecords) } } // 性能评估 fmt.Printf("\n🏆 性能评估:\n") expectedWrites := float64(config.WritesPerSecond) * totalDuration.Seconds() expectedReads := float64(config.ReadsPerSecond) * totalDuration.Seconds() writeEfficiency := float64(metrics.SuccessfulWrites) / expectedWrites * 100 readEfficiency := float64(metrics.SuccessfulReads) / expectedReads * 100 fmt.Printf(" 写入效率: %.1f%% (期望: %.0f, 实际: %d)\n", writeEfficiency, expectedWrites, metrics.SuccessfulWrites) fmt.Printf(" 读取效率: %.1f%% (期望: %.0f, 实际: %d)\n", readEfficiency, expectedReads, metrics.SuccessfulReads) if writeEfficiency > 90 && readEfficiency > 90 { fmt.Println(" 🎉 性能优秀!数据库在高并发下表现稳定") } else if writeEfficiency > 70 && readEfficiency > 70 { fmt.Println(" 👍 性能良好,可以承受高并发负载") } else { fmt.Println(" ⚠️ 性能需要优化,建议调整配置参数") } fmt.Println("\n🎉 高并发压力测试完成!") fmt.Println("💡 这个测试验证了Pipeline Database在极限并发下的稳定性和性能") }