160 lines
3.9 KiB
Go
160 lines
3.9 KiB
Go
|
|
package seqlog
|
||
|
|
|
||
|
|
import (
|
||
|
|
"encoding/json"
|
||
|
|
"fmt"
|
||
|
|
"os"
|
||
|
|
"sync/atomic"
|
||
|
|
"time"
|
||
|
|
)
|
||
|
|
|
||
|
|
// Stats topic 统计信息
|
||
|
|
type Stats struct {
|
||
|
|
WriteCount int64 `json:"write_count"` // 写入次数
|
||
|
|
WriteBytes int64 `json:"write_bytes"` // 写入字节数
|
||
|
|
ProcessedCount int64 `json:"processed_count"` // 处理次数
|
||
|
|
ProcessedBytes int64 `json:"processed_bytes"` // 处理字节数
|
||
|
|
ErrorCount int64 `json:"error_count"` // 错误次数
|
||
|
|
FirstWriteTime time.Time `json:"first_write_time"` // 首次写入时间
|
||
|
|
LastWriteTime time.Time `json:"last_write_time"` // 最后写入时间
|
||
|
|
}
|
||
|
|
|
||
|
|
// TopicStats topic 统计管理器(支持原子操作和持久化)
|
||
|
|
type TopicStats struct {
|
||
|
|
writeCount atomic.Int64
|
||
|
|
writeBytes atomic.Int64
|
||
|
|
processedCount atomic.Int64
|
||
|
|
processedBytes atomic.Int64
|
||
|
|
errorCount atomic.Int64
|
||
|
|
firstWriteTime atomic.Value // time.Time
|
||
|
|
lastWriteTime atomic.Value // time.Time
|
||
|
|
statsPath string
|
||
|
|
}
|
||
|
|
|
||
|
|
// NewTopicStats 创建 topic 统计管理器
|
||
|
|
func NewTopicStats(statsPath string) *TopicStats {
|
||
|
|
ts := &TopicStats{
|
||
|
|
statsPath: statsPath,
|
||
|
|
}
|
||
|
|
// 尝试从文件加载统计信息
|
||
|
|
if err := ts.Load(); err != nil && !os.IsNotExist(err) {
|
||
|
|
// 忽略文件不存在错误,其他错误也忽略(使用默认值)
|
||
|
|
}
|
||
|
|
return ts
|
||
|
|
}
|
||
|
|
|
||
|
|
// IncWrite 增加写入计数
|
||
|
|
func (ts *TopicStats) IncWrite(bytes int64) {
|
||
|
|
ts.writeCount.Add(1)
|
||
|
|
ts.writeBytes.Add(bytes)
|
||
|
|
|
||
|
|
now := time.Now()
|
||
|
|
ts.lastWriteTime.Store(now)
|
||
|
|
|
||
|
|
// 如果是首次写入,设置首次写入时间
|
||
|
|
if ts.firstWriteTime.Load() == nil {
|
||
|
|
ts.firstWriteTime.Store(now)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// IncProcessed 增加处理计数
|
||
|
|
func (ts *TopicStats) IncProcessed(bytes int64) {
|
||
|
|
ts.processedCount.Add(1)
|
||
|
|
ts.processedBytes.Add(bytes)
|
||
|
|
}
|
||
|
|
|
||
|
|
// IncError 增加错误计数
|
||
|
|
func (ts *TopicStats) IncError() {
|
||
|
|
ts.errorCount.Add(1)
|
||
|
|
}
|
||
|
|
|
||
|
|
// Get 获取当前统计信息
|
||
|
|
func (ts *TopicStats) Get() Stats {
|
||
|
|
stats := Stats{
|
||
|
|
WriteCount: ts.writeCount.Load(),
|
||
|
|
WriteBytes: ts.writeBytes.Load(),
|
||
|
|
ProcessedCount: ts.processedCount.Load(),
|
||
|
|
ProcessedBytes: ts.processedBytes.Load(),
|
||
|
|
ErrorCount: ts.errorCount.Load(),
|
||
|
|
}
|
||
|
|
|
||
|
|
if t := ts.firstWriteTime.Load(); t != nil {
|
||
|
|
stats.FirstWriteTime = t.(time.Time)
|
||
|
|
}
|
||
|
|
if t := ts.lastWriteTime.Load(); t != nil {
|
||
|
|
stats.LastWriteTime = t.(time.Time)
|
||
|
|
}
|
||
|
|
|
||
|
|
return stats
|
||
|
|
}
|
||
|
|
|
||
|
|
// Save 保存统计信息到文件
|
||
|
|
func (ts *TopicStats) Save() error {
|
||
|
|
stats := ts.Get()
|
||
|
|
|
||
|
|
data, err := json.Marshal(stats)
|
||
|
|
if err != nil {
|
||
|
|
return fmt.Errorf("marshal stats: %w", err)
|
||
|
|
}
|
||
|
|
|
||
|
|
// 原子写入:先写临时文件,再重命名
|
||
|
|
tmpPath := ts.statsPath + ".tmp"
|
||
|
|
if err := os.WriteFile(tmpPath, data, 0644); err != nil {
|
||
|
|
return fmt.Errorf("write temp file: %w", err)
|
||
|
|
}
|
||
|
|
|
||
|
|
if err := os.Rename(tmpPath, ts.statsPath); err != nil {
|
||
|
|
return fmt.Errorf("rename temp file: %w", err)
|
||
|
|
}
|
||
|
|
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
// Load 从文件加载统计信息
|
||
|
|
func (ts *TopicStats) Load() error {
|
||
|
|
data, err := os.ReadFile(ts.statsPath)
|
||
|
|
if err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
|
||
|
|
var stats Stats
|
||
|
|
if err := json.Unmarshal(data, &stats); err != nil {
|
||
|
|
return fmt.Errorf("unmarshal stats: %w", err)
|
||
|
|
}
|
||
|
|
|
||
|
|
// 恢复统计数据
|
||
|
|
ts.writeCount.Store(stats.WriteCount)
|
||
|
|
ts.writeBytes.Store(stats.WriteBytes)
|
||
|
|
ts.processedCount.Store(stats.ProcessedCount)
|
||
|
|
ts.processedBytes.Store(stats.ProcessedBytes)
|
||
|
|
ts.errorCount.Store(stats.ErrorCount)
|
||
|
|
|
||
|
|
if !stats.FirstWriteTime.IsZero() {
|
||
|
|
ts.firstWriteTime.Store(stats.FirstWriteTime)
|
||
|
|
}
|
||
|
|
if !stats.LastWriteTime.IsZero() {
|
||
|
|
ts.lastWriteTime.Store(stats.LastWriteTime)
|
||
|
|
}
|
||
|
|
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
// Reset 重置所有统计信息并删除统计文件
|
||
|
|
func (ts *TopicStats) Reset() error {
|
||
|
|
// 重置所有计数器
|
||
|
|
ts.writeCount.Store(0)
|
||
|
|
ts.writeBytes.Store(0)
|
||
|
|
ts.processedCount.Store(0)
|
||
|
|
ts.processedBytes.Store(0)
|
||
|
|
ts.errorCount.Store(0)
|
||
|
|
ts.firstWriteTime = atomic.Value{}
|
||
|
|
ts.lastWriteTime = atomic.Value{}
|
||
|
|
|
||
|
|
// 删除统计文件
|
||
|
|
if err := os.Remove(ts.statsPath); err != nil && !os.IsNotExist(err) {
|
||
|
|
return fmt.Errorf("remove stats file: %w", err)
|
||
|
|
}
|
||
|
|
|
||
|
|
return nil
|
||
|
|
}
|