Files
seqlog/writer.go
bourdon 6fb0731935 重构:为核心组件实现 Reset 方法优化重置机制
为所有核心组件添加 Reset() 方法:
- LogWriter.Reset(): 删除并重新创建日志文件,保持 index 和 wbuf 引用不变
- RecordIndex.Reset(): 清空索引数据并重新创建索引文件
- RecordQuery.Reset(): 关闭并重新打开日志文件
- ProcessCursor.Reset(): 删除位置文件并重置游标位置
- LogTailer.Reset(): 重置内部 channel 状态

优化 TopicProcessor.Reset() 实现:
- 不再销毁和重建组件对象
- 通过调用各组件的 Reset() 方法重置状态
- 保持组件间引用关系稳定
- 减少代码行数约 20 行
- 避免空指针风险和内存分配开销

代码改进:
- LogWriter 添加 path 字段用于重置
- 移除 topic_processor.go 中未使用的 os import
- 职责分离更清晰,每个组件管理自己的重置逻辑

测试结果:
- TestTopicReset: PASS
- TestTopicResetWithPendingRecords: PASS
- 所有 TopicProcessor 相关测试通过

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-04 21:58:54 +08:00

146 lines
3.2 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package seqlog
import (
"encoding/binary"
"hash/crc32"
"os"
"sync"
"github.com/google/uuid"
)
// LogWriter 日志写入器
type LogWriter struct {
path string // 日志文件路径
fd *os.File
off int64 // 当前写入偏移
dirtyOff int64 // 最后一次写入偏移
wbuf []byte // 8 MiB 复用
index *RecordIndex // 索引管理器(可选)
mu sync.RWMutex // 保护 off 字段
}
// NewLogWriter 创建一个新的日志写入器
// index: 外部提供的索引管理器,用于在多个组件间共享
func NewLogWriter(path string, index *RecordIndex) (*LogWriter, error) {
if index == nil {
return nil, os.ErrInvalid
}
fd, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
return nil, err
}
off, _ := fd.Seek(0, 2) // 跳到尾部
w := &LogWriter{
path: path,
fd: fd,
off: off,
dirtyOff: -1,
wbuf: make([]byte, 0, 8<<20),
index: index,
}
return w, nil
}
// Append 追加一条日志记录,返回该记录的偏移量
func (w *LogWriter) Append(data []byte) (int64, error) {
w.mu.Lock()
defer w.mu.Unlock()
// 记录当前偏移(返回给调用者,用于索引)
offset := w.off
w.dirtyOff = offset
defer func() {
w.dirtyOff = -1
}()
// 生成 UUID v4
id := uuid.New()
// 编码:[4B len][8B offset][4B CRC][16B UUID][data]
buf := w.wbuf[:0]
buf = binary.LittleEndian.AppendUint32(buf, uint32(len(data)))
buf = binary.LittleEndian.AppendUint64(buf, uint64(offset))
buf = binary.LittleEndian.AppendUint32(buf, crc32.ChecksumIEEE(data))
buf = append(buf, id[:]...)
buf = append(buf, data...)
// 落盘 + sync
if _, err := w.fd.Write(buf); err != nil {
return 0, err
}
if err := w.fd.Sync(); err != nil {
return 0, err
}
// 数据写入成功,立即更新偏移量(保证 w.off 和文件大小一致)
w.off += int64(len(buf))
// 更新索引(如果索引失败,数据已持久化,依赖启动时 rebuild 恢复)
if err := w.index.Append(offset); err != nil {
// 索引失败不影响 w.off因为数据已经写入
return 0, err
}
return offset, nil
}
// GetWriteOffset 获取当前写入偏移量(线程安全)
func (w *LogWriter) GetWriteOffset() int64 {
w.mu.RLock()
defer w.mu.RUnlock()
return w.off
}
func (w *LogWriter) GetDirtyOffset() int64 {
w.mu.RLock()
defer w.mu.RUnlock()
return w.dirtyOff
}
// Close 关闭写入器
// 注意:不关闭 index因为 index 是外部管理的共享资源
func (w *LogWriter) Close() error {
if w.fd == nil {
return nil
}
return w.fd.Close()
}
// Reset 重置写入器,删除日志文件并重新创建
// 保持 index 和 wbuf 引用不变
func (w *LogWriter) Reset() error {
w.mu.Lock()
defer w.mu.Unlock()
// 关闭当前文件句柄
if w.fd != nil {
if err := w.fd.Close(); err != nil {
return err
}
w.fd = nil
}
// 删除日志文件
if err := os.Remove(w.path); err != nil && !os.IsNotExist(err) {
return err
}
// 重新创建文件
fd, err := os.OpenFile(w.path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
return err
}
// 重置状态
w.fd = fd
w.off = 0
w.dirtyOff = -1
return nil
}