Files
seqlog/stats.go
bourdon de39339620 重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更

### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
  - Index: 记录序号(逻辑概念),用于状态判断
  - Offset: 文件字节位置(物理概念),仅用于 I/O 操作

### API 变更
- 删除所有 Position 相关方法:
  - `LogCursor.StartPos()/EndPos()`
  - `LogTailer.GetStartPos()/GetEndPos()`
  - `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
  - `Seqlog.GetProcessingPosition()/GetReadPosition()`

- 新增索引方法:
  - `LogCursor.StartIndex()/EndIndex()`
  - `LogTailer.GetStartIndex()/GetEndIndex()`
  - `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
  - `Seqlog.GetProcessingIndex()/GetReadIndex()`
  - `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index

### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断

### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset

### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:50:53 +08:00

160 lines
3.9 KiB
Go

package seqlog
import (
"encoding/json"
"fmt"
"os"
"sync/atomic"
"time"
)
// Stats topic 统计信息
type Stats struct {
WriteCount int64 `json:"write_count"` // 写入次数
WriteBytes int64 `json:"write_bytes"` // 写入字节数
ProcessedCount int64 `json:"processed_count"` // 处理次数
ProcessedBytes int64 `json:"processed_bytes"` // 处理字节数
ErrorCount int64 `json:"error_count"` // 错误次数
FirstWriteTime time.Time `json:"first_write_time"` // 首次写入时间
LastWriteTime time.Time `json:"last_write_time"` // 最后写入时间
}
// TopicStats topic 统计管理器(支持原子操作和持久化)
type TopicStats struct {
writeCount atomic.Int64
writeBytes atomic.Int64
processedCount atomic.Int64
processedBytes atomic.Int64
errorCount atomic.Int64
firstWriteTime atomic.Value // time.Time
lastWriteTime atomic.Value // time.Time
statsPath string
}
// NewTopicStats 创建 topic 统计管理器
func NewTopicStats(statsPath string) *TopicStats {
ts := &TopicStats{
statsPath: statsPath,
}
// 尝试从文件加载统计信息
if err := ts.Load(); err != nil && !os.IsNotExist(err) {
// 忽略文件不存在错误,其他错误也忽略(使用默认值)
}
return ts
}
// IncWrite 增加写入计数
func (ts *TopicStats) IncWrite(bytes int64) {
ts.writeCount.Add(1)
ts.writeBytes.Add(bytes)
now := time.Now()
ts.lastWriteTime.Store(now)
// 如果是首次写入,设置首次写入时间
if ts.firstWriteTime.Load() == nil {
ts.firstWriteTime.Store(now)
}
}
// IncProcessed 增加处理计数
func (ts *TopicStats) IncProcessed(bytes int64) {
ts.processedCount.Add(1)
ts.processedBytes.Add(bytes)
}
// IncError 增加错误计数
func (ts *TopicStats) IncError() {
ts.errorCount.Add(1)
}
// Get 获取当前统计信息
func (ts *TopicStats) Get() Stats {
stats := Stats{
WriteCount: ts.writeCount.Load(),
WriteBytes: ts.writeBytes.Load(),
ProcessedCount: ts.processedCount.Load(),
ProcessedBytes: ts.processedBytes.Load(),
ErrorCount: ts.errorCount.Load(),
}
if t := ts.firstWriteTime.Load(); t != nil {
stats.FirstWriteTime = t.(time.Time)
}
if t := ts.lastWriteTime.Load(); t != nil {
stats.LastWriteTime = t.(time.Time)
}
return stats
}
// Save 保存统计信息到文件
func (ts *TopicStats) Save() error {
stats := ts.Get()
data, err := json.Marshal(stats)
if err != nil {
return fmt.Errorf("marshal stats: %w", err)
}
// 原子写入:先写临时文件,再重命名
tmpPath := ts.statsPath + ".tmp"
if err := os.WriteFile(tmpPath, data, 0644); err != nil {
return fmt.Errorf("write temp file: %w", err)
}
if err := os.Rename(tmpPath, ts.statsPath); err != nil {
return fmt.Errorf("rename temp file: %w", err)
}
return nil
}
// Load 从文件加载统计信息
func (ts *TopicStats) Load() error {
data, err := os.ReadFile(ts.statsPath)
if err != nil {
return err
}
var stats Stats
if err := json.Unmarshal(data, &stats); err != nil {
return fmt.Errorf("unmarshal stats: %w", err)
}
// 恢复统计数据
ts.writeCount.Store(stats.WriteCount)
ts.writeBytes.Store(stats.WriteBytes)
ts.processedCount.Store(stats.ProcessedCount)
ts.processedBytes.Store(stats.ProcessedBytes)
ts.errorCount.Store(stats.ErrorCount)
if !stats.FirstWriteTime.IsZero() {
ts.firstWriteTime.Store(stats.FirstWriteTime)
}
if !stats.LastWriteTime.IsZero() {
ts.lastWriteTime.Store(stats.LastWriteTime)
}
return nil
}
// Reset 重置所有统计信息并删除统计文件
func (ts *TopicStats) Reset() error {
// 重置所有计数器
ts.writeCount.Store(0)
ts.writeBytes.Store(0)
ts.processedCount.Store(0)
ts.processedBytes.Store(0)
ts.errorCount.Store(0)
ts.firstWriteTime = atomic.Value{}
ts.lastWriteTime = atomic.Value{}
// 删除统计文件
if err := os.Remove(ts.statsPath); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("remove stats file: %w", err)
}
return nil
}