package seqlog import ( "encoding/binary" "fmt" "hash/crc32" "io" "os" "github.com/google/uuid" ) // ProcessCursor 日志游标(窗口模式) type ProcessCursor struct { fd *os.File rbuf []byte // 8 MiB 复用 path string // 日志文件路径 posFile string // 游标位置文件路径 startIdx int // 窗口开始索引(已处理的记录索引) endIdx int // 窗口结束索引(当前读到的记录索引) index *RecordIndex // 索引管理器(来自外部) writer *LogWriter // 写入器引用(用于检查写入位置) } // NewCursor 创建一个新的日志游标 // index: 外部提供的索引管理器,用于快速定位记录 // writer: 外部提供的写入器引用,用于检查写入位置(可选,为 nil 时不进行写入保护检查) func NewCursor(path string, index *RecordIndex, writer *LogWriter) (*ProcessCursor, error) { if index == nil { return nil, NewValidationError("index", "index cannot be nil", ErrNilParameter) } fd, err := os.Open(path) if err != nil { return nil, err } c := &ProcessCursor{ fd: fd, rbuf: make([]byte, 8<<20), path: path, posFile: path + ".pos", startIdx: 0, endIdx: 0, index: index, writer: writer, } // 尝试恢复上次位置 c.loadPosition() return c, nil } // Seek 到任意 offset(支持重启续传) func (c *ProcessCursor) Seek(offset int64, whence int) (int64, error) { return c.fd.Seek(offset, whence) } // Next 读取下一条记录(使用索引快速定位) func (c *ProcessCursor) Next() (*Record, error) { // 检查是否超出索引范围 if c.endIdx >= c.index.Count() { return nil, io.EOF } // 从索引获取当前记录的偏移量 offset, err := c.index.GetOffset(c.endIdx) if err != nil { return nil, fmt.Errorf("get offset from index: %w", err) } // 写入保护:检查读取位置是否超过当前写入位置 if c.writer != nil { dirtyOffset := c.writer.GetDirtyOffset() // 如果正在写入(dirtyOffset >= 0)且记录起始位置 >= 写入位置,说明数据还未完全写入,返回 EOF 等待 if dirtyOffset >= 0 && offset >= dirtyOffset { return nil, io.EOF } } // Seek 到记录位置 if _, err := c.fd.Seek(offset, 0); err != nil { return nil, fmt.Errorf("seek to offset %d: %w", offset, err) } // 读取头部:[4B len][8B offset][4B CRC][16B UUID] = 32 字节 hdr := c.rbuf[:32] if _, err := io.ReadFull(c.fd, hdr); err != nil { return nil, err } var rec Record rec.Len = binary.LittleEndian.Uint32(hdr[0:4]) // hdr[4:12] 是 offset,读取时不需要使用 rec.CRC = binary.LittleEndian.Uint32(hdr[12:16]) // 读取并校验 UUID copy(rec.UUID[:], hdr[16:32]) if _, err := uuid.FromBytes(rec.UUID[:]); err != nil { return nil, fmt.Errorf("%w: %v", ErrInvalidUUID, err) } // 如果数据大于缓冲区,分配新的 buffer var payload []byte if int(rec.Len) <= len(c.rbuf)-32 { payload = c.rbuf[32 : 32+rec.Len] } else { payload = make([]byte, rec.Len) } if _, err := io.ReadFull(c.fd, payload); err != nil { return nil, err } if crc32.ChecksumIEEE(payload) != rec.CRC { return nil, fmt.Errorf("%w: offset=%d", ErrCRCMismatch, offset) } rec.Data = append([]byte(nil), payload...) // 复制出去,复用 buffer // 更新窗口结束索引(移动到下一条记录) c.endIdx++ return &rec, nil } // NextRange 读取指定数量的记录(范围游动) // count: 要读取的记录数量 // 返回:读取到的记录列表,如果到达文件末尾,返回的记录数可能少于 count func (c *ProcessCursor) NextRange(count int) ([]*Record, error) { if count <= 0 { return nil, NewValidationError("count", "count must be greater than 0", ErrInvalidCount) } results := make([]*Record, 0, count) for range count { rec, err := c.Next() if err != nil { if err == io.EOF && len(results) > 0 { // 已经读取了一些记录,返回这些记录 return results, nil } return results, err } results = append(results, rec) } return results, nil } // Commit 提交窗口,将 endIdx 移动到 startIdx(表示已处理完这批记录) func (c *ProcessCursor) Commit() { c.startIdx = c.endIdx } // Rollback 回滚窗口,将 endIdx 回退到 startIdx(表示放弃这批记录的处理) func (c *ProcessCursor) Rollback() error { c.endIdx = c.startIdx return nil } // StartIndex 获取窗口开始索引 func (c *ProcessCursor) StartIndex() int { return c.startIdx } // EndIndex 获取窗口结束索引 func (c *ProcessCursor) EndIndex() int { return c.endIdx } // Close 关闭游标并保存位置 func (c *ProcessCursor) Close() error { c.savePosition() return c.fd.Close() } // Reset 重置游标,删除位置文件并重新打开日志文件 // 保持 index 和 writer 引用不变 func (c *ProcessCursor) Reset() error { // 关闭文件 if c.fd != nil { if err := c.fd.Close(); err != nil { return err } c.fd = nil } // 删除位置文件 if err := os.Remove(c.posFile); err != nil && !os.IsNotExist(err) { return err } // 重新打开日志文件 fd, err := os.Open(c.path) if err != nil { return err } // 重置状态 c.fd = fd c.startIdx = 0 c.endIdx = 0 return nil } // savePosition 保存当前读取位置到文件 func (c *ProcessCursor) savePosition() error { f, err := os.Create(c.posFile) if err != nil { return err } defer f.Close() buf := make([]byte, 4) // 保存 startIdx(已处理的索引) binary.LittleEndian.PutUint32(buf, uint32(c.startIdx)) _, err = f.Write(buf) return err } // loadPosition 从文件加载上次的读取位置 func (c *ProcessCursor) loadPosition() error { f, err := os.Open(c.posFile) if err != nil { if os.IsNotExist(err) { return nil // 文件不存在,从头开始 } return err } defer f.Close() buf := make([]byte, 4) if _, err := io.ReadFull(f, buf); err != nil { return err } // 加载 startIdx c.startIdx = int(binary.LittleEndian.Uint32(buf)) c.endIdx = c.startIdx return nil }