重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
package seqlog
import (
"context"
"fmt"
"log/slog"
"os"
"path/filepath"
"sync"
"time"
)
// TopicProcessor 作为聚合器,持有所有核心组件并提供统一的访问接口
type TopicProcessor struct {
topic string
2025-10-04 17:54:49 +08:00
title string // 显示标题,用于 UI 展示
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
logPath string
logger * slog . Logger
// 核心组件(聚合)
2025-10-04 17:54:49 +08:00
writer * LogWriter // 写入器
index * RecordIndex // 索引管理器
query * RecordQuery // 查询器
cursor * ProcessCursor // 游标
tailer * LogTailer // 持续处理器
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
// 配置和状态
handler RecordHandler
tailConfig * TailConfig
stats * TopicStats // 统计信息
eventBus * EventBus // 事件总线
// 并发控制
mu sync . RWMutex
ctx context . Context
cancel context . CancelFunc
wg sync . WaitGroup
running bool
}
// TopicConfig topic 配置
type TopicConfig struct {
2025-10-04 17:54:49 +08:00
Title string // 显示标题,可选,默认为 topic 名称
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
Handler RecordHandler // 处理函数(必填)
TailConfig * TailConfig // tail 配置,可选
}
// NewTopicProcessor 创建一个新的 topic 处理器
// 在初始化时创建所有核心组件, index 在组件间共享
// handler 为必填参数,如果 config 为 nil 或 config.Handler 为 nil 会返回错误
func NewTopicProcessor ( baseDir , topic string , logger * slog . Logger , config * TopicConfig ) ( * TopicProcessor , error ) {
// 验证必填参数
if config == nil || config . Handler == nil {
新增:统一的错误类型系统 (errors.go)
主要功能:
- 定义哨兵错误(Sentinel Errors):ErrNilParameter, ErrInvalidCount,
ErrInvalidRange, ErrAlreadyRunning, ErrNotFound, ErrCRCMismatch 等
- 实现结构化错误类型:TopicError, FileError, IndexError, ValidationError
- 提供错误检查辅助函数:IsTopicNotFound, IsIndexOutOfRange, IsCRCMismatch
- 支持 errors.Is 和 errors.As 进行错误判断
更新相关文件使用新错误类型:
- cursor.go: 使用 ValidationError 和 ErrCRCMismatch
- index.go: 使用 IndexError 处理索引越界
- query.go: 使用 ValidationError 验证参数
- seqlog_manager.go: 使用 TopicError 和 ErrAlreadyRegistered
- topic_processor.go: 使用 ErrAlreadyRunning 和 ErrInvalidConfig
测试覆盖:
- errors_test.go 提供完整的错误类型测试
- 所有现有测试继续通过
使用示例:
```go
// 检查 topic 是否存在
if IsTopicNotFound(err) {
// 处理 topic 不存在的情况
}
// 检查索引越界
if IsIndexOutOfRange(err) {
var indexErr *IndexError
errors.As(err, &indexErr)
fmt.Printf("index %d out of range\n", indexErr.Index)
}
```
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-04 01:56:22 +08:00
return nil , NewValidationError ( "config" , "config and config.Handler are required" , ErrInvalidConfig )
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
}
ctx , cancel := context . WithCancel ( context . Background ( ) )
// 默认配置
tailConfig := & TailConfig {
PollInterval : 100 * 1000000 , // 100ms
SaveInterval : 1000 * 1000000 , // 1s
}
if config . TailConfig != nil {
tailConfig = config . TailConfig
}
if logger == nil {
logger = slog . Default ( )
}
logPath := filepath . Join ( baseDir , topic + ".log" )
statsPath := filepath . Join ( baseDir , topic + ".stats" )
2025-10-04 17:54:49 +08:00
// 设置 title, 如果未提供则使用 topic 名称
title := config . Title
if title == "" {
title = topic
}
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
tp := & TopicProcessor {
topic : topic ,
2025-10-04 17:54:49 +08:00
title : title ,
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
logPath : logPath ,
logger : logger ,
handler : config . Handler ,
tailConfig : tailConfig ,
stats : NewTopicStats ( statsPath ) ,
eventBus : NewEventBus ( ) ,
ctx : ctx ,
cancel : cancel ,
}
// 初始化所有组件
if err := tp . initializeComponents ( ) ; err != nil {
cancel ( )
return nil , fmt . Errorf ( "failed to initialize components: %w" , err )
}
return tp , nil
}
// initializeComponents 初始化所有核心组件
func ( tp * TopicProcessor ) initializeComponents ( ) error {
// 1. 创建共享的索引管理器
index , err := NewRecordIndex ( tp . logPath )
if err != nil {
return fmt . Errorf ( "create index: %w" , err )
}
tp . index = index
// 2. 创建写入器(使用共享 index)
writer , err := NewLogWriter ( tp . logPath , tp . index )
if err != nil {
tp . index . Close ( )
return fmt . Errorf ( "create writer: %w" , err )
}
tp . writer = writer
// 3. 创建查询器(使用共享 index)
2025-10-04 17:54:49 +08:00
query , err := NewRecordQuery ( tp . logPath , tp . index , tp . writer )
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
if err != nil {
tp . writer . Close ( )
tp . index . Close ( )
return fmt . Errorf ( "create query: %w" , err )
}
tp . query = query
2025-10-04 17:54:49 +08:00
// 4. 创建游标(使用共享 index 和 writer)
cursor , err := NewCursor ( tp . logPath , tp . index , tp . writer )
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
if err != nil {
tp . query . Close ( )
tp . writer . Close ( )
tp . index . Close ( )
return fmt . Errorf ( "create cursor: %w" , err )
}
tp . cursor = cursor
// 5. 创建 tailer( handler 为必填,总是创建)
// 注意:只创建不启动,启动在 Start() 中进行
if err := tp . createTailer ( ) ; err != nil {
tp . cursor . Close ( )
tp . query . Close ( )
tp . writer . Close ( )
tp . index . Close ( )
return fmt . Errorf ( "create tailer: %w" , err )
}
tp . logger . Debug ( "all components initialized" )
return nil
}
// createTailer 创建 tailer( 不启动)
func ( tp * TopicProcessor ) createTailer ( ) error {
// 包装 handler, 添加统计功能和事件发布
wrappedHandler := func ( rec * Record ) error {
if err := tp . handler ( rec ) ; err != nil {
tp . stats . IncError ( )
// 发布处理错误事件
tp . eventBus . Publish ( & Event {
Type : EventProcessError ,
Topic : tp . topic ,
Timestamp : time . Now ( ) ,
Record : rec ,
Error : err ,
Position : 0 , // Position 在 tailer 模式下不可用
} )
return err
}
// 处理成功,更新统计
tp . stats . IncProcessed ( int64 ( len ( rec . Data ) ) )
// 发布处理成功事件
tp . eventBus . Publish ( & Event {
Type : EventProcessSuccess ,
Topic : tp . topic ,
Timestamp : time . Now ( ) ,
Record : rec ,
Position : 0 , // Position 在 tailer 模式下不可用
} )
return nil
}
tp . logger . Debug ( "creating tailer" )
tailer , err := NewTailer ( tp . cursor , wrappedHandler , tp . tailConfig )
if err != nil {
tp . logger . Error ( "failed to create tailer" , "error" , err )
return fmt . Errorf ( "failed to create tailer: %w" , err )
}
tp . tailer = tailer
tp . logger . Debug ( "tailer created" )
return nil
}
// Write 写入日志(统一接口)
func ( tp * TopicProcessor ) Write ( data [ ] byte ) ( int64 , error ) {
offset , err := tp . writer . Append ( data )
if err != nil {
tp . logger . Error ( "failed to append" , "error" , err )
tp . stats . IncError ( )
// 发布写入错误事件
tp . eventBus . Publish ( & Event {
Type : EventWriteError ,
Topic : tp . topic ,
Timestamp : time . Now ( ) ,
Error : err ,
} )
return 0 , err
}
// 更新统计信息
tp . stats . IncWrite ( int64 ( len ( data ) ) )
tp . logger . Debug ( "write success" , "offset" , offset , "size" , len ( data ) )
// 发布写入成功事件
tp . eventBus . Publish ( & Event {
Type : EventWriteSuccess ,
Topic : tp . topic ,
Timestamp : time . Now ( ) ,
Position : offset ,
} )
return offset , nil
}
// Start 启动 tailer( 如果已创建)
func ( tp * TopicProcessor ) Start ( ) error {
tp . mu . Lock ( )
defer tp . mu . Unlock ( )
if tp . running {
新增:统一的错误类型系统 (errors.go)
主要功能:
- 定义哨兵错误(Sentinel Errors):ErrNilParameter, ErrInvalidCount,
ErrInvalidRange, ErrAlreadyRunning, ErrNotFound, ErrCRCMismatch 等
- 实现结构化错误类型:TopicError, FileError, IndexError, ValidationError
- 提供错误检查辅助函数:IsTopicNotFound, IsIndexOutOfRange, IsCRCMismatch
- 支持 errors.Is 和 errors.As 进行错误判断
更新相关文件使用新错误类型:
- cursor.go: 使用 ValidationError 和 ErrCRCMismatch
- index.go: 使用 IndexError 处理索引越界
- query.go: 使用 ValidationError 验证参数
- seqlog_manager.go: 使用 TopicError 和 ErrAlreadyRegistered
- topic_processor.go: 使用 ErrAlreadyRunning 和 ErrInvalidConfig
测试覆盖:
- errors_test.go 提供完整的错误类型测试
- 所有现有测试继续通过
使用示例:
```go
// 检查 topic 是否存在
if IsTopicNotFound(err) {
// 处理 topic 不存在的情况
}
// 检查索引越界
if IsIndexOutOfRange(err) {
var indexErr *IndexError
errors.As(err, &indexErr)
fmt.Printf("index %d out of range\n", indexErr.Index)
}
```
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-04 01:56:22 +08:00
return NewTopicError ( tp . topic , "start" , ErrAlreadyRunning )
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
}
tp . logger . Debug ( "starting processor" )
// 重新创建 context( 如果之前被 cancel 了)
if tp . ctx . Err ( ) != nil {
tp . ctx , tp . cancel = context . WithCancel ( context . Background ( ) )
}
tp . running = true
2025-10-04 17:54:49 +08:00
// 启动定期保存统计信息的 goroutine
tp . wg . Add ( 1 )
go func ( ) {
defer tp . wg . Done ( )
ticker := time . NewTicker ( tp . tailConfig . SaveInterval )
defer ticker . Stop ( )
for {
select {
case <- tp . ctx . Done ( ) :
return
case <- ticker . C :
if err := tp . stats . Save ( ) ; err != nil {
tp . logger . Error ( "failed to save stats" , "error" , err )
}
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
}
2025-10-04 17:54:49 +08:00
}
} ( )
// 如果 tailer 已创建,启动它
tp . logger . Debug ( "launching tailer goroutine" )
tp . wg . Add ( 1 )
go func ( ) {
defer tp . wg . Done ( )
tp . logger . Debug ( "tailer goroutine started" )
if err := tp . tailer . Start ( tp . ctx ) ; err != nil && err != context . Canceled {
tp . logger . Error ( "tailer error" , "error" , err )
}
tp . logger . Debug ( "tailer goroutine finished" )
} ( )
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
// 发布启动事件
tp . eventBus . Publish ( & Event {
Type : EventProcessorStart ,
Topic : tp . topic ,
Timestamp : time . Now ( ) ,
} )
return nil
}
// Stop 停止 tailer
func ( tp * TopicProcessor ) Stop ( ) error {
tp . mu . Lock ( )
if ! tp . running {
tp . mu . Unlock ( )
return nil
}
tp . logger . Debug ( "stopping processor" )
tp . running = false
tp . cancel ( )
tp . mu . Unlock ( )
// 等待 tailer 停止
tp . wg . Wait ( )
tp . logger . Debug ( "processor stopped" )
// 发布停止事件
tp . eventBus . Publish ( & Event {
Type : EventProcessorStop ,
Topic : tp . topic ,
Timestamp : time . Now ( ) ,
} )
return nil
}
// Topic 返回 topic 名称
func ( tp * TopicProcessor ) Topic ( ) string {
return tp . topic
}
2025-10-04 17:54:49 +08:00
// Title 返回显示标题
func ( tp * TopicProcessor ) Title ( ) string {
return tp . title
}
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
// IsRunning 检查是否正在运行
func ( tp * TopicProcessor ) IsRunning ( ) bool {
tp . mu . RLock ( )
defer tp . mu . RUnlock ( )
return tp . running
}
// UpdateTailConfig 动态更新 tail 配置
func ( tp * TopicProcessor ) UpdateTailConfig ( config * TailConfig ) error {
tp . mu . Lock ( )
defer tp . mu . Unlock ( )
if config == nil {
return fmt . Errorf ( "config cannot be nil" )
}
tp . tailConfig = config
// 如果 tailer 已经在运行,更新其配置
if tp . tailer != nil {
tp . tailer . UpdateConfig ( * config )
}
return nil
}
// GetTailConfig 获取当前 tail 配置
func ( tp * TopicProcessor ) GetTailConfig ( ) * TailConfig {
tp . mu . RLock ( )
defer tp . mu . RUnlock ( )
cfg := tp . tailConfig
return cfg
}
// GetStats 获取当前统计信息
func ( tp * TopicProcessor ) GetStats ( ) Stats {
return tp . stats . Get ( )
}
// Query 获取共享的查询器
func ( tp * TopicProcessor ) Query ( ) * RecordQuery {
return tp . query
}
2025-10-04 13:32:44 +08:00
// addStatusToRecords 为记录添加状态信息(辅助方法)
func ( tp * TopicProcessor ) addStatusToRecords ( records [ ] * RecordWithIndex ) [ ] * RecordWithStatus {
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
// 获取窗口索引范围(用于状态判断)
var startIdx , endIdx int
tp . mu . RLock ( )
if tp . tailer != nil {
startIdx = tp . tailer . GetStartIndex ( )
endIdx = tp . tailer . GetEndIndex ( )
}
tp . mu . RUnlock ( )
2025-10-04 00:10:14 +08:00
// 为每条记录添加状态
results := make ( [ ] * RecordWithStatus , len ( records ) )
for i , rec := range records {
results [ i ] = & RecordWithStatus {
2025-10-04 13:26:21 +08:00
Record : rec . Record ,
Index : rec . Index ,
Status : GetRecordStatus ( rec . Index , startIdx , endIdx ) ,
2025-10-04 00:10:14 +08:00
}
}
2025-10-04 13:32:44 +08:00
return results
}
2025-10-04 17:54:49 +08:00
// addStatusToMetadata 为元数据添加状态信息
func ( tp * TopicProcessor ) addStatusToMetadata ( metadata [ ] * RecordMetadata ) [ ] * RecordMetadataWithStatus {
// 获取窗口索引范围(用于状态判断)
var startIdx , endIdx int
tp . mu . RLock ( )
if tp . tailer != nil {
startIdx = tp . tailer . GetStartIndex ( )
endIdx = tp . tailer . GetEndIndex ( )
}
tp . mu . RUnlock ( )
// 为每个元数据添加状态
results := make ( [ ] * RecordMetadataWithStatus , len ( metadata ) )
for i , meta := range metadata {
results [ i ] = & RecordMetadataWithStatus {
Metadata : meta ,
Status : GetRecordStatus ( meta . Index , startIdx , endIdx ) ,
}
}
return results
}
2025-10-04 13:32:44 +08:00
// QueryOldest 从参考索引向索引递减方向查询记录(查询更早的记录)
// refIndex: 参考索引位置
// count: 查询数量
// 返回的记录包含索引和状态信息,按索引递增方向排序
// 例如: QueryOldest(5, 3) 查询索引 2, 3, 4( 不包含 5)
func ( tp * TopicProcessor ) QueryOldest ( refIndex , count int ) ( [ ] * RecordWithStatus , error ) {
records , err := tp . query . QueryOldest ( refIndex , count )
if err != nil {
return nil , err
}
return tp . addStatusToRecords ( records ) , nil
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
}
2025-10-04 13:26:21 +08:00
// QueryNewest 从参考索引向索引递增方向查询记录(查询更新的记录)
// refIndex: 参考索引位置
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
// count: 查询数量
2025-10-04 13:26:21 +08:00
// 返回的记录包含索引和状态信息,按索引递增方向排序
// 例如: QueryNewest(5, 3) 查询索引 6, 7, 8( 不包含 5)
func ( tp * TopicProcessor ) QueryNewest ( refIndex , count int ) ( [ ] * RecordWithStatus , error ) {
records , err := tp . query . QueryNewest ( refIndex , count )
2025-10-04 00:10:14 +08:00
if err != nil {
return nil , err
}
2025-10-04 13:32:44 +08:00
return tp . addStatusToRecords ( records ) , nil
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
}
2025-10-04 17:54:49 +08:00
// QueryOldestMetadata 从参考索引向索引递减方向查询记录元数据(查询更早的记录,不读取完整数据)
// refIndex: 参考索引位置
// count: 查询数量
// 返回的记录只包含元数据信息( 索引、UUID、数据大小) , 按索引递增方向排序
// 例如: QueryOldestMetadata(5, 3) 查询索引 2, 3, 4( 不包含 5)
func ( tp * TopicProcessor ) QueryOldestMetadata ( refIndex , count int ) ( [ ] * RecordMetadataWithStatus , error ) {
metadata , err := tp . query . QueryOldestMetadata ( refIndex , count )
if err != nil {
return nil , err
}
return tp . addStatusToMetadata ( metadata ) , nil
}
// QueryNewestMetadata 从参考索引向索引递增方向查询记录元数据(查询更新的记录,不读取完整数据)
// refIndex: 参考索引位置
// count: 查询数量
// 返回的记录只包含元数据信息( 索引、UUID、数据大小) , 按索引递增方向排序
// 例如: QueryNewestMetadata(5, 3) 查询索引 6, 7, 8( 不包含 5)
func ( tp * TopicProcessor ) QueryNewestMetadata ( refIndex , count int ) ( [ ] * RecordMetadataWithStatus , error ) {
metadata , err := tp . query . QueryNewestMetadata ( refIndex , count )
if err != nil {
return nil , err
}
return tp . addStatusToMetadata ( metadata ) , nil
}
// QueryByIndex 根据索引查询单条记录的完整数据
// index: 记录索引
// 返回完整的记录数据,包含状态信息
func ( tp * TopicProcessor ) QueryByIndex ( index int ) ( * RecordWithStatus , error ) {
record , err := tp . query . QueryByIndex ( index )
if err != nil {
return nil , err
}
// 获取当前处理窗口位置
var startIdx , endIdx int
tp . mu . RLock ( )
if tp . tailer != nil {
startIdx = tp . tailer . GetStartIndex ( )
endIdx = tp . tailer . GetEndIndex ( )
}
tp . mu . RUnlock ( )
status := GetRecordStatus ( index , startIdx , endIdx )
return & RecordWithStatus {
Record : record ,
Index : index ,
Status : status ,
} , nil
}
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
// GetRecordCount 获取记录总数(统一接口)
func ( tp * TopicProcessor ) GetRecordCount ( ) int {
return tp . index . Count ( )
}
2025-10-04 13:26:21 +08:00
// QueryFromProcessing 从当前处理窗口的开始位置向索引递增方向查询记录
// count: 查询数量
// 返回从处理窗口开始位置( startIndex) 向后的记录, 包含状态信息
func ( tp * TopicProcessor ) QueryFromProcessing ( count int ) ( [ ] * RecordWithStatus , error ) {
// 获取当前处理窗口的开始位置
tp . mu . RLock ( )
var startIdx int
if tp . tailer != nil {
startIdx = tp . tailer . GetStartIndex ( )
}
tp . mu . RUnlock ( )
// 从 startIdx 开始向索引递增方向查询
// QueryNewest(refIndex, count) 查询从 refIndex+1 开始的记录
// 所以要从 startIdx 开始,应该调用 QueryNewest(startIdx - 1, count)
return tp . QueryNewest ( startIdx - 1 , count )
}
// QueryFromFirst 从第一条记录向索引递增方向查询
// count: 查询数量
// 返回从第一条记录(索引 0) 开始的记录, 包含状态信息
// 例如: QueryFromFirst(3) 查询索引 0, 1, 2
func ( tp * TopicProcessor ) QueryFromFirst ( count int ) ( [ ] * RecordWithStatus , error ) {
// QueryNewest(-1, count) 会从索引 0 开始向后查询
return tp . QueryNewest ( - 1 , count )
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
}
2025-10-04 13:26:21 +08:00
// QueryFromLast 从最后一条记录向索引递减方向查询
// count: 查询数量
// 返回从最后一条记录开始向前的记录,包含状态信息,按索引递增方向排序
// 例如:如果总共有 10 条记录, QueryFromLast(3) 查询索引 7, 8, 9
func ( tp * TopicProcessor ) QueryFromLast ( count int ) ( [ ] * RecordWithStatus , error ) {
// 获取记录总数
totalCount := tp . index . Count ( )
2025-10-04 17:54:49 +08:00
// 如果没有记录,返回空数组
if totalCount == 0 {
return [ ] * RecordWithStatus { } , nil
}
2025-10-04 13:26:21 +08:00
// QueryOldest(totalCount, count) 会从最后一条记录开始向前查询
2025-10-04 17:54:49 +08:00
// totalCount 是记录总数,有效索引是 0 到 totalCount-1
// 所以传入 totalCount 作为 refIndex, 会查询 totalCount-count 到 totalCount-1 的记录
2025-10-04 13:26:21 +08:00
return tp . QueryOldest ( totalCount , count )
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
}
2025-10-04 17:54:49 +08:00
// QueryFromFirstMetadata 从第一条记录向索引递增方向查询元数据
// count: 查询数量
// 返回从第一条记录(索引 0) 开始的记录元数据, 包含状态信息
// 例如: QueryFromFirstMetadata(3) 查询索引 0, 1, 2 的元数据
func ( tp * TopicProcessor ) QueryFromFirstMetadata ( count int ) ( [ ] * RecordMetadataWithStatus , error ) {
// QueryNewestMetadata(-1, count) 会从索引 0 开始向后查询
return tp . QueryNewestMetadata ( - 1 , count )
}
// QueryFromLastMetadata 从最后一条记录向索引递减方向查询元数据
// count: 查询数量
// 返回最后 N 条记录的元数据(按索引递增方向排序),包含状态信息
// 例如: QueryFromLastMetadata(3) 查询最后 3 条记录的元数据
func ( tp * TopicProcessor ) QueryFromLastMetadata ( count int ) ( [ ] * RecordMetadataWithStatus , error ) {
totalCount := tp . index . Count ( )
if totalCount == 0 {
return [ ] * RecordMetadataWithStatus { } , nil
}
// QueryOldestMetadata(totalCount, count) 会从 totalCount 向前查询 count 条
return tp . QueryOldestMetadata ( totalCount , count )
}
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
// GetProcessingIndex 获取当前处理索引(窗口开始索引)
func ( tp * TopicProcessor ) GetProcessingIndex ( ) int {
tp . mu . RLock ( )
defer tp . mu . RUnlock ( )
if tp . tailer == nil {
return 0
}
return tp . tailer . GetStartIndex ( )
}
// GetReadIndex 获取当前读取索引(窗口结束索引)
func ( tp * TopicProcessor ) GetReadIndex ( ) int {
tp . mu . RLock ( )
defer tp . mu . RUnlock ( )
if tp . tailer == nil {
return 0
}
return tp . tailer . GetEndIndex ( )
}
// Subscribe 订阅事件
func ( tp * TopicProcessor ) Subscribe ( eventType EventType , listener EventListener ) {
tp . eventBus . Subscribe ( eventType , listener )
}
// SubscribeAll 订阅所有事件
func ( tp * TopicProcessor ) SubscribeAll ( listener EventListener ) {
tp . eventBus . SubscribeAll ( listener )
}
// Unsubscribe 取消订阅
func ( tp * TopicProcessor ) Unsubscribe ( eventType EventType ) {
tp . eventBus . Unsubscribe ( eventType )
}
// Reset 清空 topic 的所有数据,包括日志文件、位置文件和统计文件
2025-10-04 13:26:21 +08:00
// 如果 processor 正在运行且没有待处理的日志,会自动停止
// 如果有待处理的日志,则返回错误,需要等待处理完成或手动停止
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
func ( tp * TopicProcessor ) Reset ( ) error {
tp . mu . Lock ( )
defer tp . mu . Unlock ( )
if tp . running {
2025-10-04 13:26:21 +08:00
// 检查是否有待处理的日志
processingIndex := 0
if tp . tailer != nil {
processingIndex = tp . tailer . GetStartIndex ( )
}
recordCount := tp . index . Count ( )
hasPendingRecords := processingIndex < recordCount
if hasPendingRecords {
return fmt . Errorf ( "cannot reset while processor is running with pending records (%d/%d processed), please stop first or wait for processing to complete" , processingIndex , recordCount )
}
// 没有待处理的日志,自动停止
tp . logger . Debug ( "auto-stopping processor before reset (no pending records)" )
tp . running = false
tp . cancel ( )
// 释放锁以等待 tailer 停止
tp . mu . Unlock ( )
tp . wg . Wait ( )
tp . mu . Lock ( )
// 发布停止事件
tp . eventBus . Publish ( & Event {
Type : EventProcessorStop ,
Topic : tp . topic ,
Timestamp : time . Now ( ) ,
} )
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
}
tp . logger . Debug ( "resetting processor" )
var errs [ ] error
// 关闭 writer( 如果还未关闭)
if tp . writer != nil {
if err := tp . writer . Close ( ) ; err != nil {
tp . logger . Error ( "failed to close writer during reset" , "error" , err )
errs = append ( errs , fmt . Errorf ( "close writer: %w" , err ) )
}
tp . writer = nil
}
// 删除日志文件
if err := os . Remove ( tp . logPath ) ; err != nil && ! os . IsNotExist ( err ) {
tp . logger . Error ( "failed to remove log file" , "error" , err )
errs = append ( errs , fmt . Errorf ( "remove log file: %w" , err ) )
}
// 删除位置文件
posFile := tp . logPath + ".pos"
if err := os . Remove ( posFile ) ; err != nil && ! os . IsNotExist ( err ) {
tp . logger . Error ( "failed to remove position file" , "error" , err )
errs = append ( errs , fmt . Errorf ( "remove position file: %w" , err ) )
}
// 删除索引文件
indexFile := tp . logPath + ".idx"
if err := os . Remove ( indexFile ) ; err != nil && ! os . IsNotExist ( err ) {
tp . logger . Error ( "failed to remove index file" , "error" , err )
errs = append ( errs , fmt . Errorf ( "remove index file: %w" , err ) )
}
// 关闭所有组件
if tp . query != nil {
tp . query . Close ( )
tp . query = nil
}
if tp . index != nil {
tp . index . Close ( )
tp . index = nil
}
// 重新初始化所有组件(已持有锁)
// 这会重新创建 index, writer, query, 如果有 handler 也会创建 tailer
if err := tp . initializeComponents ( ) ; err != nil {
tp . logger . Error ( "failed to reinitialize components" , "error" , err )
errs = append ( errs , fmt . Errorf ( "reinitialize components: %w" , err ) )
}
// 重置统计信息
if tp . stats != nil {
tp . stats . Reset ( )
}
tp . logger . Debug ( "processor reset completed" )
// 发布重置事件
tp . eventBus . Publish ( & Event {
Type : EventProcessorReset ,
Topic : tp . topic ,
Timestamp : time . Now ( ) ,
} )
// 如果有多个错误,返回第一个
if len ( errs ) > 0 {
return errs [ 0 ]
}
return nil
}
// Close 清理 processor 的所有资源
func ( tp * TopicProcessor ) Close ( ) error {
tp . mu . Lock ( )
defer tp . mu . Unlock ( )
tp . logger . Debug ( "closing processor" )
var errs [ ] error
// 保存统计信息
if tp . stats != nil {
if err := tp . stats . Save ( ) ; err != nil {
tp . logger . Error ( "failed to save stats" , "error" , err )
errs = append ( errs , fmt . Errorf ( "save stats: %w" , err ) )
}
}
// 关闭 query
if tp . query != nil {
if err := tp . query . Close ( ) ; err != nil {
tp . logger . Error ( "failed to close query" , "error" , err )
errs = append ( errs , fmt . Errorf ( "close query: %w" , err ) )
}
tp . query = nil
}
// 关闭 cursor( 如果 tailer 未启动, cursor 可能还未关闭)
if tp . cursor != nil {
if err := tp . cursor . Close ( ) ; err != nil {
tp . logger . Error ( "failed to close cursor" , "error" , err )
errs = append ( errs , fmt . Errorf ( "close cursor: %w" , err ) )
}
tp . cursor = nil
}
// 关闭 writer
if tp . writer != nil {
if err := tp . writer . Close ( ) ; err != nil {
tp . logger . Error ( "failed to close writer" , "error" , err )
errs = append ( errs , fmt . Errorf ( "close writer: %w" , err ) )
}
tp . writer = nil
}
// 关闭 index( 最后关闭, 因为其他组件可能依赖它)
if tp . index != nil {
if err := tp . index . Close ( ) ; err != nil {
tp . logger . Error ( "failed to close index" , "error" , err )
errs = append ( errs , fmt . Errorf ( "close index: %w" , err ) )
}
tp . index = nil
}
// tailer 会通过 context cancel 和 Stop() 自动关闭
tp . tailer = nil
tp . logger . Debug ( "processor closed" )
// 如果有多个错误,返回第一个
if len ( errs ) > 0 {
return errs [ 0 ]
}
return nil
}