Files
pipelinedb/benchmark_test.go
2025-09-30 15:05:56 +08:00

717 lines
16 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package pipelinedb
import (
"context"
"fmt"
"os"
"testing"
)
// BenchmarkPipelineDBOpen 基准测试:数据库打开
func BenchmarkPipelineDBOpen(b *testing.B) {
config := &Config{CacheSize: 50}
for i := 0; i < b.N; i++ {
tmpFile, err := os.CreateTemp("", "bench_open_*.db")
if err != nil {
b.Fatalf("failed to create temp file: %v", err)
}
tmpFile.Close()
pdb, err := Open(Options{
Filename: tmpFile.Name(),
Config: config,
})
if err != nil {
b.Fatalf("Open failed: %v", err)
}
pdb.Stop()
os.Remove(tmpFile.Name())
}
}
// BenchmarkAcceptData 基准测试:数据接收
func BenchmarkAcceptData(b *testing.B) {
tmpFile, err := os.CreateTemp("", "bench_accept_*.db")
if err != nil {
b.Fatalf("failed to create temp file: %v", err)
}
defer os.Remove(tmpFile.Name())
tmpFile.Close()
config := &Config{CacheSize: 100}
pdb, err := Open(Options{
Filename: tmpFile.Name(),
Config: config,
})
if err != nil {
b.Fatalf("Open failed: %v", err)
}
defer pdb.Stop()
testData := []byte("benchmark test data for AcceptData performance")
testMetadata := `{"source": "benchmark", "type": "test"}`
b.ResetTimer()
for i := 0; i < b.N; i++ {
group := fmt.Sprintf("group_%d", i%10) // 10个不同的组
_, err := pdb.AcceptData(group, testData, testMetadata)
if err != nil {
b.Fatalf("AcceptData failed: %v", err)
}
}
}
// BenchmarkGetRecordsByGroup 基准测试:按组查询记录
func BenchmarkGetRecordsByGroup(b *testing.B) {
tmpFile, err := os.CreateTemp("", "bench_query_*.db")
if err != nil {
b.Fatalf("failed to create temp file: %v", err)
}
defer os.Remove(tmpFile.Name())
tmpFile.Close()
config := &Config{CacheSize: 100}
pdb, err := Open(Options{
Filename: tmpFile.Name(),
Config: config,
})
if err != nil {
b.Fatalf("Open failed: %v", err)
}
defer pdb.Stop()
// 预插入测试数据
testData := []byte("query benchmark data")
testMetadata := `{"type": "benchmark"}`
groups := []string{"group_a", "group_b", "group_c"}
for _, group := range groups {
for i := 0; i < 100; i++ {
_, err := pdb.AcceptData(group, testData, testMetadata)
if err != nil {
b.Fatalf("AcceptData failed: %v", err)
}
}
}
pageReq := &PageRequest{Page: 1, PageSize: 20}
b.ResetTimer()
for i := 0; i < b.N; i++ {
group := groups[i%len(groups)]
_, err := pdb.GetRecordsByGroup(group, pageReq)
if err != nil {
b.Fatalf("GetRecordsByGroup failed: %v", err)
}
}
}
// BenchmarkDataProcessing 基准测试:数据处理流程
func BenchmarkDataProcessing(b *testing.B) {
tmpFile, err := os.CreateTemp("", "bench_process_*.db")
if err != nil {
b.Fatalf("failed to create temp file: %v", err)
}
defer os.Remove(tmpFile.Name())
tmpFile.Close()
config := &Config{CacheSize: 100}
pdb, err := Open(Options{
Filename: tmpFile.Name(),
Config: config,
})
if err != nil {
b.Fatalf("Open failed: %v", err)
}
defer pdb.Stop()
testData := []byte("processing benchmark data")
testMetadata := `{"type": "benchmark"}`
b.ResetTimer()
for i := 0; i < b.N; i++ {
group := fmt.Sprintf("group_%d", i%3)
// 接收数据
recordID, err := pdb.AcceptData(group, testData, testMetadata)
if err != nil {
b.Fatalf("AcceptData failed: %v", err)
}
// 查询数据
pageReq := &PageRequest{Page: 1, PageSize: 1}
_, err = pdb.GetRecordsByGroup(group, pageReq)
if err != nil {
b.Fatalf("GetRecordsByGroup failed: %v", err)
}
// 使用recordID避免未使用警告
_ = recordID
}
}
// BenchmarkGetStats 基准测试:获取统计信息
func BenchmarkGetStats(b *testing.B) {
tmpFile, err := os.CreateTemp("", "bench_stats_*.db")
if err != nil {
b.Fatalf("failed to create temp file: %v", err)
}
defer os.Remove(tmpFile.Name())
tmpFile.Close()
config := &Config{CacheSize: 100}
pdb, err := Open(Options{
Filename: tmpFile.Name(),
Config: config,
})
if err != nil {
b.Fatalf("Open failed: %v", err)
}
defer pdb.Stop()
// 预插入测试数据
testData := []byte("stats benchmark data")
testMetadata := `{"type": "benchmark"}`
for i := 0; i < 500; i++ {
group := fmt.Sprintf("group_%d", i%5)
_, err := pdb.AcceptData(group, testData, testMetadata)
if err != nil {
b.Fatalf("AcceptData failed: %v", err)
}
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := pdb.GetStats()
if err != nil {
b.Fatalf("GetStats failed: %v", err)
}
}
}
// BenchmarkCacheOperations 基准测试:缓存操作
func BenchmarkCacheOperations(b *testing.B) {
cache := NewPageCache(100)
// 准备测试数据
testPageData := make([]byte, PageSize)
for i := 0; i < PageSize; i++ {
testPageData[i] = byte(i % 256)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
pageNo := uint16(i % 50) // 50个不同的页面
// 测试 Put 和 Get 操作
cache.Put(pageNo, testPageData, false) // false表示非脏页
_, found := cache.Get(pageNo)
if !found {
b.Fatalf("cache Get failed for page %d", pageNo)
}
}
}
// BenchmarkFreePageManager 基准测试:空闲页面管理
func BenchmarkFreePageManager(b *testing.B) {
fpm := NewFreePageManager()
// 预分配一些页面
for i := uint16(1); i <= 100; i++ {
fpm.FreePage(i)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
// 分配页面
pageNo, ok := fpm.AllocPage()
if !ok {
// 重新添加页面
fpm.FreePage(uint16(i%100 + 1))
pageNo, ok = fpm.AllocPage()
if !ok {
b.Fatalf("AllocPage failed")
}
}
// 释放页面
fpm.FreePage(pageNo)
}
}
// BenchmarkGroupIndex 基准测试:组索引操作
func BenchmarkGroupIndex(b *testing.B) {
idx := NewGroupIndex("test_group")
b.ResetTimer()
for i := 0; i < b.N; i++ {
recordID := int64(i)
pageNo := uint16(i % 100)
slotNo := uint16(i % 10)
// 插入
idx.Insert(recordID, pageNo, slotNo)
// 查询
_, _, found := idx.Get(recordID)
if !found {
b.Fatalf("Get failed for record %d", recordID)
}
// 删除每10次删除一次保持索引有数据
if i%10 == 0 {
idx.Delete(recordID)
}
}
}
// BenchmarkConcurrentAcceptData 基准测试:并发数据接收
func BenchmarkConcurrentAcceptData(b *testing.B) {
tmpFile, err := os.CreateTemp("", "bench_concurrent_*.db")
if err != nil {
b.Fatalf("failed to create temp file: %v", err)
}
defer os.Remove(tmpFile.Name())
tmpFile.Close()
config := &Config{CacheSize: 200}
pdb, err := Open(Options{
Filename: tmpFile.Name(),
Config: config,
})
if err != nil {
b.Fatalf("Open failed: %v", err)
}
defer pdb.Stop()
testData := []byte("concurrent benchmark data")
testMetadata := `{"type": "concurrent"}`
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
i := 0
for pb.Next() {
group := fmt.Sprintf("group_%d", i%5)
_, err := pdb.AcceptData(group, testData, testMetadata)
if err != nil {
b.Fatalf("AcceptData failed: %v", err)
}
i++
}
})
}
// BenchmarkHandler 基准测试:数据处理器
func BenchmarkHandler(b *testing.B) {
handler := NewLoggingHandler("benchmark")
ctx := context.Background()
testData := []byte("handler benchmark data")
group := "test_group"
b.ResetTimer()
for i := 0; i < b.N; i++ {
// 测试预热处理
_, err := handler.WillWarm(ctx, group, testData)
if err != nil {
b.Fatalf("WillWarm failed: %v", err)
}
}
}
// BenchmarkStorageOperations 基准测试:存储层操作(从原来的文件移过来)
func BenchmarkStorageOperations(b *testing.B) {
tmpFile, err := os.CreateTemp("", "bench_storage_*.db")
if err != nil {
b.Fatalf("failed to create temp file: %v", err)
}
defer os.Remove(tmpFile.Name())
tmpFile.Close()
config := &Config{CacheSize: 100}
pdb, err := Open(Options{
Filename: tmpFile.Name(),
Config: config,
})
if err != nil {
b.Fatalf("Open failed: %v", err)
}
defer pdb.Stop()
// 预分配页面
pageNo, err := pdb.allocPage()
if err != nil {
b.Fatalf("allocPage failed: %v", err)
}
testData := []byte("storage benchmark data")
b.ResetTimer()
for i := 0; i < b.N; i++ {
id := int64(i)
// 插入记录
slotNo, err := pdb.insertToPage(pageNo, id, testData)
if err != nil {
// 页面满了,分配新页面
pageNo, err = pdb.allocPage()
if err != nil {
b.Fatalf("allocPage failed: %v", err)
}
slotNo, err = pdb.insertToPage(pageNo, id, testData)
if err != nil {
b.Fatalf("insertToPage failed: %v", err)
}
}
// 读取记录
_, err = pdb.readRecord(pageNo, slotNo, id)
if err != nil {
b.Fatalf("readRecord failed: %v", err)
}
}
}
// ==================== 并发基准测试 ====================
// BenchmarkConcurrentQuery 基准测试:并发查询
func BenchmarkConcurrentQuery(b *testing.B) {
tmpFile, err := os.CreateTemp("", "bench_concurrent_query_*.db")
if err != nil {
b.Fatalf("failed to create temp file: %v", err)
}
defer os.Remove(tmpFile.Name())
tmpFile.Close()
config := &Config{CacheSize: 200}
pdb, err := Open(Options{
Filename: tmpFile.Name(),
Config: config,
})
if err != nil {
b.Fatalf("Open failed: %v", err)
}
defer pdb.Stop()
// 预插入测试数据
testData := []byte("concurrent query benchmark data")
testMetadata := `{"type": "concurrent_query"}`
groups := []string{"group_a", "group_b", "group_c", "group_d", "group_e"}
for _, group := range groups {
for i := 0; i < 200; i++ {
_, err := pdb.AcceptData(group, testData, testMetadata)
if err != nil {
b.Fatalf("AcceptData failed: %v", err)
}
}
}
pageReq := &PageRequest{Page: 1, PageSize: 10}
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
i := 0
for pb.Next() {
group := groups[i%len(groups)]
_, err := pdb.GetRecordsByGroup(group, pageReq)
if err != nil {
b.Fatalf("GetRecordsByGroup failed: %v", err)
}
i++
}
})
}
// BenchmarkConcurrentMixed 基准测试:并发读写混合操作
func BenchmarkConcurrentMixed(b *testing.B) {
tmpFile, err := os.CreateTemp("", "bench_concurrent_mixed_*.db")
if err != nil {
b.Fatalf("failed to create temp file: %v", err)
}
defer os.Remove(tmpFile.Name())
tmpFile.Close()
config := &Config{CacheSize: 300}
pdb, err := Open(Options{
Filename: tmpFile.Name(),
Config: config,
})
if err != nil {
b.Fatalf("Open failed: %v", err)
}
defer pdb.Stop()
testData := []byte("concurrent mixed benchmark data")
testMetadata := `{"type": "concurrent_mixed"}`
groups := []string{"group_1", "group_2", "group_3"}
// 预插入一些数据用于查询
for _, group := range groups {
for i := 0; i < 50; i++ {
_, err := pdb.AcceptData(group, testData, testMetadata)
if err != nil {
b.Fatalf("AcceptData failed: %v", err)
}
}
}
pageReq := &PageRequest{Page: 1, PageSize: 5}
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
i := 0
for pb.Next() {
group := groups[i%len(groups)]
// 80% 写操作20% 读操作
if i%5 == 0 {
// 读操作
_, err := pdb.GetRecordsByGroup(group, pageReq)
if err != nil {
b.Fatalf("GetRecordsByGroup failed: %v", err)
}
} else {
// 写操作
_, err := pdb.AcceptData(group, testData, testMetadata)
if err != nil {
b.Fatalf("AcceptData failed: %v", err)
}
}
i++
}
})
}
// BenchmarkConcurrentStats 基准测试:并发统计查询
func BenchmarkConcurrentStats(b *testing.B) {
tmpFile, err := os.CreateTemp("", "bench_concurrent_stats_*.db")
if err != nil {
b.Fatalf("failed to create temp file: %v", err)
}
defer os.Remove(tmpFile.Name())
tmpFile.Close()
config := &Config{CacheSize: 150}
pdb, err := Open(Options{
Filename: tmpFile.Name(),
Config: config,
})
if err != nil {
b.Fatalf("Open failed: %v", err)
}
defer pdb.Stop()
// 预插入测试数据
testData := []byte("concurrent stats benchmark data")
testMetadata := `{"type": "concurrent_stats"}`
for i := 0; i < 1000; i++ {
group := fmt.Sprintf("group_%d", i%10)
_, err := pdb.AcceptData(group, testData, testMetadata)
if err != nil {
b.Fatalf("AcceptData failed: %v", err)
}
}
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_, err := pdb.GetStats()
if err != nil {
b.Fatalf("GetStats failed: %v", err)
}
}
})
}
// BenchmarkConcurrentCache 基准测试:并发缓存操作
func BenchmarkConcurrentCache(b *testing.B) {
cache := NewPageCache(200)
// 准备测试数据
testPageData := make([]byte, PageSize)
for i := 0; i < PageSize; i++ {
testPageData[i] = byte(i % 256)
}
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
i := 0
for pb.Next() {
pageNo := uint16(i % 100) // 100个不同的页面
// 80% 读操作20% 写操作
if i%5 == 0 {
// 写操作
cache.Put(pageNo, testPageData, i%2 == 0) // 随机设置脏页标志
} else {
// 读操作
_, _ = cache.Get(pageNo)
}
i++
}
})
}
// BenchmarkConcurrentIndexOperations 基准测试:并发索引操作
func BenchmarkConcurrentIndexOperations(b *testing.B) {
indexMgr := NewIndexManager()
groups := []string{"idx_group_1", "idx_group_2", "idx_group_3", "idx_group_4"}
// 预创建索引
for _, group := range groups {
indexMgr.GetOrCreateIndex(group)
}
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
i := 0
for pb.Next() {
group := groups[i%len(groups)]
idx := indexMgr.GetOrCreateIndex(group)
recordID := int64(i)
pageNo := uint16(i % 50)
slotNo := uint16(i % 20)
// 70% 插入20% 查询10% 删除
switch i % 10 {
case 0: // 删除操作
idx.Delete(recordID - 100) // 删除之前的记录
case 1, 2: // 查询操作
_, _, _ = idx.Get(recordID - 50) // 查询之前的记录
default: // 插入操作
idx.Insert(recordID, pageNo, slotNo)
}
i++
}
})
}
// BenchmarkHighConcurrencyWrite 基准测试:高并发写入
func BenchmarkHighConcurrencyWrite(b *testing.B) {
tmpFile, err := os.CreateTemp("", "bench_high_concurrency_*.db")
if err != nil {
b.Fatalf("failed to create temp file: %v", err)
}
defer os.Remove(tmpFile.Name())
tmpFile.Close()
config := &Config{
CacheSize: 500, // 更大的缓存
SyncWrites: false, // 异步写入提高性能
}
pdb, err := Open(Options{
Filename: tmpFile.Name(),
Config: config,
})
if err != nil {
b.Fatalf("Open failed: %v", err)
}
defer pdb.Stop()
testData := []byte("high concurrency write benchmark data")
testMetadata := `{"type": "high_concurrency"}`
b.ResetTimer()
// 设置更高的并发度
b.SetParallelism(20)
b.RunParallel(func(pb *testing.PB) {
i := 0
for pb.Next() {
group := fmt.Sprintf("hc_group_%d", i%20) // 20个不同的组
_, err := pdb.AcceptData(group, testData, testMetadata)
if err != nil {
b.Fatalf("AcceptData failed: %v", err)
}
i++
}
})
}
// BenchmarkConcurrentGroupOperations 基准测试:并发组操作
func BenchmarkConcurrentGroupOperations(b *testing.B) {
tmpFile, err := os.CreateTemp("", "bench_concurrent_group_*.db")
if err != nil {
b.Fatalf("failed to create temp file: %v", err)
}
defer os.Remove(tmpFile.Name())
tmpFile.Close()
config := &Config{CacheSize: 250}
pdb, err := Open(Options{
Filename: tmpFile.Name(),
Config: config,
})
if err != nil {
b.Fatalf("Open failed: %v", err)
}
defer pdb.Stop()
testData := []byte("concurrent group operations benchmark data")
testMetadata := `{"type": "group_ops"}`
b.ResetTimer()
// 预插入一些数据,避免查询空组
for i := 0; i < 15; i++ {
group := fmt.Sprintf("grp_%d", i)
for j := 0; j < 5; j++ {
_, err := pdb.AcceptData(group, testData, testMetadata)
if err != nil {
b.Fatalf("AcceptData failed: %v", err)
}
}
}
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
i := 0
for pb.Next() {
group := fmt.Sprintf("grp_%d", i%15) // 15个不同的组
// 混合操作:写入、查询、统计
switch i % 6 {
case 0, 1, 2, 3: // 写入操作 (66.7%)
_, err := pdb.AcceptData(group, testData, testMetadata)
if err != nil {
b.Fatalf("AcceptData failed: %v", err)
}
case 4: // 查询操作 (16.7%)
pageReq := &PageRequest{Page: 1, PageSize: 3}
_, err := pdb.GetRecordsByGroup(group, pageReq)
if err != nil {
b.Fatalf("GetRecordsByGroup failed: %v", err)
}
case 5: // 统计操作 (16.7%)
_, err := pdb.GetStats()
if err != nil {
b.Fatalf("GetStats failed: %v", err)
}
}
i++
}
})
}