From 5c028a55b3260edcd8ffa197c44ce2b77c5dc7e0 Mon Sep 17 00:00:00 2001 From: bourdon Date: Sat, 4 Oct 2025 00:10:14 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84=EF=BC=9A=E7=AE=80=E5=8C=96?= =?UTF-8?q?=E6=9F=A5=E8=AF=A2=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - RecordQuery.QueryOldest 和 QueryNewest 不再接收 startIdx/endIdx 参数 - 查询方法返回纯 Record 列表,状态判断移到调用方 - TopicProcessor 的查询方法负责添加状态信息 - 更新所有测试文件以适配新接口 --- .gitignore | 2 + example/index_example.go | 17 ++--- example/webapp/main.go | 29 ++------- index.go | 80 +++++++++++++++++------ index_test.go | 7 +- query.go | 136 +++++++-------------------------------- seqlog_test.go | 71 ++++++++++---------- topic_processor.go | 34 +++++++++- 8 files changed, 168 insertions(+), 208 deletions(-) diff --git a/.gitignore b/.gitignore index ed2f7ac..b1916c0 100644 --- a/.gitignore +++ b/.gitignore @@ -22,6 +22,8 @@ test_* # 示例程序编译产物 example/webapp/webapp example/webapp/logs/ +example/ +examples/ # Go 编译产物 *.so diff --git a/example/index_example.go b/example/index_example.go index 9743d6b..197547f 100644 --- a/example/index_example.go +++ b/example/index_example.go @@ -63,15 +63,12 @@ func main() { } fmt.Printf("记录总数: %d\n", count) - // 可以直接使用共享的索引获取偏移量 - offset, err := index.GetOffset(5) - if err != nil { - log.Fatal(err) - } - fmt.Printf("第 5 条记录的偏移: %d\n", offset) + // 从第 5 条记录开始查询 + startIndex := 5 + fmt.Printf("从第 %d 条记录开始查询\n", startIndex) - // 向后查询(使用索引,高效) - backward, err := query.QueryAt(offset, -1, 3, 0, offset) + // 向后查询(查询更早的记录) + backward, err := query.QueryNewest(startIndex-1, 3, 0, startIndex) if err != nil { log.Fatal(err) } @@ -80,8 +77,8 @@ func main() { fmt.Printf(" [%d] 状态=%s, 数据=%s\n", i, rws.Status, string(rws.Record.Data)) } - // 向前查询(顺序读取) - forward, err := query.QueryAt(offset, 1, 3, 0, offset) + // 向前查询(查询更新的记录) + forward, err := query.QueryOldest(startIndex, 3, 0, startIndex) if err != nil { log.Fatal(err) } diff --git a/example/webapp/main.go b/example/webapp/main.go index 123ed81..3f5fcd2 100644 --- a/example/webapp/main.go +++ b/example/webapp/main.go @@ -541,47 +541,30 @@ func handleQuery(w http.ResponseWriter, r *http.Request) { startIdx := seq.GetProcessingIndex(topic) endIdx := seq.GetReadIndex(topic) - // 获取索引用于转换 - processor, err := seq.GetProcessor(topic) - if err != nil { - http.Error(w, err.Error(), http.StatusNotFound) - return - } - index := processor.Index() - // 合并查询结果:向后 + 当前 + 向前 var results []*seqlog.RecordWithStatus // 向后查询 if backward > 0 && startIdx > 0 { - startPos, err := index.GetOffset(startIdx) + backResults, err := query.QueryNewest(startIdx-1, backward, startIdx, endIdx) if err == nil { - backResults, err := query.QueryAt(startPos, -1, backward, startIdx, endIdx) - if err == nil { - results = append(results, backResults...) - } + results = append(results, backResults...) } } // 当前位置 if startIdx < endIdx { - startPos, err := index.GetOffset(startIdx) + currentResults, err := query.QueryOldest(startIdx, 1, startIdx, endIdx) if err == nil { - currentResults, err := query.QueryAt(startPos, 0, 1, startIdx, endIdx) - if err == nil { - results = append(results, currentResults...) - } + results = append(results, currentResults...) } } // 向前查询 if forward > 0 { - startPos, err := index.GetOffset(startIdx) + forwardResults, err := query.QueryOldest(endIdx, forward, startIdx, endIdx) if err == nil { - forwardResults, err := query.QueryAt(startPos, 1, forward, startIdx, endIdx) - if err == nil { - results = append(results, forwardResults...) - } + results = append(results, forwardResults...) } } diff --git a/index.go b/index.go index 3ecad07..72b938f 100644 --- a/index.go +++ b/index.go @@ -5,6 +5,8 @@ import ( "fmt" "io" "os" + "sync" + "time" ) const ( @@ -19,16 +21,27 @@ const ( // IndexEntrySize 每条索引条目大小(字节) IndexEntrySize = 8 // Offset(8) + + // DefaultSyncInterval 默认同步间隔 + DefaultSyncInterval = 1 * time.Second + + // DefaultSyncBatch 默认同步批次大小(累积多少条记录后同步) + DefaultSyncBatch = 100 ) // RecordIndex 记录索引管理器 type RecordIndex struct { - logPath string // 日志文件路径 - indexPath string // 索引文件路径 - offsets []int64 // 内存中的偏移索引 - magic uint32 // 魔数,用于识别索引文件 - version uint32 // 版本号 - indexFile *os.File // 索引文件句柄(用于追加写入) + logPath string // 日志文件路径 + indexPath string // 索引文件路径 + offsets []int64 // 内存中的偏移索引 + magic uint32 // 魔数,用于识别索引文件 + version uint32 // 版本号 + indexFile *os.File // 索引文件句柄(用于追加写入) + syncInterval time.Duration // 同步时间间隔 + syncBatch int // 同步批次大小 + lastSync time.Time // 上次同步时间 + dirtyCount int // 未同步的记录数 + mu sync.Mutex // 保护并发访问 } // NewRecordIndex 创建或加载记录索引 @@ -37,11 +50,14 @@ func NewRecordIndex(logPath string) (*RecordIndex, error) { indexPath := logPath + ".idx" ri := &RecordIndex{ - logPath: logPath, - indexPath: indexPath, - offsets: make([]int64, 0, 1024), - magic: IndexMagic, - version: IndexVersion, + logPath: logPath, + indexPath: indexPath, + offsets: make([]int64, 0, 1024), + magic: IndexMagic, + version: IndexVersion, + syncInterval: DefaultSyncInterval, + syncBatch: DefaultSyncBatch, + lastSync: time.Now(), } // 启动时总是从日志文件重建索引 @@ -136,6 +152,9 @@ func (ri *RecordIndex) save() error { // Append 追加一条索引(当写入新记录时调用) func (ri *RecordIndex) Append(offset int64) error { + ri.mu.Lock() + defer ri.mu.Unlock() + // 追加到索引文件(先写文件,后更新内存) entryBuf := make([]byte, IndexEntrySize) binary.LittleEndian.PutUint64(entryBuf, uint64(offset)) @@ -146,11 +165,15 @@ func (ri *RecordIndex) Append(offset int64) error { // 更新内存索引 ri.offsets = append(ri.offsets, offset) + ri.dirtyCount++ - // 同步索引文件 - // TODO 这里太频繁了 - if err := ri.indexFile.Sync(); err != nil { - return fmt.Errorf("sync index file: %w", err) + // 批量同步:达到批次大小或时间间隔后才同步 + if ri.dirtyCount >= ri.syncBatch || time.Since(ri.lastSync) >= ri.syncInterval { + if err := ri.indexFile.Sync(); err != nil { + return fmt.Errorf("sync index file: %w", err) + } + ri.lastSync = time.Now() + ri.dirtyCount = 0 } return nil @@ -197,18 +220,35 @@ func (ri *RecordIndex) LastOffset() int64 { return ri.offsets[len(ri.offsets)-1] } +// Flush 强制同步未写入的数据到磁盘 +func (ri *RecordIndex) Flush() error { + ri.mu.Lock() + defer ri.mu.Unlock() + + if ri.indexFile != nil && ri.dirtyCount > 0 { + if err := ri.indexFile.Sync(); err != nil { + return fmt.Errorf("flush index file: %w", err) + } + ri.lastSync = time.Now() + ri.dirtyCount = 0 + } + return nil +} + // Close 关闭索引文件 func (ri *RecordIndex) Close() error { + // 关闭前确保所有数据已同步 + if err := ri.Flush(); err != nil { + return err + } + if ri.indexFile != nil { return ri.indexFile.Close() } return nil } -// Sync 同步索引文件到磁盘 +// Sync 同步索引文件到磁盘(立即同步,不考虑批量策略) func (ri *RecordIndex) Sync() error { - if ri.indexFile != nil { - return ri.indexFile.Sync() - } - return nil + return ri.Flush() } diff --git a/index_test.go b/index_test.go index 649ed8c..a8ea3bb 100644 --- a/index_test.go +++ b/index_test.go @@ -180,10 +180,9 @@ func TestQueryWithIndex(t *testing.T) { t.Errorf("记录总数不正确: got %d, want 10", count) } - // 5. 测试向后查询(需要索引) - // 从第 5 条记录向后查询 3 条 - offset, _ := index.GetOffset(5) - results, err := query.QueryAt(offset, -1, 3, 0, 5) // startIdx=0, endIdx=5 + // 5. 测试向后查询(查询更早的记录) + // 从第 5 条记录向后查询 3 条(查询索引 4, 3, 2) + results, err := query.QueryNewest(4, 3) if err != nil { t.Fatalf("向后查询失败: %v", err) } diff --git a/query.go b/query.go index 1ff2719..63bf440 100644 --- a/query.go +++ b/query.go @@ -69,38 +69,10 @@ func NewRecordQuery(logPath string, index *RecordIndex) (*RecordQuery, error) { return rq, nil } -// readRecordAtOffset 读取指定偏移位置的记录 -func (rq *RecordQuery) readRecordAtOffset(offset int64) (*Record, error) { - if _, err := rq.fd.Seek(offset, 0); err != nil { - return nil, fmt.Errorf("seek to offset %d: %w", offset, err) - } - - // 读取头部:[4B len][4B CRC][16B UUID] = 24 字节 - hdr := rq.rbuf[:24] - if _, err := io.ReadFull(rq.fd, hdr); err != nil { - return nil, fmt.Errorf("read header: %w", err) - } - - rec := &Record{ - Len: binary.LittleEndian.Uint32(hdr[0:4]), - CRC: binary.LittleEndian.Uint32(hdr[4:8]), - } - copy(rec.UUID[:], hdr[8:24]) - - // 读取数据 - rec.Data = make([]byte, rec.Len) - if _, err := io.ReadFull(rq.fd, rec.Data); err != nil { - return nil, fmt.Errorf("read data: %w", err) - } - - return rec, nil -} - // readRecordsForward 从指定索引位置向前顺序读取记录 // startIndex: 起始记录索引 // count: 读取数量 -// startIdx, endIdx: 游标窗口索引范围(用于状态判断) -func (rq *RecordQuery) readRecordsForward(startIndex, count int, startIdx, endIdx int) ([]*RecordWithStatus, error) { +func (rq *RecordQuery) readRecordsForward(startIndex, count int) ([]*Record, error) { // 获取起始 offset startOffset, err := rq.index.GetOffset(startIndex) if err != nil { @@ -111,8 +83,7 @@ func (rq *RecordQuery) readRecordsForward(startIndex, count int, startIdx, endId return nil, fmt.Errorf("seek to offset %d: %w", startOffset, err) } - results := make([]*RecordWithStatus, 0, count) - currentIndex := startIndex + results := make([]*Record, 0, count) currentOffset := startOffset for len(results) < count { @@ -137,42 +108,25 @@ func (rq *RecordQuery) readRecordsForward(startIndex, count int, startIdx, endId return nil, fmt.Errorf("read data at offset %d: %w", currentOffset, err) } - results = append(results, &RecordWithStatus{ - Record: rec, - Status: rq.getRecordStatus(currentIndex, startIdx, endIdx), - }) - - currentIndex++ + results = append(results, rec) currentOffset += 24 + int64(rec.Len) } return results, nil } -// getRecordStatus 根据游标窗口索引位置获取记录状态 -func (rq *RecordQuery) getRecordStatus(recordIndex, startIdx, endIdx int) RecordStatus { - if recordIndex < startIdx { - return StatusProcessed - } else if recordIndex >= startIdx && recordIndex < endIdx { - return StatusProcessing - } else { - return StatusPending - } -} - // QueryOldest 从指定索引开始查询记录(向前读取) // startIndex: 查询起始索引 // count: 查询数量 -// startIdx, endIdx: 游标窗口索引范围(用于状态判断) // 返回的记录按时间顺序(索引递增方向) -func (rq *RecordQuery) QueryOldest(startIndex, count int, startIdx, endIdx int) ([]*RecordWithStatus, error) { +func (rq *RecordQuery) QueryOldest(startIndex, count int) ([]*Record, error) { if count <= 0 { return nil, fmt.Errorf("count must be greater than 0") } totalCount := rq.index.Count() if totalCount == 0 { - return []*RecordWithStatus{}, nil + return []*Record{}, nil } // 校验起始索引 @@ -180,7 +134,7 @@ func (rq *RecordQuery) QueryOldest(startIndex, count int, startIdx, endIdx int) startIndex = 0 } if startIndex >= totalCount { - return []*RecordWithStatus{}, nil + return []*Record{}, nil } // 限制查询数量 @@ -189,27 +143,26 @@ func (rq *RecordQuery) QueryOldest(startIndex, count int, startIdx, endIdx int) count = remainCount } - return rq.readRecordsForward(startIndex, count, startIdx, endIdx) + return rq.readRecordsForward(startIndex, count) } // QueryNewest 从指定索引开始向后查询记录(索引递减方向) // endIndex: 查询结束索引(包含,最新的记录) // count: 查询数量 -// startIdx, endIdx: 游标窗口索引范围(用于状态判断) // 返回结果按时间倒序(最新在前,即 endIndex 对应的记录在最前) -func (rq *RecordQuery) QueryNewest(endIndex, count int, startIdx, endIdx int) ([]*RecordWithStatus, error) { +func (rq *RecordQuery) QueryNewest(endIndex, count int) ([]*Record, error) { if count <= 0 { return nil, fmt.Errorf("count must be greater than 0") } totalCount := rq.index.Count() if totalCount == 0 { - return []*RecordWithStatus{}, nil + return []*Record{}, nil } // 校验结束索引 if endIndex < 0 { - return []*RecordWithStatus{}, nil + return []*Record{}, nil } if endIndex >= totalCount { endIndex = totalCount - 1 @@ -223,7 +176,7 @@ func (rq *RecordQuery) QueryNewest(endIndex, count int, startIdx, endIdx int) ([ } // 向前读取 - results, err := rq.readRecordsForward(queryStartIdx, count, startIdx, endIdx) + results, err := rq.readRecordsForward(queryStartIdx, count) if err != nil { return nil, err } @@ -236,64 +189,25 @@ func (rq *RecordQuery) QueryNewest(endIndex, count int, startIdx, endIdx int) ([ return results, nil } -// QueryAt 从指定位置查询记录 -// position: 查询起始位置(文件偏移量,通常是当前处理位置) -// direction: 查询方向(负数向后,0 当前,正数向前) -// count: 查询数量 -// startIdx, endIdx: 游标窗口索引范围(用于状态判断) -// 返回结果按时间顺序排列 -func (rq *RecordQuery) QueryAt(position int64, direction int, count int, startIdx, endIdx int) ([]*RecordWithStatus, error) { - // 将 position 转换为索引 - idx := rq.index.FindIndex(position) - if idx < 0 { - return nil, fmt.Errorf("position not found in index") - } - - if direction >= 0 { - // 向前查询或当前位置 - if direction == 0 { - count = 1 - } else { - // direction > 0,跳过当前位置,从下一条开始 - idx++ - } - return rq.readRecordsForward(idx, count, startIdx, endIdx) - } - - // 向后查询:使用索引 - results := make([]*RecordWithStatus, 0, count) - - // 向后查询(更早的记录) - for i := idx - 1; i >= 0 && len(results) < count; i-- { - offset, err := rq.index.GetOffset(i) - if err != nil { - return nil, fmt.Errorf("get offset at index %d: %w", i, err) - } - - rec, err := rq.readRecordAtOffset(offset) - if err != nil { - return nil, fmt.Errorf("read record at index %d: %w", i, err) - } - - results = append(results, &RecordWithStatus{ - Record: rec, - Status: rq.getRecordStatus(i, startIdx, endIdx), - }) - } - - // 反转结果,使其按时间顺序排列 - for i, j := 0, len(results)-1; i < j; i, j = i+1, j-1 { - results[i], results[j] = results[j], results[i] - } - - return results, nil -} - // GetRecordCount 获取记录总数 func (rq *RecordQuery) GetRecordCount() (int, error) { return rq.index.Count(), nil } +// GetRecordStatus 根据游标窗口索引位置获取记录状态 +// recordIndex: 记录索引 +// startIdx: 窗口开始索引(已处理位置) +// endIdx: 窗口结束索引(当前读取位置) +func GetRecordStatus(recordIndex, startIdx, endIdx int) RecordStatus { + if recordIndex < startIdx { + return StatusProcessed + } else if recordIndex >= startIdx && recordIndex < endIdx { + return StatusProcessing + } else { + return StatusPending + } +} + // Close 关闭查询器 // 注意:不关闭 index,因为 index 是外部管理的 func (rq *RecordQuery) Close() error { diff --git a/seqlog_test.go b/seqlog_test.go index 3f567cd..1eb2779 100644 --- a/seqlog_test.go +++ b/seqlog_test.go @@ -1211,7 +1211,6 @@ func TestRecordQuery(t *testing.T) { writer.Close() // 模拟处理到第 5 条记录 - currentPos := offsets[5] // 窗口范围:[索引 5, 索引 6) startIdx := 5 endIdx := 6 @@ -1231,41 +1230,45 @@ func TestRecordQuery(t *testing.T) { defer query.Close() // 测试查询当前位置 - current, err := query.QueryAt(currentPos, 0, 1, startIdx, endIdx) + current, err := query.QueryOldest(startIdx, 1) if err != nil { t.Fatalf("failed to query current: %v", err) } if len(current) != 1 { t.Fatalf("expected 1 current result, got %d", len(current)) } - if string(current[0].Record.Data) != "message 5" { - t.Errorf("expected current 'message 5', got '%s'", string(current[0].Record.Data)) + if string(current[0].Data) != "message 5" { + t.Errorf("expected current 'message 5', got '%s'", string(current[0].Data)) } - if current[0].Status != StatusProcessing { - t.Errorf("expected status Processing, got %s", current[0].Status) + // 手动判断状态 + status := GetRecordStatus(startIdx, startIdx, endIdx) + if status != StatusProcessing { + t.Errorf("expected status Processing, got %s", status) } - // 测试向后查询(查询更早的记录) - backResults, err := query.QueryAt(currentPos, -1, 3, startIdx, endIdx) + // 测试向后查询(查询更早的记录,返回倒序) + backResults, err := query.QueryNewest(startIdx-1, 3) if err != nil { t.Fatalf("failed to query backward: %v", err) } if len(backResults) != 3 { t.Errorf("expected 3 backward results, got %d", len(backResults)) } - // 向后查询返回顺序结果 - expectedBack := []string{"message 2", "message 3", "message 4"} + // 向后查询返回倒序结果(newest first) + expectedBack := []string{"message 4", "message 3", "message 2"} for i, rec := range backResults { - if string(rec.Record.Data) != expectedBack[i] { - t.Errorf("backward[%d]: expected '%s', got '%s'", i, expectedBack[i], string(rec.Record.Data)) + if string(rec.Data) != expectedBack[i] { + t.Errorf("backward[%d]: expected '%s', got '%s'", i, expectedBack[i], string(rec.Data)) } - if rec.Status != StatusProcessed { - t.Errorf("backward[%d]: expected status Processed, got %s", i, rec.Status) + // 手动判断状态:索引 4, 3, 2 + recStatus := GetRecordStatus(startIdx-1-i, startIdx, endIdx) + if recStatus != StatusProcessed { + t.Errorf("backward[%d]: expected status Processed, got %s", i, recStatus) } } // 测试向前查询(查询更新的记录) - forwardResults, err := query.QueryAt(currentPos, 1, 3, startIdx, endIdx) + forwardResults, err := query.QueryOldest(endIdx, 3) if err != nil { t.Fatalf("failed to query forward: %v", err) } @@ -1275,11 +1278,13 @@ func TestRecordQuery(t *testing.T) { // 向前查询返回顺序结果 expectedForward := []string{"message 6", "message 7", "message 8"} for i, rec := range forwardResults { - if string(rec.Record.Data) != expectedForward[i] { - t.Errorf("forward[%d]: expected '%s', got '%s'", i, expectedForward[i], string(rec.Record.Data)) + if string(rec.Data) != expectedForward[i] { + t.Errorf("forward[%d]: expected '%s', got '%s'", i, expectedForward[i], string(rec.Data)) } - if rec.Status != StatusPending { - t.Errorf("forward[%d]: expected status Pending, got %s", i, rec.Status) + // 手动判断状态:索引 6, 7, 8 + recStatus := GetRecordStatus(endIdx+i, startIdx, endIdx) + if recStatus != StatusPending { + t.Errorf("forward[%d]: expected status Pending, got %s", i, recStatus) } } @@ -1329,14 +1334,9 @@ func TestTopicQuery(t *testing.T) { endIdx := processor.GetReadIndex() t.Logf("Processing index: [%d, %d)", startIdx, endIdx) - // 获取共享查询器 - query := processor.Query() - index := processor.Index() - - // 测试查询当前位置 + // 测试查询当前位置(使用 processor 方法,带状态) if startIdx < endIdx { - startPos, _ := index.GetOffset(startIdx) - current, err := query.QueryAt(startPos, 0, 1, startIdx, endIdx) + current, err := processor.QueryOldest(startIdx, 1) if err != nil { t.Fatalf("failed to query current: %v", err) } @@ -1347,8 +1347,7 @@ func TestTopicQuery(t *testing.T) { // 测试向后查询 if startIdx > 0 { - startPos, _ := index.GetOffset(startIdx) - back, err := query.QueryAt(startPos, -1, 2, startIdx, endIdx) + back, err := processor.QueryNewest(startIdx-1, 2) if err != nil { t.Fatalf("failed to query backward: %v", err) } @@ -1358,9 +1357,8 @@ func TestTopicQuery(t *testing.T) { } // 测试向前查询 - if startIdx < index.Count() { - startPos, _ := index.GetOffset(startIdx) - forward, err := query.QueryAt(startPos, 1, 3, startIdx, endIdx) + if startIdx < processor.GetRecordCount() { + forward, err := processor.QueryOldest(endIdx, 3) if err != nil { t.Fatalf("failed to query forward: %v", err) } @@ -1414,14 +1412,13 @@ func TestSeqlogQuery(t *testing.T) { endIdx := seq.GetReadIndex("app") t.Logf("Processing index: [%d, %d)", startIdx, endIdx) - // 获取 index 用于转换索引到 offset + // 获取 processor 用于查询(带状态) processor, _ := seq.GetProcessor("app") index := processor.Index() // 测试查询当前 if startIdx < endIdx { - startPos, _ := index.GetOffset(startIdx) - current, err := query.QueryAt(startPos, 0, 1, startIdx, endIdx) + current, err := processor.QueryOldest(startIdx, 1) if err != nil { t.Fatalf("failed to query current: %v", err) } @@ -1432,8 +1429,7 @@ func TestSeqlogQuery(t *testing.T) { // 测试向后查询 if startIdx > 0 { - startPos, _ := index.GetOffset(startIdx) - back, err := query.QueryAt(startPos, -1, 2, startIdx, endIdx) + back, err := processor.QueryNewest(startIdx-1, 2) if err != nil { t.Fatalf("failed to query backward: %v", err) } @@ -1444,8 +1440,7 @@ func TestSeqlogQuery(t *testing.T) { // 测试向前查询 if startIdx < index.Count() { - startPos, _ := index.GetOffset(startIdx) - forward, err := query.QueryAt(startPos, 1, 3, startIdx, endIdx) + forward, err := processor.QueryOldest(endIdx, 3) if err != nil { t.Fatalf("failed to query forward: %v", err) } diff --git a/topic_processor.go b/topic_processor.go index 1f6b199..852dd56 100644 --- a/topic_processor.go +++ b/topic_processor.go @@ -344,6 +344,12 @@ func (tp *TopicProcessor) Query() *RecordQuery { // count: 查询数量 // 返回的记录包含状态信息(基于 tailer 的窗口索引),按时间顺序(索引递增方向) func (tp *TopicProcessor) QueryOldest(startIndex, count int) ([]*RecordWithStatus, error) { + // 查询记录 + records, err := tp.query.QueryOldest(startIndex, count) + if err != nil { + return nil, err + } + // 获取窗口索引范围(用于状态判断) var startIdx, endIdx int tp.mu.RLock() @@ -353,7 +359,16 @@ func (tp *TopicProcessor) QueryOldest(startIndex, count int) ([]*RecordWithStatu } tp.mu.RUnlock() - return tp.query.QueryOldest(startIndex, count, startIdx, endIdx) + // 为每条记录添加状态 + results := make([]*RecordWithStatus, len(records)) + for i, rec := range records { + results[i] = &RecordWithStatus{ + Record: rec, + Status: GetRecordStatus(startIndex+i, startIdx, endIdx), + } + } + + return results, nil } // QueryNewest 从指定索引开始向后查询记录(索引递减方向) @@ -361,6 +376,12 @@ func (tp *TopicProcessor) QueryOldest(startIndex, count int) ([]*RecordWithStatu // count: 查询数量 // 返回的记录包含状态信息(基于 tailer 的窗口索引),按时间倒序(最新在前) func (tp *TopicProcessor) QueryNewest(endIndex, count int) ([]*RecordWithStatus, error) { + // 查询记录 + records, err := tp.query.QueryNewest(endIndex, count) + if err != nil { + return nil, err + } + // 获取窗口索引范围(用于状态判断) var startIdx, endIdx int tp.mu.RLock() @@ -370,7 +391,16 @@ func (tp *TopicProcessor) QueryNewest(endIndex, count int) ([]*RecordWithStatus, } tp.mu.RUnlock() - return tp.query.QueryNewest(endIndex, count, startIdx, endIdx) + // 为每条记录添加状态(倒序:endIndex, endIndex-1, ...) + results := make([]*RecordWithStatus, len(records)) + for i, rec := range records { + results[i] = &RecordWithStatus{ + Record: rec, + Status: GetRecordStatus(endIndex-i, startIdx, endIdx), + } + } + + return results, nil } // GetRecordCount 获取记录总数(统一接口)