Files
seqlog/seqlog_test.go
bourdon 810664eb12 重构:优化记录格式并修复核心功能
- 修改记录存储格式为 [4B len][8B offset][4B CRC][16B UUID][data]
- 修复 TopicProcessor 中 WaitGroup 使用错误导致 handler 不执行的问题
- 修复写入保护逻辑,避免 dirtyOffset=-1 时误判为写入中
- 添加统计信息定期持久化功能
- 改进 UTF-8 字符截断处理,防止 CJK 字符乱码
- 优化 Web UI:显示人类可读的文件大小,支持点击外部关闭弹窗
- 重构示例代码,添加 webui 和 webui_integration 示例

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-04 17:54:49 +08:00

2089 lines
52 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package seqlog
import (
"context"
"fmt"
"io"
"log/slog"
"os"
"path/filepath"
"sync"
"testing"
"time"
)
// TestBasicWriteAndRead appends a few records and reads them back in
// order through a cursor sharing the writer's index.
func TestBasicWriteAndRead(t *testing.T) {
	tmpFile := "test_basic.log"
	defer os.Remove(tmpFile)
	defer os.Remove(tmpFile + ".pos")
	defer os.Remove(tmpFile + ".idx")
	// Create the index shared by writer and cursor.
	index, err := NewRecordIndex(tmpFile)
	if err != nil {
		t.Fatalf("创建索引失败: %v", err)
	}
	defer index.Close()
	// Append the test records.
	writer, err := NewLogWriter(tmpFile, index)
	if err != nil {
		t.Fatalf("创建 writer 失败: %v", err)
	}
	testData := [][]byte{
		[]byte("hello world"),
		[]byte("test log entry 1"),
		[]byte("test log entry 2"),
	}
	for _, data := range testData {
		if _, err := writer.Append(data); err != nil {
			t.Fatalf("写入数据失败: %v", err)
		}
	}
	// FIX: the writer was never closed (leaked file handle); close it
	// before reading, matching the other tests in this file.
	writer.Close()
	// Read everything back via a cursor over the shared index.
	cursor, err := NewCursor(tmpFile, index, nil)
	if err != nil {
		t.Fatalf("创建 cursor 失败: %v", err)
	}
	defer cursor.Close()
	for i, expected := range testData {
		rec, err := cursor.Next()
		if err != nil {
			t.Fatalf("读取第 %d 条记录失败: %v", i, err)
		}
		if string(rec.Data) != string(expected) {
			t.Errorf("第 %d 条记录数据不匹配: got %q, want %q", i, rec.Data, expected)
		}
	}
}
// TestCursorNextRange exercises batched reads: NextRange returns up to n
// records, a short batch at the tail, and io.EOF once exhausted.
func TestCursorNextRange(t *testing.T) {
	logPath := "test_range.log"
	defer os.Remove(logPath)
	defer os.Remove(logPath + ".pos")
	defer os.Remove(logPath + ".idx")

	// Index shared by the writer and the cursor.
	idx, err := NewRecordIndex(logPath)
	if err != nil {
		t.Fatal(err)
	}
	defer idx.Close()

	// Append ten records and close the writer.
	w, err := NewLogWriter(logPath, idx)
	if err != nil {
		t.Fatal(err)
	}
	msgs := []string{"msg1", "msg2", "msg3", "msg4", "msg5", "msg6", "msg7", "msg8", "msg9", "msg10"}
	for _, m := range msgs {
		if _, err := w.Append([]byte(m)); err != nil {
			t.Fatal(err)
		}
	}
	w.Close()

	cur, err := NewCursor(logPath, idx, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer cur.Close()

	// Batch 1: three records.
	batch, err := cur.NextRange(3)
	if err != nil {
		t.Fatal(err)
	}
	if len(batch) != 3 {
		t.Fatalf("expected 3 records, got %d", len(batch))
	}
	for i := range batch {
		if got, want := string(batch[i].Data), msgs[i]; got != want {
			t.Errorf("record[%d]: expected '%s', got '%s'", i, want, got)
		}
	}

	// Batch 2: five more records.
	batch, err = cur.NextRange(5)
	if err != nil {
		t.Fatal(err)
	}
	if len(batch) != 5 {
		t.Fatalf("expected 5 records, got %d", len(batch))
	}
	for i := range batch {
		if got, want := string(batch[i].Data), msgs[i+3]; got != want {
			t.Errorf("record[%d]: expected '%s', got '%s'", i, want, got)
		}
	}

	// Batch 3: ask for five, but only two remain.
	batch, err = cur.NextRange(5)
	if err != nil {
		t.Fatal(err)
	}
	if len(batch) != 2 {
		t.Fatalf("expected 2 records, got %d", len(batch))
	}
	for i := range batch {
		if got, want := string(batch[i].Data), msgs[i+8]; got != want {
			t.Errorf("record[%d]: expected '%s', got '%s'", i, want, got)
		}
	}

	// Exhausted: further reads must report io.EOF.
	if _, err = cur.NextRange(1); err != io.EOF {
		t.Fatalf("expected EOF, got %v", err)
	}
}
// TestCursorWindow verifies the cursor's window semantics: Next advances
// endIdx, Commit moves startIdx up to endIdx, and Rollback rewinds endIdx
// back to startIdx so uncommitted reads are replayed.
func TestCursorWindow(t *testing.T) {
	tmpFile := "test_window.log"
	defer os.Remove(tmpFile)
	defer os.Remove(tmpFile + ".pos")
	// FIX: also remove the index file so the test leaves no residue
	// (consistent with the other tests in this file).
	defer os.Remove(tmpFile + ".idx")
	// Build the shared index.
	index, err := NewRecordIndex(tmpFile)
	if err != nil {
		t.Fatal(err)
	}
	defer index.Close()
	// Append three records.
	writer, err := NewLogWriter(tmpFile, index)
	if err != nil {
		t.Fatal(err)
	}
	messages := []string{"msg1", "msg2", "msg3"}
	for _, msg := range messages {
		if _, err := writer.Append([]byte(msg)); err != nil {
			t.Fatal(err)
		}
	}
	writer.Close()
	cursor, err := NewCursor(tmpFile, index, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer cursor.Close()
	// Fresh cursor: both window edges start at zero.
	if cursor.StartIndex() != 0 {
		t.Errorf("expected startIdx 0, got %d", cursor.StartIndex())
	}
	if cursor.EndIndex() != 0 {
		t.Errorf("expected endIdx 0, got %d", cursor.EndIndex())
	}
	// Next reads msg1 and advances endIdx only.
	rec, err := cursor.Next()
	if err != nil {
		t.Fatal(err)
	}
	if string(rec.Data) != "msg1" {
		t.Errorf("expected 'msg1', got '%s'", string(rec.Data))
	}
	if cursor.StartIndex() != 0 {
		t.Errorf("expected startIdx 0, got %d", cursor.StartIndex())
	}
	endIdxAfterFirst := cursor.EndIndex()
	if endIdxAfterFirst == 0 {
		t.Error("endIdx should have moved")
	}
	// Commit: startIdx catches up with endIdx.
	cursor.Commit()
	if cursor.StartIndex() != endIdxAfterFirst {
		t.Errorf("expected startIdx %d after commit, got %d", endIdxAfterFirst, cursor.StartIndex())
	}
	// Read msg2 but do not commit yet.
	rec, err = cursor.Next()
	if err != nil {
		t.Fatal(err)
	}
	if string(rec.Data) != "msg2" {
		t.Errorf("expected 'msg2', got '%s'", string(rec.Data))
	}
	endIdxAfterSecond := cursor.EndIndex()
	// Rollback rewinds endIdx to startIdx.
	if err := cursor.Rollback(); err != nil {
		t.Fatal(err)
	}
	if cursor.EndIndex() != cursor.StartIndex() {
		t.Errorf("expected endIdx == startIdx after rollback, got %d != %d", cursor.EndIndex(), cursor.StartIndex())
	}
	// The rolled-back record is replayed on the next read.
	rec, err = cursor.Next()
	if err != nil {
		t.Fatal(err)
	}
	if string(rec.Data) != "msg2" {
		t.Errorf("expected 'msg2' after rollback, got '%s'", string(rec.Data))
	}
	// Committing now advances startIdx past msg2.
	cursor.Commit()
	if cursor.StartIndex() != endIdxAfterSecond {
		t.Errorf("expected startIdx %d after commit, got %d", endIdxAfterSecond, cursor.StartIndex())
	}
}
// TestCursorPersistence verifies that committed cursor positions survive a
// cursor restart: a reopened cursor resumes at the first uncommitted record.
func TestCursorPersistence(t *testing.T) {
	tmpFile := "test_cursor.log"
	defer os.Remove(tmpFile)
	defer os.Remove(tmpFile + ".pos")
	// FIX: also remove the index file so the test leaves no residue.
	defer os.Remove(tmpFile + ".idx")
	index, err := NewRecordIndex(tmpFile)
	if err != nil {
		t.Fatal(err)
	}
	defer index.Close()
	writer, err := NewLogWriter(tmpFile, index)
	if err != nil {
		t.Fatalf("创建 writer 失败: %v", err)
	}
	testData := [][]byte{
		[]byte("record 1"),
		[]byte("record 2"),
		[]byte("record 3"),
	}
	// FIX: Append errors were silently ignored; fail fast instead.
	for _, data := range testData {
		if _, err := writer.Append(data); err != nil {
			t.Fatalf("写入数据失败: %v", err)
		}
	}
	// FIX: close the writer once writing is done (it was leaked).
	writer.Close()
	// Read and commit the first two records, then close the cursor.
	cursor1, err := NewCursor(tmpFile, index, nil)
	if err != nil {
		t.Fatalf("创建 cursor 失败: %v", err)
	}
	if _, err := cursor1.Next(); err != nil { // record 1
		t.Fatalf("读取记录失败: %v", err)
	}
	cursor1.Commit()
	if _, err := cursor1.Next(); err != nil { // record 2
		t.Fatalf("读取记录失败: %v", err)
	}
	cursor1.Commit()
	cursor1.Close()
	// A fresh cursor must resume at record 3.
	cursor2, err := NewCursor(tmpFile, index, nil)
	if err != nil {
		t.Fatalf("重新创建 cursor 失败: %v", err)
	}
	defer cursor2.Close()
	rec, err := cursor2.Next()
	if err != nil {
		t.Fatalf("读取恢复后的记录失败: %v", err)
	}
	if string(rec.Data) != string(testData[2]) {
		t.Errorf("游标恢复失败: got %q, want %q", rec.Data, testData[2])
	}
}
// TestTailer verifies that a Tailer delivers records that existed before it
// started and records appended while it is running, in order.
func TestTailer(t *testing.T) {
	tmpFile := "test_tailer.log"
	defer os.Remove(tmpFile)
	defer os.Remove(tmpFile + ".pos")
	// FIX: also remove the index file so the test leaves no residue.
	defer os.Remove(tmpFile + ".idx")
	index, err := NewRecordIndex(tmpFile)
	if err != nil {
		t.Fatal(err)
	}
	defer index.Close()
	writer, err := NewLogWriter(tmpFile, index)
	if err != nil {
		t.Fatalf("创建 writer 失败: %v", err)
	}
	// One record exists before the tailer starts.
	if _, err := writer.Append([]byte("initial record")); err != nil {
		t.Fatalf("写入数据失败: %v", err)
	}
	// FIX: the handler runs on the tailer goroutine while the test body
	// reads `records`; guard the slice with a mutex to avoid a data race.
	var mu sync.Mutex
	records := make([]string, 0)
	handler := func(rec *Record) error {
		mu.Lock()
		defer mu.Unlock()
		records = append(records, string(rec.Data))
		return nil
	}
	cursor, err := NewCursor(tmpFile, index, nil)
	if err != nil {
		t.Fatalf("创建 cursor 失败: %v", err)
	}
	tailer, err := NewTailer(cursor, handler, &TailConfig{
		PollInterval: 50 * time.Millisecond,
		SaveInterval: 100 * time.Millisecond,
	})
	if err != nil {
		t.Fatalf("创建 tailer 失败: %v", err)
	}
	ctx, cancel := context.WithCancel(context.Background())
	go tailer.Start(ctx)
	// Give the tailer time to consume the pre-existing record.
	time.Sleep(100 * time.Millisecond)
	// Append while the tailer is live.
	if _, err := writer.Append([]byte("new record 1")); err != nil {
		t.Fatalf("写入数据失败: %v", err)
	}
	if _, err := writer.Append([]byte("new record 2")); err != nil {
		t.Fatalf("写入数据失败: %v", err)
	}
	time.Sleep(200 * time.Millisecond)
	cancel()
	time.Sleep(100 * time.Millisecond)
	// Verify delivery order and completeness under the lock.
	mu.Lock()
	defer mu.Unlock()
	if len(records) < 3 {
		t.Errorf("处理的记录数量不足: got %d, want at least 3", len(records))
	}
	expected := []string{"initial record", "new record 1", "new record 2"}
	for i, exp := range expected {
		// FIX: the original evaluated records[i] in the Errorf arguments even
		// when i >= len(records), which panics with index out of range.
		if i >= len(records) {
			t.Errorf("第 %d 条记录不匹配: got %q, want %q", i, "", exp)
			continue
		}
		if records[i] != exp {
			t.Errorf("第 %d 条记录不匹配: got %q, want %q", i, records[i], exp)
		}
	}
}
// TestTailerStop verifies that Stop halts the tailer after the pending
// record has been handled exactly once.
func TestTailerStop(t *testing.T) {
	tmpFile := "test_tailer_stop.log"
	defer os.Remove(tmpFile)
	defer os.Remove(tmpFile + ".pos")
	// FIX: also remove the index file so the test leaves no residue.
	defer os.Remove(tmpFile + ".idx")
	index, err := NewRecordIndex(tmpFile)
	if err != nil {
		t.Fatal(err)
	}
	defer index.Close()
	writer, err := NewLogWriter(tmpFile, index)
	if err != nil {
		t.Fatalf("创建 writer 失败: %v", err)
	}
	if _, err := writer.Append([]byte("test record")); err != nil {
		t.Fatalf("写入数据失败: %v", err)
	}
	// FIX: the handler runs on the tailer goroutine while the test body
	// reads `count`; guard it with a mutex to avoid a data race.
	var mu sync.Mutex
	count := 0
	handler := func(rec *Record) error {
		mu.Lock()
		count++
		mu.Unlock()
		return nil
	}
	cursor, err := NewCursor(tmpFile, index, nil)
	if err != nil {
		t.Fatalf("创建 cursor 失败: %v", err)
	}
	tailer, err := NewTailer(cursor, handler, nil)
	if err != nil {
		t.Fatalf("创建 tailer 失败: %v", err)
	}
	go tailer.Start(context.Background())
	// Give the tailer time to process the record, then stop it.
	time.Sleep(100 * time.Millisecond)
	tailer.Stop()
	mu.Lock()
	defer mu.Unlock()
	if count != 1 {
		t.Errorf("处理的记录数量不正确: got %d, want 1", count)
	}
}
// TestSeqlogBasic verifies per-topic handler dispatch in the log hub.
func TestSeqlogBasic(t *testing.T) {
	tmpDir := "test_seqlog"
	os.MkdirAll(tmpDir, 0755)
	defer os.RemoveAll(tmpDir)
	seqlog := NewLogHub(tmpDir, slog.Default(), nil)
	// FIX: handlers run on processor goroutines while the test body reads
	// the slices; guard them with a mutex to avoid data races.
	var mu sync.Mutex
	appLogs := make([]string, 0)
	seqlog.RegisterHandler("app", func(rec *Record) error {
		mu.Lock()
		defer mu.Unlock()
		appLogs = append(appLogs, string(rec.Data))
		return nil
	})
	accessLogs := make([]string, 0)
	seqlog.RegisterHandler("access", func(rec *Record) error {
		mu.Lock()
		defer mu.Unlock()
		accessLogs = append(accessLogs, string(rec.Data))
		return nil
	})
	if err := seqlog.Start(); err != nil {
		t.Fatalf("启动 seqlog 失败: %v", err)
	}
	// Two records for "app", one for "access".
	seqlog.Write("app", []byte("app log 1"))
	seqlog.Write("app", []byte("app log 2"))
	seqlog.Write("access", []byte("access log 1"))
	time.Sleep(200 * time.Millisecond)
	seqlog.Stop()
	mu.Lock()
	defer mu.Unlock()
	if len(appLogs) != 2 {
		t.Errorf("app logs 数量不正确: got %d, want 2", len(appLogs))
	}
	if len(accessLogs) != 1 {
		t.Errorf("access logs 数量不正确: got %d, want 1", len(accessLogs))
	}
}
// TestSeqlogDefaultHandler verifies that topics without a dedicated handler
// fall back to the hub-wide default handler, while topics with their own
// registered handler keep using it.
func TestSeqlogDefaultHandler(t *testing.T) {
	dir := "test_seqlog_default"
	os.MkdirAll(dir, 0755)
	defer os.RemoveAll(dir)

	var (
		mu       sync.Mutex
		received = make(map[string][]string)
	)
	// Fallback handler: records every message keyed by topic.
	fallback := func(topic string, rec *Record) error {
		mu.Lock()
		defer mu.Unlock()
		received[topic] = append(received[topic], string(rec.Data))
		return nil
	}
	hub := NewLogHub(dir, slog.Default(), fallback)
	// Topic-specific handler for "special".
	hub.RegisterHandler("special", func(rec *Record) error {
		mu.Lock()
		defer mu.Unlock()
		received["special"] = append(received["special"], string(rec.Data))
		return nil
	})
	hub.Start()

	// One message on the dedicated topic, two on a fallback topic.
	hub.Write("special", []byte("special log"))
	hub.Write("other", []byte("other log 1"))
	hub.Write("other", []byte("other log 2"))
	time.Sleep(200 * time.Millisecond)
	hub.Stop()

	mu.Lock()
	defer mu.Unlock()
	if got := len(received["special"]); got != 1 {
		t.Errorf("special logs 数量不正确: got %d, want 1", got)
	}
	if got := len(received["other"]); got != 2 {
		t.Errorf("other logs 数量不正确: got %d, want 2", got)
	}
}
// TestSeqlogDynamicRegistration verifies that a handler registered before
// Start processes every record written to its topic.
func TestSeqlogDynamicRegistration(t *testing.T) {
	tmpDir := "test_seqlog_dynamic"
	os.MkdirAll(tmpDir, 0755)
	defer os.RemoveAll(tmpDir)
	seqlog := NewLogHub(tmpDir, slog.Default(), nil)
	// FIX: the handler runs on a processor goroutine while the test body
	// reads `logs`; guard the slice with a mutex to avoid a data race.
	var mu sync.Mutex
	logs := make([]string, 0)
	seqlog.RegisterHandler("dynamic", func(rec *Record) error {
		mu.Lock()
		defer mu.Unlock()
		logs = append(logs, string(rec.Data))
		return nil
	})
	seqlog.Start()
	seqlog.Write("dynamic", []byte("log 1"))
	seqlog.Write("dynamic", []byte("log 2"))
	seqlog.Write("dynamic", []byte("log 3"))
	time.Sleep(200 * time.Millisecond)
	seqlog.Stop()
	// All three records must have been handled.
	mu.Lock()
	defer mu.Unlock()
	if len(logs) != 3 {
		t.Errorf("处理的日志数量不正确: got %d, want 3", len(logs))
	}
}
// TestDynamicConfigUpdate verifies that a topic's TailConfig can be swapped
// at runtime without losing records.
func TestDynamicConfigUpdate(t *testing.T) {
	tmpDir := "test_dynamic_config"
	os.MkdirAll(tmpDir, 0755)
	defer os.RemoveAll(tmpDir)
	seqlog := NewLogHub(tmpDir, slog.Default(), nil)
	// FIX: the handler runs on a processor goroutine while the test body
	// reads `logs`; guard the slice with a mutex to avoid a data race.
	var mu sync.Mutex
	logs := make([]string, 0)
	seqlog.RegisterHandler("test", func(rec *Record) error {
		mu.Lock()
		defer mu.Unlock()
		logs = append(logs, string(rec.Data))
		return nil
	})
	seqlog.Start()
	// Write a couple of records under the default config.
	seqlog.Write("test", []byte("log 1"))
	seqlog.Write("test", []byte("log 2"))
	time.Sleep(150 * time.Millisecond)
	// Inspect the default configuration.
	config, err := seqlog.GetTopicConfig("test")
	if err != nil {
		t.Fatalf("获取配置失败: %v", err)
	}
	if config.PollInterval != 100*time.Millisecond {
		t.Errorf("默认 PollInterval 不正确: got %v, want 100ms", config.PollInterval)
	}
	// Swap in a faster configuration at runtime.
	newConfig := &TailConfig{
		PollInterval: 50 * time.Millisecond,
		SaveInterval: 500 * time.Millisecond,
		BatchSize:    10,
	}
	if err := seqlog.UpdateTopicConfig("test", newConfig); err != nil {
		t.Fatalf("更新配置失败: %v", err)
	}
	// Records written after the update must still be processed.
	seqlog.Write("test", []byte("log 3"))
	time.Sleep(150 * time.Millisecond)
	seqlog.Stop()
	mu.Lock()
	processed := len(logs)
	mu.Unlock()
	if processed != 3 {
		t.Errorf("处理的日志数量不正确: got %d, want 3", processed)
	}
	// FIX: the error from this GetTopicConfig was silently dropped.
	updatedConfig, err := seqlog.GetTopicConfig("test")
	if err != nil {
		t.Fatalf("获取配置失败: %v", err)
	}
	if updatedConfig.PollInterval != 50*time.Millisecond {
		t.Errorf("更新后的 PollInterval 不正确: got %v, want 50ms", updatedConfig.PollInterval)
	}
}
// TestUUIDUniqueness writes identical payloads and checks that every record
// still receives a distinct UUID.
func TestUUIDUniqueness(t *testing.T) {
	tmpFile := "test_uuid.log"
	defer os.Remove(tmpFile)
	defer os.Remove(tmpFile + ".pos")
	// FIX: also remove the index file so the test leaves no residue.
	defer os.Remove(tmpFile + ".idx")
	index, err := NewRecordIndex(tmpFile)
	if err != nil {
		t.Fatal(err)
	}
	defer index.Close()
	writer, err := NewLogWriter(tmpFile, index)
	if err != nil {
		t.Fatalf("创建 writer 失败: %v", err)
	}
	// Ten records with identical content.
	for i := 0; i < 10; i++ {
		if _, err := writer.Append([]byte("same content")); err != nil {
			t.Fatalf("写入数据失败: %v", err)
		}
	}
	// FIX: close the writer once writing is done (it was leaked).
	writer.Close()
	cursor, err := NewCursor(tmpFile, index, nil)
	if err != nil {
		t.Fatalf("创建 cursor 失败: %v", err)
	}
	defer cursor.Close()
	// Every UUID must be unique even though the payloads match.
	uuids := make(map[string]bool)
	for i := 0; i < 10; i++ {
		rec, err := cursor.Next()
		if err != nil {
			t.Fatalf("读取第 %d 条记录失败: %v", i, err)
		}
		uuidStr := rec.UUID.String()
		if uuids[uuidStr] {
			t.Errorf("发现重复的 UUID: %s", uuidStr)
		}
		uuids[uuidStr] = true
		// Payload must round-trip unchanged.
		if string(rec.Data) != "same content" {
			t.Errorf("第 %d 条记录数据不匹配: got %q, want %q", i, rec.Data, "same content")
		}
	}
	if len(uuids) != 10 {
		t.Errorf("UUID 数量不正确: got %d, want 10", len(uuids))
	}
}
// TestUUIDValidation checks that a written record carries a non-zero,
// canonically formatted UUID.
func TestUUIDValidation(t *testing.T) {
	tmpFile := "test_uuid_validation.log"
	defer os.Remove(tmpFile)
	defer os.Remove(tmpFile + ".pos")
	// FIX: also remove the index file so the test leaves no residue.
	defer os.Remove(tmpFile + ".idx")
	index, err := NewRecordIndex(tmpFile)
	if err != nil {
		t.Fatal(err)
	}
	defer index.Close()
	writer, err := NewLogWriter(tmpFile, index)
	if err != nil {
		t.Fatalf("创建 writer 失败: %v", err)
	}
	if _, err := writer.Append([]byte("test data")); err != nil {
		t.Fatalf("写入数据失败: %v", err)
	}
	// FIX: close the writer once writing is done (it was leaked).
	writer.Close()
	cursor, err := NewCursor(tmpFile, index, nil)
	if err != nil {
		t.Fatalf("创建 cursor 失败: %v", err)
	}
	defer cursor.Close()
	rec, err := cursor.Next()
	if err != nil {
		t.Fatalf("读取记录失败: %v", err)
	}
	// The UUID must not be the zero value.
	if rec.UUID == [16]byte{} {
		t.Error("UUID 为空")
	}
	// Canonical textual form: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx (36 chars).
	uuidStr := rec.UUID.String()
	if len(uuidStr) != 36 {
		t.Errorf("UUID 字符串格式不正确: %s (长度 %d)", uuidStr, len(uuidStr))
	}
	t.Logf("生成的 UUID: %s", uuidStr)
}
// TestSeqlogAutoRecovery verifies that a restarted hub rediscovers existing
// topics and resumes from the persisted cursor positions, so only records
// written after the restart are processed again.
func TestSeqlogAutoRecovery(t *testing.T) {
	tmpDir := "test_auto_recovery"
	os.MkdirAll(tmpDir, 0755)
	defer os.RemoveAll(tmpDir)
	// Collects every processed record keyed by topic; shared by both runs.
	allLogs := make(map[string][]string)
	var mu sync.Mutex
	defaultHandler := func(topic string, rec *Record) error {
		mu.Lock()
		defer mu.Unlock()
		allLogs[topic] = append(allLogs[topic], string(rec.Data))
		return nil
	}
	// Phase 1: write and process some records.
	seqlog1 := NewLogHub(tmpDir, slog.Default(), defaultHandler)
	seqlog1.Start()
	seqlog1.Write("app", []byte("app log 1"))
	seqlog1.Write("app", []byte("app log 2"))
	seqlog1.Write("access", []byte("access log 1"))
	seqlog1.Write("error", []byte("error log 1"))
	time.Sleep(200 * time.Millisecond)
	seqlog1.Stop()
	// Verify phase-1 delivery.
	mu.Lock()
	if len(allLogs["app"]) != 2 {
		t.Errorf("第一阶段 app logs 数量不正确: got %d, want 2", len(allLogs["app"]))
	}
	if len(allLogs["access"]) != 1 {
		t.Errorf("第一阶段 access logs 数量不正确: got %d, want 1", len(allLogs["access"]))
	}
	if len(allLogs["error"]) != 1 {
		t.Errorf("第一阶段 error logs 数量不正确: got %d, want 1", len(allLogs["error"]))
	}
	mu.Unlock()
	// Reset counters to simulate a process restart.
	mu.Lock()
	allLogs = make(map[string][]string)
	mu.Unlock()
	// Phase 2: restart; cursors should resume past the old records.
	seqlog2 := NewLogHub(tmpDir, slog.Default(), defaultHandler)
	seqlog2.Start()
	seqlog2.Write("app", []byte("app log 3"))
	seqlog2.Write("access", []byte("access log 2"))
	seqlog2.Write("new_topic", []byte("new topic log 1"))
	time.Sleep(300 * time.Millisecond)
	mu.Lock()
	// Only post-restart records should be handled (cursor positions persist).
	if len(allLogs["app"]) != 1 {
		t.Errorf("恢复后 app logs 数量不正确: got %d, want 1 (只有新日志)", len(allLogs["app"]))
	}
	if len(allLogs["access"]) != 1 {
		t.Errorf("恢复后 access logs 数量不正确: got %d, want 1 (只有新日志)", len(allLogs["access"]))
	}
	if len(allLogs["new_topic"]) != 1 {
		t.Errorf("恢复后 new_topic logs 数量不正确: got %d, want 1", len(allLogs["new_topic"]))
	}
	// FIX: guard the index access — if the count check above already failed,
	// allLogs["app"] may be empty and indexing it would panic the test.
	if len(allLogs["app"]) > 0 && allLogs["app"][0] != "app log 3" {
		t.Errorf("app 日志内容不正确: got %q, want %q", allLogs["app"][0], "app log 3")
	}
	mu.Unlock()
	seqlog2.Stop()
	// All topics — pre-existing and new — should have been discovered.
	topics := seqlog2.GetTopics()
	expectedTopics := map[string]bool{
		"app":       true,
		"access":    true,
		"error":     true,
		"new_topic": true,
	}
	if len(topics) != len(expectedTopics) {
		t.Errorf("topic 数量不正确: got %d, want %d", len(topics), len(expectedTopics))
	}
	for _, topic := range topics {
		if !expectedTopics[topic] {
			t.Errorf("发现未预期的 topic: %s", topic)
		}
	}
}
// TestTopicProcessorClose verifies that Stop followed by Close releases the
// processor's writer and that Close is idempotent.
func TestTopicProcessorClose(t *testing.T) {
	dir := "test_processor_close"
	os.MkdirAll(dir, 0755)
	defer os.RemoveAll(dir)

	// A no-op handler is still required by the processor config.
	proc, err := NewTopicProcessor(dir, "test", slog.Default(), &TopicConfig{
		Handler: func(rec *Record) error {
			return nil
		},
	})
	if err != nil {
		t.Fatalf("创建 processor 失败: %v", err)
	}

	// Append a couple of records before shutting down.
	for _, payload := range []string{"test data 1", "test data 2"} {
		if _, err := proc.Write([]byte(payload)); err != nil {
			t.Fatalf("写入失败: %v", err)
		}
	}

	if err := proc.Stop(); err != nil {
		t.Fatalf("停止失败: %v", err)
	}
	if err := proc.Close(); err != nil {
		t.Fatalf("清理失败: %v", err)
	}

	// After Close the writer field must have been released (nil).
	proc.mu.RLock()
	if proc.writer != nil {
		t.Error("writer 应该被清理为 nil")
	}
	proc.mu.RUnlock()

	// A second Close must be a no-op, not an error.
	if err := proc.Close(); err != nil {
		t.Errorf("重复调用 Close 失败: %v", err)
	}
}
// TestSeqlogCleanup verifies that stopping the hub releases every topic
// processor's writer.
func TestSeqlogCleanup(t *testing.T) {
	dir := "test_seqlog_cleanup"
	os.MkdirAll(dir, 0755)
	defer os.RemoveAll(dir)

	hub := NewLogHub(dir, slog.Default(), nil)
	hub.Start()

	// Populate three topics with three records each.
	for _, topic := range []string{"app", "access", "error"} {
		for i := 0; i < 3; i++ {
			payload := []byte(fmt.Sprintf("%s log %d", topic, i))
			if _, err := hub.Write(topic, payload); err != nil {
				t.Fatalf("写入失败 [topic=%s]: %v", topic, err)
			}
		}
	}

	if err := hub.Stop(); err != nil {
		t.Fatalf("停止失败: %v", err)
	}

	// Every processor's writer must have been released after Stop.
	hub.mu.RLock()
	defer hub.mu.RUnlock()
	for topic, proc := range hub.processors {
		proc.mu.RLock()
		if proc.writer != nil {
			t.Errorf("topic %s 的 writer 未被清理", topic)
		}
		proc.mu.RUnlock()
	}
}
// TestTopicStats covers counter updates and save/load round-tripping of
// TopicStats.
func TestTopicStats(t *testing.T) {
	statsPath := filepath.Join(t.TempDir(), "test.stats")
	ts := NewTopicStats(statsPath)

	// Write-side counters.
	ts.IncWrite(100)
	ts.IncWrite(200)
	snap := ts.Get()
	if snap.WriteCount != 2 {
		t.Errorf("expected write count 2, got %d", snap.WriteCount)
	}
	if snap.WriteBytes != 300 {
		t.Errorf("expected write bytes 300, got %d", snap.WriteBytes)
	}

	// Processed-side counters.
	ts.IncProcessed(50)
	ts.IncProcessed(150)
	snap = ts.Get()
	if snap.ProcessedCount != 2 {
		t.Errorf("expected processed count 2, got %d", snap.ProcessedCount)
	}
	if snap.ProcessedBytes != 200 {
		t.Errorf("expected processed bytes 200, got %d", snap.ProcessedBytes)
	}

	// Error counter.
	for i := 0; i < 3; i++ {
		ts.IncError()
	}
	snap = ts.Get()
	if snap.ErrorCount != 3 {
		t.Errorf("expected error count 3, got %d", snap.ErrorCount)
	}

	// Persist, then reload into a fresh instance and compare every field.
	if err := ts.Save(); err != nil {
		t.Fatalf("failed to save stats: %v", err)
	}
	restored := NewTopicStats(statsPath).Get()
	if restored.WriteCount != snap.WriteCount {
		t.Errorf("recovered write count mismatch: expected %d, got %d", snap.WriteCount, restored.WriteCount)
	}
	if restored.WriteBytes != snap.WriteBytes {
		t.Errorf("recovered write bytes mismatch: expected %d, got %d", snap.WriteBytes, restored.WriteBytes)
	}
	if restored.ProcessedCount != snap.ProcessedCount {
		t.Errorf("recovered processed count mismatch: expected %d, got %d", snap.ProcessedCount, restored.ProcessedCount)
	}
	if restored.ProcessedBytes != snap.ProcessedBytes {
		t.Errorf("recovered processed bytes mismatch: expected %d, got %d", snap.ProcessedBytes, restored.ProcessedBytes)
	}
	if restored.ErrorCount != snap.ErrorCount {
		t.Errorf("recovered error count mismatch: expected %d, got %d", snap.ErrorCount, restored.ErrorCount)
	}
}
// TestTopicProcessorStats verifies that a processor accounts for both the
// write side and the processed side of every record.
func TestTopicProcessorStats(t *testing.T) {
	dir := t.TempDir()
	cfg := &TopicConfig{
		// No-op handler; processing success is what gets counted.
		Handler: func(rec *Record) error {
			return nil
		},
	}
	proc, err := NewTopicProcessor(dir, "stats_test", slog.Default(), cfg)
	if err != nil {
		t.Fatalf("创建 processor 失败: %v", err)
	}
	if err := proc.Start(); err != nil {
		t.Fatalf("failed to start processor: %v", err)
	}
	defer func() {
		proc.Stop()
		proc.Close()
	}()

	// Three payloads; track the total byte count as we write.
	payloads := [][]byte{
		[]byte("test message 1"),
		[]byte("test message 2"),
		[]byte("test message 3"),
	}
	var expectedBytes int64
	for _, p := range payloads {
		if _, err := proc.Write(p); err != nil {
			t.Fatalf("failed to write: %v", err)
		}
		expectedBytes += int64(len(p))
	}

	// Let the tailer drain the records.
	time.Sleep(200 * time.Millisecond)

	stats := proc.GetStats()
	if stats.WriteCount != 3 {
		t.Errorf("expected write count 3, got %d", stats.WriteCount)
	}
	if stats.WriteBytes != expectedBytes {
		t.Errorf("expected write bytes %d, got %d", expectedBytes, stats.WriteBytes)
	}
	if stats.ProcessedCount != 3 {
		t.Errorf("expected processed count 3, got %d", stats.ProcessedCount)
	}
	if stats.ProcessedBytes != expectedBytes {
		t.Errorf("expected processed bytes %d, got %d", expectedBytes, stats.ProcessedBytes)
	}
}
// TestStatsRecovery verifies that a new processor for the same topic
// restores the previously persisted statistics.
func TestStatsRecovery(t *testing.T) {
	dir := t.TempDir()
	topic := "recovery_test"
	cfg := &TopicConfig{
		Handler: func(rec *Record) error {
			return nil
		},
	}

	// Phase one: write five records, then shut down cleanly.
	first, err := NewTopicProcessor(dir, topic, slog.Default(), cfg)
	if err != nil {
		t.Fatalf("创建 processor 失败: %v", err)
	}
	if err := first.Start(); err != nil {
		t.Fatalf("failed to start processor: %v", err)
	}
	for i := 0; i < 5; i++ {
		if _, err := first.Write([]byte(fmt.Sprintf("message %d", i))); err != nil {
			t.Fatalf("failed to write: %v", err)
		}
	}
	time.Sleep(200 * time.Millisecond)
	before := first.GetStats()
	if err := first.Stop(); err != nil {
		t.Fatalf("failed to stop processor: %v", err)
	}
	if err := first.Close(); err != nil {
		t.Fatalf("failed to close processor: %v", err)
	}

	// Phase two: a fresh processor must come up with the same stats.
	second, err := NewTopicProcessor(dir, topic, slog.Default(), cfg)
	if err != nil {
		t.Fatalf("创建 processor 失败: %v", err)
	}
	after := second.GetStats()
	if after.WriteCount != before.WriteCount {
		t.Errorf("write count mismatch: expected %d, got %d", before.WriteCount, after.WriteCount)
	}
	if after.WriteBytes != before.WriteBytes {
		t.Errorf("write bytes mismatch: expected %d, got %d", before.WriteBytes, after.WriteBytes)
	}
	if after.ProcessedCount != before.ProcessedCount {
		t.Errorf("processed count mismatch: expected %d, got %d", before.ProcessedCount, after.ProcessedCount)
	}
	if after.ProcessedBytes != before.ProcessedBytes {
		t.Errorf("processed bytes mismatch: expected %d, got %d", before.ProcessedBytes, after.ProcessedBytes)
	}
	second.Close()
}
// TestSeqlogStats 测试 Seqlog 层面的统计聚合
func TestSeqlogStats(t *testing.T) {
tmpDir := t.TempDir()
handler := func(topic string, rec *Record) error {
return nil
}
seq := NewLogHub(tmpDir, slog.Default(), handler)
if err := seq.Start(); err != nil {
t.Fatalf("failed to start seqlog: %v", err)
}
defer seq.Stop()
// 写入多个 topic
topics := []string{"app", "sys", "audit"}
for _, topic := range topics {
for i := 0; i < 3; i++ {
data := []byte(fmt.Sprintf("%s message %d", topic, i))
if _, err := seq.Write(topic, data); err != nil {
t.Fatalf("failed to write to %s: %v", topic, err)
}
}
}
// 等待处理
time.Sleep(300 * time.Millisecond)
// 检查单个 topic 统计
for _, topic := range topics {
stats, err := seq.GetTopicStats(topic)
if err != nil {
t.Errorf("failed to get stats for %s: %v", topic, err)
continue
}
if stats.WriteCount != 3 {
t.Errorf("%s: expected write count 3, got %d", topic, stats.WriteCount)
}
}
// 检查所有统计
allStats := seq.GetAllStats()
if len(allStats) != len(topics) {
t.Errorf("expected %d topics, got %d", len(topics), len(allStats))
}
for _, topic := range topics {
stats, exists := allStats[topic]
if !exists {
t.Errorf("stats for %s not found", topic)
continue
}
if stats.WriteCount != 3 {
t.Errorf("%s: expected write count 3, got %d", topic, stats.WriteCount)
}
}
}
// TestRecordQuery verifies record lookup around a simulated processing
// window [startIdx, endIdx) using QueryNewest / QueryOldest, and checks the
// derived per-record status from GetRecordStatus.
func TestRecordQuery(t *testing.T) {
	tmpFile := "test_query.log"
	defer os.Remove(tmpFile)
	defer os.Remove(tmpFile + ".pos")
	// Index used by the writer.
	index, err := NewRecordIndex(tmpFile)
	if err != nil {
		t.Fatal(err)
	}
	defer index.Close()
	// Append ten sequential messages, remembering each record's offset.
	writer, err := NewLogWriter(tmpFile, index)
	if err != nil {
		t.Fatalf("failed to create writer: %v", err)
	}
	messages := []string{
		"message 0",
		"message 1",
		"message 2",
		"message 3",
		"message 4",
		"message 5",
		"message 6",
		"message 7",
		"message 8",
		"message 9",
	}
	// offsets is populated but not consulted afterwards; kept for parity
	// with Append's return contract.
	offsets := make([]int64, len(messages))
	for i, msg := range messages {
		offset, err := writer.Append([]byte(msg))
		if err != nil {
			t.Fatalf("failed to write message %d: %v", i, err)
		}
		offsets[i] = offset
	}
	defer writer.Close()
	// Simulate having processed up to record 5: the window is [5, 6).
	startIdx := 5
	endIdx := 6
	// NOTE(review): a second index over the same file is opened here while
	// the first one is still live (the earlier defer closes the first
	// handle, this defer closes the second). Presumably this mimics an
	// independent reader-side index — confirm; otherwise it is redundant.
	index, err = NewRecordIndex(tmpFile)
	if err != nil {
		t.Fatalf("failed to create index: %v", err)
	}
	defer index.Close()
	// Query helper over the log file, the reader index, and the live writer.
	query, err := NewRecordQuery(tmpFile, index, writer)
	if err != nil {
		t.Fatalf("failed to create query: %v", err)
	}
	defer query.Close()
	// Query the record at startIdx (QueryNewest starts after its first
	// argument, hence startIdx-1).
	current, err := query.QueryNewest(startIdx-1, 1)
	if err != nil {
		t.Fatalf("failed to query current: %v", err)
	}
	if len(current) != 1 {
		t.Fatalf("expected 1 current result, got %d", len(current))
	}
	if string(current[0].Record.Data) != "message 5" {
		t.Errorf("expected current 'message 5', got '%s'", string(current[0].Record.Data))
	}
	if current[0].Index != startIdx {
		t.Errorf("expected index %d, got %d", startIdx, current[0].Index)
	}
	// The record at startIdx sits inside the window: Processing.
	status := GetRecordStatus(startIdx, startIdx, endIdx)
	if status != StatusProcessing {
		t.Errorf("expected status Processing, got %s", status)
	}
	// QueryOldest walks toward lower indices: QueryOldest(5, 3) should
	// return indices 2, 3, 4 in ascending order.
	backResults, err := query.QueryOldest(startIdx, 3)
	if err != nil {
		t.Fatalf("failed to query backward: %v", err)
	}
	if len(backResults) != 3 {
		t.Errorf("expected 3 backward results, got %d", len(backResults))
	}
	expectedBack := []string{"message 2", "message 3", "message 4"}
	for i, rec := range backResults {
		if string(rec.Record.Data) != expectedBack[i] {
			t.Errorf("backward[%d]: expected '%s', got '%s'", i, expectedBack[i], string(rec.Record.Data))
		}
		expectedIndex := startIdx - 3 + i
		if rec.Index != expectedIndex {
			t.Errorf("backward[%d]: expected index %d, got %d", i, expectedIndex, rec.Index)
		}
		// Indices 2..4 precede the window: all Processed.
		recStatus := GetRecordStatus(rec.Index, startIdx, endIdx)
		if recStatus != StatusProcessed {
			t.Errorf("backward[%d]: expected status Processed, got %s", i, recStatus)
		}
	}
	// QueryNewest walks toward higher indices: starting after endIdx-1 it
	// should return indices 6, 7, 8 in ascending order.
	forwardResults, err := query.QueryNewest(endIdx-1, 3)
	if err != nil {
		t.Fatalf("failed to query forward: %v", err)
	}
	if len(forwardResults) != 3 {
		t.Errorf("expected 3 forward results, got %d", len(forwardResults))
	}
	expectedForward := []string{"message 6", "message 7", "message 8"}
	for i, rec := range forwardResults {
		if string(rec.Record.Data) != expectedForward[i] {
			t.Errorf("forward[%d]: expected '%s', got '%s'", i, expectedForward[i], string(rec.Record.Data))
		}
		expectedIndex := endIdx + i
		if rec.Index != expectedIndex {
			t.Errorf("forward[%d]: expected index %d, got %d", i, expectedIndex, rec.Index)
		}
		// Indices 6..8 follow the window: all Pending.
		recStatus := GetRecordStatus(rec.Index, startIdx, endIdx)
		if recStatus != StatusPending {
			t.Errorf("forward[%d]: expected status Pending, got %s", i, recStatus)
		}
	}
}
// TestTopicQuery drives TopicProcessor's query helpers around a stalled
// processing window; results are logged rather than asserted.
func TestTopicQuery(t *testing.T) {
	dir := t.TempDir()
	handled := 0
	cfg := &TopicConfig{
		Handler: func(rec *Record) error {
			handled++
			// Fail from the third record on, stalling the window.
			if handled >= 3 {
				return fmt.Errorf("stop processing")
			}
			return nil
		},
	}
	proc, err := NewTopicProcessor(dir, "query_test", slog.Default(), cfg)
	if err != nil {
		t.Fatalf("创建 processor 失败: %v", err)
	}
	if err := proc.Start(); err != nil {
		t.Fatalf("failed to start processor: %v", err)
	}

	// Ten records to query against.
	for i := 0; i < 10; i++ {
		if _, err := proc.Write([]byte(fmt.Sprintf("message %d", i))); err != nil {
			t.Fatalf("failed to write: %v", err)
		}
	}
	time.Sleep(300 * time.Millisecond)
	proc.Stop()

	startIdx := proc.GetProcessingIndex()
	endIdx := proc.GetReadIndex()
	t.Logf("Processing index: [%d, %d)", startIdx, endIdx)

	// The record currently being processed, if the window is non-empty.
	if startIdx < endIdx {
		current, err := proc.QueryOldest(startIdx, 1)
		if err != nil {
			t.Fatalf("failed to query current: %v", err)
		}
		if len(current) > 0 {
			t.Logf("Current: %s - %s", string(current[0].Record.Data), current[0].Status)
		}
	}
	// Records before the window.
	if startIdx > 0 {
		back, err := proc.QueryNewest(startIdx-1, 2)
		if err != nil {
			t.Fatalf("failed to query backward: %v", err)
		}
		for i, rec := range back {
			t.Logf("Back[%d]: %s - %s", i, string(rec.Record.Data), rec.Status)
		}
	}
	// Records after the window.
	if startIdx < proc.GetRecordCount() {
		forward, err := proc.QueryOldest(endIdx, 3)
		if err != nil {
			t.Fatalf("failed to query forward: %v", err)
		}
		for i, rec := range forward {
			t.Logf("Forward[%d]: %s - %s", i, string(rec.Record.Data), rec.Status)
		}
	}
	proc.Close()
}
// TestSeqlogQuery exercises query functionality at the Seqlog (LogHub) level:
// it writes messages, lets the handler consume part of them, then queries
// around the current processing window in both directions.
func TestSeqlogQuery(t *testing.T) {
	tmpDir := t.TempDir()
	processedCount := 0
	handler := func(topic string, rec *Record) error {
		processedCount++
		// Only the first 5 records are accepted; returning an error halts
		// further processing so a pending window remains for the queries below.
		if processedCount >= 5 {
			return fmt.Errorf("stop processing")
		}
		return nil
	}
	seq := NewLogHub(tmpDir, slog.Default(), handler)
	if err := seq.Start(); err != nil {
		t.Fatalf("failed to start seqlog: %v", err)
	}
	// Stop via defer so the hub is shut down even if a t.Fatalf exits early
	// (the original called Stop only on the success path).
	defer seq.Stop()
	// Write messages.
	for i := 0; i < 10; i++ {
		data := []byte(fmt.Sprintf("app message %d", i))
		if _, err := seq.Write("app", data); err != nil {
			t.Fatalf("failed to write: %v", err)
		}
	}
	// Wait for processing to make progress.
	time.Sleep(300 * time.Millisecond)
	// Create a query handle.
	query, err := seq.NewTopicQuery("app")
	if err != nil {
		t.Fatalf("failed to create query: %v", err)
	}
	defer query.Close()
	// Snapshot the current processing window.
	startIdx := seq.GetProcessingIndex("app")
	endIdx := seq.GetReadIndex("app")
	t.Logf("Processing index: [%d, %d)", startIdx, endIdx)
	// Use the processor for status-aware queries; guard against a missing
	// topic instead of dereferencing a nil processor.
	processor, _ := seq.GetProcessor("app")
	if processor == nil {
		t.Fatal("processor for topic \"app\" not found")
	}
	totalCount := processor.GetRecordCount()
	// Query the record at the current position.
	if startIdx < endIdx {
		current, err := processor.QueryOldest(startIdx, 1)
		if err != nil {
			t.Fatalf("failed to query current: %v", err)
		}
		if len(current) > 0 {
			t.Logf("Current: %s, Status: %s", string(current[0].Record.Data), current[0].Status)
		}
	}
	// Query backward (toward older records).
	if startIdx > 0 {
		back, err := processor.QueryNewest(startIdx-1, 2)
		if err != nil {
			t.Fatalf("failed to query backward: %v", err)
		}
		for i, rec := range back {
			t.Logf("Back[%d]: %s - %s", i, string(rec.Record.Data), rec.Status)
		}
	}
	// Query forward (toward newer records).
	if startIdx < totalCount {
		forward, err := processor.QueryOldest(endIdx, 3)
		if err != nil {
			t.Fatalf("failed to query forward: %v", err)
		}
		for i, rec := range forward {
			t.Logf("Forward[%d]: %s - %s", i, string(rec.Record.Data), rec.Status)
		}
	}
}
// TestEventNotification verifies that a TopicProcessor emits lifecycle,
// write, and processing events — including a ProcessError event when the
// handler returns an error.
func TestEventNotification(t *testing.T) {
	tmpDir := t.TempDir()
	topic := "event_test"
	logger := slog.Default()
	handled := 0
	cfg := &TopicConfig{
		Handler: func(rec *Record) error {
			handled++
			// Fail on the second message to trigger a ProcessError event.
			if handled == 2 {
				return fmt.Errorf("模拟处理错误")
			}
			return nil
		},
	}
	processor, err := NewTopicProcessor(tmpDir, topic, logger, cfg)
	if err != nil {
		t.Fatalf("创建 processor 失败: %v", err)
	}
	// Record every event type that arrives.
	var (
		mu       sync.Mutex
		received []EventType
	)
	processor.SubscribeAll(func(event *Event) {
		mu.Lock()
		received = append(received, event.Type)
		t.Logf("收到事件: %s - Topic: %s", event.Type, event.Topic)
		mu.Unlock()
	})
	// Start the processor.
	if err := processor.Start(); err != nil {
		t.Fatalf("failed to start processor: %v", err)
	}
	// Write 3 messages.
	for i := 0; i < 3; i++ {
		if _, err := processor.Write([]byte(fmt.Sprintf("message %d", i))); err != nil {
			t.Fatalf("failed to write: %v", err)
		}
	}
	// Give the processor time to consume everything.
	time.Sleep(500 * time.Millisecond)
	// Shut down.
	processor.Stop()
	processor.Close()
	// Allow in-flight event callbacks to finish.
	time.Sleep(100 * time.Millisecond)
	// Inspect the collected events.
	mu.Lock()
	defer mu.Unlock()
	t.Logf("收到 %d 个事件", len(received))
	// Expected minimum: 1 start + 3 write-success + 1 process-success
	// + 1 process-error + 1 stop.
	const expectedMinEvents = 7
	if len(received) < expectedMinEvents {
		t.Errorf("expected at least %d events, got %d", expectedMinEvents, len(received))
	}
	// Each interesting event type must have appeared at least once.
	seen := make(map[EventType]bool, len(received))
	for _, eventType := range received {
		seen[eventType] = true
	}
	if !seen[EventProcessorStart] {
		t.Error("missing ProcessorStart event")
	}
	if !seen[EventProcessorStop] {
		t.Error("missing ProcessorStop event")
	}
	if !seen[EventWriteSuccess] {
		t.Error("missing WriteSuccess event")
	}
	if !seen[EventProcessSuccess] {
		t.Error("missing ProcessSuccess event")
	}
	if !seen[EventProcessError] {
		t.Error("missing ProcessError event")
	}
}
// TestSeqlogEventSubscription verifies per-topic event subscription at the
// Seqlog (LogHub) level: each successful write must yield one event.
func TestSeqlogEventSubscription(t *testing.T) {
	tmpDir := t.TempDir()
	noop := func(topic string, rec *Record) error { return nil }
	seq := NewLogHub(tmpDir, slog.Default(), noop)
	if err := seq.Start(); err != nil {
		t.Fatalf("failed to start seqlog: %v", err)
	}
	defer seq.Stop()
	// Count write-success events on the "app" topic.
	var (
		mu        sync.Mutex
		gotWrites int
	)
	seq.Subscribe("app", EventWriteSuccess, func(event *Event) {
		mu.Lock()
		defer mu.Unlock()
		gotWrites++
		t.Logf("写入成功: topic=%s, position=%d", event.Topic, event.Position)
	})
	// Produce five messages.
	for i := 0; i < 5; i++ {
		if _, err := seq.Write("app", []byte(fmt.Sprintf("message %d", i))); err != nil {
			t.Fatalf("failed to write: %v", err)
		}
	}
	// Give the event dispatcher time to run.
	time.Sleep(200 * time.Millisecond)
	mu.Lock()
	defer mu.Unlock()
	if gotWrites != 5 {
		t.Errorf("expected 5 write events, got %d", gotWrites)
	}
}
// TestMultiTopicEventSubscription verifies that a single subscription spanning
// all topics receives write-success events for each topic independently.
func TestMultiTopicEventSubscription(t *testing.T) {
	tmpDir := t.TempDir()
	noop := func(topic string, rec *Record) error { return nil }
	seq := NewLogHub(tmpDir, slog.Default(), noop)
	if err := seq.Start(); err != nil {
		t.Fatalf("failed to start seqlog: %v", err)
	}
	defer seq.Stop()
	// Per-topic tally of write-success events.
	var (
		mu       sync.Mutex
		perTopic = make(map[string]int)
	)
	seq.SubscribeAllTopics(EventWriteSuccess, func(event *Event) {
		mu.Lock()
		defer mu.Unlock()
		perTopic[event.Topic]++
	})
	// Write three messages into each of three distinct topics.
	topics := []string{"app", "sys", "audit"}
	for _, topic := range topics {
		for i := 0; i < 3; i++ {
			payload := []byte(fmt.Sprintf("%s message %d", topic, i))
			if _, err := seq.Write(topic, payload); err != nil {
				t.Fatalf("failed to write to %s: %v", topic, err)
			}
		}
	}
	// Give the event dispatcher time to run.
	time.Sleep(300 * time.Millisecond)
	mu.Lock()
	defer mu.Unlock()
	for _, topic := range topics {
		if got := perTopic[topic]; got != 3 {
			t.Errorf("topic %s: expected 3 write events, got %d", topic, got)
		}
	}
}
// TestTopicReset verifies ResetTopic: it must truncate the log file, clear the
// topic statistics, and leave the topic usable for subsequent writes.
func TestTopicReset(t *testing.T) {
	tmpDir := t.TempDir()
	seqlog := NewLogHub(tmpDir, slog.Default(), nil)
	// Register a no-op handler.
	seqlog.RegisterHandler("test", func(rec *Record) error {
		return nil
	})
	// The Start error was previously ignored; check it like the other tests do.
	if err := seqlog.Start(); err != nil {
		t.Fatalf("failed to start seqlog: %v", err)
	}
	// Stop via defer so the hub shuts down even when a t.Fatalf exits early.
	defer seqlog.Stop()
	// Write some records.
	for i := 0; i < 5; i++ {
		data := []byte(fmt.Sprintf("message %d", i))
		if _, err := seqlog.Write("test", data); err != nil {
			t.Fatalf("写入失败: %v", err)
		}
	}
	// Wait for processing.
	time.Sleep(300 * time.Millisecond)
	// Verify the statistics before the reset.
	stats, err := seqlog.GetTopicStats("test")
	if err != nil {
		t.Fatalf("获取统计失败: %v", err)
	}
	if stats.WriteCount != 5 {
		t.Errorf("期望写入 5 条,实际 %d 条", stats.WriteCount)
	}
	if stats.ProcessedCount != 5 {
		t.Errorf("期望处理 5 条,实际 %d 条", stats.ProcessedCount)
	}
	// The log file must exist before the reset.
	logFile := filepath.Join(tmpDir, "test.log")
	if _, err := os.Stat(logFile); os.IsNotExist(err) {
		t.Error("日志文件不存在")
	}
	// Reset the topic.
	if err := seqlog.ResetTopic("test"); err != nil {
		t.Fatalf("Reset 失败: %v", err)
	}
	// Wait for the reset to settle.
	time.Sleep(100 * time.Millisecond)
	// The log file should have been re-created empty (Reset re-initializes
	// the topic's components).
	fileInfo, err := os.Stat(logFile)
	if err != nil {
		t.Errorf("日志文件应该存在(空文件): %v", err)
	} else if fileInfo.Size() != 0 {
		t.Errorf("日志文件应该是空的,但大小为 %d", fileInfo.Size())
	}
	// Statistics must be reset to zero.
	stats, err = seqlog.GetTopicStats("test")
	if err != nil {
		t.Fatalf("获取统计失败: %v", err)
	}
	if stats.WriteCount != 0 {
		t.Errorf("期望写入计数为 0实际 %d", stats.WriteCount)
	}
	if stats.ProcessedCount != 0 {
		t.Errorf("期望处理计数为 0实际 %d", stats.ProcessedCount)
	}
	// The topic must accept new writes after the reset.
	for i := 0; i < 3; i++ {
		data := []byte(fmt.Sprintf("new message %d", i))
		if _, err := seqlog.Write("test", data); err != nil {
			t.Fatalf("重置后写入失败: %v", err)
		}
	}
	time.Sleep(300 * time.Millisecond)
	// The final stats error was previously discarded with `_`; check it before
	// dereferencing stats.
	stats, err = seqlog.GetTopicStats("test")
	if err != nil {
		t.Fatalf("获取统计失败: %v", err)
	}
	if stats.WriteCount != 3 {
		t.Errorf("重置后期望写入 3 条,实际 %d 条", stats.WriteCount)
	}
	if stats.ProcessedCount != 3 {
		t.Errorf("重置后期望处理 3 条,实际 %d 条", stats.ProcessedCount)
	}
}
// TestTopicResetWithPendingRecords verifies that Reset fails while records are
// still pending, and succeeds once processing has been stopped.
func TestTopicResetWithPendingRecords(t *testing.T) {
	tmpDir := t.TempDir()
	seqlog := NewLogHub(tmpDir, slog.Default(), nil)
	// Register a deliberately slow handler so records pile up.
	slowHandler := func(rec *Record) error {
		time.Sleep(100 * time.Millisecond) // simulate slow processing
		return nil
	}
	seqlog.RegisterHandler("test", slowHandler)
	// The Start error was previously ignored; check it like the other tests do.
	if err := seqlog.Start(); err != nil {
		t.Fatalf("failed to start seqlog: %v", err)
	}
	// Write a burst of records faster than the handler can drain them.
	for i := 0; i < 10; i++ {
		data := []byte(fmt.Sprintf("message %d", i))
		if _, err := seqlog.Write("test", data); err != nil {
			t.Fatalf("写入失败: %v", err)
		}
	}
	// Wait briefly so some — but not all — records get processed.
	time.Sleep(200 * time.Millisecond)
	// Reset must fail because records are still pending.
	err := seqlog.ResetTopic("test")
	if err == nil {
		t.Fatal("期望 Reset 失败因为有待处理的日志,但成功了")
	}
	t.Logf("预期的错误: %v", err)
	// Stop processing; Reset is called directly on the processor below, so
	// Stop must happen here rather than via defer.
	seqlog.Stop()
	// Guard against a missing processor instead of dereferencing nil
	// (the second return value was previously discarded with `_`).
	processor, _ := seqlog.GetProcessor("test")
	if processor == nil {
		t.Fatal("processor for topic \"test\" not found")
	}
	// With processing stopped there are no pending records, so Reset succeeds.
	if err := processor.Reset(); err != nil {
		t.Fatalf("停止后 Reset 应该成功: %v", err)
	}
}
// TestQueryOldestNewest covers QueryOldest and QueryNewest pagination in both
// directions, plus whole-range queries and queries against an empty topic.
func TestQueryOldestNewest(t *testing.T) {
	tmpDir := t.TempDir()
	// Processor with a no-op handler; it is never started — we only query.
	proc, err := NewTopicProcessor(tmpDir, "test", nil, &TopicConfig{
		Handler: func(rec *Record) error { return nil },
	})
	if err != nil {
		t.Fatal(err)
	}
	defer proc.Close()
	// Seed ten records.
	for i := 0; i < 10; i++ {
		if _, err := proc.Write([]byte(fmt.Sprintf("message %d", i))); err != nil {
			t.Fatal(err)
		}
	}
	// QueryNewest(-1, 3) walks forward from before the first record and must
	// yield indexes 0, 1, 2.
	head, err := proc.QueryNewest(-1, 3)
	if err != nil {
		t.Fatalf("QueryNewest failed: %v", err)
	}
	if len(head) != 3 {
		t.Errorf("expected 3 records, got %d", len(head))
	}
	// Verify order: 0, 1, 2.
	for i, qr := range head {
		want := fmt.Sprintf("message %d", i)
		if string(qr.Record.Data) != want {
			t.Errorf("oldest[%d]: expected %s, got %s", i, want, string(qr.Record.Data))
		}
		if qr.Index != i {
			t.Errorf("oldest[%d]: expected index %d, got %d", i, i, qr.Index)
		}
		t.Logf("Oldest[%d]: index=%d, %s - %s", i, qr.Index, string(qr.Record.Data), qr.Status)
	}
	// QueryOldest(count, 3) walks backward from one past the last record and
	// must yield indexes 7, 8, 9 in ascending order.
	total := proc.GetRecordCount()
	tail, err := proc.QueryOldest(total, 3)
	if err != nil {
		t.Fatalf("QueryOldest failed: %v", err)
	}
	if len(tail) != 3 {
		t.Errorf("expected 3 records, got %d", len(tail))
	}
	// Verify order: 7, 8, 9.
	for i, qr := range tail {
		want := fmt.Sprintf("message %d", 7+i)
		if string(qr.Record.Data) != want {
			t.Errorf("newest[%d]: expected %s, got %s", i, want, string(qr.Record.Data))
		}
		if qr.Index != 7+i {
			t.Errorf("newest[%d]: expected index %d, got %d", i, 7+i, qr.Index)
		}
		t.Logf("Newest[%d]: index=%d, %s - %s", i, qr.Index, string(qr.Record.Data), qr.Status)
	}
	// Asking for more than exists returns the whole log (capped at 100 here).
	everything, err := proc.QueryNewest(-1, 100)
	if err != nil {
		t.Fatalf("QueryNewest(-1, 100) failed: %v", err)
	}
	if len(everything) != 10 {
		t.Errorf("expected 10 records, got %d", len(everything))
	}
	// A freshly created topic must return no records.
	empty, err := NewTopicProcessor(t.TempDir(), "empty", nil, &TopicConfig{
		Handler: func(rec *Record) error { return nil },
	})
	if err != nil {
		t.Fatal(err)
	}
	defer empty.Close()
	none, err := empty.QueryNewest(-1, 10)
	if err != nil {
		t.Fatalf("QueryNewest on empty failed: %v", err)
	}
	if len(none) != 0 {
		t.Errorf("expected 0 records, got %d", len(none))
	}
}
// TestQueryFromFirstAndLast covers the convenience queries that page forward
// from the very first record and backward from the very last record.
func TestQueryFromFirstAndLast(t *testing.T) {
	tmpDir := t.TempDir()
	proc, err := NewTopicProcessor(tmpDir, "test", nil, &TopicConfig{
		Handler: func(rec *Record) error { return nil },
	})
	if err != nil {
		t.Fatal(err)
	}
	defer proc.Close()
	// Seed ten records.
	for i := 0; i < 10; i++ {
		if _, err := proc.Write([]byte(fmt.Sprintf("message %d", i))); err != nil {
			t.Fatal(err)
		}
	}
	// QueryFromFirst pages forward starting at index 0.
	t.Run("QueryFromFirst", func(t *testing.T) {
		got, err := proc.QueryFromFirst(3)
		if err != nil {
			t.Fatalf("QueryFromFirst failed: %v", err)
		}
		if len(got) != 3 {
			t.Fatalf("expected 3 records, got %d", len(got))
		}
		// Expect indexes 0, 1, 2 in order.
		for i, qr := range got {
			want := fmt.Sprintf("message %d", i)
			if string(qr.Record.Data) != want {
				t.Errorf("records[%d]: expected %s, got %s", i, want, string(qr.Record.Data))
			}
			if qr.Index != i {
				t.Errorf("records[%d]: expected index %d, got %d", i, i, qr.Index)
			}
			t.Logf("FromFirst[%d]: index=%d, %s - %s", i, qr.Index, string(qr.Record.Data), qr.Status)
		}
		// Requesting more than exists returns the full set.
		everything, err := proc.QueryFromFirst(100)
		if err != nil {
			t.Fatalf("QueryFromFirst(100) failed: %v", err)
		}
		if len(everything) != 10 {
			t.Errorf("expected 10 records, got %d", len(everything))
		}
	})
	// QueryFromLast pages backward from the newest record; results still come
	// back in ascending index order.
	t.Run("QueryFromLast", func(t *testing.T) {
		got, err := proc.QueryFromLast(3)
		if err != nil {
			t.Fatalf("QueryFromLast failed: %v", err)
		}
		if len(got) != 3 {
			t.Fatalf("expected 3 records, got %d", len(got))
		}
		// Expect indexes 7, 8, 9 in ascending order.
		for i, qr := range got {
			wantIdx := 7 + i
			want := fmt.Sprintf("message %d", wantIdx)
			if string(qr.Record.Data) != want {
				t.Errorf("records[%d]: expected %s, got %s", i, want, string(qr.Record.Data))
			}
			if qr.Index != wantIdx {
				t.Errorf("records[%d]: expected index %d, got %d", i, wantIdx, qr.Index)
			}
			t.Logf("FromLast[%d]: index=%d, %s - %s", i, qr.Index, string(qr.Record.Data), qr.Status)
		}
		// Requesting more than exists returns the full set.
		everything, err := proc.QueryFromLast(100)
		if err != nil {
			t.Fatalf("QueryFromLast(100) failed: %v", err)
		}
		if len(everything) != 10 {
			t.Errorf("expected 10 records, got %d", len(everything))
		}
	})
	// Both queries must return empty results on an empty topic.
	t.Run("EmptyDatabase", func(t *testing.T) {
		empty, err := NewTopicProcessor(t.TempDir(), "empty", nil, &TopicConfig{
			Handler: func(rec *Record) error { return nil },
		})
		if err != nil {
			t.Fatal(err)
		}
		defer empty.Close()
		// QueryFromFirst must return an empty slice.
		first, err := empty.QueryFromFirst(10)
		if err != nil {
			t.Fatalf("QueryFromFirst on empty failed: %v", err)
		}
		if len(first) != 0 {
			t.Errorf("expected 0 records, got %d", len(first))
		}
		// QueryFromLast must return an empty slice.
		last, err := empty.QueryFromLast(10)
		if err != nil {
			t.Fatalf("QueryFromLast on empty failed: %v", err)
		}
		if len(last) != 0 {
			t.Errorf("expected 0 records, got %d", len(last))
		}
	})
}
// TestLogHubQueryFromFirstAndLast verifies the LogHub-level QueryFromFirst and
// QueryFromLast wrappers, including the error path for an unknown topic.
func TestLogHubQueryFromFirstAndLast(t *testing.T) {
	tmpDir := t.TempDir()
	seqlog := NewLogHub(tmpDir, slog.Default(), nil)
	seqlog.RegisterHandler("test", func(rec *Record) error {
		return nil
	})
	// The Start error was previously ignored; check it like the other tests do.
	if err := seqlog.Start(); err != nil {
		t.Fatalf("failed to start seqlog: %v", err)
	}
	defer seqlog.Stop()
	// Seed ten records.
	for i := 0; i < 10; i++ {
		data := fmt.Sprintf("message %d", i)
		if _, err := seqlog.Write("test", []byte(data)); err != nil {
			t.Fatal(err)
		}
	}
	// QueryFromFirst must return the three oldest records (indexes 0-2).
	firstRecords, err := seqlog.QueryFromFirst("test", 3)
	if err != nil {
		t.Fatalf("QueryFromFirst failed: %v", err)
	}
	if len(firstRecords) != 3 {
		t.Fatalf("expected 3 records, got %d", len(firstRecords))
	}
	for i := 0; i < 3; i++ {
		if firstRecords[i].Index != i {
			t.Errorf("firstRecords[%d]: expected index %d, got %d", i, i, firstRecords[i].Index)
		}
	}
	// QueryFromLast must return the three newest records (indexes 7-9) in
	// ascending index order.
	lastRecords, err := seqlog.QueryFromLast("test", 3)
	if err != nil {
		t.Fatalf("QueryFromLast failed: %v", err)
	}
	if len(lastRecords) != 3 {
		t.Fatalf("expected 3 records, got %d", len(lastRecords))
	}
	for i := 0; i < 3; i++ {
		expectedIndex := 7 + i
		if lastRecords[i].Index != expectedIndex {
			t.Errorf("lastRecords[%d]: expected index %d, got %d", i, expectedIndex, lastRecords[i].Index)
		}
	}
	// Both wrappers must fail for a topic that was never registered.
	_, err = seqlog.QueryFromFirst("nonexistent", 10)
	if err == nil {
		t.Error("expected error for nonexistent topic")
	}
	_, err = seqlog.QueryFromLast("nonexistent", 10)
	if err == nil {
		t.Error("expected error for nonexistent topic")
	}
}