701 lines
18 KiB
Go
701 lines
18 KiB
Go
package pipelinedb
|
||
|
||
import (
|
||
"log/slog"
|
||
"sync"
|
||
"testing"
|
||
"time"
|
||
)
|
||
|
||
// MockPipelineDBForGroupManager 用于测试GroupManager的模拟数据库
|
||
type MockPipelineDBForGroupManager struct {
|
||
PipelineDB // 嵌入PipelineDB以支持类型转换
|
||
pages map[uint16][]byte
|
||
}
|
||
|
||
func NewMockPipelineDBForGroupManager() *MockPipelineDBForGroupManager {
|
||
mock := &MockPipelineDBForGroupManager{
|
||
pages: make(map[uint16][]byte),
|
||
}
|
||
// 初始化logger字段以避免nil指针错误
|
||
mock.PipelineDB.logger = slog.Default()
|
||
// 初始化header以避免nil指针错误
|
||
mock.PipelineDB.header = &Header{
|
||
Magic: 0x50444200, // PDB magic number
|
||
PageSize: PageSize,
|
||
TotalPages: 0,
|
||
FreeHead: 0,
|
||
RootPage: 0,
|
||
CounterPage: 0, // 没有计数器页面
|
||
}
|
||
// 初始化缓存以避免nil指针错误
|
||
mock.PipelineDB.cache = NewPageCache(10)
|
||
// 初始化空闲页面管理器
|
||
mock.PipelineDB.freePageMgr = NewFreePageManager()
|
||
return mock
|
||
}
|
||
|
||
func (db *MockPipelineDBForGroupManager) readPage(pageNo uint16) ([]byte, error) {
|
||
if page, exists := db.pages[pageNo]; exists {
|
||
return page, nil
|
||
}
|
||
return make([]byte, PageSize), nil
|
||
}
|
||
|
||
func (db *MockPipelineDBForGroupManager) saveHeader() error {
|
||
return nil
|
||
}
|
||
|
||
// TestNewGroupManager 测试组管理器的创建
|
||
func TestNewGroupManager(t *testing.T) {
|
||
mockDB := NewMockPipelineDBForGroupManager()
|
||
gm := NewGroupManager(&mockDB.PipelineDB)
|
||
|
||
if gm == nil {
|
||
t.Fatal("NewGroupManager returned nil")
|
||
}
|
||
|
||
if gm.pdb == nil {
|
||
t.Error("database reference not set correctly")
|
||
}
|
||
|
||
// 验证初始状态
|
||
if len(gm.GetPausedGroups()) != 0 {
|
||
t.Error("should have no paused groups initially")
|
||
}
|
||
|
||
if len(gm.GetExecutingGroups()) != 0 {
|
||
t.Error("should have no executing groups initially")
|
||
}
|
||
|
||
stats := gm.GetFastStats()
|
||
if len(stats) != 0 {
|
||
t.Error("should have no stats initially")
|
||
}
|
||
}
|
||
|
||
// TestGroupManagerPauseResume 测试组的暂停和恢复
|
||
func TestGroupManagerPauseResume(t *testing.T) {
|
||
mockDB := NewMockPipelineDBForGroupManager()
|
||
gm := NewGroupManager(&mockDB.PipelineDB)
|
||
|
||
groupName := "test_group"
|
||
|
||
// 初始状态应该是活跃的
|
||
if gm.IsPaused(groupName) {
|
||
t.Error("group should be active initially")
|
||
}
|
||
|
||
if gm.GetGroupStatus(groupName) != "active" {
|
||
t.Errorf("group status = %s, want 'active'", gm.GetGroupStatus(groupName))
|
||
}
|
||
|
||
// 暂停组
|
||
gm.PauseGroup(groupName)
|
||
|
||
if !gm.IsPaused(groupName) {
|
||
t.Error("group should be paused after PauseGroup")
|
||
}
|
||
|
||
if gm.GetGroupStatus(groupName) != "paused" {
|
||
t.Errorf("group status = %s, want 'paused'", gm.GetGroupStatus(groupName))
|
||
}
|
||
|
||
// 验证暂停组列表
|
||
pausedGroups := gm.GetPausedGroups()
|
||
if len(pausedGroups) != 1 || pausedGroups[0] != groupName {
|
||
t.Errorf("paused groups = %v, want [%s]", pausedGroups, groupName)
|
||
}
|
||
|
||
// 恢复组
|
||
gm.ResumeGroup(groupName)
|
||
|
||
if gm.IsPaused(groupName) {
|
||
t.Error("group should be active after ResumeGroup")
|
||
}
|
||
|
||
if gm.GetGroupStatus(groupName) != "active" {
|
||
t.Errorf("group status = %s, want 'active'", gm.GetGroupStatus(groupName))
|
||
}
|
||
|
||
// 验证暂停组列表为空
|
||
pausedGroups = gm.GetPausedGroups()
|
||
if len(pausedGroups) != 0 {
|
||
t.Errorf("paused groups = %v, want []", pausedGroups)
|
||
}
|
||
}
|
||
|
||
// TestGroupManagerMultipleGroups 测试多个组的管理
|
||
func TestGroupManagerMultipleGroups(t *testing.T) {
|
||
mockDB := NewMockPipelineDBForGroupManager()
|
||
gm := NewGroupManager(&mockDB.PipelineDB)
|
||
|
||
groups := []string{"group1", "group2", "group3"}
|
||
|
||
// 暂停部分组
|
||
gm.PauseGroup(groups[0])
|
||
gm.PauseGroup(groups[2])
|
||
|
||
// 验证状态
|
||
if !gm.IsPaused(groups[0]) {
|
||
t.Errorf("group %s should be paused", groups[0])
|
||
}
|
||
|
||
if gm.IsPaused(groups[1]) {
|
||
t.Errorf("group %s should be active", groups[1])
|
||
}
|
||
|
||
if !gm.IsPaused(groups[2]) {
|
||
t.Errorf("group %s should be paused", groups[2])
|
||
}
|
||
|
||
// 验证暂停组列表
|
||
pausedGroups := gm.GetPausedGroups()
|
||
if len(pausedGroups) != 2 {
|
||
t.Errorf("paused groups count = %d, want 2", len(pausedGroups))
|
||
}
|
||
|
||
// 验证包含正确的组(顺序可能不同)
|
||
pausedMap := make(map[string]bool)
|
||
for _, group := range pausedGroups {
|
||
pausedMap[group] = true
|
||
}
|
||
|
||
if !pausedMap[groups[0]] || !pausedMap[groups[2]] {
|
||
t.Errorf("paused groups = %v, should contain %s and %s", pausedGroups, groups[0], groups[2])
|
||
}
|
||
}
|
||
|
||
// TestGroupManagerOnCompleteExecution 测试OnComplete执行状态管理
|
||
func TestGroupManagerOnCompleteExecution(t *testing.T) {
|
||
mockDB := NewMockPipelineDBForGroupManager()
|
||
gm := NewGroupManager(&mockDB.PipelineDB)
|
||
|
||
groupName := "test_group"
|
||
|
||
// 初始状态不应该在执行
|
||
if gm.IsOnCompleteExecuting(groupName) {
|
||
t.Error("group should not be executing initially")
|
||
}
|
||
|
||
// 设置为执行状态
|
||
gm.SetOnCompleteExecuting(groupName, true)
|
||
|
||
if !gm.IsOnCompleteExecuting(groupName) {
|
||
t.Error("group should be executing after setting to true")
|
||
}
|
||
|
||
// 验证执行组列表
|
||
executingGroups := gm.GetExecutingGroups()
|
||
if len(executingGroups) != 1 || executingGroups[0] != groupName {
|
||
t.Errorf("executing groups = %v, want [%s]", executingGroups, groupName)
|
||
}
|
||
|
||
// 设置为非执行状态
|
||
gm.SetOnCompleteExecuting(groupName, false)
|
||
|
||
if gm.IsOnCompleteExecuting(groupName) {
|
||
t.Error("group should not be executing after setting to false")
|
||
}
|
||
|
||
// 验证执行组列表为空
|
||
executingGroups = gm.GetExecutingGroups()
|
||
if len(executingGroups) != 0 {
|
||
t.Errorf("executing groups = %v, want []", executingGroups)
|
||
}
|
||
}
|
||
|
||
// TestGroupManagerGetNextID 测试ID生成功能
|
||
func TestGroupManagerGetNextID(t *testing.T) {
|
||
mockDB := NewMockPipelineDBForGroupManager()
|
||
gm := NewGroupManager(&mockDB.PipelineDB)
|
||
|
||
groupName := "test_group"
|
||
|
||
// 第一次获取ID应该是1
|
||
id1 := gm.GetNextID(groupName)
|
||
if id1 != 1 {
|
||
t.Errorf("first ID = %d, want 1", id1)
|
||
}
|
||
|
||
// 后续ID应该递增
|
||
id2 := gm.GetNextID(groupName)
|
||
if id2 != 2 {
|
||
t.Errorf("second ID = %d, want 2", id2)
|
||
}
|
||
|
||
id3 := gm.GetNextID(groupName)
|
||
if id3 != 3 {
|
||
t.Errorf("third ID = %d, want 3", id3)
|
||
}
|
||
|
||
// 验证计数器值
|
||
counter := gm.GetGroupCounter(groupName)
|
||
if counter != 3 {
|
||
t.Errorf("group counter = %d, want 3", counter)
|
||
}
|
||
}
|
||
|
||
// TestGroupManagerMultipleGroupCounters 测试多个组的ID计数器
|
||
func TestGroupManagerMultipleGroupCounters(t *testing.T) {
|
||
mockDB := NewMockPipelineDBForGroupManager()
|
||
gm := NewGroupManager(&mockDB.PipelineDB)
|
||
|
||
group1 := "group1"
|
||
group2 := "group2"
|
||
|
||
// 为不同组生成ID
|
||
id1_1 := gm.GetNextID(group1)
|
||
id2_1 := gm.GetNextID(group2)
|
||
id1_2 := gm.GetNextID(group1)
|
||
id2_2 := gm.GetNextID(group2)
|
||
|
||
// 验证每个组的ID独立递增
|
||
if id1_1 != 1 || id1_2 != 2 {
|
||
t.Errorf("group1 IDs = [%d, %d], want [1, 2]", id1_1, id1_2)
|
||
}
|
||
|
||
if id2_1 != 1 || id2_2 != 2 {
|
||
t.Errorf("group2 IDs = [%d, %d], want [1, 2]", id2_1, id2_2)
|
||
}
|
||
|
||
// 验证计数器值
|
||
if gm.GetGroupCounter(group1) != 2 {
|
||
t.Errorf("group1 counter = %d, want 2", gm.GetGroupCounter(group1))
|
||
}
|
||
|
||
if gm.GetGroupCounter(group2) != 2 {
|
||
t.Errorf("group2 counter = %d, want 2", gm.GetGroupCounter(group2))
|
||
}
|
||
}
|
||
|
||
// TestGroupManagerIncrementStats 测试统计信息增量更新
|
||
func TestGroupManagerIncrementStats(t *testing.T) {
|
||
mockDB := NewMockPipelineDBForGroupManager()
|
||
gm := NewGroupManager(&mockDB.PipelineDB)
|
||
|
||
groupName := "test_group"
|
||
|
||
// 增加不同状态的记录
|
||
gm.IncrementStats(groupName, StatusHot)
|
||
gm.IncrementStats(groupName, StatusHot)
|
||
gm.IncrementStats(groupName, StatusWarm)
|
||
gm.IncrementStats(groupName, StatusCold)
|
||
|
||
// 获取统计信息
|
||
stats := gm.GetFastStats()
|
||
|
||
if len(stats) != 1 {
|
||
t.Errorf("stats count = %d, want 1", len(stats))
|
||
}
|
||
|
||
groupStats := stats[groupName]
|
||
if groupStats == nil {
|
||
t.Fatal("group stats not found")
|
||
}
|
||
|
||
// 验证统计数据
|
||
if groupStats.TotalRecords != 4 {
|
||
t.Errorf("total records = %d, want 4", groupStats.TotalRecords)
|
||
}
|
||
|
||
if groupStats.HotRecords != 2 {
|
||
t.Errorf("hot records = %d, want 2", groupStats.HotRecords)
|
||
}
|
||
|
||
if groupStats.WarmRecords != 1 {
|
||
t.Errorf("warm records = %d, want 1", groupStats.WarmRecords)
|
||
}
|
||
|
||
if groupStats.ColdRecords != 1 {
|
||
t.Errorf("cold records = %d, want 1", groupStats.ColdRecords)
|
||
}
|
||
}
|
||
|
||
// TestGroupManagerUpdateStats 测试统计信息状态转换更新
|
||
func TestGroupManagerUpdateStats(t *testing.T) {
|
||
mockDB := NewMockPipelineDBForGroupManager()
|
||
gm := NewGroupManager(&mockDB.PipelineDB)
|
||
|
||
groupName := "test_group"
|
||
|
||
// 先增加一些记录
|
||
gm.IncrementStats(groupName, StatusHot)
|
||
gm.IncrementStats(groupName, StatusHot)
|
||
gm.IncrementStats(groupName, StatusWarm)
|
||
|
||
// 验证初始状态
|
||
stats := gm.GetFastStats()[groupName]
|
||
if stats.HotRecords != 2 || stats.WarmRecords != 1 || stats.ColdRecords != 0 {
|
||
t.Errorf("initial stats: hot=%d, warm=%d, cold=%d, want hot=2, warm=1, cold=0",
|
||
stats.HotRecords, stats.WarmRecords, stats.ColdRecords)
|
||
}
|
||
|
||
// 执行状态转换:Hot -> Warm
|
||
gm.UpdateStats(groupName, StatusHot, StatusWarm)
|
||
|
||
// 验证转换后的状态
|
||
stats = gm.GetFastStats()[groupName]
|
||
if stats.TotalRecords != 3 {
|
||
t.Errorf("total records after update = %d, want 3", stats.TotalRecords)
|
||
}
|
||
|
||
if stats.HotRecords != 1 {
|
||
t.Errorf("hot records after update = %d, want 1", stats.HotRecords)
|
||
}
|
||
|
||
if stats.WarmRecords != 2 {
|
||
t.Errorf("warm records after update = %d, want 2", stats.WarmRecords)
|
||
}
|
||
|
||
if stats.ColdRecords != 0 {
|
||
t.Errorf("cold records after update = %d, want 0", stats.ColdRecords)
|
||
}
|
||
|
||
// 执行另一个转换:Warm -> Cold
|
||
gm.UpdateStats(groupName, StatusWarm, StatusCold)
|
||
|
||
// 验证第二次转换后的状态
|
||
stats = gm.GetFastStats()[groupName]
|
||
if stats.HotRecords != 1 || stats.WarmRecords != 1 || stats.ColdRecords != 1 {
|
||
t.Errorf("final stats: hot=%d, warm=%d, cold=%d, want hot=1, warm=1, cold=1",
|
||
stats.HotRecords, stats.WarmRecords, stats.ColdRecords)
|
||
}
|
||
}
|
||
|
||
// TestGroupManagerMultipleGroupStats 测试多个组的统计信息
|
||
func TestGroupManagerMultipleGroupStats(t *testing.T) {
|
||
mockDB := NewMockPipelineDBForGroupManager()
|
||
gm := NewGroupManager(&mockDB.PipelineDB)
|
||
|
||
// 为不同组添加统计数据
|
||
groups := map[string]struct {
|
||
hot, warm, cold int
|
||
}{
|
||
"group1": {3, 2, 1},
|
||
"group2": {1, 4, 2},
|
||
"group3": {2, 1, 3},
|
||
}
|
||
|
||
for groupName, counts := range groups {
|
||
for i := 0; i < counts.hot; i++ {
|
||
gm.IncrementStats(groupName, StatusHot)
|
||
}
|
||
for i := 0; i < counts.warm; i++ {
|
||
gm.IncrementStats(groupName, StatusWarm)
|
||
}
|
||
for i := 0; i < counts.cold; i++ {
|
||
gm.IncrementStats(groupName, StatusCold)
|
||
}
|
||
}
|
||
|
||
// 获取所有统计信息
|
||
allStats := gm.GetFastStats()
|
||
|
||
if len(allStats) != len(groups) {
|
||
t.Errorf("stats count = %d, want %d", len(allStats), len(groups))
|
||
}
|
||
|
||
// 验证每个组的统计信息
|
||
for groupName, expected := range groups {
|
||
stats := allStats[groupName]
|
||
if stats == nil {
|
||
t.Errorf("stats for group %s not found", groupName)
|
||
continue
|
||
}
|
||
|
||
expectedTotal := expected.hot + expected.warm + expected.cold
|
||
if stats.TotalRecords != expectedTotal {
|
||
t.Errorf("group %s total = %d, want %d", groupName, stats.TotalRecords, expectedTotal)
|
||
}
|
||
|
||
if stats.HotRecords != expected.hot {
|
||
t.Errorf("group %s hot = %d, want %d", groupName, stats.HotRecords, expected.hot)
|
||
}
|
||
|
||
if stats.WarmRecords != expected.warm {
|
||
t.Errorf("group %s warm = %d, want %d", groupName, stats.WarmRecords, expected.warm)
|
||
}
|
||
|
||
if stats.ColdRecords != expected.cold {
|
||
t.Errorf("group %s cold = %d, want %d", groupName, stats.ColdRecords, expected.cold)
|
||
}
|
||
}
|
||
}
|
||
|
||
// TestGroupManagerStatsIsolation 测试统计信息的数据隔离
|
||
func TestGroupManagerStatsIsolation(t *testing.T) {
|
||
mockDB := NewMockPipelineDBForGroupManager()
|
||
gm := NewGroupManager(&mockDB.PipelineDB)
|
||
|
||
groupName := "test_group"
|
||
gm.IncrementStats(groupName, StatusHot)
|
||
|
||
// 获取统计信息
|
||
stats1 := gm.GetFastStats()
|
||
stats2 := gm.GetFastStats()
|
||
|
||
// 修改第一个返回的统计信息
|
||
stats1[groupName].HotRecords = 999
|
||
|
||
// 验证第二个统计信息未被影响
|
||
if stats2[groupName].HotRecords != 1 {
|
||
t.Error("stats isolation failed: modifying returned stats affected original data")
|
||
}
|
||
|
||
// 验证原始数据未被影响
|
||
stats3 := gm.GetFastStats()
|
||
if stats3[groupName].HotRecords != 1 {
|
||
t.Error("original stats data was modified")
|
||
}
|
||
}
|
||
|
||
// TestGroupManagerConcurrency 测试并发安全性
|
||
func TestGroupManagerConcurrency(t *testing.T) {
|
||
mockDB := NewMockPipelineDBForGroupManager()
|
||
gm := NewGroupManager(&mockDB.PipelineDB)
|
||
|
||
const numGoroutines = 10
|
||
const numOperations = 100
|
||
|
||
var wg sync.WaitGroup
|
||
|
||
// 启动多个goroutine进行并发操作
|
||
for i := 0; i < numGoroutines; i++ {
|
||
wg.Add(1)
|
||
go func(id int) {
|
||
defer wg.Done()
|
||
|
||
groupName := "group_" + string(rune('A'+id%3))
|
||
|
||
for j := 0; j < numOperations; j++ {
|
||
// 并发的各种操作
|
||
switch j % 6 {
|
||
case 0:
|
||
gm.PauseGroup(groupName)
|
||
case 1:
|
||
gm.ResumeGroup(groupName)
|
||
case 2:
|
||
gm.SetOnCompleteExecuting(groupName, true)
|
||
case 3:
|
||
gm.SetOnCompleteExecuting(groupName, false)
|
||
case 4:
|
||
gm.GetNextID(groupName)
|
||
case 5:
|
||
gm.IncrementStats(groupName, StatusHot)
|
||
}
|
||
|
||
// 并发读取操作
|
||
gm.IsPaused(groupName)
|
||
gm.IsOnCompleteExecuting(groupName)
|
||
gm.GetGroupCounter(groupName)
|
||
}
|
||
}(i)
|
||
}
|
||
|
||
// 同时进行统计查询
|
||
wg.Add(1)
|
||
go func() {
|
||
defer wg.Done()
|
||
for i := 0; i < 50; i++ {
|
||
gm.GetFastStats()
|
||
gm.GetPausedGroups()
|
||
gm.GetExecutingGroups()
|
||
time.Sleep(time.Millisecond)
|
||
}
|
||
}()
|
||
|
||
// 等待所有goroutine完成
|
||
wg.Wait()
|
||
|
||
// 验证管理器仍然可用
|
||
testGroup := "final_test"
|
||
gm.PauseGroup(testGroup)
|
||
if !gm.IsPaused(testGroup) {
|
||
t.Error("manager corrupted after concurrent operations")
|
||
}
|
||
}
|
||
|
||
// TestGroupManagerIdempotentOperations 测试幂等操作
|
||
func TestGroupManagerIdempotentOperations(t *testing.T) {
|
||
mockDB := NewMockPipelineDBForGroupManager()
|
||
gm := NewGroupManager(&mockDB.PipelineDB)
|
||
|
||
groupName := "test_group"
|
||
|
||
// 多次暂停同一组
|
||
gm.PauseGroup(groupName)
|
||
gm.PauseGroup(groupName)
|
||
gm.PauseGroup(groupName)
|
||
|
||
// 应该只有一个暂停的组
|
||
pausedGroups := gm.GetPausedGroups()
|
||
if len(pausedGroups) != 1 {
|
||
t.Errorf("paused groups count = %d, want 1", len(pausedGroups))
|
||
}
|
||
|
||
// 多次恢复同一组
|
||
gm.ResumeGroup(groupName)
|
||
gm.ResumeGroup(groupName)
|
||
gm.ResumeGroup(groupName)
|
||
|
||
// 应该没有暂停的组
|
||
pausedGroups = gm.GetPausedGroups()
|
||
if len(pausedGroups) != 0 {
|
||
t.Errorf("paused groups count = %d, want 0", len(pausedGroups))
|
||
}
|
||
|
||
// 多次设置OnComplete执行状态
|
||
gm.SetOnCompleteExecuting(groupName, true)
|
||
gm.SetOnCompleteExecuting(groupName, true)
|
||
|
||
executingGroups := gm.GetExecutingGroups()
|
||
if len(executingGroups) != 1 {
|
||
t.Errorf("executing groups count = %d, want 1", len(executingGroups))
|
||
}
|
||
|
||
gm.SetOnCompleteExecuting(groupName, false)
|
||
gm.SetOnCompleteExecuting(groupName, false)
|
||
|
||
executingGroups = gm.GetExecutingGroups()
|
||
if len(executingGroups) != 0 {
|
||
t.Errorf("executing groups count = %d, want 0", len(executingGroups))
|
||
}
|
||
}
|
||
|
||
// TestGroupManagerGetGroupCounterNonExistent 测试获取不存在组的计数器
|
||
func TestGroupManagerGetGroupCounterNonExistent(t *testing.T) {
|
||
mockDB := NewMockPipelineDBForGroupManager()
|
||
gm := NewGroupManager(&mockDB.PipelineDB)
|
||
|
||
// 获取不存在组的计数器应该返回0
|
||
counter := gm.GetGroupCounter("non_existent_group")
|
||
if counter != 0 {
|
||
t.Errorf("counter for non-existent group = %d, want 0", counter)
|
||
}
|
||
}
|
||
|
||
// TestGroupManagerStatsInitialization 测试统计信息的初始化
|
||
func TestGroupManagerStatsInitialization(t *testing.T) {
|
||
mockDB := NewMockPipelineDBForGroupManager()
|
||
gm := NewGroupManager(&mockDB.PipelineDB)
|
||
|
||
groupName := "test_group"
|
||
|
||
// 第一次增加统计应该初始化结构
|
||
gm.IncrementStats(groupName, StatusHot)
|
||
|
||
stats := gm.GetFastStats()[groupName]
|
||
if stats == nil {
|
||
t.Fatal("stats should be initialized after first increment")
|
||
}
|
||
|
||
if stats.TotalRecords != 1 || stats.HotRecords != 1 {
|
||
t.Errorf("initial stats: total=%d, hot=%d, want total=1, hot=1",
|
||
stats.TotalRecords, stats.HotRecords)
|
||
}
|
||
|
||
if stats.WarmRecords != 0 || stats.ColdRecords != 0 {
|
||
t.Errorf("initial stats: warm=%d, cold=%d, want warm=0, cold=0",
|
||
stats.WarmRecords, stats.ColdRecords)
|
||
}
|
||
}
|
||
|
||
// TestGroupManagerUpdateStatsInitialization 测试更新统计时的初始化
|
||
func TestGroupManagerUpdateStatsInitialization(t *testing.T) {
|
||
mockDB := NewMockPipelineDBForGroupManager()
|
||
gm := NewGroupManager(&mockDB.PipelineDB)
|
||
|
||
groupName := "test_group"
|
||
|
||
// 直接调用UpdateStats应该初始化统计结构
|
||
gm.UpdateStats(groupName, StatusHot, StatusWarm)
|
||
|
||
stats := gm.GetFastStats()[groupName]
|
||
if stats == nil {
|
||
t.Fatal("stats should be initialized after UpdateStats")
|
||
}
|
||
|
||
// 由于没有Hot记录,减少操作不会产生负数,但会初始化结构
|
||
if stats.TotalRecords != 0 {
|
||
t.Errorf("total records = %d, want 0", stats.TotalRecords)
|
||
}
|
||
}
|
||
|
||
// BenchmarkGroupManagerGetNextID 性能测试:ID生成
|
||
func BenchmarkGroupManagerGetNextID(b *testing.B) {
|
||
mockDB := NewMockPipelineDBForGroupManager()
|
||
gm := NewGroupManager(&mockDB.PipelineDB)
|
||
|
||
groupName := "bench_group"
|
||
|
||
b.ResetTimer()
|
||
|
||
for i := 0; i < b.N; i++ {
|
||
gm.GetNextID(groupName)
|
||
}
|
||
}
|
||
|
||
// BenchmarkGroupManagerIncrementStats 性能测试:统计信息增量
|
||
func BenchmarkGroupManagerIncrementStats(b *testing.B) {
|
||
mockDB := NewMockPipelineDBForGroupManager()
|
||
gm := NewGroupManager(&mockDB.PipelineDB)
|
||
|
||
groupName := "bench_group"
|
||
statuses := []DataStatus{StatusHot, StatusWarm, StatusCold}
|
||
|
||
b.ResetTimer()
|
||
|
||
for i := 0; i < b.N; i++ {
|
||
status := statuses[i%len(statuses)]
|
||
gm.IncrementStats(groupName, status)
|
||
}
|
||
}
|
||
|
||
// BenchmarkGroupManagerGetFastStats 性能测试:快速统计查询
|
||
func BenchmarkGroupManagerGetFastStats(b *testing.B) {
|
||
mockDB := NewMockPipelineDBForGroupManager()
|
||
gm := NewGroupManager(&mockDB.PipelineDB)
|
||
|
||
// 预填充一些统计数据
|
||
for i := 0; i < 10; i++ {
|
||
groupName := "group_" + string(rune('A'+i))
|
||
for j := 0; j < 100; j++ {
|
||
gm.IncrementStats(groupName, StatusHot)
|
||
}
|
||
}
|
||
|
||
b.ResetTimer()
|
||
|
||
for i := 0; i < b.N; i++ {
|
||
gm.GetFastStats()
|
||
}
|
||
}
|
||
|
||
// BenchmarkGroupManagerConcurrentOperations 性能测试:并发操作
|
||
func BenchmarkGroupManagerConcurrentOperations(b *testing.B) {
|
||
mockDB := NewMockPipelineDBForGroupManager()
|
||
gm := NewGroupManager(&mockDB.PipelineDB)
|
||
|
||
b.ResetTimer()
|
||
|
||
b.RunParallel(func(pb *testing.PB) {
|
||
i := 0
|
||
for pb.Next() {
|
||
groupName := "group_" + string(rune('A'+i%5))
|
||
|
||
switch i % 4 {
|
||
case 0:
|
||
gm.GetNextID(groupName)
|
||
case 1:
|
||
gm.IncrementStats(groupName, StatusHot)
|
||
case 2:
|
||
gm.IsPaused(groupName)
|
||
case 3:
|
||
gm.GetFastStats()
|
||
}
|
||
i++
|
||
}
|
||
})
|
||
}
|