重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
package seqlog
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"encoding/binary"
|
|
|
|
|
"fmt"
|
|
|
|
|
"io"
|
|
|
|
|
"os"
|
2025-10-04 00:10:14 +08:00
|
|
|
"sync"
|
|
|
|
|
"time"
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
)
|
|
|
|
|
|
|
|
|
|
const (
|
|
|
|
|
// IndexMagic 索引文件魔数
|
|
|
|
|
IndexMagic = 0x58444953 // "SIDX" (Seqlog Index)
|
|
|
|
|
|
|
|
|
|
// IndexVersion 索引文件版本
|
|
|
|
|
IndexVersion = 1
|
|
|
|
|
|
|
|
|
|
// IndexHeaderSize 索引文件头部大小(字节)
|
|
|
|
|
IndexHeaderSize = 8 // Magic(4) + Version(4)
|
|
|
|
|
|
|
|
|
|
// IndexEntrySize 每条索引条目大小(字节)
|
|
|
|
|
IndexEntrySize = 8 // Offset(8)
|
2025-10-04 00:10:14 +08:00
|
|
|
|
|
|
|
|
// DefaultSyncInterval 默认同步间隔
|
|
|
|
|
DefaultSyncInterval = 1 * time.Second
|
|
|
|
|
|
|
|
|
|
// DefaultSyncBatch 默认同步批次大小(累积多少条记录后同步)
|
|
|
|
|
DefaultSyncBatch = 100
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// RecordIndex 记录索引管理器
|
|
|
|
|
type RecordIndex struct {
|
2025-10-04 00:10:14 +08:00
|
|
|
logPath string // 日志文件路径
|
|
|
|
|
indexPath string // 索引文件路径
|
|
|
|
|
offsets []int64 // 内存中的偏移索引
|
|
|
|
|
magic uint32 // 魔数,用于识别索引文件
|
|
|
|
|
version uint32 // 版本号
|
|
|
|
|
indexFile *os.File // 索引文件句柄(用于追加写入)
|
|
|
|
|
syncInterval time.Duration // 同步时间间隔
|
|
|
|
|
syncBatch int // 同步批次大小
|
|
|
|
|
lastSync time.Time // 上次同步时间
|
|
|
|
|
dirtyCount int // 未同步的记录数
|
2025-10-04 13:32:44 +08:00
|
|
|
entryBuf [IndexEntrySize]byte // 可重用的写入缓冲区
|
2025-10-04 00:10:14 +08:00
|
|
|
mu sync.Mutex // 保护并发访问
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// NewRecordIndex 创建或加载记录索引
|
|
|
|
|
// 启动时总是从日志文件重建索引,确保索引和日志文件完全一致
|
|
|
|
|
func NewRecordIndex(logPath string) (*RecordIndex, error) {
|
|
|
|
|
indexPath := logPath + ".idx"
|
|
|
|
|
|
|
|
|
|
ri := &RecordIndex{
|
2025-10-04 00:10:14 +08:00
|
|
|
logPath: logPath,
|
|
|
|
|
indexPath: indexPath,
|
|
|
|
|
offsets: make([]int64, 0, 1024),
|
|
|
|
|
magic: IndexMagic,
|
|
|
|
|
version: IndexVersion,
|
|
|
|
|
syncInterval: DefaultSyncInterval,
|
|
|
|
|
syncBatch: DefaultSyncBatch,
|
|
|
|
|
lastSync: time.Now(),
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 启动时总是从日志文件重建索引
|
|
|
|
|
if err := ri.rebuild(); err != nil {
|
|
|
|
|
return nil, fmt.Errorf("rebuild index: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 打开索引文件用于追加写入
|
|
|
|
|
f, err := os.OpenFile(indexPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("open index file for append: %w", err)
|
|
|
|
|
}
|
|
|
|
|
ri.indexFile = f
|
|
|
|
|
|
|
|
|
|
return ri, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// rebuild 从日志文件重建索引
|
|
|
|
|
func (ri *RecordIndex) rebuild() error {
|
|
|
|
|
logFile, err := os.Open(ri.logPath)
|
|
|
|
|
if err != nil {
|
|
|
|
|
if os.IsNotExist(err) {
|
|
|
|
|
// 日志文件不存在,创建空索引
|
|
|
|
|
ri.offsets = make([]int64, 0, 1024)
|
|
|
|
|
return ri.save()
|
|
|
|
|
}
|
|
|
|
|
return fmt.Errorf("open log file: %w", err)
|
|
|
|
|
}
|
|
|
|
|
defer logFile.Close()
|
|
|
|
|
|
|
|
|
|
ri.offsets = make([]int64, 0, 1024)
|
|
|
|
|
currentOffset := int64(0)
|
|
|
|
|
headerBuf := make([]byte, 24) // Record header size: [4B len][4B CRC][16B UUID]
|
|
|
|
|
|
|
|
|
|
for {
|
|
|
|
|
// 记录当前偏移
|
|
|
|
|
ri.offsets = append(ri.offsets, currentOffset)
|
|
|
|
|
|
|
|
|
|
// 读取记录头部
|
|
|
|
|
if _, err := io.ReadFull(logFile, headerBuf); err != nil {
|
|
|
|
|
if err == io.EOF {
|
|
|
|
|
// 到达文件末尾,移除最后一个 EOF 位置
|
|
|
|
|
ri.offsets = ri.offsets[:len(ri.offsets)-1]
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
return fmt.Errorf("read record header at offset %d: %w", currentOffset, err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 解析数据长度
|
|
|
|
|
dataLen := binary.LittleEndian.Uint32(headerBuf[0:4])
|
|
|
|
|
|
|
|
|
|
// 跳过数据部分
|
|
|
|
|
if _, err := logFile.Seek(int64(dataLen), io.SeekCurrent); err != nil {
|
|
|
|
|
return fmt.Errorf("seek data at offset %d: %w", currentOffset, err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
currentOffset += 24 + int64(dataLen)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 写入索引文件
|
|
|
|
|
return ri.save()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// save 保存索引到文件
|
|
|
|
|
func (ri *RecordIndex) save() error {
|
|
|
|
|
f, err := os.Create(ri.indexPath)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("create index file: %w", err)
|
|
|
|
|
}
|
|
|
|
|
defer f.Close()
|
|
|
|
|
|
|
|
|
|
// 写入头部
|
|
|
|
|
headerBuf := make([]byte, IndexHeaderSize)
|
2025-10-03 23:55:18 +08:00
|
|
|
binary.LittleEndian.PutUint32(headerBuf[0:4], ri.magic)
|
|
|
|
|
binary.LittleEndian.PutUint32(headerBuf[4:8], ri.version)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
|
|
|
|
if _, err := f.Write(headerBuf); err != nil {
|
|
|
|
|
return fmt.Errorf("write header: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 写入所有索引条目
|
|
|
|
|
entryBuf := make([]byte, IndexEntrySize)
|
|
|
|
|
for _, offset := range ri.offsets {
|
|
|
|
|
binary.LittleEndian.PutUint64(entryBuf, uint64(offset))
|
|
|
|
|
if _, err := f.Write(entryBuf); err != nil {
|
|
|
|
|
return fmt.Errorf("write entry: %w", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return f.Sync()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Append 追加一条索引(当写入新记录时调用)
|
|
|
|
|
func (ri *RecordIndex) Append(offset int64) error {
|
2025-10-04 00:10:14 +08:00
|
|
|
ri.mu.Lock()
|
|
|
|
|
defer ri.mu.Unlock()
|
|
|
|
|
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
// 追加到索引文件(先写文件,后更新内存)
|
2025-10-04 13:32:44 +08:00
|
|
|
// 使用可重用的 buffer 减少内存分配
|
|
|
|
|
binary.LittleEndian.PutUint64(ri.entryBuf[:], uint64(offset))
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
2025-10-04 13:32:44 +08:00
|
|
|
if _, err := ri.indexFile.Write(ri.entryBuf[:]); err != nil {
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
return fmt.Errorf("append index entry: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 更新内存索引
|
|
|
|
|
ri.offsets = append(ri.offsets, offset)
|
2025-10-04 00:10:14 +08:00
|
|
|
ri.dirtyCount++
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
2025-10-04 00:10:14 +08:00
|
|
|
// 批量同步:达到批次大小或时间间隔后才同步
|
|
|
|
|
if ri.dirtyCount >= ri.syncBatch || time.Since(ri.lastSync) >= ri.syncInterval {
|
|
|
|
|
if err := ri.indexFile.Sync(); err != nil {
|
|
|
|
|
return fmt.Errorf("sync index file: %w", err)
|
|
|
|
|
}
|
|
|
|
|
ri.lastSync = time.Now()
|
|
|
|
|
ri.dirtyCount = 0
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// GetOffset 根据索引位置获取记录偏移
|
|
|
|
|
func (ri *RecordIndex) GetOffset(index int) (int64, error) {
|
|
|
|
|
if index < 0 || index >= len(ri.offsets) {
|
新增:统一的错误类型系统 (errors.go)
主要功能:
- 定义哨兵错误(Sentinel Errors):ErrNilParameter, ErrInvalidCount,
ErrInvalidRange, ErrAlreadyRunning, ErrNotFound, ErrCRCMismatch 等
- 实现结构化错误类型:TopicError, FileError, IndexError, ValidationError
- 提供错误检查辅助函数:IsTopicNotFound, IsIndexOutOfRange, IsCRCMismatch
- 支持 errors.Is 和 errors.As 进行错误判断
更新相关文件使用新错误类型:
- cursor.go: 使用 ValidationError 和 ErrCRCMismatch
- index.go: 使用 IndexError 处理索引越界
- query.go: 使用 ValidationError 验证参数
- seqlog_manager.go: 使用 TopicError 和 ErrAlreadyRegistered
- topic_processor.go: 使用 ErrAlreadyRunning 和 ErrInvalidConfig
测试覆盖:
- errors_test.go 提供完整的错误类型测试
- 所有现有测试继续通过
使用示例:
```go
// 检查 topic 是否存在
if IsTopicNotFound(err) {
// 处理 topic 不存在的情况
}
// 检查索引越界
if IsIndexOutOfRange(err) {
var indexErr *IndexError
errors.As(err, &indexErr)
fmt.Printf("index %d out of range\n", indexErr.Index)
}
```
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-04 01:56:22 +08:00
|
|
|
return 0, NewIndexError(index, len(ri.offsets))
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
}
|
|
|
|
|
return ri.offsets[index], nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// FindIndex 根据偏移量查找索引位置(二分查找)
|
|
|
|
|
func (ri *RecordIndex) FindIndex(offset int64) int {
|
|
|
|
|
left, right := 0, len(ri.offsets)-1
|
|
|
|
|
result := -1
|
|
|
|
|
|
|
|
|
|
for left <= right {
|
|
|
|
|
mid := (left + right) / 2
|
|
|
|
|
if ri.offsets[mid] == offset {
|
|
|
|
|
return mid
|
|
|
|
|
} else if ri.offsets[mid] < offset {
|
|
|
|
|
result = mid
|
|
|
|
|
left = mid + 1
|
|
|
|
|
} else {
|
|
|
|
|
right = mid - 1
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return result
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Count 返回记录总数
|
|
|
|
|
func (ri *RecordIndex) Count() int {
|
|
|
|
|
return len(ri.offsets)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// LastOffset 返回最后一条记录的偏移
|
|
|
|
|
func (ri *RecordIndex) LastOffset() int64 {
|
|
|
|
|
if len(ri.offsets) == 0 {
|
|
|
|
|
return 0
|
|
|
|
|
}
|
|
|
|
|
return ri.offsets[len(ri.offsets)-1]
|
|
|
|
|
}
|
|
|
|
|
|
2025-10-04 00:10:14 +08:00
|
|
|
// Flush 强制同步未写入的数据到磁盘
|
|
|
|
|
func (ri *RecordIndex) Flush() error {
|
|
|
|
|
ri.mu.Lock()
|
|
|
|
|
defer ri.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
if ri.indexFile != nil && ri.dirtyCount > 0 {
|
|
|
|
|
if err := ri.indexFile.Sync(); err != nil {
|
|
|
|
|
return fmt.Errorf("flush index file: %w", err)
|
|
|
|
|
}
|
|
|
|
|
ri.lastSync = time.Now()
|
|
|
|
|
ri.dirtyCount = 0
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
// Close 关闭索引文件
|
|
|
|
|
func (ri *RecordIndex) Close() error {
|
2025-10-04 00:10:14 +08:00
|
|
|
// 关闭前确保所有数据已同步
|
|
|
|
|
if err := ri.Flush(); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
if ri.indexFile != nil {
|
|
|
|
|
return ri.indexFile.Close()
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2025-10-04 21:58:54 +08:00
|
|
|
// Reset 重置索引,清空所有数据并重新创建索引文件
|
|
|
|
|
func (ri *RecordIndex) Reset() error {
|
|
|
|
|
ri.mu.Lock()
|
|
|
|
|
defer ri.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
// 关闭索引文件
|
|
|
|
|
if ri.indexFile != nil {
|
|
|
|
|
if err := ri.indexFile.Close(); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
ri.indexFile = nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 删除索引文件
|
|
|
|
|
if err := os.Remove(ri.indexPath); err != nil && !os.IsNotExist(err) {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 清空内存中的索引数据
|
|
|
|
|
ri.offsets = make([]int64, 0, 1024)
|
|
|
|
|
ri.dirtyCount = 0
|
|
|
|
|
ri.lastSync = time.Now()
|
|
|
|
|
|
|
|
|
|
// 保存空索引(创建文件并写入头部)
|
|
|
|
|
if err := ri.save(); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 重新打开索引文件用于追加
|
|
|
|
|
f, err := os.OpenFile(ri.indexPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("reopen index file: %w", err)
|
|
|
|
|
}
|
|
|
|
|
ri.indexFile = f
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2025-10-04 00:10:14 +08:00
|
|
|
// Sync 同步索引文件到磁盘(立即同步,不考虑批量策略)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
func (ri *RecordIndex) Sync() error {
|
2025-10-04 00:10:14 +08:00
|
|
|
return ri.Flush()
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
}
|