Files
seqlog/query.go
bourdon 6fb0731935 重构:为核心组件实现 Reset 方法优化重置机制
为所有核心组件添加 Reset() 方法:
- LogWriter.Reset(): 删除并重新创建日志文件,保持 index 和 wbuf 引用不变
- RecordIndex.Reset(): 清空索引数据并重新创建索引文件
- RecordQuery.Reset(): 关闭并重新打开日志文件
- ProcessCursor.Reset(): 删除位置文件并重置游标位置
- LogTailer.Reset(): 重置内部 channel 状态

优化 TopicProcessor.Reset() 实现:
- 不再销毁和重建组件对象
- 通过调用各组件的 Reset() 方法重置状态
- 保持组件间引用关系稳定
- 减少代码行数约 20 行
- 避免空指针风险和内存分配开销

代码改进:
- LogWriter 添加 path 字段用于重置
- 移除 topic_processor.go 中未使用的 os import
- 职责分离更清晰,每个组件管理自己的重置逻辑

测试结果:
- TestTopicReset: PASS
- TestTopicResetWithPendingRecords: PASS
- 所有 TopicProcessor 相关测试通过

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-04 21:58:54 +08:00

488 lines
14 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package seqlog
import (
"encoding/binary"
"fmt"
"io"
"os"
"unicode/utf8"
"github.com/google/uuid"
)
// RecordStatus 记录处理状态
type RecordStatus int
const (
StatusProcessed RecordStatus = iota // 已处理
StatusProcessing // 处理中(当前位置)
StatusPending // 待处理
StatusUnavailable // 不可用(尚未写入)
)
// String 返回状态的字符串表示
func (s RecordStatus) String() string {
switch s {
case StatusProcessed:
return "StatusProcessed"
case StatusProcessing:
return "StatusProcessing"
case StatusPending:
return "StatusPending"
case StatusUnavailable:
return "StatusUnavailable"
default:
return "StatusUnknown"
}
}
// RecordWithStatus 带状态的记录
type RecordWithStatus struct {
Record *Record
Index int // 记录在日志文件中的索引位置
Status RecordStatus // 记录的处理状态
}
// RecordWithIndex 带索引的记录
type RecordWithIndex struct {
Record *Record
Index int // 记录在日志文件中的索引位置
}
// RecordMetadata 记录元数据(不包含完整数据)
type RecordMetadata struct {
Index int // 记录索引
UUID uuid.UUID // UUID
DataSize uint32 // 数据大小(字节)
DataPreview string // 数据预览(前 200 个字符)
Full bool
}
// RecordMetadataWithStatus 带状态的记录元数据
type RecordMetadataWithStatus struct {
Metadata *RecordMetadata
Status RecordStatus // 记录的处理状态
}
// RecordQuery 记录查询器
type RecordQuery struct {
logPath string
fd *os.File
rbuf []byte // 复用读缓冲区
index *RecordIndex // 索引文件管理器(来自外部)
writer *LogWriter // 日志写入器(来自外部)
}
// NewRecordQuery 创建记录查询器
// index 参数必须由外部提供,确保所有组件使用同一个索引实例
func NewRecordQuery(logPath string, index *RecordIndex, writer *LogWriter) (*RecordQuery, error) {
if index == nil {
return nil, NewValidationError("index", "index cannot be nil", ErrNilParameter)
}
if writer == nil {
return nil, NewValidationError("writer", "writer cannot be nil", ErrNilParameter)
}
fd, err := os.Open(logPath)
if err != nil {
return nil, fmt.Errorf("open log file: %w", err)
}
rq := &RecordQuery{
logPath: logPath,
fd: fd,
rbuf: make([]byte, 8<<20), // 8 MiB 缓冲区
index: index,
writer: writer,
}
return rq, nil
}
// readRecordsMetadataForward 从指定索引位置向前顺序读取记录元数据(不读取完整 Data但读取预览
// startIndex: 起始记录索引
// count: 读取数量
func (rq *RecordQuery) readRecordsMetadataForward(startIndex, count int) ([]*RecordMetadata, error) {
// 获取起始 offset
startOffset, err := rq.index.GetOffset(startIndex)
if err != nil {
return nil, fmt.Errorf("get start offset: %w", err)
}
if _, err := rq.fd.Seek(startOffset, 0); err != nil {
return nil, fmt.Errorf("seek to offset %d: %w", startOffset, err)
}
results := make([]*RecordMetadata, 0, count)
currentIndex := startIndex
for len(results) < count {
// 读取头部:[4B len][8B offset][4B CRC][16B UUID] = 32 字节
hdr := rq.rbuf[:32]
if _, err := io.ReadFull(rq.fd, hdr); err != nil {
if err == io.EOF {
break
}
return nil, fmt.Errorf("read header at index %d: %w", currentIndex, err)
}
dataOffset := binary.LittleEndian.Uint64(hdr[4:12])
// 写入保护:如果 writer 存在,检查是否正在写入该记录
if rq.writer != nil {
dirtyOffset := rq.writer.GetDirtyOffset()
// 如果正在写入dirtyOffset >= 0且记录位置 >= 写入位置,等待写入完成
if dirtyOffset >= 0 && dataOffset >= uint64(dirtyOffset) {
break
}
}
dataLen := binary.LittleEndian.Uint32(hdr[0:4])
var uuidBytes [16]byte
copy(uuidBytes[:], hdr[16:32])
// 读取数据预览(最多 200 字节)
previewSize := min(int(dataLen), 200)
previewData := make([]byte, previewSize)
if _, err := io.ReadFull(rq.fd, previewData); err != nil {
if err == io.EOF {
break
}
return nil, fmt.Errorf("read preview at index %d: %w", currentIndex, err)
}
// 确保预览数据不会在 UTF-8 字符中间截断
validPreviewSize := previewSize
if previewSize > 0 && previewSize < int(dataLen) {
// 只有在截断的情况下才需要检查
// 从后往前最多检查 3 个字节,找到最后一个完整的 UTF-8 字符边界
for i := 0; i < 3 && validPreviewSize > 0; i++ {
if utf8.Valid(previewData[:validPreviewSize]) {
break
}
validPreviewSize--
}
}
metadata := &RecordMetadata{
Index: currentIndex,
UUID: uuidBytes,
DataSize: dataLen,
DataPreview: string(previewData[:validPreviewSize]),
Full: previewSize == int(dataLen),
}
// 跳过剩余数据部分
remainingSize := int64(dataLen) - int64(previewSize)
if remainingSize > 0 {
if _, err := rq.fd.Seek(remainingSize, 1); err != nil {
if err == io.EOF {
break
}
return nil, fmt.Errorf("skip remaining data at index %d: %w", currentIndex, err)
}
}
results = append(results, metadata)
currentIndex++
}
return results, nil
}
// readRecordsForward 从指定索引位置向前顺序读取记录
// startIndex: 起始记录索引
// count: 读取数量
func (rq *RecordQuery) readRecordsForward(startIndex, count int) ([]*Record, error) {
// 获取起始 offset
startOffset, err := rq.index.GetOffset(startIndex)
if err != nil {
return nil, fmt.Errorf("get start offset: %w", err)
}
if _, err := rq.fd.Seek(startOffset, 0); err != nil {
return nil, fmt.Errorf("seek to offset %d: %w", startOffset, err)
}
results := make([]*Record, 0, count)
currentOffset := startOffset
for len(results) < count {
// 读取头部:[4B len][8B offset][4B CRC][16B UUID] = 32 字节
hdr := rq.rbuf[:32]
if _, err := io.ReadFull(rq.fd, hdr); err != nil {
if err == io.EOF {
break
}
return nil, fmt.Errorf("read header at offset %d: %w", currentOffset, err)
}
rec := &Record{
Len: binary.LittleEndian.Uint32(hdr[0:4]),
// hdr[4:12] 是 offset读取时不需要使用
CRC: binary.LittleEndian.Uint32(hdr[12:16]),
}
copy(rec.UUID[:], hdr[16:32])
// 读取数据
rec.Data = make([]byte, rec.Len)
if _, err := io.ReadFull(rq.fd, rec.Data); err != nil {
// 如果遇到 EOF说明文件可能不完整被截断或索引不一致
// 返回已读取的记录,而不是报错
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
}
return nil, fmt.Errorf("read data at offset %d: %w", currentOffset, err)
}
results = append(results, rec)
currentOffset += 32 + int64(rec.Len)
}
return results, nil
}
// QueryOldest 从参考索引向索引递减方向查询记录(查询更早的记录)
// refIndex: 参考索引位置
// count: 查询数量
// 返回的记录按索引递增方向排序,包含索引信息
// 例如QueryOldest(5, 3) 查询索引 2, 3, 4不包含 5返回 [2, 3, 4]
func (rq *RecordQuery) QueryOldest(refIndex, count int) ([]*RecordWithIndex, error) {
if count <= 0 {
return nil, NewValidationError("count", "count must be greater than 0", ErrInvalidCount)
}
totalCount := rq.index.Count()
if totalCount == 0 {
return []*RecordWithIndex{}, nil
}
// 验证参考索引范围(严格模式)
if refIndex < 0 || refIndex > totalCount {
return nil, NewValidationError("refIndex", fmt.Sprintf("refIndex %d out of range [0, %d]", refIndex, totalCount), ErrInvalidRange)
}
// 计算实际起始索引(向索引递减方向)
startIndex := refIndex - count
if startIndex < 0 {
startIndex = 0
count = refIndex // 调整实际数量
}
if count <= 0 {
return []*RecordWithIndex{}, nil
}
// 读取记录
records, err := rq.readRecordsForward(startIndex, count)
if err != nil {
return nil, err
}
// 转换为带索引的记录
results := make([]*RecordWithIndex, len(records))
for i, rec := range records {
results[i] = &RecordWithIndex{
Record: rec,
Index: startIndex + i,
}
}
return results, nil
}
// QueryNewest 从参考索引向索引递增方向查询记录(查询更新的记录)
// refIndex: 参考索引位置
// count: 查询数量
// 返回的记录按索引递增方向排序,包含索引信息
// 例如QueryNewest(5, 3) 查询索引 6, 7, 8不包含 5返回 [6, 7, 8]
func (rq *RecordQuery) QueryNewest(refIndex, count int) ([]*RecordWithIndex, error) {
if count <= 0 {
return nil, NewValidationError("count", "count must be greater than 0", ErrInvalidCount)
}
totalCount := rq.index.Count()
if totalCount == 0 {
return []*RecordWithIndex{}, nil
}
// 验证参考索引范围(严格模式)
// QueryNewest 允许 refIndex = -1从头开始查询
if refIndex < -1 || refIndex >= totalCount {
return nil, NewValidationError("refIndex", fmt.Sprintf("refIndex %d out of range [-1, %d)", refIndex, totalCount), ErrInvalidRange)
}
// 计算实际起始索引(向索引递增方向)
startIndex := refIndex + 1
if startIndex >= totalCount {
return []*RecordWithIndex{}, nil
}
// 限制查询数量
remainCount := totalCount - startIndex
if count > remainCount {
count = remainCount
}
// 读取记录
records, err := rq.readRecordsForward(startIndex, count)
if err != nil {
return nil, err
}
// 转换为带索引的记录
results := make([]*RecordWithIndex, len(records))
for i, rec := range records {
results[i] = &RecordWithIndex{
Record: rec,
Index: startIndex + i,
}
}
return results, nil
}
// QueryOldestMetadata 从参考索引向索引递减方向查询记录元数据(查询更早的记录,不读取完整数据)
// refIndex: 参考索引位置
// count: 查询数量
// 返回的记录按索引递增方向排序,只包含元数据信息
// 例如QueryOldestMetadata(5, 3) 查询索引 2, 3, 4不包含 5返回 [2, 3, 4]
func (rq *RecordQuery) QueryOldestMetadata(refIndex, count int) ([]*RecordMetadata, error) {
if count <= 0 {
return nil, NewValidationError("count", "count must be greater than 0", ErrInvalidCount)
}
totalCount := rq.index.Count()
if totalCount == 0 {
return []*RecordMetadata{}, nil
}
// 验证参考索引范围(严格模式)
if refIndex < 0 || refIndex > totalCount {
return nil, NewValidationError("refIndex", fmt.Sprintf("refIndex %d out of range [0, %d]", refIndex, totalCount), ErrInvalidRange)
}
// 计算实际起始索引(向索引递减方向)
startIndex := refIndex - count
if startIndex < 0 {
startIndex = 0
count = refIndex // 调整实际数量
}
if count <= 0 {
return []*RecordMetadata{}, nil
}
// 读取元数据
return rq.readRecordsMetadataForward(startIndex, count)
}
// QueryNewestMetadata 从参考索引向索引递增方向查询记录元数据(查询更新的记录,不读取完整数据)
// refIndex: 参考索引位置
// count: 查询数量
// 返回的记录按索引递增方向排序,只包含元数据信息
// 例如QueryNewestMetadata(5, 3) 查询索引 6, 7, 8不包含 5返回 [6, 7, 8]
func (rq *RecordQuery) QueryNewestMetadata(refIndex, count int) ([]*RecordMetadata, error) {
if count <= 0 {
return nil, NewValidationError("count", "count must be greater than 0", ErrInvalidCount)
}
totalCount := rq.index.Count()
if totalCount == 0 {
return []*RecordMetadata{}, nil
}
// 验证参考索引范围(严格模式)
// QueryNewestMetadata 允许 refIndex = -1从头开始查询
if refIndex < -1 || refIndex >= totalCount {
return nil, NewValidationError("refIndex", fmt.Sprintf("refIndex %d out of range [-1, %d)", refIndex, totalCount), ErrInvalidRange)
}
// 计算实际起始索引(向索引递增方向)
startIndex := refIndex + 1
if startIndex >= totalCount {
return []*RecordMetadata{}, nil
}
// 限制查询数量
remainCount := totalCount - startIndex
if count > remainCount {
count = remainCount
}
// 读取元数据
return rq.readRecordsMetadataForward(startIndex, count)
}
// QueryByIndex 根据索引查询单条记录的完整数据
// index: 记录索引
// 返回完整的记录数据
func (rq *RecordQuery) QueryByIndex(index int) (*Record, error) {
totalCount := rq.index.Count()
if index < 0 || index >= totalCount {
return nil, NewValidationError("index", fmt.Sprintf("index %d out of range [0, %d)", index, totalCount), ErrInvalidRange)
}
// 读取单条记录
records, err := rq.readRecordsForward(index, 1)
if err != nil {
return nil, err
}
if len(records) == 0 {
return nil, fmt.Errorf("record at index %d not found", index)
}
return records[0], nil
}
// GetRecordCount 获取记录总数
func (rq *RecordQuery) GetRecordCount() (int, error) {
return rq.index.Count(), nil
}
// GetRecordStatus 根据游标窗口索引位置获取记录状态
// recordIndex: 记录索引
// startIdx: 窗口开始索引(已处理位置)
// endIdx: 窗口结束索引(当前读取位置)
func GetRecordStatus(recordIndex, startIdx, endIdx int) RecordStatus {
if recordIndex < startIdx {
return StatusProcessed
} else if recordIndex >= startIdx && recordIndex < endIdx {
return StatusProcessing
} else {
return StatusPending
}
}
// Close 关闭查询器
// 注意:不关闭 index因为 index 是外部管理的
func (rq *RecordQuery) Close() error {
// 只关闭日志文件
if rq.fd != nil {
return rq.fd.Close()
}
return nil
}
// Reset 重置查询器,关闭并重新打开日志文件
// 保持 index 和 writer 引用不变
func (rq *RecordQuery) Reset() error {
// 关闭当前文件句柄
if rq.fd != nil {
if err := rq.fd.Close(); err != nil {
return err
}
rq.fd = nil
}
// 重新打开日志文件
fd, err := os.Open(rq.logPath)
if err != nil {
return fmt.Errorf("reopen log file: %w", err)
}
rq.fd = fd
return nil
}