Files
seqlog/index.go
bourdon 6fb0731935 重构:为核心组件实现 Reset 方法优化重置机制
为所有核心组件添加 Reset() 方法:
- LogWriter.Reset(): 删除并重新创建日志文件,保持 index 和 wbuf 引用不变
- RecordIndex.Reset(): 清空索引数据并重新创建索引文件
- RecordQuery.Reset(): 关闭并重新打开日志文件
- ProcessCursor.Reset(): 删除位置文件并重置游标位置
- LogTailer.Reset(): 重置内部 channel 状态

优化 TopicProcessor.Reset() 实现:
- 不再销毁和重建组件对象
- 通过调用各组件的 Reset() 方法重置状态
- 保持组件间引用关系稳定
- 减少代码行数约 20 行
- 避免空指针风险和内存分配开销

代码改进:
- LogWriter 添加 path 字段用于重置
- 移除 topic_processor.go 中未使用的 os import
- 职责分离更清晰,每个组件管理自己的重置逻辑

测试结果:
- TestTopicReset: PASS
- TestTopicResetWithPendingRecords: PASS
- 所有 TopicProcessor 相关测试通过

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-04 21:58:54 +08:00

294 lines
7.3 KiB
Go

package seqlog
import (
"encoding/binary"
"fmt"
"io"
"os"
"sync"
"time"
)
const (
// IndexMagic 索引文件魔数
IndexMagic = 0x58444953 // "SIDX" (Seqlog Index)
// IndexVersion 索引文件版本
IndexVersion = 1
// IndexHeaderSize 索引文件头部大小(字节)
IndexHeaderSize = 8 // Magic(4) + Version(4)
// IndexEntrySize 每条索引条目大小(字节)
IndexEntrySize = 8 // Offset(8)
// DefaultSyncInterval 默认同步间隔
DefaultSyncInterval = 1 * time.Second
// DefaultSyncBatch 默认同步批次大小(累积多少条记录后同步)
DefaultSyncBatch = 100
)
// RecordIndex 记录索引管理器
type RecordIndex struct {
logPath string // 日志文件路径
indexPath string // 索引文件路径
offsets []int64 // 内存中的偏移索引
magic uint32 // 魔数,用于识别索引文件
version uint32 // 版本号
indexFile *os.File // 索引文件句柄(用于追加写入)
syncInterval time.Duration // 同步时间间隔
syncBatch int // 同步批次大小
lastSync time.Time // 上次同步时间
dirtyCount int // 未同步的记录数
entryBuf [IndexEntrySize]byte // 可重用的写入缓冲区
mu sync.Mutex // 保护并发访问
}
// NewRecordIndex 创建或加载记录索引
// 启动时总是从日志文件重建索引,确保索引和日志文件完全一致
func NewRecordIndex(logPath string) (*RecordIndex, error) {
indexPath := logPath + ".idx"
ri := &RecordIndex{
logPath: logPath,
indexPath: indexPath,
offsets: make([]int64, 0, 1024),
magic: IndexMagic,
version: IndexVersion,
syncInterval: DefaultSyncInterval,
syncBatch: DefaultSyncBatch,
lastSync: time.Now(),
}
// 启动时总是从日志文件重建索引
if err := ri.rebuild(); err != nil {
return nil, fmt.Errorf("rebuild index: %w", err)
}
// 打开索引文件用于追加写入
f, err := os.OpenFile(indexPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
return nil, fmt.Errorf("open index file for append: %w", err)
}
ri.indexFile = f
return ri, nil
}
// rebuild 从日志文件重建索引
func (ri *RecordIndex) rebuild() error {
logFile, err := os.Open(ri.logPath)
if err != nil {
if os.IsNotExist(err) {
// 日志文件不存在,创建空索引
ri.offsets = make([]int64, 0, 1024)
return ri.save()
}
return fmt.Errorf("open log file: %w", err)
}
defer logFile.Close()
ri.offsets = make([]int64, 0, 1024)
currentOffset := int64(0)
headerBuf := make([]byte, 24) // Record header size: [4B len][4B CRC][16B UUID]
for {
// 记录当前偏移
ri.offsets = append(ri.offsets, currentOffset)
// 读取记录头部
if _, err := io.ReadFull(logFile, headerBuf); err != nil {
if err == io.EOF {
// 到达文件末尾,移除最后一个 EOF 位置
ri.offsets = ri.offsets[:len(ri.offsets)-1]
break
}
return fmt.Errorf("read record header at offset %d: %w", currentOffset, err)
}
// 解析数据长度
dataLen := binary.LittleEndian.Uint32(headerBuf[0:4])
// 跳过数据部分
if _, err := logFile.Seek(int64(dataLen), io.SeekCurrent); err != nil {
return fmt.Errorf("seek data at offset %d: %w", currentOffset, err)
}
currentOffset += 24 + int64(dataLen)
}
// 写入索引文件
return ri.save()
}
// save 保存索引到文件
func (ri *RecordIndex) save() error {
f, err := os.Create(ri.indexPath)
if err != nil {
return fmt.Errorf("create index file: %w", err)
}
defer f.Close()
// 写入头部
headerBuf := make([]byte, IndexHeaderSize)
binary.LittleEndian.PutUint32(headerBuf[0:4], ri.magic)
binary.LittleEndian.PutUint32(headerBuf[4:8], ri.version)
if _, err := f.Write(headerBuf); err != nil {
return fmt.Errorf("write header: %w", err)
}
// 写入所有索引条目
entryBuf := make([]byte, IndexEntrySize)
for _, offset := range ri.offsets {
binary.LittleEndian.PutUint64(entryBuf, uint64(offset))
if _, err := f.Write(entryBuf); err != nil {
return fmt.Errorf("write entry: %w", err)
}
}
return f.Sync()
}
// Append 追加一条索引(当写入新记录时调用)
func (ri *RecordIndex) Append(offset int64) error {
ri.mu.Lock()
defer ri.mu.Unlock()
// 追加到索引文件(先写文件,后更新内存)
// 使用可重用的 buffer 减少内存分配
binary.LittleEndian.PutUint64(ri.entryBuf[:], uint64(offset))
if _, err := ri.indexFile.Write(ri.entryBuf[:]); err != nil {
return fmt.Errorf("append index entry: %w", err)
}
// 更新内存索引
ri.offsets = append(ri.offsets, offset)
ri.dirtyCount++
// 批量同步:达到批次大小或时间间隔后才同步
if ri.dirtyCount >= ri.syncBatch || time.Since(ri.lastSync) >= ri.syncInterval {
if err := ri.indexFile.Sync(); err != nil {
return fmt.Errorf("sync index file: %w", err)
}
ri.lastSync = time.Now()
ri.dirtyCount = 0
}
return nil
}
// GetOffset 根据索引位置获取记录偏移
func (ri *RecordIndex) GetOffset(index int) (int64, error) {
if index < 0 || index >= len(ri.offsets) {
return 0, NewIndexError(index, len(ri.offsets))
}
return ri.offsets[index], nil
}
// FindIndex 根据偏移量查找索引位置(二分查找)
func (ri *RecordIndex) FindIndex(offset int64) int {
left, right := 0, len(ri.offsets)-1
result := -1
for left <= right {
mid := (left + right) / 2
if ri.offsets[mid] == offset {
return mid
} else if ri.offsets[mid] < offset {
result = mid
left = mid + 1
} else {
right = mid - 1
}
}
return result
}
// Count 返回记录总数
func (ri *RecordIndex) Count() int {
return len(ri.offsets)
}
// LastOffset 返回最后一条记录的偏移
func (ri *RecordIndex) LastOffset() int64 {
if len(ri.offsets) == 0 {
return 0
}
return ri.offsets[len(ri.offsets)-1]
}
// Flush 强制同步未写入的数据到磁盘
func (ri *RecordIndex) Flush() error {
ri.mu.Lock()
defer ri.mu.Unlock()
if ri.indexFile != nil && ri.dirtyCount > 0 {
if err := ri.indexFile.Sync(); err != nil {
return fmt.Errorf("flush index file: %w", err)
}
ri.lastSync = time.Now()
ri.dirtyCount = 0
}
return nil
}
// Close 关闭索引文件
func (ri *RecordIndex) Close() error {
// 关闭前确保所有数据已同步
if err := ri.Flush(); err != nil {
return err
}
if ri.indexFile != nil {
return ri.indexFile.Close()
}
return nil
}
// Reset 重置索引,清空所有数据并重新创建索引文件
func (ri *RecordIndex) Reset() error {
ri.mu.Lock()
defer ri.mu.Unlock()
// 关闭索引文件
if ri.indexFile != nil {
if err := ri.indexFile.Close(); err != nil {
return err
}
ri.indexFile = nil
}
// 删除索引文件
if err := os.Remove(ri.indexPath); err != nil && !os.IsNotExist(err) {
return err
}
// 清空内存中的索引数据
ri.offsets = make([]int64, 0, 1024)
ri.dirtyCount = 0
ri.lastSync = time.Now()
// 保存空索引(创建文件并写入头部)
if err := ri.save(); err != nil {
return err
}
// 重新打开索引文件用于追加
f, err := os.OpenFile(ri.indexPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
return fmt.Errorf("reopen index file: %w", err)
}
ri.indexFile = f
return nil
}
// Sync 同步索引文件到磁盘(立即同步,不考虑批量策略)
func (ri *RecordIndex) Sync() error {
return ri.Flush()
}