为所有核心组件添加 Reset() 方法: - LogWriter.Reset(): 删除并重新创建日志文件,保持 index 和 wbuf 引用不变 - RecordIndex.Reset(): 清空索引数据并重新创建索引文件 - RecordQuery.Reset(): 关闭并重新打开日志文件 - ProcessCursor.Reset(): 删除位置文件并重置游标位置 - LogTailer.Reset(): 重置内部 channel 状态 优化 TopicProcessor.Reset() 实现: - 不再销毁和重建组件对象 - 通过调用各组件的 Reset() 方法重置状态 - 保持组件间引用关系稳定 - 减少代码行数约 20 行 - 避免空指针风险和内存分配开销 代码改进: - LogWriter 添加 path 字段用于重置 - 移除 topic_processor.go 中未使用的 os import - 职责分离更清晰,每个组件管理自己的重置逻辑 测试结果: - TestTopicReset: PASS - TestTopicResetWithPendingRecords: PASS - 所有 TopicProcessor 相关测试通过 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
488 lines
14 KiB
Go
488 lines
14 KiB
Go
package seqlog
|
||
|
||
import (
|
||
"encoding/binary"
|
||
"fmt"
|
||
"io"
|
||
"os"
|
||
"unicode/utf8"
|
||
|
||
"github.com/google/uuid"
|
||
)
|
||
|
||
// RecordStatus describes where a record sits relative to the processing cursor.
type RecordStatus int

// Status values are assigned sequentially from 0 via iota; recordStatusNames
// below is keyed by these values.
const (
	StatusProcessed   RecordStatus = iota // already processed
	StatusProcessing                      // being processed (current position)
	StatusPending                         // waiting to be processed
	StatusUnavailable                     // unavailable (not written yet)
)

// recordStatusNames holds the display name of every defined RecordStatus,
// indexed by the status value.
var recordStatusNames = [...]string{
	StatusProcessed:   "StatusProcessed",
	StatusProcessing:  "StatusProcessing",
	StatusPending:     "StatusPending",
	StatusUnavailable: "StatusUnavailable",
}

