主要更改: 1. 核心重命名 - Seqlog -> LogHub (更准确地反映其作为日志中枢的角色) - NewSeqlog() -> NewLogHub() - LogCursor -> ProcessCursor (更准确地反映其用于处理场景) - seqlog_manager.go -> loghub.go (文件名与结构体名对应) 2. TopicProcessor.Reset 增强 - 如果正在运行且没有待处理的日志,会自动停止后重置 - 如果有待处理的日志,返回详细错误(显示已处理/总记录数) - 简化了 LogHub.ResetTopic,移除显式 Stop 调用 3. 新增查询方法 - TopicProcessor.QueryFromFirst(count) - 从第一条记录向索引递增方向查询 - TopicProcessor.QueryFromLast(count) - 从最后一条记录向索引递减方向查询 - LogHub.QueryFromFirst(topic, count) - LogHub.QueryFromLast(topic, count) 4. 测试覆盖 - 添加 query_test.go - QueryFromProcessing 测试 - 添加 TestQueryFromFirstAndLast - TopicProcessor 查询测试 - 添加 TestLogHubQueryFromFirstAndLast - LogHub 查询测试 - 添加 TestTopicResetWithPendingRecords - Reset 增强功能测试 5. 示例代码 - 添加 example/get_record/ - 演示 QueryFromProcessing 用法 - 更新所有示例以使用 LogHub 和新 API 所有测试通过 ✅ 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
76 lines
1.9 KiB
Go
76 lines
1.9 KiB
Go
package seqlog
|
||
|
||
import (
|
||
"fmt"
|
||
"testing"
|
||
"time"
|
||
)
|
||
|
||
// TestQueryFromProcessing 测试从处理窗口开始位置查询记录
|
||
func TestQueryFromProcessing(t *testing.T) {
|
||
// 创建临时目录
|
||
tmpDir := t.TempDir()
|
||
|
||
// 创建配置(带 handler,不处理任何记录以保持窗口稳定)
|
||
config := &TopicConfig{
|
||
Handler: func(record *Record) error {
|
||
// 不处理,只是为了让 tailer 启动
|
||
time.Sleep(1 * time.Second) // 延迟处理
|
||
return nil
|
||
},
|
||
}
|
||
|
||
// 创建 TopicProcessor
|
||
processor, err := NewTopicProcessor(tmpDir, "test", nil, config)
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
defer processor.Close()
|
||
|
||
// 写入 10 条记录
|
||
for i := 0; i < 10; i++ {
|
||
msg := fmt.Sprintf("message %d", i)
|
||
_, err := processor.Write([]byte(msg))
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
}
|
||
|
||
// 不启动 tailer,直接测试查询功能
|
||
// startIdx 应该是 0(没有处理任何记录)
|
||
|
||
// 从处理窗口开始位置查询 5 条记录
|
||
records, err := processor.QueryFromProcessing(5)
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
|
||
t.Logf("查询到 %d 条记录", len(records))
|
||
|
||
if len(records) != 5 {
|
||
t.Fatalf("expected 5 records, got %d", len(records))
|
||
}
|
||
|
||
// 验证查询结果从索引 0 开始
|
||
for i, rec := range records {
|
||
expectedIndex := i
|
||
if rec.Index != expectedIndex {
|
||
t.Errorf("record[%d]: expected index %d, got %d", i, expectedIndex, rec.Index)
|
||
}
|
||
|
||
expectedMsg := fmt.Sprintf("message %d", expectedIndex)
|
||
if string(rec.Record.Data) != expectedMsg {
|
||
t.Errorf("record[%d]: expected data '%s', got '%s'", i, expectedMsg, string(rec.Record.Data))
|
||
}
|
||
|
||
// 未启动 tailer,所有记录都应该是 Pending 状态
|
||
if rec.Status != StatusPending {
|
||
t.Errorf("record[%d]: expected StatusPending, got %s", i, rec.Status)
|
||
}
|
||
|
||
t.Logf(" [索引 %d] %s - 状态: %s", rec.Index, string(rec.Record.Data), rec.Status)
|
||
}
|
||
|
||
t.Log("QueryFromProcessing 测试通过")
|
||
}
|