为所有核心组件添加 Reset() 方法: - LogWriter.Reset(): 删除并重新创建日志文件,保持 index 和 wbuf 引用不变 - RecordIndex.Reset(): 清空索引数据并重新创建索引文件 - RecordQuery.Reset(): 关闭并重新打开日志文件 - ProcessCursor.Reset(): 删除位置文件并重置游标位置 - LogTailer.Reset(): 重置内部 channel 状态 优化 TopicProcessor.Reset() 实现: - 不再销毁和重建组件对象 - 通过调用各组件的 Reset() 方法重置状态 - 保持组件间引用关系稳定 - 减少代码行数约 20 行 - 避免空指针风险和内存分配开销 代码改进: - LogWriter 添加 path 字段用于重置 - 移除 topic_processor.go 中未使用的 os import - 职责分离更清晰,每个组件管理自己的重置逻辑 测试结果: - TestTopicReset: PASS - TestTopicResetWithPendingRecords: PASS - 所有 TopicProcessor 相关测试通过 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
242 lines
6.0 KiB
Go
242 lines
6.0 KiB
Go
package seqlog
|
||
|
||
import (
|
||
"encoding/binary"
|
||
"fmt"
|
||
"hash/crc32"
|
||
"io"
|
||
"os"
|
||
|
||
"github.com/google/uuid"
|
||
)
|
||
|
||
// ProcessCursor 日志游标(窗口模式)
|
||
type ProcessCursor struct {
|
||
fd *os.File
|
||
rbuf []byte // 8 MiB 复用
|
||
path string // 日志文件路径
|
||
posFile string // 游标位置文件路径
|
||
startIdx int // 窗口开始索引(已处理的记录索引)
|
||
endIdx int // 窗口结束索引(当前读到的记录索引)
|
||
index *RecordIndex // 索引管理器(来自外部)
|
||
writer *LogWriter // 写入器引用(用于检查写入位置)
|
||
}
|
||
|
||
// NewCursor 创建一个新的日志游标
|
||
// index: 外部提供的索引管理器,用于快速定位记录
|
||
// writer: 外部提供的写入器引用,用于检查写入位置(可选,为 nil 时不进行写入保护检查)
|
||
func NewCursor(path string, index *RecordIndex, writer *LogWriter) (*ProcessCursor, error) {
|
||
if index == nil {
|
||
return nil, NewValidationError("index", "index cannot be nil", ErrNilParameter)
|
||
}
|
||
|
||
fd, err := os.Open(path)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
c := &ProcessCursor{
|
||
fd: fd,
|
||
rbuf: make([]byte, 8<<20),
|
||
path: path,
|
||
posFile: path + ".pos",
|
||
startIdx: 0,
|
||
endIdx: 0,
|
||
index: index,
|
||
writer: writer,
|
||
}
|
||
// 尝试恢复上次位置
|
||
c.loadPosition()
|
||
return c, nil
|
||
}
|
||
|
||
// Seek 到任意 offset(支持重启续传)
|
||
func (c *ProcessCursor) Seek(offset int64, whence int) (int64, error) {
|
||
return c.fd.Seek(offset, whence)
|
||
}
|
||
|
||
// Next 读取下一条记录(使用索引快速定位)
|
||
func (c *ProcessCursor) Next() (*Record, error) {
|
||
// 检查是否超出索引范围
|
||
if c.endIdx >= c.index.Count() {
|
||
return nil, io.EOF
|
||
}
|
||
|
||
// 从索引获取当前记录的偏移量
|
||
offset, err := c.index.GetOffset(c.endIdx)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("get offset from index: %w", err)
|
||
}
|
||
|
||
// 写入保护:检查读取位置是否超过当前写入位置
|
||
if c.writer != nil {
|
||
dirtyOffset := c.writer.GetDirtyOffset()
|
||
// 如果正在写入(dirtyOffset >= 0)且记录起始位置 >= 写入位置,说明数据还未完全写入,返回 EOF 等待
|
||
if dirtyOffset >= 0 && offset >= dirtyOffset {
|
||
return nil, io.EOF
|
||
}
|
||
}
|
||
|
||
// Seek 到记录位置
|
||
if _, err := c.fd.Seek(offset, 0); err != nil {
|
||
return nil, fmt.Errorf("seek to offset %d: %w", offset, err)
|
||
}
|
||
|
||
// 读取头部:[4B len][8B offset][4B CRC][16B UUID] = 32 字节
|
||
hdr := c.rbuf[:32]
|
||
if _, err := io.ReadFull(c.fd, hdr); err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
var rec Record
|
||
rec.Len = binary.LittleEndian.Uint32(hdr[0:4])
|
||
// hdr[4:12] 是 offset,读取时不需要使用
|
||
rec.CRC = binary.LittleEndian.Uint32(hdr[12:16])
|
||
|
||
// 读取并校验 UUID
|
||
copy(rec.UUID[:], hdr[16:32])
|
||
if _, err := uuid.FromBytes(rec.UUID[:]); err != nil {
|
||
return nil, fmt.Errorf("%w: %v", ErrInvalidUUID, err)
|
||
}
|
||
|
||
// 如果数据大于缓冲区,分配新的 buffer
|
||
var payload []byte
|
||
if int(rec.Len) <= len(c.rbuf)-32 {
|
||
payload = c.rbuf[32 : 32+rec.Len]
|
||
} else {
|
||
payload = make([]byte, rec.Len)
|
||
}
|
||
|
||
if _, err := io.ReadFull(c.fd, payload); err != nil {
|
||
return nil, err
|
||
}
|
||
if crc32.ChecksumIEEE(payload) != rec.CRC {
|
||
return nil, fmt.Errorf("%w: offset=%d", ErrCRCMismatch, offset)
|
||
}
|
||
rec.Data = append([]byte(nil), payload...) // 复制出去,复用 buffer
|
||
|
||
// 更新窗口结束索引(移动到下一条记录)
|
||
c.endIdx++
|
||
|
||
return &rec, nil
|
||
}
|
||
|
||
// NextRange 读取指定数量的记录(范围游动)
|
||
// count: 要读取的记录数量
|
||
// 返回:读取到的记录列表,如果到达文件末尾,返回的记录数可能少于 count
|
||
func (c *ProcessCursor) NextRange(count int) ([]*Record, error) {
|
||
if count <= 0 {
|
||
return nil, NewValidationError("count", "count must be greater than 0", ErrInvalidCount)
|
||
}
|
||
|
||
results := make([]*Record, 0, count)
|
||
|
||
for range count {
|
||
rec, err := c.Next()
|
||
if err != nil {
|
||
if err == io.EOF && len(results) > 0 {
|
||
// 已经读取了一些记录,返回这些记录
|
||
return results, nil
|
||
}
|
||
return results, err
|
||
}
|
||
results = append(results, rec)
|
||
}
|
||
|
||
return results, nil
|
||
}
|
||
|
||
// Commit 提交窗口,将 endIdx 移动到 startIdx(表示已处理完这批记录)
|
||
func (c *ProcessCursor) Commit() {
|
||
c.startIdx = c.endIdx
|
||
}
|
||
|
||
// Rollback 回滚窗口,将 endIdx 回退到 startIdx(表示放弃这批记录的处理)
|
||
func (c *ProcessCursor) Rollback() error {
|
||
c.endIdx = c.startIdx
|
||
return nil
|
||
}
|
||
|
||
// StartIndex 获取窗口开始索引
|
||
func (c *ProcessCursor) StartIndex() int {
|
||
return c.startIdx
|
||
}
|
||
|
||
// EndIndex 获取窗口结束索引
|
||
func (c *ProcessCursor) EndIndex() int {
|
||
return c.endIdx
|
||
}
|
||
|
||
// Close 关闭游标并保存位置
|
||
func (c *ProcessCursor) Close() error {
|
||
c.savePosition()
|
||
return c.fd.Close()
|
||
}
|
||
|
||
// Reset 重置游标,删除位置文件并重新打开日志文件
|
||
// 保持 index 和 writer 引用不变
|
||
func (c *ProcessCursor) Reset() error {
|
||
// 关闭文件
|
||
if c.fd != nil {
|
||
if err := c.fd.Close(); err != nil {
|
||
return err
|
||
}
|
||
c.fd = nil
|
||
}
|
||
|
||
// 删除位置文件
|
||
if err := os.Remove(c.posFile); err != nil && !os.IsNotExist(err) {
|
||
return err
|
||
}
|
||
|
||
// 重新打开日志文件
|
||
fd, err := os.Open(c.path)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 重置状态
|
||
c.fd = fd
|
||
c.startIdx = 0
|
||
c.endIdx = 0
|
||
|
||
return nil
|
||
}
|
||
|
||
// savePosition 保存当前读取位置到文件
|
||
func (c *ProcessCursor) savePosition() error {
|
||
f, err := os.Create(c.posFile)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
defer f.Close()
|
||
|
||
buf := make([]byte, 4)
|
||
// 保存 startIdx(已处理的索引)
|
||
binary.LittleEndian.PutUint32(buf, uint32(c.startIdx))
|
||
_, err = f.Write(buf)
|
||
return err
|
||
}
|
||
|
||
// loadPosition 从文件加载上次的读取位置
|
||
func (c *ProcessCursor) loadPosition() error {
|
||
f, err := os.Open(c.posFile)
|
||
if err != nil {
|
||
if os.IsNotExist(err) {
|
||
return nil // 文件不存在,从头开始
|
||
}
|
||
return err
|
||
}
|
||
defer f.Close()
|
||
|
||
buf := make([]byte, 4)
|
||
if _, err := io.ReadFull(f, buf); err != nil {
|
||
return err
|
||
}
|
||
|
||
// 加载 startIdx
|
||
c.startIdx = int(binary.LittleEndian.Uint32(buf))
|
||
c.endIdx = c.startIdx
|
||
|
||
return nil
|
||
}
|