重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
package seqlog
|
|
|
|
|
|
|
|
|
|
|
|
import "github.com/google/uuid"
|
|
|
|
|
|
|
|
|
|
|
|
// seqlog 是一个 Go 语言日志收集和处理库
|
|
|
|
|
|
//
|
|
|
|
|
|
// 核心特性:
|
|
|
|
|
|
// - 单文件日志处理:专注于单个日志文件的读取和处理
|
|
|
|
|
|
// - 游标尺机制:通过游标跟踪日志文件的读取位置,支持断点续读
|
|
|
|
|
|
// - 自动恢复:程序重启时自动发现已存在的日志文件并从上次位置继续处理
|
|
|
|
|
|
// - 日志收集:提供高效的日志收集和解析能力
|
|
|
|
|
|
// - tail -f 模式:支持持续监控日志文件的新增内容
|
|
|
|
|
|
// - UUID 去重:每条日志自动生成唯一 UUID,便于外部去重
|
|
|
|
|
|
// - slog 集成:内置 slog.Logger 支持,提供结构化日志记录
|
|
|
|
|
|
// - 统计功能:提供可恢复的统计信息,包括写入/处理次数、字节数、错误计数等
|
|
|
|
|
|
// - 双向查询:支持基于当前处理位置的向前/向后查询,自动标注记录状态
|
|
|
|
|
|
// - 事件通知:支持订阅各种事件(写入、处理、启动、停止等),实时状态变化通知
|
|
|
|
|
|
//
|
|
|
|
|
|
// 使用示例:
|
|
|
|
|
|
//
|
|
|
|
|
|
// // 写入日志
|
|
|
|
|
|
// writer, _ := seqlog.NewLogWriter("app.log")
|
|
|
|
|
|
// offset, _ := writer.Append([]byte("log message"))
|
|
|
|
|
|
//
|
|
|
|
|
|
// // 读取日志
|
|
|
|
|
|
// cursor, _ := seqlog.NewCursor("app.log")
|
|
|
|
|
|
// defer cursor.Close()
|
|
|
|
|
|
// for {
|
|
|
|
|
|
// rec, err := cursor.Next()
|
|
|
|
|
|
// if err != nil {
|
|
|
|
|
|
// break
|
|
|
|
|
|
// }
|
|
|
|
|
|
// // rec.UUID 是自动生成的唯一标识符,可用于去重
|
|
|
|
|
|
// fmt.Printf("UUID: %s, Data: %s\n", rec.UUID, string(rec.Data))
|
|
|
|
|
|
// }
|
|
|
|
|
|
//
|
|
|
|
|
|
// // tail -f 模式
|
|
|
|
|
|
// handler := func(rec *Record) error {
|
|
|
|
|
|
// fmt.Println(string(rec.Data))
|
|
|
|
|
|
// return nil
|
|
|
|
|
|
// }
|
|
|
|
|
|
// tailer, _ := seqlog.NewTailer("app.log", handler, nil)
|
|
|
|
|
|
// tailer.Start()
|
|
|
|
|
|
//
|
2025-10-04 13:26:21 +08:00
|
|
|
|
// // 使用 LogHub 管理器(带 slog 支持和自动恢复)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
// logger := slog.Default()
|
|
|
|
|
|
// handler := func(topic string, rec *seqlog.Record) error {
|
|
|
|
|
|
// fmt.Printf("[%s] %s\n", topic, string(rec.Data))
|
|
|
|
|
|
// return nil
|
|
|
|
|
|
// }
|
2025-10-04 13:26:21 +08:00
|
|
|
|
// seq := seqlog.NewLogHub("/tmp/logs", logger, handler)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
// seq.Start() // 自动发现并恢复已存在的日志文件
|
|
|
|
|
|
// seq.Write("app", []byte("application log"))
|
|
|
|
|
|
//
|
|
|
|
|
|
// // 获取统计信息
|
|
|
|
|
|
// stats, _ := seq.GetTopicStats("app")
|
|
|
|
|
|
// fmt.Printf("写入: %d 条, %d 字节\n", stats.WriteCount, stats.WriteBytes)
|
|
|
|
|
|
//
|
|
|
|
|
|
// // 查询记录
|
|
|
|
|
|
// query, _ := seq.NewTopicQuery("app")
|
|
|
|
|
|
// defer query.Close()
|
|
|
|
|
|
// current, _ := query.GetCurrent() // 获取当前处理位置
|
|
|
|
|
|
// backward, _ := query.QueryBackward(5) // 向后查询 5 条
|
|
|
|
|
|
// forward, _ := query.QueryForward(5) // 向前查询 5 条
|
|
|
|
|
|
//
|
|
|
|
|
|
// // 订阅事件
|
|
|
|
|
|
// seq.Subscribe("app", seqlog.EventWriteSuccess, func(event *seqlog.Event) {
|
|
|
|
|
|
// fmt.Printf("写入成功: offset=%d\n", event.Position)
|
|
|
|
|
|
// })
|
|
|
|
|
|
//
|
|
|
|
|
|
// seq.Stop()
|
|
|
|
|
|
|
|
|
|
|
|
// Record 日志记录
|
|
|
|
|
|
//
|
2025-10-04 17:54:49 +08:00
|
|
|
|
// 存储格式:[4B len][8B offset][4B CRC][16B UUID][data]
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
type Record struct {
|
|
|
|
|
|
Len uint32 // 数据长度
|
|
|
|
|
|
CRC uint32 // CRC 校验和
|
|
|
|
|
|
UUID uuid.UUID // UUID,用于去重
|
|
|
|
|
|
Data []byte // 实际数据
|
|
|
|
|
|
}
|