Files
seqlog/query_test.go
bourdon 90cc9e21c9 重构:重命名核心组件并增强查询功能
主要更改:

1. 核心重命名
   - Seqlog -> LogHub (更准确地反映其作为日志中枢的角色)
   - NewSeqlog() -> NewLogHub()
   - LogCursor -> ProcessCursor (更准确地反映其用于处理场景)
   - seqlog_manager.go -> loghub.go (文件名与结构体名对应)

2. TopicProcessor.Reset 增强
   - 如果正在运行且没有待处理的日志,会自动停止后重置
   - 如果有待处理的日志,返回详细错误(显示已处理/总记录数)
   - 简化了 LogHub.ResetTopic,移除显式 Stop 调用

3. 新增查询方法
   - TopicProcessor.QueryFromFirst(count) - 从第一条记录向索引递增方向查询
   - TopicProcessor.QueryFromLast(count) - 从最后一条记录向索引递减方向查询
   - LogHub.QueryFromFirst(topic, count)
   - LogHub.QueryFromLast(topic, count)

4. 测试覆盖
   - 添加 query_test.go - QueryFromProcessing 测试
   - 添加 TestQueryFromFirstAndLast - TopicProcessor 查询测试
   - 添加 TestLogHubQueryFromFirstAndLast - LogHub 查询测试
   - 添加 TestTopicResetWithPendingRecords - Reset 增强功能测试

5. 示例代码
   - 添加 example/get_record/ - 演示 QueryFromProcessing 用法
   - 更新所有示例以使用 LogHub 和新 API

所有测试通过 

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-04 13:26:21 +08:00

76 lines
1.9 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package seqlog
import (
"fmt"
"testing"
"time"
)
// TestQueryFromProcessing 测试从处理窗口开始位置查询记录
func TestQueryFromProcessing(t *testing.T) {
// 创建临时目录
tmpDir := t.TempDir()
// 创建配置(带 handler不处理任何记录以保持窗口稳定
config := &TopicConfig{
Handler: func(record *Record) error {
// 不处理,只是为了让 tailer 启动
time.Sleep(1 * time.Second) // 延迟处理
return nil
},
}
// 创建 TopicProcessor
processor, err := NewTopicProcessor(tmpDir, "test", nil, config)
if err != nil {
t.Fatal(err)
}
defer processor.Close()
// 写入 10 条记录
for i := 0; i < 10; i++ {
msg := fmt.Sprintf("message %d", i)
_, err := processor.Write([]byte(msg))
if err != nil {
t.Fatal(err)
}
}
// 不启动 tailer直接测试查询功能
// startIdx 应该是 0没有处理任何记录
// 从处理窗口开始位置查询 5 条记录
records, err := processor.QueryFromProcessing(5)
if err != nil {
t.Fatal(err)
}
t.Logf("查询到 %d 条记录", len(records))
if len(records) != 5 {
t.Fatalf("expected 5 records, got %d", len(records))
}
// 验证查询结果从索引 0 开始
for i, rec := range records {
expectedIndex := i
if rec.Index != expectedIndex {
t.Errorf("record[%d]: expected index %d, got %d", i, expectedIndex, rec.Index)
}
expectedMsg := fmt.Sprintf("message %d", expectedIndex)
if string(rec.Record.Data) != expectedMsg {
t.Errorf("record[%d]: expected data '%s', got '%s'", i, expectedMsg, string(rec.Record.Data))
}
// 未启动 tailer所有记录都应该是 Pending 状态
if rec.Status != StatusPending {
t.Errorf("record[%d]: expected StatusPending, got %s", i, rec.Status)
}
t.Logf(" [索引 %d] %s - 状态: %s", rec.Index, string(rec.Record.Data), rec.Status)
}
t.Log("QueryFromProcessing 测试通过")
}