为所有核心组件添加 Reset() 方法: - LogWriter.Reset(): 删除并重新创建日志文件,保持 index 和 wbuf 引用不变 - RecordIndex.Reset(): 清空索引数据并重新创建索引文件 - RecordQuery.Reset(): 关闭并重新打开日志文件 - ProcessCursor.Reset(): 删除位置文件并重置游标位置 - LogTailer.Reset(): 重置内部 channel 状态 优化 TopicProcessor.Reset() 实现: - 不再销毁和重建组件对象 - 通过调用各组件的 Reset() 方法重置状态 - 保持组件间引用关系稳定 - 减少代码行数约 20 行 - 避免空指针风险和内存分配开销 代码改进: - LogWriter 添加 path 字段用于重置 - 移除 topic_processor.go 中未使用的 os import - 职责分离更清晰,每个组件管理自己的重置逻辑 测试结果: - TestTopicReset: PASS - TestTopicResetWithPendingRecords: PASS - 所有 TopicProcessor 相关测试通过 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
146 lines
3.2 KiB
Go
146 lines
3.2 KiB
Go
package seqlog
|
||
|
||
import (
|
||
"encoding/binary"
|
||
"hash/crc32"
|
||
"os"
|
||
"sync"
|
||
|
||
"github.com/google/uuid"
|
||
)
|
||
|
||
// LogWriter 日志写入器
|
||
type LogWriter struct {
|
||
path string // 日志文件路径
|
||
fd *os.File
|
||
off int64 // 当前写入偏移
|
||
dirtyOff int64 // 最后一次写入偏移
|
||
wbuf []byte // 8 MiB 复用
|
||
index *RecordIndex // 索引管理器(可选)
|
||
mu sync.RWMutex // 保护 off 字段
|
||
}
|
||
|
||
// NewLogWriter 创建一个新的日志写入器
|
||
// index: 外部提供的索引管理器,用于在多个组件间共享
|
||
func NewLogWriter(path string, index *RecordIndex) (*LogWriter, error) {
|
||
if index == nil {
|
||
return nil, os.ErrInvalid
|
||
}
|
||
|
||
fd, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
off, _ := fd.Seek(0, 2) // 跳到尾部
|
||
|
||
w := &LogWriter{
|
||
path: path,
|
||
fd: fd,
|
||
off: off,
|
||
dirtyOff: -1,
|
||
wbuf: make([]byte, 0, 8<<20),
|
||
index: index,
|
||
}
|
||
|
||
return w, nil
|
||
}
|
||
|
||
// Append 追加一条日志记录,返回该记录的偏移量
|
||
func (w *LogWriter) Append(data []byte) (int64, error) {
|
||
w.mu.Lock()
|
||
defer w.mu.Unlock()
|
||
|
||
// 记录当前偏移(返回给调用者,用于索引)
|
||
offset := w.off
|
||
|
||
w.dirtyOff = offset
|
||
defer func() {
|
||
w.dirtyOff = -1
|
||
}()
|
||
|
||
// 生成 UUID v4
|
||
id := uuid.New()
|
||
|
||
// 编码:[4B len][8B offset][4B CRC][16B UUID][data]
|
||
buf := w.wbuf[:0]
|
||
buf = binary.LittleEndian.AppendUint32(buf, uint32(len(data)))
|
||
buf = binary.LittleEndian.AppendUint64(buf, uint64(offset))
|
||
buf = binary.LittleEndian.AppendUint32(buf, crc32.ChecksumIEEE(data))
|
||
buf = append(buf, id[:]...)
|
||
buf = append(buf, data...)
|
||
|
||
// 落盘 + sync
|
||
if _, err := w.fd.Write(buf); err != nil {
|
||
return 0, err
|
||
}
|
||
if err := w.fd.Sync(); err != nil {
|
||
return 0, err
|
||
}
|
||
|
||
// 数据写入成功,立即更新偏移量(保证 w.off 和文件大小一致)
|
||
w.off += int64(len(buf))
|
||
|
||
// 更新索引(如果索引失败,数据已持久化,依赖启动时 rebuild 恢复)
|
||
if err := w.index.Append(offset); err != nil {
|
||
// 索引失败不影响 w.off,因为数据已经写入
|
||
return 0, err
|
||
}
|
||
|
||
return offset, nil
|
||
}
|
||
|
||
// GetWriteOffset 获取当前写入偏移量(线程安全)
|
||
func (w *LogWriter) GetWriteOffset() int64 {
|
||
w.mu.RLock()
|
||
defer w.mu.RUnlock()
|
||
return w.off
|
||
}
|
||
|
||
func (w *LogWriter) GetDirtyOffset() int64 {
|
||
w.mu.RLock()
|
||
defer w.mu.RUnlock()
|
||
return w.dirtyOff
|
||
}
|
||
|
||
// Close 关闭写入器
|
||
// 注意:不关闭 index,因为 index 是外部管理的共享资源
|
||
func (w *LogWriter) Close() error {
|
||
if w.fd == nil {
|
||
return nil
|
||
}
|
||
return w.fd.Close()
|
||
}
|
||
|
||
// Reset 重置写入器,删除日志文件并重新创建
|
||
// 保持 index 和 wbuf 引用不变
|
||
func (w *LogWriter) Reset() error {
|
||
w.mu.Lock()
|
||
defer w.mu.Unlock()
|
||
|
||
// 关闭当前文件句柄
|
||
if w.fd != nil {
|
||
if err := w.fd.Close(); err != nil {
|
||
return err
|
||
}
|
||
w.fd = nil
|
||
}
|
||
|
||
// 删除日志文件
|
||
if err := os.Remove(w.path); err != nil && !os.IsNotExist(err) {
|
||
return err
|
||
}
|
||
|
||
// 重新创建文件
|
||
fd, err := os.OpenFile(w.path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 重置状态
|
||
w.fd = fd
|
||
w.off = 0
|
||
w.dirtyOff = -1
|
||
|
||
return nil
|
||
}
|