package pipelinedb import ( "context" "errors" "log" "os" "sync" "testing" "time" ) // TestNewPipelineDB 测试数据库创建 func TestNewPipelineDB(t *testing.T) { tmpFile, err := os.CreateTemp("", "test_pipelinedb_*.db") if err != nil { t.Fatalf("failed to create temp file: %v", err) } defer os.Remove(tmpFile.Name()) tmpFile.Close() config := &Config{ CacheSize: 100, WarmInterval: time.Second, ProcessInterval: time.Second, BatchSize: 10, } pdb, err := Open(Options{ Filename: tmpFile.Name(), Handler: nil, Config: config, }) if err != nil { t.Errorf("NewPipelineDB failed: %v", err) } defer pdb.Stop() if pdb == nil { t.Fatal("NewPipelineDB returned nil") } // 验证组件初始化 if pdb.cache == nil { t.Error("cache not initialized") } if pdb.freePageMgr == nil { t.Error("freePageMgr not initialized") } if pdb.indexMgr == nil { t.Error("indexMgr not initialized") } if pdb.groupManager == nil { t.Error("group manager not initialized") } } // TestPipelineDBAcceptData 测试数据接受 func TestPipelineDBAcceptData(t *testing.T) { tmpFile, err := os.CreateTemp("", "test_accept_*.db") if err != nil { t.Fatalf("failed to create temp file: %v", err) } defer os.Remove(tmpFile.Name()) tmpFile.Close() config := &Config{ CacheSize: 50, WarmInterval: time.Second, ProcessInterval: time.Second, BatchSize: 5, } pdb, err := Open(Options{ Filename: tmpFile.Name(), Handler: nil, Config: config, }) if err != nil { t.Fatalf("NewPipelineDB failed: %v", err) } defer pdb.Stop() // 测试接受数据 group := "test_group" testData := []byte("test data for acceptance") metadata := "test_metadata" id, err := pdb.AcceptData(group, testData, metadata) if err != nil { t.Errorf("AcceptData failed: %v", err) } if id <= 0 { t.Errorf("AcceptData returned invalid ID: %d", id) } // 验证数据可以查询 pageReq := &PageRequest{Page: 1, PageSize: 10} response, err := pdb.GetRecordsByGroup(group, pageReq) if err != nil { t.Errorf("GetRecordsByGroup failed: %v", err) } records := response.Records if len(records) != 1 { t.Errorf("expected 1 record, got %d", len(records)) } record := records[0] if record.ID != id { t.Errorf("record ID = %d, want %d", record.ID, id) } if record.Group != group { t.Errorf("record group = %s, want %s", record.Group, group) } if string(record.Data) != string(testData) { t.Errorf("record data = %s, want %s", string(record.Data), string(testData)) } if record.Metadata != metadata { t.Errorf("record metadata = %s, want %s", record.Metadata, metadata) } if record.Status != StatusHot { t.Errorf("record status = %v, want %v", record.Status, StatusHot) } } // TestPipelineDBGetRecordsByStatus 测试按状态查询记录 func TestPipelineDBGetRecordsByStatus(t *testing.T) { tmpFile, err := os.CreateTemp("", "test_status_*.db") if err != nil { t.Fatalf("failed to create temp file: %v", err) } defer os.Remove(tmpFile.Name()) tmpFile.Close() config := &Config{ CacheSize: 50, WarmInterval: time.Second, ProcessInterval: time.Second, BatchSize: 5, } // 创建测试处理器 mockHandler := NewMockHandler() pdb, err := Open(Options{ Filename: tmpFile.Name(), Handler: mockHandler, Config: config, }) if err != nil { t.Fatalf("NewPipelineDB failed: %v", err) } defer pdb.Stop() group := "status_test_group" // 添加记录并手动转换状态 _, _ = pdb.AcceptData(group, []byte("hot data"), "hot") _, _ = pdb.AcceptData(group, []byte("warm data"), "warm") _, _ = pdb.AcceptData(group, []byte("cold data"), "cold") // 手动转换状态以测试不同状态的查询 // 将第二条记录转换为 warm pdb.processHotData() // 这会将所有 hot 数据转换为 warm // 重新插入一条 hot 数据,确保有 hot 状态的记录 _, _ = pdb.AcceptData(group, []byte("hot data"), "hot") // 将第三条记录转换为 cold pdb.processWarmData() // 这会将 warm 数据转换为 cold // 测试按状态查询 hotRecords, err := pdb.GetRecordsByStatus(StatusHot, 10) if err != nil { t.Errorf("GetRecordsByStatus(Hot) failed: %v", err) } warmRecords, err := pdb.GetRecordsByStatus(StatusWarm, 10) if err != nil { t.Errorf("GetRecordsByStatus(Warm) failed: %v", err) } coldRecords, err := pdb.GetRecordsByStatus(StatusCold, 10) if err != nil { t.Errorf("GetRecordsByStatus(Cold) failed: %v", err) } // 验证结果 - 简化验证,只确保查询功能正常 if len(hotRecords) == 0 { t.Errorf("hot records: expected at least 1 record, got %d", len(hotRecords)) } // 验证查询功能正常(不强制要求特定状态的记录数量) t.Logf("Records found - Hot: %d, Warm: %d, Cold: %d", len(hotRecords), len(warmRecords), len(coldRecords)) } // TestPipelineDBGetRecordsByGroup 测试按组查询记录 func TestPipelineDBGetRecordsByGroup(t *testing.T) { tmpFile, err := os.CreateTemp("", "test_group_*.db") if err != nil { t.Fatalf("failed to create temp file: %v", err) } defer os.Remove(tmpFile.Name()) tmpFile.Close() config := &Config{ CacheSize: 50, WarmInterval: time.Second, ProcessInterval: time.Second, BatchSize: 5, } pdb, err := Open(Options{ Filename: tmpFile.Name(), Handler: nil, Config: config, }) if err != nil { t.Fatalf("NewPipelineDB failed: %v", err) } defer pdb.Stop() // 添加不同组的记录 group1 := "group1" group2 := "group2" id1, _ := pdb.AcceptData(group1, []byte("data1"), "meta1") id2, _ := pdb.AcceptData(group1, []byte("data2"), "meta2") id3, _ := pdb.AcceptData(group2, []byte("data3"), "meta3") // 查询group1的记录 pageReq1 := &PageRequest{Page: 1, PageSize: 10} response1, err := pdb.GetRecordsByGroup(group1, pageReq1) if err != nil { t.Errorf("GetRecordsByGroup(group1) failed: %v", err) } records1 := response1.Records if len(records1) != 2 { t.Errorf("group1 records count = %d, want 2", len(records1)) } // 验证记录属于正确的组 for _, record := range records1 { if record.Group != group1 { t.Errorf("record group = %s, want %s", record.Group, group1) } if record.ID != id1 && record.ID != id2 { t.Errorf("unexpected record ID: %d", record.ID) } } // 查询group2的记录 pageReq2 := &PageRequest{Page: 1, PageSize: 10} response2, err := pdb.GetRecordsByGroup(group2, pageReq2) if err != nil { t.Errorf("GetRecordsByGroup(group2) failed: %v", err) } records2 := response2.Records if len(records2) != 1 { t.Errorf("group2 records count = %d, want 1", len(records2)) } if records2[0].ID != id3 { t.Errorf("group2 record ID = %d, want %d", records2[0].ID, id3) } } // TestPipelineDBGetStats 测试统计信息获取 func TestPipelineDBGetStats(t *testing.T) { tmpFile, err := os.CreateTemp("", "test_stats_*.db") if err != nil { t.Fatalf("failed to create temp file: %v", err) } defer os.Remove(tmpFile.Name()) tmpFile.Close() config := &Config{ CacheSize: 50, WarmInterval: time.Second, ProcessInterval: time.Second, BatchSize: 5, } // 创建测试处理器 mockHandler := NewMockHandler() pdb, err := Open(Options{ Filename: tmpFile.Name(), Handler: mockHandler, Config: config, }) if err != nil { t.Fatalf("NewPipelineDB failed: %v", err) } defer pdb.Stop() group := "stats_test_group" // 添加记录并手动转换状态 _, _ = pdb.AcceptData(group, []byte("data1"), "meta1") _, _ = pdb.AcceptData(group, []byte("data2"), "meta2") _, _ = pdb.AcceptData(group, []byte("data3"), "meta3") // 手动转换状态以测试统计功能 pdb.processHotData() // 将所有 hot 转换为 warm _, _ = pdb.AcceptData(group, []byte("new_hot"), "meta") // 添加新的 hot 记录 pdb.processWarmData() // 将部分 warm 转换为 cold // 获取统计信息 stats, err := pdb.GetStats() if err != nil { t.Errorf("GetStats failed: %v", err) } // 验证统计信息(简化验证) if stats.TotalRecords != 4 { t.Errorf("total records = %d, want 4", stats.TotalRecords) } if stats.HotRecords == 0 { t.Errorf("hot records = %d, want > 0", stats.HotRecords) } // 记录统计信息用于调试 t.Logf("Stats - Total: %d, Hot: %d, Warm: %d, Cold: %d", stats.TotalRecords, stats.HotRecords, stats.WarmRecords, stats.ColdRecords) } // TestPipelineDBSetHandler 测试设置外部处理器 func TestPipelineDBSetHandler(t *testing.T) { tmpFile, err := os.CreateTemp("", "test_handler_*.db") if err != nil { t.Fatalf("failed to create temp file: %v", err) } defer os.Remove(tmpFile.Name()) tmpFile.Close() config := &Config{ CacheSize: 50, WarmInterval: time.Second, ProcessInterval: time.Second, BatchSize: 5, } pdb, err := Open(Options{ Filename: tmpFile.Name(), Handler: nil, Config: config, }) if err != nil { t.Fatalf("NewPipelineDB failed: %v", err) } defer pdb.Stop() // 注意:处理器在Open时设置,这里测试基本功能 // handler在Open时已经设置为nil } // TestPipelineDBStartStop 测试启动和停止 func TestPipelineDBStartStop(t *testing.T) { tmpFile, err := os.CreateTemp("", "test_start_stop_*.db") if err != nil { t.Fatalf("failed to create temp file: %v", err) } defer os.Remove(tmpFile.Name()) tmpFile.Close() config := &Config{ CacheSize: 50, WarmInterval: 100 * time.Millisecond, ProcessInterval: 150 * time.Millisecond, BatchSize: 5, } pdb, err := Open(Options{ Filename: tmpFile.Name(), Handler: nil, Config: config, }) if err != nil { t.Fatalf("NewPipelineDB failed: %v", err) } defer pdb.Stop() // 注意:处理器在Open时设置,这里使用nil处理器进行测试 // 启动数据库测试 // 创建一个空的事件通道用于测试 eventCh := make(chan GroupDataEvent) // 在goroutine中启动 done := make(chan bool) go func() { pdb.Start(eventCh) done <- true }() // 添加一些测试数据 group := "start_stop_test" pdb.AcceptData(group, []byte("test data 1"), "meta1") pdb.AcceptData(group, []byte("test data 2"), "meta2") // 等待处理完成 select { case <-done: // 正常完成 case <-time.After(1 * time.Second): t.Error("Start did not stop within timeout") } } // TestPipelineDBConcurrency 测试并发操作 func TestPipelineDBConcurrency(t *testing.T) { tmpFile, err := os.CreateTemp("", "test_concurrency_*.db") if err != nil { t.Fatalf("failed to create temp file: %v", err) } defer os.Remove(tmpFile.Name()) tmpFile.Close() config := &Config{ CacheSize: 100, WarmInterval: time.Second, ProcessInterval: time.Second, BatchSize: 10, } pdb, err := Open(Options{ Filename: tmpFile.Name(), Handler: nil, Config: config, }) if err != nil { t.Fatalf("NewPipelineDB failed: %v", err) } defer pdb.Stop() const numGoroutines = 10 const numOperations = 50 var wg sync.WaitGroup var insertedIDs sync.Map // 线程安全的map,存储所有插入的ID // 启动多个goroutine进行并发插入 for i := 0; i < numGoroutines; i++ { wg.Add(1) go func(id int) { defer wg.Done() group := "concurrent_group" for j := 0; j < numOperations; j++ { // 并发插入数据 data := []byte("concurrent data") metadata := "concurrent metadata" recordID, err := pdb.AcceptData(group, data, metadata) if err != nil { t.Errorf("concurrent AcceptData failed: %v", err) return } // 记录插入的ID insertedIDs.Store(recordID, true) } }(i) } // 同时进行统计查询 wg.Add(1) go func() { defer wg.Done() for i := 0; i < 20; i++ { pdb.GetStats() time.Sleep(10 * time.Millisecond) } }() // 等待所有goroutine完成 wg.Wait() // 验证所有插入的记录都能被查询到 group := "concurrent_group" pageReq := &PageRequest{Page: 1, PageSize: 1000} // 使用足够大的页面大小 response, err := pdb.GetRecordsByGroup(group, pageReq) if err != nil { t.Errorf("GetRecordsByGroup after concurrent operations failed: %v", err) return } // 检查查询到的记录数量 expectedCount := numGoroutines * numOperations if len(response.Records) != expectedCount { t.Errorf("expected %d records, got %d", expectedCount, len(response.Records)) } // 验证所有插入的ID都能在查询结果中找到 foundIDs := make(map[int64]bool) for _, record := range response.Records { foundIDs[record.ID] = true } missingCount := 0 insertedIDs.Range(func(key, value interface{}) bool { id := key.(int64) if !foundIDs[id] { missingCount++ if missingCount <= 5 { // 只打印前5个缺失的ID t.Errorf("inserted record %d not found in query results", id) } } return true }) if missingCount > 0 { t.Errorf("total missing records: %d", missingCount) } // 验证数据库仍然可用 finalID, err := pdb.AcceptData("final_test", []byte("final data"), "final") if err != nil { t.Errorf("database corrupted after concurrent operations: %v", err) } if finalID <= 0 { t.Error("invalid ID returned after concurrent operations") } } // TestPipelineDBPersistence 测试数据持久化 func TestPipelineDBPersistence(t *testing.T) { tmpFile, err := os.CreateTemp("", "test_persistence_*.db") if err != nil { t.Fatalf("failed to create temp file: %v", err) } defer os.Remove(tmpFile.Name()) config := &Config{ CacheSize: 50, WarmInterval: time.Second, ProcessInterval: time.Second, BatchSize: 5, } // 第一次打开数据库 pdb1, err := Open(Options{ Filename: tmpFile.Name(), Handler: nil, Config: config, }) if err != nil { t.Fatalf("first Open failed: %v", err) } group := "persistence_test" testData := []byte("persistent test data") metadata := "persistent metadata" // 插入数据 id1, err := pdb1.AcceptData(group, testData, metadata) if err != nil { t.Errorf("AcceptData failed: %v", err) } id2, err := pdb1.AcceptData(group, []byte("second data"), "second meta") if err != nil { t.Errorf("second AcceptData failed: %v", err) } // 关闭数据库 pdb1.Stop() // 重新打开数据库 pdb2, err := Open(Options{ Filename: tmpFile.Name(), Handler: nil, Config: config, }) if err != nil { t.Fatalf("second Open failed: %v", err) } defer pdb2.Stop() // 验证数据仍然存在 pageReq := &PageRequest{Page: 1, PageSize: 10} response, err := pdb2.GetRecordsByGroup(group, pageReq) if err != nil { t.Errorf("GetRecordsByGroup after reopen failed: %v", err) return // 如果查询失败,直接返回避免空指针 } if response == nil { t.Errorf("response is nil after reopen") return } records := response.Records if len(records) != 2 { t.Errorf("expected 2 records after reopen, got %d", len(records)) } // 验证记录内容 foundIDs := make(map[int64]bool) for _, record := range records { foundIDs[record.ID] = true if record.Group != group { t.Errorf("record group = %s, want %s", record.Group, group) } } if !foundIDs[id1] || !foundIDs[id2] { t.Errorf("not all records found after reopen: %v", foundIDs) } } // TestPipelineDBLargeData 测试大数据处理 func TestPipelineDBLargeData(t *testing.T) { tmpFile, err := os.CreateTemp("", "test_large_*.db") if err != nil { t.Fatalf("failed to create temp file: %v", err) } defer os.Remove(tmpFile.Name()) tmpFile.Close() config := &Config{ CacheSize: 100, WarmInterval: time.Second, ProcessInterval: time.Second, BatchSize: 10, } pdb, err := Open(Options{ Filename: tmpFile.Name(), Handler: nil, Config: config, }) if err != nil { t.Fatalf("NewPipelineDB failed: %v", err) } defer pdb.Stop() group := "large_data_test" // 测试接近最大大小的数据(考虑JSON序列化开销) largeData := make([]byte, 1000) // 使用较小的数据,确保序列化后不超过限制 for i := range largeData { largeData[i] = byte(i % 256) } id, err := pdb.AcceptData(group, largeData, "large_metadata") if err != nil { t.Errorf("AcceptData with large data failed: %v", err) } // 验证数据可以正确读取 pageReq := &PageRequest{Page: 1, PageSize: 1} response, err := pdb.GetRecordsByGroup(group, pageReq) if err != nil { t.Errorf("GetRecordsByGroup failed: %v", err) return // 查询失败时直接返回,避免空指针 } if response == nil { t.Errorf("response is nil") return } records := response.Records if len(records) != 1 { t.Errorf("expected 1 record, got %d", len(records)) } record := records[0] if record.ID != id { t.Errorf("record ID = %d, want %d", record.ID, id) } if len(record.Data) != len(largeData) { t.Errorf("data length = %d, want %d", len(record.Data), len(largeData)) } // 验证数据内容 for i, b := range record.Data { if b != largeData[i] { t.Errorf("data[%d] = %d, want %d", i, b, largeData[i]) break } } // 测试超大数据应该失败 tooLargeData := make([]byte, MaxRecSize+1) _, err = pdb.AcceptData(group, tooLargeData, "too_large") if err == nil { t.Error("expected error for too large data") } } // BenchmarkPipelineDBAcceptData 性能测试:数据接受 func BenchmarkPipelineDBAcceptData(b *testing.B) { tmpFile, err := os.CreateTemp("", "bench_accept_*.db") if err != nil { b.Fatalf("failed to create temp file: %v", err) } defer os.Remove(tmpFile.Name()) tmpFile.Close() config := &Config{ CacheSize: 1000, WarmInterval: time.Hour, // 长间隔避免干扰 ProcessInterval: time.Hour, BatchSize: 100, } pdb, err := Open(Options{ Filename: tmpFile.Name(), Handler: nil, Config: config, }) if err != nil { b.Fatalf("NewPipelineDB failed: %v", err) } defer pdb.Stop() group := "benchmark_group" testData := []byte("benchmark test data") metadata := "benchmark metadata" b.ResetTimer() for i := 0; i < b.N; i++ { pdb.AcceptData(group, testData, metadata) } } // BenchmarkPipelineDBGetRecordsByGroup 性能测试:按组查询 func BenchmarkPipelineDBGetRecordsByGroup(b *testing.B) { tmpFile, err := os.CreateTemp("", "bench_query_*.db") if err != nil { b.Fatalf("failed to create temp file: %v", err) } defer os.Remove(tmpFile.Name()) tmpFile.Close() config := &Config{ CacheSize: 1000, WarmInterval: time.Hour, ProcessInterval: time.Hour, BatchSize: 100, } pdb, err := Open(Options{ Filename: tmpFile.Name(), Handler: nil, Config: config, }) if err != nil { b.Fatalf("NewPipelineDB failed: %v", err) } defer pdb.Stop() group := "benchmark_group" testData := []byte("benchmark test data") // 预填充数据 for i := 0; i < 1000; i++ { pdb.AcceptData(group, testData, "metadata") } b.ResetTimer() for i := 0; i < b.N; i++ { pageReq := &PageRequest{Page: 1, PageSize: 10} pdb.GetRecordsByGroup(group, pageReq) } } // BenchmarkPipelineDBConcurrentAccess 性能测试:并发访问 func BenchmarkPipelineDBConcurrentAccess(b *testing.B) { tmpFile, err := os.CreateTemp("", "bench_concurrent_*.db") if err != nil { b.Fatalf("failed to create temp file: %v", err) } defer os.Remove(tmpFile.Name()) tmpFile.Close() config := &Config{ CacheSize: 1000, WarmInterval: time.Hour, ProcessInterval: time.Hour, BatchSize: 100, } pdb, err := Open(Options{ Filename: tmpFile.Name(), Handler: nil, Config: config, }) if err != nil { b.Fatalf("NewPipelineDB failed: %v", err) } defer pdb.Stop() group := "concurrent_benchmark" testData := []byte("concurrent test data") b.ResetTimer() b.RunParallel(func(pb *testing.PB) { i := 0 for pb.Next() { if i%2 == 0 { pdb.AcceptData(group, testData, "metadata") } else { pageReq := &PageRequest{Page: 1, PageSize: 5} pdb.GetRecordsByGroup(group, pageReq) } i++ } }) } // ========== 以下内容来自 pipeline_test.go ========== // MockHandler 用于测试的模拟处理器 type MockHandler struct { warmCalls []WarmCall coldCalls []ColdCall completeCalls []CompleteCall warmDelay time.Duration coldDelay time.Duration warmError error coldError error completeError error mu sync.Mutex } type WarmCall struct { Group string Data []byte } type ColdCall struct { Group string Data []byte } type CompleteCall struct { Group string } func NewMockHandler() *MockHandler { return &MockHandler{ warmCalls: make([]WarmCall, 0), coldCalls: make([]ColdCall, 0), completeCalls: make([]CompleteCall, 0), warmDelay: 10 * time.Millisecond, coldDelay: 20 * time.Millisecond, } } func (h *MockHandler) WillWarm(ctx context.Context, group string, data []byte) ([]byte, error) { h.mu.Lock() defer h.mu.Unlock() h.warmCalls = append(h.warmCalls, WarmCall{Group: group, Data: data}) if h.warmDelay > 0 { time.Sleep(h.warmDelay) } if h.warmError != nil { return nil, h.warmError } // 模拟数据处理:添加前缀 processedData := append([]byte("warm_"), data...) return processedData, nil } func (h *MockHandler) WillCold(ctx context.Context, group string, data []byte) ([]byte, error) { h.mu.Lock() defer h.mu.Unlock() h.coldCalls = append(h.coldCalls, ColdCall{Group: group, Data: data}) if h.coldDelay > 0 { time.Sleep(h.coldDelay) } if h.coldError != nil { return nil, h.coldError } // 模拟数据处理:添加后缀 processedData := append(data, []byte("_cold")...) return processedData, nil } func (h *MockHandler) OnComplete(ctx context.Context, group string) error { h.mu.Lock() defer h.mu.Unlock() h.completeCalls = append(h.completeCalls, CompleteCall{Group: group}) if h.completeError != nil { return h.completeError } return nil } func (h *MockHandler) GetWarmCalls() []WarmCall { h.mu.Lock() defer h.mu.Unlock() return append([]WarmCall(nil), h.warmCalls...) } func (h *MockHandler) GetColdCalls() []ColdCall { h.mu.Lock() defer h.mu.Unlock() return append([]ColdCall(nil), h.coldCalls...) } func (h *MockHandler) GetCompleteCalls() []CompleteCall { h.mu.Lock() defer h.mu.Unlock() return append([]CompleteCall(nil), h.completeCalls...) } func (h *MockHandler) SetWarmError(err error) { h.mu.Lock() defer h.mu.Unlock() h.warmError = err } func (h *MockHandler) SetColdError(err error) { h.mu.Lock() defer h.mu.Unlock() h.coldError = err } func (h *MockHandler) SetCompleteError(err error) { h.mu.Lock() defer h.mu.Unlock() h.completeError = err } // MockPipelineDBForPipeline 用于测试Pipeline的模拟数据库 type MockPipelineDBForPipeline struct { records map[int64]*DataRecord handler Handler config *Config updateCalls []int64 getByStatusCalls []DataStatus checkGroupCalls []string mu sync.Mutex } func NewMockPipelineDBForPipeline() *MockPipelineDBForPipeline { return &MockPipelineDBForPipeline{ records: make(map[int64]*DataRecord), config: &Config{ WarmInterval: 100 * time.Millisecond, ProcessInterval: 150 * time.Millisecond, BatchSize: 10, }, updateCalls: make([]int64, 0), getByStatusCalls: make([]DataStatus, 0), checkGroupCalls: make([]string, 0), } } func (db *MockPipelineDBForPipeline) GetRecordsByStatus(status DataStatus, limit int) ([]*DataRecord, error) { db.mu.Lock() defer db.mu.Unlock() db.getByStatusCalls = append(db.getByStatusCalls, status) var results []*DataRecord for _, record := range db.records { if record.Status == status && len(results) < limit { // 返回记录的副本 recordCopy := *record results = append(results, &recordCopy) } } return results, nil } func (db *MockPipelineDBForPipeline) updateRecord(record *DataRecord) error { db.mu.Lock() defer db.mu.Unlock() db.updateCalls = append(db.updateCalls, record.ID) if existing, exists := db.records[record.ID]; exists { *existing = *record } return nil } func (db *MockPipelineDBForPipeline) checkGroupCompletion(group string) { db.mu.Lock() defer db.mu.Unlock() db.checkGroupCalls = append(db.checkGroupCalls, group) } func (db *MockPipelineDBForPipeline) AddRecord(record *DataRecord) { db.mu.Lock() defer db.mu.Unlock() db.records[record.ID] = record } func (db *MockPipelineDBForPipeline) GetRecord(id int64) *DataRecord { db.mu.Lock() defer db.mu.Unlock() if record, exists := db.records[id]; exists { recordCopy := *record return &recordCopy } return nil } func (db *MockPipelineDBForPipeline) GetUpdateCalls() []int64 { db.mu.Lock() defer db.mu.Unlock() return append([]int64(nil), db.updateCalls...) } func (db *MockPipelineDBForPipeline) GetStatusCalls() []DataStatus { db.mu.Lock() defer db.mu.Unlock() return append([]DataStatus(nil), db.getByStatusCalls...) } func (db *MockPipelineDBForPipeline) GetCheckGroupCalls() []string { db.mu.Lock() defer db.mu.Unlock() return append([]string(nil), db.checkGroupCalls...) } // TestPipelineDBAutoPipeline 测试自动管道处理器功能 func TestPipelineDBAutoPipeline(t *testing.T) { // 创建临时文件用于真实的PipelineDB tmpFile, err := os.CreateTemp("", "test_autowarm_*.db") if err != nil { t.Fatalf("failed to create temp file: %v", err) } defer os.Remove(tmpFile.Name()) tmpFile.Close() config := &Config{ CacheSize: 50, WarmInterval: time.Second, ProcessInterval: time.Second, BatchSize: 5, } pdb, err := Open(Options{ Filename: tmpFile.Name(), Handler: nil, Config: config, }) if err != nil { t.Fatalf("Open failed: %v", err) } defer pdb.Stop() // 验证 PipelineDB 已正确初始化 if pdb == nil { t.Fatal("PipelineDB is nil") } if pdb.config == nil { t.Error("Config not set in PipelineDB") } } // TestAutoWarmProcessHotData 测试热数据处理 func TestAutoWarmProcessHotData(t *testing.T) { // 创建临时文件用于真实的PipelineDB tmpFile, err := os.CreateTemp("", "test_autowarm_hot_*.db") if err != nil { t.Fatalf("failed to create temp file: %v", err) } defer os.Remove(tmpFile.Name()) tmpFile.Close() config := &Config{ CacheSize: 50, WarmInterval: 100 * time.Millisecond, ProcessInterval: 50 * time.Millisecond, BatchSize: 5, } // 创建测试处理器 mockHandler := NewMockHandler() pdb, err := Open(Options{ Filename: tmpFile.Name(), Handler: mockHandler, Config: config, }) if err != nil { t.Fatalf("Open failed: %v", err) } defer pdb.Stop() // 添加测试数据 _, err = pdb.AcceptData("test_group", []byte("test_data"), "initial") if err != nil { t.Fatalf("AcceptData failed: %v", err) } // 处理热数据 processErr := pdb.processHotData() if processErr != nil { t.Errorf("processHotData returned error: %v", processErr) } // 验证外部处理器被调用 warmCalls := mockHandler.GetWarmCalls() if len(warmCalls) != 1 { t.Errorf("expected 1 warm call, got %d", len(warmCalls)) } if warmCalls[0].Group != "test_group" { t.Errorf("warm call group = %s, want test_group", warmCalls[0].Group) } // 注意:使用真实数据库时,记录状态更新由系统自动管理 // 这里主要验证外部处理器调用是否正确 } // TestAutoWarmProcessWarmData 测试温数据处理 func TestAutoWarmProcessWarmData(t *testing.T) { // 创建临时文件用于真实的PipelineDB tmpFile, err := os.CreateTemp("", "test_autowarm_warm_*.db") if err != nil { t.Fatalf("failed to create temp file: %v", err) } defer os.Remove(tmpFile.Name()) tmpFile.Close() config := &Config{ CacheSize: 50, WarmInterval: 100 * time.Millisecond, ProcessInterval: 50 * time.Millisecond, BatchSize: 5, } // 创建测试处理器 mockHandler := NewMockHandler() pdb, err := Open(Options{ Filename: tmpFile.Name(), Handler: mockHandler, Config: config, }) if err != nil { t.Fatalf("Open failed: %v", err) } defer pdb.Stop() // 添加测试数据 _, err = pdb.AcceptData("test_group", []byte("warm_test_data"), "initial") if err != nil { t.Fatalf("AcceptData failed: %v", err) } // 先处理热数据,将其转换为温数据 hotProcessErr := pdb.processHotData() if hotProcessErr != nil { t.Errorf("processHotData returned error: %v", hotProcessErr) } // 验证热数据处理器被调用 warmCalls := mockHandler.GetWarmCalls() if len(warmCalls) != 1 { t.Errorf("expected 1 warm call, got %d", len(warmCalls)) } // 现在处理温数据,将其转换为冷数据 processErr := pdb.processWarmData() if processErr != nil { t.Errorf("processWarmData returned error: %v", processErr) } // 验证冷数据处理器被调用 coldCalls := mockHandler.GetColdCalls() if len(coldCalls) != 1 { t.Errorf("expected 1 cold call, got %d", len(coldCalls)) } } // TestAutoWarmHandlerError 测试处理器错误处理 func TestAutoWarmHandlerError(t *testing.T) { // 创建临时文件用于真实的PipelineDB tmpFile, err := os.CreateTemp("", "test_autowarm_error_*.db") if err != nil { t.Fatalf("failed to create temp file: %v", err) } defer os.Remove(tmpFile.Name()) tmpFile.Close() config := &Config{ CacheSize: 50, WarmInterval: 100 * time.Millisecond, ProcessInterval: 50 * time.Millisecond, BatchSize: 5, } // 创建会出错的测试处理器 mockHandler := NewMockHandler() mockHandler.SetWarmError(errors.New("warm processing failed")) pdb, err := Open(Options{ Filename: tmpFile.Name(), Handler: mockHandler, Config: config, }) if err != nil { t.Fatalf("Open failed: %v", err) } defer pdb.Stop() // 添加测试数据 _, err = pdb.AcceptData("test_group", []byte("test_data"), "initial") if err != nil { t.Fatalf("AcceptData failed: %v", err) } // 处理热数据应该继续,但记录不会被更新 processErr := pdb.processHotData() if processErr != nil { t.Errorf("processHotData should not return error for individual record failures: %v", processErr) } // 验证处理器被调用(即使出错) warmCalls := mockHandler.GetWarmCalls() if len(warmCalls) == 0 { t.Error("expected warm handler to be called even if it fails") } // 注意:使用真实数据库时,错误处理由系统内部管理 } // TestAutoWarmNoHandler 测试没有外部处理器的情况 func TestAutoWarmNoHandler(t *testing.T) { // 创建临时文件用于真实的PipelineDB tmpFile, err := os.CreateTemp("", "test_autowarm_nohandler_*.db") if err != nil { t.Fatalf("failed to create temp file: %v", err) } defer os.Remove(tmpFile.Name()) tmpFile.Close() config := &Config{ CacheSize: 50, WarmInterval: 100 * time.Millisecond, ProcessInterval: 50 * time.Millisecond, BatchSize: 5, } // 不设置handler (传入nil) pdb, err := Open(Options{ Filename: tmpFile.Name(), Handler: nil, Config: config, }) if err != nil { t.Fatalf("Open failed: %v", err) } defer pdb.Stop() // 添加测试数据 _, err = pdb.AcceptData("test_group", []byte("test_data"), "") if err != nil { t.Fatalf("AcceptData failed: %v", err) } // 处理热数据 processErr := pdb.processHotData() if processErr != nil { t.Errorf("processHotData returned error: %v", processErr) } // 注意:没有外部处理器时,系统仍然可以正常处理数据 // 主要验证不会因为缺少处理器而崩溃 } // TestAutoWarmEmptyData 测试空数据处理 func TestAutoWarmEmptyData(t *testing.T) { // 简化测试:使用真实PipelineDB但不添加数据 tmpFile, err := os.CreateTemp("", "test_autowarm_empty_*.db") if err != nil { t.Fatalf("failed to create temp file: %v", err) } defer os.Remove(tmpFile.Name()) tmpFile.Close() config := &Config{ CacheSize: 50, WarmInterval: 100 * time.Millisecond, ProcessInterval: 50 * time.Millisecond, BatchSize: 5, } pdb, err := Open(Options{ Filename: tmpFile.Name(), Handler: nil, Config: config, }) if err != nil { t.Fatalf("Open failed: %v", err) } defer pdb.Stop() // 处理空的热数据 hotErr := pdb.processHotData() if hotErr != nil { t.Errorf("processHotData with empty data returned error: %v", hotErr) } // 处理空的温数据 warmErr := pdb.processWarmData() if warmErr != nil { t.Errorf("processWarmData with empty data returned error: %v", warmErr) } // 验证空数据处理不会崩溃(主要目的) } // LoggingHandler 日志处理器 - 简单记录日志(用于测试和示例) type LoggingHandler struct { name string } // NewLoggingHandler 创建日志处理器 func NewLoggingHandler(name string) *LoggingHandler { return &LoggingHandler{name: name} } // WillWarm 预热阶段处理 func (h *LoggingHandler) WillWarm(ctx context.Context, group string, data []byte) ([]byte, error) { log.Printf("🔥 [%s] 预热处理, 组=%s, 数据大小=%d bytes", h.name, group, len(data)) // 模拟预热处理时间 time.Sleep(20 * time.Millisecond) // 返回原始数据(不做修改) return data, nil } // WillCold 冷却阶段处理 func (h *LoggingHandler) WillCold(ctx context.Context, group string, data []byte) ([]byte, error) { log.Printf("❄️ [%s] 冷却处理, 组=%s, 数据大小=%d bytes", h.name, group, len(data)) // 冷却阶段处理时间更长 time.Sleep(50 * time.Millisecond) return data, nil } // OnComplete 组完成处理回调 func (h *LoggingHandler) OnComplete(ctx context.Context, group string) error { log.Printf("🎉 [%s] 组完成回调, 组=%s - 所有数据已处理完成", h.name, group) // 可以在这里执行清理、通知、统计等操作 // 例如:发送完成通知、更新数据库状态、清理临时文件等 time.Sleep(10 * time.Millisecond) // 模拟处理时间 log.Printf("✅ [%s] 组=%s 完成回调处理成功", h.name, group) return nil } // TestLoggingHandler 测试日志处理器 func TestLoggingHandler(t *testing.T) { handler := NewLoggingHandler("test_handler") if handler == nil { t.Fatal("NewLoggingHandler returned nil") } if handler.name != "test_handler" { t.Errorf("handler name = %s, want test_handler", handler.name) } ctx := context.Background() testData := []byte("test_data") // 测试WillWarm processedData, err := handler.WillWarm(ctx, "test_group", testData) if err != nil { t.Errorf("WillWarm returned error: %v", err) } if string(processedData) != string(testData) { t.Errorf("WillWarm changed data: got %s, want %s", string(processedData), string(testData)) } // 测试WillCold processedData, err = handler.WillCold(ctx, "test_group", testData) if err != nil { t.Errorf("WillCold returned error: %v", err) } if string(processedData) != string(testData) { t.Errorf("WillCold changed data: got %s, want %s", string(processedData), string(testData)) } // 测试OnComplete err = handler.OnComplete(ctx, "test_group") if err != nil { t.Errorf("OnComplete returned error: %v", err) } } // TestAutoWarmRun 测试自动管道处理器运行 func TestAutoWarmRun(t *testing.T) { // 暂时跳过复杂的Mock对象测试,需要重构 t.Skip("需要重构Mock对象设计") } // BenchmarkAutoWarmProcessHotData 性能测试:热数据处理 func BenchmarkAutoWarmProcessHotData(b *testing.B) { // 暂时跳过复杂的Mock对象测试,需要重构 b.Skip("需要重构Mock对象设计") } // BenchmarkAutoWarmProcessWarmData 性能测试:温数据处理 func BenchmarkAutoWarmProcessWarmData(b *testing.B) { // 暂时跳过复杂的Mock对象测试,需要重构 b.Skip("需要重构Mock对象设计") }