Files
pipelinedb/group_manager_test.go
2025-09-30 15:05:56 +08:00

701 lines
18 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package pipelinedb
import (
"log/slog"
"sync"
"testing"
"time"
)
// MockPipelineDBForGroupManager is the mock database used by the GroupManager
// tests in this file. It embeds a real PipelineDB value so that tests can hand
// &mock.PipelineDB to NewGroupManager, and adds an in-memory page map.
//
// NOTE(review): because the tests pass the embedded PipelineDB rather than the
// mock itself, the readPage/saveHeader methods declared on the mock are not
// reachable through that pointer — confirm whether they are still needed.
type MockPipelineDBForGroupManager struct {
PipelineDB // embedded so the mock can be used where a PipelineDB is expected
pages map[uint16][]byte // page number -> page bytes, read by readPage
}
// NewMockPipelineDBForGroupManager returns a mock whose embedded PipelineDB has
// its logger, header, page cache and free-page manager populated, so code that
// touches those fields does not hit a nil pointer.
func NewMockPipelineDBForGroupManager() *MockPipelineDBForGroupManager {
	mock := new(MockPipelineDBForGroupManager)
	mock.pages = map[uint16][]byte{}

	// Everything below configures the embedded database value in place.
	pdb := &mock.PipelineDB

	// Logger: fall back to the process-wide default.
	pdb.logger = slog.Default()

	// Header: an empty database layout.
	hdr := &Header{
		Magic:       0x50444200, // PDB magic number
		PageSize:    PageSize,
		TotalPages:  0,
		FreeHead:    0,
		RootPage:    0,
		CounterPage: 0, // no counter page
	}
	pdb.header = hdr

	// Page cache and free-page manager.
	pdb.cache = NewPageCache(10)
	pdb.freePageMgr = NewFreePageManager()

	return mock
}
// readPage returns the stored bytes for pageNo when the mock has an entry for
// it; otherwise it returns a freshly allocated zeroed page of PageSize bytes.
// The error result is always nil.
func (db *MockPipelineDBForGroupManager) readPage(pageNo uint16) ([]byte, error) {
	stored, ok := db.pages[pageNo]
	if !ok {
		// Unknown page: hand back an empty page instead of failing.
		return make([]byte, PageSize), nil
	}
	return stored, nil
}
// saveHeader is a no-op on the mock: it writes nothing and always returns nil.
func (db *MockPipelineDBForGroupManager) saveHeader() error {
return nil
}
// TestNewGroupManager checks that a freshly constructed group manager holds a
// database reference and reports no paused groups, no executing groups and no
// statistics.
func TestNewGroupManager(t *testing.T) {
	mock := NewMockPipelineDBForGroupManager()
	manager := NewGroupManager(&mock.PipelineDB)

	switch {
	case manager == nil:
		t.Fatal("NewGroupManager returned nil")
	case manager.pdb == nil:
		t.Error("database reference not set correctly")
	}

	// A new manager must start out empty in every respect.
	if paused := manager.GetPausedGroups(); len(paused) > 0 {
		t.Error("should have no paused groups initially")
	}
	if executing := manager.GetExecutingGroups(); len(executing) > 0 {
		t.Error("should have no executing groups initially")
	}
	if len(manager.GetFastStats()) > 0 {
		t.Error("should have no stats initially")
	}
}
// TestGroupManagerPauseResume walks one group through active -> paused ->
// active and checks IsPaused, GetGroupStatus and GetPausedGroups at each step.
func TestGroupManagerPauseResume(t *testing.T) {
	mock := NewMockPipelineDBForGroupManager()
	manager := NewGroupManager(&mock.PipelineDB)
	const name = "test_group"

	// A group nobody has touched is reported as active.
	if manager.IsPaused(name) {
		t.Error("group should be active initially")
	}
	if status := manager.GetGroupStatus(name); status != "active" {
		t.Errorf("group status = %s, want 'active'", status)
	}

	// Pause: flag and status string must both change.
	manager.PauseGroup(name)
	if paused := manager.IsPaused(name); !paused {
		t.Error("group should be paused after PauseGroup")
	}
	if status := manager.GetGroupStatus(name); status != "paused" {
		t.Errorf("group status = %s, want 'paused'", status)
	}

	// The paused list must contain exactly this group.
	if list := manager.GetPausedGroups(); !(len(list) == 1 && list[0] == name) {
		t.Errorf("paused groups = %v, want [%s]", list, name)
	}

	// Resume: back to active.
	manager.ResumeGroup(name)
	if manager.IsPaused(name) {
		t.Error("group should be active after ResumeGroup")
	}
	if status := manager.GetGroupStatus(name); status != "active" {
		t.Errorf("group status = %s, want 'active'", status)
	}

	// The paused list must be empty again.
	if list := manager.GetPausedGroups(); len(list) > 0 {
		t.Errorf("paused groups = %v, want []", list)
	}
}
// TestGroupManagerMultipleGroups pauses two of three groups and checks that
// the pause state is tracked per group and that GetPausedGroups lists exactly
// the paused ones (in any order).
func TestGroupManagerMultipleGroups(t *testing.T) {
	mock := NewMockPipelineDBForGroupManager()
	manager := NewGroupManager(&mock.PipelineDB)

	names := []string{"group1", "group2", "group3"}
	first, second, third := names[0], names[1], names[2]

	// Pause the first and third group; leave the second alone.
	manager.PauseGroup(first)
	manager.PauseGroup(third)

	expectations := []struct {
		group      string
		wantPaused bool
	}{
		{first, true},
		{second, false},
		{third, true},
	}
	for _, e := range expectations {
		got := manager.IsPaused(e.group)
		switch {
		case e.wantPaused && !got:
			t.Errorf("group %s should be paused", e.group)
		case !e.wantPaused && got:
			t.Errorf("group %s should be active", e.group)
		}
	}

	// The paused list must have two entries.
	paused := manager.GetPausedGroups()
	if n := len(paused); n != 2 {
		t.Errorf("paused groups count = %d, want 2", n)
	}

	// Order is not guaranteed, so compare as a set.
	seen := make(map[string]struct{}, len(paused))
	for _, g := range paused {
		seen[g] = struct{}{}
	}
	_, hasFirst := seen[first]
	_, hasThird := seen[third]
	if !(hasFirst && hasThird) {
		t.Errorf("paused groups = %v, should contain %s and %s", paused, first, third)
	}
}
// TestGroupManagerOnCompleteExecution toggles the OnComplete-executing flag of
// one group and checks IsOnCompleteExecuting and GetExecutingGroups after each
// change.
func TestGroupManagerOnCompleteExecution(t *testing.T) {
	mock := NewMockPipelineDBForGroupManager()
	manager := NewGroupManager(&mock.PipelineDB)
	const name = "test_group"

	// Nothing is executing before the flag is set.
	if executing := manager.IsOnCompleteExecuting(name); executing {
		t.Error("group should not be executing initially")
	}

	// Turn the flag on.
	manager.SetOnCompleteExecuting(name, true)
	if executing := manager.IsOnCompleteExecuting(name); !executing {
		t.Error("group should be executing after setting to true")
	}

	// The executing list must contain exactly this group.
	if list := manager.GetExecutingGroups(); !(len(list) == 1 && list[0] == name) {
		t.Errorf("executing groups = %v, want [%s]", list, name)
	}

	// Turn the flag off again.
	manager.SetOnCompleteExecuting(name, false)
	if executing := manager.IsOnCompleteExecuting(name); executing {
		t.Error("group should not be executing after setting to false")
	}

	// The executing list must be empty again.
	if list := manager.GetExecutingGroups(); len(list) > 0 {
		t.Errorf("executing groups = %v, want []", list)
	}
}
// TestGroupManagerGetNextID checks that GetNextID hands out 1, 2, 3 for a new
// group and that GetGroupCounter then reports 3.
func TestGroupManagerGetNextID(t *testing.T) {
	mock := NewMockPipelineDBForGroupManager()
	manager := NewGroupManager(&mock.PipelineDB)
	const name = "test_group"

	// The first ID issued for a group is 1.
	if got := manager.GetNextID(name); got != 1 {
		t.Errorf("first ID = %d, want 1", got)
	}

	// Each further call increases the ID by one.
	if got := manager.GetNextID(name); got != 2 {
		t.Errorf("second ID = %d, want 2", got)
	}
	if got := manager.GetNextID(name); got != 3 {
		t.Errorf("third ID = %d, want 3", got)
	}

	// The counter reflects the last ID issued.
	if got := manager.GetGroupCounter(name); got != 3 {
		t.Errorf("group counter = %d, want 3", got)
	}
}
// TestGroupManagerMultipleGroupCounters interleaves GetNextID calls for two
// groups and checks that each group has its own independent counter.
func TestGroupManagerMultipleGroupCounters(t *testing.T) {
	mock := NewMockPipelineDBForGroupManager()
	manager := NewGroupManager(&mock.PipelineDB)
	const (
		groupA = "group1"
		groupB = "group2"
	)

	// Alternate between the groups: A, B, A, B.
	a1 := manager.GetNextID(groupA)
	b1 := manager.GetNextID(groupB)
	a2 := manager.GetNextID(groupA)
	b2 := manager.GetNextID(groupB)

	// Each group must have counted 1, 2 on its own.
	if !(a1 == 1 && a2 == 2) {
		t.Errorf("group1 IDs = [%d, %d], want [1, 2]", a1, a2)
	}
	if !(b1 == 1 && b2 == 2) {
		t.Errorf("group2 IDs = [%d, %d], want [1, 2]", b1, b2)
	}

	// Both counters end at 2.
	if got := manager.GetGroupCounter(groupA); got != 2 {
		t.Errorf("group1 counter = %d, want 2", got)
	}
	if got := manager.GetGroupCounter(groupB); got != 2 {
		t.Errorf("group2 counter = %d, want 2", got)
	}
}
// TestGroupManagerIncrementStats records two hot, one warm and one cold entry
// for a single group and checks the per-status and total counts returned by
// GetFastStats.
func TestGroupManagerIncrementStats(t *testing.T) {
	mock := NewMockPipelineDBForGroupManager()
	manager := NewGroupManager(&mock.PipelineDB)
	const name = "test_group"

	// Record one entry per element, in this order.
	for _, status := range []DataStatus{StatusHot, StatusHot, StatusWarm, StatusCold} {
		manager.IncrementStats(name, status)
	}

	// Only one group has been touched.
	all := manager.GetFastStats()
	if n := len(all); n != 1 {
		t.Errorf("stats count = %d, want 1", n)
	}
	got := all[name]
	if got == nil {
		t.Fatal("group stats not found")
	}

	// Compare every counter against its expected value.
	checks := []struct {
		label     string
		got, want int
	}{
		{"total", got.TotalRecords, 4},
		{"hot", got.HotRecords, 2},
		{"warm", got.WarmRecords, 1},
		{"cold", got.ColdRecords, 1},
	}
	for _, c := range checks {
		if c.got != c.want {
			t.Errorf("%s records = %d, want %d", c.label, c.got, c.want)
		}
	}
}
// TestGroupManagerUpdateStats checks that UpdateStats moves one record from
// the old status bucket to the new one while leaving the total unchanged.
//
// Every snapshot taken from GetFastStats is nil-checked before use so that a
// missing entry fails the test cleanly instead of panicking the test binary.
func TestGroupManagerUpdateStats(t *testing.T) {
	mockDB := NewMockPipelineDBForGroupManager()
	gm := NewGroupManager(&mockDB.PipelineDB)
	groupName := "test_group"

	// Seed the group with two hot records and one warm record.
	gm.IncrementStats(groupName, StatusHot)
	gm.IncrementStats(groupName, StatusHot)
	gm.IncrementStats(groupName, StatusWarm)

	// Verify the starting point.
	stats := gm.GetFastStats()[groupName]
	if stats == nil {
		t.Fatal("group stats not found after IncrementStats")
	}
	if stats.HotRecords != 2 || stats.WarmRecords != 1 || stats.ColdRecords != 0 {
		t.Errorf("initial stats: hot=%d, warm=%d, cold=%d, want hot=2, warm=1, cold=0",
			stats.HotRecords, stats.WarmRecords, stats.ColdRecords)
	}

	// Transition one record Hot -> Warm.
	gm.UpdateStats(groupName, StatusHot, StatusWarm)

	// The total stays the same; one record moved from hot to warm.
	stats = gm.GetFastStats()[groupName]
	if stats == nil {
		t.Fatal("group stats not found after Hot -> Warm update")
	}
	if stats.TotalRecords != 3 {
		t.Errorf("total records after update = %d, want 3", stats.TotalRecords)
	}
	if stats.HotRecords != 1 {
		t.Errorf("hot records after update = %d, want 1", stats.HotRecords)
	}
	if stats.WarmRecords != 2 {
		t.Errorf("warm records after update = %d, want 2", stats.WarmRecords)
	}
	if stats.ColdRecords != 0 {
		t.Errorf("cold records after update = %d, want 0", stats.ColdRecords)
	}

	// Transition one record Warm -> Cold.
	gm.UpdateStats(groupName, StatusWarm, StatusCold)

	// Each bucket now holds exactly one record.
	stats = gm.GetFastStats()[groupName]
	if stats == nil {
		t.Fatal("group stats not found after Warm -> Cold update")
	}
	if stats.HotRecords != 1 || stats.WarmRecords != 1 || stats.ColdRecords != 1 {
		t.Errorf("final stats: hot=%d, warm=%d, cold=%d, want hot=1, warm=1, cold=1",
			stats.HotRecords, stats.WarmRecords, stats.ColdRecords)
	}
}
// TestGroupManagerMultipleGroupStats records a different mix of hot/warm/cold
// entries for three groups and checks that GetFastStats reports each group's
// counts separately.
func TestGroupManagerMultipleGroupStats(t *testing.T) {
	mock := NewMockPipelineDBForGroupManager()
	manager := NewGroupManager(&mock.PipelineDB)

	// Expected number of records per status for every group.
	type tally struct {
		hot, warm, cold int
	}
	want := map[string]tally{
		"group1": {3, 2, 1},
		"group2": {1, 4, 2},
		"group3": {2, 1, 3},
	}

	// record calls IncrementStats `times` times for one group and status.
	record := func(group string, status DataStatus, times int) {
		for n := 0; n < times; n++ {
			manager.IncrementStats(group, status)
		}
	}
	for group, w := range want {
		record(group, StatusHot, w.hot)
		record(group, StatusWarm, w.warm)
		record(group, StatusCold, w.cold)
	}

	// One stats entry per group.
	all := manager.GetFastStats()
	if len(all) != len(want) {
		t.Errorf("stats count = %d, want %d", len(all), len(want))
	}

	// Compare each group's counters with what was recorded.
	for group, w := range want {
		got := all[group]
		if got == nil {
			t.Errorf("stats for group %s not found", group)
			continue
		}
		checks := []struct {
			label     string
			got, want int
		}{
			{"total", got.TotalRecords, w.hot + w.warm + w.cold},
			{"hot", got.HotRecords, w.hot},
			{"warm", got.WarmRecords, w.warm},
			{"cold", got.ColdRecords, w.cold},
		}
		for _, c := range checks {
			if c.got != c.want {
				t.Errorf("group %s %s = %d, want %d", group, c.label, c.got, c.want)
			}
		}
	}
}
// TestGroupManagerStatsIsolation checks that the values returned by
// GetFastStats are independent copies: mutating one returned snapshot must not
// affect another snapshot or the manager's own data.
//
// Each snapshot entry is nil-checked before it is dereferenced so that a
// missing entry is reported as a test failure rather than a panic.
func TestGroupManagerStatsIsolation(t *testing.T) {
	mockDB := NewMockPipelineDBForGroupManager()
	gm := NewGroupManager(&mockDB.PipelineDB)
	groupName := "test_group"
	gm.IncrementStats(groupName, StatusHot)

	// Take two snapshots before mutating anything.
	stats1 := gm.GetFastStats()
	stats2 := gm.GetFastStats()
	if stats1[groupName] == nil || stats2[groupName] == nil {
		t.Fatal("group stats not found in snapshot")
	}

	// Mutate the first snapshot only.
	stats1[groupName].HotRecords = 999

	// The second snapshot must be unaffected.
	if stats2[groupName].HotRecords != 1 {
		t.Error("stats isolation failed: modifying returned stats affected original data")
	}

	// A fresh snapshot must still show the manager's real value.
	stats3 := gm.GetFastStats()
	if stats3[groupName] == nil {
		t.Fatal("group stats not found in fresh snapshot")
	}
	if stats3[groupName].HotRecords != 1 {
		t.Error("original stats data was modified")
	}
}
// TestGroupManagerConcurrency hammers the manager from several goroutines with
// a mix of mutating and reading calls while another goroutine polls the
// aggregate getters, then checks that the manager still works afterwards.
func TestGroupManagerConcurrency(t *testing.T) {
	mock := NewMockPipelineDBForGroupManager()
	manager := NewGroupManager(&mock.PipelineDB)

	const (
		numGoroutines = 10
		numOperations = 100
	)

	// Mutating operations; a worker picks mutations[step%6] on every step.
	mutations := []func(group string){
		func(g string) { manager.PauseGroup(g) },
		func(g string) { manager.ResumeGroup(g) },
		func(g string) { manager.SetOnCompleteExecuting(g, true) },
		func(g string) { manager.SetOnCompleteExecuting(g, false) },
		func(g string) { manager.GetNextID(g) },
		func(g string) { manager.IncrementStats(g, StatusHot) },
	}

	var wg sync.WaitGroup

	// worker runs numOperations steps against one of three shared groups.
	worker := func(id int) {
		defer wg.Done()
		group := "group_" + string(rune('A'+id%3))
		for step := 0; step < numOperations; step++ {
			mutations[step%len(mutations)](group)

			// Interleave reads with the writes.
			manager.IsPaused(group)
			manager.IsOnCompleteExecuting(group)
			manager.GetGroupCounter(group)
		}
	}

	wg.Add(numGoroutines)
	for id := 0; id < numGoroutines; id++ {
		go worker(id)
	}

	// Poll the aggregate getters while the workers are running.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for round := 0; round < 50; round++ {
			manager.GetFastStats()
			manager.GetPausedGroups()
			manager.GetExecutingGroups()
			time.Sleep(time.Millisecond)
		}
	}()

	// Block until every goroutine has finished.
	wg.Wait()

	// The manager must still behave correctly after the concurrent load.
	const probe = "final_test"
	manager.PauseGroup(probe)
	if paused := manager.IsPaused(probe); !paused {
		t.Error("manager corrupted after concurrent operations")
	}
}
// TestGroupManagerIdempotentOperations repeats pause, resume and
// SetOnCompleteExecuting calls on the same group and checks that repeating a
// call does not change the resulting list sizes.
func TestGroupManagerIdempotentOperations(t *testing.T) {
	mock := NewMockPipelineDBForGroupManager()
	manager := NewGroupManager(&mock.PipelineDB)
	const name = "test_group"

	// Pause the same group three times: still a single paused group.
	for n := 0; n < 3; n++ {
		manager.PauseGroup(name)
	}
	if count := len(manager.GetPausedGroups()); count != 1 {
		t.Errorf("paused groups count = %d, want 1", count)
	}

	// Resume it three times: no paused groups remain.
	for n := 0; n < 3; n++ {
		manager.ResumeGroup(name)
	}
	if count := len(manager.GetPausedGroups()); count != 0 {
		t.Errorf("paused groups count = %d, want 0", count)
	}

	// Set the executing flag twice: still a single executing group.
	for n := 0; n < 2; n++ {
		manager.SetOnCompleteExecuting(name, true)
	}
	if count := len(manager.GetExecutingGroups()); count != 1 {
		t.Errorf("executing groups count = %d, want 1", count)
	}

	// Clear it twice: no executing groups remain.
	for n := 0; n < 2; n++ {
		manager.SetOnCompleteExecuting(name, false)
	}
	if count := len(manager.GetExecutingGroups()); count != 0 {
		t.Errorf("executing groups count = %d, want 0", count)
	}
}
// TestGroupManagerGetGroupCounterNonExistent checks that asking for the
// counter of a group that was never used yields 0.
func TestGroupManagerGetGroupCounterNonExistent(t *testing.T) {
	mock := NewMockPipelineDBForGroupManager()
	manager := NewGroupManager(&mock.PipelineDB)

	// No ID was ever issued for this group, so its counter must read 0.
	if got := manager.GetGroupCounter("non_existent_group"); got != 0 {
		t.Errorf("counter for non-existent group = %d, want 0", got)
	}
}
// TestGroupManagerStatsInitialization checks that the first IncrementStats
// call for a group creates its stats entry with only the matching counter
// (and the total) set to 1.
func TestGroupManagerStatsInitialization(t *testing.T) {
	mock := NewMockPipelineDBForGroupManager()
	manager := NewGroupManager(&mock.PipelineDB)
	const name = "test_group"

	// The very first increment has to create the entry.
	manager.IncrementStats(name, StatusHot)
	got := manager.GetFastStats()[name]
	if got == nil {
		t.Fatal("stats should be initialized after first increment")
	}

	// Total and hot are 1 ...
	if !(got.TotalRecords == 1 && got.HotRecords == 1) {
		t.Errorf("initial stats: total=%d, hot=%d, want total=1, hot=1",
			got.TotalRecords, got.HotRecords)
	}
	// ... while warm and cold stay at 0.
	if !(got.WarmRecords == 0 && got.ColdRecords == 0) {
		t.Errorf("initial stats: warm=%d, cold=%d, want warm=0, cold=0",
			got.WarmRecords, got.ColdRecords)
	}
}
// TestGroupManagerUpdateStatsInitialization checks that calling UpdateStats on
// a group with no prior stats creates the stats entry and leaves the total
// record count at 0.
func TestGroupManagerUpdateStatsInitialization(t *testing.T) {
	mock := NewMockPipelineDBForGroupManager()
	manager := NewGroupManager(&mock.PipelineDB)
	const name = "test_group"

	// UpdateStats on an unknown group must create the entry.
	manager.UpdateStats(name, StatusHot, StatusWarm)
	got := manager.GetFastStats()[name]
	if got == nil {
		t.Fatal("stats should be initialized after UpdateStats")
	}

	// No record was ever added, so the total is expected to stay at 0.
	if total := got.TotalRecords; total != 0 {
		t.Errorf("total records = %d, want 0", total)
	}
}
// BenchmarkGroupManagerGetNextID measures the cost of issuing IDs for a single
// group.
func BenchmarkGroupManagerGetNextID(b *testing.B) {
	mock := NewMockPipelineDBForGroupManager()
	manager := NewGroupManager(&mock.PipelineDB)
	const name = "bench_group"

	// Exclude the setup above from the measurement.
	b.ResetTimer()
	for remaining := b.N; remaining > 0; remaining-- {
		manager.GetNextID(name)
	}
}
// BenchmarkGroupManagerIncrementStats measures IncrementStats on a single
// group while cycling through the hot, warm and cold statuses.
func BenchmarkGroupManagerIncrementStats(b *testing.B) {
	mock := NewMockPipelineDBForGroupManager()
	manager := NewGroupManager(&mock.PipelineDB)
	const name = "bench_group"
	cycle := []DataStatus{StatusHot, StatusWarm, StatusCold}

	// Exclude the setup above from the measurement.
	b.ResetTimer()
	next := 0 // index of the status to use on this iteration
	for n := 0; n < b.N; n++ {
		manager.IncrementStats(name, cycle[next])
		if next++; next == len(cycle) {
			next = 0
		}
	}
}
// BenchmarkGroupManagerGetFastStats measures GetFastStats against a manager
// pre-populated with ten groups of one hundred hot records each.
func BenchmarkGroupManagerGetFastStats(b *testing.B) {
	mock := NewMockPipelineDBForGroupManager()
	manager := NewGroupManager(&mock.PipelineDB)

	// Pre-populate groups "group_A" through "group_J".
	for letter := 'A'; letter < 'A'+10; letter++ {
		name := "group_" + string(letter)
		for rec := 0; rec < 100; rec++ {
			manager.IncrementStats(name, StatusHot)
		}
	}

	// Exclude the population step from the measurement.
	b.ResetTimer()
	for remaining := b.N; remaining > 0; remaining-- {
		manager.GetFastStats()
	}
}
// BenchmarkGroupManagerConcurrentOperations measures a mixed workload
// (GetNextID, IncrementStats, IsPaused, GetFastStats) issued in parallel
// across five groups.
func BenchmarkGroupManagerConcurrentOperations(b *testing.B) {
	mock := NewMockPipelineDBForGroupManager()
	manager := NewGroupManager(&mock.PipelineDB)

	// Exclude the setup above from the measurement.
	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		// step is local to each parallel worker and starts at zero.
		for step := 0; pb.Next(); step++ {
			name := "group_" + string(rune('A'+step%5))
			switch op := step % 4; op {
			case 0:
				manager.GetNextID(name)
			case 1:
				manager.IncrementStats(name, StatusHot)
			case 2:
				manager.IsPaused(name)
			case 3:
				manager.GetFastStats()
			}
		}
	})
}