package seqlog import ( "encoding/binary" "fmt" "io" "os" "unicode/utf8" "github.com/google/uuid" ) // RecordStatus 记录处理状态 type RecordStatus int const ( StatusProcessed RecordStatus = iota // 已处理 StatusProcessing // 处理中(当前位置) StatusPending // 待处理 StatusUnavailable // 不可用(尚未写入) ) // String 返回状态的字符串表示 func (s RecordStatus) String() string { switch s { case StatusProcessed: return "StatusProcessed" case StatusProcessing: return "StatusProcessing" case StatusPending: return "StatusPending" case StatusUnavailable: return "StatusUnavailable" default: return "StatusUnknown" } } // RecordWithStatus 带状态的记录 type RecordWithStatus struct { Record *Record Index int // 记录在日志文件中的索引位置 Status RecordStatus // 记录的处理状态 } // RecordWithIndex 带索引的记录 type RecordWithIndex struct { Record *Record Index int // 记录在日志文件中的索引位置 } // RecordMetadata 记录元数据(不包含完整数据) type RecordMetadata struct { Index int // 记录索引 UUID uuid.UUID // UUID DataSize uint32 // 数据大小(字节) DataPreview string // 数据预览(前 200 个字符) Full bool } // RecordMetadataWithStatus 带状态的记录元数据 type RecordMetadataWithStatus struct { Metadata *RecordMetadata Status RecordStatus // 记录的处理状态 } // RecordQuery 记录查询器 type RecordQuery struct { logPath string fd *os.File rbuf []byte // 复用读缓冲区 index *RecordIndex // 索引文件管理器(来自外部) writer *LogWriter // 日志写入器(来自外部) } // NewRecordQuery 创建记录查询器 // index 参数必须由外部提供,确保所有组件使用同一个索引实例 func NewRecordQuery(logPath string, index *RecordIndex, writer *LogWriter) (*RecordQuery, error) { if index == nil { return nil, NewValidationError("index", "index cannot be nil", ErrNilParameter) } if writer == nil { return nil, NewValidationError("writer", "writer cannot be nil", ErrNilParameter) } fd, err := os.Open(logPath) if err != nil { return nil, fmt.Errorf("open log file: %w", err) } rq := &RecordQuery{ logPath: logPath, fd: fd, rbuf: make([]byte, 8<<20), // 8 MiB 缓冲区 index: index, writer: writer, } return rq, nil } // readRecordsMetadataForward 从指定索引位置向前顺序读取记录元数据(不读取完整 Data,但读取预览) // startIndex: 起始记录索引 // count: 读取数量 func (rq *RecordQuery) readRecordsMetadataForward(startIndex, count int) ([]*RecordMetadata, error) { // 获取起始 offset startOffset, err := rq.index.GetOffset(startIndex) if err != nil { return nil, fmt.Errorf("get start offset: %w", err) } if _, err := rq.fd.Seek(startOffset, 0); err != nil { return nil, fmt.Errorf("seek to offset %d: %w", startOffset, err) } results := make([]*RecordMetadata, 0, count) currentIndex := startIndex for len(results) < count { // 读取头部:[4B len][8B offset][4B CRC][16B UUID] = 32 字节 hdr := rq.rbuf[:32] if _, err := io.ReadFull(rq.fd, hdr); err != nil { if err == io.EOF { break } return nil, fmt.Errorf("read header at index %d: %w", currentIndex, err) } dataOffset := binary.LittleEndian.Uint64(hdr[4:12]) // 写入保护:如果 writer 存在,检查是否正在写入该记录 if rq.writer != nil { dirtyOffset := rq.writer.GetDirtyOffset() // 如果正在写入(dirtyOffset >= 0)且记录位置 >= 写入位置,等待写入完成 if dirtyOffset >= 0 && dataOffset >= uint64(dirtyOffset) { break } } dataLen := binary.LittleEndian.Uint32(hdr[0:4]) var uuidBytes [16]byte copy(uuidBytes[:], hdr[16:32]) // 读取数据预览(最多 200 字节) previewSize := min(int(dataLen), 200) previewData := make([]byte, previewSize) if _, err := io.ReadFull(rq.fd, previewData); err != nil { if err == io.EOF { break } return nil, fmt.Errorf("read preview at index %d: %w", currentIndex, err) } // 确保预览数据不会在 UTF-8 字符中间截断 validPreviewSize := previewSize if previewSize > 0 && previewSize < int(dataLen) { // 只有在截断的情况下才需要检查 // 从后往前最多检查 3 个字节,找到最后一个完整的 UTF-8 字符边界 for i := 0; i < 3 && validPreviewSize > 0; i++ { if utf8.Valid(previewData[:validPreviewSize]) { break } validPreviewSize-- } } metadata := &RecordMetadata{ Index: currentIndex, UUID: uuidBytes, DataSize: dataLen, DataPreview: string(previewData[:validPreviewSize]), Full: previewSize == int(dataLen), } // 跳过剩余数据部分 remainingSize := int64(dataLen) - int64(previewSize) if remainingSize > 0 { if _, err := rq.fd.Seek(remainingSize, 1); err != nil { if err == io.EOF { break } return nil, fmt.Errorf("skip remaining data at index %d: %w", currentIndex, err) } } results = append(results, metadata) currentIndex++ } return results, nil } // readRecordsForward 从指定索引位置向前顺序读取记录 // startIndex: 起始记录索引 // count: 读取数量 func (rq *RecordQuery) readRecordsForward(startIndex, count int) ([]*Record, error) { // 获取起始 offset startOffset, err := rq.index.GetOffset(startIndex) if err != nil { return nil, fmt.Errorf("get start offset: %w", err) } if _, err := rq.fd.Seek(startOffset, 0); err != nil { return nil, fmt.Errorf("seek to offset %d: %w", startOffset, err) } results := make([]*Record, 0, count) currentOffset := startOffset for len(results) < count { // 读取头部:[4B len][8B offset][4B CRC][16B UUID] = 32 字节 hdr := rq.rbuf[:32] if _, err := io.ReadFull(rq.fd, hdr); err != nil { if err == io.EOF { break } return nil, fmt.Errorf("read header at offset %d: %w", currentOffset, err) } rec := &Record{ Len: binary.LittleEndian.Uint32(hdr[0:4]), // hdr[4:12] 是 offset,读取时不需要使用 CRC: binary.LittleEndian.Uint32(hdr[12:16]), } copy(rec.UUID[:], hdr[16:32]) // 读取数据 rec.Data = make([]byte, rec.Len) if _, err := io.ReadFull(rq.fd, rec.Data); err != nil { // 如果遇到 EOF,说明文件可能不完整(被截断或索引不一致) // 返回已读取的记录,而不是报错 if err == io.EOF || err == io.ErrUnexpectedEOF { break } return nil, fmt.Errorf("read data at offset %d: %w", currentOffset, err) } results = append(results, rec) currentOffset += 32 + int64(rec.Len) } return results, nil } // QueryOldest 从参考索引向索引递减方向查询记录(查询更早的记录) // refIndex: 参考索引位置 // count: 查询数量 // 返回的记录按索引递增方向排序,包含索引信息 // 例如:QueryOldest(5, 3) 查询索引 2, 3, 4(不包含 5),返回 [2, 3, 4] func (rq *RecordQuery) QueryOldest(refIndex, count int) ([]*RecordWithIndex, error) { if count <= 0 { return nil, NewValidationError("count", "count must be greater than 0", ErrInvalidCount) } totalCount := rq.index.Count() if totalCount == 0 { return []*RecordWithIndex{}, nil } // 验证参考索引范围(严格模式) if refIndex < 0 || refIndex > totalCount { return nil, NewValidationError("refIndex", fmt.Sprintf("refIndex %d out of range [0, %d]", refIndex, totalCount), ErrInvalidRange) } // 计算实际起始索引(向索引递减方向) startIndex := refIndex - count if startIndex < 0 { startIndex = 0 count = refIndex // 调整实际数量 } if count <= 0 { return []*RecordWithIndex{}, nil } // 读取记录 records, err := rq.readRecordsForward(startIndex, count) if err != nil { return nil, err } // 转换为带索引的记录 results := make([]*RecordWithIndex, len(records)) for i, rec := range records { results[i] = &RecordWithIndex{ Record: rec, Index: startIndex + i, } } return results, nil } // QueryNewest 从参考索引向索引递增方向查询记录(查询更新的记录) // refIndex: 参考索引位置 // count: 查询数量 // 返回的记录按索引递增方向排序,包含索引信息 // 例如:QueryNewest(5, 3) 查询索引 6, 7, 8(不包含 5),返回 [6, 7, 8] func (rq *RecordQuery) QueryNewest(refIndex, count int) ([]*RecordWithIndex, error) { if count <= 0 { return nil, NewValidationError("count", "count must be greater than 0", ErrInvalidCount) } totalCount := rq.index.Count() if totalCount == 0 { return []*RecordWithIndex{}, nil } // 验证参考索引范围(严格模式) // QueryNewest 允许 refIndex = -1(从头开始查询) if refIndex < -1 || refIndex >= totalCount { return nil, NewValidationError("refIndex", fmt.Sprintf("refIndex %d out of range [-1, %d)", refIndex, totalCount), ErrInvalidRange) } // 计算实际起始索引(向索引递增方向) startIndex := refIndex + 1 if startIndex >= totalCount { return []*RecordWithIndex{}, nil } // 限制查询数量 remainCount := totalCount - startIndex if count > remainCount { count = remainCount } // 读取记录 records, err := rq.readRecordsForward(startIndex, count) if err != nil { return nil, err } // 转换为带索引的记录 results := make([]*RecordWithIndex, len(records)) for i, rec := range records { results[i] = &RecordWithIndex{ Record: rec, Index: startIndex + i, } } return results, nil } // QueryOldestMetadata 从参考索引向索引递减方向查询记录元数据(查询更早的记录,不读取完整数据) // refIndex: 参考索引位置 // count: 查询数量 // 返回的记录按索引递增方向排序,只包含元数据信息 // 例如:QueryOldestMetadata(5, 3) 查询索引 2, 3, 4(不包含 5),返回 [2, 3, 4] func (rq *RecordQuery) QueryOldestMetadata(refIndex, count int) ([]*RecordMetadata, error) { if count <= 0 { return nil, NewValidationError("count", "count must be greater than 0", ErrInvalidCount) } totalCount := rq.index.Count() if totalCount == 0 { return []*RecordMetadata{}, nil } // 验证参考索引范围(严格模式) if refIndex < 0 || refIndex > totalCount { return nil, NewValidationError("refIndex", fmt.Sprintf("refIndex %d out of range [0, %d]", refIndex, totalCount), ErrInvalidRange) } // 计算实际起始索引(向索引递减方向) startIndex := refIndex - count if startIndex < 0 { startIndex = 0 count = refIndex // 调整实际数量 } if count <= 0 { return []*RecordMetadata{}, nil } // 读取元数据 return rq.readRecordsMetadataForward(startIndex, count) } // QueryNewestMetadata 从参考索引向索引递增方向查询记录元数据(查询更新的记录,不读取完整数据) // refIndex: 参考索引位置 // count: 查询数量 // 返回的记录按索引递增方向排序,只包含元数据信息 // 例如:QueryNewestMetadata(5, 3) 查询索引 6, 7, 8(不包含 5),返回 [6, 7, 8] func (rq *RecordQuery) QueryNewestMetadata(refIndex, count int) ([]*RecordMetadata, error) { if count <= 0 { return nil, NewValidationError("count", "count must be greater than 0", ErrInvalidCount) } totalCount := rq.index.Count() if totalCount == 0 { return []*RecordMetadata{}, nil } // 验证参考索引范围(严格模式) // QueryNewestMetadata 允许 refIndex = -1(从头开始查询) if refIndex < -1 || refIndex >= totalCount { return nil, NewValidationError("refIndex", fmt.Sprintf("refIndex %d out of range [-1, %d)", refIndex, totalCount), ErrInvalidRange) } // 计算实际起始索引(向索引递增方向) startIndex := refIndex + 1 if startIndex >= totalCount { return []*RecordMetadata{}, nil } // 限制查询数量 remainCount := totalCount - startIndex if count > remainCount { count = remainCount } // 读取元数据 return rq.readRecordsMetadataForward(startIndex, count) } // QueryByIndex 根据索引查询单条记录的完整数据 // index: 记录索引 // 返回完整的记录数据 func (rq *RecordQuery) QueryByIndex(index int) (*Record, error) { totalCount := rq.index.Count() if index < 0 || index >= totalCount { return nil, NewValidationError("index", fmt.Sprintf("index %d out of range [0, %d)", index, totalCount), ErrInvalidRange) } // 读取单条记录 records, err := rq.readRecordsForward(index, 1) if err != nil { return nil, err } if len(records) == 0 { return nil, fmt.Errorf("record at index %d not found", index) } return records[0], nil } // GetRecordCount 获取记录总数 func (rq *RecordQuery) GetRecordCount() (int, error) { return rq.index.Count(), nil } // GetRecordStatus 根据游标窗口索引位置获取记录状态 // recordIndex: 记录索引 // startIdx: 窗口开始索引(已处理位置) // endIdx: 窗口结束索引(当前读取位置) func GetRecordStatus(recordIndex, startIdx, endIdx int) RecordStatus { if recordIndex < startIdx { return StatusProcessed } else if recordIndex >= startIdx && recordIndex < endIdx { return StatusProcessing } else { return StatusPending } } // Close 关闭查询器 // 注意:不关闭 index,因为 index 是外部管理的 func (rq *RecordQuery) Close() error { // 只关闭日志文件 if rq.fd != nil { return rq.fd.Close() } return nil } // Reset 重置查询器,关闭并重新打开日志文件 // 保持 index 和 writer 引用不变 func (rq *RecordQuery) Reset() error { // 关闭当前文件句柄 if rq.fd != nil { if err := rq.fd.Close(); err != nil { return err } rq.fd = nil } // 重新打开日志文件 fd, err := os.Open(rq.logPath) if err != nil { return fmt.Errorf("reopen log file: %w", err) } rq.fd = fd return nil }