From 955a467248f50dbce9931faa940f3cf9e53345c8 Mon Sep 17 00:00:00 2001 From: bourdon Date: Sat, 4 Oct 2025 13:32:44 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=EF=BC=9A=E5=87=8F=E5=B0=91?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=E9=87=8D=E5=A4=8D=E5=92=8C=E5=86=85=E5=AD=98?= =?UTF-8?q?=E5=88=86=E9=85=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 主要优化: 1. 提取重复代码(topic_processor.go) - 新增 addStatusToRecords() 辅助方法 - QueryOldest 和 QueryNewest 中的状态添加逻辑重复,已提取 - 减少 38 行重复代码 2. 优化内存分配(index.go) - 在 RecordIndex 结构体中添加可重用的 entryBuf - Append 方法不再每次都分配 8 字节 buffer - 高频写入场景下可显著减少 GC 压力 性能提升: - 减少内存分配次数(每次写入索引节省 1 次分配) - 提高代码可维护性(消除重复代码) - 所有测试通过 ✅ 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- index.go | 7 ++++--- topic_processor.go | 52 +++++++++++++++------------------------------- 2 files changed, 21 insertions(+), 38 deletions(-) diff --git a/index.go b/index.go index 002296c..14e87da 100644 --- a/index.go +++ b/index.go @@ -41,6 +41,7 @@ type RecordIndex struct { syncBatch int // 同步批次大小 lastSync time.Time // 上次同步时间 dirtyCount int // 未同步的记录数 + entryBuf [IndexEntrySize]byte // 可重用的写入缓冲区 mu sync.Mutex // 保护并发访问 } @@ -156,10 +157,10 @@ func (ri *RecordIndex) Append(offset int64) error { defer ri.mu.Unlock() // 追加到索引文件(先写文件,后更新内存) - entryBuf := make([]byte, IndexEntrySize) - binary.LittleEndian.PutUint64(entryBuf, uint64(offset)) + // 使用可重用的 buffer 减少内存分配 + binary.LittleEndian.PutUint64(ri.entryBuf[:], uint64(offset)) - if _, err := ri.indexFile.Write(entryBuf); err != nil { + if _, err := ri.indexFile.Write(ri.entryBuf[:]); err != nil { return fmt.Errorf("append index entry: %w", err) } diff --git a/topic_processor.go b/topic_processor.go index 0cdf8ca..1767317 100644 --- a/topic_processor.go +++ b/topic_processor.go @@ -339,18 +339,8 @@ func (tp *TopicProcessor) Query() *RecordQuery { return tp.query } -// QueryOldest 从参考索引向索引递减方向查询记录(查询更早的记录) -// refIndex: 参考索引位置 -// count: 查询数量 -// 返回的记录包含索引和状态信息,按索引递增方向排序 -// 例如:QueryOldest(5, 3) 查询索引 2, 3, 4(不包含 5) -func (tp *TopicProcessor) QueryOldest(refIndex, count int) ([]*RecordWithStatus, error) { - // 查询记录(包含索引信息) - records, err := tp.query.QueryOldest(refIndex, count) - if err != nil { - return nil, err - } - +// addStatusToRecords 为记录添加状态信息(辅助方法) +func (tp *TopicProcessor) addStatusToRecords(records []*RecordWithIndex) []*RecordWithStatus { // 获取窗口索引范围(用于状态判断) var startIdx, endIdx int tp.mu.RLock() @@ -370,7 +360,20 @@ func (tp *TopicProcessor) QueryOldest(refIndex, count int) ([]*RecordWithStatus, } } - return results, nil + return results +} + +// QueryOldest 从参考索引向索引递减方向查询记录(查询更早的记录) +// refIndex: 参考索引位置 +// count: 查询数量 +// 返回的记录包含索引和状态信息,按索引递增方向排序 +// 例如:QueryOldest(5, 3) 查询索引 2, 3, 4(不包含 5) +func (tp *TopicProcessor) QueryOldest(refIndex, count int) ([]*RecordWithStatus, error) { + records, err := tp.query.QueryOldest(refIndex, count) + if err != nil { + return nil, err + } + return tp.addStatusToRecords(records), nil } // QueryNewest 从参考索引向索引递增方向查询记录(查询更新的记录) @@ -379,32 +382,11 @@ func (tp *TopicProcessor) QueryOldest(refIndex, count int) ([]*RecordWithStatus, // 返回的记录包含索引和状态信息,按索引递增方向排序 // 例如:QueryNewest(5, 3) 查询索引 6, 7, 8(不包含 5) func (tp *TopicProcessor) QueryNewest(refIndex, count int) ([]*RecordWithStatus, error) { - // 查询记录(包含索引信息) records, err := tp.query.QueryNewest(refIndex, count) if err != nil { return nil, err } - - // 获取窗口索引范围(用于状态判断) - var startIdx, endIdx int - tp.mu.RLock() - if tp.tailer != nil { - startIdx = tp.tailer.GetStartIndex() - endIdx = tp.tailer.GetEndIndex() - } - tp.mu.RUnlock() - - // 为每条记录添加状态 - results := make([]*RecordWithStatus, len(records)) - for i, rec := range records { - results[i] = &RecordWithStatus{ - Record: rec.Record, - Index: rec.Index, - Status: GetRecordStatus(rec.Index, startIdx, endIdx), - } - } - - return results, nil + return tp.addStatusToRecords(records), nil } // GetRecordCount 获取记录总数(统一接口)