701 lines
18 KiB
Go
701 lines
18 KiB
Go
|
|
package pipelinedb
|
|||
|
|
|
|||
|
|
import (
|
|||
|
|
"log/slog"
|
|||
|
|
"sync"
|
|||
|
|
"testing"
|
|||
|
|
"time"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
// MockPipelineDBForGroupManager 用于测试GroupManager的模拟数据库
|
|||
|
|
type MockPipelineDBForGroupManager struct {
|
|||
|
|
PipelineDB // 嵌入PipelineDB以支持类型转换
|
|||
|
|
pages map[uint16][]byte
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func NewMockPipelineDBForGroupManager() *MockPipelineDBForGroupManager {
|
|||
|
|
mock := &MockPipelineDBForGroupManager{
|
|||
|
|
pages: make(map[uint16][]byte),
|
|||
|
|
}
|
|||
|
|
// 初始化logger字段以避免nil指针错误
|
|||
|
|
mock.PipelineDB.logger = slog.Default()
|
|||
|
|
// 初始化header以避免nil指针错误
|
|||
|
|
mock.PipelineDB.header = &Header{
|
|||
|
|
Magic: 0x50444200, // PDB magic number
|
|||
|
|
PageSize: PageSize,
|
|||
|
|
TotalPages: 0,
|
|||
|
|
FreeHead: 0,
|
|||
|
|
RootPage: 0,
|
|||
|
|
CounterPage: 0, // 没有计数器页面
|
|||
|
|
}
|
|||
|
|
// 初始化缓存以避免nil指针错误
|
|||
|
|
mock.PipelineDB.cache = NewPageCache(10)
|
|||
|
|
// 初始化空闲页面管理器
|
|||
|
|
mock.PipelineDB.freePageMgr = NewFreePageManager()
|
|||
|
|
return mock
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func (db *MockPipelineDBForGroupManager) readPage(pageNo uint16) ([]byte, error) {
|
|||
|
|
if page, exists := db.pages[pageNo]; exists {
|
|||
|
|
return page, nil
|
|||
|
|
}
|
|||
|
|
return make([]byte, PageSize), nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func (db *MockPipelineDBForGroupManager) saveHeader() error {
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// TestNewGroupManager 测试组管理器的创建
|
|||
|
|
func TestNewGroupManager(t *testing.T) {
|
|||
|
|
mockDB := NewMockPipelineDBForGroupManager()
|
|||
|
|
gm := NewGroupManager(&mockDB.PipelineDB)
|
|||
|
|
|
|||
|
|
if gm == nil {
|
|||
|
|
t.Fatal("NewGroupManager returned nil")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if gm.pdb == nil {
|
|||
|
|
t.Error("database reference not set correctly")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 验证初始状态
|
|||
|
|
if len(gm.GetPausedGroups()) != 0 {
|
|||
|
|
t.Error("should have no paused groups initially")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if len(gm.GetExecutingGroups()) != 0 {
|
|||
|
|
t.Error("should have no executing groups initially")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
stats := gm.GetFastStats()
|
|||
|
|
if len(stats) != 0 {
|
|||
|
|
t.Error("should have no stats initially")
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// TestGroupManagerPauseResume 测试组的暂停和恢复
|
|||
|
|
func TestGroupManagerPauseResume(t *testing.T) {
|
|||
|
|
mockDB := NewMockPipelineDBForGroupManager()
|
|||
|
|
gm := NewGroupManager(&mockDB.PipelineDB)
|
|||
|
|
|
|||
|
|
groupName := "test_group"
|
|||
|
|
|
|||
|
|
// 初始状态应该是活跃的
|
|||
|
|
if gm.IsPaused(groupName) {
|
|||
|
|
t.Error("group should be active initially")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if gm.GetGroupStatus(groupName) != "active" {
|
|||
|
|
t.Errorf("group status = %s, want 'active'", gm.GetGroupStatus(groupName))
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 暂停组
|
|||
|
|
gm.PauseGroup(groupName)
|
|||
|
|
|
|||
|
|
if !gm.IsPaused(groupName) {
|
|||
|
|
t.Error("group should be paused after PauseGroup")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if gm.GetGroupStatus(groupName) != "paused" {
|
|||
|
|
t.Errorf("group status = %s, want 'paused'", gm.GetGroupStatus(groupName))
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 验证暂停组列表
|
|||
|
|
pausedGroups := gm.GetPausedGroups()
|
|||
|
|
if len(pausedGroups) != 1 || pausedGroups[0] != groupName {
|
|||
|
|
t.Errorf("paused groups = %v, want [%s]", pausedGroups, groupName)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 恢复组
|
|||
|
|
gm.ResumeGroup(groupName)
|
|||
|
|
|
|||
|
|
if gm.IsPaused(groupName) {
|
|||
|
|
t.Error("group should be active after ResumeGroup")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if gm.GetGroupStatus(groupName) != "active" {
|
|||
|
|
t.Errorf("group status = %s, want 'active'", gm.GetGroupStatus(groupName))
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 验证暂停组列表为空
|
|||
|
|
pausedGroups = gm.GetPausedGroups()
|
|||
|
|
if len(pausedGroups) != 0 {
|
|||
|
|
t.Errorf("paused groups = %v, want []", pausedGroups)
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// TestGroupManagerMultipleGroups 测试多个组的管理
|
|||
|
|
func TestGroupManagerMultipleGroups(t *testing.T) {
|
|||
|
|
mockDB := NewMockPipelineDBForGroupManager()
|
|||
|
|
gm := NewGroupManager(&mockDB.PipelineDB)
|
|||
|
|
|
|||
|
|
groups := []string{"group1", "group2", "group3"}
|
|||
|
|
|
|||
|
|
// 暂停部分组
|
|||
|
|
gm.PauseGroup(groups[0])
|
|||
|
|
gm.PauseGroup(groups[2])
|
|||
|
|
|
|||
|
|
// 验证状态
|
|||
|
|
if !gm.IsPaused(groups[0]) {
|
|||
|
|
t.Errorf("group %s should be paused", groups[0])
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if gm.IsPaused(groups[1]) {
|
|||
|
|
t.Errorf("group %s should be active", groups[1])
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if !gm.IsPaused(groups[2]) {
|
|||
|
|
t.Errorf("group %s should be paused", groups[2])
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 验证暂停组列表
|
|||
|
|
pausedGroups := gm.GetPausedGroups()
|
|||
|
|
if len(pausedGroups) != 2 {
|
|||
|
|
t.Errorf("paused groups count = %d, want 2", len(pausedGroups))
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 验证包含正确的组(顺序可能不同)
|
|||
|
|
pausedMap := make(map[string]bool)
|
|||
|
|
for _, group := range pausedGroups {
|
|||
|
|
pausedMap[group] = true
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if !pausedMap[groups[0]] || !pausedMap[groups[2]] {
|
|||
|
|
t.Errorf("paused groups = %v, should contain %s and %s", pausedGroups, groups[0], groups[2])
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// TestGroupManagerOnCompleteExecution 测试OnComplete执行状态管理
|
|||
|
|
func TestGroupManagerOnCompleteExecution(t *testing.T) {
|
|||
|
|
mockDB := NewMockPipelineDBForGroupManager()
|
|||
|
|
gm := NewGroupManager(&mockDB.PipelineDB)
|
|||
|
|
|
|||
|
|
groupName := "test_group"
|
|||
|
|
|
|||
|
|
// 初始状态不应该在执行
|
|||
|
|
if gm.IsOnCompleteExecuting(groupName) {
|
|||
|
|
t.Error("group should not be executing initially")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 设置为执行状态
|
|||
|
|
gm.SetOnCompleteExecuting(groupName, true)
|
|||
|
|
|
|||
|
|
if !gm.IsOnCompleteExecuting(groupName) {
|
|||
|
|
t.Error("group should be executing after setting to true")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 验证执行组列表
|
|||
|
|
executingGroups := gm.GetExecutingGroups()
|
|||
|
|
if len(executingGroups) != 1 || executingGroups[0] != groupName {
|
|||
|
|
t.Errorf("executing groups = %v, want [%s]", executingGroups, groupName)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 设置为非执行状态
|
|||
|
|
gm.SetOnCompleteExecuting(groupName, false)
|
|||
|
|
|
|||
|
|
if gm.IsOnCompleteExecuting(groupName) {
|
|||
|
|
t.Error("group should not be executing after setting to false")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 验证执行组列表为空
|
|||
|
|
executingGroups = gm.GetExecutingGroups()
|
|||
|
|
if len(executingGroups) != 0 {
|
|||
|
|
t.Errorf("executing groups = %v, want []", executingGroups)
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// TestGroupManagerGetNextID 测试ID生成功能
|
|||
|
|
func TestGroupManagerGetNextID(t *testing.T) {
|
|||
|
|
mockDB := NewMockPipelineDBForGroupManager()
|
|||
|
|
gm := NewGroupManager(&mockDB.PipelineDB)
|
|||
|
|
|
|||
|
|
groupName := "test_group"
|
|||
|
|
|
|||
|
|
// 第一次获取ID应该是1
|
|||
|
|
id1 := gm.GetNextID(groupName)
|
|||
|
|
if id1 != 1 {
|
|||
|
|
t.Errorf("first ID = %d, want 1", id1)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 后续ID应该递增
|
|||
|
|
id2 := gm.GetNextID(groupName)
|
|||
|
|
if id2 != 2 {
|
|||
|
|
t.Errorf("second ID = %d, want 2", id2)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
id3 := gm.GetNextID(groupName)
|
|||
|
|
if id3 != 3 {
|
|||
|
|
t.Errorf("third ID = %d, want 3", id3)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 验证计数器值
|
|||
|
|
counter := gm.GetGroupCounter(groupName)
|
|||
|
|
if counter != 3 {
|
|||
|
|
t.Errorf("group counter = %d, want 3", counter)
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// TestGroupManagerMultipleGroupCounters 测试多个组的ID计数器
|
|||
|
|
func TestGroupManagerMultipleGroupCounters(t *testing.T) {
|
|||
|
|
mockDB := NewMockPipelineDBForGroupManager()
|
|||
|
|
gm := NewGroupManager(&mockDB.PipelineDB)
|
|||
|
|
|
|||
|
|
group1 := "group1"
|
|||
|
|
group2 := "group2"
|
|||
|
|
|
|||
|
|
// 为不同组生成ID
|
|||
|
|
id1_1 := gm.GetNextID(group1)
|
|||
|
|
id2_1 := gm.GetNextID(group2)
|
|||
|
|
id1_2 := gm.GetNextID(group1)
|
|||
|
|
id2_2 := gm.GetNextID(group2)
|
|||
|
|
|
|||
|
|
// 验证每个组的ID独立递增
|
|||
|
|
if id1_1 != 1 || id1_2 != 2 {
|
|||
|
|
t.Errorf("group1 IDs = [%d, %d], want [1, 2]", id1_1, id1_2)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if id2_1 != 1 || id2_2 != 2 {
|
|||
|
|
t.Errorf("group2 IDs = [%d, %d], want [1, 2]", id2_1, id2_2)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 验证计数器值
|
|||
|
|
if gm.GetGroupCounter(group1) != 2 {
|
|||
|
|
t.Errorf("group1 counter = %d, want 2", gm.GetGroupCounter(group1))
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if gm.GetGroupCounter(group2) != 2 {
|
|||
|
|
t.Errorf("group2 counter = %d, want 2", gm.GetGroupCounter(group2))
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// TestGroupManagerIncrementStats 测试统计信息增量更新
|
|||
|
|
func TestGroupManagerIncrementStats(t *testing.T) {
|
|||
|
|
mockDB := NewMockPipelineDBForGroupManager()
|
|||
|
|
gm := NewGroupManager(&mockDB.PipelineDB)
|
|||
|
|
|
|||
|
|
groupName := "test_group"
|
|||
|
|
|
|||
|
|
// 增加不同状态的记录
|
|||
|
|
gm.IncrementStats(groupName, StatusHot)
|
|||
|
|
gm.IncrementStats(groupName, StatusHot)
|
|||
|
|
gm.IncrementStats(groupName, StatusWarm)
|
|||
|
|
gm.IncrementStats(groupName, StatusCold)
|
|||
|
|
|
|||
|
|
// 获取统计信息
|
|||
|
|
stats := gm.GetFastStats()
|
|||
|
|
|
|||
|
|
if len(stats) != 1 {
|
|||
|
|
t.Errorf("stats count = %d, want 1", len(stats))
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
groupStats := stats[groupName]
|
|||
|
|
if groupStats == nil {
|
|||
|
|
t.Fatal("group stats not found")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 验证统计数据
|
|||
|
|
if groupStats.TotalRecords != 4 {
|
|||
|
|
t.Errorf("total records = %d, want 4", groupStats.TotalRecords)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if groupStats.HotRecords != 2 {
|
|||
|
|
t.Errorf("hot records = %d, want 2", groupStats.HotRecords)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if groupStats.WarmRecords != 1 {
|
|||
|
|
t.Errorf("warm records = %d, want 1", groupStats.WarmRecords)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if groupStats.ColdRecords != 1 {
|
|||
|
|
t.Errorf("cold records = %d, want 1", groupStats.ColdRecords)
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// TestGroupManagerUpdateStats 测试统计信息状态转换更新
|
|||
|
|
func TestGroupManagerUpdateStats(t *testing.T) {
|
|||
|
|
mockDB := NewMockPipelineDBForGroupManager()
|
|||
|
|
gm := NewGroupManager(&mockDB.PipelineDB)
|
|||
|
|
|
|||
|
|
groupName := "test_group"
|
|||
|
|
|
|||
|
|
// 先增加一些记录
|
|||
|
|
gm.IncrementStats(groupName, StatusHot)
|
|||
|
|
gm.IncrementStats(groupName, StatusHot)
|
|||
|
|
gm.IncrementStats(groupName, StatusWarm)
|
|||
|
|
|
|||
|
|
// 验证初始状态
|
|||
|
|
stats := gm.GetFastStats()[groupName]
|
|||
|
|
if stats.HotRecords != 2 || stats.WarmRecords != 1 || stats.ColdRecords != 0 {
|
|||
|
|
t.Errorf("initial stats: hot=%d, warm=%d, cold=%d, want hot=2, warm=1, cold=0",
|
|||
|
|
stats.HotRecords, stats.WarmRecords, stats.ColdRecords)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 执行状态转换:Hot -> Warm
|
|||
|
|
gm.UpdateStats(groupName, StatusHot, StatusWarm)
|
|||
|
|
|
|||
|
|
// 验证转换后的状态
|
|||
|
|
stats = gm.GetFastStats()[groupName]
|
|||
|
|
if stats.TotalRecords != 3 {
|
|||
|
|
t.Errorf("total records after update = %d, want 3", stats.TotalRecords)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if stats.HotRecords != 1 {
|
|||
|
|
t.Errorf("hot records after update = %d, want 1", stats.HotRecords)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if stats.WarmRecords != 2 {
|
|||
|
|
t.Errorf("warm records after update = %d, want 2", stats.WarmRecords)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if stats.ColdRecords != 0 {
|
|||
|
|
t.Errorf("cold records after update = %d, want 0", stats.ColdRecords)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 执行另一个转换:Warm -> Cold
|
|||
|
|
gm.UpdateStats(groupName, StatusWarm, StatusCold)
|
|||
|
|
|
|||
|
|
// 验证第二次转换后的状态
|
|||
|
|
stats = gm.GetFastStats()[groupName]
|
|||
|
|
if stats.HotRecords != 1 || stats.WarmRecords != 1 || stats.ColdRecords != 1 {
|
|||
|
|
t.Errorf("final stats: hot=%d, warm=%d, cold=%d, want hot=1, warm=1, cold=1",
|
|||
|
|
stats.HotRecords, stats.WarmRecords, stats.ColdRecords)
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// TestGroupManagerMultipleGroupStats 测试多个组的统计信息
|
|||
|
|
func TestGroupManagerMultipleGroupStats(t *testing.T) {
|
|||
|
|
mockDB := NewMockPipelineDBForGroupManager()
|
|||
|
|
gm := NewGroupManager(&mockDB.PipelineDB)
|
|||
|
|
|
|||
|
|
// 为不同组添加统计数据
|
|||
|
|
groups := map[string]struct {
|
|||
|
|
hot, warm, cold int
|
|||
|
|
}{
|
|||
|
|
"group1": {3, 2, 1},
|
|||
|
|
"group2": {1, 4, 2},
|
|||
|
|
"group3": {2, 1, 3},
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
for groupName, counts := range groups {
|
|||
|
|
for i := 0; i < counts.hot; i++ {
|
|||
|
|
gm.IncrementStats(groupName, StatusHot)
|
|||
|
|
}
|
|||
|
|
for i := 0; i < counts.warm; i++ {
|
|||
|
|
gm.IncrementStats(groupName, StatusWarm)
|
|||
|
|
}
|
|||
|
|
for i := 0; i < counts.cold; i++ {
|
|||
|
|
gm.IncrementStats(groupName, StatusCold)
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 获取所有统计信息
|
|||
|
|
allStats := gm.GetFastStats()
|
|||
|
|
|
|||
|
|
if len(allStats) != len(groups) {
|
|||
|
|
t.Errorf("stats count = %d, want %d", len(allStats), len(groups))
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 验证每个组的统计信息
|
|||
|
|
for groupName, expected := range groups {
|
|||
|
|
stats := allStats[groupName]
|
|||
|
|
if stats == nil {
|
|||
|
|
t.Errorf("stats for group %s not found", groupName)
|
|||
|
|
continue
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
expectedTotal := expected.hot + expected.warm + expected.cold
|
|||
|
|
if stats.TotalRecords != expectedTotal {
|
|||
|
|
t.Errorf("group %s total = %d, want %d", groupName, stats.TotalRecords, expectedTotal)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if stats.HotRecords != expected.hot {
|
|||
|
|
t.Errorf("group %s hot = %d, want %d", groupName, stats.HotRecords, expected.hot)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if stats.WarmRecords != expected.warm {
|
|||
|
|
t.Errorf("group %s warm = %d, want %d", groupName, stats.WarmRecords, expected.warm)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if stats.ColdRecords != expected.cold {
|
|||
|
|
t.Errorf("group %s cold = %d, want %d", groupName, stats.ColdRecords, expected.cold)
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// TestGroupManagerStatsIsolation 测试统计信息的数据隔离
|
|||
|
|
func TestGroupManagerStatsIsolation(t *testing.T) {
|
|||
|
|
mockDB := NewMockPipelineDBForGroupManager()
|
|||
|
|
gm := NewGroupManager(&mockDB.PipelineDB)
|
|||
|
|
|
|||
|
|
groupName := "test_group"
|
|||
|
|
gm.IncrementStats(groupName, StatusHot)
|
|||
|
|
|
|||
|
|
// 获取统计信息
|
|||
|
|
stats1 := gm.GetFastStats()
|
|||
|
|
stats2 := gm.GetFastStats()
|
|||
|
|
|
|||
|
|
// 修改第一个返回的统计信息
|
|||
|
|
stats1[groupName].HotRecords = 999
|
|||
|
|
|
|||
|
|
// 验证第二个统计信息未被影响
|
|||
|
|
if stats2[groupName].HotRecords != 1 {
|
|||
|
|
t.Error("stats isolation failed: modifying returned stats affected original data")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 验证原始数据未被影响
|
|||
|
|
stats3 := gm.GetFastStats()
|
|||
|
|
if stats3[groupName].HotRecords != 1 {
|
|||
|
|
t.Error("original stats data was modified")
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// TestGroupManagerConcurrency 测试并发安全性
|
|||
|
|
func TestGroupManagerConcurrency(t *testing.T) {
|
|||
|
|
mockDB := NewMockPipelineDBForGroupManager()
|
|||
|
|
gm := NewGroupManager(&mockDB.PipelineDB)
|
|||
|
|
|
|||
|
|
const numGoroutines = 10
|
|||
|
|
const numOperations = 100
|
|||
|
|
|
|||
|
|
var wg sync.WaitGroup
|
|||
|
|
|
|||
|
|
// 启动多个goroutine进行并发操作
|
|||
|
|
for i := 0; i < numGoroutines; i++ {
|
|||
|
|
wg.Add(1)
|
|||
|
|
go func(id int) {
|
|||
|
|
defer wg.Done()
|
|||
|
|
|
|||
|
|
groupName := "group_" + string(rune('A'+id%3))
|
|||
|
|
|
|||
|
|
for j := 0; j < numOperations; j++ {
|
|||
|
|
// 并发的各种操作
|
|||
|
|
switch j % 6 {
|
|||
|
|
case 0:
|
|||
|
|
gm.PauseGroup(groupName)
|
|||
|
|
case 1:
|
|||
|
|
gm.ResumeGroup(groupName)
|
|||
|
|
case 2:
|
|||
|
|
gm.SetOnCompleteExecuting(groupName, true)
|
|||
|
|
case 3:
|
|||
|
|
gm.SetOnCompleteExecuting(groupName, false)
|
|||
|
|
case 4:
|
|||
|
|
gm.GetNextID(groupName)
|
|||
|
|
case 5:
|
|||
|
|
gm.IncrementStats(groupName, StatusHot)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 并发读取操作
|
|||
|
|
gm.IsPaused(groupName)
|
|||
|
|
gm.IsOnCompleteExecuting(groupName)
|
|||
|
|
gm.GetGroupCounter(groupName)
|
|||
|
|
}
|
|||
|
|
}(i)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 同时进行统计查询
|
|||
|
|
wg.Add(1)
|
|||
|
|
go func() {
|
|||
|
|
defer wg.Done()
|
|||
|
|
for i := 0; i < 50; i++ {
|
|||
|
|
gm.GetFastStats()
|
|||
|
|
gm.GetPausedGroups()
|
|||
|
|
gm.GetExecutingGroups()
|
|||
|
|
time.Sleep(time.Millisecond)
|
|||
|
|
}
|
|||
|
|
}()
|
|||
|
|
|
|||
|
|
// 等待所有goroutine完成
|
|||
|
|
wg.Wait()
|
|||
|
|
|
|||
|
|
// 验证管理器仍然可用
|
|||
|
|
testGroup := "final_test"
|
|||
|
|
gm.PauseGroup(testGroup)
|
|||
|
|
if !gm.IsPaused(testGroup) {
|
|||
|
|
t.Error("manager corrupted after concurrent operations")
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// TestGroupManagerIdempotentOperations 测试幂等操作
|
|||
|
|
func TestGroupManagerIdempotentOperations(t *testing.T) {
|
|||
|
|
mockDB := NewMockPipelineDBForGroupManager()
|
|||
|
|
gm := NewGroupManager(&mockDB.PipelineDB)
|
|||
|
|
|
|||
|
|
groupName := "test_group"
|
|||
|
|
|
|||
|
|
// 多次暂停同一组
|
|||
|
|
gm.PauseGroup(groupName)
|
|||
|
|
gm.PauseGroup(groupName)
|
|||
|
|
gm.PauseGroup(groupName)
|
|||
|
|
|
|||
|
|
// 应该只有一个暂停的组
|
|||
|
|
pausedGroups := gm.GetPausedGroups()
|
|||
|
|
if len(pausedGroups) != 1 {
|
|||
|
|
t.Errorf("paused groups count = %d, want 1", len(pausedGroups))
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 多次恢复同一组
|
|||
|
|
gm.ResumeGroup(groupName)
|
|||
|
|
gm.ResumeGroup(groupName)
|
|||
|
|
gm.ResumeGroup(groupName)
|
|||
|
|
|
|||
|
|
// 应该没有暂停的组
|
|||
|
|
pausedGroups = gm.GetPausedGroups()
|
|||
|
|
if len(pausedGroups) != 0 {
|
|||
|
|
t.Errorf("paused groups count = %d, want 0", len(pausedGroups))
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 多次设置OnComplete执行状态
|
|||
|
|
gm.SetOnCompleteExecuting(groupName, true)
|
|||
|
|
gm.SetOnCompleteExecuting(groupName, true)
|
|||
|
|
|
|||
|
|
executingGroups := gm.GetExecutingGroups()
|
|||
|
|
if len(executingGroups) != 1 {
|
|||
|
|
t.Errorf("executing groups count = %d, want 1", len(executingGroups))
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
gm.SetOnCompleteExecuting(groupName, false)
|
|||
|
|
gm.SetOnCompleteExecuting(groupName, false)
|
|||
|
|
|
|||
|
|
executingGroups = gm.GetExecutingGroups()
|
|||
|
|
if len(executingGroups) != 0 {
|
|||
|
|
t.Errorf("executing groups count = %d, want 0", len(executingGroups))
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// TestGroupManagerGetGroupCounterNonExistent 测试获取不存在组的计数器
|
|||
|
|
func TestGroupManagerGetGroupCounterNonExistent(t *testing.T) {
|
|||
|
|
mockDB := NewMockPipelineDBForGroupManager()
|
|||
|
|
gm := NewGroupManager(&mockDB.PipelineDB)
|
|||
|
|
|
|||
|
|
// 获取不存在组的计数器应该返回0
|
|||
|
|
counter := gm.GetGroupCounter("non_existent_group")
|
|||
|
|
if counter != 0 {
|
|||
|
|
t.Errorf("counter for non-existent group = %d, want 0", counter)
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// TestGroupManagerStatsInitialization 测试统计信息的初始化
|
|||
|
|
func TestGroupManagerStatsInitialization(t *testing.T) {
|
|||
|
|
mockDB := NewMockPipelineDBForGroupManager()
|
|||
|
|
gm := NewGroupManager(&mockDB.PipelineDB)
|
|||
|
|
|
|||
|
|
groupName := "test_group"
|
|||
|
|
|
|||
|
|
// 第一次增加统计应该初始化结构
|
|||
|
|
gm.IncrementStats(groupName, StatusHot)
|
|||
|
|
|
|||
|
|
stats := gm.GetFastStats()[groupName]
|
|||
|
|
if stats == nil {
|
|||
|
|
t.Fatal("stats should be initialized after first increment")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if stats.TotalRecords != 1 || stats.HotRecords != 1 {
|
|||
|
|
t.Errorf("initial stats: total=%d, hot=%d, want total=1, hot=1",
|
|||
|
|
stats.TotalRecords, stats.HotRecords)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if stats.WarmRecords != 0 || stats.ColdRecords != 0 {
|
|||
|
|
t.Errorf("initial stats: warm=%d, cold=%d, want warm=0, cold=0",
|
|||
|
|
stats.WarmRecords, stats.ColdRecords)
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// TestGroupManagerUpdateStatsInitialization 测试更新统计时的初始化
|
|||
|
|
func TestGroupManagerUpdateStatsInitialization(t *testing.T) {
|
|||
|
|
mockDB := NewMockPipelineDBForGroupManager()
|
|||
|
|
gm := NewGroupManager(&mockDB.PipelineDB)
|
|||
|
|
|
|||
|
|
groupName := "test_group"
|
|||
|
|
|
|||
|
|
// 直接调用UpdateStats应该初始化统计结构
|
|||
|
|
gm.UpdateStats(groupName, StatusHot, StatusWarm)
|
|||
|
|
|
|||
|
|
stats := gm.GetFastStats()[groupName]
|
|||
|
|
if stats == nil {
|
|||
|
|
t.Fatal("stats should be initialized after UpdateStats")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 由于没有Hot记录,减少操作不会产生负数,但会初始化结构
|
|||
|
|
if stats.TotalRecords != 0 {
|
|||
|
|
t.Errorf("total records = %d, want 0", stats.TotalRecords)
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// BenchmarkGroupManagerGetNextID 性能测试:ID生成
|
|||
|
|
func BenchmarkGroupManagerGetNextID(b *testing.B) {
|
|||
|
|
mockDB := NewMockPipelineDBForGroupManager()
|
|||
|
|
gm := NewGroupManager(&mockDB.PipelineDB)
|
|||
|
|
|
|||
|
|
groupName := "bench_group"
|
|||
|
|
|
|||
|
|
b.ResetTimer()
|
|||
|
|
|
|||
|
|
for i := 0; i < b.N; i++ {
|
|||
|
|
gm.GetNextID(groupName)
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// BenchmarkGroupManagerIncrementStats 性能测试:统计信息增量
|
|||
|
|
func BenchmarkGroupManagerIncrementStats(b *testing.B) {
|
|||
|
|
mockDB := NewMockPipelineDBForGroupManager()
|
|||
|
|
gm := NewGroupManager(&mockDB.PipelineDB)
|
|||
|
|
|
|||
|
|
groupName := "bench_group"
|
|||
|
|
statuses := []DataStatus{StatusHot, StatusWarm, StatusCold}
|
|||
|
|
|
|||
|
|
b.ResetTimer()
|
|||
|
|
|
|||
|
|
for i := 0; i < b.N; i++ {
|
|||
|
|
status := statuses[i%len(statuses)]
|
|||
|
|
gm.IncrementStats(groupName, status)
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// BenchmarkGroupManagerGetFastStats 性能测试:快速统计查询
|
|||
|
|
func BenchmarkGroupManagerGetFastStats(b *testing.B) {
|
|||
|
|
mockDB := NewMockPipelineDBForGroupManager()
|
|||
|
|
gm := NewGroupManager(&mockDB.PipelineDB)
|
|||
|
|
|
|||
|
|
// 预填充一些统计数据
|
|||
|
|
for i := 0; i < 10; i++ {
|
|||
|
|
groupName := "group_" + string(rune('A'+i))
|
|||
|
|
for j := 0; j < 100; j++ {
|
|||
|
|
gm.IncrementStats(groupName, StatusHot)
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
b.ResetTimer()
|
|||
|
|
|
|||
|
|
for i := 0; i < b.N; i++ {
|
|||
|
|
gm.GetFastStats()
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// BenchmarkGroupManagerConcurrentOperations 性能测试:并发操作
|
|||
|
|
func BenchmarkGroupManagerConcurrentOperations(b *testing.B) {
|
|||
|
|
mockDB := NewMockPipelineDBForGroupManager()
|
|||
|
|
gm := NewGroupManager(&mockDB.PipelineDB)
|
|||
|
|
|
|||
|
|
b.ResetTimer()
|
|||
|
|
|
|||
|
|
b.RunParallel(func(pb *testing.PB) {
|
|||
|
|
i := 0
|
|||
|
|
for pb.Next() {
|
|||
|
|
groupName := "group_" + string(rune('A'+i%5))
|
|||
|
|
|
|||
|
|
switch i % 4 {
|
|||
|
|
case 0:
|
|||
|
|
gm.GetNextID(groupName)
|
|||
|
|
case 1:
|
|||
|
|
gm.IncrementStats(groupName, StatusHot)
|
|||
|
|
case 2:
|
|||
|
|
gm.IsPaused(groupName)
|
|||
|
|
case 3:
|
|||
|
|
gm.GetFastStats()
|
|||
|
|
}
|
|||
|
|
i++
|
|||
|
|
}
|
|||
|
|
})
|
|||
|
|
}
|