重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
package seqlog
|
|
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
|
"context"
|
|
|
|
|
|
"fmt"
|
|
|
|
|
|
"io"
|
|
|
|
|
|
"log/slog"
|
|
|
|
|
|
"os"
|
|
|
|
|
|
"path/filepath"
|
|
|
|
|
|
"sync"
|
|
|
|
|
|
"testing"
|
|
|
|
|
|
"time"
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
func TestBasicWriteAndRead(t *testing.T) {
|
|
|
|
|
|
tmpFile := "test_basic.log"
|
|
|
|
|
|
defer os.Remove(tmpFile)
|
|
|
|
|
|
defer os.Remove(tmpFile + ".pos")
|
|
|
|
|
|
defer os.Remove(tmpFile + ".idx")
|
|
|
|
|
|
|
|
|
|
|
|
// 创建索引
|
|
|
|
|
|
index, err := NewRecordIndex(tmpFile)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("创建索引失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
defer index.Close()
|
|
|
|
|
|
|
|
|
|
|
|
// 写入测试数据
|
|
|
|
|
|
writer, err := NewLogWriter(tmpFile, index)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("创建 writer 失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
testData := [][]byte{
|
|
|
|
|
|
[]byte("hello world"),
|
|
|
|
|
|
[]byte("test log entry 1"),
|
|
|
|
|
|
[]byte("test log entry 2"),
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
for _, data := range testData {
|
|
|
|
|
|
if _, err := writer.Append(data); err != nil {
|
|
|
|
|
|
t.Fatalf("写入数据失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 读取数据(使用共享的 index)
|
2025-10-04 17:54:49 +08:00
|
|
|
|
cursor, err := NewCursor(tmpFile, index, nil)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("创建 cursor 失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
defer cursor.Close()
|
|
|
|
|
|
|
|
|
|
|
|
for i, expected := range testData {
|
|
|
|
|
|
rec, err := cursor.Next()
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("读取第 %d 条记录失败: %v", i, err)
|
|
|
|
|
|
}
|
|
|
|
|
|
if string(rec.Data) != string(expected) {
|
|
|
|
|
|
t.Errorf("第 %d 条记录数据不匹配: got %q, want %q", i, rec.Data, expected)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// TestCursorNextRange 测试范围游动功能
|
|
|
|
|
|
func TestCursorNextRange(t *testing.T) {
|
|
|
|
|
|
tmpFile := "test_range.log"
|
|
|
|
|
|
defer os.Remove(tmpFile)
|
|
|
|
|
|
defer os.Remove(tmpFile + ".pos")
|
|
|
|
|
|
defer os.Remove(tmpFile + ".idx")
|
|
|
|
|
|
|
|
|
|
|
|
// 创建索引
|
|
|
|
|
|
index, err := NewRecordIndex(tmpFile)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatal(err)
|
|
|
|
|
|
}
|
|
|
|
|
|
defer index.Close()
|
|
|
|
|
|
|
|
|
|
|
|
// 写入多条记录
|
|
|
|
|
|
writer, err := NewLogWriter(tmpFile, index)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatal(err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
messages := []string{"msg1", "msg2", "msg3", "msg4", "msg5", "msg6", "msg7", "msg8", "msg9", "msg10"}
|
|
|
|
|
|
for _, msg := range messages {
|
|
|
|
|
|
if _, err := writer.Append([]byte(msg)); err != nil {
|
|
|
|
|
|
t.Fatal(err)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
writer.Close()
|
|
|
|
|
|
|
|
|
|
|
|
// 测试范围读取
|
2025-10-04 17:54:49 +08:00
|
|
|
|
cursor, err := NewCursor(tmpFile, index, nil)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatal(err)
|
|
|
|
|
|
}
|
|
|
|
|
|
defer cursor.Close()
|
|
|
|
|
|
|
|
|
|
|
|
// 第一次读取 3 条
|
|
|
|
|
|
records, err := cursor.NextRange(3)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatal(err)
|
|
|
|
|
|
}
|
|
|
|
|
|
if len(records) != 3 {
|
|
|
|
|
|
t.Fatalf("expected 3 records, got %d", len(records))
|
|
|
|
|
|
}
|
|
|
|
|
|
for i, rec := range records {
|
|
|
|
|
|
expected := messages[i]
|
|
|
|
|
|
if string(rec.Data) != expected {
|
|
|
|
|
|
t.Errorf("record[%d]: expected '%s', got '%s'", i, expected, string(rec.Data))
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 第二次读取 5 条
|
|
|
|
|
|
records, err = cursor.NextRange(5)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatal(err)
|
|
|
|
|
|
}
|
|
|
|
|
|
if len(records) != 5 {
|
|
|
|
|
|
t.Fatalf("expected 5 records, got %d", len(records))
|
|
|
|
|
|
}
|
|
|
|
|
|
for i, rec := range records {
|
|
|
|
|
|
expected := messages[i+3]
|
|
|
|
|
|
if string(rec.Data) != expected {
|
|
|
|
|
|
t.Errorf("record[%d]: expected '%s', got '%s'", i, expected, string(rec.Data))
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 第三次读取 5 条(但只剩 2 条)
|
|
|
|
|
|
records, err = cursor.NextRange(5)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatal(err)
|
|
|
|
|
|
}
|
|
|
|
|
|
if len(records) != 2 {
|
|
|
|
|
|
t.Fatalf("expected 2 records, got %d", len(records))
|
|
|
|
|
|
}
|
|
|
|
|
|
for i, rec := range records {
|
|
|
|
|
|
expected := messages[i+8]
|
|
|
|
|
|
if string(rec.Data) != expected {
|
|
|
|
|
|
t.Errorf("record[%d]: expected '%s', got '%s'", i, expected, string(rec.Data))
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 再读取应该返回 EOF
|
|
|
|
|
|
records, err = cursor.NextRange(1)
|
|
|
|
|
|
if err != io.EOF {
|
|
|
|
|
|
t.Fatalf("expected EOF, got %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// TestCursorWindow 测试窗口模式
|
|
|
|
|
|
func TestCursorWindow(t *testing.T) {
|
|
|
|
|
|
tmpFile := "test_window.log"
|
|
|
|
|
|
defer os.Remove(tmpFile)
|
|
|
|
|
|
defer os.Remove(tmpFile + ".pos")
|
|
|
|
|
|
|
|
|
|
|
|
// 写入测试数据
|
|
|
|
|
|
index, err := NewRecordIndex(tmpFile)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatal(err)
|
|
|
|
|
|
}
|
|
|
|
|
|
defer index.Close()
|
|
|
|
|
|
|
|
|
|
|
|
// 写入测试数据
|
|
|
|
|
|
writer, err := NewLogWriter(tmpFile, index)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatal(err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
messages := []string{"msg1", "msg2", "msg3"}
|
|
|
|
|
|
for _, msg := range messages {
|
|
|
|
|
|
if _, err := writer.Append([]byte(msg)); err != nil {
|
|
|
|
|
|
t.Fatal(err)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
writer.Close()
|
|
|
|
|
|
|
|
|
|
|
|
// 测试窗口模式
|
2025-10-04 17:54:49 +08:00
|
|
|
|
cursor, err := NewCursor(tmpFile, index, nil)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatal(err)
|
|
|
|
|
|
}
|
|
|
|
|
|
defer cursor.Close()
|
|
|
|
|
|
|
|
|
|
|
|
// 初始状态
|
|
|
|
|
|
if cursor.StartIndex() != 0 {
|
|
|
|
|
|
t.Errorf("expected startIdx 0, got %d", cursor.StartIndex())
|
|
|
|
|
|
}
|
|
|
|
|
|
if cursor.EndIndex() != 0 {
|
|
|
|
|
|
t.Errorf("expected endIdx 0, got %d", cursor.EndIndex())
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 读取第一条记录
|
|
|
|
|
|
rec, err := cursor.Next()
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatal(err)
|
|
|
|
|
|
}
|
|
|
|
|
|
if string(rec.Data) != "msg1" {
|
|
|
|
|
|
t.Errorf("expected 'msg1', got '%s'", string(rec.Data))
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 检查窗口索引:startIdx 未变,endIdx 移动了
|
|
|
|
|
|
if cursor.StartIndex() != 0 {
|
|
|
|
|
|
t.Errorf("expected startIdx 0, got %d", cursor.StartIndex())
|
|
|
|
|
|
}
|
|
|
|
|
|
endIdxAfterFirst := cursor.EndIndex()
|
|
|
|
|
|
if endIdxAfterFirst == 0 {
|
|
|
|
|
|
t.Error("endIdx should have moved")
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 提交窗口
|
|
|
|
|
|
cursor.Commit()
|
|
|
|
|
|
if cursor.StartIndex() != endIdxAfterFirst {
|
|
|
|
|
|
t.Errorf("expected startIdx %d after commit, got %d", endIdxAfterFirst, cursor.StartIndex())
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 读取第二条记录
|
|
|
|
|
|
rec, err = cursor.Next()
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatal(err)
|
|
|
|
|
|
}
|
|
|
|
|
|
if string(rec.Data) != "msg2" {
|
|
|
|
|
|
t.Errorf("expected 'msg2', got '%s'", string(rec.Data))
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
endIdxAfterSecond := cursor.EndIndex()
|
|
|
|
|
|
|
|
|
|
|
|
// 回滚
|
|
|
|
|
|
if err := cursor.Rollback(); err != nil {
|
|
|
|
|
|
t.Fatal(err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 回滚后,endIdx 应该回到 startIdx
|
|
|
|
|
|
if cursor.EndIndex() != cursor.StartIndex() {
|
|
|
|
|
|
t.Errorf("expected endIdx == startIdx after rollback, got %d != %d", cursor.EndIndex(), cursor.StartIndex())
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 再次读取应该还是第二条
|
|
|
|
|
|
rec, err = cursor.Next()
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatal(err)
|
|
|
|
|
|
}
|
|
|
|
|
|
if string(rec.Data) != "msg2" {
|
|
|
|
|
|
t.Errorf("expected 'msg2' after rollback, got '%s'", string(rec.Data))
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 这次提交
|
|
|
|
|
|
cursor.Commit()
|
|
|
|
|
|
if cursor.StartIndex() != endIdxAfterSecond {
|
|
|
|
|
|
t.Errorf("expected startIdx %d after commit, got %d", endIdxAfterSecond, cursor.StartIndex())
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func TestCursorPersistence(t *testing.T) {
|
|
|
|
|
|
tmpFile := "test_cursor.log"
|
|
|
|
|
|
defer os.Remove(tmpFile)
|
|
|
|
|
|
defer os.Remove(tmpFile + ".pos")
|
|
|
|
|
|
|
|
|
|
|
|
// 写入测试数据
|
|
|
|
|
|
index, err := NewRecordIndex(tmpFile)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatal(err)
|
|
|
|
|
|
}
|
|
|
|
|
|
defer index.Close()
|
|
|
|
|
|
|
|
|
|
|
|
// 写入测试数据
|
|
|
|
|
|
writer, err := NewLogWriter(tmpFile, index)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("创建 writer 失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
testData := [][]byte{
|
|
|
|
|
|
[]byte("record 1"),
|
|
|
|
|
|
[]byte("record 2"),
|
|
|
|
|
|
[]byte("record 3"),
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
for _, data := range testData {
|
|
|
|
|
|
writer.Append(data)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 读取前两条记录
|
2025-10-04 17:54:49 +08:00
|
|
|
|
cursor1, err := NewCursor(tmpFile, index, nil)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("创建 cursor 失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-04 17:54:49 +08:00
|
|
|
|
cursor1.Next() // 读取第一条
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
cursor1.Commit() // 提交
|
2025-10-04 17:54:49 +08:00
|
|
|
|
cursor1.Next() // 读取第二条
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
cursor1.Commit() // 提交
|
|
|
|
|
|
cursor1.Close()
|
|
|
|
|
|
|
|
|
|
|
|
// 重新打开 cursor,应该从第三条开始读取
|
2025-10-04 17:54:49 +08:00
|
|
|
|
cursor2, err := NewCursor(tmpFile, index, nil)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("重新创建 cursor 失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
defer cursor2.Close()
|
|
|
|
|
|
|
|
|
|
|
|
rec, err := cursor2.Next()
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("读取恢复后的记录失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if string(rec.Data) != string(testData[2]) {
|
|
|
|
|
|
t.Errorf("游标恢复失败: got %q, want %q", rec.Data, testData[2])
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func TestTailer(t *testing.T) {
|
|
|
|
|
|
tmpFile := "test_tailer.log"
|
|
|
|
|
|
defer os.Remove(tmpFile)
|
|
|
|
|
|
defer os.Remove(tmpFile + ".pos")
|
|
|
|
|
|
|
|
|
|
|
|
// 创建 writer
|
|
|
|
|
|
index, err := NewRecordIndex(tmpFile)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatal(err)
|
|
|
|
|
|
}
|
|
|
|
|
|
defer index.Close()
|
|
|
|
|
|
|
|
|
|
|
|
// 写入测试数据
|
|
|
|
|
|
writer, err := NewLogWriter(tmpFile, index)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("创建 writer 失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 先写入一些数据
|
|
|
|
|
|
writer.Append([]byte("initial record"))
|
|
|
|
|
|
|
|
|
|
|
|
// 记录处理的数据
|
|
|
|
|
|
records := make([]string, 0)
|
|
|
|
|
|
handler := func(rec *Record) error {
|
|
|
|
|
|
records = append(records, string(rec.Data))
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 创建 cursor
|
2025-10-04 17:54:49 +08:00
|
|
|
|
cursor, err := NewCursor(tmpFile, index, nil)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("创建 cursor 失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 创建 tailer
|
|
|
|
|
|
tailer, err := NewTailer(cursor, handler, &TailConfig{
|
|
|
|
|
|
PollInterval: 50 * time.Millisecond,
|
|
|
|
|
|
SaveInterval: 100 * time.Millisecond,
|
|
|
|
|
|
})
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("创建 tailer 失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 启动 tailer
|
|
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
|
|
go tailer.Start(ctx)
|
|
|
|
|
|
|
|
|
|
|
|
// 等待处理初始记录
|
|
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
|
|
|
|
|
|
|
|
// 写入新数据
|
|
|
|
|
|
writer.Append([]byte("new record 1"))
|
|
|
|
|
|
writer.Append([]byte("new record 2"))
|
|
|
|
|
|
|
|
|
|
|
|
// 等待处理新数据
|
|
|
|
|
|
time.Sleep(200 * time.Millisecond)
|
|
|
|
|
|
|
|
|
|
|
|
// 停止 tailer
|
|
|
|
|
|
cancel()
|
|
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
|
|
|
|
|
|
|
|
// 验证处理的数据
|
|
|
|
|
|
if len(records) < 3 {
|
|
|
|
|
|
t.Errorf("处理的记录数量不足: got %d, want at least 3", len(records))
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
expected := []string{"initial record", "new record 1", "new record 2"}
|
|
|
|
|
|
for i, exp := range expected {
|
|
|
|
|
|
if i >= len(records) || records[i] != exp {
|
|
|
|
|
|
t.Errorf("第 %d 条记录不匹配: got %q, want %q", i, records[i], exp)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func TestTailerStop(t *testing.T) {
|
|
|
|
|
|
tmpFile := "test_tailer_stop.log"
|
|
|
|
|
|
defer os.Remove(tmpFile)
|
|
|
|
|
|
defer os.Remove(tmpFile + ".pos")
|
|
|
|
|
|
|
|
|
|
|
|
index, err := NewRecordIndex(tmpFile)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatal(err)
|
|
|
|
|
|
}
|
|
|
|
|
|
defer index.Close()
|
|
|
|
|
|
|
|
|
|
|
|
// 写入测试数据
|
|
|
|
|
|
writer, err := NewLogWriter(tmpFile, index)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("创建 writer 失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
writer.Append([]byte("test record"))
|
|
|
|
|
|
|
|
|
|
|
|
count := 0
|
|
|
|
|
|
handler := func(rec *Record) error {
|
|
|
|
|
|
count++
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-04 17:54:49 +08:00
|
|
|
|
cursor, err := NewCursor(tmpFile, index, nil)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("创建 cursor 失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
tailer, err := NewTailer(cursor, handler, nil)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("创建 tailer 失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 启动 tailer
|
|
|
|
|
|
go tailer.Start(context.Background())
|
|
|
|
|
|
|
|
|
|
|
|
// 等待处理
|
|
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
|
|
|
|
|
|
|
|
// 停止 tailer
|
|
|
|
|
|
tailer.Stop()
|
|
|
|
|
|
|
|
|
|
|
|
// 验证已处理
|
|
|
|
|
|
if count != 1 {
|
|
|
|
|
|
t.Errorf("处理的记录数量不正确: got %d, want 1", count)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func TestSeqlogBasic(t *testing.T) {
|
|
|
|
|
|
tmpDir := "test_seqlog"
|
|
|
|
|
|
os.MkdirAll(tmpDir, 0755)
|
|
|
|
|
|
defer os.RemoveAll(tmpDir)
|
|
|
|
|
|
|
2025-10-04 13:26:21 +08:00
|
|
|
|
seqlog := NewLogHub(tmpDir, slog.Default(), nil)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
|
|
|
|
|
|
// 注册 handler
|
|
|
|
|
|
appLogs := make([]string, 0)
|
|
|
|
|
|
seqlog.RegisterHandler("app", func(rec *Record) error {
|
|
|
|
|
|
appLogs = append(appLogs, string(rec.Data))
|
|
|
|
|
|
return nil
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
accessLogs := make([]string, 0)
|
|
|
|
|
|
seqlog.RegisterHandler("access", func(rec *Record) error {
|
|
|
|
|
|
accessLogs = append(accessLogs, string(rec.Data))
|
|
|
|
|
|
return nil
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
// 启动
|
|
|
|
|
|
if err := seqlog.Start(); err != nil {
|
|
|
|
|
|
t.Fatalf("启动 seqlog 失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 写入日志
|
|
|
|
|
|
seqlog.Write("app", []byte("app log 1"))
|
|
|
|
|
|
seqlog.Write("app", []byte("app log 2"))
|
|
|
|
|
|
seqlog.Write("access", []byte("access log 1"))
|
|
|
|
|
|
|
|
|
|
|
|
// 等待处理
|
|
|
|
|
|
time.Sleep(200 * time.Millisecond)
|
|
|
|
|
|
|
|
|
|
|
|
// 停止
|
|
|
|
|
|
seqlog.Stop()
|
|
|
|
|
|
|
|
|
|
|
|
// 验证
|
|
|
|
|
|
if len(appLogs) != 2 {
|
|
|
|
|
|
t.Errorf("app logs 数量不正确: got %d, want 2", len(appLogs))
|
|
|
|
|
|
}
|
|
|
|
|
|
if len(accessLogs) != 1 {
|
|
|
|
|
|
t.Errorf("access logs 数量不正确: got %d, want 1", len(accessLogs))
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func TestSeqlogDefaultHandler(t *testing.T) {
|
|
|
|
|
|
tmpDir := "test_seqlog_default"
|
|
|
|
|
|
os.MkdirAll(tmpDir, 0755)
|
|
|
|
|
|
defer os.RemoveAll(tmpDir)
|
|
|
|
|
|
|
|
|
|
|
|
// 注册默认 handler
|
|
|
|
|
|
allLogs := make(map[string][]string)
|
|
|
|
|
|
var mu sync.Mutex
|
|
|
|
|
|
|
|
|
|
|
|
defaultHandler := func(topic string, rec *Record) error {
|
|
|
|
|
|
mu.Lock()
|
|
|
|
|
|
defer mu.Unlock()
|
|
|
|
|
|
allLogs[topic] = append(allLogs[topic], string(rec.Data))
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-04 13:26:21 +08:00
|
|
|
|
seqlog := NewLogHub(tmpDir, slog.Default(), defaultHandler)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
|
|
|
|
|
|
// 注册特定 handler
|
|
|
|
|
|
seqlog.RegisterHandler("special", func(rec *Record) error {
|
|
|
|
|
|
mu.Lock()
|
|
|
|
|
|
defer mu.Unlock()
|
|
|
|
|
|
allLogs["special"] = append(allLogs["special"], string(rec.Data))
|
|
|
|
|
|
return nil
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
// 启动
|
|
|
|
|
|
seqlog.Start()
|
|
|
|
|
|
|
|
|
|
|
|
// 写入日志
|
|
|
|
|
|
seqlog.Write("special", []byte("special log"))
|
|
|
|
|
|
seqlog.Write("other", []byte("other log 1"))
|
|
|
|
|
|
seqlog.Write("other", []byte("other log 2"))
|
|
|
|
|
|
|
|
|
|
|
|
// 等待处理
|
|
|
|
|
|
time.Sleep(200 * time.Millisecond)
|
|
|
|
|
|
|
|
|
|
|
|
// 停止
|
|
|
|
|
|
seqlog.Stop()
|
|
|
|
|
|
|
|
|
|
|
|
// 验证
|
|
|
|
|
|
mu.Lock()
|
|
|
|
|
|
defer mu.Unlock()
|
|
|
|
|
|
|
|
|
|
|
|
if len(allLogs["special"]) != 1 {
|
|
|
|
|
|
t.Errorf("special logs 数量不正确: got %d, want 1", len(allLogs["special"]))
|
|
|
|
|
|
}
|
|
|
|
|
|
if len(allLogs["other"]) != 2 {
|
|
|
|
|
|
t.Errorf("other logs 数量不正确: got %d, want 2", len(allLogs["other"]))
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func TestSeqlogDynamicRegistration(t *testing.T) {
|
|
|
|
|
|
tmpDir := "test_seqlog_dynamic"
|
|
|
|
|
|
os.MkdirAll(tmpDir, 0755)
|
|
|
|
|
|
defer os.RemoveAll(tmpDir)
|
|
|
|
|
|
|
2025-10-04 13:26:21 +08:00
|
|
|
|
seqlog := NewLogHub(tmpDir, slog.Default(), nil)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
|
|
|
|
|
|
// 先注册 handler(handler 现在是必填项)
|
|
|
|
|
|
logs := make([]string, 0)
|
|
|
|
|
|
seqlog.RegisterHandler("dynamic", func(rec *Record) error {
|
|
|
|
|
|
logs = append(logs, string(rec.Data))
|
|
|
|
|
|
return nil
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
seqlog.Start()
|
|
|
|
|
|
|
|
|
|
|
|
// 写入日志
|
|
|
|
|
|
seqlog.Write("dynamic", []byte("log 1"))
|
|
|
|
|
|
seqlog.Write("dynamic", []byte("log 2"))
|
|
|
|
|
|
seqlog.Write("dynamic", []byte("log 3"))
|
|
|
|
|
|
|
|
|
|
|
|
// 等待处理
|
|
|
|
|
|
time.Sleep(200 * time.Millisecond)
|
|
|
|
|
|
|
|
|
|
|
|
seqlog.Stop()
|
|
|
|
|
|
|
|
|
|
|
|
// 应该处理所有日志
|
|
|
|
|
|
if len(logs) != 3 {
|
|
|
|
|
|
t.Errorf("处理的日志数量不正确: got %d, want 3", len(logs))
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func TestDynamicConfigUpdate(t *testing.T) {
|
|
|
|
|
|
tmpDir := "test_dynamic_config"
|
|
|
|
|
|
os.MkdirAll(tmpDir, 0755)
|
|
|
|
|
|
defer os.RemoveAll(tmpDir)
|
|
|
|
|
|
|
2025-10-04 13:26:21 +08:00
|
|
|
|
seqlog := NewLogHub(tmpDir, slog.Default(), nil)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
|
|
|
|
|
|
// 注册 handler
|
|
|
|
|
|
logs := make([]string, 0)
|
|
|
|
|
|
seqlog.RegisterHandler("test", func(rec *Record) error {
|
|
|
|
|
|
logs = append(logs, string(rec.Data))
|
|
|
|
|
|
return nil
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
seqlog.Start()
|
|
|
|
|
|
|
|
|
|
|
|
// 写入一些日志
|
|
|
|
|
|
seqlog.Write("test", []byte("log 1"))
|
|
|
|
|
|
seqlog.Write("test", []byte("log 2"))
|
|
|
|
|
|
|
|
|
|
|
|
time.Sleep(150 * time.Millisecond)
|
|
|
|
|
|
|
|
|
|
|
|
// 获取当前配置
|
|
|
|
|
|
config, err := seqlog.GetTopicConfig("test")
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("获取配置失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 验证默认配置
|
|
|
|
|
|
if config.PollInterval != 100*time.Millisecond {
|
|
|
|
|
|
t.Errorf("默认 PollInterval 不正确: got %v, want 100ms", config.PollInterval)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 动态更新配置
|
|
|
|
|
|
newConfig := &TailConfig{
|
|
|
|
|
|
PollInterval: 50 * time.Millisecond,
|
|
|
|
|
|
SaveInterval: 500 * time.Millisecond,
|
|
|
|
|
|
BatchSize: 10,
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if err := seqlog.UpdateTopicConfig("test", newConfig); err != nil {
|
|
|
|
|
|
t.Fatalf("更新配置失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 继续写入日志
|
|
|
|
|
|
seqlog.Write("test", []byte("log 3"))
|
|
|
|
|
|
|
|
|
|
|
|
time.Sleep(150 * time.Millisecond)
|
|
|
|
|
|
|
|
|
|
|
|
seqlog.Stop()
|
|
|
|
|
|
|
|
|
|
|
|
// 验证所有日志都被处理
|
|
|
|
|
|
if len(logs) != 3 {
|
|
|
|
|
|
t.Errorf("处理的日志数量不正确: got %d, want 3", len(logs))
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 验证配置已更新
|
|
|
|
|
|
updatedConfig, _ := seqlog.GetTopicConfig("test")
|
|
|
|
|
|
if updatedConfig.PollInterval != 50*time.Millisecond {
|
|
|
|
|
|
t.Errorf("更新后的 PollInterval 不正确: got %v, want 50ms", updatedConfig.PollInterval)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func TestUUIDUniqueness(t *testing.T) {
|
|
|
|
|
|
tmpFile := "test_uuid.log"
|
|
|
|
|
|
defer os.Remove(tmpFile)
|
|
|
|
|
|
defer os.Remove(tmpFile + ".pos")
|
|
|
|
|
|
|
|
|
|
|
|
// 写入测试数据
|
|
|
|
|
|
index, err := NewRecordIndex(tmpFile)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatal(err)
|
|
|
|
|
|
}
|
|
|
|
|
|
defer index.Close()
|
|
|
|
|
|
|
|
|
|
|
|
// 写入测试数据
|
|
|
|
|
|
writer, err := NewLogWriter(tmpFile, index)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("创建 writer 失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 写入多条相同内容的日志
|
|
|
|
|
|
for i := 0; i < 10; i++ {
|
|
|
|
|
|
if _, err := writer.Append([]byte("same content")); err != nil {
|
|
|
|
|
|
t.Fatalf("写入数据失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 读取并验证 UUID 唯一性
|
2025-10-04 17:54:49 +08:00
|
|
|
|
cursor, err := NewCursor(tmpFile, index, nil)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("创建 cursor 失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
defer cursor.Close()
|
|
|
|
|
|
|
|
|
|
|
|
uuids := make(map[string]bool)
|
|
|
|
|
|
for i := 0; i < 10; i++ {
|
|
|
|
|
|
rec, err := cursor.Next()
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("读取第 %d 条记录失败: %v", i, err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 检查 UUID 是否已存在
|
|
|
|
|
|
uuidStr := rec.UUID.String()
|
|
|
|
|
|
if uuids[uuidStr] {
|
|
|
|
|
|
t.Errorf("发现重复的 UUID: %s", uuidStr)
|
|
|
|
|
|
}
|
|
|
|
|
|
uuids[uuidStr] = true
|
|
|
|
|
|
|
|
|
|
|
|
// 验证数据内容
|
|
|
|
|
|
if string(rec.Data) != "same content" {
|
|
|
|
|
|
t.Errorf("第 %d 条记录数据不匹配: got %q, want %q", i, rec.Data, "same content")
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 验证所有 UUID 都不同
|
|
|
|
|
|
if len(uuids) != 10 {
|
|
|
|
|
|
t.Errorf("UUID 数量不正确: got %d, want 10", len(uuids))
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func TestUUIDValidation(t *testing.T) {
|
|
|
|
|
|
tmpFile := "test_uuid_validation.log"
|
|
|
|
|
|
defer os.Remove(tmpFile)
|
|
|
|
|
|
defer os.Remove(tmpFile + ".pos")
|
|
|
|
|
|
|
|
|
|
|
|
// 写入一条正常日志
|
|
|
|
|
|
index, err := NewRecordIndex(tmpFile)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatal(err)
|
|
|
|
|
|
}
|
|
|
|
|
|
defer index.Close()
|
|
|
|
|
|
|
|
|
|
|
|
// 写入测试数据
|
|
|
|
|
|
writer, err := NewLogWriter(tmpFile, index)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("创建 writer 失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if _, err := writer.Append([]byte("test data")); err != nil {
|
|
|
|
|
|
t.Fatalf("写入数据失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 读取并验证 UUID
|
2025-10-04 17:54:49 +08:00
|
|
|
|
cursor, err := NewCursor(tmpFile, index, nil)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("创建 cursor 失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
defer cursor.Close()
|
|
|
|
|
|
|
|
|
|
|
|
rec, err := cursor.Next()
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("读取记录失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 验证 UUID 不为空
|
|
|
|
|
|
if rec.UUID == [16]byte{} {
|
|
|
|
|
|
t.Error("UUID 为空")
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 验证 UUID 是有效的
|
|
|
|
|
|
uuidStr := rec.UUID.String()
|
|
|
|
|
|
if len(uuidStr) != 36 { // UUID 字符串格式:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
|
|
|
|
|
|
t.Errorf("UUID 字符串格式不正确: %s (长度 %d)", uuidStr, len(uuidStr))
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
t.Logf("生成的 UUID: %s", uuidStr)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func TestSeqlogAutoRecovery(t *testing.T) {
|
|
|
|
|
|
tmpDir := "test_auto_recovery"
|
|
|
|
|
|
os.MkdirAll(tmpDir, 0755)
|
|
|
|
|
|
defer os.RemoveAll(tmpDir)
|
|
|
|
|
|
|
|
|
|
|
|
// 第一阶段:创建并写入日志
|
|
|
|
|
|
allLogs := make(map[string][]string)
|
|
|
|
|
|
var mu sync.Mutex
|
|
|
|
|
|
|
|
|
|
|
|
defaultHandler := func(topic string, rec *Record) error {
|
|
|
|
|
|
mu.Lock()
|
|
|
|
|
|
defer mu.Unlock()
|
|
|
|
|
|
allLogs[topic] = append(allLogs[topic], string(rec.Data))
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-04 13:26:21 +08:00
|
|
|
|
seqlog1 := NewLogHub(tmpDir, slog.Default(), defaultHandler)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
seqlog1.Start()
|
|
|
|
|
|
|
|
|
|
|
|
// 写入一些日志
|
|
|
|
|
|
seqlog1.Write("app", []byte("app log 1"))
|
|
|
|
|
|
seqlog1.Write("app", []byte("app log 2"))
|
|
|
|
|
|
seqlog1.Write("access", []byte("access log 1"))
|
|
|
|
|
|
seqlog1.Write("error", []byte("error log 1"))
|
|
|
|
|
|
|
|
|
|
|
|
time.Sleep(200 * time.Millisecond)
|
|
|
|
|
|
seqlog1.Stop()
|
|
|
|
|
|
|
|
|
|
|
|
// 验证第一阶段写入的日志
|
|
|
|
|
|
mu.Lock()
|
|
|
|
|
|
if len(allLogs["app"]) != 2 {
|
|
|
|
|
|
t.Errorf("第一阶段 app logs 数量不正确: got %d, want 2", len(allLogs["app"]))
|
|
|
|
|
|
}
|
|
|
|
|
|
if len(allLogs["access"]) != 1 {
|
|
|
|
|
|
t.Errorf("第一阶段 access logs 数量不正确: got %d, want 1", len(allLogs["access"]))
|
|
|
|
|
|
}
|
|
|
|
|
|
if len(allLogs["error"]) != 1 {
|
|
|
|
|
|
t.Errorf("第一阶段 error logs 数量不正确: got %d, want 1", len(allLogs["error"]))
|
|
|
|
|
|
}
|
|
|
|
|
|
mu.Unlock()
|
|
|
|
|
|
|
|
|
|
|
|
// 清空日志计数,模拟重启
|
|
|
|
|
|
mu.Lock()
|
|
|
|
|
|
allLogs = make(map[string][]string)
|
|
|
|
|
|
mu.Unlock()
|
|
|
|
|
|
|
|
|
|
|
|
// 第二阶段:重启并自动恢复
|
2025-10-04 13:26:21 +08:00
|
|
|
|
seqlog2 := NewLogHub(tmpDir, slog.Default(), defaultHandler)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
seqlog2.Start()
|
|
|
|
|
|
|
|
|
|
|
|
// 写入新日志
|
|
|
|
|
|
seqlog2.Write("app", []byte("app log 3"))
|
|
|
|
|
|
seqlog2.Write("access", []byte("access log 2"))
|
|
|
|
|
|
seqlog2.Write("new_topic", []byte("new topic log 1"))
|
|
|
|
|
|
|
|
|
|
|
|
time.Sleep(300 * time.Millisecond)
|
|
|
|
|
|
|
|
|
|
|
|
// 验证恢复后的处理
|
|
|
|
|
|
mu.Lock()
|
|
|
|
|
|
// 应该只处理新写入的日志(因为游标已保存位置)
|
|
|
|
|
|
if len(allLogs["app"]) != 1 {
|
|
|
|
|
|
t.Errorf("恢复后 app logs 数量不正确: got %d, want 1 (只有新日志)", len(allLogs["app"]))
|
|
|
|
|
|
}
|
|
|
|
|
|
if len(allLogs["access"]) != 1 {
|
|
|
|
|
|
t.Errorf("恢复后 access logs 数量不正确: got %d, want 1 (只有新日志)", len(allLogs["access"]))
|
|
|
|
|
|
}
|
|
|
|
|
|
if len(allLogs["new_topic"]) != 1 {
|
|
|
|
|
|
t.Errorf("恢复后 new_topic logs 数量不正确: got %d, want 1", len(allLogs["new_topic"]))
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 验证日志内容
|
|
|
|
|
|
if allLogs["app"][0] != "app log 3" {
|
|
|
|
|
|
t.Errorf("app 日志内容不正确: got %q, want %q", allLogs["app"][0], "app log 3")
|
|
|
|
|
|
}
|
|
|
|
|
|
mu.Unlock()
|
|
|
|
|
|
|
|
|
|
|
|
seqlog2.Stop()
|
|
|
|
|
|
|
|
|
|
|
|
// 验证自动发现的 topic
|
|
|
|
|
|
topics := seqlog2.GetTopics()
|
|
|
|
|
|
expectedTopics := map[string]bool{
|
|
|
|
|
|
"app": true,
|
|
|
|
|
|
"access": true,
|
|
|
|
|
|
"error": true,
|
|
|
|
|
|
"new_topic": true,
|
|
|
|
|
|
}
|
|
|
|
|
|
if len(topics) != len(expectedTopics) {
|
|
|
|
|
|
t.Errorf("topic 数量不正确: got %d, want %d", len(topics), len(expectedTopics))
|
|
|
|
|
|
}
|
|
|
|
|
|
for _, topic := range topics {
|
|
|
|
|
|
if !expectedTopics[topic] {
|
|
|
|
|
|
t.Errorf("发现未预期的 topic: %s", topic)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func TestTopicProcessorClose(t *testing.T) {
|
|
|
|
|
|
tmpDir := "test_processor_close"
|
|
|
|
|
|
os.MkdirAll(tmpDir, 0755)
|
|
|
|
|
|
defer os.RemoveAll(tmpDir)
|
|
|
|
|
|
|
|
|
|
|
|
// 创建 processor(提供空 handler)
|
|
|
|
|
|
logger := slog.Default()
|
|
|
|
|
|
processor, err := NewTopicProcessor(tmpDir, "test", logger, &TopicConfig{
|
|
|
|
|
|
Handler: func(rec *Record) error {
|
|
|
|
|
|
return nil
|
|
|
|
|
|
},
|
|
|
|
|
|
})
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("创建 processor 失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 写入一些数据
|
|
|
|
|
|
_, err = processor.Write([]byte("test data 1"))
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("写入失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
_, err = processor.Write([]byte("test data 2"))
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("写入失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 停止 processor
|
|
|
|
|
|
if err := processor.Stop(); err != nil {
|
|
|
|
|
|
t.Fatalf("停止失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 清理资源
|
|
|
|
|
|
if err := processor.Close(); err != nil {
|
|
|
|
|
|
t.Fatalf("清理失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 验证 writer 已被清理(设置为 nil)
|
|
|
|
|
|
processor.mu.RLock()
|
|
|
|
|
|
if processor.writer != nil {
|
|
|
|
|
|
t.Error("writer 应该被清理为 nil")
|
|
|
|
|
|
}
|
|
|
|
|
|
processor.mu.RUnlock()
|
|
|
|
|
|
|
|
|
|
|
|
// 再次调用 Close 应该不报错
|
|
|
|
|
|
if err := processor.Close(); err != nil {
|
|
|
|
|
|
t.Errorf("重复调用 Close 失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func TestSeqlogCleanup(t *testing.T) {
|
|
|
|
|
|
tmpDir := "test_seqlog_cleanup"
|
|
|
|
|
|
os.MkdirAll(tmpDir, 0755)
|
|
|
|
|
|
defer os.RemoveAll(tmpDir)
|
|
|
|
|
|
|
2025-10-04 13:26:21 +08:00
|
|
|
|
seqlog := NewLogHub(tmpDir, slog.Default(), nil)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
seqlog.Start()
|
|
|
|
|
|
|
|
|
|
|
|
// 写入多个 topic 的日志
|
|
|
|
|
|
topics := []string{"app", "access", "error"}
|
|
|
|
|
|
for _, topic := range topics {
|
|
|
|
|
|
for i := 0; i < 3; i++ {
|
|
|
|
|
|
data := []byte(fmt.Sprintf("%s log %d", topic, i))
|
|
|
|
|
|
if _, err := seqlog.Write(topic, data); err != nil {
|
|
|
|
|
|
t.Fatalf("写入失败 [topic=%s]: %v", topic, err)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 停止并清理
|
|
|
|
|
|
if err := seqlog.Stop(); err != nil {
|
|
|
|
|
|
t.Fatalf("停止失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 验证所有 processor 都被清理
|
|
|
|
|
|
seqlog.mu.RLock()
|
|
|
|
|
|
for topic, processor := range seqlog.processors {
|
|
|
|
|
|
processor.mu.RLock()
|
|
|
|
|
|
if processor.writer != nil {
|
|
|
|
|
|
t.Errorf("topic %s 的 writer 未被清理", topic)
|
|
|
|
|
|
}
|
|
|
|
|
|
processor.mu.RUnlock()
|
|
|
|
|
|
}
|
|
|
|
|
|
seqlog.mu.RUnlock()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// TestTopicStats 测试 TopicStats 的基本功能
|
|
|
|
|
|
func TestTopicStats(t *testing.T) {
|
|
|
|
|
|
tmpDir := t.TempDir()
|
|
|
|
|
|
statsPath := filepath.Join(tmpDir, "test.stats")
|
|
|
|
|
|
|
|
|
|
|
|
// 创建统计管理器
|
|
|
|
|
|
stats := NewTopicStats(statsPath)
|
|
|
|
|
|
|
|
|
|
|
|
// 测试写入统计
|
|
|
|
|
|
stats.IncWrite(100)
|
|
|
|
|
|
stats.IncWrite(200)
|
|
|
|
|
|
|
|
|
|
|
|
s := stats.Get()
|
|
|
|
|
|
if s.WriteCount != 2 {
|
|
|
|
|
|
t.Errorf("expected write count 2, got %d", s.WriteCount)
|
|
|
|
|
|
}
|
|
|
|
|
|
if s.WriteBytes != 300 {
|
|
|
|
|
|
t.Errorf("expected write bytes 300, got %d", s.WriteBytes)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 测试处理统计
|
|
|
|
|
|
stats.IncProcessed(50)
|
|
|
|
|
|
stats.IncProcessed(150)
|
|
|
|
|
|
|
|
|
|
|
|
s = stats.Get()
|
|
|
|
|
|
if s.ProcessedCount != 2 {
|
|
|
|
|
|
t.Errorf("expected processed count 2, got %d", s.ProcessedCount)
|
|
|
|
|
|
}
|
|
|
|
|
|
if s.ProcessedBytes != 200 {
|
|
|
|
|
|
t.Errorf("expected processed bytes 200, got %d", s.ProcessedBytes)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 测试错误统计
|
|
|
|
|
|
stats.IncError()
|
|
|
|
|
|
stats.IncError()
|
|
|
|
|
|
stats.IncError()
|
|
|
|
|
|
|
|
|
|
|
|
s = stats.Get()
|
|
|
|
|
|
if s.ErrorCount != 3 {
|
|
|
|
|
|
t.Errorf("expected error count 3, got %d", s.ErrorCount)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 测试持久化
|
|
|
|
|
|
if err := stats.Save(); err != nil {
|
|
|
|
|
|
t.Fatalf("failed to save stats: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 创建新的统计管理器,应该能恢复数据
|
|
|
|
|
|
stats2 := NewTopicStats(statsPath)
|
|
|
|
|
|
s2 := stats2.Get()
|
|
|
|
|
|
|
|
|
|
|
|
if s2.WriteCount != s.WriteCount {
|
|
|
|
|
|
t.Errorf("recovered write count mismatch: expected %d, got %d", s.WriteCount, s2.WriteCount)
|
|
|
|
|
|
}
|
|
|
|
|
|
if s2.WriteBytes != s.WriteBytes {
|
|
|
|
|
|
t.Errorf("recovered write bytes mismatch: expected %d, got %d", s.WriteBytes, s2.WriteBytes)
|
|
|
|
|
|
}
|
|
|
|
|
|
if s2.ProcessedCount != s.ProcessedCount {
|
|
|
|
|
|
t.Errorf("recovered processed count mismatch: expected %d, got %d", s.ProcessedCount, s2.ProcessedCount)
|
|
|
|
|
|
}
|
|
|
|
|
|
if s2.ProcessedBytes != s.ProcessedBytes {
|
|
|
|
|
|
t.Errorf("recovered processed bytes mismatch: expected %d, got %d", s.ProcessedBytes, s2.ProcessedBytes)
|
|
|
|
|
|
}
|
|
|
|
|
|
if s2.ErrorCount != s.ErrorCount {
|
|
|
|
|
|
t.Errorf("recovered error count mismatch: expected %d, got %d", s.ErrorCount, s2.ErrorCount)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// TestTopicProcessorStats 测试 TopicProcessor 的统计功能
|
|
|
|
|
|
func TestTopicProcessorStats(t *testing.T) {
|
|
|
|
|
|
tmpDir := t.TempDir()
|
|
|
|
|
|
topic := "stats_test"
|
|
|
|
|
|
|
|
|
|
|
|
// 创建 processor
|
|
|
|
|
|
logger := slog.Default()
|
|
|
|
|
|
config := &TopicConfig{
|
|
|
|
|
|
Handler: func(rec *Record) error {
|
|
|
|
|
|
// 简单处理函数
|
|
|
|
|
|
return nil
|
|
|
|
|
|
},
|
|
|
|
|
|
}
|
|
|
|
|
|
processor, err := NewTopicProcessor(tmpDir, topic, logger, config)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("创建 processor 失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 启动 processor
|
|
|
|
|
|
if err := processor.Start(); err != nil {
|
|
|
|
|
|
t.Fatalf("failed to start processor: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
defer func() {
|
|
|
|
|
|
processor.Stop()
|
|
|
|
|
|
processor.Close()
|
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
|
|
// 写入一些数据
|
|
|
|
|
|
data1 := []byte("test message 1")
|
|
|
|
|
|
data2 := []byte("test message 2")
|
|
|
|
|
|
data3 := []byte("test message 3")
|
|
|
|
|
|
|
|
|
|
|
|
if _, err := processor.Write(data1); err != nil {
|
|
|
|
|
|
t.Fatalf("failed to write: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
if _, err := processor.Write(data2); err != nil {
|
|
|
|
|
|
t.Fatalf("failed to write: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
if _, err := processor.Write(data3); err != nil {
|
|
|
|
|
|
t.Fatalf("failed to write: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 等待处理完成
|
|
|
|
|
|
time.Sleep(200 * time.Millisecond)
|
|
|
|
|
|
|
|
|
|
|
|
// 检查统计信息
|
|
|
|
|
|
stats := processor.GetStats()
|
|
|
|
|
|
|
|
|
|
|
|
if stats.WriteCount != 3 {
|
|
|
|
|
|
t.Errorf("expected write count 3, got %d", stats.WriteCount)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
expectedBytes := int64(len(data1) + len(data2) + len(data3))
|
|
|
|
|
|
if stats.WriteBytes != expectedBytes {
|
|
|
|
|
|
t.Errorf("expected write bytes %d, got %d", expectedBytes, stats.WriteBytes)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if stats.ProcessedCount != 3 {
|
|
|
|
|
|
t.Errorf("expected processed count 3, got %d", stats.ProcessedCount)
|
|
|
|
|
|
}
|
|
|
|
|
|
if stats.ProcessedBytes != expectedBytes {
|
|
|
|
|
|
t.Errorf("expected processed bytes %d, got %d", expectedBytes, stats.ProcessedBytes)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// TestStatsRecovery 测试统计信息的恢复功能
|
|
|
|
|
|
func TestStatsRecovery(t *testing.T) {
|
|
|
|
|
|
tmpDir := t.TempDir()
|
|
|
|
|
|
topic := "recovery_test"
|
|
|
|
|
|
|
|
|
|
|
|
// 第一次运行:写入数据
|
|
|
|
|
|
logger := slog.Default()
|
|
|
|
|
|
config := &TopicConfig{
|
|
|
|
|
|
Handler: func(rec *Record) error {
|
|
|
|
|
|
return nil
|
|
|
|
|
|
},
|
|
|
|
|
|
}
|
|
|
|
|
|
processor1, err := NewTopicProcessor(tmpDir, topic, logger, config)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("创建 processor 失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
if err := processor1.Start(); err != nil {
|
|
|
|
|
|
t.Fatalf("failed to start processor: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 写入数据
|
|
|
|
|
|
for i := 0; i < 5; i++ {
|
|
|
|
|
|
data := []byte(fmt.Sprintf("message %d", i))
|
|
|
|
|
|
if _, err := processor1.Write(data); err != nil {
|
|
|
|
|
|
t.Fatalf("failed to write: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 等待处理
|
|
|
|
|
|
time.Sleep(200 * time.Millisecond)
|
|
|
|
|
|
|
|
|
|
|
|
// 获取统计
|
|
|
|
|
|
stats1 := processor1.GetStats()
|
|
|
|
|
|
|
|
|
|
|
|
// 停止并保存
|
|
|
|
|
|
if err := processor1.Stop(); err != nil {
|
|
|
|
|
|
t.Fatalf("failed to stop processor: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
if err := processor1.Close(); err != nil {
|
|
|
|
|
|
t.Fatalf("failed to close processor: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 第二次运行:应该能恢复统计信息
|
|
|
|
|
|
processor2, err := NewTopicProcessor(tmpDir, topic, logger, config)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("创建 processor 失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
stats2 := processor2.GetStats()
|
|
|
|
|
|
|
|
|
|
|
|
// 验证恢复的统计信息
|
|
|
|
|
|
if stats2.WriteCount != stats1.WriteCount {
|
|
|
|
|
|
t.Errorf("write count mismatch: expected %d, got %d", stats1.WriteCount, stats2.WriteCount)
|
|
|
|
|
|
}
|
|
|
|
|
|
if stats2.WriteBytes != stats1.WriteBytes {
|
|
|
|
|
|
t.Errorf("write bytes mismatch: expected %d, got %d", stats1.WriteBytes, stats2.WriteBytes)
|
|
|
|
|
|
}
|
|
|
|
|
|
if stats2.ProcessedCount != stats1.ProcessedCount {
|
|
|
|
|
|
t.Errorf("processed count mismatch: expected %d, got %d", stats1.ProcessedCount, stats2.ProcessedCount)
|
|
|
|
|
|
}
|
|
|
|
|
|
if stats2.ProcessedBytes != stats1.ProcessedBytes {
|
|
|
|
|
|
t.Errorf("processed bytes mismatch: expected %d, got %d", stats1.ProcessedBytes, stats2.ProcessedBytes)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
processor2.Close()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// TestSeqlogStats 测试 Seqlog 层面的统计聚合
|
|
|
|
|
|
func TestSeqlogStats(t *testing.T) {
|
|
|
|
|
|
tmpDir := t.TempDir()
|
|
|
|
|
|
|
|
|
|
|
|
handler := func(topic string, rec *Record) error {
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-04 13:26:21 +08:00
|
|
|
|
seq := NewLogHub(tmpDir, slog.Default(), handler)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
if err := seq.Start(); err != nil {
|
|
|
|
|
|
t.Fatalf("failed to start seqlog: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
defer seq.Stop()
|
|
|
|
|
|
|
|
|
|
|
|
// 写入多个 topic
|
|
|
|
|
|
topics := []string{"app", "sys", "audit"}
|
|
|
|
|
|
for _, topic := range topics {
|
|
|
|
|
|
for i := 0; i < 3; i++ {
|
|
|
|
|
|
data := []byte(fmt.Sprintf("%s message %d", topic, i))
|
|
|
|
|
|
if _, err := seq.Write(topic, data); err != nil {
|
|
|
|
|
|
t.Fatalf("failed to write to %s: %v", topic, err)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 等待处理
|
|
|
|
|
|
time.Sleep(300 * time.Millisecond)
|
|
|
|
|
|
|
|
|
|
|
|
// 检查单个 topic 统计
|
|
|
|
|
|
for _, topic := range topics {
|
|
|
|
|
|
stats, err := seq.GetTopicStats(topic)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Errorf("failed to get stats for %s: %v", topic, err)
|
|
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
if stats.WriteCount != 3 {
|
|
|
|
|
|
t.Errorf("%s: expected write count 3, got %d", topic, stats.WriteCount)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 检查所有统计
|
|
|
|
|
|
allStats := seq.GetAllStats()
|
|
|
|
|
|
if len(allStats) != len(topics) {
|
|
|
|
|
|
t.Errorf("expected %d topics, got %d", len(topics), len(allStats))
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
for _, topic := range topics {
|
|
|
|
|
|
stats, exists := allStats[topic]
|
|
|
|
|
|
if !exists {
|
|
|
|
|
|
t.Errorf("stats for %s not found", topic)
|
|
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
if stats.WriteCount != 3 {
|
|
|
|
|
|
t.Errorf("%s: expected write count 3, got %d", topic, stats.WriteCount)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// TestRecordQuery 测试记录查询功能
|
|
|
|
|
|
func TestRecordQuery(t *testing.T) {
|
|
|
|
|
|
tmpFile := "test_query.log"
|
|
|
|
|
|
defer os.Remove(tmpFile)
|
|
|
|
|
|
defer os.Remove(tmpFile + ".pos")
|
|
|
|
|
|
|
|
|
|
|
|
// 写入测试数据
|
|
|
|
|
|
index, err := NewRecordIndex(tmpFile)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatal(err)
|
|
|
|
|
|
}
|
|
|
|
|
|
defer index.Close()
|
|
|
|
|
|
|
|
|
|
|
|
// 写入测试数据
|
|
|
|
|
|
writer, err := NewLogWriter(tmpFile, index)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("failed to create writer: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
messages := []string{
|
|
|
|
|
|
"message 0",
|
|
|
|
|
|
"message 1",
|
|
|
|
|
|
"message 2",
|
|
|
|
|
|
"message 3",
|
|
|
|
|
|
"message 4",
|
|
|
|
|
|
"message 5",
|
|
|
|
|
|
"message 6",
|
|
|
|
|
|
"message 7",
|
|
|
|
|
|
"message 8",
|
|
|
|
|
|
"message 9",
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
offsets := make([]int64, len(messages))
|
|
|
|
|
|
for i, msg := range messages {
|
|
|
|
|
|
offset, err := writer.Append([]byte(msg))
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("failed to write message %d: %v", i, err)
|
|
|
|
|
|
}
|
|
|
|
|
|
offsets[i] = offset
|
|
|
|
|
|
}
|
2025-10-04 17:54:49 +08:00
|
|
|
|
defer writer.Close()
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
|
|
|
|
|
|
// 模拟处理到第 5 条记录
|
|
|
|
|
|
// 窗口范围:[索引 5, 索引 6)
|
|
|
|
|
|
startIdx := 5
|
|
|
|
|
|
endIdx := 6
|
|
|
|
|
|
|
|
|
|
|
|
// 创建索引
|
|
|
|
|
|
index, err = NewRecordIndex(tmpFile)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("failed to create index: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
defer index.Close()
|
|
|
|
|
|
|
|
|
|
|
|
// 创建查询器
|
2025-10-04 17:54:49 +08:00
|
|
|
|
query, err := NewRecordQuery(tmpFile, index, writer)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("failed to create query: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
defer query.Close()
|
|
|
|
|
|
|
2025-10-04 13:26:21 +08:00
|
|
|
|
// 测试查询当前位置(使用 QueryNewest 查询 startIdx)
|
|
|
|
|
|
current, err := query.QueryNewest(startIdx-1, 1)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("failed to query current: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
if len(current) != 1 {
|
|
|
|
|
|
t.Fatalf("expected 1 current result, got %d", len(current))
|
|
|
|
|
|
}
|
2025-10-04 13:26:21 +08:00
|
|
|
|
if string(current[0].Record.Data) != "message 5" {
|
|
|
|
|
|
t.Errorf("expected current 'message 5', got '%s'", string(current[0].Record.Data))
|
|
|
|
|
|
}
|
|
|
|
|
|
if current[0].Index != startIdx {
|
|
|
|
|
|
t.Errorf("expected index %d, got %d", startIdx, current[0].Index)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
}
|
2025-10-04 00:10:14 +08:00
|
|
|
|
// 手动判断状态
|
|
|
|
|
|
status := GetRecordStatus(startIdx, startIdx, endIdx)
|
|
|
|
|
|
if status != StatusProcessing {
|
|
|
|
|
|
t.Errorf("expected status Processing, got %s", status)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-04 13:26:21 +08:00
|
|
|
|
// 测试 QueryOldest:查询更早的记录(向索引递减方向)
|
|
|
|
|
|
// QueryOldest(5, 3) 查询索引 2, 3, 4
|
|
|
|
|
|
backResults, err := query.QueryOldest(startIdx, 3)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("failed to query backward: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
if len(backResults) != 3 {
|
|
|
|
|
|
t.Errorf("expected 3 backward results, got %d", len(backResults))
|
|
|
|
|
|
}
|
2025-10-04 13:26:21 +08:00
|
|
|
|
// 返回按索引递增排序的结果:2, 3, 4
|
2025-10-04 11:55:44 +08:00
|
|
|
|
expectedBack := []string{"message 2", "message 3", "message 4"}
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
for i, rec := range backResults {
|
2025-10-04 13:26:21 +08:00
|
|
|
|
if string(rec.Record.Data) != expectedBack[i] {
|
|
|
|
|
|
t.Errorf("backward[%d]: expected '%s', got '%s'", i, expectedBack[i], string(rec.Record.Data))
|
|
|
|
|
|
}
|
|
|
|
|
|
expectedIndex := startIdx - 3 + i
|
|
|
|
|
|
if rec.Index != expectedIndex {
|
|
|
|
|
|
t.Errorf("backward[%d]: expected index %d, got %d", i, expectedIndex, rec.Index)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
}
|
2025-10-04 13:26:21 +08:00
|
|
|
|
// 手动判断状态:索引 2, 3, 4 都已处理
|
|
|
|
|
|
recStatus := GetRecordStatus(rec.Index, startIdx, endIdx)
|
2025-10-04 00:10:14 +08:00
|
|
|
|
if recStatus != StatusProcessed {
|
|
|
|
|
|
t.Errorf("backward[%d]: expected status Processed, got %s", i, recStatus)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-04 13:26:21 +08:00
|
|
|
|
// 测试 QueryNewest:查询更新的记录(向索引递增方向)
|
|
|
|
|
|
// QueryNewest(endIdx, 3) 从 endIdx 向后查询,查询索引 6, 7, 8
|
|
|
|
|
|
forwardResults, err := query.QueryNewest(endIdx-1, 3)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("failed to query forward: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
if len(forwardResults) != 3 {
|
|
|
|
|
|
t.Errorf("expected 3 forward results, got %d", len(forwardResults))
|
|
|
|
|
|
}
|
2025-10-04 13:26:21 +08:00
|
|
|
|
// 返回按索引递增排序的结果:6, 7, 8
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
expectedForward := []string{"message 6", "message 7", "message 8"}
|
|
|
|
|
|
for i, rec := range forwardResults {
|
2025-10-04 13:26:21 +08:00
|
|
|
|
if string(rec.Record.Data) != expectedForward[i] {
|
|
|
|
|
|
t.Errorf("forward[%d]: expected '%s', got '%s'", i, expectedForward[i], string(rec.Record.Data))
|
|
|
|
|
|
}
|
|
|
|
|
|
expectedIndex := endIdx + i
|
|
|
|
|
|
if rec.Index != expectedIndex {
|
|
|
|
|
|
t.Errorf("forward[%d]: expected index %d, got %d", i, expectedIndex, rec.Index)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
}
|
2025-10-04 13:26:21 +08:00
|
|
|
|
// 手动判断状态:索引 6, 7, 8 待处理
|
|
|
|
|
|
recStatus := GetRecordStatus(rec.Index, startIdx, endIdx)
|
2025-10-04 00:10:14 +08:00
|
|
|
|
if recStatus != StatusPending {
|
|
|
|
|
|
t.Errorf("forward[%d]: expected status Pending, got %s", i, recStatus)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// TestTopicQuery 测试 TopicProcessor 的查询功能
|
|
|
|
|
|
func TestTopicQuery(t *testing.T) {
|
|
|
|
|
|
tmpDir := t.TempDir()
|
|
|
|
|
|
topic := "query_test"
|
|
|
|
|
|
|
|
|
|
|
|
logger := slog.Default()
|
|
|
|
|
|
processedCount := 0
|
|
|
|
|
|
config := &TopicConfig{
|
|
|
|
|
|
Handler: func(rec *Record) error {
|
|
|
|
|
|
processedCount++
|
|
|
|
|
|
// 只处理前 3 条
|
|
|
|
|
|
if processedCount >= 3 {
|
|
|
|
|
|
return fmt.Errorf("stop processing")
|
|
|
|
|
|
}
|
|
|
|
|
|
return nil
|
|
|
|
|
|
},
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
processor, err := NewTopicProcessor(tmpDir, topic, logger, config)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("创建 processor 失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
if err := processor.Start(); err != nil {
|
|
|
|
|
|
t.Fatalf("failed to start processor: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 写入 10 条消息
|
|
|
|
|
|
for i := 0; i < 10; i++ {
|
|
|
|
|
|
data := []byte(fmt.Sprintf("message %d", i))
|
|
|
|
|
|
if _, err := processor.Write(data); err != nil {
|
|
|
|
|
|
t.Fatalf("failed to write: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 等待处理
|
|
|
|
|
|
time.Sleep(300 * time.Millisecond)
|
|
|
|
|
|
|
|
|
|
|
|
processor.Stop()
|
|
|
|
|
|
|
|
|
|
|
|
// 获取当前处理索引
|
|
|
|
|
|
startIdx := processor.GetProcessingIndex()
|
|
|
|
|
|
endIdx := processor.GetReadIndex()
|
|
|
|
|
|
t.Logf("Processing index: [%d, %d)", startIdx, endIdx)
|
|
|
|
|
|
|
2025-10-04 00:10:14 +08:00
|
|
|
|
// 测试查询当前位置(使用 processor 方法,带状态)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
if startIdx < endIdx {
|
2025-10-04 00:10:14 +08:00
|
|
|
|
current, err := processor.QueryOldest(startIdx, 1)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("failed to query current: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
if len(current) > 0 {
|
|
|
|
|
|
t.Logf("Current: %s - %s", string(current[0].Record.Data), current[0].Status)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 测试向后查询
|
|
|
|
|
|
if startIdx > 0 {
|
2025-10-04 00:10:14 +08:00
|
|
|
|
back, err := processor.QueryNewest(startIdx-1, 2)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("failed to query backward: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
for i, rec := range back {
|
|
|
|
|
|
t.Logf("Back[%d]: %s - %s", i, string(rec.Record.Data), rec.Status)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 测试向前查询
|
2025-10-04 00:10:14 +08:00
|
|
|
|
if startIdx < processor.GetRecordCount() {
|
|
|
|
|
|
forward, err := processor.QueryOldest(endIdx, 3)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("failed to query forward: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
for i, rec := range forward {
|
|
|
|
|
|
t.Logf("Forward[%d]: %s - %s", i, string(rec.Record.Data), rec.Status)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
processor.Close()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// TestSeqlogQuery 测试 Seqlog 层面的查询功能
|
|
|
|
|
|
func TestSeqlogQuery(t *testing.T) {
|
|
|
|
|
|
tmpDir := t.TempDir()
|
|
|
|
|
|
|
|
|
|
|
|
processedCount := 0
|
|
|
|
|
|
handler := func(topic string, rec *Record) error {
|
|
|
|
|
|
processedCount++
|
|
|
|
|
|
// 只处理前 5 条
|
|
|
|
|
|
if processedCount >= 5 {
|
|
|
|
|
|
return fmt.Errorf("stop processing")
|
|
|
|
|
|
}
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-04 13:26:21 +08:00
|
|
|
|
seq := NewLogHub(tmpDir, slog.Default(), handler)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
if err := seq.Start(); err != nil {
|
|
|
|
|
|
t.Fatalf("failed to start seqlog: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 写入消息
|
|
|
|
|
|
for i := 0; i < 10; i++ {
|
|
|
|
|
|
data := []byte(fmt.Sprintf("app message %d", i))
|
|
|
|
|
|
if _, err := seq.Write("app", data); err != nil {
|
|
|
|
|
|
t.Fatalf("failed to write: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 等待处理
|
|
|
|
|
|
time.Sleep(300 * time.Millisecond)
|
|
|
|
|
|
|
|
|
|
|
|
// 创建查询器
|
|
|
|
|
|
query, err := seq.NewTopicQuery("app")
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("failed to create query: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
defer query.Close()
|
|
|
|
|
|
|
|
|
|
|
|
// 获取当前处理索引
|
|
|
|
|
|
startIdx := seq.GetProcessingIndex("app")
|
|
|
|
|
|
endIdx := seq.GetReadIndex("app")
|
|
|
|
|
|
t.Logf("Processing index: [%d, %d)", startIdx, endIdx)
|
|
|
|
|
|
|
2025-10-04 00:10:14 +08:00
|
|
|
|
// 获取 processor 用于查询(带状态)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
processor, _ := seq.GetProcessor("app")
|
2025-10-04 13:26:21 +08:00
|
|
|
|
totalCount := processor.GetRecordCount()
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
|
|
|
|
|
|
// 测试查询当前
|
|
|
|
|
|
if startIdx < endIdx {
|
2025-10-04 00:10:14 +08:00
|
|
|
|
current, err := processor.QueryOldest(startIdx, 1)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("failed to query current: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
if len(current) > 0 {
|
|
|
|
|
|
t.Logf("Current: %s, Status: %s", string(current[0].Record.Data), current[0].Status)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 测试向后查询
|
|
|
|
|
|
if startIdx > 0 {
|
2025-10-04 00:10:14 +08:00
|
|
|
|
back, err := processor.QueryNewest(startIdx-1, 2)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("failed to query backward: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
for i, rec := range back {
|
|
|
|
|
|
t.Logf("Back[%d]: %s - %s", i, string(rec.Record.Data), rec.Status)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 测试向前查询
|
2025-10-04 13:26:21 +08:00
|
|
|
|
if startIdx < totalCount {
|
2025-10-04 00:10:14 +08:00
|
|
|
|
forward, err := processor.QueryOldest(endIdx, 3)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("failed to query forward: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
for i, rec := range forward {
|
|
|
|
|
|
t.Logf("Forward[%d]: %s - %s", i, string(rec.Record.Data), rec.Status)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
seq.Stop()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// TestEventNotification 测试事件通知功能
|
|
|
|
|
|
func TestEventNotification(t *testing.T) {
|
|
|
|
|
|
tmpDir := t.TempDir()
|
|
|
|
|
|
topic := "event_test"
|
|
|
|
|
|
|
|
|
|
|
|
logger := slog.Default()
|
|
|
|
|
|
processedCount := 0
|
|
|
|
|
|
config := &TopicConfig{
|
|
|
|
|
|
Handler: func(rec *Record) error {
|
|
|
|
|
|
processedCount++
|
|
|
|
|
|
// 第 2 条消息返回错误
|
|
|
|
|
|
if processedCount == 2 {
|
|
|
|
|
|
return fmt.Errorf("模拟处理错误")
|
|
|
|
|
|
}
|
|
|
|
|
|
return nil
|
|
|
|
|
|
},
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
processor, err := NewTopicProcessor(tmpDir, topic, logger, config)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("创建 processor 失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 记录收到的事件
|
|
|
|
|
|
events := make([]EventType, 0)
|
|
|
|
|
|
var eventsMu sync.Mutex
|
|
|
|
|
|
|
|
|
|
|
|
// 订阅所有事件
|
|
|
|
|
|
processor.SubscribeAll(func(event *Event) {
|
|
|
|
|
|
eventsMu.Lock()
|
|
|
|
|
|
events = append(events, event.Type)
|
|
|
|
|
|
t.Logf("收到事件: %s - Topic: %s", event.Type, event.Topic)
|
|
|
|
|
|
eventsMu.Unlock()
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
// 启动 processor
|
|
|
|
|
|
if err := processor.Start(); err != nil {
|
|
|
|
|
|
t.Fatalf("failed to start processor: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 写入 3 条消息
|
|
|
|
|
|
for i := 0; i < 3; i++ {
|
|
|
|
|
|
data := []byte(fmt.Sprintf("message %d", i))
|
|
|
|
|
|
if _, err := processor.Write(data); err != nil {
|
|
|
|
|
|
t.Fatalf("failed to write: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 等待处理
|
|
|
|
|
|
time.Sleep(500 * time.Millisecond)
|
|
|
|
|
|
|
|
|
|
|
|
// 停止
|
|
|
|
|
|
processor.Stop()
|
|
|
|
|
|
processor.Close()
|
|
|
|
|
|
|
|
|
|
|
|
// 等待事件处理完成
|
|
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
|
|
|
|
|
|
|
|
// 检查事件
|
|
|
|
|
|
eventsMu.Lock()
|
|
|
|
|
|
defer eventsMu.Unlock()
|
|
|
|
|
|
|
|
|
|
|
|
t.Logf("收到 %d 个事件", len(events))
|
|
|
|
|
|
|
|
|
|
|
|
// 应该有:1 个启动事件 + 3 个写入成功事件 + 1 个处理成功事件 + 1 个处理错误事件 + 1 个停止事件
|
|
|
|
|
|
expectedMinEvents := 7
|
|
|
|
|
|
if len(events) < expectedMinEvents {
|
|
|
|
|
|
t.Errorf("expected at least %d events, got %d", expectedMinEvents, len(events))
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 检查特定事件类型
|
|
|
|
|
|
hasStart := false
|
|
|
|
|
|
hasStop := false
|
|
|
|
|
|
hasWriteSuccess := false
|
|
|
|
|
|
hasProcessSuccess := false
|
|
|
|
|
|
hasProcessError := false
|
|
|
|
|
|
|
|
|
|
|
|
for _, eventType := range events {
|
|
|
|
|
|
switch eventType {
|
|
|
|
|
|
case EventProcessorStart:
|
|
|
|
|
|
hasStart = true
|
|
|
|
|
|
case EventProcessorStop:
|
|
|
|
|
|
hasStop = true
|
|
|
|
|
|
case EventWriteSuccess:
|
|
|
|
|
|
hasWriteSuccess = true
|
|
|
|
|
|
case EventProcessSuccess:
|
|
|
|
|
|
hasProcessSuccess = true
|
|
|
|
|
|
case EventProcessError:
|
|
|
|
|
|
hasProcessError = true
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if !hasStart {
|
|
|
|
|
|
t.Error("missing ProcessorStart event")
|
|
|
|
|
|
}
|
|
|
|
|
|
if !hasStop {
|
|
|
|
|
|
t.Error("missing ProcessorStop event")
|
|
|
|
|
|
}
|
|
|
|
|
|
if !hasWriteSuccess {
|
|
|
|
|
|
t.Error("missing WriteSuccess event")
|
|
|
|
|
|
}
|
|
|
|
|
|
if !hasProcessSuccess {
|
|
|
|
|
|
t.Error("missing ProcessSuccess event")
|
|
|
|
|
|
}
|
|
|
|
|
|
if !hasProcessError {
|
|
|
|
|
|
t.Error("missing ProcessError event")
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// TestSeqlogEventSubscription 测试 Seqlog 层面的事件订阅
|
|
|
|
|
|
func TestSeqlogEventSubscription(t *testing.T) {
|
|
|
|
|
|
tmpDir := t.TempDir()
|
|
|
|
|
|
|
|
|
|
|
|
handler := func(topic string, rec *Record) error {
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-04 13:26:21 +08:00
|
|
|
|
seq := NewLogHub(tmpDir, slog.Default(), handler)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
if err := seq.Start(); err != nil {
|
|
|
|
|
|
t.Fatalf("failed to start seqlog: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
defer seq.Stop()
|
|
|
|
|
|
|
|
|
|
|
|
// 订阅写入成功事件
|
|
|
|
|
|
writeCount := 0
|
|
|
|
|
|
var writeMu sync.Mutex
|
|
|
|
|
|
|
|
|
|
|
|
seq.Subscribe("app", EventWriteSuccess, func(event *Event) {
|
|
|
|
|
|
writeMu.Lock()
|
|
|
|
|
|
writeCount++
|
|
|
|
|
|
t.Logf("写入成功: topic=%s, position=%d", event.Topic, event.Position)
|
|
|
|
|
|
writeMu.Unlock()
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
// 写入消息
|
|
|
|
|
|
for i := 0; i < 5; i++ {
|
|
|
|
|
|
data := []byte(fmt.Sprintf("message %d", i))
|
|
|
|
|
|
if _, err := seq.Write("app", data); err != nil {
|
|
|
|
|
|
t.Fatalf("failed to write: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 等待事件处理
|
|
|
|
|
|
time.Sleep(200 * time.Millisecond)
|
|
|
|
|
|
|
|
|
|
|
|
writeMu.Lock()
|
|
|
|
|
|
if writeCount != 5 {
|
|
|
|
|
|
t.Errorf("expected 5 write events, got %d", writeCount)
|
|
|
|
|
|
}
|
|
|
|
|
|
writeMu.Unlock()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// TestMultiTopicEventSubscription 测试多 topic 事件订阅
|
|
|
|
|
|
func TestMultiTopicEventSubscription(t *testing.T) {
|
|
|
|
|
|
tmpDir := t.TempDir()
|
|
|
|
|
|
|
|
|
|
|
|
handler := func(topic string, rec *Record) error {
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-04 13:26:21 +08:00
|
|
|
|
seq := NewLogHub(tmpDir, slog.Default(), handler)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
if err := seq.Start(); err != nil {
|
|
|
|
|
|
t.Fatalf("failed to start seqlog: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
defer seq.Stop()
|
|
|
|
|
|
|
|
|
|
|
|
// 统计每个 topic 的写入事件
|
|
|
|
|
|
eventCounts := make(map[string]int)
|
|
|
|
|
|
var countMu sync.Mutex
|
|
|
|
|
|
|
|
|
|
|
|
// 为所有 topic 订阅写入成功事件
|
|
|
|
|
|
seq.SubscribeAllTopics(EventWriteSuccess, func(event *Event) {
|
|
|
|
|
|
countMu.Lock()
|
|
|
|
|
|
eventCounts[event.Topic]++
|
|
|
|
|
|
countMu.Unlock()
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
// 写入不同 topic
|
|
|
|
|
|
topics := []string{"app", "sys", "audit"}
|
|
|
|
|
|
for _, topic := range topics {
|
|
|
|
|
|
for i := 0; i < 3; i++ {
|
|
|
|
|
|
data := []byte(fmt.Sprintf("%s message %d", topic, i))
|
|
|
|
|
|
if _, err := seq.Write(topic, data); err != nil {
|
|
|
|
|
|
t.Fatalf("failed to write to %s: %v", topic, err)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 等待事件处理
|
|
|
|
|
|
time.Sleep(300 * time.Millisecond)
|
|
|
|
|
|
|
|
|
|
|
|
countMu.Lock()
|
|
|
|
|
|
defer countMu.Unlock()
|
|
|
|
|
|
|
|
|
|
|
|
for _, topic := range topics {
|
|
|
|
|
|
count := eventCounts[topic]
|
|
|
|
|
|
if count != 3 {
|
|
|
|
|
|
t.Errorf("topic %s: expected 3 write events, got %d", topic, count)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// TestTopicReset 测试 topic 的 Reset 功能
|
|
|
|
|
|
func TestTopicReset(t *testing.T) {
|
|
|
|
|
|
tmpDir := t.TempDir()
|
|
|
|
|
|
|
2025-10-04 13:26:21 +08:00
|
|
|
|
seqlog := NewLogHub(tmpDir, slog.Default(), nil)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
|
|
|
|
|
|
// 注册 handler
|
|
|
|
|
|
seqlog.RegisterHandler("test", func(rec *Record) error {
|
|
|
|
|
|
return nil
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
seqlog.Start()
|
|
|
|
|
|
|
|
|
|
|
|
// 写入一些日志
|
|
|
|
|
|
for i := 0; i < 5; i++ {
|
|
|
|
|
|
data := []byte(fmt.Sprintf("message %d", i))
|
|
|
|
|
|
if _, err := seqlog.Write("test", data); err != nil {
|
|
|
|
|
|
t.Fatalf("写入失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 等待处理
|
|
|
|
|
|
time.Sleep(300 * time.Millisecond)
|
|
|
|
|
|
|
|
|
|
|
|
// 验证统计信息
|
|
|
|
|
|
stats, err := seqlog.GetTopicStats("test")
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("获取统计失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if stats.WriteCount != 5 {
|
|
|
|
|
|
t.Errorf("期望写入 5 条,实际 %d 条", stats.WriteCount)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if stats.ProcessedCount != 5 {
|
|
|
|
|
|
t.Errorf("期望处理 5 条,实际 %d 条", stats.ProcessedCount)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 检查日志文件是否存在
|
|
|
|
|
|
logFile := filepath.Join(tmpDir, "test.log")
|
|
|
|
|
|
|
|
|
|
|
|
if _, err := os.Stat(logFile); os.IsNotExist(err) {
|
|
|
|
|
|
t.Error("日志文件不存在")
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 使用 ResetTopic 方法
|
|
|
|
|
|
if err := seqlog.ResetTopic("test"); err != nil {
|
|
|
|
|
|
t.Fatalf("Reset 失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 等待重置完成
|
|
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
|
|
|
|
|
|
|
|
// 验证日志文件已被重置(Reset 会重新初始化组件,创建空文件)
|
|
|
|
|
|
fileInfo, err := os.Stat(logFile)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Errorf("日志文件应该存在(空文件): %v", err)
|
|
|
|
|
|
} else if fileInfo.Size() != 0 {
|
|
|
|
|
|
t.Errorf("日志文件应该是空的,但大小为 %d", fileInfo.Size())
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 验证统计信息已重置
|
|
|
|
|
|
stats, err = seqlog.GetTopicStats("test")
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("获取统计失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if stats.WriteCount != 0 {
|
|
|
|
|
|
t.Errorf("期望写入计数为 0,实际 %d", stats.WriteCount)
|
|
|
|
|
|
}
|
|
|
|
|
|
if stats.ProcessedCount != 0 {
|
|
|
|
|
|
t.Errorf("期望处理计数为 0,实际 %d", stats.ProcessedCount)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 验证可以继续写入
|
|
|
|
|
|
for i := 0; i < 3; i++ {
|
|
|
|
|
|
data := []byte(fmt.Sprintf("new message %d", i))
|
|
|
|
|
|
if _, err := seqlog.Write("test", data); err != nil {
|
|
|
|
|
|
t.Fatalf("重置后写入失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
time.Sleep(300 * time.Millisecond)
|
|
|
|
|
|
|
|
|
|
|
|
stats, _ = seqlog.GetTopicStats("test")
|
|
|
|
|
|
if stats.WriteCount != 3 {
|
|
|
|
|
|
t.Errorf("重置后期望写入 3 条,实际 %d 条", stats.WriteCount)
|
|
|
|
|
|
}
|
|
|
|
|
|
if stats.ProcessedCount != 3 {
|
|
|
|
|
|
t.Errorf("重置后期望处理 3 条,实际 %d 条", stats.ProcessedCount)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 最后停止
|
|
|
|
|
|
seqlog.Stop()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-04 13:26:21 +08:00
|
|
|
|
// TestTopicResetWithPendingRecords 测试当有待处理日志时 Reset 返回错误
|
|
|
|
|
|
func TestTopicResetWithPendingRecords(t *testing.T) {
|
|
|
|
|
|
tmpDir := t.TempDir()
|
|
|
|
|
|
|
|
|
|
|
|
seqlog := NewLogHub(tmpDir, slog.Default(), nil)
|
|
|
|
|
|
|
|
|
|
|
|
// 注册一个慢速 handler,让日志堆积
|
|
|
|
|
|
slowHandler := func(rec *Record) error {
|
|
|
|
|
|
time.Sleep(100 * time.Millisecond) // 模拟慢速处理
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
seqlog.RegisterHandler("test", slowHandler)
|
|
|
|
|
|
seqlog.Start()
|
|
|
|
|
|
|
|
|
|
|
|
// 快速写入多条日志
|
|
|
|
|
|
for i := 0; i < 10; i++ {
|
|
|
|
|
|
data := []byte(fmt.Sprintf("message %d", i))
|
|
|
|
|
|
if _, err := seqlog.Write("test", data); err != nil {
|
|
|
|
|
|
t.Fatalf("写入失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 短暂等待,让一部分日志开始处理但不是全部
|
|
|
|
|
|
time.Sleep(200 * time.Millisecond)
|
|
|
|
|
|
|
|
|
|
|
|
// 尝试重置,应该失败因为有待处理的日志
|
|
|
|
|
|
err := seqlog.ResetTopic("test")
|
|
|
|
|
|
if err == nil {
|
|
|
|
|
|
t.Fatal("期望 Reset 失败因为有待处理的日志,但成功了")
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
t.Logf("预期的错误: %v", err)
|
|
|
|
|
|
|
|
|
|
|
|
// 停止处理
|
|
|
|
|
|
seqlog.Stop()
|
|
|
|
|
|
|
|
|
|
|
|
// 现在 Reset 应该成功(停止后没有待处理的日志)
|
|
|
|
|
|
processor, _ := seqlog.GetProcessor("test")
|
|
|
|
|
|
if err := processor.Reset(); err != nil {
|
|
|
|
|
|
t.Fatalf("停止后 Reset 应该成功: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
// TestQueryOldestNewest 测试 QueryOldest 和 QueryNewest
|
|
|
|
|
|
func TestQueryOldestNewest(t *testing.T) {
|
|
|
|
|
|
tmpDir := t.TempDir()
|
|
|
|
|
|
|
|
|
|
|
|
// 创建 TopicProcessor(提供空 handler)
|
|
|
|
|
|
processor, err := NewTopicProcessor(tmpDir, "test", nil, &TopicConfig{
|
|
|
|
|
|
Handler: func(rec *Record) error {
|
|
|
|
|
|
return nil
|
|
|
|
|
|
},
|
|
|
|
|
|
})
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatal(err)
|
|
|
|
|
|
}
|
|
|
|
|
|
defer processor.Close()
|
2025-10-04 17:54:49 +08:00
|
|
|
|
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
// 写入测试数据
|
|
|
|
|
|
for i := 0; i < 10; i++ {
|
|
|
|
|
|
data := fmt.Sprintf("message %d", i)
|
|
|
|
|
|
if _, err := processor.Write([]byte(data)); err != nil {
|
|
|
|
|
|
t.Fatal(err)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
2025-10-04 17:54:49 +08:00
|
|
|
|
|
2025-10-04 13:26:21 +08:00
|
|
|
|
// 测试 QueryNewest - 查询索引 0, 1, 2(向索引递增方向)
|
|
|
|
|
|
// QueryNewest(-1, 3) 从 -1 向后查询,得到索引 0, 1, 2
|
|
|
|
|
|
oldest, err := processor.QueryNewest(-1, 3)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
if err != nil {
|
2025-10-04 13:26:21 +08:00
|
|
|
|
t.Fatalf("QueryNewest failed: %v", err)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
}
|
|
|
|
|
|
if len(oldest) != 3 {
|
|
|
|
|
|
t.Errorf("expected 3 records, got %d", len(oldest))
|
|
|
|
|
|
}
|
|
|
|
|
|
// 验证顺序:应该是 0, 1, 2
|
|
|
|
|
|
for i := 0; i < 3; i++ {
|
|
|
|
|
|
expected := fmt.Sprintf("message %d", i)
|
|
|
|
|
|
if string(oldest[i].Record.Data) != expected {
|
|
|
|
|
|
t.Errorf("oldest[%d]: expected %s, got %s", i, expected, string(oldest[i].Record.Data))
|
|
|
|
|
|
}
|
2025-10-04 13:26:21 +08:00
|
|
|
|
if oldest[i].Index != i {
|
|
|
|
|
|
t.Errorf("oldest[%d]: expected index %d, got %d", i, i, oldest[i].Index)
|
|
|
|
|
|
}
|
|
|
|
|
|
t.Logf("Oldest[%d]: index=%d, %s - %s", i, oldest[i].Index, string(oldest[i].Record.Data), oldest[i].Status)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-04 13:26:21 +08:00
|
|
|
|
// 测试 QueryOldest - 查询索引 7, 8, 9(向索引递减方向)
|
|
|
|
|
|
// QueryOldest(10, 3) 从 10 向前查询,得到索引 7, 8, 9
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
totalCount := processor.GetRecordCount()
|
2025-10-04 13:26:21 +08:00
|
|
|
|
newest, err := processor.QueryOldest(totalCount, 3)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
if err != nil {
|
2025-10-04 13:26:21 +08:00
|
|
|
|
t.Fatalf("QueryOldest failed: %v", err)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
}
|
|
|
|
|
|
if len(newest) != 3 {
|
|
|
|
|
|
t.Errorf("expected 3 records, got %d", len(newest))
|
|
|
|
|
|
}
|
2025-10-04 11:55:44 +08:00
|
|
|
|
// 验证顺序:应该是 7, 8, 9(按索引递增)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
for i := 0; i < 3; i++ {
|
2025-10-04 11:55:44 +08:00
|
|
|
|
expected := fmt.Sprintf("message %d", 7+i)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
if string(newest[i].Record.Data) != expected {
|
|
|
|
|
|
t.Errorf("newest[%d]: expected %s, got %s", i, expected, string(newest[i].Record.Data))
|
|
|
|
|
|
}
|
2025-10-04 13:26:21 +08:00
|
|
|
|
if newest[i].Index != 7+i {
|
|
|
|
|
|
t.Errorf("newest[%d]: expected index %d, got %d", i, 7+i, newest[i].Index)
|
|
|
|
|
|
}
|
|
|
|
|
|
t.Logf("Newest[%d]: index=%d, %s - %s", i, newest[i].Index, string(newest[i].Record.Data), newest[i].Status)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-04 13:26:21 +08:00
|
|
|
|
// 测试超出范围 - 查询所有记录
|
|
|
|
|
|
// QueryNewest(-1, 100) 从 -1 向后查询,会返回所有记录(最多 100 条)
|
|
|
|
|
|
all, err := processor.QueryNewest(-1, 100)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
if err != nil {
|
2025-10-04 13:26:21 +08:00
|
|
|
|
t.Fatalf("QueryNewest(-1, 100) failed: %v", err)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
}
|
|
|
|
|
|
if len(all) != 10 {
|
|
|
|
|
|
t.Errorf("expected 10 records, got %d", len(all))
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 测试空结果
|
|
|
|
|
|
processor2, err := NewTopicProcessor(t.TempDir(), "empty", nil, &TopicConfig{
|
|
|
|
|
|
Handler: func(rec *Record) error {
|
|
|
|
|
|
return nil
|
|
|
|
|
|
},
|
|
|
|
|
|
})
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatal(err)
|
|
|
|
|
|
}
|
|
|
|
|
|
defer processor2.Close()
|
|
|
|
|
|
|
2025-10-04 13:26:21 +08:00
|
|
|
|
emptyNewest, err := processor2.QueryNewest(-1, 10)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
if err != nil {
|
2025-10-04 13:26:21 +08:00
|
|
|
|
t.Fatalf("QueryNewest on empty failed: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
if len(emptyNewest) != 0 {
|
|
|
|
|
|
t.Errorf("expected 0 records, got %d", len(emptyNewest))
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
}
|
2025-10-04 13:26:21 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// TestQueryFromFirstAndLast 测试 QueryFromFirst 和 QueryFromLast
|
|
|
|
|
|
func TestQueryFromFirstAndLast(t *testing.T) {
|
|
|
|
|
|
tmpDir := t.TempDir()
|
|
|
|
|
|
|
|
|
|
|
|
// 创建 TopicProcessor
|
|
|
|
|
|
processor, err := NewTopicProcessor(tmpDir, "test", nil, &TopicConfig{
|
|
|
|
|
|
Handler: func(rec *Record) error {
|
|
|
|
|
|
return nil
|
|
|
|
|
|
},
|
|
|
|
|
|
})
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatal(err)
|
|
|
|
|
|
}
|
|
|
|
|
|
defer processor.Close()
|
|
|
|
|
|
|
|
|
|
|
|
// 写入 10 条测试数据
|
|
|
|
|
|
for i := 0; i < 10; i++ {
|
|
|
|
|
|
data := fmt.Sprintf("message %d", i)
|
|
|
|
|
|
if _, err := processor.Write([]byte(data)); err != nil {
|
|
|
|
|
|
t.Fatal(err)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 测试 QueryFromFirst - 从第一条记录向索引递增方向查询
|
|
|
|
|
|
t.Run("QueryFromFirst", func(t *testing.T) {
|
|
|
|
|
|
// 查询前 3 条记录
|
|
|
|
|
|
records, err := processor.QueryFromFirst(3)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("QueryFromFirst failed: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if len(records) != 3 {
|
|
|
|
|
|
t.Fatalf("expected 3 records, got %d", len(records))
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 验证结果:应该是索引 0, 1, 2
|
|
|
|
|
|
for i := 0; i < 3; i++ {
|
|
|
|
|
|
expectedData := fmt.Sprintf("message %d", i)
|
|
|
|
|
|
if string(records[i].Record.Data) != expectedData {
|
|
|
|
|
|
t.Errorf("records[%d]: expected %s, got %s", i, expectedData, string(records[i].Record.Data))
|
|
|
|
|
|
}
|
|
|
|
|
|
if records[i].Index != i {
|
|
|
|
|
|
t.Errorf("records[%d]: expected index %d, got %d", i, i, records[i].Index)
|
|
|
|
|
|
}
|
|
|
|
|
|
t.Logf("FromFirst[%d]: index=%d, %s - %s", i, records[i].Index, string(records[i].Record.Data), records[i].Status)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 查询超过总数的记录
|
|
|
|
|
|
allRecords, err := processor.QueryFromFirst(100)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("QueryFromFirst(100) failed: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
if len(allRecords) != 10 {
|
|
|
|
|
|
t.Errorf("expected 10 records, got %d", len(allRecords))
|
|
|
|
|
|
}
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
// 测试 QueryFromLast - 从最后一条记录向索引递减方向查询
|
|
|
|
|
|
t.Run("QueryFromLast", func(t *testing.T) {
|
|
|
|
|
|
// 查询最后 3 条记录
|
|
|
|
|
|
records, err := processor.QueryFromLast(3)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("QueryFromLast failed: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if len(records) != 3 {
|
|
|
|
|
|
t.Fatalf("expected 3 records, got %d", len(records))
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 验证结果:应该是索引 7, 8, 9(按索引递增顺序排列)
|
|
|
|
|
|
for i := 0; i < 3; i++ {
|
|
|
|
|
|
expectedIndex := 7 + i
|
|
|
|
|
|
expectedData := fmt.Sprintf("message %d", expectedIndex)
|
|
|
|
|
|
if string(records[i].Record.Data) != expectedData {
|
|
|
|
|
|
t.Errorf("records[%d]: expected %s, got %s", i, expectedData, string(records[i].Record.Data))
|
|
|
|
|
|
}
|
|
|
|
|
|
if records[i].Index != expectedIndex {
|
|
|
|
|
|
t.Errorf("records[%d]: expected index %d, got %d", i, expectedIndex, records[i].Index)
|
|
|
|
|
|
}
|
|
|
|
|
|
t.Logf("FromLast[%d]: index=%d, %s - %s", i, records[i].Index, string(records[i].Record.Data), records[i].Status)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 查询超过总数的记录
|
|
|
|
|
|
allRecords, err := processor.QueryFromLast(100)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("QueryFromLast(100) failed: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
if len(allRecords) != 10 {
|
|
|
|
|
|
t.Errorf("expected 10 records, got %d", len(allRecords))
|
|
|
|
|
|
}
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
// 测试空数据库
|
|
|
|
|
|
t.Run("EmptyDatabase", func(t *testing.T) {
|
|
|
|
|
|
emptyProcessor, err := NewTopicProcessor(t.TempDir(), "empty", nil, &TopicConfig{
|
|
|
|
|
|
Handler: func(rec *Record) error {
|
|
|
|
|
|
return nil
|
|
|
|
|
|
},
|
|
|
|
|
|
})
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatal(err)
|
|
|
|
|
|
}
|
|
|
|
|
|
defer emptyProcessor.Close()
|
|
|
|
|
|
|
|
|
|
|
|
// QueryFromFirst 应该返回空数组
|
|
|
|
|
|
firstRecords, err := emptyProcessor.QueryFromFirst(10)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("QueryFromFirst on empty failed: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
if len(firstRecords) != 0 {
|
|
|
|
|
|
t.Errorf("expected 0 records, got %d", len(firstRecords))
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// QueryFromLast 应该返回空数组
|
|
|
|
|
|
lastRecords, err := emptyProcessor.QueryFromLast(10)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("QueryFromLast on empty failed: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
if len(lastRecords) != 0 {
|
|
|
|
|
|
t.Errorf("expected 0 records, got %d", len(lastRecords))
|
|
|
|
|
|
}
|
|
|
|
|
|
})
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// TestLogHubQueryFromFirstAndLast 测试 LogHub 的 QueryFromFirst 和 QueryFromLast
|
|
|
|
|
|
func TestLogHubQueryFromFirstAndLast(t *testing.T) {
|
|
|
|
|
|
tmpDir := t.TempDir()
|
|
|
|
|
|
|
|
|
|
|
|
seqlog := NewLogHub(tmpDir, slog.Default(), nil)
|
|
|
|
|
|
seqlog.RegisterHandler("test", func(rec *Record) error {
|
|
|
|
|
|
return nil
|
|
|
|
|
|
})
|
|
|
|
|
|
seqlog.Start()
|
|
|
|
|
|
defer seqlog.Stop()
|
|
|
|
|
|
|
|
|
|
|
|
// 写入测试数据
|
|
|
|
|
|
for i := 0; i < 10; i++ {
|
|
|
|
|
|
data := fmt.Sprintf("message %d", i)
|
|
|
|
|
|
if _, err := seqlog.Write("test", []byte(data)); err != nil {
|
|
|
|
|
|
t.Fatal(err)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 测试 QueryFromFirst
|
|
|
|
|
|
firstRecords, err := seqlog.QueryFromFirst("test", 3)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("QueryFromFirst failed: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
if len(firstRecords) != 3 {
|
|
|
|
|
|
t.Fatalf("expected 3 records, got %d", len(firstRecords))
|
|
|
|
|
|
}
|
|
|
|
|
|
for i := 0; i < 3; i++ {
|
|
|
|
|
|
if firstRecords[i].Index != i {
|
|
|
|
|
|
t.Errorf("firstRecords[%d]: expected index %d, got %d", i, i, firstRecords[i].Index)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 测试 QueryFromLast
|
|
|
|
|
|
lastRecords, err := seqlog.QueryFromLast("test", 3)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
t.Fatalf("QueryFromLast failed: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
if len(lastRecords) != 3 {
|
|
|
|
|
|
t.Fatalf("expected 3 records, got %d", len(lastRecords))
|
|
|
|
|
|
}
|
|
|
|
|
|
for i := 0; i < 3; i++ {
|
|
|
|
|
|
expectedIndex := 7 + i
|
|
|
|
|
|
if lastRecords[i].Index != expectedIndex {
|
|
|
|
|
|
t.Errorf("lastRecords[%d]: expected index %d, got %d", i, expectedIndex, lastRecords[i].Index)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 测试不存在的 topic
|
|
|
|
|
|
_, err = seqlog.QueryFromFirst("nonexistent", 10)
|
|
|
|
|
|
if err == nil {
|
|
|
|
|
|
t.Error("expected error for nonexistent topic")
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
_, err = seqlog.QueryFromLast("nonexistent", 10)
|
|
|
|
|
|
if err == nil {
|
|
|
|
|
|
t.Error("expected error for nonexistent topic")
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
}
|
|
|
|
|
|
}
|