package seqlog

import (
	"context"
	"fmt"
	"io"
	"log/slog"
	"os"
	"path/filepath"
	"sync"
	"testing"
	"time"
)

// TestBasicWriteAndRead appends a few records and checks that a cursor
// sharing the same index reads them back in order.
func TestBasicWriteAndRead(t *testing.T) {
	tmpFile := "test_basic.log"
	defer os.Remove(tmpFile)
	defer os.Remove(tmpFile + ".pos")
	defer os.Remove(tmpFile + ".idx")

	// Create the index shared by the writer and the cursor.
	index, err := NewRecordIndex(tmpFile)
	if err != nil {
		t.Fatalf("创建索引失败: %v", err)
	}
	defer index.Close()

	// Write the test data.
	writer, err := NewLogWriter(tmpFile, index)
	if err != nil {
		t.Fatalf("创建 writer 失败: %v", err)
	}
	defer writer.Close()

	testData := [][]byte{
		[]byte("hello world"),
		[]byte("test log entry 1"),
		[]byte("test log entry 2"),
	}
	for _, data := range testData {
		if _, err := writer.Append(data); err != nil {
			t.Fatalf("写入数据失败: %v", err)
		}
	}

	// Read the data back through the shared index.
	cursor, err := NewCursor(tmpFile, index, nil)
	if err != nil {
		t.Fatalf("创建 cursor 失败: %v", err)
	}
	defer cursor.Close()

	for i, expected := range testData {
		rec, err := cursor.Next()
		if err != nil {
			t.Fatalf("读取第 %d 条记录失败: %v", i, err)
		}
		if string(rec.Data) != string(expected) {
			t.Errorf("第 %d 条记录数据不匹配: got %q, want %q", i, rec.Data, expected)
		}
	}
}

