package seqlog import ( "encoding/binary" "fmt" "io" "os" "sync" "time" ) const ( // IndexMagic 索引文件魔数 IndexMagic = 0x58444953 // "SIDX" (Seqlog Index) // IndexVersion 索引文件版本 IndexVersion = 1 // IndexHeaderSize 索引文件头部大小(字节) IndexHeaderSize = 8 // Magic(4) + Version(4) // IndexEntrySize 每条索引条目大小(字节) IndexEntrySize = 8 // Offset(8) // DefaultSyncInterval 默认同步间隔 DefaultSyncInterval = 1 * time.Second // DefaultSyncBatch 默认同步批次大小(累积多少条记录后同步) DefaultSyncBatch = 100 ) // RecordIndex 记录索引管理器 type RecordIndex struct { logPath string // 日志文件路径 indexPath string // 索引文件路径 offsets []int64 // 内存中的偏移索引 magic uint32 // 魔数,用于识别索引文件 version uint32 // 版本号 indexFile *os.File // 索引文件句柄(用于追加写入) syncInterval time.Duration // 同步时间间隔 syncBatch int // 同步批次大小 lastSync time.Time // 上次同步时间 dirtyCount int // 未同步的记录数 entryBuf [IndexEntrySize]byte // 可重用的写入缓冲区 mu sync.Mutex // 保护并发访问 } // NewRecordIndex 创建或加载记录索引 // 启动时总是从日志文件重建索引,确保索引和日志文件完全一致 func NewRecordIndex(logPath string) (*RecordIndex, error) { indexPath := logPath + ".idx" ri := &RecordIndex{ logPath: logPath, indexPath: indexPath, offsets: make([]int64, 0, 1024), magic: IndexMagic, version: IndexVersion, syncInterval: DefaultSyncInterval, syncBatch: DefaultSyncBatch, lastSync: time.Now(), } // 启动时总是从日志文件重建索引 if err := ri.rebuild(); err != nil { return nil, fmt.Errorf("rebuild index: %w", err) } // 打开索引文件用于追加写入 f, err := os.OpenFile(indexPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) if err != nil { return nil, fmt.Errorf("open index file for append: %w", err) } ri.indexFile = f return ri, nil } // rebuild 从日志文件重建索引 func (ri *RecordIndex) rebuild() error { logFile, err := os.Open(ri.logPath) if err != nil { if os.IsNotExist(err) { // 日志文件不存在,创建空索引 ri.offsets = make([]int64, 0, 1024) return ri.save() } return fmt.Errorf("open log file: %w", err) } defer logFile.Close() ri.offsets = make([]int64, 0, 1024) currentOffset := int64(0) headerBuf := make([]byte, 24) // Record header size: [4B len][4B CRC][16B UUID] for { // 记录当前偏移 ri.offsets = append(ri.offsets, currentOffset) // 读取记录头部 if _, err := io.ReadFull(logFile, headerBuf); err != nil { if err == io.EOF { // 到达文件末尾,移除最后一个 EOF 位置 ri.offsets = ri.offsets[:len(ri.offsets)-1] break } return fmt.Errorf("read record header at offset %d: %w", currentOffset, err) } // 解析数据长度 dataLen := binary.LittleEndian.Uint32(headerBuf[0:4]) // 跳过数据部分 if _, err := logFile.Seek(int64(dataLen), io.SeekCurrent); err != nil { return fmt.Errorf("seek data at offset %d: %w", currentOffset, err) } currentOffset += 24 + int64(dataLen) } // 写入索引文件 return ri.save() } // save 保存索引到文件 func (ri *RecordIndex) save() error { f, err := os.Create(ri.indexPath) if err != nil { return fmt.Errorf("create index file: %w", err) } defer f.Close() // 写入头部 headerBuf := make([]byte, IndexHeaderSize) binary.LittleEndian.PutUint32(headerBuf[0:4], ri.magic) binary.LittleEndian.PutUint32(headerBuf[4:8], ri.version) if _, err := f.Write(headerBuf); err != nil { return fmt.Errorf("write header: %w", err) } // 写入所有索引条目 entryBuf := make([]byte, IndexEntrySize) for _, offset := range ri.offsets { binary.LittleEndian.PutUint64(entryBuf, uint64(offset)) if _, err := f.Write(entryBuf); err != nil { return fmt.Errorf("write entry: %w", err) } } return f.Sync() } // Append 追加一条索引(当写入新记录时调用) func (ri *RecordIndex) Append(offset int64) error { ri.mu.Lock() defer ri.mu.Unlock() // 追加到索引文件(先写文件,后更新内存) // 使用可重用的 buffer 减少内存分配 binary.LittleEndian.PutUint64(ri.entryBuf[:], uint64(offset)) if _, err := ri.indexFile.Write(ri.entryBuf[:]); err != nil { return fmt.Errorf("append index entry: %w", err) } // 更新内存索引 ri.offsets = append(ri.offsets, offset) ri.dirtyCount++ // 批量同步:达到批次大小或时间间隔后才同步 if ri.dirtyCount >= ri.syncBatch || time.Since(ri.lastSync) >= ri.syncInterval { if err := ri.indexFile.Sync(); err != nil { return fmt.Errorf("sync index file: %w", err) } ri.lastSync = time.Now() ri.dirtyCount = 0 } return nil } // GetOffset 根据索引位置获取记录偏移 func (ri *RecordIndex) GetOffset(index int) (int64, error) { if index < 0 || index >= len(ri.offsets) { return 0, NewIndexError(index, len(ri.offsets)) } return ri.offsets[index], nil } // FindIndex 根据偏移量查找索引位置(二分查找) func (ri *RecordIndex) FindIndex(offset int64) int { left, right := 0, len(ri.offsets)-1 result := -1 for left <= right { mid := (left + right) / 2 if ri.offsets[mid] == offset { return mid } else if ri.offsets[mid] < offset { result = mid left = mid + 1 } else { right = mid - 1 } } return result } // Count 返回记录总数 func (ri *RecordIndex) Count() int { return len(ri.offsets) } // LastOffset 返回最后一条记录的偏移 func (ri *RecordIndex) LastOffset() int64 { if len(ri.offsets) == 0 { return 0 } return ri.offsets[len(ri.offsets)-1] } // Flush 强制同步未写入的数据到磁盘 func (ri *RecordIndex) Flush() error { ri.mu.Lock() defer ri.mu.Unlock() if ri.indexFile != nil && ri.dirtyCount > 0 { if err := ri.indexFile.Sync(); err != nil { return fmt.Errorf("flush index file: %w", err) } ri.lastSync = time.Now() ri.dirtyCount = 0 } return nil } // Close 关闭索引文件 func (ri *RecordIndex) Close() error { // 关闭前确保所有数据已同步 if err := ri.Flush(); err != nil { return err } if ri.indexFile != nil { return ri.indexFile.Close() } return nil } // Reset 重置索引,清空所有数据并重新创建索引文件 func (ri *RecordIndex) Reset() error { ri.mu.Lock() defer ri.mu.Unlock() // 关闭索引文件 if ri.indexFile != nil { if err := ri.indexFile.Close(); err != nil { return err } ri.indexFile = nil } // 删除索引文件 if err := os.Remove(ri.indexPath); err != nil && !os.IsNotExist(err) { return err } // 清空内存中的索引数据 ri.offsets = make([]int64, 0, 1024) ri.dirtyCount = 0 ri.lastSync = time.Now() // 保存空索引(创建文件并写入头部) if err := ri.save(); err != nil { return err } // 重新打开索引文件用于追加 f, err := os.OpenFile(ri.indexPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) if err != nil { return fmt.Errorf("reopen index file: %w", err) } ri.indexFile = f return nil } // Sync 同步索引文件到磁盘(立即同步,不考虑批量策略) func (ri *RecordIndex) Sync() error { return ri.Flush() }