package pipelinedb import ( "context" "fmt" "os" "testing" ) // BenchmarkPipelineDBOpen 基准测试:数据库打开 func BenchmarkPipelineDBOpen(b *testing.B) { config := &Config{CacheSize: 50} for i := 0; i < b.N; i++ { tmpFile, err := os.CreateTemp("", "bench_open_*.db") if err != nil { b.Fatalf("failed to create temp file: %v", err) } tmpFile.Close() pdb, err := Open(Options{ Filename: tmpFile.Name(), Config: config, }) if err != nil { b.Fatalf("Open failed: %v", err) } pdb.Stop() os.Remove(tmpFile.Name()) } } // BenchmarkAcceptData 基准测试:数据接收 func BenchmarkAcceptData(b *testing.B) { tmpFile, err := os.CreateTemp("", "bench_accept_*.db") if err != nil { b.Fatalf("failed to create temp file: %v", err) } defer os.Remove(tmpFile.Name()) tmpFile.Close() config := &Config{CacheSize: 100} pdb, err := Open(Options{ Filename: tmpFile.Name(), Config: config, }) if err != nil { b.Fatalf("Open failed: %v", err) } defer pdb.Stop() testData := []byte("benchmark test data for AcceptData performance") testMetadata := `{"source": "benchmark", "type": "test"}` b.ResetTimer() for i := 0; i < b.N; i++ { group := fmt.Sprintf("group_%d", i%10) // 10个不同的组 _, err := pdb.AcceptData(group, testData, testMetadata) if err != nil { b.Fatalf("AcceptData failed: %v", err) } } } // BenchmarkGetRecordsByGroup 基准测试:按组查询记录 func BenchmarkGetRecordsByGroup(b *testing.B) { tmpFile, err := os.CreateTemp("", "bench_query_*.db") if err != nil { b.Fatalf("failed to create temp file: %v", err) } defer os.Remove(tmpFile.Name()) tmpFile.Close() config := &Config{CacheSize: 100} pdb, err := Open(Options{ Filename: tmpFile.Name(), Config: config, }) if err != nil { b.Fatalf("Open failed: %v", err) } defer pdb.Stop() // 预插入测试数据 testData := []byte("query benchmark data") testMetadata := `{"type": "benchmark"}` groups := []string{"group_a", "group_b", "group_c"} for _, group := range groups { for i := 0; i < 100; i++ { _, err := pdb.AcceptData(group, testData, testMetadata) if err != nil { b.Fatalf("AcceptData failed: %v", err) } } } pageReq := &PageRequest{Page: 1, PageSize: 20} b.ResetTimer() for i := 0; i < b.N; i++ { group := groups[i%len(groups)] _, err := pdb.GetRecordsByGroup(group, pageReq) if err != nil { b.Fatalf("GetRecordsByGroup failed: %v", err) } } } // BenchmarkDataProcessing 基准测试:数据处理流程 func BenchmarkDataProcessing(b *testing.B) { tmpFile, err := os.CreateTemp("", "bench_process_*.db") if err != nil { b.Fatalf("failed to create temp file: %v", err) } defer os.Remove(tmpFile.Name()) tmpFile.Close() config := &Config{CacheSize: 100} pdb, err := Open(Options{ Filename: tmpFile.Name(), Config: config, }) if err != nil { b.Fatalf("Open failed: %v", err) } defer pdb.Stop() testData := []byte("processing benchmark data") testMetadata := `{"type": "benchmark"}` b.ResetTimer() for i := 0; i < b.N; i++ { group := fmt.Sprintf("group_%d", i%3) // 接收数据 recordID, err := pdb.AcceptData(group, testData, testMetadata) if err != nil { b.Fatalf("AcceptData failed: %v", err) } // 查询数据 pageReq := &PageRequest{Page: 1, PageSize: 1} _, err = pdb.GetRecordsByGroup(group, pageReq) if err != nil { b.Fatalf("GetRecordsByGroup failed: %v", err) } // 使用recordID避免未使用警告 _ = recordID } } // BenchmarkGetStats 基准测试:获取统计信息 func BenchmarkGetStats(b *testing.B) { tmpFile, err := os.CreateTemp("", "bench_stats_*.db") if err != nil { b.Fatalf("failed to create temp file: %v", err) } defer os.Remove(tmpFile.Name()) tmpFile.Close() config := &Config{CacheSize: 100} pdb, err := Open(Options{ Filename: tmpFile.Name(), Config: config, }) if err != nil { b.Fatalf("Open failed: %v", err) } defer pdb.Stop() // 预插入测试数据 testData := []byte("stats benchmark data") testMetadata := `{"type": "benchmark"}` for i := 0; i < 500; i++ { group := fmt.Sprintf("group_%d", i%5) _, err := pdb.AcceptData(group, testData, testMetadata) if err != nil { b.Fatalf("AcceptData failed: %v", err) } } b.ResetTimer() for i := 0; i < b.N; i++ { _, err := pdb.GetStats() if err != nil { b.Fatalf("GetStats failed: %v", err) } } } // BenchmarkCacheOperations 基准测试:缓存操作 func BenchmarkCacheOperations(b *testing.B) { cache := NewPageCache(100) // 准备测试数据 testPageData := make([]byte, PageSize) for i := 0; i < PageSize; i++ { testPageData[i] = byte(i % 256) } b.ResetTimer() for i := 0; i < b.N; i++ { pageNo := uint16(i % 50) // 50个不同的页面 // 测试 Put 和 Get 操作 cache.Put(pageNo, testPageData, false) // false表示非脏页 _, found := cache.Get(pageNo) if !found { b.Fatalf("cache Get failed for page %d", pageNo) } } } // BenchmarkFreePageManager 基准测试:空闲页面管理 func BenchmarkFreePageManager(b *testing.B) { fpm := NewFreePageManager() // 预分配一些页面 for i := uint16(1); i <= 100; i++ { fpm.FreePage(i) } b.ResetTimer() for i := 0; i < b.N; i++ { // 分配页面 pageNo, ok := fpm.AllocPage() if !ok { // 重新添加页面 fpm.FreePage(uint16(i%100 + 1)) pageNo, ok = fpm.AllocPage() if !ok { b.Fatalf("AllocPage failed") } } // 释放页面 fpm.FreePage(pageNo) } } // BenchmarkGroupIndex 基准测试:组索引操作 func BenchmarkGroupIndex(b *testing.B) { idx := NewGroupIndex("test_group") b.ResetTimer() for i := 0; i < b.N; i++ { recordID := int64(i) pageNo := uint16(i % 100) slotNo := uint16(i % 10) // 插入 idx.Insert(recordID, pageNo, slotNo) // 查询 _, _, found := idx.Get(recordID) if !found { b.Fatalf("Get failed for record %d", recordID) } // 删除(每10次删除一次,保持索引有数据) if i%10 == 0 { idx.Delete(recordID) } } } // BenchmarkConcurrentAcceptData 基准测试:并发数据接收 func BenchmarkConcurrentAcceptData(b *testing.B) { tmpFile, err := os.CreateTemp("", "bench_concurrent_*.db") if err != nil { b.Fatalf("failed to create temp file: %v", err) } defer os.Remove(tmpFile.Name()) tmpFile.Close() config := &Config{CacheSize: 200} pdb, err := Open(Options{ Filename: tmpFile.Name(), Config: config, }) if err != nil { b.Fatalf("Open failed: %v", err) } defer pdb.Stop() testData := []byte("concurrent benchmark data") testMetadata := `{"type": "concurrent"}` b.ResetTimer() b.RunParallel(func(pb *testing.PB) { i := 0 for pb.Next() { group := fmt.Sprintf("group_%d", i%5) _, err := pdb.AcceptData(group, testData, testMetadata) if err != nil { b.Fatalf("AcceptData failed: %v", err) } i++ } }) } // BenchmarkHandler 基准测试:数据处理器 func BenchmarkHandler(b *testing.B) { handler := NewLoggingHandler("benchmark") ctx := context.Background() testData := []byte("handler benchmark data") group := "test_group" b.ResetTimer() for i := 0; i < b.N; i++ { // 测试预热处理 _, err := handler.WillWarm(ctx, group, testData) if err != nil { b.Fatalf("WillWarm failed: %v", err) } } } // BenchmarkStorageOperations 基准测试:存储层操作(从原来的文件移过来) func BenchmarkStorageOperations(b *testing.B) { tmpFile, err := os.CreateTemp("", "bench_storage_*.db") if err != nil { b.Fatalf("failed to create temp file: %v", err) } defer os.Remove(tmpFile.Name()) tmpFile.Close() config := &Config{CacheSize: 100} pdb, err := Open(Options{ Filename: tmpFile.Name(), Config: config, }) if err != nil { b.Fatalf("Open failed: %v", err) } defer pdb.Stop() // 预分配页面 pageNo, err := pdb.allocPage() if err != nil { b.Fatalf("allocPage failed: %v", err) } testData := []byte("storage benchmark data") b.ResetTimer() for i := 0; i < b.N; i++ { id := int64(i) // 插入记录 slotNo, err := pdb.insertToPage(pageNo, id, testData) if err != nil { // 页面满了,分配新页面 pageNo, err = pdb.allocPage() if err != nil { b.Fatalf("allocPage failed: %v", err) } slotNo, err = pdb.insertToPage(pageNo, id, testData) if err != nil { b.Fatalf("insertToPage failed: %v", err) } } // 读取记录 _, err = pdb.readRecord(pageNo, slotNo, id) if err != nil { b.Fatalf("readRecord failed: %v", err) } } } // ==================== 并发基准测试 ==================== // BenchmarkConcurrentQuery 基准测试:并发查询 func BenchmarkConcurrentQuery(b *testing.B) { tmpFile, err := os.CreateTemp("", "bench_concurrent_query_*.db") if err != nil { b.Fatalf("failed to create temp file: %v", err) } defer os.Remove(tmpFile.Name()) tmpFile.Close() config := &Config{CacheSize: 200} pdb, err := Open(Options{ Filename: tmpFile.Name(), Config: config, }) if err != nil { b.Fatalf("Open failed: %v", err) } defer pdb.Stop() // 预插入测试数据 testData := []byte("concurrent query benchmark data") testMetadata := `{"type": "concurrent_query"}` groups := []string{"group_a", "group_b", "group_c", "group_d", "group_e"} for _, group := range groups { for i := 0; i < 200; i++ { _, err := pdb.AcceptData(group, testData, testMetadata) if err != nil { b.Fatalf("AcceptData failed: %v", err) } } } pageReq := &PageRequest{Page: 1, PageSize: 10} b.ResetTimer() b.RunParallel(func(pb *testing.PB) { i := 0 for pb.Next() { group := groups[i%len(groups)] _, err := pdb.GetRecordsByGroup(group, pageReq) if err != nil { b.Fatalf("GetRecordsByGroup failed: %v", err) } i++ } }) } // BenchmarkConcurrentMixed 基准测试:并发读写混合操作 func BenchmarkConcurrentMixed(b *testing.B) { tmpFile, err := os.CreateTemp("", "bench_concurrent_mixed_*.db") if err != nil { b.Fatalf("failed to create temp file: %v", err) } defer os.Remove(tmpFile.Name()) tmpFile.Close() config := &Config{CacheSize: 300} pdb, err := Open(Options{ Filename: tmpFile.Name(), Config: config, }) if err != nil { b.Fatalf("Open failed: %v", err) } defer pdb.Stop() testData := []byte("concurrent mixed benchmark data") testMetadata := `{"type": "concurrent_mixed"}` groups := []string{"group_1", "group_2", "group_3"} // 预插入一些数据用于查询 for _, group := range groups { for i := 0; i < 50; i++ { _, err := pdb.AcceptData(group, testData, testMetadata) if err != nil { b.Fatalf("AcceptData failed: %v", err) } } } pageReq := &PageRequest{Page: 1, PageSize: 5} b.ResetTimer() b.RunParallel(func(pb *testing.PB) { i := 0 for pb.Next() { group := groups[i%len(groups)] // 80% 写操作,20% 读操作 if i%5 == 0 { // 读操作 _, err := pdb.GetRecordsByGroup(group, pageReq) if err != nil { b.Fatalf("GetRecordsByGroup failed: %v", err) } } else { // 写操作 _, err := pdb.AcceptData(group, testData, testMetadata) if err != nil { b.Fatalf("AcceptData failed: %v", err) } } i++ } }) } // BenchmarkConcurrentStats 基准测试:并发统计查询 func BenchmarkConcurrentStats(b *testing.B) { tmpFile, err := os.CreateTemp("", "bench_concurrent_stats_*.db") if err != nil { b.Fatalf("failed to create temp file: %v", err) } defer os.Remove(tmpFile.Name()) tmpFile.Close() config := &Config{CacheSize: 150} pdb, err := Open(Options{ Filename: tmpFile.Name(), Config: config, }) if err != nil { b.Fatalf("Open failed: %v", err) } defer pdb.Stop() // 预插入测试数据 testData := []byte("concurrent stats benchmark data") testMetadata := `{"type": "concurrent_stats"}` for i := 0; i < 1000; i++ { group := fmt.Sprintf("group_%d", i%10) _, err := pdb.AcceptData(group, testData, testMetadata) if err != nil { b.Fatalf("AcceptData failed: %v", err) } } b.ResetTimer() b.RunParallel(func(pb *testing.PB) { for pb.Next() { _, err := pdb.GetStats() if err != nil { b.Fatalf("GetStats failed: %v", err) } } }) } // BenchmarkConcurrentCache 基准测试:并发缓存操作 func BenchmarkConcurrentCache(b *testing.B) { cache := NewPageCache(200) // 准备测试数据 testPageData := make([]byte, PageSize) for i := 0; i < PageSize; i++ { testPageData[i] = byte(i % 256) } b.ResetTimer() b.RunParallel(func(pb *testing.PB) { i := 0 for pb.Next() { pageNo := uint16(i % 100) // 100个不同的页面 // 80% 读操作,20% 写操作 if i%5 == 0 { // 写操作 cache.Put(pageNo, testPageData, i%2 == 0) // 随机设置脏页标志 } else { // 读操作 _, _ = cache.Get(pageNo) } i++ } }) } // BenchmarkConcurrentIndexOperations 基准测试:并发索引操作 func BenchmarkConcurrentIndexOperations(b *testing.B) { indexMgr := NewIndexManager() groups := []string{"idx_group_1", "idx_group_2", "idx_group_3", "idx_group_4"} // 预创建索引 for _, group := range groups { indexMgr.GetOrCreateIndex(group) } b.ResetTimer() b.RunParallel(func(pb *testing.PB) { i := 0 for pb.Next() { group := groups[i%len(groups)] idx := indexMgr.GetOrCreateIndex(group) recordID := int64(i) pageNo := uint16(i % 50) slotNo := uint16(i % 20) // 70% 插入,20% 查询,10% 删除 switch i % 10 { case 0: // 删除操作 idx.Delete(recordID - 100) // 删除之前的记录 case 1, 2: // 查询操作 _, _, _ = idx.Get(recordID - 50) // 查询之前的记录 default: // 插入操作 idx.Insert(recordID, pageNo, slotNo) } i++ } }) } // BenchmarkHighConcurrencyWrite 基准测试:高并发写入 func BenchmarkHighConcurrencyWrite(b *testing.B) { tmpFile, err := os.CreateTemp("", "bench_high_concurrency_*.db") if err != nil { b.Fatalf("failed to create temp file: %v", err) } defer os.Remove(tmpFile.Name()) tmpFile.Close() config := &Config{ CacheSize: 500, // 更大的缓存 SyncWrites: false, // 异步写入提高性能 } pdb, err := Open(Options{ Filename: tmpFile.Name(), Config: config, }) if err != nil { b.Fatalf("Open failed: %v", err) } defer pdb.Stop() testData := []byte("high concurrency write benchmark data") testMetadata := `{"type": "high_concurrency"}` b.ResetTimer() // 设置更高的并发度 b.SetParallelism(20) b.RunParallel(func(pb *testing.PB) { i := 0 for pb.Next() { group := fmt.Sprintf("hc_group_%d", i%20) // 20个不同的组 _, err := pdb.AcceptData(group, testData, testMetadata) if err != nil { b.Fatalf("AcceptData failed: %v", err) } i++ } }) } // BenchmarkConcurrentGroupOperations 基准测试:并发组操作 func BenchmarkConcurrentGroupOperations(b *testing.B) { tmpFile, err := os.CreateTemp("", "bench_concurrent_group_*.db") if err != nil { b.Fatalf("failed to create temp file: %v", err) } defer os.Remove(tmpFile.Name()) tmpFile.Close() config := &Config{CacheSize: 250} pdb, err := Open(Options{ Filename: tmpFile.Name(), Config: config, }) if err != nil { b.Fatalf("Open failed: %v", err) } defer pdb.Stop() testData := []byte("concurrent group operations benchmark data") testMetadata := `{"type": "group_ops"}` b.ResetTimer() // 预插入一些数据,避免查询空组 for i := 0; i < 15; i++ { group := fmt.Sprintf("grp_%d", i) for j := 0; j < 5; j++ { _, err := pdb.AcceptData(group, testData, testMetadata) if err != nil { b.Fatalf("AcceptData failed: %v", err) } } } b.ResetTimer() b.RunParallel(func(pb *testing.PB) { i := 0 for pb.Next() { group := fmt.Sprintf("grp_%d", i%15) // 15个不同的组 // 混合操作:写入、查询、统计 switch i % 6 { case 0, 1, 2, 3: // 写入操作 (66.7%) _, err := pdb.AcceptData(group, testData, testMetadata) if err != nil { b.Fatalf("AcceptData failed: %v", err) } case 4: // 查询操作 (16.7%) pageReq := &PageRequest{Page: 1, PageSize: 3} _, err := pdb.GetRecordsByGroup(group, pageReq) if err != nil { b.Fatalf("GetRecordsByGroup failed: %v", err) } case 5: // 统计操作 (16.7%) _, err := pdb.GetStats() if err != nil { b.Fatalf("GetStats failed: %v", err) } } i++ } }) }