Files
seqlog/cursor.go
bourdon 6fb0731935 重构:为核心组件实现 Reset 方法优化重置机制
为所有核心组件添加 Reset() 方法:
- LogWriter.Reset(): 删除并重新创建日志文件,保持 index 和 wbuf 引用不变
- RecordIndex.Reset(): 清空索引数据并重新创建索引文件
- RecordQuery.Reset(): 关闭并重新打开日志文件
- ProcessCursor.Reset(): 删除位置文件并重置游标位置
- LogTailer.Reset(): 重置内部 channel 状态

优化 TopicProcessor.Reset() 实现:
- 不再销毁和重建组件对象
- 通过调用各组件的 Reset() 方法重置状态
- 保持组件间引用关系稳定
- 减少代码行数约 20 行
- 避免空指针风险和内存分配开销

代码改进:
- LogWriter 添加 path 字段用于重置
- 移除 topic_processor.go 中未使用的 os import
- 职责分离更清晰,每个组件管理自己的重置逻辑

测试结果:
- TestTopicReset: PASS
- TestTopicResetWithPendingRecords: PASS
- 所有 TopicProcessor 相关测试通过

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-04 21:58:54 +08:00

242 lines
6.0 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package seqlog
import (
"encoding/binary"
"fmt"
"hash/crc32"
"io"
"os"
"github.com/google/uuid"
)
// ProcessCursor 日志游标(窗口模式)
type ProcessCursor struct {
fd *os.File
rbuf []byte // 8 MiB 复用
path string // 日志文件路径
posFile string // 游标位置文件路径
startIdx int // 窗口开始索引(已处理的记录索引)
endIdx int // 窗口结束索引(当前读到的记录索引)
index *RecordIndex // 索引管理器(来自外部)
writer *LogWriter // 写入器引用(用于检查写入位置)
}
// NewCursor 创建一个新的日志游标
// index: 外部提供的索引管理器,用于快速定位记录
// writer: 外部提供的写入器引用,用于检查写入位置(可选,为 nil 时不进行写入保护检查)
func NewCursor(path string, index *RecordIndex, writer *LogWriter) (*ProcessCursor, error) {
if index == nil {
return nil, NewValidationError("index", "index cannot be nil", ErrNilParameter)
}
fd, err := os.Open(path)
if err != nil {
return nil, err
}
c := &ProcessCursor{
fd: fd,
rbuf: make([]byte, 8<<20),
path: path,
posFile: path + ".pos",
startIdx: 0,
endIdx: 0,
index: index,
writer: writer,
}
// 尝试恢复上次位置
c.loadPosition()
return c, nil
}
// Seek 到任意 offset支持重启续传
func (c *ProcessCursor) Seek(offset int64, whence int) (int64, error) {
return c.fd.Seek(offset, whence)
}
// Next 读取下一条记录(使用索引快速定位)
func (c *ProcessCursor) Next() (*Record, error) {
// 检查是否超出索引范围
if c.endIdx >= c.index.Count() {
return nil, io.EOF
}
// 从索引获取当前记录的偏移量
offset, err := c.index.GetOffset(c.endIdx)
if err != nil {
return nil, fmt.Errorf("get offset from index: %w", err)
}
// 写入保护:检查读取位置是否超过当前写入位置
if c.writer != nil {
dirtyOffset := c.writer.GetDirtyOffset()
// 如果正在写入dirtyOffset >= 0且记录起始位置 >= 写入位置,说明数据还未完全写入,返回 EOF 等待
if dirtyOffset >= 0 && offset >= dirtyOffset {
return nil, io.EOF
}
}
// Seek 到记录位置
if _, err := c.fd.Seek(offset, 0); err != nil {
return nil, fmt.Errorf("seek to offset %d: %w", offset, err)
}
// 读取头部:[4B len][8B offset][4B CRC][16B UUID] = 32 字节
hdr := c.rbuf[:32]
if _, err := io.ReadFull(c.fd, hdr); err != nil {
return nil, err
}
var rec Record
rec.Len = binary.LittleEndian.Uint32(hdr[0:4])
// hdr[4:12] 是 offset读取时不需要使用
rec.CRC = binary.LittleEndian.Uint32(hdr[12:16])
// 读取并校验 UUID
copy(rec.UUID[:], hdr[16:32])
if _, err := uuid.FromBytes(rec.UUID[:]); err != nil {
return nil, fmt.Errorf("%w: %v", ErrInvalidUUID, err)
}
// 如果数据大于缓冲区,分配新的 buffer
var payload []byte
if int(rec.Len) <= len(c.rbuf)-32 {
payload = c.rbuf[32 : 32+rec.Len]
} else {
payload = make([]byte, rec.Len)
}
if _, err := io.ReadFull(c.fd, payload); err != nil {
return nil, err
}
if crc32.ChecksumIEEE(payload) != rec.CRC {
return nil, fmt.Errorf("%w: offset=%d", ErrCRCMismatch, offset)
}
rec.Data = append([]byte(nil), payload...) // 复制出去,复用 buffer
// 更新窗口结束索引(移动到下一条记录)
c.endIdx++
return &rec, nil
}
// NextRange 读取指定数量的记录(范围游动)
// count: 要读取的记录数量
// 返回:读取到的记录列表,如果到达文件末尾,返回的记录数可能少于 count
func (c *ProcessCursor) NextRange(count int) ([]*Record, error) {
if count <= 0 {
return nil, NewValidationError("count", "count must be greater than 0", ErrInvalidCount)
}
results := make([]*Record, 0, count)
for range count {
rec, err := c.Next()
if err != nil {
if err == io.EOF && len(results) > 0 {
// 已经读取了一些记录,返回这些记录
return results, nil
}
return results, err
}
results = append(results, rec)
}
return results, nil
}
// Commit 提交窗口,将 endIdx 移动到 startIdx表示已处理完这批记录
func (c *ProcessCursor) Commit() {
c.startIdx = c.endIdx
}
// Rollback 回滚窗口,将 endIdx 回退到 startIdx表示放弃这批记录的处理
func (c *ProcessCursor) Rollback() error {
c.endIdx = c.startIdx
return nil
}
// StartIndex 获取窗口开始索引
func (c *ProcessCursor) StartIndex() int {
return c.startIdx
}
// EndIndex 获取窗口结束索引
func (c *ProcessCursor) EndIndex() int {
return c.endIdx
}
// Close 关闭游标并保存位置
func (c *ProcessCursor) Close() error {
c.savePosition()
return c.fd.Close()
}
// Reset 重置游标,删除位置文件并重新打开日志文件
// 保持 index 和 writer 引用不变
func (c *ProcessCursor) Reset() error {
// 关闭文件
if c.fd != nil {
if err := c.fd.Close(); err != nil {
return err
}
c.fd = nil
}
// 删除位置文件
if err := os.Remove(c.posFile); err != nil && !os.IsNotExist(err) {
return err
}
// 重新打开日志文件
fd, err := os.Open(c.path)
if err != nil {
return err
}
// 重置状态
c.fd = fd
c.startIdx = 0
c.endIdx = 0
return nil
}
// savePosition 保存当前读取位置到文件
func (c *ProcessCursor) savePosition() error {
f, err := os.Create(c.posFile)
if err != nil {
return err
}
defer f.Close()
buf := make([]byte, 4)
// 保存 startIdx已处理的索引
binary.LittleEndian.PutUint32(buf, uint32(c.startIdx))
_, err = f.Write(buf)
return err
}
// loadPosition 从文件加载上次的读取位置
func (c *ProcessCursor) loadPosition() error {
f, err := os.Open(c.posFile)
if err != nil {
if os.IsNotExist(err) {
return nil // 文件不存在,从头开始
}
return err
}
defer f.Close()
buf := make([]byte, 4)
if _, err := io.ReadFull(f, buf); err != nil {
return err
}
// 加载 startIdx
c.startIdx = int(binary.LittleEndian.Uint32(buf))
c.endIdx = c.startIdx
return nil
}