重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
package seqlog
|
|
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
|
"context"
|
|
|
|
|
|
"fmt"
|
|
|
|
|
|
"log/slog"
|
|
|
|
|
|
"path/filepath"
|
|
|
|
|
|
"sync"
|
|
|
|
|
|
"time"
|
|
|
|
|
|
)
|
|
|
|
|
|
|
2025-10-04 18:56:52 +08:00
|
|
|
|
// ProcessorState 处理器状态
|
|
|
|
|
|
type ProcessorState int
|
|
|
|
|
|
|
|
|
|
|
|
const (
|
|
|
|
|
|
StateIdle ProcessorState = iota // 空闲(未启动)
|
|
|
|
|
|
StateStarting // 启动中
|
|
|
|
|
|
StateRunning // 运行中
|
|
|
|
|
|
StateStopping // 停止中
|
|
|
|
|
|
StateStopped // 已停止
|
|
|
|
|
|
StateResetting // 重置中(阻止所有操作)
|
|
|
|
|
|
StateError // 错误状态
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
// String 返回状态的字符串表示
|
|
|
|
|
|
func (s ProcessorState) String() string {
|
|
|
|
|
|
switch s {
|
|
|
|
|
|
case StateIdle:
|
|
|
|
|
|
return "idle"
|
|
|
|
|
|
case StateStarting:
|
|
|
|
|
|
return "starting"
|
|
|
|
|
|
case StateRunning:
|
|
|
|
|
|
return "running"
|
|
|
|
|
|
case StateStopping:
|
|
|
|
|
|
return "stopping"
|
|
|
|
|
|
case StateStopped:
|
|
|
|
|
|
return "stopped"
|
|
|
|
|
|
case StateResetting:
|
|
|
|
|
|
return "resetting"
|
|
|
|
|
|
case StateError:
|
|
|
|
|
|
return "error"
|
|
|
|
|
|
default:
|
|
|
|
|
|
return "unknown"
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// CanWrite 判断当前状态是否允许写入
|
|
|
|
|
|
func (s ProcessorState) CanWrite() bool {
|
|
|
|
|
|
// 允许在 Idle、Starting、Running 状态下写入
|
|
|
|
|
|
// 不允许在 Stopping、Stopped、Resetting、Error 状态下写入
|
|
|
|
|
|
return s == StateIdle || s == StateStarting || s == StateRunning
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// CanQuery 判断当前状态是否允许查询
|
|
|
|
|
|
func (s ProcessorState) CanQuery() bool {
|
|
|
|
|
|
return s != StateResetting
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// CanProcess 判断当前状态是否允许处理
|
|
|
|
|
|
func (s ProcessorState) CanProcess() bool {
|
|
|
|
|
|
return s == StateRunning
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// ProcessorStatus 处理器状态信息
|
|
|
|
|
|
type ProcessorStatus struct {
|
|
|
|
|
|
State ProcessorState // 当前状态
|
|
|
|
|
|
LastUpdated time.Time // 最后更新时间
|
|
|
|
|
|
Error error // 错误信息(仅在 StateError 时有效)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
// TopicProcessor 作为聚合器,持有所有核心组件并提供统一的访问接口
|
|
|
|
|
|
type TopicProcessor struct {
|
|
|
|
|
|
topic string
|
2025-10-04 17:54:49 +08:00
|
|
|
|
title string // 显示标题,用于 UI 展示
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
logPath string
|
|
|
|
|
|
logger *slog.Logger
|
|
|
|
|
|
|
|
|
|
|
|
// 核心组件(聚合)
|
2025-10-04 17:54:49 +08:00
|
|
|
|
writer *LogWriter // 写入器
|
|
|
|
|
|
index *RecordIndex // 索引管理器
|
|
|
|
|
|
query *RecordQuery // 查询器
|
|
|
|
|
|
cursor *ProcessCursor // 游标
|
|
|
|
|
|
tailer *LogTailer // 持续处理器
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
|
|
|
|
|
|
// 配置和状态
|
|
|
|
|
|
handler RecordHandler
|
|
|
|
|
|
tailConfig *TailConfig
|
2025-10-04 18:56:52 +08:00
|
|
|
|
stats *TopicStats // 统计信息
|
|
|
|
|
|
eventBus *EventBus // 事件总线
|
|
|
|
|
|
status ProcessorStatus // 处理器状态
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
|
|
|
|
|
|
// 并发控制
|
2025-10-04 18:56:52 +08:00
|
|
|
|
mu sync.RWMutex
|
|
|
|
|
|
ctx context.Context
|
|
|
|
|
|
cancel context.CancelFunc
|
|
|
|
|
|
wg sync.WaitGroup
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// TopicConfig topic 配置
|
|
|
|
|
|
type TopicConfig struct {
|
2025-10-04 17:54:49 +08:00
|
|
|
|
Title string // 显示标题,可选,默认为 topic 名称
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
Handler RecordHandler // 处理函数(必填)
|
|
|
|
|
|
TailConfig *TailConfig // tail 配置,可选
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// NewTopicProcessor 创建一个新的 topic 处理器
|
|
|
|
|
|
// 在初始化时创建所有核心组件,index 在组件间共享
|
|
|
|
|
|
// handler 为必填参数,如果 config 为 nil 或 config.Handler 为 nil 会返回错误
|
|
|
|
|
|
func NewTopicProcessor(baseDir, topic string, logger *slog.Logger, config *TopicConfig) (*TopicProcessor, error) {
|
|
|
|
|
|
// 验证必填参数
|
|
|
|
|
|
if config == nil || config.Handler == nil {
|
新增:统一的错误类型系统 (errors.go)
主要功能:
- 定义哨兵错误(Sentinel Errors):ErrNilParameter, ErrInvalidCount,
ErrInvalidRange, ErrAlreadyRunning, ErrNotFound, ErrCRCMismatch 等
- 实现结构化错误类型:TopicError, FileError, IndexError, ValidationError
- 提供错误检查辅助函数:IsTopicNotFound, IsIndexOutOfRange, IsCRCMismatch
- 支持 errors.Is 和 errors.As 进行错误判断
更新相关文件使用新错误类型:
- cursor.go: 使用 ValidationError 和 ErrCRCMismatch
- index.go: 使用 IndexError 处理索引越界
- query.go: 使用 ValidationError 验证参数
- seqlog_manager.go: 使用 TopicError 和 ErrAlreadyRegistered
- topic_processor.go: 使用 ErrAlreadyRunning 和 ErrInvalidConfig
测试覆盖:
- errors_test.go 提供完整的错误类型测试
- 所有现有测试继续通过
使用示例:
```go
// 检查 topic 是否存在
if IsTopicNotFound(err) {
// 处理 topic 不存在的情况
}
// 检查索引越界
if IsIndexOutOfRange(err) {
var indexErr *IndexError
errors.As(err, &indexErr)
fmt.Printf("index %d out of range\n", indexErr.Index)
}
```
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-04 01:56:22 +08:00
|
|
|
|
return nil, NewValidationError("config", "config and config.Handler are required", ErrInvalidConfig)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
|
|
|
|
|
|
|
|
// 默认配置
|
|
|
|
|
|
tailConfig := &TailConfig{
|
|
|
|
|
|
PollInterval: 100 * 1000000, // 100ms
|
|
|
|
|
|
SaveInterval: 1000 * 1000000, // 1s
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if config.TailConfig != nil {
|
|
|
|
|
|
tailConfig = config.TailConfig
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if logger == nil {
|
|
|
|
|
|
logger = slog.Default()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
logPath := filepath.Join(baseDir, topic+".log")
|
|
|
|
|
|
statsPath := filepath.Join(baseDir, topic+".stats")
|
|
|
|
|
|
|
2025-10-04 17:54:49 +08:00
|
|
|
|
// 设置 title,如果未提供则使用 topic 名称
|
|
|
|
|
|
title := config.Title
|
|
|
|
|
|
if title == "" {
|
|
|
|
|
|
title = topic
|
|
|
|
|
|
}
|
|
|
|
|
|
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
tp := &TopicProcessor{
|
|
|
|
|
|
topic: topic,
|
2025-10-04 17:54:49 +08:00
|
|
|
|
title: title,
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
logPath: logPath,
|
|
|
|
|
|
logger: logger,
|
|
|
|
|
|
handler: config.Handler,
|
|
|
|
|
|
tailConfig: tailConfig,
|
|
|
|
|
|
stats: NewTopicStats(statsPath),
|
|
|
|
|
|
eventBus: NewEventBus(),
|
2025-10-04 18:56:52 +08:00
|
|
|
|
status: ProcessorStatus{
|
|
|
|
|
|
State: StateIdle,
|
|
|
|
|
|
LastUpdated: time.Now(),
|
|
|
|
|
|
},
|
|
|
|
|
|
ctx: ctx,
|
|
|
|
|
|
cancel: cancel,
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 初始化所有组件
|
|
|
|
|
|
if err := tp.initializeComponents(); err != nil {
|
|
|
|
|
|
cancel()
|
|
|
|
|
|
return nil, fmt.Errorf("failed to initialize components: %w", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return tp, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// initializeComponents 初始化所有核心组件
|
|
|
|
|
|
func (tp *TopicProcessor) initializeComponents() error {
|
|
|
|
|
|
// 1. 创建共享的索引管理器
|
|
|
|
|
|
index, err := NewRecordIndex(tp.logPath)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return fmt.Errorf("create index: %w", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
tp.index = index
|
|
|
|
|
|
|
|
|
|
|
|
// 2. 创建写入器(使用共享 index)
|
|
|
|
|
|
writer, err := NewLogWriter(tp.logPath, tp.index)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
tp.index.Close()
|
|
|
|
|
|
return fmt.Errorf("create writer: %w", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
tp.writer = writer
|
|
|
|
|
|
|
|
|
|
|
|
// 3. 创建查询器(使用共享 index)
|
2025-10-04 17:54:49 +08:00
|
|
|
|
query, err := NewRecordQuery(tp.logPath, tp.index, tp.writer)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
|
tp.writer.Close()
|
|
|
|
|
|
tp.index.Close()
|
|
|
|
|
|
return fmt.Errorf("create query: %w", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
tp.query = query
|
|
|
|
|
|
|
2025-10-04 17:54:49 +08:00
|
|
|
|
// 4. 创建游标(使用共享 index 和 writer)
|
|
|
|
|
|
cursor, err := NewCursor(tp.logPath, tp.index, tp.writer)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
|
tp.query.Close()
|
|
|
|
|
|
tp.writer.Close()
|
|
|
|
|
|
tp.index.Close()
|
|
|
|
|
|
return fmt.Errorf("create cursor: %w", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
tp.cursor = cursor
|
|
|
|
|
|
|
|
|
|
|
|
// 5. 创建 tailer(handler 为必填,总是创建)
|
|
|
|
|
|
// 注意:只创建不启动,启动在 Start() 中进行
|
|
|
|
|
|
if err := tp.createTailer(); err != nil {
|
|
|
|
|
|
tp.cursor.Close()
|
|
|
|
|
|
tp.query.Close()
|
|
|
|
|
|
tp.writer.Close()
|
|
|
|
|
|
tp.index.Close()
|
|
|
|
|
|
return fmt.Errorf("create tailer: %w", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
tp.logger.Debug("all components initialized")
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// createTailer 创建 tailer(不启动)
|
|
|
|
|
|
func (tp *TopicProcessor) createTailer() error {
|
|
|
|
|
|
// 包装 handler,添加统计功能和事件发布
|
|
|
|
|
|
wrappedHandler := func(rec *Record) error {
|
|
|
|
|
|
if err := tp.handler(rec); err != nil {
|
|
|
|
|
|
tp.stats.IncError()
|
|
|
|
|
|
|
|
|
|
|
|
// 发布处理错误事件
|
|
|
|
|
|
tp.eventBus.Publish(&Event{
|
|
|
|
|
|
Type: EventProcessError,
|
|
|
|
|
|
Topic: tp.topic,
|
|
|
|
|
|
Timestamp: time.Now(),
|
|
|
|
|
|
Record: rec,
|
|
|
|
|
|
Error: err,
|
|
|
|
|
|
Position: 0, // Position 在 tailer 模式下不可用
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 处理成功,更新统计
|
|
|
|
|
|
tp.stats.IncProcessed(int64(len(rec.Data)))
|
|
|
|
|
|
|
|
|
|
|
|
// 发布处理成功事件
|
|
|
|
|
|
tp.eventBus.Publish(&Event{
|
|
|
|
|
|
Type: EventProcessSuccess,
|
|
|
|
|
|
Topic: tp.topic,
|
|
|
|
|
|
Timestamp: time.Now(),
|
|
|
|
|
|
Record: rec,
|
|
|
|
|
|
Position: 0, // Position 在 tailer 模式下不可用
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
tp.logger.Debug("creating tailer")
|
|
|
|
|
|
tailer, err := NewTailer(tp.cursor, wrappedHandler, tp.tailConfig)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
tp.logger.Error("failed to create tailer", "error", err)
|
|
|
|
|
|
return fmt.Errorf("failed to create tailer: %w", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
tp.tailer = tailer
|
|
|
|
|
|
tp.logger.Debug("tailer created")
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Write 写入日志(统一接口)
|
|
|
|
|
|
func (tp *TopicProcessor) Write(data []byte) (int64, error) {
|
2025-10-04 18:56:52 +08:00
|
|
|
|
// 检查状态
|
|
|
|
|
|
tp.mu.RLock()
|
|
|
|
|
|
state := tp.status.State
|
|
|
|
|
|
tp.mu.RUnlock()
|
|
|
|
|
|
|
|
|
|
|
|
if !state.CanWrite() {
|
|
|
|
|
|
if state == StateResetting {
|
|
|
|
|
|
return 0, ErrProcessorResetting
|
|
|
|
|
|
}
|
|
|
|
|
|
return 0, ErrNotRunning
|
|
|
|
|
|
}
|
|
|
|
|
|
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
offset, err := tp.writer.Append(data)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
tp.logger.Error("failed to append", "error", err)
|
|
|
|
|
|
tp.stats.IncError()
|
|
|
|
|
|
|
|
|
|
|
|
// 发布写入错误事件
|
|
|
|
|
|
tp.eventBus.Publish(&Event{
|
|
|
|
|
|
Type: EventWriteError,
|
|
|
|
|
|
Topic: tp.topic,
|
|
|
|
|
|
Timestamp: time.Now(),
|
|
|
|
|
|
Error: err,
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
return 0, err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 更新统计信息
|
|
|
|
|
|
tp.stats.IncWrite(int64(len(data)))
|
|
|
|
|
|
|
|
|
|
|
|
tp.logger.Debug("write success", "offset", offset, "size", len(data))
|
|
|
|
|
|
|
|
|
|
|
|
// 发布写入成功事件
|
|
|
|
|
|
tp.eventBus.Publish(&Event{
|
|
|
|
|
|
Type: EventWriteSuccess,
|
|
|
|
|
|
Topic: tp.topic,
|
|
|
|
|
|
Timestamp: time.Now(),
|
|
|
|
|
|
Position: offset,
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
return offset, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Start 启动 tailer(如果已创建)
|
|
|
|
|
|
func (tp *TopicProcessor) Start() error {
|
|
|
|
|
|
tp.mu.Lock()
|
|
|
|
|
|
defer tp.mu.Unlock()
|
|
|
|
|
|
|
2025-10-04 18:56:52 +08:00
|
|
|
|
// 检查状态
|
|
|
|
|
|
if tp.status.State != StateIdle && tp.status.State != StateStopped {
|
|
|
|
|
|
return NewTopicError(tp.topic, "start", ErrInvalidState)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
tp.logger.Debug("starting processor")
|
|
|
|
|
|
|
2025-10-04 18:56:52 +08:00
|
|
|
|
// 设置为启动中状态
|
|
|
|
|
|
tp.setState(StateStarting, nil)
|
|
|
|
|
|
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
// 重新创建 context(如果之前被 cancel 了)
|
|
|
|
|
|
if tp.ctx.Err() != nil {
|
|
|
|
|
|
tp.ctx, tp.cancel = context.WithCancel(context.Background())
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-04 17:54:49 +08:00
|
|
|
|
// 启动定期保存统计信息的 goroutine
|
|
|
|
|
|
tp.wg.Add(1)
|
|
|
|
|
|
go func() {
|
|
|
|
|
|
defer tp.wg.Done()
|
|
|
|
|
|
ticker := time.NewTicker(tp.tailConfig.SaveInterval)
|
|
|
|
|
|
defer ticker.Stop()
|
|
|
|
|
|
|
|
|
|
|
|
for {
|
|
|
|
|
|
select {
|
|
|
|
|
|
case <-tp.ctx.Done():
|
|
|
|
|
|
return
|
|
|
|
|
|
case <-ticker.C:
|
|
|
|
|
|
if err := tp.stats.Save(); err != nil {
|
|
|
|
|
|
tp.logger.Error("failed to save stats", "error", err)
|
|
|
|
|
|
}
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
}
|
2025-10-04 17:54:49 +08:00
|
|
|
|
}
|
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
|
|
// 如果 tailer 已创建,启动它
|
|
|
|
|
|
tp.logger.Debug("launching tailer goroutine")
|
|
|
|
|
|
tp.wg.Add(1)
|
|
|
|
|
|
go func() {
|
|
|
|
|
|
defer tp.wg.Done()
|
|
|
|
|
|
tp.logger.Debug("tailer goroutine started")
|
|
|
|
|
|
if err := tp.tailer.Start(tp.ctx); err != nil && err != context.Canceled {
|
|
|
|
|
|
tp.logger.Error("tailer error", "error", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
tp.logger.Debug("tailer goroutine finished")
|
|
|
|
|
|
}()
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
|
2025-10-04 18:56:52 +08:00
|
|
|
|
// 设置为运行中状态
|
|
|
|
|
|
tp.setState(StateRunning, nil)
|
|
|
|
|
|
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
// 发布启动事件
|
|
|
|
|
|
tp.eventBus.Publish(&Event{
|
|
|
|
|
|
Type: EventProcessorStart,
|
|
|
|
|
|
Topic: tp.topic,
|
|
|
|
|
|
Timestamp: time.Now(),
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Stop 停止 tailer
|
|
|
|
|
|
func (tp *TopicProcessor) Stop() error {
|
|
|
|
|
|
tp.mu.Lock()
|
2025-10-04 18:56:52 +08:00
|
|
|
|
if tp.status.State != StateRunning {
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
tp.mu.Unlock()
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
tp.logger.Debug("stopping processor")
|
2025-10-04 18:56:52 +08:00
|
|
|
|
|
|
|
|
|
|
// 设置为停止中状态
|
|
|
|
|
|
tp.setState(StateStopping, nil)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
tp.cancel()
|
|
|
|
|
|
tp.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
|
|
// 等待 tailer 停止
|
|
|
|
|
|
tp.wg.Wait()
|
|
|
|
|
|
|
|
|
|
|
|
tp.logger.Debug("processor stopped")
|
|
|
|
|
|
|
2025-10-04 18:56:52 +08:00
|
|
|
|
// 设置为已停止状态
|
|
|
|
|
|
tp.mu.Lock()
|
|
|
|
|
|
tp.setState(StateStopped, nil)
|
|
|
|
|
|
tp.mu.Unlock()
|
|
|
|
|
|
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
// 发布停止事件
|
|
|
|
|
|
tp.eventBus.Publish(&Event{
|
|
|
|
|
|
Type: EventProcessorStop,
|
|
|
|
|
|
Topic: tp.topic,
|
|
|
|
|
|
Timestamp: time.Now(),
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Topic 返回 topic 名称
|
|
|
|
|
|
func (tp *TopicProcessor) Topic() string {
|
|
|
|
|
|
return tp.topic
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-04 17:54:49 +08:00
|
|
|
|
// Title 返回显示标题
|
|
|
|
|
|
func (tp *TopicProcessor) Title() string {
|
|
|
|
|
|
return tp.title
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-04 18:56:52 +08:00
|
|
|
|
// GetState 获取当前状态
|
|
|
|
|
|
func (tp *TopicProcessor) GetState() ProcessorState {
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
tp.mu.RLock()
|
|
|
|
|
|
defer tp.mu.RUnlock()
|
2025-10-04 18:56:52 +08:00
|
|
|
|
return tp.status.State
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// GetStatus 获取完整状态信息
|
|
|
|
|
|
func (tp *TopicProcessor) GetStatus() ProcessorStatus {
|
|
|
|
|
|
tp.mu.RLock()
|
|
|
|
|
|
defer tp.mu.RUnlock()
|
|
|
|
|
|
return tp.status
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// setState 设置状态(需持有锁)
|
|
|
|
|
|
func (tp *TopicProcessor) setState(state ProcessorState, err error) {
|
|
|
|
|
|
tp.status.State = state
|
|
|
|
|
|
tp.status.LastUpdated = time.Now()
|
|
|
|
|
|
tp.status.Error = err
|
|
|
|
|
|
|
|
|
|
|
|
// 发布状态变更事件
|
|
|
|
|
|
tp.eventBus.Publish(&Event{
|
|
|
|
|
|
Type: EventStateChanged,
|
|
|
|
|
|
Topic: tp.topic,
|
|
|
|
|
|
Timestamp: time.Now(),
|
|
|
|
|
|
Error: err,
|
|
|
|
|
|
})
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// IsRunning 检查是否正在运行
|
|
|
|
|
|
func (tp *TopicProcessor) IsRunning() bool {
|
|
|
|
|
|
return tp.GetState() == StateRunning
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// UpdateTailConfig 动态更新 tail 配置
|
|
|
|
|
|
func (tp *TopicProcessor) UpdateTailConfig(config *TailConfig) error {
|
|
|
|
|
|
tp.mu.Lock()
|
|
|
|
|
|
defer tp.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
|
|
if config == nil {
|
|
|
|
|
|
return fmt.Errorf("config cannot be nil")
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
tp.tailConfig = config
|
|
|
|
|
|
|
|
|
|
|
|
// 如果 tailer 已经在运行,更新其配置
|
|
|
|
|
|
if tp.tailer != nil {
|
|
|
|
|
|
tp.tailer.UpdateConfig(*config)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// GetTailConfig 获取当前 tail 配置
|
|
|
|
|
|
func (tp *TopicProcessor) GetTailConfig() *TailConfig {
|
|
|
|
|
|
tp.mu.RLock()
|
|
|
|
|
|
defer tp.mu.RUnlock()
|
|
|
|
|
|
cfg := tp.tailConfig
|
|
|
|
|
|
return cfg
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// GetStats 获取当前统计信息
|
|
|
|
|
|
func (tp *TopicProcessor) GetStats() Stats {
|
|
|
|
|
|
return tp.stats.Get()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Query 获取共享的查询器
|
2025-10-04 18:56:52 +08:00
|
|
|
|
func (tp *TopicProcessor) Query() (*RecordQuery, error) {
|
|
|
|
|
|
tp.mu.RLock()
|
|
|
|
|
|
defer tp.mu.RUnlock()
|
|
|
|
|
|
|
|
|
|
|
|
if !tp.status.State.CanQuery() {
|
|
|
|
|
|
return nil, ErrProcessorResetting
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return tp.query, nil
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-04 13:32:44 +08:00
|
|
|
|
// addStatusToRecords 为记录添加状态信息(辅助方法)
|
|
|
|
|
|
func (tp *TopicProcessor) addStatusToRecords(records []*RecordWithIndex) []*RecordWithStatus {
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
// 获取窗口索引范围(用于状态判断)
|
|
|
|
|
|
var startIdx, endIdx int
|
|
|
|
|
|
tp.mu.RLock()
|
|
|
|
|
|
if tp.tailer != nil {
|
|
|
|
|
|
startIdx = tp.tailer.GetStartIndex()
|
|
|
|
|
|
endIdx = tp.tailer.GetEndIndex()
|
|
|
|
|
|
}
|
|
|
|
|
|
tp.mu.RUnlock()
|
|
|
|
|
|
|
2025-10-04 00:10:14 +08:00
|
|
|
|
// 为每条记录添加状态
|
|
|
|
|
|
results := make([]*RecordWithStatus, len(records))
|
|
|
|
|
|
for i, rec := range records {
|
|
|
|
|
|
results[i] = &RecordWithStatus{
|
2025-10-04 13:26:21 +08:00
|
|
|
|
Record: rec.Record,
|
|
|
|
|
|
Index: rec.Index,
|
|
|
|
|
|
Status: GetRecordStatus(rec.Index, startIdx, endIdx),
|
2025-10-04 00:10:14 +08:00
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-04 13:32:44 +08:00
|
|
|
|
return results
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-04 17:54:49 +08:00
|
|
|
|
// addStatusToMetadata 为元数据添加状态信息
|
|
|
|
|
|
func (tp *TopicProcessor) addStatusToMetadata(metadata []*RecordMetadata) []*RecordMetadataWithStatus {
|
|
|
|
|
|
// 获取窗口索引范围(用于状态判断)
|
|
|
|
|
|
var startIdx, endIdx int
|
|
|
|
|
|
tp.mu.RLock()
|
|
|
|
|
|
if tp.tailer != nil {
|
|
|
|
|
|
startIdx = tp.tailer.GetStartIndex()
|
|
|
|
|
|
endIdx = tp.tailer.GetEndIndex()
|
|
|
|
|
|
}
|
|
|
|
|
|
tp.mu.RUnlock()
|
|
|
|
|
|
|
|
|
|
|
|
// 为每个元数据添加状态
|
|
|
|
|
|
results := make([]*RecordMetadataWithStatus, len(metadata))
|
|
|
|
|
|
for i, meta := range metadata {
|
|
|
|
|
|
results[i] = &RecordMetadataWithStatus{
|
|
|
|
|
|
Metadata: meta,
|
|
|
|
|
|
Status: GetRecordStatus(meta.Index, startIdx, endIdx),
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return results
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-04 13:32:44 +08:00
|
|
|
|
// QueryOldest 从参考索引向索引递减方向查询记录(查询更早的记录)
|
|
|
|
|
|
// refIndex: 参考索引位置
|
|
|
|
|
|
// count: 查询数量
|
|
|
|
|
|
// 返回的记录包含索引和状态信息,按索引递增方向排序
|
|
|
|
|
|
// 例如:QueryOldest(5, 3) 查询索引 2, 3, 4(不包含 5)
|
|
|
|
|
|
func (tp *TopicProcessor) QueryOldest(refIndex, count int) ([]*RecordWithStatus, error) {
|
|
|
|
|
|
records, err := tp.query.QueryOldest(refIndex, count)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
return tp.addStatusToRecords(records), nil
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-04 13:26:21 +08:00
|
|
|
|
// QueryNewest 从参考索引向索引递增方向查询记录(查询更新的记录)
|
|
|
|
|
|
// refIndex: 参考索引位置
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
// count: 查询数量
|
2025-10-04 13:26:21 +08:00
|
|
|
|
// 返回的记录包含索引和状态信息,按索引递增方向排序
|
|
|
|
|
|
// 例如:QueryNewest(5, 3) 查询索引 6, 7, 8(不包含 5)
|
|
|
|
|
|
func (tp *TopicProcessor) QueryNewest(refIndex, count int) ([]*RecordWithStatus, error) {
|
|
|
|
|
|
records, err := tp.query.QueryNewest(refIndex, count)
|
2025-10-04 00:10:14 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
2025-10-04 13:32:44 +08:00
|
|
|
|
return tp.addStatusToRecords(records), nil
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-04 17:54:49 +08:00
|
|
|
|
// QueryOldestMetadata 从参考索引向索引递减方向查询记录元数据(查询更早的记录,不读取完整数据)
|
|
|
|
|
|
// refIndex: 参考索引位置
|
|
|
|
|
|
// count: 查询数量
|
|
|
|
|
|
// 返回的记录只包含元数据信息(索引、UUID、数据大小),按索引递增方向排序
|
|
|
|
|
|
// 例如:QueryOldestMetadata(5, 3) 查询索引 2, 3, 4(不包含 5)
|
|
|
|
|
|
func (tp *TopicProcessor) QueryOldestMetadata(refIndex, count int) ([]*RecordMetadataWithStatus, error) {
|
|
|
|
|
|
metadata, err := tp.query.QueryOldestMetadata(refIndex, count)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
return tp.addStatusToMetadata(metadata), nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// QueryNewestMetadata 从参考索引向索引递增方向查询记录元数据(查询更新的记录,不读取完整数据)
|
|
|
|
|
|
// refIndex: 参考索引位置
|
|
|
|
|
|
// count: 查询数量
|
|
|
|
|
|
// 返回的记录只包含元数据信息(索引、UUID、数据大小),按索引递增方向排序
|
|
|
|
|
|
// 例如:QueryNewestMetadata(5, 3) 查询索引 6, 7, 8(不包含 5)
|
|
|
|
|
|
func (tp *TopicProcessor) QueryNewestMetadata(refIndex, count int) ([]*RecordMetadataWithStatus, error) {
|
|
|
|
|
|
metadata, err := tp.query.QueryNewestMetadata(refIndex, count)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
return tp.addStatusToMetadata(metadata), nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// QueryByIndex 根据索引查询单条记录的完整数据
|
|
|
|
|
|
// index: 记录索引
|
|
|
|
|
|
// 返回完整的记录数据,包含状态信息
|
|
|
|
|
|
func (tp *TopicProcessor) QueryByIndex(index int) (*RecordWithStatus, error) {
|
|
|
|
|
|
record, err := tp.query.QueryByIndex(index)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 获取当前处理窗口位置
|
|
|
|
|
|
var startIdx, endIdx int
|
|
|
|
|
|
tp.mu.RLock()
|
|
|
|
|
|
if tp.tailer != nil {
|
|
|
|
|
|
startIdx = tp.tailer.GetStartIndex()
|
|
|
|
|
|
endIdx = tp.tailer.GetEndIndex()
|
|
|
|
|
|
}
|
|
|
|
|
|
tp.mu.RUnlock()
|
|
|
|
|
|
|
|
|
|
|
|
status := GetRecordStatus(index, startIdx, endIdx)
|
|
|
|
|
|
return &RecordWithStatus{
|
|
|
|
|
|
Record: record,
|
|
|
|
|
|
Index: index,
|
|
|
|
|
|
Status: status,
|
|
|
|
|
|
}, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
// GetRecordCount 获取记录总数(统一接口)
|
|
|
|
|
|
func (tp *TopicProcessor) GetRecordCount() int {
|
|
|
|
|
|
return tp.index.Count()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-04 13:26:21 +08:00
|
|
|
|
// QueryFromProcessing 从当前处理窗口的开始位置向索引递增方向查询记录
|
|
|
|
|
|
// count: 查询数量
|
|
|
|
|
|
// 返回从处理窗口开始位置(startIndex)向后的记录,包含状态信息
|
|
|
|
|
|
func (tp *TopicProcessor) QueryFromProcessing(count int) ([]*RecordWithStatus, error) {
|
|
|
|
|
|
// 获取当前处理窗口的开始位置
|
|
|
|
|
|
tp.mu.RLock()
|
|
|
|
|
|
var startIdx int
|
|
|
|
|
|
if tp.tailer != nil {
|
|
|
|
|
|
startIdx = tp.tailer.GetStartIndex()
|
|
|
|
|
|
}
|
|
|
|
|
|
tp.mu.RUnlock()
|
|
|
|
|
|
|
|
|
|
|
|
// 从 startIdx 开始向索引递增方向查询
|
|
|
|
|
|
// QueryNewest(refIndex, count) 查询从 refIndex+1 开始的记录
|
|
|
|
|
|
// 所以要从 startIdx 开始,应该调用 QueryNewest(startIdx - 1, count)
|
|
|
|
|
|
return tp.QueryNewest(startIdx-1, count)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// QueryFromFirst 从第一条记录向索引递增方向查询
|
|
|
|
|
|
// count: 查询数量
|
|
|
|
|
|
// 返回从第一条记录(索引 0)开始的记录,包含状态信息
|
|
|
|
|
|
// 例如:QueryFromFirst(3) 查询索引 0, 1, 2
|
|
|
|
|
|
func (tp *TopicProcessor) QueryFromFirst(count int) ([]*RecordWithStatus, error) {
|
|
|
|
|
|
// QueryNewest(-1, count) 会从索引 0 开始向后查询
|
|
|
|
|
|
return tp.QueryNewest(-1, count)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-04 13:26:21 +08:00
|
|
|
|
// QueryFromLast 从最后一条记录向索引递减方向查询
|
|
|
|
|
|
// count: 查询数量
|
|
|
|
|
|
// 返回从最后一条记录开始向前的记录,包含状态信息,按索引递增方向排序
|
|
|
|
|
|
// 例如:如果总共有 10 条记录,QueryFromLast(3) 查询索引 7, 8, 9
|
|
|
|
|
|
func (tp *TopicProcessor) QueryFromLast(count int) ([]*RecordWithStatus, error) {
|
|
|
|
|
|
// 获取记录总数
|
|
|
|
|
|
totalCount := tp.index.Count()
|
|
|
|
|
|
|
2025-10-04 17:54:49 +08:00
|
|
|
|
// 如果没有记录,返回空数组
|
|
|
|
|
|
if totalCount == 0 {
|
|
|
|
|
|
return []*RecordWithStatus{}, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-04 13:26:21 +08:00
|
|
|
|
// QueryOldest(totalCount, count) 会从最后一条记录开始向前查询
|
2025-10-04 17:54:49 +08:00
|
|
|
|
// totalCount 是记录总数,有效索引是 0 到 totalCount-1
|
|
|
|
|
|
// 所以传入 totalCount 作为 refIndex,会查询 totalCount-count 到 totalCount-1 的记录
|
2025-10-04 13:26:21 +08:00
|
|
|
|
return tp.QueryOldest(totalCount, count)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-04 17:54:49 +08:00
|
|
|
|
// QueryFromFirstMetadata 从第一条记录向索引递增方向查询元数据
|
|
|
|
|
|
// count: 查询数量
|
|
|
|
|
|
// 返回从第一条记录(索引 0)开始的记录元数据,包含状态信息
|
|
|
|
|
|
// 例如:QueryFromFirstMetadata(3) 查询索引 0, 1, 2 的元数据
|
|
|
|
|
|
func (tp *TopicProcessor) QueryFromFirstMetadata(count int) ([]*RecordMetadataWithStatus, error) {
|
|
|
|
|
|
// QueryNewestMetadata(-1, count) 会从索引 0 开始向后查询
|
|
|
|
|
|
return tp.QueryNewestMetadata(-1, count)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// QueryFromLastMetadata 从最后一条记录向索引递减方向查询元数据
|
|
|
|
|
|
// count: 查询数量
|
|
|
|
|
|
// 返回最后 N 条记录的元数据(按索引递增方向排序),包含状态信息
|
|
|
|
|
|
// 例如:QueryFromLastMetadata(3) 查询最后 3 条记录的元数据
|
|
|
|
|
|
func (tp *TopicProcessor) QueryFromLastMetadata(count int) ([]*RecordMetadataWithStatus, error) {
|
|
|
|
|
|
totalCount := tp.index.Count()
|
|
|
|
|
|
if totalCount == 0 {
|
|
|
|
|
|
return []*RecordMetadataWithStatus{}, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
// QueryOldestMetadata(totalCount, count) 会从 totalCount 向前查询 count 条
|
|
|
|
|
|
return tp.QueryOldestMetadata(totalCount, count)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
// GetProcessingIndex 获取当前处理索引(窗口开始索引)
|
|
|
|
|
|
func (tp *TopicProcessor) GetProcessingIndex() int {
|
|
|
|
|
|
tp.mu.RLock()
|
|
|
|
|
|
defer tp.mu.RUnlock()
|
|
|
|
|
|
|
|
|
|
|
|
if tp.tailer == nil {
|
|
|
|
|
|
return 0
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return tp.tailer.GetStartIndex()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// GetReadIndex 获取当前读取索引(窗口结束索引)
|
|
|
|
|
|
func (tp *TopicProcessor) GetReadIndex() int {
|
|
|
|
|
|
tp.mu.RLock()
|
|
|
|
|
|
defer tp.mu.RUnlock()
|
|
|
|
|
|
|
|
|
|
|
|
if tp.tailer == nil {
|
|
|
|
|
|
return 0
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return tp.tailer.GetEndIndex()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Subscribe 订阅事件
|
|
|
|
|
|
func (tp *TopicProcessor) Subscribe(eventType EventType, listener EventListener) {
|
|
|
|
|
|
tp.eventBus.Subscribe(eventType, listener)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// SubscribeAll 订阅所有事件
|
|
|
|
|
|
func (tp *TopicProcessor) SubscribeAll(listener EventListener) {
|
|
|
|
|
|
tp.eventBus.SubscribeAll(listener)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Unsubscribe 取消订阅
|
|
|
|
|
|
func (tp *TopicProcessor) Unsubscribe(eventType EventType) {
|
|
|
|
|
|
tp.eventBus.Unsubscribe(eventType)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-04 18:56:52 +08:00
|
|
|
|
// CanReset 检查是否可以执行重置操作
|
|
|
|
|
|
// 返回 true 表示可以重置,false 表示不能重置(正在重置中或有待处理的记录)
|
|
|
|
|
|
func (tp *TopicProcessor) CanReset() bool {
|
|
|
|
|
|
tp.mu.RLock()
|
|
|
|
|
|
defer tp.mu.RUnlock()
|
|
|
|
|
|
return tp.canResetLocked()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// canResetLocked 内部方法,检查是否可以重置(调用前必须已持有锁)
|
|
|
|
|
|
func (tp *TopicProcessor) canResetLocked() bool {
|
|
|
|
|
|
// 检查当前状态
|
|
|
|
|
|
if tp.status.State == StateResetting {
|
|
|
|
|
|
return false
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 检查是否有待处理的日志
|
|
|
|
|
|
processingIndex := 0
|
|
|
|
|
|
if tp.tailer != nil {
|
|
|
|
|
|
processingIndex = tp.tailer.GetStartIndex()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
recordCount := 0
|
|
|
|
|
|
if tp.index != nil {
|
|
|
|
|
|
recordCount = tp.index.Count()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 只有当所有记录都已处理完成时才能重置
|
|
|
|
|
|
return processingIndex >= recordCount
|
|
|
|
|
|
}
|
|
|
|
|
|
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
// Reset 清空 topic 的所有数据,包括日志文件、位置文件和统计文件
|
2025-10-04 18:56:52 +08:00
|
|
|
|
// 注意:Reset 不会停止 processor,只有在没有待处理的日志时才能执行
|
|
|
|
|
|
// 重置完成后,如果之前是 Running 状态,会自动恢复到 Running 状态
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
func (tp *TopicProcessor) Reset() error {
|
|
|
|
|
|
tp.mu.Lock()
|
|
|
|
|
|
defer tp.mu.Unlock()
|
|
|
|
|
|
|
2025-10-04 18:56:52 +08:00
|
|
|
|
// 检查是否可以执行重置操作
|
|
|
|
|
|
if !tp.canResetLocked() {
|
|
|
|
|
|
// 提供详细的错误信息
|
|
|
|
|
|
currentState := tp.status.State
|
|
|
|
|
|
if currentState == StateResetting {
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 获取待处理记录信息
|
2025-10-04 13:26:21 +08:00
|
|
|
|
processingIndex := 0
|
|
|
|
|
|
if tp.tailer != nil {
|
|
|
|
|
|
processingIndex = tp.tailer.GetStartIndex()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-04 18:56:52 +08:00
|
|
|
|
recordCount := 0
|
|
|
|
|
|
if tp.index != nil {
|
|
|
|
|
|
recordCount = tp.index.Count()
|
2025-10-04 13:26:21 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-04 18:56:52 +08:00
|
|
|
|
return fmt.Errorf("cannot reset with pending records (%d/%d processed), wait for processing to complete", processingIndex, recordCount)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 记录之前是否在运行
|
|
|
|
|
|
currentState := tp.status.State
|
|
|
|
|
|
wasRunning := currentState == StateRunning
|
|
|
|
|
|
|
|
|
|
|
|
// 进入 Resetting 状态
|
|
|
|
|
|
tp.setState(StateResetting, nil)
|
|
|
|
|
|
tp.logger.Debug("entering resetting state", "wasRunning", wasRunning, "currentState", currentState)
|
|
|
|
|
|
|
|
|
|
|
|
// 如果之前在运行,停止 goroutines
|
|
|
|
|
|
if wasRunning && tp.cancel != nil {
|
2025-10-04 13:26:21 +08:00
|
|
|
|
tp.cancel()
|
|
|
|
|
|
|
2025-10-04 18:56:52 +08:00
|
|
|
|
// 释放锁以等待 goroutines 停止
|
2025-10-04 13:26:21 +08:00
|
|
|
|
tp.mu.Unlock()
|
|
|
|
|
|
tp.wg.Wait()
|
|
|
|
|
|
tp.mu.Lock()
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
var errs []error
|
|
|
|
|
|
|
2025-10-04 21:58:54 +08:00
|
|
|
|
// 重置所有组件(保持引用关系不变)
|
|
|
|
|
|
// 1. 重置索引(会删除索引文件并重新创建)
|
|
|
|
|
|
if tp.index != nil {
|
|
|
|
|
|
if err := tp.index.Reset(); err != nil {
|
|
|
|
|
|
tp.logger.Error("failed to reset index", "error", err)
|
|
|
|
|
|
errs = append(errs, fmt.Errorf("reset index: %w", err))
|
2025-10-04 18:56:52 +08:00
|
|
|
|
}
|
|
|
|
|
|
}
|
2025-10-04 21:58:54 +08:00
|
|
|
|
|
|
|
|
|
|
// 2. 重置写入器(会删除日志文件并重新创建)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
if tp.writer != nil {
|
2025-10-04 21:58:54 +08:00
|
|
|
|
if err := tp.writer.Reset(); err != nil {
|
|
|
|
|
|
tp.logger.Error("failed to reset writer", "error", err)
|
|
|
|
|
|
errs = append(errs, fmt.Errorf("reset writer: %w", err))
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
}
|
2025-10-04 18:56:52 +08:00
|
|
|
|
}
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
|
2025-10-04 21:58:54 +08:00
|
|
|
|
// 3. 重置游标(会删除位置文件并重置位置)
|
|
|
|
|
|
if tp.cursor != nil {
|
|
|
|
|
|
if err := tp.cursor.Reset(); err != nil {
|
|
|
|
|
|
tp.logger.Error("failed to reset cursor", "error", err)
|
|
|
|
|
|
errs = append(errs, fmt.Errorf("reset cursor: %w", err))
|
|
|
|
|
|
}
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-04 21:58:54 +08:00
|
|
|
|
// 4. 重置查询器(重新打开日志文件)
|
|
|
|
|
|
if tp.query != nil {
|
|
|
|
|
|
if err := tp.query.Reset(); err != nil {
|
|
|
|
|
|
tp.logger.Error("failed to reset query", "error", err)
|
|
|
|
|
|
errs = append(errs, fmt.Errorf("reset query: %w", err))
|
|
|
|
|
|
}
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-04 21:58:54 +08:00
|
|
|
|
// 5. 重置 tailer(重置内部状态)
|
|
|
|
|
|
if tp.tailer != nil {
|
|
|
|
|
|
if err := tp.tailer.Reset(); err != nil {
|
|
|
|
|
|
tp.logger.Error("failed to reset tailer", "error", err)
|
|
|
|
|
|
errs = append(errs, fmt.Errorf("reset tailer: %w", err))
|
2025-10-04 18:56:52 +08:00
|
|
|
|
}
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-04 21:58:54 +08:00
|
|
|
|
// 6. 重置统计信息(会删除统计文件并重置所有计数器)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
if tp.stats != nil {
|
2025-10-04 18:56:52 +08:00
|
|
|
|
if err := tp.stats.Reset(); err != nil {
|
|
|
|
|
|
tp.logger.Error("failed to reset stats", "error", err)
|
|
|
|
|
|
errs = append(errs, fmt.Errorf("reset stats: %w", err))
|
|
|
|
|
|
}
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-04 21:58:54 +08:00
|
|
|
|
// 如果有错误,设置为错误状态
|
|
|
|
|
|
if len(errs) > 0 {
|
|
|
|
|
|
tp.setState(StateError, errs[0])
|
|
|
|
|
|
return errs[0]
|
|
|
|
|
|
}
|
|
|
|
|
|
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
// 发布重置事件
|
|
|
|
|
|
tp.eventBus.Publish(&Event{
|
|
|
|
|
|
Type: EventProcessorReset,
|
|
|
|
|
|
Topic: tp.topic,
|
|
|
|
|
|
Timestamp: time.Now(),
|
|
|
|
|
|
})
|
|
|
|
|
|
|
2025-10-04 18:56:52 +08:00
|
|
|
|
// 如果之前在运行,恢复到 Running 状态
|
|
|
|
|
|
if wasRunning {
|
|
|
|
|
|
// 创建新的 context
|
|
|
|
|
|
tp.ctx, tp.cancel = context.WithCancel(context.Background())
|
|
|
|
|
|
|
|
|
|
|
|
// 启动 tailer goroutine(如果有 handler)
|
|
|
|
|
|
if tp.handler != nil && tp.tailer != nil {
|
|
|
|
|
|
tp.wg.Add(1)
|
|
|
|
|
|
go func() {
|
|
|
|
|
|
defer tp.wg.Done()
|
|
|
|
|
|
if err := tp.tailer.Start(tp.ctx); err != nil && err != context.Canceled {
|
|
|
|
|
|
tp.logger.Error("tailer error after reset", "error", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
}()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 启动统计保存 goroutine
|
|
|
|
|
|
tp.wg.Add(1)
|
|
|
|
|
|
go func() {
|
|
|
|
|
|
defer tp.wg.Done()
|
|
|
|
|
|
ticker := time.NewTicker(tp.tailConfig.SaveInterval)
|
|
|
|
|
|
defer ticker.Stop()
|
|
|
|
|
|
|
|
|
|
|
|
for {
|
|
|
|
|
|
select {
|
|
|
|
|
|
case <-tp.ctx.Done():
|
|
|
|
|
|
return
|
|
|
|
|
|
case <-ticker.C:
|
|
|
|
|
|
if tp.stats != nil {
|
|
|
|
|
|
if err := tp.stats.Save(); err != nil {
|
|
|
|
|
|
tp.logger.Error("failed to save stats", "error", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
|
|
tp.setState(StateRunning, nil)
|
|
|
|
|
|
tp.logger.Debug("processor reset completed, resumed to running state")
|
|
|
|
|
|
} else {
|
|
|
|
|
|
// 恢复到之前的状态(通常是 Idle 或 Stopped)
|
|
|
|
|
|
if currentState == StateStopped {
|
|
|
|
|
|
tp.setState(StateStopped, nil)
|
|
|
|
|
|
} else {
|
|
|
|
|
|
tp.setState(StateIdle, nil)
|
|
|
|
|
|
}
|
|
|
|
|
|
tp.logger.Debug("processor reset completed", "newState", tp.status.State)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 如果有错误,返回第一个
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
if len(errs) > 0 {
|
|
|
|
|
|
return errs[0]
|
|
|
|
|
|
}
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Close 清理 processor 的所有资源
|
|
|
|
|
|
func (tp *TopicProcessor) Close() error {
|
|
|
|
|
|
tp.mu.Lock()
|
|
|
|
|
|
defer tp.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
|
|
tp.logger.Debug("closing processor")
|
|
|
|
|
|
|
|
|
|
|
|
var errs []error
|
|
|
|
|
|
|
|
|
|
|
|
// 保存统计信息
|
|
|
|
|
|
if tp.stats != nil {
|
|
|
|
|
|
if err := tp.stats.Save(); err != nil {
|
|
|
|
|
|
tp.logger.Error("failed to save stats", "error", err)
|
|
|
|
|
|
errs = append(errs, fmt.Errorf("save stats: %w", err))
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 关闭 query
|
|
|
|
|
|
if tp.query != nil {
|
|
|
|
|
|
if err := tp.query.Close(); err != nil {
|
|
|
|
|
|
tp.logger.Error("failed to close query", "error", err)
|
|
|
|
|
|
errs = append(errs, fmt.Errorf("close query: %w", err))
|
|
|
|
|
|
}
|
|
|
|
|
|
tp.query = nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 关闭 cursor(如果 tailer 未启动,cursor 可能还未关闭)
|
|
|
|
|
|
if tp.cursor != nil {
|
|
|
|
|
|
if err := tp.cursor.Close(); err != nil {
|
|
|
|
|
|
tp.logger.Error("failed to close cursor", "error", err)
|
|
|
|
|
|
errs = append(errs, fmt.Errorf("close cursor: %w", err))
|
|
|
|
|
|
}
|
|
|
|
|
|
tp.cursor = nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 关闭 writer
|
|
|
|
|
|
if tp.writer != nil {
|
|
|
|
|
|
if err := tp.writer.Close(); err != nil {
|
|
|
|
|
|
tp.logger.Error("failed to close writer", "error", err)
|
|
|
|
|
|
errs = append(errs, fmt.Errorf("close writer: %w", err))
|
|
|
|
|
|
}
|
|
|
|
|
|
tp.writer = nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 关闭 index(最后关闭,因为其他组件可能依赖它)
|
|
|
|
|
|
if tp.index != nil {
|
|
|
|
|
|
if err := tp.index.Close(); err != nil {
|
|
|
|
|
|
tp.logger.Error("failed to close index", "error", err)
|
|
|
|
|
|
errs = append(errs, fmt.Errorf("close index: %w", err))
|
|
|
|
|
|
}
|
|
|
|
|
|
tp.index = nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// tailer 会通过 context cancel 和 Stop() 自动关闭
|
|
|
|
|
|
tp.tailer = nil
|
|
|
|
|
|
|
|
|
|
|
|
tp.logger.Debug("processor closed")
|
|
|
|
|
|
|
|
|
|
|
|
// 如果有多个错误,返回第一个
|
|
|
|
|
|
if len(errs) > 0 {
|
|
|
|
|
|
return errs[0]
|
|
|
|
|
|
}
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|