- 修改记录存储格式为 [4B len][8B offset][4B CRC][16B UUID][data] - 修复 TopicProcessor 中 WaitGroup 使用错误导致 handler 不执行的问题 - 修复写入保护逻辑,避免 dirtyOffset=-1 时误判为写入中 - 添加统计信息定期持久化功能 - 改进 UTF-8 字符截断处理,防止 CJK 字符乱码 - 优化 Web UI:显示人类可读的文件大小,支持点击外部关闭弹窗 - 重构示例代码,添加 webui 和 webui_integration 示例 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
2089 lines
52 KiB
Go
2089 lines
52 KiB
Go
package seqlog
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"io"
|
||
"log/slog"
|
||
"os"
|
||
"path/filepath"
|
||
"sync"
|
||
"testing"
|
||
"time"
|
||
)
|
||
|
||
func TestBasicWriteAndRead(t *testing.T) {
|
||
tmpFile := "test_basic.log"
|
||
defer os.Remove(tmpFile)
|
||
defer os.Remove(tmpFile + ".pos")
|
||
defer os.Remove(tmpFile + ".idx")
|
||
|
||
// 创建索引
|
||
index, err := NewRecordIndex(tmpFile)
|
||
if err != nil {
|
||
t.Fatalf("创建索引失败: %v", err)
|
||
}
|
||
defer index.Close()
|
||
|
||
// 写入测试数据
|
||
writer, err := NewLogWriter(tmpFile, index)
|
||
if err != nil {
|
||
t.Fatalf("创建 writer 失败: %v", err)
|
||
}
|
||
|
||
testData := [][]byte{
|
||
[]byte("hello world"),
|
||
[]byte("test log entry 1"),
|
||
[]byte("test log entry 2"),
|
||
}
|
||
|
||
for _, data := range testData {
|
||
if _, err := writer.Append(data); err != nil {
|
||
t.Fatalf("写入数据失败: %v", err)
|
||
}
|
||
}
|
||
|
||
// 读取数据(使用共享的 index)
|
||
cursor, err := NewCursor(tmpFile, index, nil)
|
||
if err != nil {
|
||
t.Fatalf("创建 cursor 失败: %v", err)
|
||
}
|
||
defer cursor.Close()
|
||
|
||
for i, expected := range testData {
|
||
rec, err := cursor.Next()
|
||
if err != nil {
|
||
t.Fatalf("读取第 %d 条记录失败: %v", i, err)
|
||
}
|
||
if string(rec.Data) != string(expected) {
|
||
t.Errorf("第 %d 条记录数据不匹配: got %q, want %q", i, rec.Data, expected)
|
||
}
|
||
}
|
||
}
|
||
|
||
// TestCursorNextRange 测试范围游动功能
|
||
func TestCursorNextRange(t *testing.T) {
|
||
tmpFile := "test_range.log"
|
||
defer os.Remove(tmpFile)
|
||
defer os.Remove(tmpFile + ".pos")
|
||
defer os.Remove(tmpFile + ".idx")
|
||
|
||
// 创建索引
|
||
index, err := NewRecordIndex(tmpFile)
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
defer index.Close()
|
||
|
||
// 写入多条记录
|
||
writer, err := NewLogWriter(tmpFile, index)
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
|
||
messages := []string{"msg1", "msg2", "msg3", "msg4", "msg5", "msg6", "msg7", "msg8", "msg9", "msg10"}
|
||
for _, msg := range messages {
|
||
if _, err := writer.Append([]byte(msg)); err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
}
|
||
writer.Close()
|
||
|
||
// 测试范围读取
|
||
cursor, err := NewCursor(tmpFile, index, nil)
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
defer cursor.Close()
|
||
|
||
// 第一次读取 3 条
|
||
records, err := cursor.NextRange(3)
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
if len(records) != 3 {
|
||
t.Fatalf("expected 3 records, got %d", len(records))
|
||
}
|
||
for i, rec := range records {
|
||
expected := messages[i]
|
||
if string(rec.Data) != expected {
|
||
t.Errorf("record[%d]: expected '%s', got '%s'", i, expected, string(rec.Data))
|
||
}
|
||
}
|
||
|
||
// 第二次读取 5 条
|
||
records, err = cursor.NextRange(5)
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
if len(records) != 5 {
|
||
t.Fatalf("expected 5 records, got %d", len(records))
|
||
}
|
||
for i, rec := range records {
|
||
expected := messages[i+3]
|
||
if string(rec.Data) != expected {
|
||
t.Errorf("record[%d]: expected '%s', got '%s'", i, expected, string(rec.Data))
|
||
}
|
||
}
|
||
|
||
// 第三次读取 5 条(但只剩 2 条)
|
||
records, err = cursor.NextRange(5)
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
if len(records) != 2 {
|
||
t.Fatalf("expected 2 records, got %d", len(records))
|
||
}
|
||
for i, rec := range records {
|
||
expected := messages[i+8]
|
||
if string(rec.Data) != expected {
|
||
t.Errorf("record[%d]: expected '%s', got '%s'", i, expected, string(rec.Data))
|
||
}
|
||
}
|
||
|
||
// 再读取应该返回 EOF
|
||
records, err = cursor.NextRange(1)
|
||
if err != io.EOF {
|
||
t.Fatalf("expected EOF, got %v", err)
|
||
}
|
||
}
|
||
|
||
// TestCursorWindow 测试窗口模式
|
||
func TestCursorWindow(t *testing.T) {
|
||
tmpFile := "test_window.log"
|
||
defer os.Remove(tmpFile)
|
||
defer os.Remove(tmpFile + ".pos")
|
||
|
||
// 写入测试数据
|
||
index, err := NewRecordIndex(tmpFile)
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
defer index.Close()
|
||
|
||
// 写入测试数据
|
||
writer, err := NewLogWriter(tmpFile, index)
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
|
||
messages := []string{"msg1", "msg2", "msg3"}
|
||
for _, msg := range messages {
|
||
if _, err := writer.Append([]byte(msg)); err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
}
|
||
writer.Close()
|
||
|
||
// 测试窗口模式
|
||
cursor, err := NewCursor(tmpFile, index, nil)
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
defer cursor.Close()
|
||
|
||
// 初始状态
|
||
if cursor.StartIndex() != 0 {
|
||
t.Errorf("expected startIdx 0, got %d", cursor.StartIndex())
|
||
}
|
||
if cursor.EndIndex() != 0 {
|
||
t.Errorf("expected endIdx 0, got %d", cursor.EndIndex())
|
||
}
|
||
|
||
// 读取第一条记录
|
||
rec, err := cursor.Next()
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
if string(rec.Data) != "msg1" {
|
||
t.Errorf("expected 'msg1', got '%s'", string(rec.Data))
|
||
}
|
||
|
||
// 检查窗口索引:startIdx 未变,endIdx 移动了
|
||
if cursor.StartIndex() != 0 {
|
||
t.Errorf("expected startIdx 0, got %d", cursor.StartIndex())
|
||
}
|
||
endIdxAfterFirst := cursor.EndIndex()
|
||
if endIdxAfterFirst == 0 {
|
||
t.Error("endIdx should have moved")
|
||
}
|
||
|
||
// 提交窗口
|
||
cursor.Commit()
|
||
if cursor.StartIndex() != endIdxAfterFirst {
|
||
t.Errorf("expected startIdx %d after commit, got %d", endIdxAfterFirst, cursor.StartIndex())
|
||
}
|
||
|
||
// 读取第二条记录
|
||
rec, err = cursor.Next()
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
if string(rec.Data) != "msg2" {
|
||
t.Errorf("expected 'msg2', got '%s'", string(rec.Data))
|
||
}
|
||
|
||
endIdxAfterSecond := cursor.EndIndex()
|
||
|
||
// 回滚
|
||
if err := cursor.Rollback(); err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
|
||
// 回滚后,endIdx 应该回到 startIdx
|
||
if cursor.EndIndex() != cursor.StartIndex() {
|
||
t.Errorf("expected endIdx == startIdx after rollback, got %d != %d", cursor.EndIndex(), cursor.StartIndex())
|
||
}
|
||
|
||
// 再次读取应该还是第二条
|
||
rec, err = cursor.Next()
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
if string(rec.Data) != "msg2" {
|
||
t.Errorf("expected 'msg2' after rollback, got '%s'", string(rec.Data))
|
||
}
|
||
|
||
// 这次提交
|
||
cursor.Commit()
|
||
if cursor.StartIndex() != endIdxAfterSecond {
|
||
t.Errorf("expected startIdx %d after commit, got %d", endIdxAfterSecond, cursor.StartIndex())
|
||
}
|
||
}
|
||
|
||
func TestCursorPersistence(t *testing.T) {
|
||
tmpFile := "test_cursor.log"
|
||
defer os.Remove(tmpFile)
|
||
defer os.Remove(tmpFile + ".pos")
|
||
|
||
// 写入测试数据
|
||
index, err := NewRecordIndex(tmpFile)
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
defer index.Close()
|
||
|
||
// 写入测试数据
|
||
writer, err := NewLogWriter(tmpFile, index)
|
||
if err != nil {
|
||
t.Fatalf("创建 writer 失败: %v", err)
|
||
}
|
||
|
||
testData := [][]byte{
|
||
[]byte("record 1"),
|
||
[]byte("record 2"),
|
||
[]byte("record 3"),
|
||
}
|
||
|
||
for _, data := range testData {
|
||
writer.Append(data)
|
||
}
|
||
|
||
// 读取前两条记录
|
||
cursor1, err := NewCursor(tmpFile, index, nil)
|
||
if err != nil {
|
||
t.Fatalf("创建 cursor 失败: %v", err)
|
||
}
|
||
|
||
cursor1.Next() // 读取第一条
|
||
cursor1.Commit() // 提交
|
||
cursor1.Next() // 读取第二条
|
||
cursor1.Commit() // 提交
|
||
cursor1.Close()
|
||
|
||
// 重新打开 cursor,应该从第三条开始读取
|
||
cursor2, err := NewCursor(tmpFile, index, nil)
|
||
if err != nil {
|
||
t.Fatalf("重新创建 cursor 失败: %v", err)
|
||
}
|
||
defer cursor2.Close()
|
||
|
||
rec, err := cursor2.Next()
|
||
if err != nil {
|
||
t.Fatalf("读取恢复后的记录失败: %v", err)
|
||
}
|
||
|
||
if string(rec.Data) != string(testData[2]) {
|
||
t.Errorf("游标恢复失败: got %q, want %q", rec.Data, testData[2])
|
||
}
|
||
}
|
||
|
||
func TestTailer(t *testing.T) {
|
||
tmpFile := "test_tailer.log"
|
||
defer os.Remove(tmpFile)
|
||
defer os.Remove(tmpFile + ".pos")
|
||
|
||
// 创建 writer
|
||
index, err := NewRecordIndex(tmpFile)
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
defer index.Close()
|
||
|
||
// 写入测试数据
|
||
writer, err := NewLogWriter(tmpFile, index)
|
||
if err != nil {
|
||
t.Fatalf("创建 writer 失败: %v", err)
|
||
}
|
||
|
||
// 先写入一些数据
|
||
writer.Append([]byte("initial record"))
|
||
|
||
// 记录处理的数据
|
||
records := make([]string, 0)
|
||
handler := func(rec *Record) error {
|
||
records = append(records, string(rec.Data))
|
||
return nil
|
||
}
|
||
|
||
// 创建 cursor
|
||
cursor, err := NewCursor(tmpFile, index, nil)
|
||
if err != nil {
|
||
t.Fatalf("创建 cursor 失败: %v", err)
|
||
}
|
||
|
||
// 创建 tailer
|
||
tailer, err := NewTailer(cursor, handler, &TailConfig{
|
||
PollInterval: 50 * time.Millisecond,
|
||
SaveInterval: 100 * time.Millisecond,
|
||
})
|
||
if err != nil {
|
||
t.Fatalf("创建 tailer 失败: %v", err)
|
||
}
|
||
|
||
// 启动 tailer
|
||
ctx, cancel := context.WithCancel(context.Background())
|
||
go tailer.Start(ctx)
|
||
|
||
// 等待处理初始记录
|
||
time.Sleep(100 * time.Millisecond)
|
||
|
||
// 写入新数据
|
||
writer.Append([]byte("new record 1"))
|
||
writer.Append([]byte("new record 2"))
|
||
|
||
// 等待处理新数据
|
||
time.Sleep(200 * time.Millisecond)
|
||
|
||
// 停止 tailer
|
||
cancel()
|
||
time.Sleep(100 * time.Millisecond)
|
||
|
||
// 验证处理的数据
|
||
if len(records) < 3 {
|
||
t.Errorf("处理的记录数量不足: got %d, want at least 3", len(records))
|
||
}
|
||
|
||
expected := []string{"initial record", "new record 1", "new record 2"}
|
||
for i, exp := range expected {
|
||
if i >= len(records) || records[i] != exp {
|
||
t.Errorf("第 %d 条记录不匹配: got %q, want %q", i, records[i], exp)
|
||
}
|
||
}
|
||
}
|
||
|
||
func TestTailerStop(t *testing.T) {
|
||
tmpFile := "test_tailer_stop.log"
|
||
defer os.Remove(tmpFile)
|
||
defer os.Remove(tmpFile + ".pos")
|
||
|
||
index, err := NewRecordIndex(tmpFile)
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
defer index.Close()
|
||
|
||
// 写入测试数据
|
||
writer, err := NewLogWriter(tmpFile, index)
|
||
if err != nil {
|
||
t.Fatalf("创建 writer 失败: %v", err)
|
||
}
|
||
|
||
writer.Append([]byte("test record"))
|
||
|
||
count := 0
|
||
handler := func(rec *Record) error {
|
||
count++
|
||
return nil
|
||
}
|
||
|
||
cursor, err := NewCursor(tmpFile, index, nil)
|
||
if err != nil {
|
||
t.Fatalf("创建 cursor 失败: %v", err)
|
||
}
|
||
|
||
tailer, err := NewTailer(cursor, handler, nil)
|
||
if err != nil {
|
||
t.Fatalf("创建 tailer 失败: %v", err)
|
||
}
|
||
|
||
// 启动 tailer
|
||
go tailer.Start(context.Background())
|
||
|
||
// 等待处理
|
||
time.Sleep(100 * time.Millisecond)
|
||
|
||
// 停止 tailer
|
||
tailer.Stop()
|
||
|
||
// 验证已处理
|
||
if count != 1 {
|
||
t.Errorf("处理的记录数量不正确: got %d, want 1", count)
|
||
}
|
||
}
|
||
|
||
func TestSeqlogBasic(t *testing.T) {
|
||
tmpDir := "test_seqlog"
|
||
os.MkdirAll(tmpDir, 0755)
|
||
defer os.RemoveAll(tmpDir)
|
||
|
||
seqlog := NewLogHub(tmpDir, slog.Default(), nil)
|
||
|
||
// 注册 handler
|
||
appLogs := make([]string, 0)
|
||
seqlog.RegisterHandler("app", func(rec *Record) error {
|
||
appLogs = append(appLogs, string(rec.Data))
|
||
return nil
|
||
})
|
||
|
||
accessLogs := make([]string, 0)
|
||
seqlog.RegisterHandler("access", func(rec *Record) error {
|
||
accessLogs = append(accessLogs, string(rec.Data))
|
||
return nil
|
||
})
|
||
|
||
// 启动
|
||
if err := seqlog.Start(); err != nil {
|
||
t.Fatalf("启动 seqlog 失败: %v", err)
|
||
}
|
||
|
||
// 写入日志
|
||
seqlog.Write("app", []byte("app log 1"))
|
||
seqlog.Write("app", []byte("app log 2"))
|
||
seqlog.Write("access", []byte("access log 1"))
|
||
|
||
// 等待处理
|
||
time.Sleep(200 * time.Millisecond)
|
||
|
||
// 停止
|
||
seqlog.Stop()
|
||
|
||
// 验证
|
||
if len(appLogs) != 2 {
|
||
t.Errorf("app logs 数量不正确: got %d, want 2", len(appLogs))
|
||
}
|
||
if len(accessLogs) != 1 {
|
||
t.Errorf("access logs 数量不正确: got %d, want 1", len(accessLogs))
|
||
}
|
||
}
|
||
|
||
func TestSeqlogDefaultHandler(t *testing.T) {
|
||
tmpDir := "test_seqlog_default"
|
||
os.MkdirAll(tmpDir, 0755)
|
||
defer os.RemoveAll(tmpDir)
|
||
|
||
// 注册默认 handler
|
||
allLogs := make(map[string][]string)
|
||
var mu sync.Mutex
|
||
|
||
defaultHandler := func(topic string, rec *Record) error {
|
||
mu.Lock()
|
||
defer mu.Unlock()
|
||
allLogs[topic] = append(allLogs[topic], string(rec.Data))
|
||
return nil
|
||
}
|
||
|
||
seqlog := NewLogHub(tmpDir, slog.Default(), defaultHandler)
|
||
|
||
// 注册特定 handler
|
||
seqlog.RegisterHandler("special", func(rec *Record) error {
|
||
mu.Lock()
|
||
defer mu.Unlock()
|
||
allLogs["special"] = append(allLogs["special"], string(rec.Data))
|
||
return nil
|
||
})
|
||
|
||
// 启动
|
||
seqlog.Start()
|
||
|
||
// 写入日志
|
||
seqlog.Write("special", []byte("special log"))
|
||
seqlog.Write("other", []byte("other log 1"))
|
||
seqlog.Write("other", []byte("other log 2"))
|
||
|
||
// 等待处理
|
||
time.Sleep(200 * time.Millisecond)
|
||
|
||
// 停止
|
||
seqlog.Stop()
|
||
|
||
// 验证
|
||
mu.Lock()
|
||
defer mu.Unlock()
|
||
|
||
if len(allLogs["special"]) != 1 {
|
||
t.Errorf("special logs 数量不正确: got %d, want 1", len(allLogs["special"]))
|
||
}
|
||
if len(allLogs["other"]) != 2 {
|
||
t.Errorf("other logs 数量不正确: got %d, want 2", len(allLogs["other"]))
|
||
}
|
||
}
|
||
|
||
func TestSeqlogDynamicRegistration(t *testing.T) {
|
||
tmpDir := "test_seqlog_dynamic"
|
||
os.MkdirAll(tmpDir, 0755)
|
||
defer os.RemoveAll(tmpDir)
|
||
|
||
seqlog := NewLogHub(tmpDir, slog.Default(), nil)
|
||
|
||
// 先注册 handler(handler 现在是必填项)
|
||
logs := make([]string, 0)
|
||
seqlog.RegisterHandler("dynamic", func(rec *Record) error {
|
||
logs = append(logs, string(rec.Data))
|
||
return nil
|
||
})
|
||
|
||
seqlog.Start()
|
||
|
||
// 写入日志
|
||
seqlog.Write("dynamic", []byte("log 1"))
|
||
seqlog.Write("dynamic", []byte("log 2"))
|
||
seqlog.Write("dynamic", []byte("log 3"))
|
||
|
||
// 等待处理
|
||
time.Sleep(200 * time.Millisecond)
|
||
|
||
seqlog.Stop()
|
||
|
||
// 应该处理所有日志
|
||
if len(logs) != 3 {
|
||
t.Errorf("处理的日志数量不正确: got %d, want 3", len(logs))
|
||
}
|
||
}
|
||
|
||
func TestDynamicConfigUpdate(t *testing.T) {
|
||
tmpDir := "test_dynamic_config"
|
||
os.MkdirAll(tmpDir, 0755)
|
||
defer os.RemoveAll(tmpDir)
|
||
|
||
seqlog := NewLogHub(tmpDir, slog.Default(), nil)
|
||
|
||
// 注册 handler
|
||
logs := make([]string, 0)
|
||
seqlog.RegisterHandler("test", func(rec *Record) error {
|
||
logs = append(logs, string(rec.Data))
|
||
return nil
|
||
})
|
||
|
||
seqlog.Start()
|
||
|
||
// 写入一些日志
|
||
seqlog.Write("test", []byte("log 1"))
|
||
seqlog.Write("test", []byte("log 2"))
|
||
|
||
time.Sleep(150 * time.Millisecond)
|
||
|
||
// 获取当前配置
|
||
config, err := seqlog.GetTopicConfig("test")
|
||
if err != nil {
|
||
t.Fatalf("获取配置失败: %v", err)
|
||
}
|
||
|
||
// 验证默认配置
|
||
if config.PollInterval != 100*time.Millisecond {
|
||
t.Errorf("默认 PollInterval 不正确: got %v, want 100ms", config.PollInterval)
|
||
}
|
||
|
||
// 动态更新配置
|
||
newConfig := &TailConfig{
|
||
PollInterval: 50 * time.Millisecond,
|
||
SaveInterval: 500 * time.Millisecond,
|
||
BatchSize: 10,
|
||
}
|
||
|
||
if err := seqlog.UpdateTopicConfig("test", newConfig); err != nil {
|
||
t.Fatalf("更新配置失败: %v", err)
|
||
}
|
||
|
||
// 继续写入日志
|
||
seqlog.Write("test", []byte("log 3"))
|
||
|
||
time.Sleep(150 * time.Millisecond)
|
||
|
||
seqlog.Stop()
|
||
|
||
// 验证所有日志都被处理
|
||
if len(logs) != 3 {
|
||
t.Errorf("处理的日志数量不正确: got %d, want 3", len(logs))
|
||
}
|
||
|
||
// 验证配置已更新
|
||
updatedConfig, _ := seqlog.GetTopicConfig("test")
|
||
if updatedConfig.PollInterval != 50*time.Millisecond {
|
||
t.Errorf("更新后的 PollInterval 不正确: got %v, want 50ms", updatedConfig.PollInterval)
|
||
}
|
||
}
|
||
|
||
func TestUUIDUniqueness(t *testing.T) {
|
||
tmpFile := "test_uuid.log"
|
||
defer os.Remove(tmpFile)
|
||
defer os.Remove(tmpFile + ".pos")
|
||
|
||
// 写入测试数据
|
||
index, err := NewRecordIndex(tmpFile)
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
defer index.Close()
|
||
|
||
// 写入测试数据
|
||
writer, err := NewLogWriter(tmpFile, index)
|
||
if err != nil {
|
||
t.Fatalf("创建 writer 失败: %v", err)
|
||
}
|
||
|
||
// 写入多条相同内容的日志
|
||
for i := 0; i < 10; i++ {
|
||
if _, err := writer.Append([]byte("same content")); err != nil {
|
||
t.Fatalf("写入数据失败: %v", err)
|
||
}
|
||
}
|
||
|
||
// 读取并验证 UUID 唯一性
|
||
cursor, err := NewCursor(tmpFile, index, nil)
|
||
if err != nil {
|
||
t.Fatalf("创建 cursor 失败: %v", err)
|
||
}
|
||
defer cursor.Close()
|
||
|
||
uuids := make(map[string]bool)
|
||
for i := 0; i < 10; i++ {
|
||
rec, err := cursor.Next()
|
||
if err != nil {
|
||
t.Fatalf("读取第 %d 条记录失败: %v", i, err)
|
||
}
|
||
|
||
// 检查 UUID 是否已存在
|
||
uuidStr := rec.UUID.String()
|
||
if uuids[uuidStr] {
|
||
t.Errorf("发现重复的 UUID: %s", uuidStr)
|
||
}
|
||
uuids[uuidStr] = true
|
||
|
||
// 验证数据内容
|
||
if string(rec.Data) != "same content" {
|
||
t.Errorf("第 %d 条记录数据不匹配: got %q, want %q", i, rec.Data, "same content")
|
||
}
|
||
}
|
||
|
||
// 验证所有 UUID 都不同
|
||
if len(uuids) != 10 {
|
||
t.Errorf("UUID 数量不正确: got %d, want 10", len(uuids))
|
||
}
|
||
}
|
||
|
||
func TestUUIDValidation(t *testing.T) {
|
||
tmpFile := "test_uuid_validation.log"
|
||
defer os.Remove(tmpFile)
|
||
defer os.Remove(tmpFile + ".pos")
|
||
|
||
// 写入一条正常日志
|
||
index, err := NewRecordIndex(tmpFile)
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
defer index.Close()
|
||
|
||
// 写入测试数据
|
||
writer, err := NewLogWriter(tmpFile, index)
|
||
if err != nil {
|
||
t.Fatalf("创建 writer 失败: %v", err)
|
||
}
|
||
|
||
if _, err := writer.Append([]byte("test data")); err != nil {
|
||
t.Fatalf("写入数据失败: %v", err)
|
||
}
|
||
|
||
// 读取并验证 UUID
|
||
cursor, err := NewCursor(tmpFile, index, nil)
|
||
if err != nil {
|
||
t.Fatalf("创建 cursor 失败: %v", err)
|
||
}
|
||
defer cursor.Close()
|
||
|
||
rec, err := cursor.Next()
|
||
if err != nil {
|
||
t.Fatalf("读取记录失败: %v", err)
|
||
}
|
||
|
||
// 验证 UUID 不为空
|
||
if rec.UUID == [16]byte{} {
|
||
t.Error("UUID 为空")
|
||
}
|
||
|
||
// 验证 UUID 是有效的
|
||
uuidStr := rec.UUID.String()
|
||
if len(uuidStr) != 36 { // UUID 字符串格式:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
|
||
t.Errorf("UUID 字符串格式不正确: %s (长度 %d)", uuidStr, len(uuidStr))
|
||
}
|
||
|
||
t.Logf("生成的 UUID: %s", uuidStr)
|
||
}
|
||
|
||
func TestSeqlogAutoRecovery(t *testing.T) {
|
||
tmpDir := "test_auto_recovery"
|
||
os.MkdirAll(tmpDir, 0755)
|
||
defer os.RemoveAll(tmpDir)
|
||
|
||
// 第一阶段:创建并写入日志
|
||
allLogs := make(map[string][]string)
|
||
var mu sync.Mutex
|
||
|
||
defaultHandler := func(topic string, rec *Record) error {
|
||
mu.Lock()
|
||
defer mu.Unlock()
|
||
allLogs[topic] = append(allLogs[topic], string(rec.Data))
|
||
return nil
|
||
}
|
||
|
||
seqlog1 := NewLogHub(tmpDir, slog.Default(), defaultHandler)
|
||
seqlog1.Start()
|
||
|
||
// 写入一些日志
|
||
seqlog1.Write("app", []byte("app log 1"))
|
||
seqlog1.Write("app", []byte("app log 2"))
|
||
seqlog1.Write("access", []byte("access log 1"))
|
||
seqlog1.Write("error", []byte("error log 1"))
|
||
|
||
time.Sleep(200 * time.Millisecond)
|
||
seqlog1.Stop()
|
||
|
||
// 验证第一阶段写入的日志
|
||
mu.Lock()
|
||
if len(allLogs["app"]) != 2 {
|
||
t.Errorf("第一阶段 app logs 数量不正确: got %d, want 2", len(allLogs["app"]))
|
||
}
|
||
if len(allLogs["access"]) != 1 {
|
||
t.Errorf("第一阶段 access logs 数量不正确: got %d, want 1", len(allLogs["access"]))
|
||
}
|
||
if len(allLogs["error"]) != 1 {
|
||
t.Errorf("第一阶段 error logs 数量不正确: got %d, want 1", len(allLogs["error"]))
|
||
}
|
||
mu.Unlock()
|
||
|
||
// 清空日志计数,模拟重启
|
||
mu.Lock()
|
||
allLogs = make(map[string][]string)
|
||
mu.Unlock()
|
||
|
||
// 第二阶段:重启并自动恢复
|
||
seqlog2 := NewLogHub(tmpDir, slog.Default(), defaultHandler)
|
||
seqlog2.Start()
|
||
|
||
// 写入新日志
|
||
seqlog2.Write("app", []byte("app log 3"))
|
||
seqlog2.Write("access", []byte("access log 2"))
|
||
seqlog2.Write("new_topic", []byte("new topic log 1"))
|
||
|
||
time.Sleep(300 * time.Millisecond)
|
||
|
||
// 验证恢复后的处理
|
||
mu.Lock()
|
||
// 应该只处理新写入的日志(因为游标已保存位置)
|
||
if len(allLogs["app"]) != 1 {
|
||
t.Errorf("恢复后 app logs 数量不正确: got %d, want 1 (只有新日志)", len(allLogs["app"]))
|
||
}
|
||
if len(allLogs["access"]) != 1 {
|
||
t.Errorf("恢复后 access logs 数量不正确: got %d, want 1 (只有新日志)", len(allLogs["access"]))
|
||
}
|
||
if len(allLogs["new_topic"]) != 1 {
|
||
t.Errorf("恢复后 new_topic logs 数量不正确: got %d, want 1", len(allLogs["new_topic"]))
|
||
}
|
||
|
||
// 验证日志内容
|
||
if allLogs["app"][0] != "app log 3" {
|
||
t.Errorf("app 日志内容不正确: got %q, want %q", allLogs["app"][0], "app log 3")
|
||
}
|
||
mu.Unlock()
|
||
|
||
seqlog2.Stop()
|
||
|
||
// 验证自动发现的 topic
|
||
topics := seqlog2.GetTopics()
|
||
expectedTopics := map[string]bool{
|
||
"app": true,
|
||
"access": true,
|
||
"error": true,
|
||
"new_topic": true,
|
||
}
|
||
if len(topics) != len(expectedTopics) {
|
||
t.Errorf("topic 数量不正确: got %d, want %d", len(topics), len(expectedTopics))
|
||
}
|
||
for _, topic := range topics {
|
||
if !expectedTopics[topic] {
|
||
t.Errorf("发现未预期的 topic: %s", topic)
|
||
}
|
||
}
|
||
}
|
||
|
||
func TestTopicProcessorClose(t *testing.T) {
|
||
tmpDir := "test_processor_close"
|
||
os.MkdirAll(tmpDir, 0755)
|
||
defer os.RemoveAll(tmpDir)
|
||
|
||
// 创建 processor(提供空 handler)
|
||
logger := slog.Default()
|
||
processor, err := NewTopicProcessor(tmpDir, "test", logger, &TopicConfig{
|
||
Handler: func(rec *Record) error {
|
||
return nil
|
||
},
|
||
})
|
||
if err != nil {
|
||
t.Fatalf("创建 processor 失败: %v", err)
|
||
}
|
||
|
||
// 写入一些数据
|
||
_, err = processor.Write([]byte("test data 1"))
|
||
if err != nil {
|
||
t.Fatalf("写入失败: %v", err)
|
||
}
|
||
_, err = processor.Write([]byte("test data 2"))
|
||
if err != nil {
|
||
t.Fatalf("写入失败: %v", err)
|
||
}
|
||
|
||
// 停止 processor
|
||
if err := processor.Stop(); err != nil {
|
||
t.Fatalf("停止失败: %v", err)
|
||
}
|
||
|
||
// 清理资源
|
||
if err := processor.Close(); err != nil {
|
||
t.Fatalf("清理失败: %v", err)
|
||
}
|
||
|
||
// 验证 writer 已被清理(设置为 nil)
|
||
processor.mu.RLock()
|
||
if processor.writer != nil {
|
||
t.Error("writer 应该被清理为 nil")
|
||
}
|
||
processor.mu.RUnlock()
|
||
|
||
// 再次调用 Close 应该不报错
|
||
if err := processor.Close(); err != nil {
|
||
t.Errorf("重复调用 Close 失败: %v", err)
|
||
}
|
||
}
|
||
|
||
func TestSeqlogCleanup(t *testing.T) {
|
||
tmpDir := "test_seqlog_cleanup"
|
||
os.MkdirAll(tmpDir, 0755)
|
||
defer os.RemoveAll(tmpDir)
|
||
|
||
seqlog := NewLogHub(tmpDir, slog.Default(), nil)
|
||
seqlog.Start()
|
||
|
||
// 写入多个 topic 的日志
|
||
topics := []string{"app", "access", "error"}
|
||
for _, topic := range topics {
|
||
for i := 0; i < 3; i++ {
|
||
data := []byte(fmt.Sprintf("%s log %d", topic, i))
|
||
if _, err := seqlog.Write(topic, data); err != nil {
|
||
t.Fatalf("写入失败 [topic=%s]: %v", topic, err)
|
||
}
|
||
}
|
||
}
|
||
|
||
// 停止并清理
|
||
if err := seqlog.Stop(); err != nil {
|
||
t.Fatalf("停止失败: %v", err)
|
||
}
|
||
|
||
// 验证所有 processor 都被清理
|
||
seqlog.mu.RLock()
|
||
for topic, processor := range seqlog.processors {
|
||
processor.mu.RLock()
|
||
if processor.writer != nil {
|
||
t.Errorf("topic %s 的 writer 未被清理", topic)
|
||
}
|
||
processor.mu.RUnlock()
|
||
}
|
||
seqlog.mu.RUnlock()
|
||
}
|
||
|
||
// TestTopicStats 测试 TopicStats 的基本功能
|
||
func TestTopicStats(t *testing.T) {
|
||
tmpDir := t.TempDir()
|
||
statsPath := filepath.Join(tmpDir, "test.stats")
|
||
|
||
// 创建统计管理器
|
||
stats := NewTopicStats(statsPath)
|
||
|
||
// 测试写入统计
|
||
stats.IncWrite(100)
|
||
stats.IncWrite(200)
|
||
|
||
s := stats.Get()
|
||
if s.WriteCount != 2 {
|
||
t.Errorf("expected write count 2, got %d", s.WriteCount)
|
||
}
|
||
if s.WriteBytes != 300 {
|
||
t.Errorf("expected write bytes 300, got %d", s.WriteBytes)
|
||
}
|
||
|
||
// 测试处理统计
|
||
stats.IncProcessed(50)
|
||
stats.IncProcessed(150)
|
||
|
||
s = stats.Get()
|
||
if s.ProcessedCount != 2 {
|
||
t.Errorf("expected processed count 2, got %d", s.ProcessedCount)
|
||
}
|
||
if s.ProcessedBytes != 200 {
|
||
t.Errorf("expected processed bytes 200, got %d", s.ProcessedBytes)
|
||
}
|
||
|
||
// 测试错误统计
|
||
stats.IncError()
|
||
stats.IncError()
|
||
stats.IncError()
|
||
|
||
s = stats.Get()
|
||
if s.ErrorCount != 3 {
|
||
t.Errorf("expected error count 3, got %d", s.ErrorCount)
|
||
}
|
||
|
||
// 测试持久化
|
||
if err := stats.Save(); err != nil {
|
||
t.Fatalf("failed to save stats: %v", err)
|
||
}
|
||
|
||
// 创建新的统计管理器,应该能恢复数据
|
||
stats2 := NewTopicStats(statsPath)
|
||
s2 := stats2.Get()
|
||
|
||
if s2.WriteCount != s.WriteCount {
|
||
t.Errorf("recovered write count mismatch: expected %d, got %d", s.WriteCount, s2.WriteCount)
|
||
}
|
||
if s2.WriteBytes != s.WriteBytes {
|
||
t.Errorf("recovered write bytes mismatch: expected %d, got %d", s.WriteBytes, s2.WriteBytes)
|
||
}
|
||
if s2.ProcessedCount != s.ProcessedCount {
|
||
t.Errorf("recovered processed count mismatch: expected %d, got %d", s.ProcessedCount, s2.ProcessedCount)
|
||
}
|
||
if s2.ProcessedBytes != s.ProcessedBytes {
|
||
t.Errorf("recovered processed bytes mismatch: expected %d, got %d", s.ProcessedBytes, s2.ProcessedBytes)
|
||
}
|
||
if s2.ErrorCount != s.ErrorCount {
|
||
t.Errorf("recovered error count mismatch: expected %d, got %d", s.ErrorCount, s2.ErrorCount)
|
||
}
|
||
}
|
||
|
||
// TestTopicProcessorStats 测试 TopicProcessor 的统计功能
|
||
func TestTopicProcessorStats(t *testing.T) {
|
||
tmpDir := t.TempDir()
|
||
topic := "stats_test"
|
||
|
||
// 创建 processor
|
||
logger := slog.Default()
|
||
config := &TopicConfig{
|
||
Handler: func(rec *Record) error {
|
||
// 简单处理函数
|
||
return nil
|
||
},
|
||
}
|
||
processor, err := NewTopicProcessor(tmpDir, topic, logger, config)
|
||
if err != nil {
|
||
t.Fatalf("创建 processor 失败: %v", err)
|
||
}
|
||
|
||
// 启动 processor
|
||
if err := processor.Start(); err != nil {
|
||
t.Fatalf("failed to start processor: %v", err)
|
||
}
|
||
defer func() {
|
||
processor.Stop()
|
||
processor.Close()
|
||
}()
|
||
|
||
// 写入一些数据
|
||
data1 := []byte("test message 1")
|
||
data2 := []byte("test message 2")
|
||
data3 := []byte("test message 3")
|
||
|
||
if _, err := processor.Write(data1); err != nil {
|
||
t.Fatalf("failed to write: %v", err)
|
||
}
|
||
if _, err := processor.Write(data2); err != nil {
|
||
t.Fatalf("failed to write: %v", err)
|
||
}
|
||
if _, err := processor.Write(data3); err != nil {
|
||
t.Fatalf("failed to write: %v", err)
|
||
}
|
||
|
||
// 等待处理完成
|
||
time.Sleep(200 * time.Millisecond)
|
||
|
||
// 检查统计信息
|
||
stats := processor.GetStats()
|
||
|
||
if stats.WriteCount != 3 {
|
||
t.Errorf("expected write count 3, got %d", stats.WriteCount)
|
||
}
|
||
|
||
expectedBytes := int64(len(data1) + len(data2) + len(data3))
|
||
if stats.WriteBytes != expectedBytes {
|
||
t.Errorf("expected write bytes %d, got %d", expectedBytes, stats.WriteBytes)
|
||
}
|
||
|
||
if stats.ProcessedCount != 3 {
|
||
t.Errorf("expected processed count 3, got %d", stats.ProcessedCount)
|
||
}
|
||
if stats.ProcessedBytes != expectedBytes {
|
||
t.Errorf("expected processed bytes %d, got %d", expectedBytes, stats.ProcessedBytes)
|
||
}
|
||
}
|
||
|
||
// TestStatsRecovery 测试统计信息的恢复功能
|
||
func TestStatsRecovery(t *testing.T) {
|
||
tmpDir := t.TempDir()
|
||
topic := "recovery_test"
|
||
|
||
// 第一次运行:写入数据
|
||
logger := slog.Default()
|
||
config := &TopicConfig{
|
||
Handler: func(rec *Record) error {
|
||
return nil
|
||
},
|
||
}
|
||
processor1, err := NewTopicProcessor(tmpDir, topic, logger, config)
|
||
if err != nil {
|
||
t.Fatalf("创建 processor 失败: %v", err)
|
||
}
|
||
if err := processor1.Start(); err != nil {
|
||
t.Fatalf("failed to start processor: %v", err)
|
||
}
|
||
|
||
// 写入数据
|
||
for i := 0; i < 5; i++ {
|
||
data := []byte(fmt.Sprintf("message %d", i))
|
||
if _, err := processor1.Write(data); err != nil {
|
||
t.Fatalf("failed to write: %v", err)
|
||
}
|
||
}
|
||
|
||
// 等待处理
|
||
time.Sleep(200 * time.Millisecond)
|
||
|
||
// 获取统计
|
||
stats1 := processor1.GetStats()
|
||
|
||
// 停止并保存
|
||
if err := processor1.Stop(); err != nil {
|
||
t.Fatalf("failed to stop processor: %v", err)
|
||
}
|
||
if err := processor1.Close(); err != nil {
|
||
t.Fatalf("failed to close processor: %v", err)
|
||
}
|
||
|
||
// 第二次运行:应该能恢复统计信息
|
||
processor2, err := NewTopicProcessor(tmpDir, topic, logger, config)
|
||
if err != nil {
|
||
t.Fatalf("创建 processor 失败: %v", err)
|
||
}
|
||
stats2 := processor2.GetStats()
|
||
|
||
// 验证恢复的统计信息
|
||
if stats2.WriteCount != stats1.WriteCount {
|
||
t.Errorf("write count mismatch: expected %d, got %d", stats1.WriteCount, stats2.WriteCount)
|
||
}
|
||
if stats2.WriteBytes != stats1.WriteBytes {
|
||
t.Errorf("write bytes mismatch: expected %d, got %d", stats1.WriteBytes, stats2.WriteBytes)
|
||
}
|
||
if stats2.ProcessedCount != stats1.ProcessedCount {
|
||
t.Errorf("processed count mismatch: expected %d, got %d", stats1.ProcessedCount, stats2.ProcessedCount)
|
||
}
|
||
if stats2.ProcessedBytes != stats1.ProcessedBytes {
|
||
t.Errorf("processed bytes mismatch: expected %d, got %d", stats1.ProcessedBytes, stats2.ProcessedBytes)
|
||
}
|
||
|
||
processor2.Close()
|
||
}
|
||
|
||
// TestSeqlogStats 测试 Seqlog 层面的统计聚合
|
||
func TestSeqlogStats(t *testing.T) {
|
||
tmpDir := t.TempDir()
|
||
|
||
handler := func(topic string, rec *Record) error {
|
||
return nil
|
||
}
|
||
|
||
seq := NewLogHub(tmpDir, slog.Default(), handler)
|
||
if err := seq.Start(); err != nil {
|
||
t.Fatalf("failed to start seqlog: %v", err)
|
||
}
|
||
defer seq.Stop()
|
||
|
||
// 写入多个 topic
|
||
topics := []string{"app", "sys", "audit"}
|
||
for _, topic := range topics {
|
||
for i := 0; i < 3; i++ {
|
||
data := []byte(fmt.Sprintf("%s message %d", topic, i))
|
||
if _, err := seq.Write(topic, data); err != nil {
|
||
t.Fatalf("failed to write to %s: %v", topic, err)
|
||
}
|
||
}
|
||
}
|
||
|
||
// 等待处理
|
||
time.Sleep(300 * time.Millisecond)
|
||
|
||
// 检查单个 topic 统计
|
||
for _, topic := range topics {
|
||
stats, err := seq.GetTopicStats(topic)
|
||
if err != nil {
|
||
t.Errorf("failed to get stats for %s: %v", topic, err)
|
||
continue
|
||
}
|
||
if stats.WriteCount != 3 {
|
||
t.Errorf("%s: expected write count 3, got %d", topic, stats.WriteCount)
|
||
}
|
||
}
|
||
|
||
// 检查所有统计
|
||
allStats := seq.GetAllStats()
|
||
if len(allStats) != len(topics) {
|
||
t.Errorf("expected %d topics, got %d", len(topics), len(allStats))
|
||
}
|
||
|
||
for _, topic := range topics {
|
||
stats, exists := allStats[topic]
|
||
if !exists {
|
||
t.Errorf("stats for %s not found", topic)
|
||
continue
|
||
}
|
||
if stats.WriteCount != 3 {
|
||
t.Errorf("%s: expected write count 3, got %d", topic, stats.WriteCount)
|
||
}
|
||
}
|
||
}
|
||
|
||
// TestRecordQuery 测试记录查询功能
|
||
func TestRecordQuery(t *testing.T) {
|
||
tmpFile := "test_query.log"
|
||
defer os.Remove(tmpFile)
|
||
defer os.Remove(tmpFile + ".pos")
|
||
|
||
// 写入测试数据
|
||
index, err := NewRecordIndex(tmpFile)
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
defer index.Close()
|
||
|
||
// 写入测试数据
|
||
writer, err := NewLogWriter(tmpFile, index)
|
||
if err != nil {
|
||
t.Fatalf("failed to create writer: %v", err)
|
||
}
|
||
|
||
messages := []string{
|
||
"message 0",
|
||
"message 1",
|
||
"message 2",
|
||
"message 3",
|
||
"message 4",
|
||
"message 5",
|
||
"message 6",
|
||
"message 7",
|
||
"message 8",
|
||
"message 9",
|
||
}
|
||
|
||
offsets := make([]int64, len(messages))
|
||
for i, msg := range messages {
|
||
offset, err := writer.Append([]byte(msg))
|
||
if err != nil {
|
||
t.Fatalf("failed to write message %d: %v", i, err)
|
||
}
|
||
offsets[i] = offset
|
||
}
|
||
defer writer.Close()
|
||
|
||
// 模拟处理到第 5 条记录
|
||
// 窗口范围:[索引 5, 索引 6)
|
||
startIdx := 5
|
||
endIdx := 6
|
||
|
||
// 创建索引
|
||
index, err = NewRecordIndex(tmpFile)
|
||
if err != nil {
|
||
t.Fatalf("failed to create index: %v", err)
|
||
}
|
||
defer index.Close()
|
||
|
||
// 创建查询器
|
||
query, err := NewRecordQuery(tmpFile, index, writer)
|
||
if err != nil {
|
||
t.Fatalf("failed to create query: %v", err)
|
||
}
|
||
defer query.Close()
|
||
|
||
// 测试查询当前位置(使用 QueryNewest 查询 startIdx)
|
||
current, err := query.QueryNewest(startIdx-1, 1)
|
||
if err != nil {
|
||
t.Fatalf("failed to query current: %v", err)
|
||
}
|
||
if len(current) != 1 {
|
||
t.Fatalf("expected 1 current result, got %d", len(current))
|
||
}
|
||
if string(current[0].Record.Data) != "message 5" {
|
||
t.Errorf("expected current 'message 5', got '%s'", string(current[0].Record.Data))
|
||
}
|
||
if current[0].Index != startIdx {
|
||
t.Errorf("expected index %d, got %d", startIdx, current[0].Index)
|
||
}
|
||
// 手动判断状态
|
||
status := GetRecordStatus(startIdx, startIdx, endIdx)
|
||
if status != StatusProcessing {
|
||
t.Errorf("expected status Processing, got %s", status)
|
||
}
|
||
|
||
// 测试 QueryOldest:查询更早的记录(向索引递减方向)
|
||
// QueryOldest(5, 3) 查询索引 2, 3, 4
|
||
backResults, err := query.QueryOldest(startIdx, 3)
|
||
if err != nil {
|
||
t.Fatalf("failed to query backward: %v", err)
|
||
}
|
||
if len(backResults) != 3 {
|
||
t.Errorf("expected 3 backward results, got %d", len(backResults))
|
||
}
|
||
// 返回按索引递增排序的结果:2, 3, 4
|
||
expectedBack := []string{"message 2", "message 3", "message 4"}
|
||
for i, rec := range backResults {
|
||
if string(rec.Record.Data) != expectedBack[i] {
|
||
t.Errorf("backward[%d]: expected '%s', got '%s'", i, expectedBack[i], string(rec.Record.Data))
|
||
}
|
||
expectedIndex := startIdx - 3 + i
|
||
if rec.Index != expectedIndex {
|
||
t.Errorf("backward[%d]: expected index %d, got %d", i, expectedIndex, rec.Index)
|
||
}
|
||
// 手动判断状态:索引 2, 3, 4 都已处理
|
||
recStatus := GetRecordStatus(rec.Index, startIdx, endIdx)
|
||
if recStatus != StatusProcessed {
|
||
t.Errorf("backward[%d]: expected status Processed, got %s", i, recStatus)
|
||
}
|
||
}
|
||
|
||
// 测试 QueryNewest:查询更新的记录(向索引递增方向)
|
||
// QueryNewest(endIdx, 3) 从 endIdx 向后查询,查询索引 6, 7, 8
|
||
forwardResults, err := query.QueryNewest(endIdx-1, 3)
|
||
if err != nil {
|
||
t.Fatalf("failed to query forward: %v", err)
|
||
}
|
||
if len(forwardResults) != 3 {
|
||
t.Errorf("expected 3 forward results, got %d", len(forwardResults))
|
||
}
|
||
// 返回按索引递增排序的结果:6, 7, 8
|
||
expectedForward := []string{"message 6", "message 7", "message 8"}
|
||
for i, rec := range forwardResults {
|
||
if string(rec.Record.Data) != expectedForward[i] {
|
||
t.Errorf("forward[%d]: expected '%s', got '%s'", i, expectedForward[i], string(rec.Record.Data))
|
||
}
|
||
expectedIndex := endIdx + i
|
||
if rec.Index != expectedIndex {
|
||
t.Errorf("forward[%d]: expected index %d, got %d", i, expectedIndex, rec.Index)
|
||
}
|
||
// 手动判断状态:索引 6, 7, 8 待处理
|
||
recStatus := GetRecordStatus(rec.Index, startIdx, endIdx)
|
||
if recStatus != StatusPending {
|
||
t.Errorf("forward[%d]: expected status Pending, got %s", i, recStatus)
|
||
}
|
||
}
|
||
|
||
}
|
||
|
||
// TestTopicQuery 测试 TopicProcessor 的查询功能
|
||
func TestTopicQuery(t *testing.T) {
|
||
tmpDir := t.TempDir()
|
||
topic := "query_test"
|
||
|
||
logger := slog.Default()
|
||
processedCount := 0
|
||
config := &TopicConfig{
|
||
Handler: func(rec *Record) error {
|
||
processedCount++
|
||
// 只处理前 3 条
|
||
if processedCount >= 3 {
|
||
return fmt.Errorf("stop processing")
|
||
}
|
||
return nil
|
||
},
|
||
}
|
||
|
||
processor, err := NewTopicProcessor(tmpDir, topic, logger, config)
|
||
if err != nil {
|
||
t.Fatalf("创建 processor 失败: %v", err)
|
||
}
|
||
if err := processor.Start(); err != nil {
|
||
t.Fatalf("failed to start processor: %v", err)
|
||
}
|
||
|
||
// 写入 10 条消息
|
||
for i := 0; i < 10; i++ {
|
||
data := []byte(fmt.Sprintf("message %d", i))
|
||
if _, err := processor.Write(data); err != nil {
|
||
t.Fatalf("failed to write: %v", err)
|
||
}
|
||
}
|
||
|
||
// 等待处理
|
||
time.Sleep(300 * time.Millisecond)
|
||
|
||
processor.Stop()
|
||
|
||
// 获取当前处理索引
|
||
startIdx := processor.GetProcessingIndex()
|
||
endIdx := processor.GetReadIndex()
|
||
t.Logf("Processing index: [%d, %d)", startIdx, endIdx)
|
||
|
||
// 测试查询当前位置(使用 processor 方法,带状态)
|
||
if startIdx < endIdx {
|
||
current, err := processor.QueryOldest(startIdx, 1)
|
||
if err != nil {
|
||
t.Fatalf("failed to query current: %v", err)
|
||
}
|
||
if len(current) > 0 {
|
||
t.Logf("Current: %s - %s", string(current[0].Record.Data), current[0].Status)
|
||
}
|
||
}
|
||
|
||
// 测试向后查询
|
||
if startIdx > 0 {
|
||
back, err := processor.QueryNewest(startIdx-1, 2)
|
||
if err != nil {
|
||
t.Fatalf("failed to query backward: %v", err)
|
||
}
|
||
for i, rec := range back {
|
||
t.Logf("Back[%d]: %s - %s", i, string(rec.Record.Data), rec.Status)
|
||
}
|
||
}
|
||
|
||
// 测试向前查询
|
||
if startIdx < processor.GetRecordCount() {
|
||
forward, err := processor.QueryOldest(endIdx, 3)
|
||
if err != nil {
|
||
t.Fatalf("failed to query forward: %v", err)
|
||
}
|
||
for i, rec := range forward {
|
||
t.Logf("Forward[%d]: %s - %s", i, string(rec.Record.Data), rec.Status)
|
||
}
|
||
}
|
||
|
||
processor.Close()
|
||
}
|
||
|
||
// TestSeqlogQuery 测试 Seqlog 层面的查询功能
|
||
func TestSeqlogQuery(t *testing.T) {
|
||
tmpDir := t.TempDir()
|
||
|
||
processedCount := 0
|
||
handler := func(topic string, rec *Record) error {
|
||
processedCount++
|
||
// 只处理前 5 条
|
||
if processedCount >= 5 {
|
||
return fmt.Errorf("stop processing")
|
||
}
|
||
return nil
|
||
}
|
||
|
||
seq := NewLogHub(tmpDir, slog.Default(), handler)
|
||
if err := seq.Start(); err != nil {
|
||
t.Fatalf("failed to start seqlog: %v", err)
|
||
}
|
||
|
||
// 写入消息
|
||
for i := 0; i < 10; i++ {
|
||
data := []byte(fmt.Sprintf("app message %d", i))
|
||
if _, err := seq.Write("app", data); err != nil {
|
||
t.Fatalf("failed to write: %v", err)
|
||
}
|
||
}
|
||
|
||
// 等待处理
|
||
time.Sleep(300 * time.Millisecond)
|
||
|
||
// 创建查询器
|
||
query, err := seq.NewTopicQuery("app")
|
||
if err != nil {
|
||
t.Fatalf("failed to create query: %v", err)
|
||
}
|
||
defer query.Close()
|
||
|
||
// 获取当前处理索引
|
||
startIdx := seq.GetProcessingIndex("app")
|
||
endIdx := seq.GetReadIndex("app")
|
||
t.Logf("Processing index: [%d, %d)", startIdx, endIdx)
|
||
|
||
// 获取 processor 用于查询(带状态)
|
||
processor, _ := seq.GetProcessor("app")
|
||
totalCount := processor.GetRecordCount()
|
||
|
||
// 测试查询当前
|
||
if startIdx < endIdx {
|
||
current, err := processor.QueryOldest(startIdx, 1)
|
||
if err != nil {
|
||
t.Fatalf("failed to query current: %v", err)
|
||
}
|
||
if len(current) > 0 {
|
||
t.Logf("Current: %s, Status: %s", string(current[0].Record.Data), current[0].Status)
|
||
}
|
||
}
|
||
|
||
// 测试向后查询
|
||
if startIdx > 0 {
|
||
back, err := processor.QueryNewest(startIdx-1, 2)
|
||
if err != nil {
|
||
t.Fatalf("failed to query backward: %v", err)
|
||
}
|
||
for i, rec := range back {
|
||
t.Logf("Back[%d]: %s - %s", i, string(rec.Record.Data), rec.Status)
|
||
}
|
||
}
|
||
|
||
// 测试向前查询
|
||
if startIdx < totalCount {
|
||
forward, err := processor.QueryOldest(endIdx, 3)
|
||
if err != nil {
|
||
t.Fatalf("failed to query forward: %v", err)
|
||
}
|
||
for i, rec := range forward {
|
||
t.Logf("Forward[%d]: %s - %s", i, string(rec.Record.Data), rec.Status)
|
||
}
|
||
}
|
||
|
||
seq.Stop()
|
||
}
|
||
|
||
// TestEventNotification 测试事件通知功能
|
||
func TestEventNotification(t *testing.T) {
|
||
tmpDir := t.TempDir()
|
||
topic := "event_test"
|
||
|
||
logger := slog.Default()
|
||
processedCount := 0
|
||
config := &TopicConfig{
|
||
Handler: func(rec *Record) error {
|
||
processedCount++
|
||
// 第 2 条消息返回错误
|
||
if processedCount == 2 {
|
||
return fmt.Errorf("模拟处理错误")
|
||
}
|
||
return nil
|
||
},
|
||
}
|
||
|
||
processor, err := NewTopicProcessor(tmpDir, topic, logger, config)
|
||
if err != nil {
|
||
t.Fatalf("创建 processor 失败: %v", err)
|
||
}
|
||
|
||
// 记录收到的事件
|
||
events := make([]EventType, 0)
|
||
var eventsMu sync.Mutex
|
||
|
||
// 订阅所有事件
|
||
processor.SubscribeAll(func(event *Event) {
|
||
eventsMu.Lock()
|
||
events = append(events, event.Type)
|
||
t.Logf("收到事件: %s - Topic: %s", event.Type, event.Topic)
|
||
eventsMu.Unlock()
|
||
})
|
||
|
||
// 启动 processor
|
||
if err := processor.Start(); err != nil {
|
||
t.Fatalf("failed to start processor: %v", err)
|
||
}
|
||
|
||
// 写入 3 条消息
|
||
for i := 0; i < 3; i++ {
|
||
data := []byte(fmt.Sprintf("message %d", i))
|
||
if _, err := processor.Write(data); err != nil {
|
||
t.Fatalf("failed to write: %v", err)
|
||
}
|
||
}
|
||
|
||
// 等待处理
|
||
time.Sleep(500 * time.Millisecond)
|
||
|
||
// 停止
|
||
processor.Stop()
|
||
processor.Close()
|
||
|
||
// 等待事件处理完成
|
||
time.Sleep(100 * time.Millisecond)
|
||
|
||
// 检查事件
|
||
eventsMu.Lock()
|
||
defer eventsMu.Unlock()
|
||
|
||
t.Logf("收到 %d 个事件", len(events))
|
||
|
||
// 应该有:1 个启动事件 + 3 个写入成功事件 + 1 个处理成功事件 + 1 个处理错误事件 + 1 个停止事件
|
||
expectedMinEvents := 7
|
||
if len(events) < expectedMinEvents {
|
||
t.Errorf("expected at least %d events, got %d", expectedMinEvents, len(events))
|
||
}
|
||
|
||
// 检查特定事件类型
|
||
hasStart := false
|
||
hasStop := false
|
||
hasWriteSuccess := false
|
||
hasProcessSuccess := false
|
||
hasProcessError := false
|
||
|
||
for _, eventType := range events {
|
||
switch eventType {
|
||
case EventProcessorStart:
|
||
hasStart = true
|
||
case EventProcessorStop:
|
||
hasStop = true
|
||
case EventWriteSuccess:
|
||
hasWriteSuccess = true
|
||
case EventProcessSuccess:
|
||
hasProcessSuccess = true
|
||
case EventProcessError:
|
||
hasProcessError = true
|
||
}
|
||
}
|
||
|
||
if !hasStart {
|
||
t.Error("missing ProcessorStart event")
|
||
}
|
||
if !hasStop {
|
||
t.Error("missing ProcessorStop event")
|
||
}
|
||
if !hasWriteSuccess {
|
||
t.Error("missing WriteSuccess event")
|
||
}
|
||
if !hasProcessSuccess {
|
||
t.Error("missing ProcessSuccess event")
|
||
}
|
||
if !hasProcessError {
|
||
t.Error("missing ProcessError event")
|
||
}
|
||
}
|
||
|
||
// TestSeqlogEventSubscription 测试 Seqlog 层面的事件订阅
|
||
func TestSeqlogEventSubscription(t *testing.T) {
|
||
tmpDir := t.TempDir()
|
||
|
||
handler := func(topic string, rec *Record) error {
|
||
return nil
|
||
}
|
||
|
||
seq := NewLogHub(tmpDir, slog.Default(), handler)
|
||
if err := seq.Start(); err != nil {
|
||
t.Fatalf("failed to start seqlog: %v", err)
|
||
}
|
||
defer seq.Stop()
|
||
|
||
// 订阅写入成功事件
|
||
writeCount := 0
|
||
var writeMu sync.Mutex
|
||
|
||
seq.Subscribe("app", EventWriteSuccess, func(event *Event) {
|
||
writeMu.Lock()
|
||
writeCount++
|
||
t.Logf("写入成功: topic=%s, position=%d", event.Topic, event.Position)
|
||
writeMu.Unlock()
|
||
})
|
||
|
||
// 写入消息
|
||
for i := 0; i < 5; i++ {
|
||
data := []byte(fmt.Sprintf("message %d", i))
|
||
if _, err := seq.Write("app", data); err != nil {
|
||
t.Fatalf("failed to write: %v", err)
|
||
}
|
||
}
|
||
|
||
// 等待事件处理
|
||
time.Sleep(200 * time.Millisecond)
|
||
|
||
writeMu.Lock()
|
||
if writeCount != 5 {
|
||
t.Errorf("expected 5 write events, got %d", writeCount)
|
||
}
|
||
writeMu.Unlock()
|
||
}
|
||
|
||
// TestMultiTopicEventSubscription 测试多 topic 事件订阅
|
||
func TestMultiTopicEventSubscription(t *testing.T) {
|
||
tmpDir := t.TempDir()
|
||
|
||
handler := func(topic string, rec *Record) error {
|
||
return nil
|
||
}
|
||
|
||
seq := NewLogHub(tmpDir, slog.Default(), handler)
|
||
if err := seq.Start(); err != nil {
|
||
t.Fatalf("failed to start seqlog: %v", err)
|
||
}
|
||
defer seq.Stop()
|
||
|
||
// 统计每个 topic 的写入事件
|
||
eventCounts := make(map[string]int)
|
||
var countMu sync.Mutex
|
||
|
||
// 为所有 topic 订阅写入成功事件
|
||
seq.SubscribeAllTopics(EventWriteSuccess, func(event *Event) {
|
||
countMu.Lock()
|
||
eventCounts[event.Topic]++
|
||
countMu.Unlock()
|
||
})
|
||
|
||
// 写入不同 topic
|
||
topics := []string{"app", "sys", "audit"}
|
||
for _, topic := range topics {
|
||
for i := 0; i < 3; i++ {
|
||
data := []byte(fmt.Sprintf("%s message %d", topic, i))
|
||
if _, err := seq.Write(topic, data); err != nil {
|
||
t.Fatalf("failed to write to %s: %v", topic, err)
|
||
}
|
||
}
|
||
}
|
||
|
||
// 等待事件处理
|
||
time.Sleep(300 * time.Millisecond)
|
||
|
||
countMu.Lock()
|
||
defer countMu.Unlock()
|
||
|
||
for _, topic := range topics {
|
||
count := eventCounts[topic]
|
||
if count != 3 {
|
||
t.Errorf("topic %s: expected 3 write events, got %d", topic, count)
|
||
}
|
||
}
|
||
}
|
||
|
||
// TestTopicReset 测试 topic 的 Reset 功能
|
||
func TestTopicReset(t *testing.T) {
|
||
tmpDir := t.TempDir()
|
||
|
||
seqlog := NewLogHub(tmpDir, slog.Default(), nil)
|
||
|
||
// 注册 handler
|
||
seqlog.RegisterHandler("test", func(rec *Record) error {
|
||
return nil
|
||
})
|
||
|
||
seqlog.Start()
|
||
|
||
// 写入一些日志
|
||
for i := 0; i < 5; i++ {
|
||
data := []byte(fmt.Sprintf("message %d", i))
|
||
if _, err := seqlog.Write("test", data); err != nil {
|
||
t.Fatalf("写入失败: %v", err)
|
||
}
|
||
}
|
||
|
||
// 等待处理
|
||
time.Sleep(300 * time.Millisecond)
|
||
|
||
// 验证统计信息
|
||
stats, err := seqlog.GetTopicStats("test")
|
||
if err != nil {
|
||
t.Fatalf("获取统计失败: %v", err)
|
||
}
|
||
|
||
if stats.WriteCount != 5 {
|
||
t.Errorf("期望写入 5 条,实际 %d 条", stats.WriteCount)
|
||
}
|
||
|
||
if stats.ProcessedCount != 5 {
|
||
t.Errorf("期望处理 5 条,实际 %d 条", stats.ProcessedCount)
|
||
}
|
||
|
||
// 检查日志文件是否存在
|
||
logFile := filepath.Join(tmpDir, "test.log")
|
||
|
||
if _, err := os.Stat(logFile); os.IsNotExist(err) {
|
||
t.Error("日志文件不存在")
|
||
}
|
||
|
||
// 使用 ResetTopic 方法
|
||
if err := seqlog.ResetTopic("test"); err != nil {
|
||
t.Fatalf("Reset 失败: %v", err)
|
||
}
|
||
|
||
// 等待重置完成
|
||
time.Sleep(100 * time.Millisecond)
|
||
|
||
// 验证日志文件已被重置(Reset 会重新初始化组件,创建空文件)
|
||
fileInfo, err := os.Stat(logFile)
|
||
if err != nil {
|
||
t.Errorf("日志文件应该存在(空文件): %v", err)
|
||
} else if fileInfo.Size() != 0 {
|
||
t.Errorf("日志文件应该是空的,但大小为 %d", fileInfo.Size())
|
||
}
|
||
|
||
// 验证统计信息已重置
|
||
stats, err = seqlog.GetTopicStats("test")
|
||
if err != nil {
|
||
t.Fatalf("获取统计失败: %v", err)
|
||
}
|
||
|
||
if stats.WriteCount != 0 {
|
||
t.Errorf("期望写入计数为 0,实际 %d", stats.WriteCount)
|
||
}
|
||
if stats.ProcessedCount != 0 {
|
||
t.Errorf("期望处理计数为 0,实际 %d", stats.ProcessedCount)
|
||
}
|
||
|
||
// 验证可以继续写入
|
||
for i := 0; i < 3; i++ {
|
||
data := []byte(fmt.Sprintf("new message %d", i))
|
||
if _, err := seqlog.Write("test", data); err != nil {
|
||
t.Fatalf("重置后写入失败: %v", err)
|
||
}
|
||
}
|
||
|
||
time.Sleep(300 * time.Millisecond)
|
||
|
||
stats, _ = seqlog.GetTopicStats("test")
|
||
if stats.WriteCount != 3 {
|
||
t.Errorf("重置后期望写入 3 条,实际 %d 条", stats.WriteCount)
|
||
}
|
||
if stats.ProcessedCount != 3 {
|
||
t.Errorf("重置后期望处理 3 条,实际 %d 条", stats.ProcessedCount)
|
||
}
|
||
|
||
// 最后停止
|
||
seqlog.Stop()
|
||
}
|
||
|
||
// TestTopicResetWithPendingRecords 测试当有待处理日志时 Reset 返回错误
|
||
func TestTopicResetWithPendingRecords(t *testing.T) {
|
||
tmpDir := t.TempDir()
|
||
|
||
seqlog := NewLogHub(tmpDir, slog.Default(), nil)
|
||
|
||
// 注册一个慢速 handler,让日志堆积
|
||
slowHandler := func(rec *Record) error {
|
||
time.Sleep(100 * time.Millisecond) // 模拟慢速处理
|
||
return nil
|
||
}
|
||
|
||
seqlog.RegisterHandler("test", slowHandler)
|
||
seqlog.Start()
|
||
|
||
// 快速写入多条日志
|
||
for i := 0; i < 10; i++ {
|
||
data := []byte(fmt.Sprintf("message %d", i))
|
||
if _, err := seqlog.Write("test", data); err != nil {
|
||
t.Fatalf("写入失败: %v", err)
|
||
}
|
||
}
|
||
|
||
// 短暂等待,让一部分日志开始处理但不是全部
|
||
time.Sleep(200 * time.Millisecond)
|
||
|
||
// 尝试重置,应该失败因为有待处理的日志
|
||
err := seqlog.ResetTopic("test")
|
||
if err == nil {
|
||
t.Fatal("期望 Reset 失败因为有待处理的日志,但成功了")
|
||
}
|
||
|
||
t.Logf("预期的错误: %v", err)
|
||
|
||
// 停止处理
|
||
seqlog.Stop()
|
||
|
||
// 现在 Reset 应该成功(停止后没有待处理的日志)
|
||
processor, _ := seqlog.GetProcessor("test")
|
||
if err := processor.Reset(); err != nil {
|
||
t.Fatalf("停止后 Reset 应该成功: %v", err)
|
||
}
|
||
}
|
||
|
||
// TestQueryOldestNewest 测试 QueryOldest 和 QueryNewest
|
||
func TestQueryOldestNewest(t *testing.T) {
|
||
tmpDir := t.TempDir()
|
||
|
||
// 创建 TopicProcessor(提供空 handler)
|
||
processor, err := NewTopicProcessor(tmpDir, "test", nil, &TopicConfig{
|
||
Handler: func(rec *Record) error {
|
||
return nil
|
||
},
|
||
})
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
defer processor.Close()
|
||
|
||
// 写入测试数据
|
||
for i := 0; i < 10; i++ {
|
||
data := fmt.Sprintf("message %d", i)
|
||
if _, err := processor.Write([]byte(data)); err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
}
|
||
|
||
// 测试 QueryNewest - 查询索引 0, 1, 2(向索引递增方向)
|
||
// QueryNewest(-1, 3) 从 -1 向后查询,得到索引 0, 1, 2
|
||
oldest, err := processor.QueryNewest(-1, 3)
|
||
if err != nil {
|
||
t.Fatalf("QueryNewest failed: %v", err)
|
||
}
|
||
if len(oldest) != 3 {
|
||
t.Errorf("expected 3 records, got %d", len(oldest))
|
||
}
|
||
// 验证顺序:应该是 0, 1, 2
|
||
for i := 0; i < 3; i++ {
|
||
expected := fmt.Sprintf("message %d", i)
|
||
if string(oldest[i].Record.Data) != expected {
|
||
t.Errorf("oldest[%d]: expected %s, got %s", i, expected, string(oldest[i].Record.Data))
|
||
}
|
||
if oldest[i].Index != i {
|
||
t.Errorf("oldest[%d]: expected index %d, got %d", i, i, oldest[i].Index)
|
||
}
|
||
t.Logf("Oldest[%d]: index=%d, %s - %s", i, oldest[i].Index, string(oldest[i].Record.Data), oldest[i].Status)
|
||
}
|
||
|
||
// 测试 QueryOldest - 查询索引 7, 8, 9(向索引递减方向)
|
||
// QueryOldest(10, 3) 从 10 向前查询,得到索引 7, 8, 9
|
||
totalCount := processor.GetRecordCount()
|
||
newest, err := processor.QueryOldest(totalCount, 3)
|
||
if err != nil {
|
||
t.Fatalf("QueryOldest failed: %v", err)
|
||
}
|
||
if len(newest) != 3 {
|
||
t.Errorf("expected 3 records, got %d", len(newest))
|
||
}
|
||
// 验证顺序:应该是 7, 8, 9(按索引递增)
|
||
for i := 0; i < 3; i++ {
|
||
expected := fmt.Sprintf("message %d", 7+i)
|
||
if string(newest[i].Record.Data) != expected {
|
||
t.Errorf("newest[%d]: expected %s, got %s", i, expected, string(newest[i].Record.Data))
|
||
}
|
||
if newest[i].Index != 7+i {
|
||
t.Errorf("newest[%d]: expected index %d, got %d", i, 7+i, newest[i].Index)
|
||
}
|
||
t.Logf("Newest[%d]: index=%d, %s - %s", i, newest[i].Index, string(newest[i].Record.Data), newest[i].Status)
|
||
}
|
||
|
||
// 测试超出范围 - 查询所有记录
|
||
// QueryNewest(-1, 100) 从 -1 向后查询,会返回所有记录(最多 100 条)
|
||
all, err := processor.QueryNewest(-1, 100)
|
||
if err != nil {
|
||
t.Fatalf("QueryNewest(-1, 100) failed: %v", err)
|
||
}
|
||
if len(all) != 10 {
|
||
t.Errorf("expected 10 records, got %d", len(all))
|
||
}
|
||
|
||
// 测试空结果
|
||
processor2, err := NewTopicProcessor(t.TempDir(), "empty", nil, &TopicConfig{
|
||
Handler: func(rec *Record) error {
|
||
return nil
|
||
},
|
||
})
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
defer processor2.Close()
|
||
|
||
emptyNewest, err := processor2.QueryNewest(-1, 10)
|
||
if err != nil {
|
||
t.Fatalf("QueryNewest on empty failed: %v", err)
|
||
}
|
||
if len(emptyNewest) != 0 {
|
||
t.Errorf("expected 0 records, got %d", len(emptyNewest))
|
||
}
|
||
}
|
||
|
||
// TestQueryFromFirstAndLast 测试 QueryFromFirst 和 QueryFromLast
|
||
func TestQueryFromFirstAndLast(t *testing.T) {
|
||
tmpDir := t.TempDir()
|
||
|
||
// 创建 TopicProcessor
|
||
processor, err := NewTopicProcessor(tmpDir, "test", nil, &TopicConfig{
|
||
Handler: func(rec *Record) error {
|
||
return nil
|
||
},
|
||
})
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
defer processor.Close()
|
||
|
||
// 写入 10 条测试数据
|
||
for i := 0; i < 10; i++ {
|
||
data := fmt.Sprintf("message %d", i)
|
||
if _, err := processor.Write([]byte(data)); err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
}
|
||
|
||
// 测试 QueryFromFirst - 从第一条记录向索引递增方向查询
|
||
t.Run("QueryFromFirst", func(t *testing.T) {
|
||
// 查询前 3 条记录
|
||
records, err := processor.QueryFromFirst(3)
|
||
if err != nil {
|
||
t.Fatalf("QueryFromFirst failed: %v", err)
|
||
}
|
||
|
||
if len(records) != 3 {
|
||
t.Fatalf("expected 3 records, got %d", len(records))
|
||
}
|
||
|
||
// 验证结果:应该是索引 0, 1, 2
|
||
for i := 0; i < 3; i++ {
|
||
expectedData := fmt.Sprintf("message %d", i)
|
||
if string(records[i].Record.Data) != expectedData {
|
||
t.Errorf("records[%d]: expected %s, got %s", i, expectedData, string(records[i].Record.Data))
|
||
}
|
||
if records[i].Index != i {
|
||
t.Errorf("records[%d]: expected index %d, got %d", i, i, records[i].Index)
|
||
}
|
||
t.Logf("FromFirst[%d]: index=%d, %s - %s", i, records[i].Index, string(records[i].Record.Data), records[i].Status)
|
||
}
|
||
|
||
// 查询超过总数的记录
|
||
allRecords, err := processor.QueryFromFirst(100)
|
||
if err != nil {
|
||
t.Fatalf("QueryFromFirst(100) failed: %v", err)
|
||
}
|
||
if len(allRecords) != 10 {
|
||
t.Errorf("expected 10 records, got %d", len(allRecords))
|
||
}
|
||
})
|
||
|
||
// 测试 QueryFromLast - 从最后一条记录向索引递减方向查询
|
||
t.Run("QueryFromLast", func(t *testing.T) {
|
||
// 查询最后 3 条记录
|
||
records, err := processor.QueryFromLast(3)
|
||
if err != nil {
|
||
t.Fatalf("QueryFromLast failed: %v", err)
|
||
}
|
||
|
||
if len(records) != 3 {
|
||
t.Fatalf("expected 3 records, got %d", len(records))
|
||
}
|
||
|
||
// 验证结果:应该是索引 7, 8, 9(按索引递增顺序排列)
|
||
for i := 0; i < 3; i++ {
|
||
expectedIndex := 7 + i
|
||
expectedData := fmt.Sprintf("message %d", expectedIndex)
|
||
if string(records[i].Record.Data) != expectedData {
|
||
t.Errorf("records[%d]: expected %s, got %s", i, expectedData, string(records[i].Record.Data))
|
||
}
|
||
if records[i].Index != expectedIndex {
|
||
t.Errorf("records[%d]: expected index %d, got %d", i, expectedIndex, records[i].Index)
|
||
}
|
||
t.Logf("FromLast[%d]: index=%d, %s - %s", i, records[i].Index, string(records[i].Record.Data), records[i].Status)
|
||
}
|
||
|
||
// 查询超过总数的记录
|
||
allRecords, err := processor.QueryFromLast(100)
|
||
if err != nil {
|
||
t.Fatalf("QueryFromLast(100) failed: %v", err)
|
||
}
|
||
if len(allRecords) != 10 {
|
||
t.Errorf("expected 10 records, got %d", len(allRecords))
|
||
}
|
||
})
|
||
|
||
// 测试空数据库
|
||
t.Run("EmptyDatabase", func(t *testing.T) {
|
||
emptyProcessor, err := NewTopicProcessor(t.TempDir(), "empty", nil, &TopicConfig{
|
||
Handler: func(rec *Record) error {
|
||
return nil
|
||
},
|
||
})
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
defer emptyProcessor.Close()
|
||
|
||
// QueryFromFirst 应该返回空数组
|
||
firstRecords, err := emptyProcessor.QueryFromFirst(10)
|
||
if err != nil {
|
||
t.Fatalf("QueryFromFirst on empty failed: %v", err)
|
||
}
|
||
if len(firstRecords) != 0 {
|
||
t.Errorf("expected 0 records, got %d", len(firstRecords))
|
||
}
|
||
|
||
// QueryFromLast 应该返回空数组
|
||
lastRecords, err := emptyProcessor.QueryFromLast(10)
|
||
if err != nil {
|
||
t.Fatalf("QueryFromLast on empty failed: %v", err)
|
||
}
|
||
if len(lastRecords) != 0 {
|
||
t.Errorf("expected 0 records, got %d", len(lastRecords))
|
||
}
|
||
})
|
||
}
|
||
|
||
// TestLogHubQueryFromFirstAndLast 测试 LogHub 的 QueryFromFirst 和 QueryFromLast
|
||
func TestLogHubQueryFromFirstAndLast(t *testing.T) {
|
||
tmpDir := t.TempDir()
|
||
|
||
seqlog := NewLogHub(tmpDir, slog.Default(), nil)
|
||
seqlog.RegisterHandler("test", func(rec *Record) error {
|
||
return nil
|
||
})
|
||
seqlog.Start()
|
||
defer seqlog.Stop()
|
||
|
||
// 写入测试数据
|
||
for i := 0; i < 10; i++ {
|
||
data := fmt.Sprintf("message %d", i)
|
||
if _, err := seqlog.Write("test", []byte(data)); err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
}
|
||
|
||
// 测试 QueryFromFirst
|
||
firstRecords, err := seqlog.QueryFromFirst("test", 3)
|
||
if err != nil {
|
||
t.Fatalf("QueryFromFirst failed: %v", err)
|
||
}
|
||
if len(firstRecords) != 3 {
|
||
t.Fatalf("expected 3 records, got %d", len(firstRecords))
|
||
}
|
||
for i := 0; i < 3; i++ {
|
||
if firstRecords[i].Index != i {
|
||
t.Errorf("firstRecords[%d]: expected index %d, got %d", i, i, firstRecords[i].Index)
|
||
}
|
||
}
|
||
|
||
// 测试 QueryFromLast
|
||
lastRecords, err := seqlog.QueryFromLast("test", 3)
|
||
if err != nil {
|
||
t.Fatalf("QueryFromLast failed: %v", err)
|
||
}
|
||
if len(lastRecords) != 3 {
|
||
t.Fatalf("expected 3 records, got %d", len(lastRecords))
|
||
}
|
||
for i := 0; i < 3; i++ {
|
||
expectedIndex := 7 + i
|
||
if lastRecords[i].Index != expectedIndex {
|
||
t.Errorf("lastRecords[%d]: expected index %d, got %d", i, expectedIndex, lastRecords[i].Index)
|
||
}
|
||
}
|
||
|
||
// 测试不存在的 topic
|
||
_, err = seqlog.QueryFromFirst("nonexistent", 10)
|
||
if err == nil {
|
||
t.Error("expected error for nonexistent topic")
|
||
}
|
||
|
||
_, err = seqlog.QueryFromLast("nonexistent", 10)
|
||
if err == nil {
|
||
t.Error("expected error for nonexistent topic")
|
||
}
|
||
}
|