// TestCursorNextRange exercises batched reads through Cursor.NextRange,
// including a short final batch and the io.EOF result once the log is drained.
func TestCursorNextRange(t *testing.T) {
	tmpFile := "test_range.log"
	defer os.Remove(tmpFile)
	defer os.Remove(tmpFile + ".pos")
	defer os.Remove(tmpFile + ".idx")

	index, err := NewRecordIndex(tmpFile)
	if err != nil {
		t.Fatal(err)
	}
	defer index.Close()

	// Write ten records.
	writer, err := NewLogWriter(tmpFile, index)
	if err != nil {
		t.Fatal(err)
	}
	messages := []string{"msg1", "msg2", "msg3", "msg4", "msg5", "msg6", "msg7", "msg8", "msg9", "msg10"}
	for _, msg := range messages {
		if _, err := writer.Append([]byte(msg)); err != nil {
			t.Fatal(err)
		}
	}
	writer.Close()

	cursor, err := NewCursor(tmpFile, index, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer cursor.Close()

	// readBatch asks for `request` records, requires exactly `want` of them,
	// and compares them with the messages that follow the ones already read.
	offset := 0
	readBatch := func(request, want int) {
		t.Helper()
		records, err := cursor.NextRange(request)
		if err != nil {
			t.Fatal(err)
		}
		if len(records) != want {
			t.Fatalf("expected %d records, got %d", want, len(records))
		}
		for i, rec := range records {
			expected := messages[offset+i]
			if string(rec.Data) != expected {
				t.Errorf("record[%d]: expected '%s', got '%s'", i, expected, string(rec.Data))
			}
		}
		offset += len(records)
	}

	readBatch(3, 3) // first batch: 3 records
	readBatch(5, 5) // second batch: 5 records
	readBatch(5, 2) // third batch: 5 requested, only 2 remain

	// One more read must report EOF.
	if _, err := cursor.NextRange(1); err != io.EOF {
		t.Fatalf("expected EOF, got %v", err)
	}
}

// TestCursorWindow verifies the cursor's window semantics: Next advances the
// end index, Commit moves the start index up to it, and Rollback rewinds the
// end index back to the start index.
func TestCursorWindow(t *testing.T) {
	tmpFile := "test_window.log"
	defer os.Remove(tmpFile)
	defer os.Remove(tmpFile + ".pos")
	defer os.Remove(tmpFile + ".idx")

	index, err := NewRecordIndex(tmpFile)
	if err != nil {
		t.Fatal(err)
	}
	defer index.Close()

	// Write the test data.
	writer, err := NewLogWriter(tmpFile, index)
	if err != nil {
		t.Fatal(err)
	}
	messages := []string{"msg1", "msg2", "msg3"}
	for _, msg := range messages {
		if _, err := writer.Append([]byte(msg)); err != nil {
			t.Fatal(err)
		}
	}
	writer.Close()

	cursor, err := NewCursor(tmpFile, index, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer cursor.Close()

	// Initial state: empty window at index 0.
	if cursor.StartIndex() != 0 {
		t.Errorf("expected startIdx 0, got %d", cursor.StartIndex())
	}
	if cursor.EndIndex() != 0 {
		t.Errorf("expected endIdx 0, got %d", cursor.EndIndex())
	}

	// Read the first record.
	rec, err := cursor.Next()
	if err != nil {
		t.Fatal(err)
	}
	if string(rec.Data) != "msg1" {
		t.Errorf("expected 'msg1', got '%s'", string(rec.Data))
	}

	// startIdx must be unchanged while endIdx has advanced.
	if cursor.StartIndex() != 0 {
		t.Errorf("expected startIdx 0, got %d", cursor.StartIndex())
	}
	endIdxAfterFirst := cursor.EndIndex()
	if endIdxAfterFirst == 0 {
		t.Error("endIdx should have moved")
	}

	// Commit the window.
	cursor.Commit()
	if cursor.StartIndex() != endIdxAfterFirst {
		t.Errorf("expected startIdx %d after commit, got %d", endIdxAfterFirst, cursor.StartIndex())
	}

	// Read the second record.
	rec, err = cursor.Next()
	if err != nil {
		t.Fatal(err)
	}
	if string(rec.Data) != "msg2" {
		t.Errorf("expected 'msg2', got '%s'", string(rec.Data))
	}
	endIdxAfterSecond := cursor.EndIndex()

	// Roll back: endIdx must return to startIdx.
	if err := cursor.Rollback(); err != nil {
		t.Fatal(err)
	}
	if cursor.EndIndex() != cursor.StartIndex() {
		t.Errorf("expected endIdx == startIdx after rollback, got %d != %d", cursor.EndIndex(), cursor.StartIndex())
	}

	// Reading again must yield the second record once more.
	rec, err = cursor.Next()
	if err != nil {
		t.Fatal(err)
	}
	if string(rec.Data) != "msg2" {
		t.Errorf("expected 'msg2' after rollback, got '%s'", string(rec.Data))
	}

	// Commit this time.
	cursor.Commit()
	if cursor.StartIndex() != endIdxAfterSecond {
		t.Errorf("expected startIdx %d after commit, got %d", endIdxAfterSecond, cursor.StartIndex())
	}
}

// TestCursorPersistence commits two of three records, closes the cursor, and
// checks that a newly created cursor resumes at the third record.
func TestCursorPersistence(t *testing.T) {
	tmpFile := "test_cursor.log"
	defer os.Remove(tmpFile)
	defer os.Remove(tmpFile + ".pos")
	defer os.Remove(tmpFile + ".idx")

	index, err := NewRecordIndex(tmpFile)
	if err != nil {
		t.Fatal(err)
	}
	defer index.Close()

	// Write the test data.
	writer, err := NewLogWriter(tmpFile, index)
	if err != nil {
		t.Fatalf("创建 writer 失败: %v", err)
	}
	defer writer.Close()

	testData := [][]byte{
		[]byte("record 1"),
		[]byte("record 2"),
		[]byte("record 3"),
	}
	for _, data := range testData {
		if _, err := writer.Append(data); err != nil {
			t.Fatalf("写入数据失败: %v", err)
		}
	}

	// Read and commit the first two records, then close the cursor.
	cursor1, err := NewCursor(tmpFile, index, nil)
	if err != nil {
		t.Fatalf("创建 cursor 失败: %v", err)
	}
	for i := 0; i < 2; i++ {
		if _, err := cursor1.Next(); err != nil {
			t.Fatalf("读取第 %d 条记录失败: %v", i, err)
		}
		cursor1.Commit()
	}
	cursor1.Close()

	// A reopened cursor should start from the third record.
	cursor2, err := NewCursor(tmpFile, index, nil)
	if err != nil {
		t.Fatalf("重新创建 cursor 失败: %v", err)
	}
	defer cursor2.Close()

	rec, err := cursor2.Next()
	if err != nil {
		t.Fatalf("读取恢复后的记录失败: %v", err)
	}
	if string(rec.Data) != string(testData[2]) {
		t.Errorf("游标恢复失败: got %q, want %q", rec.Data, testData[2])
	}
}

// TestTailer runs a tailer in the background and checks that it delivers both
// the record written before it started and the records appended afterwards.
func TestTailer(t *testing.T) {
	tmpFile := "test_tailer.log"
	defer os.Remove(tmpFile)
	defer os.Remove(tmpFile + ".pos")
	defer os.Remove(tmpFile + ".idx")

	index, err := NewRecordIndex(tmpFile)
	if err != nil {
		t.Fatal(err)
	}
	defer index.Close()

	writer, err := NewLogWriter(tmpFile, index)
	if err != nil {
		t.Fatalf("创建 writer 失败: %v", err)
	}
	defer writer.Close()

	// Write one record before the tailer starts.
	writer.Append([]byte("initial record"))

	// The handler runs on the tailer goroutine, so the slice it fills is
	// guarded by a mutex.
	var mu sync.Mutex
	records := make([]string, 0)
	handler := func(rec *Record) error {
		mu.Lock()
		defer mu.Unlock()
		records = append(records, string(rec.Data))
		return nil
	}

	cursor, err := NewCursor(tmpFile, index, nil)
	if err != nil {
		t.Fatalf("创建 cursor 失败: %v", err)
	}

	tailer, err := NewTailer(cursor, handler, &TailConfig{
		PollInterval: 50 * time.Millisecond,
		SaveInterval: 100 * time.Millisecond,
	})
	if err != nil {
		t.Fatalf("创建 tailer 失败: %v", err)
	}

	// Start the tailer.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go tailer.Start(ctx)

	// Wait for the initial record to be handled.
	time.Sleep(100 * time.Millisecond)

	// Append new data and give the tailer time to pick it up.
	writer.Append([]byte("new record 1"))
	writer.Append([]byte("new record 2"))
	time.Sleep(200 * time.Millisecond)

	// Stop the tailer.
	cancel()
	time.Sleep(100 * time.Millisecond)

	// Verify what was handled.
	mu.Lock()
	defer mu.Unlock()
	if len(records) < 3 {
		t.Errorf("处理的记录数量不足: got %d, want at least 3", len(records))
	}
	expected := []string{"initial record", "new record 1", "new record 2"}
	for i, exp := range expected {
		if i >= len(records) {
			// Report the gap instead of indexing past the end of records.
			t.Errorf("第 %d 条记录缺失: want %q", i, exp)
			continue
		}
		if records[i] != exp {
			t.Errorf("第 %d 条记录不匹配: got %q, want %q", i, records[i], exp)
		}
	}
}

// TestTailerStop checks that a tailer started with a nil config handles the
// pending record and can be stopped through Stop.
func TestTailerStop(t *testing.T) {
	tmpFile := "test_tailer_stop.log"
	defer os.Remove(tmpFile)
	defer os.Remove(tmpFile + ".pos")
	defer os.Remove(tmpFile + ".idx")

	index, err := NewRecordIndex(tmpFile)
	if err != nil {
		t.Fatal(err)
	}
	defer index.Close()

	// Write the test data.
	writer, err := NewLogWriter(tmpFile, index)
	if err != nil {
		t.Fatalf("创建 writer 失败: %v", err)
	}
	defer writer.Close()
	writer.Append([]byte("test record"))

	// count is written on the tailer goroutine; guard it with a mutex.
	var mu sync.Mutex
	count := 0
	handler := func(rec *Record) error {
		mu.Lock()
		defer mu.Unlock()
		count++
		return nil
	}

	cursor, err := NewCursor(tmpFile, index, nil)
	if err != nil {
		t.Fatalf("创建 cursor 失败: %v", err)
	}

	tailer, err := NewTailer(cursor, handler, nil)
	if err != nil {
		t.Fatalf("创建 tailer 失败: %v", err)
	}

	// Start the tailer, let it process, then stop it.
	go tailer.Start(context.Background())
	time.Sleep(100 * time.Millisecond)
	tailer.Stop()

	// Verify the record was handled exactly once.
	mu.Lock()
	defer mu.Unlock()
	if count != 1 {
		t.Errorf("处理的记录数量不正确: got %d, want 1", count)
	}
}

// TestSeqlogBasic registers one handler per topic on a LogHub and checks that
// each handler receives only the records written to its own topic.
func TestSeqlogBasic(t *testing.T) {
	tmpDir := t.TempDir()

	seqlog := NewLogHub(tmpDir, slog.Default(), nil)

	// Register the handlers; both append under the same mutex.
	var mu sync.Mutex
	appLogs := make([]string, 0)
	seqlog.RegisterHandler("app", func(rec *Record) error {
		mu.Lock()
		defer mu.Unlock()
		appLogs = append(appLogs, string(rec.Data))
		return nil
	})

	accessLogs := make([]string, 0)
	seqlog.RegisterHandler("access", func(rec *Record) error {
		mu.Lock()
		defer mu.Unlock()
		accessLogs = append(accessLogs, string(rec.Data))
		return nil
	})

	// Start the hub.
	if err := seqlog.Start(); err != nil {
		t.Fatalf("启动 seqlog 失败: %v", err)
	}

	// Write the logs.
	seqlog.Write("app", []byte("app log 1"))
	seqlog.Write("app", []byte("app log 2"))
	seqlog.Write("access", []byte("access log 1"))

	// Wait for processing, then stop.
	time.Sleep(200 * time.Millisecond)
	seqlog.Stop()

	// Verify.
	mu.Lock()
	defer mu.Unlock()
	if len(appLogs) != 2 {
		t.Errorf("app logs 数量不正确: got %d, want 2", len(appLogs))
	}
	if len(accessLogs) != 1 {
		t.Errorf("access logs 数量不正确: got %d, want 1", len(accessLogs))
	}
}

// TestSeqlogDefaultHandler checks that a topic with its own handler and a
// topic served by the hub's default handler both get their records delivered.
func TestSeqlogDefaultHandler(t *testing.T) {
	tmpDir := t.TempDir()

	// Default handler: collects records per topic.
	allLogs := make(map[string][]string)
	var mu sync.Mutex
	defaultHandler := func(topic string, rec *Record) error {
		mu.Lock()
		defer mu.Unlock()
		allLogs[topic] = append(allLogs[topic], string(rec.Data))
		return nil
	}

	seqlog := NewLogHub(tmpDir, slog.Default(), defaultHandler)

	// Topic-specific handler.
	seqlog.RegisterHandler("special", func(rec *Record) error {
		mu.Lock()
		defer mu.Unlock()
		allLogs["special"] = append(allLogs["special"], string(rec.Data))
		return nil
	})

	seqlog.Start()

	// Write the logs.
	seqlog.Write("special", []byte("special log"))
	seqlog.Write("other", []byte("other log 1"))
	seqlog.Write("other", []byte("other log 2"))

	// Wait for processing, then stop.
	time.Sleep(200 * time.Millisecond)
	seqlog.Stop()

	// Verify.
	mu.Lock()
	defer mu.Unlock()
	if len(allLogs["special"]) != 1 {
		t.Errorf("special logs 数量不正确: got %d, want 1", len(allLogs["special"]))
	}
	if len(allLogs["other"]) != 2 {
		t.Errorf("other logs 数量不正确: got %d, want 2", len(allLogs["other"]))
	}
}

// TestSeqlogDynamicRegistration registers a handler before Start and checks
// that every record written to that topic is handled.
func TestSeqlogDynamicRegistration(t *testing.T) {
	tmpDir := t.TempDir()

	seqlog := NewLogHub(tmpDir, slog.Default(), nil)

	// Register the handler first (a handler is now mandatory).
	var mu sync.Mutex
	logs := make([]string, 0)
	seqlog.RegisterHandler("dynamic", func(rec *Record) error {
		mu.Lock()
		defer mu.Unlock()
		logs = append(logs, string(rec.Data))
		return nil
	})

	seqlog.Start()

	// Write the logs.
	seqlog.Write("dynamic", []byte("log 1"))
	seqlog.Write("dynamic", []byte("log 2"))
	seqlog.Write("dynamic", []byte("log 3"))

	// Wait for processing, then stop.
	time.Sleep(200 * time.Millisecond)
	seqlog.Stop()

	// All logs should have been handled.
	mu.Lock()
	defer mu.Unlock()
	if len(logs) != 3 {
		t.Errorf("处理的日志数量不正确: got %d, want 3", len(logs))
	}
}

// TestDynamicConfigUpdate changes a topic's TailConfig while the hub is
// running and checks that processing continues and the new config is reported.
func TestDynamicConfigUpdate(t *testing.T) {
	tmpDir := t.TempDir()

	seqlog := NewLogHub(tmpDir, slog.Default(), nil)

	// Register the handler.
	var mu sync.Mutex
	logs := make([]string, 0)
	seqlog.RegisterHandler("test", func(rec *Record) error {
		mu.Lock()
		defer mu.Unlock()
		logs = append(logs, string(rec.Data))
		return nil
	})

	seqlog.Start()

	// Write some logs.
	seqlog.Write("test", []byte("log 1"))
	seqlog.Write("test", []byte("log 2"))
	time.Sleep(150 * time.Millisecond)

	// Fetch the current config and check the default poll interval.
	config, err := seqlog.GetTopicConfig("test")
	if err != nil {
		t.Fatalf("获取配置失败: %v", err)
	}
	if config.PollInterval != 100*time.Millisecond {
		t.Errorf("默认 PollInterval 不正确: got %v, want 100ms", config.PollInterval)
	}

	// Update the config at runtime.
	newConfig := &TailConfig{
		PollInterval: 50 * time.Millisecond,
		SaveInterval: 500 * time.Millisecond,
		BatchSize:    10,
	}
	if err := seqlog.UpdateTopicConfig("test", newConfig); err != nil {
		t.Fatalf("更新配置失败: %v", err)
	}

	// Keep writing after the update.
	seqlog.Write("test", []byte("log 3"))
	time.Sleep(150 * time.Millisecond)

	seqlog.Stop()

	// Every log must have been handled.
	mu.Lock()
	handled := len(logs)
	mu.Unlock()
	if handled != 3 {
		t.Errorf("处理的日志数量不正确: got %d, want 3", handled)
	}

	// The config must reflect the update.
	updatedConfig, err := seqlog.GetTopicConfig("test")
	if err != nil {
		t.Fatalf("获取配置失败: %v", err)
	}
	if updatedConfig.PollInterval != 50*time.Millisecond {
		t.Errorf("更新后的 PollInterval 不正确: got %v, want 50ms", updatedConfig.PollInterval)
	}
}

// TestUUIDUniqueness writes ten records with identical payloads and checks
// that every record read back carries a distinct UUID.
func TestUUIDUniqueness(t *testing.T) {
	tmpFile := "test_uuid.log"
	defer os.Remove(tmpFile)
	defer os.Remove(tmpFile + ".pos")
	defer os.Remove(tmpFile + ".idx")

	index, err := NewRecordIndex(tmpFile)
	if err != nil {
		t.Fatal(err)
	}
	defer index.Close()

	writer, err := NewLogWriter(tmpFile, index)
	if err != nil {
		t.Fatalf("创建 writer 失败: %v", err)
	}
	defer writer.Close()

	// Write several records with the same content.
	for i := 0; i < 10; i++ {
		if _, err := writer.Append([]byte("same content")); err != nil {
			t.Fatalf("写入数据失败: %v", err)
		}
	}

	// Read them back and verify UUID uniqueness.
	cursor, err := NewCursor(tmpFile, index, nil)
	if err != nil {
		t.Fatalf("创建 cursor 失败: %v", err)
	}
	defer cursor.Close()

	uuids := make(map[string]bool)
	for i := 0; i < 10; i++ {
		rec, err := cursor.Next()
		if err != nil {
			t.Fatalf("读取第 %d 条记录失败: %v", i, err)
		}

		// Has this UUID been seen already?
		uuidStr := rec.UUID.String()
		if uuids[uuidStr] {
			t.Errorf("发现重复的 UUID: %s", uuidStr)
		}
		uuids[uuidStr] = true

		// Verify the payload.
		if string(rec.Data) != "same content" {
			t.Errorf("第 %d 条记录数据不匹配: got %q, want %q", i, rec.Data, "same content")
		}
	}

	// All ten UUIDs must be distinct.
	if len(uuids) != 10 {
		t.Errorf("UUID 数量不正确: got %d, want 10", len(uuids))
	}
}

// TestUUIDValidation checks that a record's UUID is non-zero and renders as a
// 36-character string.
func TestUUIDValidation(t *testing.T) {
	tmpFile := "test_uuid_validation.log"
	defer os.Remove(tmpFile)
	defer os.Remove(tmpFile + ".pos")
	defer os.Remove(tmpFile + ".idx")

	index, err := NewRecordIndex(tmpFile)
	if err != nil {
		t.Fatal(err)
	}
	defer index.Close()

	// Write a single ordinary record.
	writer, err := NewLogWriter(tmpFile, index)
	if err != nil {
		t.Fatalf("创建 writer 失败: %v", err)
	}
	defer writer.Close()

	if _, err := writer.Append([]byte("test data")); err != nil {
		t.Fatalf("写入数据失败: %v", err)
	}

	// Read it back and inspect the UUID.
	cursor, err := NewCursor(tmpFile, index, nil)
	if err != nil {
		t.Fatalf("创建 cursor 失败: %v", err)
	}
	defer cursor.Close()

	rec, err := cursor.Next()
	if err != nil {
		t.Fatalf("读取记录失败: %v", err)
	}

	// The UUID must not be all zeroes.
	if rec.UUID == [16]byte{} {
		t.Error("UUID 为空")
	}

	// UUID string form: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx (36 characters).
	uuidStr := rec.UUID.String()
	if len(uuidStr) != 36 {
		t.Errorf("UUID 字符串格式不正确: %s (长度 %d)", uuidStr, len(uuidStr))
	}

	t.Logf("生成的 UUID: %s", uuidStr)
}

// TestSeqlogAutoRecovery restarts a LogHub on the same directory and checks
// that only records written after the restart are handled and that the
// previously used topics are reported again.
func TestSeqlogAutoRecovery(t *testing.T) {
	tmpDir := t.TempDir()

	// Phase 1: create the hub and write logs.
	allLogs := make(map[string][]string)
	var mu sync.Mutex
	defaultHandler := func(topic string, rec *Record) error {
		mu.Lock()
		defer mu.Unlock()
		allLogs[topic] = append(allLogs[topic], string(rec.Data))
		return nil
	}

	seqlog1 := NewLogHub(tmpDir, slog.Default(), defaultHandler)
	seqlog1.Start()

	seqlog1.Write("app", []byte("app log 1"))
	seqlog1.Write("app", []byte("app log 2"))
	seqlog1.Write("access", []byte("access log 1"))
	seqlog1.Write("error", []byte("error log 1"))

	time.Sleep(200 * time.Millisecond)
	seqlog1.Stop()

	// Verify what phase 1 handled.
	mu.Lock()
	if len(allLogs["app"]) != 2 {
		t.Errorf("第一阶段 app logs 数量不正确: got %d, want 2", len(allLogs["app"]))
	}
	if len(allLogs["access"]) != 1 {
		t.Errorf("第一阶段 access logs 数量不正确: got %d, want 1", len(allLogs["access"]))
	}
	if len(allLogs["error"]) != 1 {
		t.Errorf("第一阶段 error logs 数量不正确: got %d, want 1", len(allLogs["error"]))
	}
	mu.Unlock()

	// Drop the collected logs to simulate a restart.
	mu.Lock()
	allLogs = make(map[string][]string)
	mu.Unlock()

	// Phase 2: restart on the same directory.
	seqlog2 := NewLogHub(tmpDir, slog.Default(), defaultHandler)
	seqlog2.Start()

	// Write new logs.
	seqlog2.Write("app", []byte("app log 3"))
	seqlog2.Write("access", []byte("access log 2"))
	seqlog2.Write("new_topic", []byte("new topic log 1"))

	time.Sleep(300 * time.Millisecond)

	// Only the newly written logs should be handled, because the cursor
	// position was saved.
	mu.Lock()
	if len(allLogs["app"]) != 1 {
		t.Errorf("恢复后 app logs 数量不正确: got %d, want 1 (只有新日志)", len(allLogs["app"]))
	}
	if len(allLogs["access"]) != 1 {
		t.Errorf("恢复后 access logs 数量不正确: got %d, want 1 (只有新日志)", len(allLogs["access"]))
	}
	if len(allLogs["new_topic"]) != 1 {
		t.Errorf("恢复后 new_topic logs 数量不正确: got %d, want 1", len(allLogs["new_topic"]))
	}

	// Verify the content; skip when nothing arrived (already reported above)
	// so the test does not panic while holding the mutex.
	if appLogs := allLogs["app"]; len(appLogs) > 0 && appLogs[0] != "app log 3" {
		t.Errorf("app 日志内容不正确: got %q, want %q", appLogs[0], "app log 3")
	}
	mu.Unlock()

	seqlog2.Stop()

	// Verify the discovered topics.
	topics := seqlog2.GetTopics()
	expectedTopics := map[string]bool{
		"app":       true,
		"access":    true,
		"error":     true,
		"new_topic": true,
	}

	if len(topics) != len(expectedTopics) {
		t.Errorf("topic 数量不正确: got %d, want %d", len(topics), len(expectedTopics))
	}
	for _, topic := range topics {
		if !expectedTopics[topic] {
			t.Errorf("发现未预期的 topic: %s", topic)
		}
	}
}

// TestTopicProcessorClose checks that Close clears the processor's writer and
// can be called a second time without error.
func TestTopicProcessorClose(t *testing.T) {
	tmpDir := t.TempDir()

	// Create the processor with a no-op handler.
	logger := slog.Default()
	processor, err := NewTopicProcessor(tmpDir, "test", logger, &TopicConfig{
		Handler: func(rec *Record) error {
			return nil
		},
	})
	if err != nil {
		t.Fatalf("创建 processor 失败: %v", err)
	}

	// Write some data.
	_, err = processor.Write([]byte("test data 1"))
	if err != nil {
		t.Fatalf("写入失败: %v", err)
	}
	_, err = processor.Write([]byte("test data 2"))
	if err != nil {
		t.Fatalf("写入失败: %v", err)
	}

	// Stop the processor, then release its resources.
	if err := processor.Stop(); err != nil {
		t.Fatalf("停止失败: %v", err)
	}
	if err := processor.Close(); err != nil {
		t.Fatalf("清理失败: %v", err)
	}

	// The writer must have been set to nil.
	processor.mu.RLock()
	if processor.writer != nil {
		t.Error("writer 应该被清理为 nil")
	}
	processor.mu.RUnlock()

	// A second Close must not fail.
	if err := processor.Close(); err != nil {
		t.Errorf("重复调用 Close 失败: %v", err)
	}
}

// TestSeqlogCleanup checks that stopping the hub clears the writer of every
// topic processor.
func TestSeqlogCleanup(t *testing.T) {
	tmpDir := t.TempDir()

	seqlog := NewLogHub(tmpDir, slog.Default(), nil)
	seqlog.Start()

	// Write logs to several topics.
	topics := []string{"app", "access", "error"}
	for _, topic := range topics {
		for i := 0; i < 3; i++ {
			data := []byte(fmt.Sprintf("%s log %d", topic, i))
			if _, err := seqlog.Write(topic, data); err != nil {
				t.Fatalf("写入失败 [topic=%s]: %v", topic, err)
			}
		}
	}

	// Stop and clean up.
	if err := seqlog.Stop(); err != nil {
		t.Fatalf("停止失败: %v", err)
	}

	// Every processor must have released its writer.
	seqlog.mu.RLock()
	for topic, processor := range seqlog.processors {
		processor.mu.RLock()
		if processor.writer != nil {
			t.Errorf("topic %s 的 writer 未被清理", topic)
		}
		processor.mu.RUnlock()
	}
	seqlog.mu.RUnlock()
}

// TestTopicStats covers the TopicStats counters and their persistence through
// Save followed by a fresh NewTopicStats on the same path.
func TestTopicStats(t *testing.T) {
	tmpDir := t.TempDir()
	statsPath := filepath.Join(tmpDir, "test.stats")

	// Create the stats manager.
	stats := NewTopicStats(statsPath)

	// Write counters.
	stats.IncWrite(100)
	stats.IncWrite(200)

	s := stats.Get()
	if s.WriteCount != 2 {
		t.Errorf("expected write count 2, got %d", s.WriteCount)
	}
	if s.WriteBytes != 300 {
		t.Errorf("expected write bytes 300, got %d", s.WriteBytes)
	}

	// Processed counters.
	stats.IncProcessed(50)
	stats.IncProcessed(150)

	s = stats.Get()
	if s.ProcessedCount != 2 {
		t.Errorf("expected processed count 2, got %d", s.ProcessedCount)
	}
	if s.ProcessedBytes != 200 {
		t.Errorf("expected processed bytes 200, got %d", s.ProcessedBytes)
	}

	// Error counter.
	stats.IncError()
	stats.IncError()
	stats.IncError()

	s = stats.Get()
	if s.ErrorCount != 3 {
		t.Errorf("expected error count 3, got %d", s.ErrorCount)
	}

	// Persistence.
	if err := stats.Save(); err != nil {
		t.Fatalf("failed to save stats: %v", err)
	}

	// A new stats manager on the same path should recover the data.
	stats2 := NewTopicStats(statsPath)
	s2 := stats2.Get()

	if s2.WriteCount != s.WriteCount {
		t.Errorf("recovered write count mismatch: expected %d, got %d", s.WriteCount, s2.WriteCount)
	}
	if s2.WriteBytes != s.WriteBytes {
		t.Errorf("recovered write bytes mismatch: expected %d, got %d", s.WriteBytes, s2.WriteBytes)
	}
	if s2.ProcessedCount != s.ProcessedCount {
		t.Errorf("recovered processed count mismatch: expected %d, got %d", s.ProcessedCount, s2.ProcessedCount)
	}
	if s2.ProcessedBytes != s.ProcessedBytes {
		t.Errorf("recovered processed bytes mismatch: expected %d, got %d", s.ProcessedBytes, s2.ProcessedBytes)
	}
	if s2.ErrorCount != s.ErrorCount {
		t.Errorf("recovered error count mismatch: expected %d, got %d", s.ErrorCount, s2.ErrorCount)
	}
}

// TestTopicProcessorStats checks the write and processed counters reported by
// a running TopicProcessor.
func TestTopicProcessorStats(t *testing.T) {
	tmpDir := t.TempDir()
	topic := "stats_test"

	// Create the processor with a trivial handler.
	logger := slog.Default()
	config := &TopicConfig{
		Handler: func(rec *Record) error {
			return nil
		},
	}

	processor, err := NewTopicProcessor(tmpDir, topic, logger, config)
	if err != nil {
		t.Fatalf("创建 processor 失败: %v", err)
	}

	// Start the processor.
	if err := processor.Start(); err != nil {
		t.Fatalf("failed to start processor: %v", err)
	}
	defer func() {
		processor.Stop()
		processor.Close()
	}()

	// Write some data.
	data1 := []byte("test message 1")
	data2 := []byte("test message 2")
	data3 := []byte("test message 3")

	if _, err := processor.Write(data1); err != nil {
		t.Fatalf("failed to write: %v", err)
	}
	if _, err := processor.Write(data2); err != nil {
		t.Fatalf("failed to write: %v", err)
	}
	if _, err := processor.Write(data3); err != nil {
		t.Fatalf("failed to write: %v", err)
	}

	// Wait for processing to finish.
	time.Sleep(200 * time.Millisecond)

	// Check the statistics.
	stats := processor.GetStats()

	if stats.WriteCount != 3 {
		t.Errorf("expected write count 3, got %d", stats.WriteCount)
	}

	expectedBytes := int64(len(data1) + len(data2) + len(data3))
	if stats.WriteBytes != expectedBytes {
		t.Errorf("expected write bytes %d, got %d", expectedBytes, stats.WriteBytes)
	}

	if stats.ProcessedCount != 3 {
		t.Errorf("expected processed count 3, got %d", stats.ProcessedCount)
	}
	if stats.ProcessedBytes != expectedBytes {
		t.Errorf("expected processed bytes %d, got %d", expectedBytes, stats.ProcessedBytes)
	}
}

// TestStatsRecovery checks that a second TopicProcessor created on the same
// directory and topic reports the statistics of the first one.
func TestStatsRecovery(t *testing.T) {
	tmpDir := t.TempDir()
	topic := "recovery_test"

	// First run: write data.
	logger := slog.Default()
	config := &TopicConfig{
		Handler: func(rec *Record) error {
			return nil
		},
	}

	processor1, err := NewTopicProcessor(tmpDir, topic, logger, config)
	if err != nil {
		t.Fatalf("创建 processor 失败: %v", err)
	}
	if err := processor1.Start(); err != nil {
		t.Fatalf("failed to start processor: %v", err)
	}

	for i := 0; i < 5; i++ {
		data := []byte(fmt.Sprintf("message %d", i))
		if _, err := processor1.Write(data); err != nil {
			t.Fatalf("failed to write: %v", err)
		}
	}

	// Wait for processing.
	time.Sleep(200 * time.Millisecond)

	// Capture the statistics.
	stats1 := processor1.GetStats()

	// Stop and save.
	if err := processor1.Stop(); err != nil {
		t.Fatalf("failed to stop processor: %v", err)
	}
	if err := processor1.Close(); err != nil {
		t.Fatalf("failed to close processor: %v", err)
	}

	// Second run: the statistics should be recovered.
	processor2, err := NewTopicProcessor(tmpDir, topic, logger, config)
	if err != nil {
		t.Fatalf("创建 processor 失败: %v", err)
	}
	stats2 := processor2.GetStats()

	// Verify the recovered statistics.
	if stats2.WriteCount != stats1.WriteCount {
		t.Errorf("write count mismatch: expected %d, got %d", stats1.WriteCount, stats2.WriteCount)
	}
	if stats2.WriteBytes != stats1.WriteBytes {
		t.Errorf("write bytes mismatch: expected %d, got %d", stats1.WriteBytes, stats2.WriteBytes)
	}
	if stats2.ProcessedCount != stats1.ProcessedCount {
		t.Errorf("processed count mismatch: expected %d, got %d", stats1.ProcessedCount, stats2.ProcessedCount)
	}
	if stats2.ProcessedBytes != stats1.ProcessedBytes {
		t.Errorf("processed bytes mismatch: expected %d, got %d", stats1.ProcessedBytes, stats2.ProcessedBytes)
	}

	processor2.Close()
}

// TestSeqlogStats checks per-topic and aggregated statistics at the hub level.
func TestSeqlogStats(t *testing.T) {
	tmpDir := t.TempDir()

	handler := func(topic string, rec *Record) error {
		return nil
	}

	seq := NewLogHub(tmpDir, slog.Default(), handler)
	if err := seq.Start(); err != nil {
		t.Fatalf("failed to start seqlog: %v", err)
	}
	defer seq.Stop()

	// Write to several topics.
	topics := []string{"app", "sys", "audit"}
	for _, topic := range topics {
		for i := 0; i < 3; i++ {
			data := []byte(fmt.Sprintf("%s message %d", topic, i))
			if _, err := seq.Write(topic, data); err != nil {
				t.Fatalf("failed to write to %s: %v", topic, err)
			}
		}
	}

	// Wait for processing.
	time.Sleep(300 * time.Millisecond)

	// Check the statistics of each topic.
	for _, topic := range topics {
		stats, err := seq.GetTopicStats(topic)
		if err != nil {
			t.Errorf("failed to get stats for %s: %v", topic, err)
			continue
		}
		if stats.WriteCount != 3 {
			t.Errorf("%s: expected write count 3, got %d", topic, stats.WriteCount)
		}
	}

	// Check the aggregated statistics.
	allStats := seq.GetAllStats()
	if len(allStats) != len(topics) {
		t.Errorf("expected %d topics, got %d", len(topics), len(allStats))
	}
	for _, topic := range topics {
		stats, exists := allStats[topic]
		if !exists {
			t.Errorf("stats for %s not found", topic)
			continue
		}
		if stats.WriteCount != 3 {
			t.Errorf("%s: expected write count 3, got %d", topic, stats.WriteCount)
		}
	}
}

// TestRecordQuery exercises RecordQuery.QueryOldest / QueryNewest around a
// simulated processing window and checks GetRecordStatus for each result.
func TestRecordQuery(t *testing.T) {
	tmpFile := "test_query.log"
	defer os.Remove(tmpFile)
	defer os.Remove(tmpFile + ".pos")
	defer os.Remove(tmpFile + ".idx")

	index, err := NewRecordIndex(tmpFile)
	if err != nil {
		t.Fatal(err)
	}
	defer index.Close()

	// Write the test data.
	writer, err := NewLogWriter(tmpFile, index)
	if err != nil {
		t.Fatalf("failed to create writer: %v", err)
	}

	messages := []string{
		"message 0",
		"message 1",
		"message 2",
		"message 3",
		"message 4",
		"message 5",
		"message 6",
		"message 7",
		"message 8",
		"message 9",
	}

	offsets := make([]int64, len(messages))
	for i, msg := range messages {
		offset, err := writer.Append([]byte(msg))
		if err != nil {
			t.Fatalf("failed to write message %d: %v", i, err)
		}
		offsets[i] = offset
	}
	defer writer.Close()

	// Simulate processing having reached record 5.
	// Window: [index 5, index 6).
	startIdx := 5
	endIdx := 6

	// Create a second index on the same file for the query.
	index, err = NewRecordIndex(tmpFile)
	if err != nil {
		t.Fatalf("failed to create index: %v", err)
	}
	defer index.Close()

	// Create the query object.
	query, err := NewRecordQuery(tmpFile, index, writer)
	if err != nil {
		t.Fatalf("failed to create query: %v", err)
	}
	defer query.Close()

	// Current position: QueryNewest(startIdx-1, 1) is expected to return the
	// single record at startIdx.
	current, err := query.QueryNewest(startIdx-1, 1)
	if err != nil {
		t.Fatalf("failed to query current: %v", err)
	}
	if len(current) != 1 {
		t.Fatalf("expected 1 current result, got %d", len(current))
	}
	if string(current[0].Record.Data) != "message 5" {
		t.Errorf("expected current 'message 5', got '%s'", string(current[0].Record.Data))
	}
	if current[0].Index != startIdx {
		t.Errorf("expected index %d, got %d", startIdx, current[0].Index)
	}
	// Determine the status by hand.
	status := GetRecordStatus(startIdx, startIdx, endIdx)
	if status != StatusProcessing {
		t.Errorf("expected status Processing, got %s", status)
	}

	// QueryOldest looks towards lower indexes:
	// QueryOldest(5, 3) is expected to return indexes 2, 3, 4.
	backResults, err := query.QueryOldest(startIdx, 3)
	if err != nil {
		t.Fatalf("failed to query backward: %v", err)
	}
	if len(backResults) != 3 {
		t.Errorf("expected 3 backward results, got %d", len(backResults))
	}
	// Results come back in ascending index order: 2, 3, 4.
	expectedBack := []string{"message 2", "message 3", "message 4"}
	for i, rec := range backResults {
		if string(rec.Record.Data) != expectedBack[i] {
			t.Errorf("backward[%d]: expected '%s', got '%s'", i, expectedBack[i], string(rec.Record.Data))
		}
		expectedIndex := startIdx - 3 + i
		if rec.Index != expectedIndex {
			t.Errorf("backward[%d]: expected index %d, got %d", i, expectedIndex, rec.Index)
		}
		// Indexes 2, 3, 4 have all been processed.
		recStatus := GetRecordStatus(rec.Index, startIdx, endIdx)
		if recStatus != StatusProcessed {
			t.Errorf("backward[%d]: expected status Processed, got %s", i, recStatus)
		}
	}

	// QueryNewest looks towards higher indexes:
	// QueryNewest(endIdx-1, 3) is expected to return indexes 6, 7, 8.
	forwardResults, err := query.QueryNewest(endIdx-1, 3)
	if err != nil {
		t.Fatalf("failed to query forward: %v", err)
	}
	if len(forwardResults) != 3 {
		t.Errorf("expected 3 forward results, got %d", len(forwardResults))
	}
	// Results come back in ascending index order: 6, 7, 8.
	expectedForward := []string{"message 6", "message 7", "message 8"}
	for i, rec := range forwardResults {
		if string(rec.Record.Data) != expectedForward[i] {
			t.Errorf("forward[%d]: expected '%s', got '%s'", i, expectedForward[i], string(rec.Record.Data))
		}
		expectedIndex := endIdx + i
		if rec.Index != expectedIndex {
			t.Errorf("forward[%d]: expected index %d, got %d", i, expectedIndex, rec.Index)
		}
		// Indexes 6, 7, 8 are still pending.
		recStatus := GetRecordStatus(rec.Index, startIdx, endIdx)
		if recStatus != StatusPending {
			t.Errorf("forward[%d]: expected status Pending, got %s", i, recStatus)
		}
	}
}

// TestTopicQuery runs the TopicProcessor query methods around the processing
// window after the handler starts failing. Query results are only logged;
// the test fails only when a query returns an error.
func TestTopicQuery(t *testing.T) {
	tmpDir := t.TempDir()
	topic := "query_test"

	logger := slog.Default()
	processedCount := 0
	config := &TopicConfig{
		Handler: func(rec *Record) error {
			processedCount++
			// Return an error from the third call onwards.
			if processedCount >= 3 {
				return fmt.Errorf("stop processing")
			}
			return nil
		},
	}

	processor, err := NewTopicProcessor(tmpDir, topic, logger, config)
	if err != nil {
		t.Fatalf("创建 processor 失败: %v", err)
	}
	if err := processor.Start(); err != nil {
		t.Fatalf("failed to start processor: %v", err)
	}

	// Write 10 messages.
	for i := 0; i < 10; i++ {
		data := []byte(fmt.Sprintf("message %d", i))
		if _, err := processor.Write(data); err != nil {
			t.Fatalf("failed to write: %v", err)
		}
	}

	// Wait for processing.
	time.Sleep(300 * time.Millisecond)
	processor.Stop()

	// Current processing window.
	startIdx := processor.GetProcessingIndex()
	endIdx := processor.GetReadIndex()

	t.Logf("Processing index: [%d, %d)", startIdx, endIdx)

	// Query at the current position (processor methods include the status).
	if startIdx < endIdx {
		current, err := processor.QueryOldest(startIdx, 1)
		if err != nil {
			t.Fatalf("failed to query current: %v", err)
		}
		if len(current) > 0 {
			t.Logf("Current: %s - %s", string(current[0].Record.Data), current[0].Status)
		}
	}

	// Query logged as "Back".
	if startIdx > 0 {
		back, err := processor.QueryNewest(startIdx-1, 2)
		if err != nil {
			t.Fatalf("failed to query backward: %v", err)
		}
		for i, rec := range back {
			t.Logf("Back[%d]: %s - %s", i, string(rec.Record.Data), rec.Status)
		}
	}

	// Query logged as "Forward".
	if startIdx < processor.GetRecordCount() {
		forward, err := processor.QueryOldest(endIdx, 3)
		if err != nil {
			t.Fatalf("failed to query forward: %v", err)
		}
		for i, rec := range forward {
			t.Logf("Forward[%d]: %s - %s", i, string(rec.Record.Data), rec.Status)
		}
	}

	processor.Close()
}

// TestSeqlogQuery runs the same queries as TestTopicQuery through a LogHub.
// Query results are only logged; the test fails only on query errors.
func TestSeqlogQuery(t *testing.T) {
	tmpDir := t.TempDir()

	processedCount := 0
	handler := func(topic string, rec *Record) error {
		processedCount++
		// Return an error from the fifth call onwards.
		if processedCount >= 5 {
			return fmt.Errorf("stop processing")
		}
		return nil
	}

	seq := NewLogHub(tmpDir, slog.Default(), handler)
	if err := seq.Start(); err != nil {
		t.Fatalf("failed to start seqlog: %v", err)
	}

	// Write the messages.
	for i := 0; i < 10; i++ {
		data := []byte(fmt.Sprintf("app message %d", i))
		if _, err := seq.Write("app", data); err != nil {
			t.Fatalf("failed to write: %v", err)
		}
	}

	// Wait for processing.
	time.Sleep(300 * time.Millisecond)

	// Create the query object.
	query, err := seq.NewTopicQuery("app")
	if err != nil {
		t.Fatalf("failed to create query: %v", err)
	}
	defer query.Close()

	// Current processing window.
	startIdx := seq.GetProcessingIndex("app")
	endIdx := seq.GetReadIndex("app")
	t.Logf("Processing index: [%d, %d)", startIdx, endIdx)

	// Fetch the processor for status-aware queries.
	processor, _ := seq.GetProcessor("app")
	if processor == nil {
		t.Fatal("processor for topic app not found")
	}
	totalCount := processor.GetRecordCount()

	// Query at the current position.
	if startIdx < endIdx {
		current, err := processor.QueryOldest(startIdx, 1)
		if err != nil {
			t.Fatalf("failed to query current: %v", err)
		}
		if len(current) > 0 {
			t.Logf("Current: %s, Status: %s", string(current[0].Record.Data), current[0].Status)
		}
	}

	// Query logged as "Back".
	if startIdx > 0 {
		back, err := processor.QueryNewest(startIdx-1, 2)
		if err != nil {
			t.Fatalf("failed to query backward: %v", err)
		}
		for i, rec := range back {
			t.Logf("Back[%d]: %s - %s", i, string(rec.Record.Data), rec.Status)
		}
	}

	// Query logged as "Forward".
	if startIdx < totalCount {
		forward, err := processor.QueryOldest(endIdx, 3)
		if err != nil {
			t.Fatalf("failed to query forward: %v", err)
		}
		for i, rec := range forward {
			t.Logf("Forward[%d]: %s - %s", i, string(rec.Record.Data), rec.Status)
		}
	}

	seq.Stop()
}

// TestEventNotification subscribes to all events of a TopicProcessor and
// checks that start, stop, write-success, process-success and process-error
// events are all delivered.
func TestEventNotification(t *testing.T) {
	tmpDir := t.TempDir()
	topic := "event_test"

	logger := slog.Default()
	processedCount := 0
	config := &TopicConfig{
		Handler: func(rec *Record) error {
			processedCount++
			// The second handler call returns an error.
			if processedCount == 2 {
				return fmt.Errorf("模拟处理错误")
			}
			return nil
		},
	}

	processor, err := NewTopicProcessor(tmpDir, topic, logger, config)
	if err != nil {
		t.Fatalf("创建 processor 失败: %v", err)
	}

	// Collect the received event types.
	events := make([]EventType, 0)
	var eventsMu sync.Mutex

	// Subscribe to every event.
	processor.SubscribeAll(func(event *Event) {
		eventsMu.Lock()
		events = append(events, event.Type)
		t.Logf("收到事件: %s - Topic: %s", event.Type, event.Topic)
		eventsMu.Unlock()
	})

	// Start the processor.
	if err := processor.Start(); err != nil {
		t.Fatalf("failed to start processor: %v", err)
	}

	// Write 3 messages.
	for i := 0; i < 3; i++ {
		data := []byte(fmt.Sprintf("message %d", i))
		if _, err := processor.Write(data); err != nil {
			t.Fatalf("failed to write: %v", err)
		}
	}

	// Wait for processing.
	time.Sleep(500 * time.Millisecond)

	// Stop.
	processor.Stop()
	processor.Close()

	// Give event delivery time to finish.
	time.Sleep(100 * time.Millisecond)

	// Check the events.
	eventsMu.Lock()
	defer eventsMu.Unlock()

	t.Logf("收到 %d 个事件", len(events))

	// Expected at least: 1 start + 3 write-success + 1 process-success +
	// 1 process-error + 1 stop event.
	expectedMinEvents := 7
	if len(events) < expectedMinEvents {
		t.Errorf("expected at least %d events, got %d", expectedMinEvents, len(events))
	}

	// Check that each event type of interest was seen.
	hasStart := false
	hasStop := false
	hasWriteSuccess := false
	hasProcessSuccess := false
	hasProcessError := false

	for _, eventType := range events {
		switch eventType {
		case EventProcessorStart:
			hasStart = true
		case EventProcessorStop:
			hasStop = true
		case EventWriteSuccess:
			hasWriteSuccess = true
		case EventProcessSuccess:
			hasProcessSuccess = true
		case EventProcessError:
			hasProcessError = true
		}
	}

	if !hasStart {
		t.Error("missing ProcessorStart event")
	}
	if !hasStop {
		t.Error("missing ProcessorStop event")
	}
	if !hasWriteSuccess {
		t.Error("missing WriteSuccess event")
	}
	if !hasProcessSuccess {
		t.Error("missing ProcessSuccess event")
	}
	if !hasProcessError {
		t.Error("missing ProcessError event")
	}
}

// TestSeqlogEventSubscription subscribes to write-success events of a single
// topic through the hub and checks that one event arrives per write.
func TestSeqlogEventSubscription(t *testing.T) {
	tmpDir := t.TempDir()

	handler := func(topic string, rec *Record) error {
		return nil
	}

	seq := NewLogHub(tmpDir, slog.Default(), handler)
	if err := seq.Start(); err != nil {
		t.Fatalf("failed to start seqlog: %v", err)
	}
	defer seq.Stop()

	// Subscribe to write-success events.
	writeCount := 0
	var writeMu sync.Mutex

	seq.Subscribe("app", EventWriteSuccess, func(event *Event) {
		writeMu.Lock()
		writeCount++
		t.Logf("写入成功: topic=%s, position=%d", event.Topic, event.Position)
		writeMu.Unlock()
	})

	// Write the messages.
	for i := 0; i < 5; i++ {
		data := []byte(fmt.Sprintf("message %d", i))
		if _, err := seq.Write("app", data); err != nil {
			t.Fatalf("failed to write: %v", err)
		}
	}

	// Wait for event delivery.
	time.Sleep(200 * time.Millisecond)

	writeMu.Lock()
	if writeCount != 5 {
		t.Errorf("expected 5 write events, got %d", writeCount)
	}
	writeMu.Unlock()
}

// TestMultiTopicEventSubscription subscribes to write-success events of all
// topics and checks the per-topic event counts.
func TestMultiTopicEventSubscription(t *testing.T) {
	tmpDir := t.TempDir()

	handler := func(topic string, rec *Record) error {
		return nil
	}

	seq := NewLogHub(tmpDir, slog.Default(), handler)
	if err := seq.Start(); err != nil {
		t.Fatalf("failed to start seqlog: %v", err)
	}
	defer seq.Stop()

	// Count write events per topic.
	eventCounts := make(map[string]int)
	var countMu sync.Mutex

	// Subscribe to write-success events for all topics.
	seq.SubscribeAllTopics(EventWriteSuccess, func(event *Event) {
		countMu.Lock()
		eventCounts[event.Topic]++
		countMu.Unlock()
	})

	// Write to different topics.
	topics := []string{"app", "sys", "audit"}
	for _, topic := range topics {
		for i := 0; i < 3; i++ {
			data := []byte(fmt.Sprintf("%s message %d", topic, i))
			if _, err := seq.Write(topic, data); err != nil {
				t.Fatalf("failed to write to %s: %v", topic, err)
			}
		}
	}

	// Wait for event delivery.
	time.Sleep(300 * time.Millisecond)

	countMu.Lock()
	defer countMu.Unlock()

	for _, topic := range topics {
		count := eventCounts[topic]
		if count != 3 {
			t.Errorf("topic %s: expected 3 write events, got %d", topic, count)
		}
	}
}

// TestTopicReset checks that ResetTopic leaves an empty log file, zeroes the
// topic statistics and still allows further writes.
func TestTopicReset(t *testing.T) {
	tmpDir := t.TempDir()

	seqlog := NewLogHub(tmpDir, slog.Default(), nil)

	// Register the handler.
	seqlog.RegisterHandler("test", func(rec *Record) error {
		return nil
	})

	seqlog.Start()

	// Write some logs.
	for i := 0; i < 5; i++ {
		data := []byte(fmt.Sprintf("message %d", i))
		if _, err := seqlog.Write("test", data); err != nil {
			t.Fatalf("写入失败: %v", err)
		}
	}

	// Wait for processing.
	time.Sleep(300 * time.Millisecond)

	// Verify the statistics.
	stats, err := seqlog.GetTopicStats("test")
	if err != nil {
		t.Fatalf("获取统计失败: %v", err)
	}
	if stats.WriteCount != 5 {
		t.Errorf("期望写入 5 条,实际 %d 条", stats.WriteCount)
	}
	if stats.ProcessedCount != 5 {
		t.Errorf("期望处理 5 条,实际 %d 条", stats.ProcessedCount)
	}

	// The log file must exist.
	logFile := filepath.Join(tmpDir, "test.log")
	if _, err := os.Stat(logFile); os.IsNotExist(err) {
		t.Error("日志文件不存在")
	}

	// Reset the topic.
	if err := seqlog.ResetTopic("test"); err != nil {
		t.Fatalf("Reset 失败: %v", err)
	}

	// Wait for the reset to complete.
	time.Sleep(100 * time.Millisecond)

	// The log file should have been re-created empty by the reset.
	fileInfo, err := os.Stat(logFile)
	if err != nil {
		t.Errorf("日志文件应该存在(空文件): %v", err)
	} else if fileInfo.Size() != 0 {
		t.Errorf("日志文件应该是空的,但大小为 %d", fileInfo.Size())
	}

	// The statistics must be reset.
	stats, err = seqlog.GetTopicStats("test")
	if err != nil {
		t.Fatalf("获取统计失败: %v", err)
	}
	if stats.WriteCount != 0 {
		t.Errorf("期望写入计数为 0,实际 %d", stats.WriteCount)
	}
	if stats.ProcessedCount != 0 {
		t.Errorf("期望处理计数为 0,实际 %d", stats.ProcessedCount)
	}

	// Writing must still work after the reset.
	for i := 0; i < 3; i++ {
		data := []byte(fmt.Sprintf("new message %d", i))
		if _, err := seqlog.Write("test", data); err != nil {
			t.Fatalf("重置后写入失败: %v", err)
		}
	}

	time.Sleep(300 * time.Millisecond)

	stats, err = seqlog.GetTopicStats("test")
	if err != nil {
		t.Fatalf("获取统计失败: %v", err)
	}
	if stats.WriteCount != 3 {
		t.Errorf("重置后期望写入 3 条,实际 %d 条", stats.WriteCount)
	}
	if stats.ProcessedCount != 3 {
		t.Errorf("重置后期望处理 3 条,实际 %d 条", stats.ProcessedCount)
	}

	// Finally stop the hub.
	seqlog.Stop()
}

// TestTopicResetWithPendingRecords tests that Reset returns an error while
// records are still pending.
func TestTopicResetWithPendingRecords(t *testing.T) {
	tmpDir := t.TempDir()

	seqlog := NewLogHub(tmpDir, slog.Default(), nil)

	// Register a slow handler so that records pile up.
	slowHandler := func(rec *Record) error {
		time.Sleep(100 * time.Millisecond) // simulate slow processing
		return nil
	}
	seqlog.RegisterHandler("test", slowHandler)

	seqlog.Start()

	// Write several records in quick succession.
	for i := 0; i < 10; i++ {
		data := []byte(fmt.Sprintf("message %d", i))
if _, err := seqlog.Write("test", data); err != nil { t.Fatalf("写入失败: %v", err) } } // 短暂等待,让一部分日志开始处理但不是全部 time.Sleep(200 * time.Millisecond) // 尝试重置,应该失败因为有待处理的日志 err := seqlog.ResetTopic("test") if err == nil { t.Fatal("期望 Reset 失败因为有待处理的日志,但成功了") } t.Logf("预期的错误: %v", err) // 停止处理 seqlog.Stop() // 现在 Reset 应该成功(停止后没有待处理的日志) processor, _ := seqlog.GetProcessor("test") if err := processor.Reset(); err != nil { t.Fatalf("停止后 Reset 应该成功: %v", err) } } // TestQueryOldestNewest 测试 QueryOldest 和 QueryNewest func TestQueryOldestNewest(t *testing.T) { tmpDir := t.TempDir() // 创建 TopicProcessor(提供空 handler) processor, err := NewTopicProcessor(tmpDir, "test", nil, &TopicConfig{ Handler: func(rec *Record) error { return nil }, }) if err != nil { t.Fatal(err) } defer processor.Close() // 写入测试数据 for i := 0; i < 10; i++ { data := fmt.Sprintf("message %d", i) if _, err := processor.Write([]byte(data)); err != nil { t.Fatal(err) } } // 测试 QueryNewest - 查询索引 0, 1, 2(向索引递增方向) // QueryNewest(-1, 3) 从 -1 向后查询,得到索引 0, 1, 2 oldest, err := processor.QueryNewest(-1, 3) if err != nil { t.Fatalf("QueryNewest failed: %v", err) } if len(oldest) != 3 { t.Errorf("expected 3 records, got %d", len(oldest)) } // 验证顺序:应该是 0, 1, 2 for i := 0; i < 3; i++ { expected := fmt.Sprintf("message %d", i) if string(oldest[i].Record.Data) != expected { t.Errorf("oldest[%d]: expected %s, got %s", i, expected, string(oldest[i].Record.Data)) } if oldest[i].Index != i { t.Errorf("oldest[%d]: expected index %d, got %d", i, i, oldest[i].Index) } t.Logf("Oldest[%d]: index=%d, %s - %s", i, oldest[i].Index, string(oldest[i].Record.Data), oldest[i].Status) } // 测试 QueryOldest - 查询索引 7, 8, 9(向索引递减方向) // QueryOldest(10, 3) 从 10 向前查询,得到索引 7, 8, 9 totalCount := processor.GetRecordCount() newest, err := processor.QueryOldest(totalCount, 3) if err != nil { t.Fatalf("QueryOldest failed: %v", err) } if len(newest) != 3 { t.Errorf("expected 3 records, got %d", len(newest)) } // 验证顺序:应该是 7, 8, 9(按索引递增) for i := 0; i < 3; i++ { 
expected := fmt.Sprintf("message %d", 7+i) if string(newest[i].Record.Data) != expected { t.Errorf("newest[%d]: expected %s, got %s", i, expected, string(newest[i].Record.Data)) } if newest[i].Index != 7+i { t.Errorf("newest[%d]: expected index %d, got %d", i, 7+i, newest[i].Index) } t.Logf("Newest[%d]: index=%d, %s - %s", i, newest[i].Index, string(newest[i].Record.Data), newest[i].Status) } // 测试超出范围 - 查询所有记录 // QueryNewest(-1, 100) 从 -1 向后查询,会返回所有记录(最多 100 条) all, err := processor.QueryNewest(-1, 100) if err != nil { t.Fatalf("QueryNewest(-1, 100) failed: %v", err) } if len(all) != 10 { t.Errorf("expected 10 records, got %d", len(all)) } // 测试空结果 processor2, err := NewTopicProcessor(t.TempDir(), "empty", nil, &TopicConfig{ Handler: func(rec *Record) error { return nil }, }) if err != nil { t.Fatal(err) } defer processor2.Close() emptyNewest, err := processor2.QueryNewest(-1, 10) if err != nil { t.Fatalf("QueryNewest on empty failed: %v", err) } if len(emptyNewest) != 0 { t.Errorf("expected 0 records, got %d", len(emptyNewest)) } } // TestQueryFromFirstAndLast 测试 QueryFromFirst 和 QueryFromLast func TestQueryFromFirstAndLast(t *testing.T) { tmpDir := t.TempDir() // 创建 TopicProcessor processor, err := NewTopicProcessor(tmpDir, "test", nil, &TopicConfig{ Handler: func(rec *Record) error { return nil }, }) if err != nil { t.Fatal(err) } defer processor.Close() // 写入 10 条测试数据 for i := 0; i < 10; i++ { data := fmt.Sprintf("message %d", i) if _, err := processor.Write([]byte(data)); err != nil { t.Fatal(err) } } // 测试 QueryFromFirst - 从第一条记录向索引递增方向查询 t.Run("QueryFromFirst", func(t *testing.T) { // 查询前 3 条记录 records, err := processor.QueryFromFirst(3) if err != nil { t.Fatalf("QueryFromFirst failed: %v", err) } if len(records) != 3 { t.Fatalf("expected 3 records, got %d", len(records)) } // 验证结果:应该是索引 0, 1, 2 for i := 0; i < 3; i++ { expectedData := fmt.Sprintf("message %d", i) if string(records[i].Record.Data) != expectedData { t.Errorf("records[%d]: expected %s, got 
%s", i, expectedData, string(records[i].Record.Data)) } if records[i].Index != i { t.Errorf("records[%d]: expected index %d, got %d", i, i, records[i].Index) } t.Logf("FromFirst[%d]: index=%d, %s - %s", i, records[i].Index, string(records[i].Record.Data), records[i].Status) } // 查询超过总数的记录 allRecords, err := processor.QueryFromFirst(100) if err != nil { t.Fatalf("QueryFromFirst(100) failed: %v", err) } if len(allRecords) != 10 { t.Errorf("expected 10 records, got %d", len(allRecords)) } }) // 测试 QueryFromLast - 从最后一条记录向索引递减方向查询 t.Run("QueryFromLast", func(t *testing.T) { // 查询最后 3 条记录 records, err := processor.QueryFromLast(3) if err != nil { t.Fatalf("QueryFromLast failed: %v", err) } if len(records) != 3 { t.Fatalf("expected 3 records, got %d", len(records)) } // 验证结果:应该是索引 7, 8, 9(按索引递增顺序排列) for i := 0; i < 3; i++ { expectedIndex := 7 + i expectedData := fmt.Sprintf("message %d", expectedIndex) if string(records[i].Record.Data) != expectedData { t.Errorf("records[%d]: expected %s, got %s", i, expectedData, string(records[i].Record.Data)) } if records[i].Index != expectedIndex { t.Errorf("records[%d]: expected index %d, got %d", i, expectedIndex, records[i].Index) } t.Logf("FromLast[%d]: index=%d, %s - %s", i, records[i].Index, string(records[i].Record.Data), records[i].Status) } // 查询超过总数的记录 allRecords, err := processor.QueryFromLast(100) if err != nil { t.Fatalf("QueryFromLast(100) failed: %v", err) } if len(allRecords) != 10 { t.Errorf("expected 10 records, got %d", len(allRecords)) } }) // 测试空数据库 t.Run("EmptyDatabase", func(t *testing.T) { emptyProcessor, err := NewTopicProcessor(t.TempDir(), "empty", nil, &TopicConfig{ Handler: func(rec *Record) error { return nil }, }) if err != nil { t.Fatal(err) } defer emptyProcessor.Close() // QueryFromFirst 应该返回空数组 firstRecords, err := emptyProcessor.QueryFromFirst(10) if err != nil { t.Fatalf("QueryFromFirst on empty failed: %v", err) } if len(firstRecords) != 0 { t.Errorf("expected 0 records, got %d", 
len(firstRecords)) } // QueryFromLast 应该返回空数组 lastRecords, err := emptyProcessor.QueryFromLast(10) if err != nil { t.Fatalf("QueryFromLast on empty failed: %v", err) } if len(lastRecords) != 0 { t.Errorf("expected 0 records, got %d", len(lastRecords)) } }) } // TestLogHubQueryFromFirstAndLast 测试 LogHub 的 QueryFromFirst 和 QueryFromLast func TestLogHubQueryFromFirstAndLast(t *testing.T) { tmpDir := t.TempDir() seqlog := NewLogHub(tmpDir, slog.Default(), nil) seqlog.RegisterHandler("test", func(rec *Record) error { return nil }) seqlog.Start() defer seqlog.Stop() // 写入测试数据 for i := 0; i < 10; i++ { data := fmt.Sprintf("message %d", i) if _, err := seqlog.Write("test", []byte(data)); err != nil { t.Fatal(err) } } // 测试 QueryFromFirst firstRecords, err := seqlog.QueryFromFirst("test", 3) if err != nil { t.Fatalf("QueryFromFirst failed: %v", err) } if len(firstRecords) != 3 { t.Fatalf("expected 3 records, got %d", len(firstRecords)) } for i := 0; i < 3; i++ { if firstRecords[i].Index != i { t.Errorf("firstRecords[%d]: expected index %d, got %d", i, i, firstRecords[i].Index) } } // 测试 QueryFromLast lastRecords, err := seqlog.QueryFromLast("test", 3) if err != nil { t.Fatalf("QueryFromLast failed: %v", err) } if len(lastRecords) != 3 { t.Fatalf("expected 3 records, got %d", len(lastRecords)) } for i := 0; i < 3; i++ { expectedIndex := 7 + i if lastRecords[i].Index != expectedIndex { t.Errorf("lastRecords[%d]: expected index %d, got %d", i, expectedIndex, lastRecords[i].Index) } } // 测试不存在的 topic _, err = seqlog.QueryFromFirst("nonexistent", 10) if err == nil { t.Error("expected error for nonexistent topic") } _, err = seqlog.QueryFromLast("nonexistent", 10) if err == nil { t.Error("expected error for nonexistent topic") } }