package seqlog import ( "encoding/binary" "hash/crc32" "os" "sync" "github.com/google/uuid" ) // LogWriter 日志写入器 type LogWriter struct { path string // 日志文件路径 fd *os.File off int64 // 当前写入偏移 dirtyOff int64 // 最后一次写入偏移 wbuf []byte // 8 MiB 复用 index *RecordIndex // 索引管理器(可选) mu sync.RWMutex // 保护 off 字段 } // NewLogWriter 创建一个新的日志写入器 // index: 外部提供的索引管理器,用于在多个组件间共享 func NewLogWriter(path string, index *RecordIndex) (*LogWriter, error) { if index == nil { return nil, os.ErrInvalid } fd, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) if err != nil { return nil, err } off, _ := fd.Seek(0, 2) // 跳到尾部 w := &LogWriter{ path: path, fd: fd, off: off, dirtyOff: -1, wbuf: make([]byte, 0, 8<<20), index: index, } return w, nil } // Append 追加一条日志记录,返回该记录的偏移量 func (w *LogWriter) Append(data []byte) (int64, error) { w.mu.Lock() defer w.mu.Unlock() // 记录当前偏移(返回给调用者,用于索引) offset := w.off w.dirtyOff = offset defer func() { w.dirtyOff = -1 }() // 生成 UUID v4 id := uuid.New() // 编码:[4B len][8B offset][4B CRC][16B UUID][data] buf := w.wbuf[:0] buf = binary.LittleEndian.AppendUint32(buf, uint32(len(data))) buf = binary.LittleEndian.AppendUint64(buf, uint64(offset)) buf = binary.LittleEndian.AppendUint32(buf, crc32.ChecksumIEEE(data)) buf = append(buf, id[:]...) buf = append(buf, data...) // 落盘 + sync if _, err := w.fd.Write(buf); err != nil { return 0, err } if err := w.fd.Sync(); err != nil { return 0, err } // 数据写入成功,立即更新偏移量(保证 w.off 和文件大小一致) w.off += int64(len(buf)) // 更新索引(如果索引失败,数据已持久化,依赖启动时 rebuild 恢复) if err := w.index.Append(offset); err != nil { // 索引失败不影响 w.off,因为数据已经写入 return 0, err } return offset, nil } // GetWriteOffset 获取当前写入偏移量(线程安全) func (w *LogWriter) GetWriteOffset() int64 { w.mu.RLock() defer w.mu.RUnlock() return w.off } func (w *LogWriter) GetDirtyOffset() int64 { w.mu.RLock() defer w.mu.RUnlock() return w.dirtyOff } // Close 关闭写入器 // 注意:不关闭 index,因为 index 是外部管理的共享资源 func (w *LogWriter) Close() error { if w.fd == nil { return nil } return w.fd.Close() } // Reset 重置写入器,删除日志文件并重新创建 // 保持 index 和 wbuf 引用不变 func (w *LogWriter) Reset() error { w.mu.Lock() defer w.mu.Unlock() // 关闭当前文件句柄 if w.fd != nil { if err := w.fd.Close(); err != nil { return err } w.fd = nil } // 删除日志文件 if err := os.Remove(w.path); err != nil && !os.IsNotExist(err) { return err } // 重新创建文件 fd, err := os.OpenFile(w.path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) if err != nil { return err } // 重置状态 w.fd = fd w.off = 0 w.dirtyOff = -1 return nil }