Files
seqlog/writer.go

146 lines
3.2 KiB
Go
Raw Permalink Normal View History

重构:统一使用索引(Index)替代位置(Position)进行状态判断 ## 主要变更 ### 架构改进 - 明确索引(Index)与偏移(Offset)的职责分离 - Index: 记录序号(逻辑概念),用于状态判断 - Offset: 文件字节位置(物理概念),仅用于 I/O 操作 ### API 变更 - 删除所有 Position 相关方法: - `LogCursor.StartPos()/EndPos()` - `LogTailer.GetStartPos()/GetEndPos()` - `TopicProcessor.GetProcessingPosition()/GetReadPosition()` - `Seqlog.GetProcessingPosition()/GetReadPosition()` - 新增索引方法: - `LogCursor.StartIndex()/EndIndex()` - `LogTailer.GetStartIndex()/GetEndIndex()` - `TopicProcessor.GetProcessingIndex()/GetReadIndex()` - `Seqlog.GetProcessingIndex()/GetReadIndex()` - `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index ### 查询接口变更 - `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数 - `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数 - `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断 ### 性能优化 - 状态判断改用整数比较,不再需要计算偏移量 - 减少不必要的索引到偏移的转换 - 只在实际文件 I/O 时才获取 offset ### 测试更新 - 更新所有测试用例使用新的 Index API - 更新示例代码(topic_processor_example.go, webapp/main.go) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
package seqlog
import (
"encoding/binary"
"hash/crc32"
"os"
"sync"
重构:统一使用索引(Index)替代位置(Position)进行状态判断 ## 主要变更 ### 架构改进 - 明确索引(Index)与偏移(Offset)的职责分离 - Index: 记录序号(逻辑概念),用于状态判断 - Offset: 文件字节位置(物理概念),仅用于 I/O 操作 ### API 变更 - 删除所有 Position 相关方法: - `LogCursor.StartPos()/EndPos()` - `LogTailer.GetStartPos()/GetEndPos()` - `TopicProcessor.GetProcessingPosition()/GetReadPosition()` - `Seqlog.GetProcessingPosition()/GetReadPosition()` - 新增索引方法: - `LogCursor.StartIndex()/EndIndex()` - `LogTailer.GetStartIndex()/GetEndIndex()` - `TopicProcessor.GetProcessingIndex()/GetReadIndex()` - `Seqlog.GetProcessingIndex()/GetReadIndex()` - `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index ### 查询接口变更 - `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数 - `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数 - `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断 ### 性能优化 - 状态判断改用整数比较,不再需要计算偏移量 - 减少不必要的索引到偏移的转换 - 只在实际文件 I/O 时才获取 offset ### 测试更新 - 更新所有测试用例使用新的 Index API - 更新示例代码(topic_processor_example.go, webapp/main.go) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
"github.com/google/uuid"
)
// LogWriter 日志写入器
type LogWriter struct {
path string // 日志文件路径
fd *os.File
off int64 // 当前写入偏移
dirtyOff int64 // 最后一次写入偏移
wbuf []byte // 8 MiB 复用
index *RecordIndex // 索引管理器(可选)
mu sync.RWMutex // 保护 off 字段
重构:统一使用索引(Index)替代位置(Position)进行状态判断 ## 主要变更 ### 架构改进 - 明确索引(Index)与偏移(Offset)的职责分离 - Index: 记录序号(逻辑概念),用于状态判断 - Offset: 文件字节位置(物理概念),仅用于 I/O 操作 ### API 变更 - 删除所有 Position 相关方法: - `LogCursor.StartPos()/EndPos()` - `LogTailer.GetStartPos()/GetEndPos()` - `TopicProcessor.GetProcessingPosition()/GetReadPosition()` - `Seqlog.GetProcessingPosition()/GetReadPosition()` - 新增索引方法: - `LogCursor.StartIndex()/EndIndex()` - `LogTailer.GetStartIndex()/GetEndIndex()` - `TopicProcessor.GetProcessingIndex()/GetReadIndex()` - `Seqlog.GetProcessingIndex()/GetReadIndex()` - `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index ### 查询接口变更 - `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数 - `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数 - `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断 ### 性能优化 - 状态判断改用整数比较,不再需要计算偏移量 - 减少不必要的索引到偏移的转换 - 只在实际文件 I/O 时才获取 offset ### 测试更新 - 更新所有测试用例使用新的 Index API - 更新示例代码(topic_processor_example.go, webapp/main.go) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
}
// NewLogWriter 创建一个新的日志写入器
// index: 外部提供的索引管理器,用于在多个组件间共享
func NewLogWriter(path string, index *RecordIndex) (*LogWriter, error) {
if index == nil {
return nil, os.ErrInvalid
}
fd, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
return nil, err
}
off, _ := fd.Seek(0, 2) // 跳到尾部
w := &LogWriter{
path: path,
fd: fd,
off: off,
dirtyOff: -1,
wbuf: make([]byte, 0, 8<<20),
index: index,
重构:统一使用索引(Index)替代位置(Position)进行状态判断 ## 主要变更 ### 架构改进 - 明确索引(Index)与偏移(Offset)的职责分离 - Index: 记录序号(逻辑概念),用于状态判断 - Offset: 文件字节位置(物理概念),仅用于 I/O 操作 ### API 变更 - 删除所有 Position 相关方法: - `LogCursor.StartPos()/EndPos()` - `LogTailer.GetStartPos()/GetEndPos()` - `TopicProcessor.GetProcessingPosition()/GetReadPosition()` - `Seqlog.GetProcessingPosition()/GetReadPosition()` - 新增索引方法: - `LogCursor.StartIndex()/EndIndex()` - `LogTailer.GetStartIndex()/GetEndIndex()` - `TopicProcessor.GetProcessingIndex()/GetReadIndex()` - `Seqlog.GetProcessingIndex()/GetReadIndex()` - `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index ### 查询接口变更 - `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数 - `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数 - `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断 ### 性能优化 - 状态判断改用整数比较,不再需要计算偏移量 - 减少不必要的索引到偏移的转换 - 只在实际文件 I/O 时才获取 offset ### 测试更新 - 更新所有测试用例使用新的 Index API - 更新示例代码(topic_processor_example.go, webapp/main.go) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
}
return w, nil
}
// Append 追加一条日志记录,返回该记录的偏移量
func (w *LogWriter) Append(data []byte) (int64, error) {
w.mu.Lock()
defer w.mu.Unlock()
重构:统一使用索引(Index)替代位置(Position)进行状态判断 ## 主要变更 ### 架构改进 - 明确索引(Index)与偏移(Offset)的职责分离 - Index: 记录序号(逻辑概念),用于状态判断 - Offset: 文件字节位置(物理概念),仅用于 I/O 操作 ### API 变更 - 删除所有 Position 相关方法: - `LogCursor.StartPos()/EndPos()` - `LogTailer.GetStartPos()/GetEndPos()` - `TopicProcessor.GetProcessingPosition()/GetReadPosition()` - `Seqlog.GetProcessingPosition()/GetReadPosition()` - 新增索引方法: - `LogCursor.StartIndex()/EndIndex()` - `LogTailer.GetStartIndex()/GetEndIndex()` - `TopicProcessor.GetProcessingIndex()/GetReadIndex()` - `Seqlog.GetProcessingIndex()/GetReadIndex()` - `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index ### 查询接口变更 - `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数 - `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数 - `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断 ### 性能优化 - 状态判断改用整数比较,不再需要计算偏移量 - 减少不必要的索引到偏移的转换 - 只在实际文件 I/O 时才获取 offset ### 测试更新 - 更新所有测试用例使用新的 Index API - 更新示例代码(topic_processor_example.go, webapp/main.go) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
// 记录当前偏移(返回给调用者,用于索引)
offset := w.off
w.dirtyOff = offset
defer func() {
w.dirtyOff = -1
}()
重构:统一使用索引(Index)替代位置(Position)进行状态判断 ## 主要变更 ### 架构改进 - 明确索引(Index)与偏移(Offset)的职责分离 - Index: 记录序号(逻辑概念),用于状态判断 - Offset: 文件字节位置(物理概念),仅用于 I/O 操作 ### API 变更 - 删除所有 Position 相关方法: - `LogCursor.StartPos()/EndPos()` - `LogTailer.GetStartPos()/GetEndPos()` - `TopicProcessor.GetProcessingPosition()/GetReadPosition()` - `Seqlog.GetProcessingPosition()/GetReadPosition()` - 新增索引方法: - `LogCursor.StartIndex()/EndIndex()` - `LogTailer.GetStartIndex()/GetEndIndex()` - `TopicProcessor.GetProcessingIndex()/GetReadIndex()` - `Seqlog.GetProcessingIndex()/GetReadIndex()` - `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index ### 查询接口变更 - `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数 - `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数 - `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断 ### 性能优化 - 状态判断改用整数比较,不再需要计算偏移量 - 减少不必要的索引到偏移的转换 - 只在实际文件 I/O 时才获取 offset ### 测试更新 - 更新所有测试用例使用新的 Index API - 更新示例代码(topic_processor_example.go, webapp/main.go) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
// 生成 UUID v4
id := uuid.New()
// 编码:[4B len][8B offset][4B CRC][16B UUID][data]
重构:统一使用索引(Index)替代位置(Position)进行状态判断 ## 主要变更 ### 架构改进 - 明确索引(Index)与偏移(Offset)的职责分离 - Index: 记录序号(逻辑概念),用于状态判断 - Offset: 文件字节位置(物理概念),仅用于 I/O 操作 ### API 变更 - 删除所有 Position 相关方法: - `LogCursor.StartPos()/EndPos()` - `LogTailer.GetStartPos()/GetEndPos()` - `TopicProcessor.GetProcessingPosition()/GetReadPosition()` - `Seqlog.GetProcessingPosition()/GetReadPosition()` - 新增索引方法: - `LogCursor.StartIndex()/EndIndex()` - `LogTailer.GetStartIndex()/GetEndIndex()` - `TopicProcessor.GetProcessingIndex()/GetReadIndex()` - `Seqlog.GetProcessingIndex()/GetReadIndex()` - `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index ### 查询接口变更 - `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数 - `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数 - `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断 ### 性能优化 - 状态判断改用整数比较,不再需要计算偏移量 - 减少不必要的索引到偏移的转换 - 只在实际文件 I/O 时才获取 offset ### 测试更新 - 更新所有测试用例使用新的 Index API - 更新示例代码(topic_processor_example.go, webapp/main.go) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
buf := w.wbuf[:0]
buf = binary.LittleEndian.AppendUint32(buf, uint32(len(data)))
buf = binary.LittleEndian.AppendUint64(buf, uint64(offset))
重构:统一使用索引(Index)替代位置(Position)进行状态判断 ## 主要变更 ### 架构改进 - 明确索引(Index)与偏移(Offset)的职责分离 - Index: 记录序号(逻辑概念),用于状态判断 - Offset: 文件字节位置(物理概念),仅用于 I/O 操作 ### API 变更 - 删除所有 Position 相关方法: - `LogCursor.StartPos()/EndPos()` - `LogTailer.GetStartPos()/GetEndPos()` - `TopicProcessor.GetProcessingPosition()/GetReadPosition()` - `Seqlog.GetProcessingPosition()/GetReadPosition()` - 新增索引方法: - `LogCursor.StartIndex()/EndIndex()` - `LogTailer.GetStartIndex()/GetEndIndex()` - `TopicProcessor.GetProcessingIndex()/GetReadIndex()` - `Seqlog.GetProcessingIndex()/GetReadIndex()` - `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index ### 查询接口变更 - `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数 - `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数 - `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断 ### 性能优化 - 状态判断改用整数比较,不再需要计算偏移量 - 减少不必要的索引到偏移的转换 - 只在实际文件 I/O 时才获取 offset ### 测试更新 - 更新所有测试用例使用新的 Index API - 更新示例代码(topic_processor_example.go, webapp/main.go) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
buf = binary.LittleEndian.AppendUint32(buf, crc32.ChecksumIEEE(data))
buf = append(buf, id[:]...)
buf = append(buf, data...)
// 落盘 + sync
if _, err := w.fd.Write(buf); err != nil {
return 0, err
}
if err := w.fd.Sync(); err != nil {
return 0, err
}
// 数据写入成功,立即更新偏移量(保证 w.off 和文件大小一致)
w.off += int64(len(buf))
// 更新索引(如果索引失败,数据已持久化,依赖启动时 rebuild 恢复)
if err := w.index.Append(offset); err != nil {
// 索引失败不影响 w.off因为数据已经写入
return 0, err
}
return offset, nil
}
// GetWriteOffset 获取当前写入偏移量(线程安全)
func (w *LogWriter) GetWriteOffset() int64 {
w.mu.RLock()
defer w.mu.RUnlock()
return w.off
}
func (w *LogWriter) GetDirtyOffset() int64 {
w.mu.RLock()
defer w.mu.RUnlock()
return w.dirtyOff
}
重构:统一使用索引(Index)替代位置(Position)进行状态判断 ## 主要变更 ### 架构改进 - 明确索引(Index)与偏移(Offset)的职责分离 - Index: 记录序号(逻辑概念),用于状态判断 - Offset: 文件字节位置(物理概念),仅用于 I/O 操作 ### API 变更 - 删除所有 Position 相关方法: - `LogCursor.StartPos()/EndPos()` - `LogTailer.GetStartPos()/GetEndPos()` - `TopicProcessor.GetProcessingPosition()/GetReadPosition()` - `Seqlog.GetProcessingPosition()/GetReadPosition()` - 新增索引方法: - `LogCursor.StartIndex()/EndIndex()` - `LogTailer.GetStartIndex()/GetEndIndex()` - `TopicProcessor.GetProcessingIndex()/GetReadIndex()` - `Seqlog.GetProcessingIndex()/GetReadIndex()` - `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index ### 查询接口变更 - `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数 - `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数 - `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断 ### 性能优化 - 状态判断改用整数比较,不再需要计算偏移量 - 减少不必要的索引到偏移的转换 - 只在实际文件 I/O 时才获取 offset ### 测试更新 - 更新所有测试用例使用新的 Index API - 更新示例代码(topic_processor_example.go, webapp/main.go) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
// Close 关闭写入器
// 注意:不关闭 index因为 index 是外部管理的共享资源
func (w *LogWriter) Close() error {
if w.fd == nil {
return nil
}
return w.fd.Close()
}
// Reset 重置写入器,删除日志文件并重新创建
// 保持 index 和 wbuf 引用不变
func (w *LogWriter) Reset() error {
w.mu.Lock()
defer w.mu.Unlock()
// 关闭当前文件句柄
if w.fd != nil {
if err := w.fd.Close(); err != nil {
return err
}
w.fd = nil
}
// 删除日志文件
if err := os.Remove(w.path); err != nil && !os.IsNotExist(err) {
return err
}
// 重新创建文件
fd, err := os.OpenFile(w.path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
return err
}
// 重置状态
w.fd = fd
w.off = 0
w.dirtyOff = -1
return nil
}