package seqlog import ( "encoding/json" "fmt" "os" "sync/atomic" "time" ) // Stats topic 统计信息 type Stats struct { WriteCount int64 `json:"write_count"` // 写入次数 WriteBytes int64 `json:"write_bytes"` // 写入字节数 ProcessedCount int64 `json:"processed_count"` // 处理次数 ProcessedBytes int64 `json:"processed_bytes"` // 处理字节数 ErrorCount int64 `json:"error_count"` // 错误次数 FirstWriteTime time.Time `json:"first_write_time"` // 首次写入时间 LastWriteTime time.Time `json:"last_write_time"` // 最后写入时间 } // TopicStats topic 统计管理器(支持原子操作和持久化) type TopicStats struct { writeCount atomic.Int64 writeBytes atomic.Int64 processedCount atomic.Int64 processedBytes atomic.Int64 errorCount atomic.Int64 firstWriteTime atomic.Value // time.Time lastWriteTime atomic.Value // time.Time statsPath string } // NewTopicStats 创建 topic 统计管理器 func NewTopicStats(statsPath string) *TopicStats { ts := &TopicStats{ statsPath: statsPath, } // 尝试从文件加载统计信息 if err := ts.Load(); err != nil && !os.IsNotExist(err) { // 忽略文件不存在错误,其他错误也忽略(使用默认值) } return ts } // IncWrite 增加写入计数 func (ts *TopicStats) IncWrite(bytes int64) { ts.writeCount.Add(1) ts.writeBytes.Add(bytes) now := time.Now() ts.lastWriteTime.Store(now) // 如果是首次写入,设置首次写入时间 if ts.firstWriteTime.Load() == nil { ts.firstWriteTime.Store(now) } } // IncProcessed 增加处理计数 func (ts *TopicStats) IncProcessed(bytes int64) { ts.processedCount.Add(1) ts.processedBytes.Add(bytes) } // IncError 增加错误计数 func (ts *TopicStats) IncError() { ts.errorCount.Add(1) } // Get 获取当前统计信息 func (ts *TopicStats) Get() Stats { stats := Stats{ WriteCount: ts.writeCount.Load(), WriteBytes: ts.writeBytes.Load(), ProcessedCount: ts.processedCount.Load(), ProcessedBytes: ts.processedBytes.Load(), ErrorCount: ts.errorCount.Load(), } if t := ts.firstWriteTime.Load(); t != nil { stats.FirstWriteTime = t.(time.Time) } if t := ts.lastWriteTime.Load(); t != nil { stats.LastWriteTime = t.(time.Time) } return stats } // Save 保存统计信息到文件 func (ts *TopicStats) Save() error { stats := ts.Get() data, err := json.Marshal(stats) if err != nil { return fmt.Errorf("marshal stats: %w", err) } // 原子写入:先写临时文件,再重命名 tmpPath := ts.statsPath + ".tmp" if err := os.WriteFile(tmpPath, data, 0644); err != nil { return fmt.Errorf("write temp file: %w", err) } if err := os.Rename(tmpPath, ts.statsPath); err != nil { return fmt.Errorf("rename temp file: %w", err) } return nil } // Load 从文件加载统计信息 func (ts *TopicStats) Load() error { data, err := os.ReadFile(ts.statsPath) if err != nil { return err } var stats Stats if err := json.Unmarshal(data, &stats); err != nil { return fmt.Errorf("unmarshal stats: %w", err) } // 恢复统计数据 ts.writeCount.Store(stats.WriteCount) ts.writeBytes.Store(stats.WriteBytes) ts.processedCount.Store(stats.ProcessedCount) ts.processedBytes.Store(stats.ProcessedBytes) ts.errorCount.Store(stats.ErrorCount) if !stats.FirstWriteTime.IsZero() { ts.firstWriteTime.Store(stats.FirstWriteTime) } if !stats.LastWriteTime.IsZero() { ts.lastWriteTime.Store(stats.LastWriteTime) } return nil } // Reset 重置所有统计信息并删除统计文件 func (ts *TopicStats) Reset() error { // 重置所有计数器 ts.writeCount.Store(0) ts.writeBytes.Store(0) ts.processedCount.Store(0) ts.processedBytes.Store(0) ts.errorCount.Store(0) ts.firstWriteTime = atomic.Value{} ts.lastWriteTime = atomic.Value{} // 删除统计文件 if err := os.Remove(ts.statsPath); err != nil && !os.IsNotExist(err) { return fmt.Errorf("remove stats file: %w", err) } return nil }