package pipelinedb import ( "log/slog" "sync" "testing" "time" ) // MockPipelineDBForGroupManager 用于测试GroupManager的模拟数据库 type MockPipelineDBForGroupManager struct { PipelineDB // 嵌入PipelineDB以支持类型转换 pages map[uint16][]byte } func NewMockPipelineDBForGroupManager() *MockPipelineDBForGroupManager { mock := &MockPipelineDBForGroupManager{ pages: make(map[uint16][]byte), } // 初始化logger字段以避免nil指针错误 mock.PipelineDB.logger = slog.Default() // 初始化header以避免nil指针错误 mock.PipelineDB.header = &Header{ Magic: 0x50444200, // PDB magic number PageSize: PageSize, TotalPages: 0, FreeHead: 0, RootPage: 0, CounterPage: 0, // 没有计数器页面 } // 初始化缓存以避免nil指针错误 mock.PipelineDB.cache = NewPageCache(10) // 初始化空闲页面管理器 mock.PipelineDB.freePageMgr = NewFreePageManager() return mock } func (db *MockPipelineDBForGroupManager) readPage(pageNo uint16) ([]byte, error) { if page, exists := db.pages[pageNo]; exists { return page, nil } return make([]byte, PageSize), nil } func (db *MockPipelineDBForGroupManager) saveHeader() error { return nil } // TestNewGroupManager 测试组管理器的创建 func TestNewGroupManager(t *testing.T) { mockDB := NewMockPipelineDBForGroupManager() gm := NewGroupManager(&mockDB.PipelineDB) if gm == nil { t.Fatal("NewGroupManager returned nil") } if gm.pdb == nil { t.Error("database reference not set correctly") } // 验证初始状态 if len(gm.GetPausedGroups()) != 0 { t.Error("should have no paused groups initially") } if len(gm.GetExecutingGroups()) != 0 { t.Error("should have no executing groups initially") } stats := gm.GetFastStats() if len(stats) != 0 { t.Error("should have no stats initially") } } // TestGroupManagerPauseResume 测试组的暂停和恢复 func TestGroupManagerPauseResume(t *testing.T) { mockDB := NewMockPipelineDBForGroupManager() gm := NewGroupManager(&mockDB.PipelineDB) groupName := "test_group" // 初始状态应该是活跃的 if gm.IsPaused(groupName) { t.Error("group should be active initially") } if gm.GetGroupStatus(groupName) != "active" { t.Errorf("group status = %s, want 'active'", gm.GetGroupStatus(groupName)) } // 暂停组 gm.PauseGroup(groupName) if !gm.IsPaused(groupName) { t.Error("group should be paused after PauseGroup") } if gm.GetGroupStatus(groupName) != "paused" { t.Errorf("group status = %s, want 'paused'", gm.GetGroupStatus(groupName)) } // 验证暂停组列表 pausedGroups := gm.GetPausedGroups() if len(pausedGroups) != 1 || pausedGroups[0] != groupName { t.Errorf("paused groups = %v, want [%s]", pausedGroups, groupName) } // 恢复组 gm.ResumeGroup(groupName) if gm.IsPaused(groupName) { t.Error("group should be active after ResumeGroup") } if gm.GetGroupStatus(groupName) != "active" { t.Errorf("group status = %s, want 'active'", gm.GetGroupStatus(groupName)) } // 验证暂停组列表为空 pausedGroups = gm.GetPausedGroups() if len(pausedGroups) != 0 { t.Errorf("paused groups = %v, want []", pausedGroups) } } // TestGroupManagerMultipleGroups 测试多个组的管理 func TestGroupManagerMultipleGroups(t *testing.T) { mockDB := NewMockPipelineDBForGroupManager() gm := NewGroupManager(&mockDB.PipelineDB) groups := []string{"group1", "group2", "group3"} // 暂停部分组 gm.PauseGroup(groups[0]) gm.PauseGroup(groups[2]) // 验证状态 if !gm.IsPaused(groups[0]) { t.Errorf("group %s should be paused", groups[0]) } if gm.IsPaused(groups[1]) { t.Errorf("group %s should be active", groups[1]) } if !gm.IsPaused(groups[2]) { t.Errorf("group %s should be paused", groups[2]) } // 验证暂停组列表 pausedGroups := gm.GetPausedGroups() if len(pausedGroups) != 2 { t.Errorf("paused groups count = %d, want 2", len(pausedGroups)) } // 验证包含正确的组(顺序可能不同) pausedMap := make(map[string]bool) for _, group := range pausedGroups { pausedMap[group] = true } if !pausedMap[groups[0]] || !pausedMap[groups[2]] { t.Errorf("paused groups = %v, should contain %s and %s", pausedGroups, groups[0], groups[2]) } } // TestGroupManagerOnCompleteExecution 测试OnComplete执行状态管理 func TestGroupManagerOnCompleteExecution(t *testing.T) { mockDB := NewMockPipelineDBForGroupManager() gm := NewGroupManager(&mockDB.PipelineDB) groupName := "test_group" // 初始状态不应该在执行 if gm.IsOnCompleteExecuting(groupName) { t.Error("group should not be executing initially") } // 设置为执行状态 gm.SetOnCompleteExecuting(groupName, true) if !gm.IsOnCompleteExecuting(groupName) { t.Error("group should be executing after setting to true") } // 验证执行组列表 executingGroups := gm.GetExecutingGroups() if len(executingGroups) != 1 || executingGroups[0] != groupName { t.Errorf("executing groups = %v, want [%s]", executingGroups, groupName) } // 设置为非执行状态 gm.SetOnCompleteExecuting(groupName, false) if gm.IsOnCompleteExecuting(groupName) { t.Error("group should not be executing after setting to false") } // 验证执行组列表为空 executingGroups = gm.GetExecutingGroups() if len(executingGroups) != 0 { t.Errorf("executing groups = %v, want []", executingGroups) } } // TestGroupManagerGetNextID 测试ID生成功能 func TestGroupManagerGetNextID(t *testing.T) { mockDB := NewMockPipelineDBForGroupManager() gm := NewGroupManager(&mockDB.PipelineDB) groupName := "test_group" // 第一次获取ID应该是1 id1 := gm.GetNextID(groupName) if id1 != 1 { t.Errorf("first ID = %d, want 1", id1) } // 后续ID应该递增 id2 := gm.GetNextID(groupName) if id2 != 2 { t.Errorf("second ID = %d, want 2", id2) } id3 := gm.GetNextID(groupName) if id3 != 3 { t.Errorf("third ID = %d, want 3", id3) } // 验证计数器值 counter := gm.GetGroupCounter(groupName) if counter != 3 { t.Errorf("group counter = %d, want 3", counter) } } // TestGroupManagerMultipleGroupCounters 测试多个组的ID计数器 func TestGroupManagerMultipleGroupCounters(t *testing.T) { mockDB := NewMockPipelineDBForGroupManager() gm := NewGroupManager(&mockDB.PipelineDB) group1 := "group1" group2 := "group2" // 为不同组生成ID id1_1 := gm.GetNextID(group1) id2_1 := gm.GetNextID(group2) id1_2 := gm.GetNextID(group1) id2_2 := gm.GetNextID(group2) // 验证每个组的ID独立递增 if id1_1 != 1 || id1_2 != 2 { t.Errorf("group1 IDs = [%d, %d], want [1, 2]", id1_1, id1_2) } if id2_1 != 1 || id2_2 != 2 { t.Errorf("group2 IDs = [%d, %d], want [1, 2]", id2_1, id2_2) } // 验证计数器值 if gm.GetGroupCounter(group1) != 2 { t.Errorf("group1 counter = %d, want 2", gm.GetGroupCounter(group1)) } if gm.GetGroupCounter(group2) != 2 { t.Errorf("group2 counter = %d, want 2", gm.GetGroupCounter(group2)) } } // TestGroupManagerIncrementStats 测试统计信息增量更新 func TestGroupManagerIncrementStats(t *testing.T) { mockDB := NewMockPipelineDBForGroupManager() gm := NewGroupManager(&mockDB.PipelineDB) groupName := "test_group" // 增加不同状态的记录 gm.IncrementStats(groupName, StatusHot) gm.IncrementStats(groupName, StatusHot) gm.IncrementStats(groupName, StatusWarm) gm.IncrementStats(groupName, StatusCold) // 获取统计信息 stats := gm.GetFastStats() if len(stats) != 1 { t.Errorf("stats count = %d, want 1", len(stats)) } groupStats := stats[groupName] if groupStats == nil { t.Fatal("group stats not found") } // 验证统计数据 if groupStats.TotalRecords != 4 { t.Errorf("total records = %d, want 4", groupStats.TotalRecords) } if groupStats.HotRecords != 2 { t.Errorf("hot records = %d, want 2", groupStats.HotRecords) } if groupStats.WarmRecords != 1 { t.Errorf("warm records = %d, want 1", groupStats.WarmRecords) } if groupStats.ColdRecords != 1 { t.Errorf("cold records = %d, want 1", groupStats.ColdRecords) } } // TestGroupManagerUpdateStats 测试统计信息状态转换更新 func TestGroupManagerUpdateStats(t *testing.T) { mockDB := NewMockPipelineDBForGroupManager() gm := NewGroupManager(&mockDB.PipelineDB) groupName := "test_group" // 先增加一些记录 gm.IncrementStats(groupName, StatusHot) gm.IncrementStats(groupName, StatusHot) gm.IncrementStats(groupName, StatusWarm) // 验证初始状态 stats := gm.GetFastStats()[groupName] if stats.HotRecords != 2 || stats.WarmRecords != 1 || stats.ColdRecords != 0 { t.Errorf("initial stats: hot=%d, warm=%d, cold=%d, want hot=2, warm=1, cold=0", stats.HotRecords, stats.WarmRecords, stats.ColdRecords) } // 执行状态转换:Hot -> Warm gm.UpdateStats(groupName, StatusHot, StatusWarm) // 验证转换后的状态 stats = gm.GetFastStats()[groupName] if stats.TotalRecords != 3 { t.Errorf("total records after update = %d, want 3", stats.TotalRecords) } if stats.HotRecords != 1 { t.Errorf("hot records after update = %d, want 1", stats.HotRecords) } if stats.WarmRecords != 2 { t.Errorf("warm records after update = %d, want 2", stats.WarmRecords) } if stats.ColdRecords != 0 { t.Errorf("cold records after update = %d, want 0", stats.ColdRecords) } // 执行另一个转换:Warm -> Cold gm.UpdateStats(groupName, StatusWarm, StatusCold) // 验证第二次转换后的状态 stats = gm.GetFastStats()[groupName] if stats.HotRecords != 1 || stats.WarmRecords != 1 || stats.ColdRecords != 1 { t.Errorf("final stats: hot=%d, warm=%d, cold=%d, want hot=1, warm=1, cold=1", stats.HotRecords, stats.WarmRecords, stats.ColdRecords) } } // TestGroupManagerMultipleGroupStats 测试多个组的统计信息 func TestGroupManagerMultipleGroupStats(t *testing.T) { mockDB := NewMockPipelineDBForGroupManager() gm := NewGroupManager(&mockDB.PipelineDB) // 为不同组添加统计数据 groups := map[string]struct { hot, warm, cold int }{ "group1": {3, 2, 1}, "group2": {1, 4, 2}, "group3": {2, 1, 3}, } for groupName, counts := range groups { for i := 0; i < counts.hot; i++ { gm.IncrementStats(groupName, StatusHot) } for i := 0; i < counts.warm; i++ { gm.IncrementStats(groupName, StatusWarm) } for i := 0; i < counts.cold; i++ { gm.IncrementStats(groupName, StatusCold) } } // 获取所有统计信息 allStats := gm.GetFastStats() if len(allStats) != len(groups) { t.Errorf("stats count = %d, want %d", len(allStats), len(groups)) } // 验证每个组的统计信息 for groupName, expected := range groups { stats := allStats[groupName] if stats == nil { t.Errorf("stats for group %s not found", groupName) continue } expectedTotal := expected.hot + expected.warm + expected.cold if stats.TotalRecords != expectedTotal { t.Errorf("group %s total = %d, want %d", groupName, stats.TotalRecords, expectedTotal) } if stats.HotRecords != expected.hot { t.Errorf("group %s hot = %d, want %d", groupName, stats.HotRecords, expected.hot) } if stats.WarmRecords != expected.warm { t.Errorf("group %s warm = %d, want %d", groupName, stats.WarmRecords, expected.warm) } if stats.ColdRecords != expected.cold { t.Errorf("group %s cold = %d, want %d", groupName, stats.ColdRecords, expected.cold) } } } // TestGroupManagerStatsIsolation 测试统计信息的数据隔离 func TestGroupManagerStatsIsolation(t *testing.T) { mockDB := NewMockPipelineDBForGroupManager() gm := NewGroupManager(&mockDB.PipelineDB) groupName := "test_group" gm.IncrementStats(groupName, StatusHot) // 获取统计信息 stats1 := gm.GetFastStats() stats2 := gm.GetFastStats() // 修改第一个返回的统计信息 stats1[groupName].HotRecords = 999 // 验证第二个统计信息未被影响 if stats2[groupName].HotRecords != 1 { t.Error("stats isolation failed: modifying returned stats affected original data") } // 验证原始数据未被影响 stats3 := gm.GetFastStats() if stats3[groupName].HotRecords != 1 { t.Error("original stats data was modified") } } // TestGroupManagerConcurrency 测试并发安全性 func TestGroupManagerConcurrency(t *testing.T) { mockDB := NewMockPipelineDBForGroupManager() gm := NewGroupManager(&mockDB.PipelineDB) const numGoroutines = 10 const numOperations = 100 var wg sync.WaitGroup // 启动多个goroutine进行并发操作 for i := 0; i < numGoroutines; i++ { wg.Add(1) go func(id int) { defer wg.Done() groupName := "group_" + string(rune('A'+id%3)) for j := 0; j < numOperations; j++ { // 并发的各种操作 switch j % 6 { case 0: gm.PauseGroup(groupName) case 1: gm.ResumeGroup(groupName) case 2: gm.SetOnCompleteExecuting(groupName, true) case 3: gm.SetOnCompleteExecuting(groupName, false) case 4: gm.GetNextID(groupName) case 5: gm.IncrementStats(groupName, StatusHot) } // 并发读取操作 gm.IsPaused(groupName) gm.IsOnCompleteExecuting(groupName) gm.GetGroupCounter(groupName) } }(i) } // 同时进行统计查询 wg.Add(1) go func() { defer wg.Done() for i := 0; i < 50; i++ { gm.GetFastStats() gm.GetPausedGroups() gm.GetExecutingGroups() time.Sleep(time.Millisecond) } }() // 等待所有goroutine完成 wg.Wait() // 验证管理器仍然可用 testGroup := "final_test" gm.PauseGroup(testGroup) if !gm.IsPaused(testGroup) { t.Error("manager corrupted after concurrent operations") } } // TestGroupManagerIdempotentOperations 测试幂等操作 func TestGroupManagerIdempotentOperations(t *testing.T) { mockDB := NewMockPipelineDBForGroupManager() gm := NewGroupManager(&mockDB.PipelineDB) groupName := "test_group" // 多次暂停同一组 gm.PauseGroup(groupName) gm.PauseGroup(groupName) gm.PauseGroup(groupName) // 应该只有一个暂停的组 pausedGroups := gm.GetPausedGroups() if len(pausedGroups) != 1 { t.Errorf("paused groups count = %d, want 1", len(pausedGroups)) } // 多次恢复同一组 gm.ResumeGroup(groupName) gm.ResumeGroup(groupName) gm.ResumeGroup(groupName) // 应该没有暂停的组 pausedGroups = gm.GetPausedGroups() if len(pausedGroups) != 0 { t.Errorf("paused groups count = %d, want 0", len(pausedGroups)) } // 多次设置OnComplete执行状态 gm.SetOnCompleteExecuting(groupName, true) gm.SetOnCompleteExecuting(groupName, true) executingGroups := gm.GetExecutingGroups() if len(executingGroups) != 1 { t.Errorf("executing groups count = %d, want 1", len(executingGroups)) } gm.SetOnCompleteExecuting(groupName, false) gm.SetOnCompleteExecuting(groupName, false) executingGroups = gm.GetExecutingGroups() if len(executingGroups) != 0 { t.Errorf("executing groups count = %d, want 0", len(executingGroups)) } } // TestGroupManagerGetGroupCounterNonExistent 测试获取不存在组的计数器 func TestGroupManagerGetGroupCounterNonExistent(t *testing.T) { mockDB := NewMockPipelineDBForGroupManager() gm := NewGroupManager(&mockDB.PipelineDB) // 获取不存在组的计数器应该返回0 counter := gm.GetGroupCounter("non_existent_group") if counter != 0 { t.Errorf("counter for non-existent group = %d, want 0", counter) } } // TestGroupManagerStatsInitialization 测试统计信息的初始化 func TestGroupManagerStatsInitialization(t *testing.T) { mockDB := NewMockPipelineDBForGroupManager() gm := NewGroupManager(&mockDB.PipelineDB) groupName := "test_group" // 第一次增加统计应该初始化结构 gm.IncrementStats(groupName, StatusHot) stats := gm.GetFastStats()[groupName] if stats == nil { t.Fatal("stats should be initialized after first increment") } if stats.TotalRecords != 1 || stats.HotRecords != 1 { t.Errorf("initial stats: total=%d, hot=%d, want total=1, hot=1", stats.TotalRecords, stats.HotRecords) } if stats.WarmRecords != 0 || stats.ColdRecords != 0 { t.Errorf("initial stats: warm=%d, cold=%d, want warm=0, cold=0", stats.WarmRecords, stats.ColdRecords) } } // TestGroupManagerUpdateStatsInitialization 测试更新统计时的初始化 func TestGroupManagerUpdateStatsInitialization(t *testing.T) { mockDB := NewMockPipelineDBForGroupManager() gm := NewGroupManager(&mockDB.PipelineDB) groupName := "test_group" // 直接调用UpdateStats应该初始化统计结构 gm.UpdateStats(groupName, StatusHot, StatusWarm) stats := gm.GetFastStats()[groupName] if stats == nil { t.Fatal("stats should be initialized after UpdateStats") } // 由于没有Hot记录,减少操作不会产生负数,但会初始化结构 if stats.TotalRecords != 0 { t.Errorf("total records = %d, want 0", stats.TotalRecords) } } // BenchmarkGroupManagerGetNextID 性能测试:ID生成 func BenchmarkGroupManagerGetNextID(b *testing.B) { mockDB := NewMockPipelineDBForGroupManager() gm := NewGroupManager(&mockDB.PipelineDB) groupName := "bench_group" b.ResetTimer() for i := 0; i < b.N; i++ { gm.GetNextID(groupName) } } // BenchmarkGroupManagerIncrementStats 性能测试:统计信息增量 func BenchmarkGroupManagerIncrementStats(b *testing.B) { mockDB := NewMockPipelineDBForGroupManager() gm := NewGroupManager(&mockDB.PipelineDB) groupName := "bench_group" statuses := []DataStatus{StatusHot, StatusWarm, StatusCold} b.ResetTimer() for i := 0; i < b.N; i++ { status := statuses[i%len(statuses)] gm.IncrementStats(groupName, status) } } // BenchmarkGroupManagerGetFastStats 性能测试:快速统计查询 func BenchmarkGroupManagerGetFastStats(b *testing.B) { mockDB := NewMockPipelineDBForGroupManager() gm := NewGroupManager(&mockDB.PipelineDB) // 预填充一些统计数据 for i := 0; i < 10; i++ { groupName := "group_" + string(rune('A'+i)) for j := 0; j < 100; j++ { gm.IncrementStats(groupName, StatusHot) } } b.ResetTimer() for i := 0; i < b.N; i++ { gm.GetFastStats() } } // BenchmarkGroupManagerConcurrentOperations 性能测试:并发操作 func BenchmarkGroupManagerConcurrentOperations(b *testing.B) { mockDB := NewMockPipelineDBForGroupManager() gm := NewGroupManager(&mockDB.PipelineDB) b.ResetTimer() b.RunParallel(func(pb *testing.PB) { i := 0 for pb.Next() { groupName := "group_" + string(rune('A'+i%5)) switch i % 4 { case 0: gm.GetNextID(groupName) case 1: gm.IncrementStats(groupName, StatusHot) case 2: gm.IsPaused(groupName) case 3: gm.GetFastStats() } i++ } }) }