|
|
6fb0731935
|
重构:为核心组件实现 Reset 方法优化重置机制
为所有核心组件添加 Reset() 方法:
- LogWriter.Reset(): 删除并重新创建日志文件,保持 index 和 wbuf 引用不变
- RecordIndex.Reset(): 清空索引数据并重新创建索引文件
- RecordQuery.Reset(): 关闭并重新打开日志文件
- ProcessCursor.Reset(): 删除位置文件并重置游标位置
- LogTailer.Reset(): 重置内部 channel 状态
优化 TopicProcessor.Reset() 实现:
- 不再销毁和重建组件对象
- 通过调用各组件的 Reset() 方法重置状态
- 保持组件间引用关系稳定
- 减少代码行数约 20 行
- 避免空指针风险和内存分配开销
代码改进:
- LogWriter 添加 path 字段用于重置
- 移除 topic_processor.go 中未使用的 os import
- 职责分离更清晰,每个组件管理自己的重置逻辑
测试结果:
- TestTopicReset: PASS
- TestTopicResetWithPendingRecords: PASS
- 所有 TopicProcessor 相关测试通过
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
|
2025-10-04 21:58:54 +08:00 |
|
|
|
bcc328b129
|
重构:TopicProcessor 状态管理系统与 Reset 方法优化
新增功能:
- 添加 ProcessorState 状态类型(Idle/Starting/Running/Stopping/Stopped/Resetting/Error)
- 添加 ProcessorStatus 结构体和状态管理方法(GetState/GetStatus/setState)
- 实现状态转换逻辑和访问控制(CanWrite/CanQuery)
- 新增 CanReset() 方法检查是否可执行重置操作
Reset 方法优化:
- 重写 Reset() 方法,不再停止 processor
- 只有在无待处理记录时才能执行重置
- 进入 Resetting 状态期间阻止所有读写操作
- 重置后自动恢复到之前的运行状态
- 正确关闭并重置 cursor 和 stats 组件
- 调整执行顺序:先关闭组件,再删除文件,后重新初始化
错误处理增强:
- 添加 ErrProcessorResetting 和 ErrInvalidState 错误类型
- 添加 EventStateChanged 事件类型
- 修复 writer/index 为 nil 时的空指针问题
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
|
2025-10-04 18:56:52 +08:00 |
|
|
|
810664eb12
|
重构:优化记录格式并修复核心功能
- 修改记录存储格式为 [4B len][8B offset][4B CRC][16B UUID][data]
- 修复 TopicProcessor 中 WaitGroup 使用错误导致 handler 不执行的问题
- 修复写入保护逻辑,避免 dirtyOffset=-1 时误判为写入中
- 添加统计信息定期持久化功能
- 改进 UTF-8 字符截断处理,防止 CJK 字符乱码
- 优化 Web UI:显示人类可读的文件大小,支持点击外部关闭弹窗
- 重构示例代码,添加 webui 和 webui_integration 示例
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
|
2025-10-04 17:54:49 +08:00 |
|
|
|
90cc9e21c9
|
重构:重命名核心组件并增强查询功能
主要更改:
1. 核心重命名
- Seqlog -> LogHub (更准确地反映其作为日志中枢的角色)
- NewSeqlog() -> NewLogHub()
- LogCursor -> ProcessCursor (更准确地反映其用于处理场景)
- seqlog_manager.go -> loghub.go (文件名与结构体名对应)
2. TopicProcessor.Reset 增强
- 如果正在运行且没有待处理的日志,会自动停止后重置
- 如果有待处理的日志,返回详细错误(显示已处理/总记录数)
- 简化了 LogHub.ResetTopic,移除显式 Stop 调用
3. 新增查询方法
- TopicProcessor.QueryFromFirst(count) - 从第一条记录向索引递增方向查询
- TopicProcessor.QueryFromLast(count) - 从最后一条记录向索引递减方向查询
- LogHub.QueryFromFirst(topic, count)
- LogHub.QueryFromLast(topic, count)
4. 测试覆盖
- 添加 query_test.go - QueryFromProcessing 测试
- 添加 TestQueryFromFirstAndLast - TopicProcessor 查询测试
- 添加 TestLogHubQueryFromFirstAndLast - LogHub 查询测试
- 添加 TestTopicResetWithPendingRecords - Reset 增强功能测试
5. 示例代码
- 添加 example/get_record/ - 演示 QueryFromProcessing 用法
- 更新所有示例以使用 LogHub 和新 API
所有测试通过 ✅
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
|
2025-10-04 13:26:21 +08:00 |
|
|
|
6862de12ff
|
新增:统一的错误类型系统 (errors.go)
主要功能:
- 定义哨兵错误(Sentinel Errors):ErrNilParameter, ErrInvalidCount,
ErrInvalidRange, ErrAlreadyRunning, ErrNotFound, ErrCRCMismatch 等
- 实现结构化错误类型:TopicError, FileError, IndexError, ValidationError
- 提供错误检查辅助函数:IsTopicNotFound, IsIndexOutOfRange, IsCRCMismatch
- 支持 errors.Is 和 errors.As 进行错误判断
更新相关文件使用新错误类型:
- cursor.go: 使用 ValidationError 和 ErrCRCMismatch
- index.go: 使用 IndexError 处理索引越界
- query.go: 使用 ValidationError 验证参数
- seqlog_manager.go: 使用 TopicError 和 ErrAlreadyRegistered
- topic_processor.go: 使用 ErrAlreadyRunning 和 ErrInvalidConfig
测试覆盖:
- errors_test.go 提供完整的错误类型测试
- 所有现有测试继续通过
使用示例:
```go
// 检查 topic 是否存在
if IsTopicNotFound(err) {
// 处理 topic 不存在的情况
}
// 检查索引越界
if IsIndexOutOfRange(err) {
var indexErr *IndexError
errors.As(err, &indexErr)
fmt.Printf("index %d out of range\n", indexErr.Index)
}
```
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
|
2025-10-04 01:56:22 +08:00 |
|
|
|
de39339620
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
|
2025-10-03 23:50:53 +08:00 |
|