// String returns the constant name of s, or "StatusUnknown" when s is not one
// of the defined statuses (including negative values).
func (s RecordStatus) String() string {
	if s < 0 || int(s) >= len(recordStatusNames) {
		return "StatusUnknown"
	}
	return recordStatusNames[s]
}
|
||
|
||
// RecordWithStatus pairs a record with its position in the log file and its
// processing status.
type RecordWithStatus struct {
	Record *Record
	Index  int          // position of the record in the log file
	Status RecordStatus // processing status of the record
}
|
||
|
||
// RecordWithIndex pairs a record with its position in the log file.
type RecordWithIndex struct {
	Record *Record
	Index  int // position of the record in the log file
}
|
||
|
||
// RecordMetadata describes a record without carrying its full data.
type RecordMetadata struct {
	Index       int       // position of the record in the log file
	UUID        uuid.UUID // record UUID
	DataSize    uint32    // size of the record data in bytes
	DataPreview string    // leading data bytes (at most 200 bytes, not characters); when the data was cut, a partial trailing UTF-8 character is dropped
	Full        bool      // true when DataPreview contains the entire record data
}
|
||
|
||
// RecordMetadataWithStatus pairs record metadata with its processing status.
type RecordMetadataWithStatus struct {
	Metadata *RecordMetadata
	Status   RecordStatus // processing status of the record
}
|
||
|
||
// RecordQuery reads records from a log file, locating them through an index
// shared with the other components.
type RecordQuery struct {
	logPath string       // path of the log file; reopened by Reset
	fd      *os.File     // read-only handle on the log file (opened with os.Open)
	rbuf    []byte       // reusable read buffer
	index   *RecordIndex // index manager supplied by the caller (not owned)
	writer  *LogWriter   // log writer supplied by the caller (not owned)
}
|
||
|
||
// NewRecordQuery 创建记录查询器
|
||
// index 参数必须由外部提供,确保所有组件使用同一个索引实例
|
||
func NewRecordQuery(logPath string, index *RecordIndex, writer *LogWriter) (*RecordQuery, error) {
|
||
if index == nil {
|
||
return nil, NewValidationError("index", "index cannot be nil", ErrNilParameter)
|
||
}
|
||
if writer == nil {
|
||
return nil, NewValidationError("writer", "writer cannot be nil", ErrNilParameter)
|
||
}
|
||
|
||
fd, err := os.Open(logPath)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("open log file: %w", err)
|
||
}
|
||
|
||
rq := &RecordQuery{
|
||
logPath: logPath,
|
||
fd: fd,
|
||
rbuf: make([]byte, 8<<20), // 8 MiB 缓冲区
|
||
index: index,
|
||
writer: writer,
|
||
}
|
||
|
||
return rq, nil
|
||
}
|
||
|
||
// readRecordsMetadataForward 从指定索引位置向前顺序读取记录元数据(不读取完整 Data,但读取预览)
|
||
// startIndex: 起始记录索引
|
||
// count: 读取数量
|
||
func (rq *RecordQuery) readRecordsMetadataForward(startIndex, count int) ([]*RecordMetadata, error) {
|
||
// 获取起始 offset
|
||
startOffset, err := rq.index.GetOffset(startIndex)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("get start offset: %w", err)
|
||
}
|
||
|
||
if _, err := rq.fd.Seek(startOffset, 0); err != nil {
|
||
return nil, fmt.Errorf("seek to offset %d: %w", startOffset, err)
|
||
}
|
||
|
||
results := make([]*RecordMetadata, 0, count)
|
||
currentIndex := startIndex
|
||
|
||
for len(results) < count {
|
||
// 读取头部:[4B len][8B offset][4B CRC][16B UUID] = 32 字节
|
||
hdr := rq.rbuf[:32]
|
||
if _, err := io.ReadFull(rq.fd, hdr); err != nil {
|
||
if err == io.EOF {
|
||
break
|
||
}
|
||
return nil, fmt.Errorf("read header at index %d: %w", currentIndex, err)
|
||
}
|
||
|
||
dataOffset := binary.LittleEndian.Uint64(hdr[4:12])
|
||
// 写入保护:如果 writer 存在,检查是否正在写入该记录
|
||
if rq.writer != nil {
|
||
dirtyOffset := rq.writer.GetDirtyOffset()
|
||
// 如果正在写入(dirtyOffset >= 0)且记录位置 >= 写入位置,等待写入完成
|
||
if dirtyOffset >= 0 && dataOffset >= uint64(dirtyOffset) {
|
||
break
|
||
}
|
||
}
|
||
|
||
dataLen := binary.LittleEndian.Uint32(hdr[0:4])
|
||
var uuidBytes [16]byte
|
||
copy(uuidBytes[:], hdr[16:32])
|
||
|
||
// 读取数据预览(最多 200 字节)
|
||
previewSize := min(int(dataLen), 200)
|
||
|
||
previewData := make([]byte, previewSize)
|
||
if _, err := io.ReadFull(rq.fd, previewData); err != nil {
|
||
if err == io.EOF {
|
||
break
|
||
}
|
||
return nil, fmt.Errorf("read preview at index %d: %w", currentIndex, err)
|
||
}
|
||
|
||
// 确保预览数据不会在 UTF-8 字符中间截断
|
||
validPreviewSize := previewSize
|
||
if previewSize > 0 && previewSize < int(dataLen) {
|
||
// 只有在截断的情况下才需要检查
|
||
// 从后往前最多检查 3 个字节,找到最后一个完整的 UTF-8 字符边界
|
||
for i := 0; i < 3 && validPreviewSize > 0; i++ {
|
||
if utf8.Valid(previewData[:validPreviewSize]) {
|
||
break
|
||
}
|
||
validPreviewSize--
|
||
}
|
||
}
|
||
|
||
metadata := &RecordMetadata{
|
||
Index: currentIndex,
|
||
UUID: uuidBytes,
|
||
DataSize: dataLen,
|
||
DataPreview: string(previewData[:validPreviewSize]),
|
||
Full: previewSize == int(dataLen),
|
||
}
|
||
|
||
// 跳过剩余数据部分
|
||
remainingSize := int64(dataLen) - int64(previewSize)
|
||
if remainingSize > 0 {
|
||
if _, err := rq.fd.Seek(remainingSize, 1); err != nil {
|
||
if err == io.EOF {
|
||
break
|
||
}
|
||
return nil, fmt.Errorf("skip remaining data at index %d: %w", currentIndex, err)
|
||
}
|
||
}
|
||
|
||
results = append(results, metadata)
|
||
currentIndex++
|
||
}
|
||
|
||
return results, nil
|
||
}
|
||
|
||
// readRecordsForward 从指定索引位置向前顺序读取记录
|
||
// startIndex: 起始记录索引
|
||
// count: 读取数量
|
||
func (rq *RecordQuery) readRecordsForward(startIndex, count int) ([]*Record, error) {
|
||
// 获取起始 offset
|
||
startOffset, err := rq.index.GetOffset(startIndex)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("get start offset: %w", err)
|
||
}
|
||
|
||
if _, err := rq.fd.Seek(startOffset, 0); err != nil {
|
||
return nil, fmt.Errorf("seek to offset %d: %w", startOffset, err)
|
||
}
|
||
|
||
results := make([]*Record, 0, count)
|
||
currentOffset := startOffset
|
||
|
||
for len(results) < count {
|
||
// 读取头部:[4B len][8B offset][4B CRC][16B UUID] = 32 字节
|
||
hdr := rq.rbuf[:32]
|
||
if _, err := io.ReadFull(rq.fd, hdr); err != nil {
|
||
if err == io.EOF {
|
||
break
|
||
}
|
||
return nil, fmt.Errorf("read header at offset %d: %w", currentOffset, err)
|
||
}
|
||
|
||
rec := &Record{
|
||
Len: binary.LittleEndian.Uint32(hdr[0:4]),
|
||
// hdr[4:12] 是 offset,读取时不需要使用
|
||
CRC: binary.LittleEndian.Uint32(hdr[12:16]),
|
||
}
|
||
copy(rec.UUID[:], hdr[16:32])
|
||
|
||
// 读取数据
|
||
rec.Data = make([]byte, rec.Len)
|
||
if _, err := io.ReadFull(rq.fd, rec.Data); err != nil {
|
||
// 如果遇到 EOF,说明文件可能不完整(被截断或索引不一致)
|
||
// 返回已读取的记录,而不是报错
|
||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||
break
|
||
}
|
||
return nil, fmt.Errorf("read data at offset %d: %w", currentOffset, err)
|
||
}
|
||
|
||
results = append(results, rec)
|
||
currentOffset += 32 + int64(rec.Len)
|
||
}
|
||
|
||
return results, nil
|
||
}
|
||
|
||
// QueryOldest 从参考索引向索引递减方向查询记录(查询更早的记录)
|
||
// refIndex: 参考索引位置
|
||
// count: 查询数量
|
||
// 返回的记录按索引递增方向排序,包含索引信息
|
||
// 例如:QueryOldest(5, 3) 查询索引 2, 3, 4(不包含 5),返回 [2, 3, 4]
|
||
func (rq *RecordQuery) QueryOldest(refIndex, count int) ([]*RecordWithIndex, error) {
|
||
if count <= 0 {
|
||
return nil, NewValidationError("count", "count must be greater than 0", ErrInvalidCount)
|
||
}
|
||
|
||
totalCount := rq.index.Count()
|
||
if totalCount == 0 {
|
||
return []*RecordWithIndex{}, nil
|
||
}
|
||
|
||
// 验证参考索引范围(严格模式)
|
||
if refIndex < 0 || refIndex > totalCount {
|
||
return nil, NewValidationError("refIndex", fmt.Sprintf("refIndex %d out of range [0, %d]", refIndex, totalCount), ErrInvalidRange)
|
||
}
|
||
|
||
// 计算实际起始索引(向索引递减方向)
|
||
startIndex := refIndex - count
|
||
if startIndex < 0 {
|
||
startIndex = 0
|
||
count = refIndex // 调整实际数量
|
||
}
|
||
|
||
if count <= 0 {
|
||
return []*RecordWithIndex{}, nil
|
||
}
|
||
|
||
// 读取记录
|
||
records, err := rq.readRecordsForward(startIndex, count)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// 转换为带索引的记录
|
||
results := make([]*RecordWithIndex, len(records))
|
||
for i, rec := range records {
|
||
results[i] = &RecordWithIndex{
|
||
Record: rec,
|
||
Index: startIndex + i,
|
||
}
|
||
}
|
||
|
||
return results, nil
|
||
}
|
||
|
||
// QueryNewest 从参考索引向索引递增方向查询记录(查询更新的记录)
|
||
// refIndex: 参考索引位置
|
||
// count: 查询数量
|
||
// 返回的记录按索引递增方向排序,包含索引信息
|
||
// 例如:QueryNewest(5, 3) 查询索引 6, 7, 8(不包含 5),返回 [6, 7, 8]
|
||
func (rq *RecordQuery) QueryNewest(refIndex, count int) ([]*RecordWithIndex, error) {
|
||
if count <= 0 {
|
||
return nil, NewValidationError("count", "count must be greater than 0", ErrInvalidCount)
|
||
}
|
||
|
||
totalCount := rq.index.Count()
|
||
if totalCount == 0 {
|
||
return []*RecordWithIndex{}, nil
|
||
}
|
||
|
||
// 验证参考索引范围(严格模式)
|
||
// QueryNewest 允许 refIndex = -1(从头开始查询)
|
||
if refIndex < -1 || refIndex >= totalCount {
|
||
return nil, NewValidationError("refIndex", fmt.Sprintf("refIndex %d out of range [-1, %d)", refIndex, totalCount), ErrInvalidRange)
|
||
}
|
||
|
||
// 计算实际起始索引(向索引递增方向)
|
||
startIndex := refIndex + 1
|
||
if startIndex >= totalCount {
|
||
return []*RecordWithIndex{}, nil
|
||
}
|
||
|
||
// 限制查询数量
|
||
remainCount := totalCount - startIndex
|
||
if count > remainCount {
|
||
count = remainCount
|
||
}
|
||
|
||
// 读取记录
|
||
records, err := rq.readRecordsForward(startIndex, count)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// 转换为带索引的记录
|
||
results := make([]*RecordWithIndex, len(records))
|
||
for i, rec := range records {
|
||
results[i] = &RecordWithIndex{
|
||
Record: rec,
|
||
Index: startIndex + i,
|
||
}
|
||
}
|
||
|
||
return results, nil
|
||
}
|
||
|
||
// QueryOldestMetadata 从参考索引向索引递减方向查询记录元数据(查询更早的记录,不读取完整数据)
|
||
// refIndex: 参考索引位置
|
||
// count: 查询数量
|
||
// 返回的记录按索引递增方向排序,只包含元数据信息
|
||
// 例如:QueryOldestMetadata(5, 3) 查询索引 2, 3, 4(不包含 5),返回 [2, 3, 4]
|
||
func (rq *RecordQuery) QueryOldestMetadata(refIndex, count int) ([]*RecordMetadata, error) {
|
||
if count <= 0 {
|
||
return nil, NewValidationError("count", "count must be greater than 0", ErrInvalidCount)
|
||
}
|
||
|
||
totalCount := rq.index.Count()
|
||
if totalCount == 0 {
|
||
return []*RecordMetadata{}, nil
|
||
}
|
||
|
||
// 验证参考索引范围(严格模式)
|
||
if refIndex < 0 || refIndex > totalCount {
|
||
return nil, NewValidationError("refIndex", fmt.Sprintf("refIndex %d out of range [0, %d]", refIndex, totalCount), ErrInvalidRange)
|
||
}
|
||
|
||
// 计算实际起始索引(向索引递减方向)
|
||
startIndex := refIndex - count
|
||
if startIndex < 0 {
|
||
startIndex = 0
|
||
count = refIndex // 调整实际数量
|
||
}
|
||
|
||
if count <= 0 {
|
||
return []*RecordMetadata{}, nil
|
||
}
|
||
|
||
// 读取元数据
|
||
return rq.readRecordsMetadataForward(startIndex, count)
|
||
}
|
||
|
||
// QueryNewestMetadata 从参考索引向索引递增方向查询记录元数据(查询更新的记录,不读取完整数据)
|
||
// refIndex: 参考索引位置
|
||
// count: 查询数量
|
||
// 返回的记录按索引递增方向排序,只包含元数据信息
|
||
// 例如:QueryNewestMetadata(5, 3) 查询索引 6, 7, 8(不包含 5),返回 [6, 7, 8]
|
||
func (rq *RecordQuery) QueryNewestMetadata(refIndex, count int) ([]*RecordMetadata, error) {
|
||
if count <= 0 {
|
||
return nil, NewValidationError("count", "count must be greater than 0", ErrInvalidCount)
|
||
}
|
||
|
||
totalCount := rq.index.Count()
|
||
if totalCount == 0 {
|
||
return []*RecordMetadata{}, nil
|
||
}
|
||
|
||
// 验证参考索引范围(严格模式)
|
||
// QueryNewestMetadata 允许 refIndex = -1(从头开始查询)
|
||
if refIndex < -1 || refIndex >= totalCount {
|
||
return nil, NewValidationError("refIndex", fmt.Sprintf("refIndex %d out of range [-1, %d)", refIndex, totalCount), ErrInvalidRange)
|
||
}
|
||
|
||
// 计算实际起始索引(向索引递增方向)
|
||
startIndex := refIndex + 1
|
||
if startIndex >= totalCount {
|
||
return []*RecordMetadata{}, nil
|
||
}
|
||
|
||
// 限制查询数量
|
||
remainCount := totalCount - startIndex
|
||
if count > remainCount {
|
||
count = remainCount
|
||
}
|
||
|
||
// 读取元数据
|
||
return rq.readRecordsMetadataForward(startIndex, count)
|
||
}
|
||
|
||
// QueryByIndex 根据索引查询单条记录的完整数据
|
||
// index: 记录索引
|
||
// 返回完整的记录数据
|
||
func (rq *RecordQuery) QueryByIndex(index int) (*Record, error) {
|
||
totalCount := rq.index.Count()
|
||
if index < 0 || index >= totalCount {
|
||
return nil, NewValidationError("index", fmt.Sprintf("index %d out of range [0, %d)", index, totalCount), ErrInvalidRange)
|
||
}
|
||
|
||
// 读取单条记录
|
||
records, err := rq.readRecordsForward(index, 1)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
if len(records) == 0 {
|
||
return nil, fmt.Errorf("record at index %d not found", index)
|
||
}
|
||
|
||
return records[0], nil
|
||
}
|
||
|
||
// GetRecordCount 获取记录总数
|
||
func (rq *RecordQuery) GetRecordCount() (int, error) {
|
||
return rq.index.Count(), nil
|
||
}
|
||
|
||
// GetRecordStatus 根据游标窗口索引位置获取记录状态
|
||
// recordIndex: 记录索引
|
||
// startIdx: 窗口开始索引(已处理位置)
|
||
// endIdx: 窗口结束索引(当前读取位置)
|
||
func GetRecordStatus(recordIndex, startIdx, endIdx int) RecordStatus {
|
||
if recordIndex < startIdx {
|
||
return StatusProcessed
|
||
} else if recordIndex >= startIdx && recordIndex < endIdx {
|
||
return StatusProcessing
|
||
} else {
|
||
return StatusPending
|
||
}
|
||
}
|
||
|
||
// Close 关闭查询器
|
||
// 注意:不关闭 index,因为 index 是外部管理的
|
||
func (rq *RecordQuery) Close() error {
|
||
// 只关闭日志文件
|
||
if rq.fd != nil {
|
||
return rq.fd.Close()
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// Reset 重置查询器,关闭并重新打开日志文件
|
||
// 保持 index 和 writer 引用不变
|
||
func (rq *RecordQuery) Reset() error {
|
||
// 关闭当前文件句柄
|
||
if rq.fd != nil {
|
||
if err := rq.fd.Close(); err != nil {
|
||
return err
|
||
}
|
||
rq.fd = nil
|
||
}
|
||
|
||
// 重新打开日志文件
|
||
fd, err := os.Open(rq.logPath)
|
||
if err != nil {
|
||
return fmt.Errorf("reopen log file: %w", err)
|
||
}
|
||
|
||
rq.fd = fd
|
||
return nil
|
||
}
|