## 主要变更 ### 架构改进 - 明确索引(Index)与偏移(Offset)的职责分离 - Index: 记录序号(逻辑概念),用于状态判断 - Offset: 文件字节位置(物理概念),仅用于 I/O 操作 ### API 变更 - 删除所有 Position 相关方法: - `LogCursor.StartPos()/EndPos()` - `LogTailer.GetStartPos()/GetEndPos()` - `TopicProcessor.GetProcessingPosition()/GetReadPosition()` - `Seqlog.GetProcessingPosition()/GetReadPosition()` - 新增索引方法: - `LogCursor.StartIndex()/EndIndex()` - `LogTailer.GetStartIndex()/GetEndIndex()` - `TopicProcessor.GetProcessingIndex()/GetReadIndex()` - `Seqlog.GetProcessingIndex()/GetReadIndex()` - `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index ### 查询接口变更 - `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数 - `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数 - `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断 ### 性能优化 - 状态判断改用整数比较,不再需要计算偏移量 - 减少不必要的索引到偏移的转换 - 只在实际文件 I/O 时才获取 offset ### 测试更新 - 更新所有测试用例使用新的 Index API - 更新示例代码(topic_processor_example.go, webapp/main.go) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
160 lines
3.9 KiB
Go
160 lines
3.9 KiB
Go
package seqlog
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"os"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
// Stats topic 统计信息
|
|
type Stats struct {
|
|
WriteCount int64 `json:"write_count"` // 写入次数
|
|
WriteBytes int64 `json:"write_bytes"` // 写入字节数
|
|
ProcessedCount int64 `json:"processed_count"` // 处理次数
|
|
ProcessedBytes int64 `json:"processed_bytes"` // 处理字节数
|
|
ErrorCount int64 `json:"error_count"` // 错误次数
|
|
FirstWriteTime time.Time `json:"first_write_time"` // 首次写入时间
|
|
LastWriteTime time.Time `json:"last_write_time"` // 最后写入时间
|
|
}
|
|
|
|
// TopicStats topic 统计管理器(支持原子操作和持久化)
|
|
type TopicStats struct {
|
|
writeCount atomic.Int64
|
|
writeBytes atomic.Int64
|
|
processedCount atomic.Int64
|
|
processedBytes atomic.Int64
|
|
errorCount atomic.Int64
|
|
firstWriteTime atomic.Value // time.Time
|
|
lastWriteTime atomic.Value // time.Time
|
|
statsPath string
|
|
}
|
|
|
|
// NewTopicStats 创建 topic 统计管理器
|
|
func NewTopicStats(statsPath string) *TopicStats {
|
|
ts := &TopicStats{
|
|
statsPath: statsPath,
|
|
}
|
|
// 尝试从文件加载统计信息
|
|
if err := ts.Load(); err != nil && !os.IsNotExist(err) {
|
|
// 忽略文件不存在错误,其他错误也忽略(使用默认值)
|
|
}
|
|
return ts
|
|
}
|
|
|
|
// IncWrite 增加写入计数
|
|
func (ts *TopicStats) IncWrite(bytes int64) {
|
|
ts.writeCount.Add(1)
|
|
ts.writeBytes.Add(bytes)
|
|
|
|
now := time.Now()
|
|
ts.lastWriteTime.Store(now)
|
|
|
|
// 如果是首次写入,设置首次写入时间
|
|
if ts.firstWriteTime.Load() == nil {
|
|
ts.firstWriteTime.Store(now)
|
|
}
|
|
}
|
|
|
|
// IncProcessed 增加处理计数
|
|
func (ts *TopicStats) IncProcessed(bytes int64) {
|
|
ts.processedCount.Add(1)
|
|
ts.processedBytes.Add(bytes)
|
|
}
|
|
|
|
// IncError 增加错误计数
|
|
func (ts *TopicStats) IncError() {
|
|
ts.errorCount.Add(1)
|
|
}
|
|
|
|
// Get 获取当前统计信息
|
|
func (ts *TopicStats) Get() Stats {
|
|
stats := Stats{
|
|
WriteCount: ts.writeCount.Load(),
|
|
WriteBytes: ts.writeBytes.Load(),
|
|
ProcessedCount: ts.processedCount.Load(),
|
|
ProcessedBytes: ts.processedBytes.Load(),
|
|
ErrorCount: ts.errorCount.Load(),
|
|
}
|
|
|
|
if t := ts.firstWriteTime.Load(); t != nil {
|
|
stats.FirstWriteTime = t.(time.Time)
|
|
}
|
|
if t := ts.lastWriteTime.Load(); t != nil {
|
|
stats.LastWriteTime = t.(time.Time)
|
|
}
|
|
|
|
return stats
|
|
}
|
|
|
|
// Save 保存统计信息到文件
|
|
func (ts *TopicStats) Save() error {
|
|
stats := ts.Get()
|
|
|
|
data, err := json.Marshal(stats)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal stats: %w", err)
|
|
}
|
|
|
|
// 原子写入:先写临时文件,再重命名
|
|
tmpPath := ts.statsPath + ".tmp"
|
|
if err := os.WriteFile(tmpPath, data, 0644); err != nil {
|
|
return fmt.Errorf("write temp file: %w", err)
|
|
}
|
|
|
|
if err := os.Rename(tmpPath, ts.statsPath); err != nil {
|
|
return fmt.Errorf("rename temp file: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Load 从文件加载统计信息
|
|
func (ts *TopicStats) Load() error {
|
|
data, err := os.ReadFile(ts.statsPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var stats Stats
|
|
if err := json.Unmarshal(data, &stats); err != nil {
|
|
return fmt.Errorf("unmarshal stats: %w", err)
|
|
}
|
|
|
|
// 恢复统计数据
|
|
ts.writeCount.Store(stats.WriteCount)
|
|
ts.writeBytes.Store(stats.WriteBytes)
|
|
ts.processedCount.Store(stats.ProcessedCount)
|
|
ts.processedBytes.Store(stats.ProcessedBytes)
|
|
ts.errorCount.Store(stats.ErrorCount)
|
|
|
|
if !stats.FirstWriteTime.IsZero() {
|
|
ts.firstWriteTime.Store(stats.FirstWriteTime)
|
|
}
|
|
if !stats.LastWriteTime.IsZero() {
|
|
ts.lastWriteTime.Store(stats.LastWriteTime)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Reset 重置所有统计信息并删除统计文件
|
|
func (ts *TopicStats) Reset() error {
|
|
// 重置所有计数器
|
|
ts.writeCount.Store(0)
|
|
ts.writeBytes.Store(0)
|
|
ts.processedCount.Store(0)
|
|
ts.processedBytes.Store(0)
|
|
ts.errorCount.Store(0)
|
|
ts.firstWriteTime = atomic.Value{}
|
|
ts.lastWriteTime = atomic.Value{}
|
|
|
|
// 删除统计文件
|
|
if err := os.Remove(ts.statsPath); err != nil && !os.IsNotExist(err) {
|
|
return fmt.Errorf("remove stats file: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|