优化:减少代码重复和内存分配

主要优化:

1. 提取重复代码(topic_processor.go)
   - 新增 addStatusToRecords() 辅助方法
   - QueryOldest 和 QueryNewest 中的状态添加逻辑重复,已提取
   - 减少 38 行重复代码

2. 优化内存分配(index.go)
   - 在 RecordIndex 结构体中添加可重用的 entryBuf
   - Append 方法不再每次都分配 8 字节 buffer
   - 高频写入场景下可显著减少 GC 压力

性能提升:
- 减少内存分配次数(每次写入索引节省 1 次分配)
- 提高代码可维护性(消除重复代码)
- 所有测试通过 

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-10-04 13:32:44 +08:00
parent 90cc9e21c9
commit 955a467248
2 changed files with 21 additions and 38 deletions

View File

@@ -41,6 +41,7 @@ type RecordIndex struct {
syncBatch int // 同步批次大小
lastSync time.Time // 上次同步时间
dirtyCount int // 未同步的记录数
entryBuf [IndexEntrySize]byte // 可重用的写入缓冲区
mu sync.Mutex // 保护并发访问
}
@@ -156,10 +157,10 @@ func (ri *RecordIndex) Append(offset int64) error {
defer ri.mu.Unlock()
// 追加到索引文件(先写文件,后更新内存)
entryBuf := make([]byte, IndexEntrySize)
binary.LittleEndian.PutUint64(entryBuf, uint64(offset))
// 使用可重用的 buffer 减少内存分配
binary.LittleEndian.PutUint64(ri.entryBuf[:], uint64(offset))
if _, err := ri.indexFile.Write(entryBuf); err != nil {
if _, err := ri.indexFile.Write(ri.entryBuf[:]); err != nil {
return fmt.Errorf("append index entry: %w", err)
}

View File

@@ -339,18 +339,8 @@ func (tp *TopicProcessor) Query() *RecordQuery {
return tp.query
}
// QueryOldest 从参考索引向索引递减方向查询记录(查询更早的记录
// refIndex: 参考索引位置
// count: 查询数量
// 返回的记录包含索引和状态信息,按索引递增方向排序
// 例如QueryOldest(5, 3) 查询索引 2, 3, 4不包含 5
func (tp *TopicProcessor) QueryOldest(refIndex, count int) ([]*RecordWithStatus, error) {
// 查询记录(包含索引信息)
records, err := tp.query.QueryOldest(refIndex, count)
if err != nil {
return nil, err
}
// addStatusToRecords 为记录添加状态信息(辅助方法
func (tp *TopicProcessor) addStatusToRecords(records []*RecordWithIndex) []*RecordWithStatus {
// 获取窗口索引范围(用于状态判断)
var startIdx, endIdx int
tp.mu.RLock()
@@ -370,7 +360,20 @@ func (tp *TopicProcessor) QueryOldest(refIndex, count int) ([]*RecordWithStatus,
}
}
return results, nil
return results
}
// QueryOldest 从参考索引向索引递减方向查询记录(查询更早的记录)
// refIndex: 参考索引位置
// count: 查询数量
// 返回的记录包含索引和状态信息,按索引递增方向排序
// 例如QueryOldest(5, 3) 查询索引 2, 3, 4不包含 5
func (tp *TopicProcessor) QueryOldest(refIndex, count int) ([]*RecordWithStatus, error) {
records, err := tp.query.QueryOldest(refIndex, count)
if err != nil {
return nil, err
}
return tp.addStatusToRecords(records), nil
}
// QueryNewest 从参考索引向索引递增方向查询记录(查询更新的记录)
@@ -379,32 +382,11 @@ func (tp *TopicProcessor) QueryOldest(refIndex, count int) ([]*RecordWithStatus,
// 返回的记录包含索引和状态信息,按索引递增方向排序
// 例如QueryNewest(5, 3) 查询索引 6, 7, 8不包含 5
func (tp *TopicProcessor) QueryNewest(refIndex, count int) ([]*RecordWithStatus, error) {
// 查询记录(包含索引信息)
records, err := tp.query.QueryNewest(refIndex, count)
if err != nil {
return nil, err
}
// 获取窗口索引范围(用于状态判断)
var startIdx, endIdx int
tp.mu.RLock()
if tp.tailer != nil {
startIdx = tp.tailer.GetStartIndex()
endIdx = tp.tailer.GetEndIndex()
}
tp.mu.RUnlock()
// 为每条记录添加状态
results := make([]*RecordWithStatus, len(records))
for i, rec := range records {
results[i] = &RecordWithStatus{
Record: rec.Record,
Index: rec.Index,
Status: GetRecordStatus(rec.Index, startIdx, endIdx),
}
}
return results, nil
return tp.addStatusToRecords(records), nil
}
// GetRecordCount 获取记录总数(统一接口)