重构:统一使用索引(Index)替代位置(Position)进行状态判断

## 主要变更

### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
  - Index: 记录序号(逻辑概念),用于状态判断
  - Offset: 文件字节位置(物理概念),仅用于 I/O 操作

### API 变更
- 删除所有 Position 相关方法:
  - `LogCursor.StartPos()/EndPos()`
  - `LogTailer.GetStartPos()/GetEndPos()`
  - `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
  - `Seqlog.GetProcessingPosition()/GetReadPosition()`

- 新增索引方法:
  - `LogCursor.StartIndex()/EndIndex()`
  - `LogTailer.GetStartIndex()/GetEndIndex()`
  - `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
  - `Seqlog.GetProcessingIndex()/GetReadIndex()`
  - `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index

### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断

### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset

### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-10-03 23:48:21 +08:00
commit de39339620
21 changed files with 6039 additions and 0 deletions

30
.gitignore vendored Normal file
View File

@@ -0,0 +1,30 @@
# macOS
.DS_Store
# 测试生成的文件
*.test
test_*
# 索引文件
*.idx
# 位置文件
*.pos
# 日志文件
*.log
.jj/
.idea/
.vscode/
.zed/
# 示例程序编译产物
example/webapp/webapp
example/webapp/logs/
# Go 编译产物
*.so
*.dylib
*.dll
*.exe

75
CLAUDE.md Normal file
View File

@@ -0,0 +1,75 @@
# CLAUDE.md
本文件为 Claude Code 提供项目上下文信息。
**语言偏好:请使用中文进行所有交流和文档编写。**
## 项目概述
seqlog 是一个 Go 语言日志收集和处理库,模块路径为 `code.tczkiot.com/seqlog`
**核心特性:**
- 单文件日志处理:专注于单个日志文件的读取和处理
- 游标尺机制:通过游标跟踪日志文件的读取位置,支持断点续读
- 日志收集:提供高效的日志收集和解析能力
- 使用 Go 1.25.1 版本开发
**适用场景:**
- 日志文件监控和采集
- 增量日志读取和处理
- 日志文件位置追踪
## 项目结构
```
seqlog/
├── go.mod # Go 模块定义文件
└── CLAUDE.md # Claude Code 项目文档
```
## 开发指南
### 环境要求
- Go 1.25.1 或更高版本
- 模块路径:`code.tczkiot.com/seqlog`
### 代码规范
- 遵循 Go 官方代码风格指南
- 使用 `go fmt` 格式化代码
- 编写单元测试覆盖核心功能
- 导出的函数和类型需要添加文档注释
- **中英文混排规范**:注释等文字中,中英文之间、中文与阿拉伯数字之间必须添加空格
- ✅ 正确示例:`// 创建 logger 实例,默认日志级别为 INFO`
- ✅ 正确示例:`// 最多支持 100 个并发连接`
- ❌ 错误示例:`// 创建logger实例默认日志级别为INFO`
- ❌ 错误示例:`// 最多支持100个并发连接`
### 构建和测试
```bash
# 运行测试
go test ./...
# 构建项目
go build ./...
# 运行代码检查
go vet ./...
```
## 常用任务
### 添加新功能
1. 在相应的包中创建新文件
2. 实现功能并编写测试
3. 更新文档说明
### 发布新版本
1. 更新版本号
2. 运行完整测试套件
3. 创建 git tag
4. 推送到远程仓库
## 技术栈
- **语言**: Go 1.25.1
- **模块管理**: Go Modules

321
INDEX_DESIGN.md Normal file
View File

@@ -0,0 +1,321 @@
# Seqlog 索引设计文档
## 概述
seqlog 现已支持**持久化索引文件**,实现高效的日志记录查询和检索。
## 设计原则
1. **职责分离**:数据文件只存储数据,索引文件负责 offset 管理
2. **启动时重建**:每次启动都从日志文件重建索引,确保一致性
3. **最小化存储**:移除冗余字段,优化存储空间
## 索引文件格式
### 文件命名
```
{logfile}.idx
```
例如:`app.log` 对应的索引文件为 `app.log.idx`
### 数据文件结构
```
每条记录:[4B len][4B CRC][16B UUID][data]
头部大小24 字节
示例:
00000000 0f 00 00 00 8b 54 b3 a5 a5 9b fb 59 dd d5 45 2c |.....T.....Y..E,|
↑ Len=15 ↑ CRC ↑ UUID 开始...
00000010 a1 82 6f 16 5c 54 94 8d e6 97 a5 e5 bf 97 e8 ae |..o.\T..........|
↑ ...UUID 继续 ↑ 数据开始
```
### 索引文件结构
```
┌─────────────────────────────────────────────────┐
│ Header (8 字节) │
│ ┌─────────────────────────────────────────────┐ │
│ │ Magic: [4B] 0x53494458 ("SIDX") │ │
│ │ Version: [4B] 1 │ │
│ └─────────────────────────────────────────────┘ │
├─────────────────────────────────────────────────┤
│ Index Entries (每条 8 字节) │
│ ┌─────────────────────────────────────────────┐ │
│ │ Offset1: [8B] 第 1 条记录的偏移 │ │
│ │ Offset2: [8B] 第 2 条记录的偏移 │ │
│ │ ... │ │
│ │ OffsetN: [8B] 第 N 条记录的偏移 │ │
│ └─────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────┘
RecordCount = (文件大小 - 8) / 8
LastOffset = 读取最后一条索引条目
```
### 实际示例
15 条记录的文件:
**数据文件** (591 字节)
```
00000000 0f 00 00 00 8b 54 b3 a5 a5 9b fb 59 dd d5 45 2c |.....T.....Y..E,|
00000010 a1 82 6f 16 5c 54 94 8d e6 97 a5 e5 bf 97 e8 ae |..o.\T..........|
↑ 数据:"日志记录 #1"
```
**索引文件** (128 字节)
```
00000000 53 49 44 58 01 00 00 00 00 00 00 00 00 00 00 00 |SIDX............|
↑ Magic="SIDX" ↑ Ver=1 ↑ Offset[0]=0
00000010 27 00 00 00 00 00 00 00 4e 00 00 00 00 00 00 00 |'.......N.......|
↑ Offset[1]=39 (0x27) ↑ Offset[2]=78 (0x4E)
文件大小128 字节 = 8B header + 15 × 8B entries
记录总数:(128 - 8) / 8 = 15 条
```
## 核心组件
### 1. RecordIndex (index.go)
索引文件管理器,负责索引的构建、加载、追加和查询。
#### 主要方法
```go
// 创建或加载索引(自动重建)
index, err := seqlog.NewRecordIndex(logPath)
// 追加索引条目(写入时调用)
err := index.Append(offset)
// 根据索引位置获取记录偏移
offset, err := index.GetOffset(index)
// 二分查找:根据偏移量查找索引位置
idx := index.FindIndex(offset)
// 获取记录总数
count := index.Count()
// 获取最后一条记录偏移
lastOffset := index.LastOffset()
// 关闭索引
index.Close()
```
### 2. LogWriter 集成
写入器支持可选的索引自动更新。
```go
// 创建带索引的写入器
writer, err := seqlog.NewLogWriterWithIndex(logPath, true)
// 写入时自动更新索引
offset, err := writer.Append([]byte("log data"))
// 关闭写入器和索引
writer.Close()
```
### 3. RecordQuery 集成
查询器优先使用索引文件进行高效查询。
```go
// 创建带索引的查询器
query, err := seqlog.NewRecordQueryWithIndex(logPath, true)
// 获取记录总数从索引O(1)
count, err := query.GetRecordCount()
// 向后查询基于索引O(log n) 定位 + O(n) 读取)
backward, err := query.QueryAt(offset, -1, count, startPos, endPos)
query.Close()
```
## 性能优化
### 1. 启动时重建
- **每次启动都重建**:从日志文件扫描构建索引,确保索引和日志完全一致
- **无损坏风险**:索引文件即使损坏也会自动重建
- **简化设计**:无需在头部保存 RecordCount 和 LastOffset
### 2. 增量更新
- 写入记录时同步追加索引条目
- 避免每次查询都重新扫描日志文件
### 3. 二分查找
- `FindIndex()` 使用二分查找定位偏移量
- 时间复杂度O(log n)
### 4. 自动恢复
- 索引文件损坏时自动重建
- 写入器打开时检查并同步索引
## 使用场景
### 场景 1高频查询
```go
// 使用索引,避免每次查询都扫描日志
query, _ := seqlog.NewRecordQueryWithIndex(logPath, true)
for i := 0; i < 1000; i++ {
count, _ := query.GetRecordCount() // O(1)
// ...
}
```
### 场景 2向后查询
```go
// 向后查询需要索引(否则需全文扫描)
backward, _ := query.QueryAt(currentPos, -1, 10, startPos, endPos)
```
### 场景 3断点续传
```go
// 程序重启后,索引自动加载,无需重建
index, _ := seqlog.NewRecordIndex(logPath)
count := index.Count()
lastOffset := index.LastOffset()
```
### 场景 4大文件处理
```go
// 索引文件远小于日志文件,快速加载
// 100 万条记录的索引文件仅 ~7.6 MB
// (24B header + 1,000,000 * 8B = 8,000,024 字节)
```
## API 兼容性
### 向后兼容
- 现有 API 保持不变(`NewLogWriter`, `NewRecordQuery`
- 默认**不启用**索引,避免影响现有代码
### 选择性启用
```go
// 旧代码:不使用索引
writer, _ := seqlog.NewLogWriter(logPath)
// 新代码:启用索引
writer, _ := seqlog.NewLogWriterWithIndex(logPath, true)
```
## 测试覆盖
所有索引功能均有完整测试覆盖:
```bash
go test -v -run TestIndex
```
测试用例:
- `TestIndexBasicOperations` - 基本操作(构建、加载、查询)
- `TestIndexRebuild` - 索引重建
- `TestQueryWithIndex` - 带索引的查询
- `TestIndexAppend` - 索引追加
- `TestIndexHeader` - 头部信息验证
## 文件示例
运行示例程序:
```bash
cd example
go run index_example.go
```
示例输出:
```
=== 示例 1带索引的写入器 ===
写入: offset=0, data=日志记录 #1
写入: offset=47, data=日志记录 #2
...
索引文件已创建: test_seqlog/app.log.idx
=== 示例 2带索引的查询器 ===
记录总数: 10
第 5 条记录的偏移: 235
向后查询 3 条记录:
[0] 状态=StatusProcessing, 数据=日志记录 #3
...
```
## 技术细节
### 存储开销
**数据文件**
- 每条记录头部24 字节(原 32 字节,节省 25%
- 格式:`[4B len][4B CRC][16B UUID][data]`
**索引文件**
- 头部8 字节(固定)
- 每条记录8 字节
- 总大小:`8 + recordCount * 8` 字节
**示例对比**1 万条记录,每条 100 字节数据):
| 组件 | 旧格式 (32B 头) | 新格式 (24B 头) | 节省 |
|------|----------------|----------------|------|
| 数据文件 | 1.32 MB | 1.24 MB | **80 KB (6%)** |
| 索引文件 | 128 字节 | 128 字节 | 0 |
| 总计 | 1.32 MB | 1.24 MB | **80 KB** |
### 性能对比
| 操作 | 无索引 | 有索引 |
|------|--------|--------|
| 获取记录总数 | O(n) 全文扫描 | O(1) 读取头部 |
| 向后查询定位 | 不支持 | O(log n) 二分查找 |
| 启动时间 | 快(无需加载) | 中(加载索引) |
| 内存占用 | 低 | 中(索引数组) |
### 数据一致性
- **启动时重建**:确保索引永远和日志文件一致
- 运行时:写入日志后立即追加索引
- 索引文件使用 `Sync()` 确保持久化
### 错误处理
- 日志文件不存在 → 返回错误
- 写入失败 → 返回错误,不更新索引
- 索引文件损坏 → 启动时自动重建(无影响)
## 未来优化方向
1. **稀疏索引**:每 N 条记录建一个索引点,减少内存占用
2. **分段索引**:大文件分段存储,支持并发查询
3. **压缩索引**:使用差值编码减少存储空间
4. **mmap 映射**:大索引文件使用内存映射优化加载
5. **布隆过滤器**:快速判断记录是否存在
## 总结
索引文件设计要点:
**持久化**:索引保存到磁盘,重启后快速加载
**增量更新**:写入时自动追加,避免重建
**向后兼容**:不影响现有 API可选启用
**自动恢复**:损坏时自动重建,确保可用性
**高效查询**:二分查找 + O(1) 元数据读取
**测试完备**:全面的单元测试覆盖

198
cursor.go Normal file
View File

@@ -0,0 +1,198 @@
package seqlog
import (
"encoding/binary"
"fmt"
"hash/crc32"
"io"
"os"
"github.com/google/uuid"
)
// LogCursor 日志游标(窗口模式)
type LogCursor struct {
fd *os.File
rbuf []byte // 8 MiB 复用
path string // 日志文件路径
posFile string // 游标位置文件路径
startIdx int // 窗口开始索引(已处理的记录索引)
endIdx int // 窗口结束索引(当前读到的记录索引)
index *RecordIndex // 索引管理器(来自外部)
}
// NewCursor 创建一个新的日志游标
// index: 外部提供的索引管理器,用于快速定位记录
func NewCursor(path string, index *RecordIndex) (*LogCursor, error) {
if index == nil {
return nil, fmt.Errorf("index cannot be nil")
}
fd, err := os.Open(path)
if err != nil {
return nil, err
}
c := &LogCursor{
fd: fd,
rbuf: make([]byte, 8<<20),
path: path,
posFile: path + ".pos",
startIdx: 0,
endIdx: 0,
index: index,
}
// 尝试恢复上次位置
c.loadPosition()
return c, nil
}
// Seek 到任意 offset支持重启续传
func (c *LogCursor) Seek(offset int64, whence int) (int64, error) {
return c.fd.Seek(offset, whence)
}
// Next 读取下一条记录(使用索引快速定位)
func (c *LogCursor) Next() (*Record, error) {
// 检查是否超出索引范围
if c.endIdx >= c.index.Count() {
return nil, io.EOF
}
// 从索引获取当前记录的偏移量
offset, err := c.index.GetOffset(c.endIdx)
if err != nil {
return nil, fmt.Errorf("get offset from index: %w", err)
}
// Seek 到记录位置
if _, err := c.fd.Seek(offset, 0); err != nil {
return nil, fmt.Errorf("seek to offset %d: %w", offset, err)
}
// 读取头部:[4B len][4B CRC][16B UUID] = 24 字节
hdr := c.rbuf[:24]
if _, err := io.ReadFull(c.fd, hdr); err != nil {
return nil, err
}
var rec Record
rec.Len = binary.LittleEndian.Uint32(hdr[0:4])
rec.CRC = binary.LittleEndian.Uint32(hdr[4:8])
// 读取并校验 UUID
copy(rec.UUID[:], hdr[8:24])
if _, err := uuid.FromBytes(rec.UUID[:]); err != nil {
return nil, fmt.Errorf("invalid uuid: %w", err)
}
// 如果数据大于缓冲区,分配新的 buffer
var payload []byte
if int(rec.Len) <= len(c.rbuf)-24 {
payload = c.rbuf[24 : 24+rec.Len]
} else {
payload = make([]byte, rec.Len)
}
if _, err := io.ReadFull(c.fd, payload); err != nil {
return nil, err
}
if crc32.ChecksumIEEE(payload) != rec.CRC {
return nil, fmt.Errorf("crc mismatch")
}
rec.Data = append([]byte(nil), payload...) // 复制出去,复用 buffer
// 更新窗口结束索引(移动到下一条记录)
c.endIdx++
return &rec, nil
}
// NextRange 读取指定数量的记录(范围游动)
// count: 要读取的记录数量
// 返回:读取到的记录列表,如果到达文件末尾,返回的记录数可能少于 count
func (c *LogCursor) NextRange(count int) ([]*Record, error) {
if count <= 0 {
return nil, fmt.Errorf("count must be greater than 0")
}
results := make([]*Record, 0, count)
for range count {
rec, err := c.Next()
if err != nil {
if err == io.EOF && len(results) > 0 {
// 已经读取了一些记录,返回这些记录
return results, nil
}
return results, err
}
results = append(results, rec)
}
return results, nil
}
// Commit 提交窗口,将 endIdx 移动到 startIdx表示已处理完这批记录
func (c *LogCursor) Commit() {
c.startIdx = c.endIdx
}
// Rollback 回滚窗口,将 endIdx 回退到 startIdx表示放弃这批记录的处理
func (c *LogCursor) Rollback() error {
c.endIdx = c.startIdx
return nil
}
// StartIndex 获取窗口开始索引
func (c *LogCursor) StartIndex() int {
return c.startIdx
}
// EndIndex 获取窗口结束索引
func (c *LogCursor) EndIndex() int {
return c.endIdx
}
// Close 关闭游标并保存位置
func (c *LogCursor) Close() error {
c.savePosition()
return c.fd.Close()
}
// savePosition 保存当前读取位置到文件
func (c *LogCursor) savePosition() error {
f, err := os.Create(c.posFile)
if err != nil {
return err
}
defer f.Close()
buf := make([]byte, 4)
// 保存 startIdx已处理的索引
binary.LittleEndian.PutUint32(buf, uint32(c.startIdx))
_, err = f.Write(buf)
return err
}
// loadPosition 从文件加载上次的读取位置
func (c *LogCursor) loadPosition() error {
f, err := os.Open(c.posFile)
if err != nil {
if os.IsNotExist(err) {
return nil // 文件不存在,从头开始
}
return err
}
defer f.Close()
buf := make([]byte, 4)
if _, err := io.ReadFull(f, buf); err != nil {
return err
}
// 加载 startIdx
c.startIdx = int(binary.LittleEndian.Uint32(buf))
c.endIdx = c.startIdx
return nil
}

171
event.go Normal file
View File

@@ -0,0 +1,171 @@
package seqlog
import (
"sync"
"time"
)
// EventType 事件类型
type EventType int
const (
EventWriteSuccess EventType = iota // 写入成功
EventWriteError // 写入错误
EventProcessSuccess // 处理成功
EventProcessError // 处理错误
EventProcessorStart // Processor 启动
EventProcessorStop // Processor 停止
EventProcessorReset // Processor 重置
EventPositionSaved // 位置保存
)
// String 返回事件类型的字符串表示
func (e EventType) String() string {
switch e {
case EventWriteSuccess:
return "写入成功"
case EventWriteError:
return "写入错误"
case EventProcessSuccess:
return "处理成功"
case EventProcessError:
return "处理错误"
case EventProcessorStart:
return "Processor 启动"
case EventProcessorStop:
return "Processor 停止"
case EventProcessorReset:
return "Processor 重置"
case EventPositionSaved:
return "位置保存"
default:
return "未知事件"
}
}
// Event 事件数据
type Event struct {
Type EventType // 事件类型
Topic string // topic 名称
Timestamp time.Time // 事件时间
Record *Record // 相关记录(可选)
Error error // 错误信息(可选)
Position int64 // 位置信息(可选)
}
// EventListener 事件监听器
type EventListener func(*Event)
// EventBus 事件总线
type EventBus struct {
listeners map[EventType][]EventListener
mu sync.RWMutex
}
// NewEventBus 创建事件总线
func NewEventBus() *EventBus {
return &EventBus{
listeners: make(map[EventType][]EventListener),
}
}
// Subscribe 订阅事件
func (eb *EventBus) Subscribe(eventType EventType, listener EventListener) {
eb.mu.Lock()
defer eb.mu.Unlock()
eb.listeners[eventType] = append(eb.listeners[eventType], listener)
}
// SubscribeAll 订阅所有事件
func (eb *EventBus) SubscribeAll(listener EventListener) {
eb.mu.Lock()
defer eb.mu.Unlock()
allTypes := []EventType{
EventWriteSuccess,
EventWriteError,
EventProcessSuccess,
EventProcessError,
EventProcessorStart,
EventProcessorStop,
EventProcessorReset,
EventPositionSaved,
}
for _, eventType := range allTypes {
eb.listeners[eventType] = append(eb.listeners[eventType], listener)
}
}
// Unsubscribe 取消订阅(移除所有该类型的监听器)
func (eb *EventBus) Unsubscribe(eventType EventType) {
eb.mu.Lock()
defer eb.mu.Unlock()
delete(eb.listeners, eventType)
}
// Publish 发布事件
func (eb *EventBus) Publish(event *Event) {
eb.mu.RLock()
listeners := eb.listeners[event.Type]
eb.mu.RUnlock()
// 异步通知所有监听器
for _, listener := range listeners {
// 每个监听器在单独的 goroutine 中执行,避免阻塞
go func(l EventListener) {
defer func() {
// 防止 listener panic 影响其他监听器
if r := recover(); r != nil {
// 可以在这里记录 panic 信息
}
}()
l(event)
}(listener)
}
}
// PublishSync 同步发布事件(按顺序执行监听器)
func (eb *EventBus) PublishSync(event *Event) {
eb.mu.RLock()
listeners := eb.listeners[event.Type]
eb.mu.RUnlock()
for _, listener := range listeners {
func(l EventListener) {
defer func() {
if r := recover(); r != nil {
// 防止 panic
}
}()
l(event)
}(listener)
}
}
// Clear 清空所有监听器
func (eb *EventBus) Clear() {
eb.mu.Lock()
defer eb.mu.Unlock()
eb.listeners = make(map[EventType][]EventListener)
}
// HasListeners 检查是否有监听器
func (eb *EventBus) HasListeners(eventType EventType) bool {
eb.mu.RLock()
defer eb.mu.RUnlock()
listeners, exists := eb.listeners[eventType]
return exists && len(listeners) > 0
}
// ListenerCount 获取监听器数量
func (eb *EventBus) ListenerCount(eventType EventType) int {
eb.mu.RLock()
defer eb.mu.RUnlock()
return len(eb.listeners[eventType])
}

149
example/index_example.go Normal file
View File

@@ -0,0 +1,149 @@
package main
import (
"fmt"
"log"
"code.tczkiot.com/seqlog"
)
func main() {
logPath := "test_seqlog/app.log"
// ===== 示例 1使用带索引的写入器 =====
fmt.Println("=== 示例 1带索引的写入器 ===")
// 创建索引
index, err := seqlog.NewRecordIndex(logPath)
if err != nil {
log.Fatal(err)
}
defer index.Close()
// 创建写入器(使用共享索引)
writer, err := seqlog.NewLogWriter(logPath, index)
if err != nil {
log.Fatal(err)
}
// 写入日志时,索引会自动更新
for i := 1; i <= 10; i++ {
data := fmt.Sprintf("日志记录 #%d", i)
offset, err := writer.Append([]byte(data))
if err != nil {
log.Fatal(err)
}
fmt.Printf("写入: offset=%d, data=%s\n", offset, data)
}
writer.Close()
fmt.Printf("索引文件已创建: %s.idx\n\n", logPath)
// ===== 示例 2使用索引进行快速查询 =====
fmt.Println("=== 示例 2带索引的查询器 ===")
// 先获取索引(由 writer 创建)
index2, err := seqlog.NewRecordIndex(logPath)
if err != nil {
log.Fatal(err)
}
defer index2.Close()
// 创建查询器(使用外部索引)
query, err := seqlog.NewRecordQuery(logPath, index2)
if err != nil {
log.Fatal(err)
}
defer query.Close()
// 获取记录总数直接从索引读取O(1)
count, err := query.GetRecordCount()
if err != nil {
log.Fatal(err)
}
fmt.Printf("记录总数: %d\n", count)
// 可以直接使用共享的索引获取偏移量
offset, err := index.GetOffset(5)
if err != nil {
log.Fatal(err)
}
fmt.Printf("第 5 条记录的偏移: %d\n", offset)
// 向后查询(使用索引,高效)
backward, err := query.QueryAt(offset, -1, 3, 0, offset)
if err != nil {
log.Fatal(err)
}
fmt.Printf("向后查询 3 条记录:\n")
for i, rws := range backward {
fmt.Printf(" [%d] 状态=%s, 数据=%s\n", i, rws.Status, string(rws.Record.Data))
}
// 向前查询(顺序读取)
forward, err := query.QueryAt(offset, 1, 3, 0, offset)
if err != nil {
log.Fatal(err)
}
fmt.Printf("向前查询 3 条记录:\n")
for i, rws := range forward {
fmt.Printf(" [%d] 状态=%s, 数据=%s\n", i, rws.Status, string(rws.Record.Data))
}
fmt.Println()
// ===== 示例 3索引的自动恢复和重建 =====
fmt.Println("=== 示例 3索引恢复 ===")
// 如果索引文件存在,会自动加载
// 如果索引文件不存在或损坏,会自动重建
index3, err := seqlog.NewRecordIndex(logPath)
if err != nil {
log.Fatal(err)
}
fmt.Printf("索引已加载: %d 条记录\n", index3.Count())
fmt.Printf("最后一条记录偏移: %d\n", index3.LastOffset())
// 二分查找:根据偏移量查找索引位置
idx := index3.FindIndex(offset)
fmt.Printf("偏移量 %d 对应的索引位置: %d\n\n", offset, idx)
index3.Close()
// ===== 示例 4追加写入索引自动更新=====
fmt.Println("=== 示例 4追加写入 ===")
// 重新打开索引和写入器,追加新数据
index5, err := seqlog.NewRecordIndex(logPath)
if err != nil {
log.Fatal(err)
}
defer index5.Close()
writer, err = seqlog.NewLogWriter(logPath, index5)
if err != nil {
log.Fatal(err)
}
for i := 11; i <= 15; i++ {
data := fmt.Sprintf("追加记录 #%d", i)
offset, err := writer.Append([]byte(data))
if err != nil {
log.Fatal(err)
}
fmt.Printf("追加: offset=%d, data=%s\n", offset, data)
}
writer.Close()
// 验证索引已更新
index4, err := seqlog.NewRecordIndex(logPath)
if err != nil {
log.Fatal(err)
}
defer index4.Close()
fmt.Printf("索引已更新: 现有 %d 条记录\n", index4.Count())
fmt.Println("\n=== 所有示例完成 ===")
}

View File

@@ -0,0 +1,116 @@
package main
import (
"fmt"
"log"
"log/slog"
"code.tczkiot.com/seqlog"
)
func main() {
// ===== TopicProcessor 作为聚合器使用 =====
fmt.Println("=== TopicProcessor 聚合器示例 ===\n")
// 创建 TopicProcessor提供空 handler
logger := slog.Default()
tp, err := seqlog.NewTopicProcessor("test_seqlog", "app", logger, &seqlog.TopicConfig{
Handler: func(rec *seqlog.Record) error {
return nil // 示例中不需要处理
},
})
if err != nil {
log.Fatalf("创建 TopicProcessor 失败: %v", err)
}
// ===== 1. 写入数据 =====
fmt.Println("1. 写入数据:")
for i := 1; i <= 5; i++ {
data := fmt.Sprintf("消息 #%d", i)
offset, err := tp.Write([]byte(data))
if err != nil {
log.Fatal(err)
}
fmt.Printf(" 写入成功: offset=%d, data=%s\n", offset, data)
}
fmt.Println()
// ===== 2. 获取记录总数 =====
fmt.Println("2. 查询记录总数:")
count := tp.GetRecordCount()
fmt.Printf(" 总共 %d 条记录\n\n", count)
// ===== 3. 获取索引 =====
fmt.Println("3. 使用索引:")
index := tp.Index()
fmt.Printf(" 索引记录数: %d\n", index.Count())
fmt.Printf(" 最后偏移: %d\n\n", index.LastOffset())
// ===== 4. 使用查询器查询 =====
fmt.Println("4. 查询记录:")
// 查询最老的 3 条记录(从索引 0 开始)
oldest, err := tp.QueryOldest(0, 3)
if err != nil {
log.Fatal(err)
}
fmt.Println(" 查询最老的 3 条:")
for i, rws := range oldest {
fmt.Printf(" [%d] 状态=%s, 数据=%s\n", i, rws.Status, string(rws.Record.Data))
}
// 查询最新的 2 条记录(从最后一条开始)
totalCount := tp.GetRecordCount()
newest, err := tp.QueryNewest(totalCount-1, 2)
if err != nil {
log.Fatal(err)
}
fmt.Println(" 查询最新的 2 条:")
for i, rws := range newest {
fmt.Printf(" [%d] 状态=%s, 数据=%s\n", i, rws.Status, string(rws.Record.Data))
}
fmt.Println()
// ===== 5. 使用游标读取 =====
fmt.Println("5. 使用游标读取:")
cursor, err := tp.Cursor()
if err != nil {
log.Fatal(err)
}
defer cursor.Close()
// 读取 3 条记录
records, err := cursor.NextRange(3)
if err != nil {
log.Fatal(err)
}
fmt.Printf(" 读取了 %d 条记录:\n", len(records))
for i, rec := range records {
fmt.Printf(" [%d] %s\n", i, string(rec.Data))
}
// 提交游标位置
cursor.Commit()
fmt.Printf(" 游标位置: start=%d, end=%d\n\n", cursor.StartIndex(), cursor.EndIndex())
// ===== 6. 继续写入 =====
fmt.Println("6. 继续写入:")
for i := 6; i <= 8; i++ {
data := fmt.Sprintf("消息 #%d", i)
offset, _ := tp.Write([]byte(data))
fmt.Printf(" 写入成功: offset=%d, data=%s\n", offset, data)
}
fmt.Println()
// ===== 7. 再次查询总数 =====
fmt.Println("7. 更新后的记录总数:")
count = tp.GetRecordCount()
fmt.Printf(" 总共 %d 条记录\n\n", count)
// ===== 8. 获取统计信息 =====
fmt.Println("8. 统计信息:")
stats := tp.GetStats()
fmt.Printf(" 写入: %d 条, %d 字节\n", stats.WriteCount, stats.WriteBytes)
fmt.Println("\n=== 所有示例完成 ===")
}

65
example/webapp/README.md Normal file
View File

@@ -0,0 +1,65 @@
# Seqlog Web 演示
一个简单的 Web 应用,展示 Seqlog 的实际使用场景。
## 功能
### 后端模拟业务
- 每 2 秒自动生成业务日志
- 随机生成不同 topicapp、api、database、cache
- 随机生成不同操作(查询、插入、更新、删除、备份、恢复、同步等)
- **随机日志大小**2KB ~ 10MB
- 80% 小日志2KB - 100KB
- 15% 中日志100KB - 1MB
- 5% 大日志1MB - 10MB
### Web 查询界面
- 查看所有 topics
- 查看每个 topic 的统计信息(显示实际字节数)
- 查询日志(支持向前/向后翻页)
- 实时自动刷新
- 日志状态标注(已处理/处理中/待处理)
## 快速启动
```bash
cd example/webapp
go run main.go
```
访问: http://localhost:8080
## 使用说明
1. **选择 Topic**: 点击左侧的 topic 列表
2. **查看统计**: 左侧会显示该 topic 的统计信息(包括总字节数)
3. **查看日志**: 右侧显示日志内容,带状态标注
4. **刷新**: 点击"刷新日志"按钮或等待自动刷新
5. **翻页**: 使用"向前翻页"和"向后翻页"按钮
6. **自定义范围**: 修改显示范围的数字,控制查询条数
## 界面说明
- **绿色边框**: 已处理的日志
- **黄色边框**: 正在处理的日志
- **灰色边框**: 待处理的日志
## 性能测试
由于日志大小范围很大2KB ~ 10MB可以观察到
- 小日志处理速度很快
- 大日志会占用更多存储空间
- 统计信息会显示真实的字节数增长
## API 接口
- `GET /api/topics` - 获取所有 topics
- `GET /api/stats?topic=<name>` - 获取统计信息
- `GET /api/query?topic=<name>&backward=10&forward=10` - 查询日志
- `POST /api/write` - 手动写入日志
## 技术栈
- 后端: Go + Seqlog
- 前端: 原生 HTML/CSS/JavaScript
- 无需额外依赖

634
example/webapp/main.go Normal file
View File

@@ -0,0 +1,634 @@
package main
import (
"encoding/json"
"fmt"
"log/slog"
"math/rand"
"net/http"
"os"
"strconv"
"sync"
"time"
"code.tczkiot.com/seqlog"
)
var (
seq *seqlog.Seqlog
logger *slog.Logger
queryCache = make(map[string]*seqlog.RecordQuery)
queryCacheMu sync.RWMutex
)
func main() {
// 初始化
logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelInfo,
}))
// 创建 Seqlog
seq = seqlog.NewSeqlog("logs", logger, func(topic string, rec *seqlog.Record) error {
// 简单的日志处理:只打印摘要信息
dataPreview := string(rec.Data)
if len(dataPreview) > 100 {
dataPreview = dataPreview[:100] + "..."
}
logger.Info("处理日志", "topic", topic, "size", len(rec.Data), "preview", dataPreview)
return nil
})
if err := seq.Start(); err != nil {
logger.Error("启动失败", "error", err)
os.Exit(1)
}
defer seq.Stop()
logger.Info("Seqlog 已启动")
// 启动后台业务模拟
go simulateBusiness()
// 启动 Web 服务器
http.HandleFunc("/", handleIndex)
http.HandleFunc("/api/topics", handleTopics)
http.HandleFunc("/api/stats", handleStats)
http.HandleFunc("/api/query", handleQuery)
http.HandleFunc("/api/write", handleWrite)
addr := ":8080"
logger.Info("Web 服务器启动", "地址", "http://localhost"+addr)
if err := http.ListenAndServe(addr, nil); err != nil {
logger.Error("服务器错误", "error", err)
}
}
// 模拟业务写日志
func simulateBusiness() {
topics := []string{"app", "api", "database", "cache"}
actions := []string{"查询", "插入", "更新", "删除", "连接", "断开", "备份", "恢复", "同步"}
status := []string{"成功", "失败", "超时", "重试"}
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
for range ticker.C {
// 随机选择 topic 和内容
topic := topics[rand.Intn(len(topics))]
action := actions[rand.Intn(len(actions))]
st := status[rand.Intn(len(status))]
// 随机生成日志大小2KB 到 10MB
// 80% 概率生成小日志2KB-100KB
// 15% 概率生成中日志100KB-1MB
// 5% 概率生成大日志1MB-10MB
var logSize int
prob := rand.Intn(100)
if prob < 80 {
// 2KB - 100KB
logSize = 2*1024 + rand.Intn(98*1024)
} else if prob < 95 {
// 100KB - 1MB
logSize = 100*1024 + rand.Intn(924*1024)
} else {
// 1MB - 10MB
logSize = 1024*1024 + rand.Intn(9*1024*1024)
}
// 生成日志内容
header := fmt.Sprintf("[%s] %s %s - 用时: %dms | 数据大小: %s | ",
time.Now().Format("15:04:05"),
action,
st,
rand.Intn(1000),
formatBytes(int64(logSize)))
// 填充随机数据到指定大小
data := make([]byte, logSize)
copy(data, []byte(header))
// 填充可读的模拟数据
fillOffset := len(header)
patterns := []string{
"user_id=%d, session=%x, ip=%d.%d.%d.%d, ",
"query_time=%dms, rows=%d, cached=%v, ",
"error_code=%d, retry_count=%d, ",
"request_id=%x, trace_id=%x, ",
}
for fillOffset < logSize-100 {
pattern := patterns[rand.Intn(len(patterns))]
var chunk string
switch pattern {
case patterns[0]:
chunk = fmt.Sprintf(pattern, rand.Intn(10000), rand.Intn(0xFFFFFF),
rand.Intn(256), rand.Intn(256), rand.Intn(256), rand.Intn(256))
case patterns[1]:
chunk = fmt.Sprintf(pattern, rand.Intn(1000), rand.Intn(10000), rand.Intn(2) == 1)
case patterns[2]:
chunk = fmt.Sprintf(pattern, rand.Intn(500), rand.Intn(5))
case patterns[3]:
chunk = fmt.Sprintf(pattern, rand.Intn(0xFFFFFFFF), rand.Intn(0xFFFFFFFF))
}
remaining := logSize - fillOffset
if len(chunk) > remaining {
chunk = chunk[:remaining]
}
copy(data[fillOffset:], []byte(chunk))
fillOffset += len(chunk)
}
// 写入日志
if _, err := seq.Write(topic, data); err != nil {
logger.Error("写入日志失败", "error", err, "size", logSize)
} else {
logger.Info("写入日志", "topic", topic, "size", formatBytes(int64(logSize)))
}
}
}
func formatBytes(bytes int64) string {
if bytes < 1024 {
return fmt.Sprintf("%d B", bytes)
}
if bytes < 1024*1024 {
return fmt.Sprintf("%.1f KB", float64(bytes)/1024)
}
return fmt.Sprintf("%.2f MB", float64(bytes)/1024/1024)
}
// 首页
func handleIndex(w http.ResponseWriter, r *http.Request) {
html := `<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Seqlog 日志查询</title>
<style>
body {
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
max-width: 1400px;
margin: 0 auto;
padding: 20px;
background: #f5f5f5;
}
.header {
background: white;
padding: 20px;
border-radius: 8px;
margin-bottom: 20px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
h1 {
margin: 0;
color: #333;
}
.subtitle {
color: #666;
margin-top: 5px;
}
.container {
display: grid;
grid-template-columns: 250px 1fr;
gap: 20px;
}
.sidebar {
background: white;
padding: 20px;
border-radius: 8px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
.main {
background: white;
padding: 20px;
border-radius: 8px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
.topic-list {
list-style: none;
padding: 0;
margin: 0;
}
.topic-item {
padding: 10px;
margin-bottom: 5px;
cursor: pointer;
border-radius: 4px;
transition: background 0.2s;
}
.topic-item:hover {
background: #f0f0f0;
}
.topic-item.active {
background: #007bff;
color: white;
}
.stats {
margin-top: 20px;
padding: 15px;
background: #f8f9fa;
border-radius: 4px;
}
.stat-item {
display: flex;
justify-content: space-between;
margin-bottom: 8px;
font-size: 14px;
}
.controls {
margin-bottom: 20px;
}
.btn {
padding: 8px 16px;
margin-right: 10px;
border: none;
border-radius: 4px;
cursor: pointer;
font-size: 14px;
}
.btn-primary {
background: #007bff;
color: white;
}
.btn-secondary {
background: #6c757d;
color: white;
}
.log-container {
height: 500px;
overflow-y: auto;
border: 1px solid #ddd;
border-radius: 4px;
padding: 10px;
background: #f8f9fa;
font-family: 'Courier New', monospace;
font-size: 13px;
}
.log-entry {
padding: 6px 10px;
margin-bottom: 4px;
background: white;
border-left: 3px solid #007bff;
border-radius: 2px;
word-wrap: break-word;
overflow-wrap: break-word;
display: -webkit-box;
-webkit-line-clamp: 3;
-webkit-box-orient: vertical;
overflow: hidden;
line-height: 1.5;
}
.log-entry.processed {
border-left-color: #28a745;
opacity: 0.8;
}
.log-entry.processing {
border-left-color: #ffc107;
background: #fff9e6;
}
.log-entry.pending {
border-left-color: #6c757d;
opacity: 0.6;
}
.status-badge {
display: inline-block;
padding: 2px 8px;
border-radius: 3px;
font-size: 11px;
margin-right: 8px;
}
.status-processed {
background: #d4edda;
color: #155724;
}
.status-processing {
background: #fff3cd;
color: #856404;
}
.status-pending {
background: #e2e3e5;
color: #383d41;
}
.loading {
text-align: center;
padding: 20px;
color: #666;
}
</style>
</head>
<body>
<div class="header">
<h1>Seqlog 日志查询系统</h1>
<div class="subtitle">实时查看和管理应用日志</div>
</div>
<div class="container">
<div class="sidebar">
<h3>Topics</h3>
<ul class="topic-list" id="topicList"></ul>
<div class="stats" id="stats">
<h4>统计信息</h4>
<div id="statsContent">选择一个 topic 查看统计</div>
</div>
</div>
<div class="main">
<div class="controls">
<button class="btn btn-primary" onclick="loadLogs()">刷新日志</button>
<button class="btn btn-secondary" onclick="queryBackward()">向前翻页</button>
<button class="btn btn-secondary" onclick="queryForward()">向后翻页</button>
<span style="margin-left: 20px;">显示范围: 前 <input type="number" id="backwardCount" value="10" style="width: 60px;"> 条, 后 <input type="number" id="forwardCount" value="10" style="width: 60px;"> 条</span>
</div>
<div class="log-container" id="logContainer">
<div class="loading">选择一个 topic 开始查看日志</div>
</div>
</div>
</div>
<script>
let currentTopic = null;
let displayedOffsets = new Set(); // 追踪已显示的日志偏移量
// 加载 topics
async function loadTopics() {
const response = await fetch('/api/topics');
const topics = await response.json();
const list = document.getElementById('topicList');
list.innerHTML = topics.map(topic =>
'<li class="topic-item" onclick="selectTopic(\'' + topic + '\')">' + topic + '</li>'
).join('');
}
// 选择 topic
function selectTopic(topic) {
currentTopic = topic;
displayedOffsets.clear(); // 切换 topic 时清空已显示记录
// 更新选中状态
document.querySelectorAll('.topic-item').forEach(item => {
item.classList.remove('active');
if (item.textContent === topic) {
item.classList.add('active');
}
});
// 清空容器并重新加载
document.getElementById('logContainer').innerHTML = '';
loadStats(topic);
loadLogs();
}
// 加载统计
async function loadStats(topic) {
const response = await fetch('/api/stats?topic=' + topic);
const stats = await response.json();
const content = document.getElementById('statsContent');
content.innerHTML =
'<div class="stat-item"><span>写入:</span><span>' + stats.write_count + ' 条</span></div>' +
'<div class="stat-item"><span>处理:</span><span>' + stats.processed_count + ' 条</span></div>' +
'<div class="stat-item"><span>错误:</span><span>' + stats.error_count + ' 次</span></div>' +
'<div class="stat-item"><span>大小:</span><span>' + formatBytes(stats.write_bytes) + '</span></div>';
}
// 加载日志
async function loadLogs() {
if (!currentTopic) return;
const backward = document.getElementById('backwardCount').value;
const forward = document.getElementById('forwardCount').value;
const response = await fetch('/api/query?topic=' + currentTopic +
'&backward=' + backward + '&forward=' + forward);
const data = await response.json();
const container = document.getElementById('logContainer');
if (data.records.length === 0 && displayedOffsets.size === 0) {
container.innerHTML = '<div class="loading">暂无日志</div>';
return;
}
// 过滤出新记录
const newRecords = data.records.filter(r => !displayedOffsets.has(r.offset));
if (newRecords.length > 0) {
// 生成新记录的 HTML
const newHTML = newRecords.map(r => {
displayedOffsets.add(r.offset); // 标记为已显示
// 解析状态,处理可能的状态值
let statusClass = 'pending';
let statusText = '待处理';
let badgeClass = 'status-pending';
if (r.status === 'StatusProcessed' || r.status === 'processed') {
statusClass = 'processed';
statusText = '已处理';
badgeClass = 'status-processed';
} else if (r.status === 'StatusProcessing' || r.status === 'processing') {
statusClass = 'processing';
statusText = '处理中';
badgeClass = 'status-processing';
}
return '<div class="log-entry ' + statusClass + '" data-offset="' + r.offset + '">' +
'<span class="status-badge ' + badgeClass + '">' + statusText + '</span>' +
r.data +
'</div>';
}).join('');
// 追加新记录
container.innerHTML += newHTML;
// 自动滚动到底部
container.scrollTop = container.scrollHeight;
}
}
function queryBackward() {
loadLogs();
}
function queryForward() {
loadLogs();
}
function formatBytes(bytes) {
if (bytes < 1024) return bytes + ' B';
if (bytes < 1024 * 1024) return (bytes / 1024).toFixed(1) + ' KB';
return (bytes / 1024 / 1024).toFixed(1) + ' MB';
}
// 初始化
loadTopics();
// 不再自动刷新 topics 列表
setInterval(() => {
if (currentTopic) {
loadStats(currentTopic);
loadLogs();
}
}, 3000); // 每 3 秒刷新日志
</script>
</body>
</html>`
w.Header().Set("Content-Type", "text/html; charset=utf-8")
fmt.Fprint(w, html)
}
// API: 获取所有 topics
func handleTopics(w http.ResponseWriter, r *http.Request) {
topics := seq.GetTopics()
json.NewEncoder(w).Encode(topics)
}
// API: 获取统计信息
func handleStats(w http.ResponseWriter, r *http.Request) {
topic := r.URL.Query().Get("topic")
if topic == "" {
http.Error(w, "缺少 topic 参数", http.StatusBadRequest)
return
}
stats, err := seq.GetTopicStats(topic)
if err != nil {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
json.NewEncoder(w).Encode(stats)
}
// API: 查询日志
func handleQuery(w http.ResponseWriter, r *http.Request) {
topic := r.URL.Query().Get("topic")
if topic == "" {
http.Error(w, "缺少 topic 参数", http.StatusBadRequest)
return
}
backward, _ := strconv.Atoi(r.URL.Query().Get("backward"))
forward, _ := strconv.Atoi(r.URL.Query().Get("forward"))
if backward == 0 {
backward = 10
}
if forward == 0 {
forward = 10
}
// 从缓存中获取或创建 query 对象
queryCacheMu.Lock()
query, exists := queryCache[topic]
if !exists {
var err error
query, err = seq.NewTopicQuery(topic)
if err != nil {
queryCacheMu.Unlock()
http.Error(w, err.Error(), http.StatusNotFound)
return
}
queryCache[topic] = query
}
queryCacheMu.Unlock()
// 获取当前处理索引和读取索引
startIdx := seq.GetProcessingIndex(topic)
endIdx := seq.GetReadIndex(topic)
// 获取索引用于转换
processor, err := seq.GetProcessor(topic)
if err != nil {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
index := processor.Index()
// 合并查询结果:向后 + 当前 + 向前
var results []*seqlog.RecordWithStatus
// 向后查询
if backward > 0 && startIdx > 0 {
startPos, err := index.GetOffset(startIdx)
if err == nil {
backResults, err := query.QueryAt(startPos, -1, backward, startIdx, endIdx)
if err == nil {
results = append(results, backResults...)
}
}
}
// 当前位置
if startIdx < endIdx {
startPos, err := index.GetOffset(startIdx)
if err == nil {
currentResults, err := query.QueryAt(startPos, 0, 1, startIdx, endIdx)
if err == nil {
results = append(results, currentResults...)
}
}
}
// 向前查询
if forward > 0 {
startPos, err := index.GetOffset(startIdx)
if err == nil {
forwardResults, err := query.QueryAt(startPos, 1, forward, startIdx, endIdx)
if err == nil {
results = append(results, forwardResults...)
}
}
}
type Record struct {
Status string `json:"status"`
Data string `json:"data"`
}
records := make([]Record, len(results))
for i, r := range results {
records[i] = Record{
Status: r.Status.String(),
Data: string(r.Record.Data),
}
}
json.NewEncoder(w).Encode(map[string]interface{}{
"records": records,
"total": len(records),
})
}
// API: 手动写入日志
func handleWrite(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
http.Error(w, "只支持 POST", http.StatusMethodNotAllowed)
return
}
var req struct {
Topic string `json:"topic"`
Data string `json:"data"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
offset, err := seq.Write(req.Topic, []byte(req.Data))
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
json.NewEncoder(w).Encode(map[string]interface{}{
"success": true,
"offset": offset,
})
}

5
go.mod Normal file
View File

@@ -0,0 +1,5 @@
module code.tczkiot.com/seqlog
go 1.25.1
require github.com/google/uuid v1.6.0

2
go.sum Normal file
View File

@@ -0,0 +1,2 @@
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=

221
index.go Normal file
View File

@@ -0,0 +1,221 @@
package seqlog
import (
"encoding/binary"
"fmt"
"io"
"os"
)
const (
// IndexMagic 索引文件魔数
IndexMagic = 0x58444953 // "SIDX" (Seqlog Index)
// IndexVersion 索引文件版本
IndexVersion = 1
// IndexHeaderSize 索引文件头部大小(字节)
IndexHeaderSize = 8 // Magic(4) + Version(4)
// IndexEntrySize 每条索引条目大小(字节)
IndexEntrySize = 8 // Offset(8)
)
// IndexHeader 索引文件头部
type IndexHeader struct {
Magic uint32 // 魔数,用于识别索引文件
Version uint32 // 版本号
}
// RecordIndex 记录索引管理器
type RecordIndex struct {
logPath string // 日志文件路径
indexPath string // 索引文件路径
offsets []int64 // 内存中的偏移索引
header IndexHeader // 索引文件头部
indexFile *os.File // 索引文件句柄(用于追加写入)
}
// NewRecordIndex 创建或加载记录索引
// 启动时总是从日志文件重建索引,确保索引和日志文件完全一致
func NewRecordIndex(logPath string) (*RecordIndex, error) {
indexPath := logPath + ".idx"
ri := &RecordIndex{
logPath: logPath,
indexPath: indexPath,
offsets: make([]int64, 0, 1024),
header: IndexHeader{
Magic: IndexMagic,
Version: IndexVersion,
},
}
// 启动时总是从日志文件重建索引
if err := ri.rebuild(); err != nil {
return nil, fmt.Errorf("rebuild index: %w", err)
}
// 打开索引文件用于追加写入
f, err := os.OpenFile(indexPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
return nil, fmt.Errorf("open index file for append: %w", err)
}
ri.indexFile = f
return ri, nil
}
// rebuild 从日志文件重建索引
func (ri *RecordIndex) rebuild() error {
logFile, err := os.Open(ri.logPath)
if err != nil {
if os.IsNotExist(err) {
// 日志文件不存在,创建空索引
ri.offsets = make([]int64, 0, 1024)
return ri.save()
}
return fmt.Errorf("open log file: %w", err)
}
defer logFile.Close()
ri.offsets = make([]int64, 0, 1024)
currentOffset := int64(0)
headerBuf := make([]byte, 24) // Record header size: [4B len][4B CRC][16B UUID]
for {
// 记录当前偏移
ri.offsets = append(ri.offsets, currentOffset)
// 读取记录头部
if _, err := io.ReadFull(logFile, headerBuf); err != nil {
if err == io.EOF {
// 到达文件末尾,移除最后一个 EOF 位置
ri.offsets = ri.offsets[:len(ri.offsets)-1]
break
}
return fmt.Errorf("read record header at offset %d: %w", currentOffset, err)
}
// 解析数据长度
dataLen := binary.LittleEndian.Uint32(headerBuf[0:4])
// 跳过数据部分
if _, err := logFile.Seek(int64(dataLen), io.SeekCurrent); err != nil {
return fmt.Errorf("seek data at offset %d: %w", currentOffset, err)
}
currentOffset += 24 + int64(dataLen)
}
// 写入索引文件
return ri.save()
}
// save 保存索引到文件
func (ri *RecordIndex) save() error {
f, err := os.Create(ri.indexPath)
if err != nil {
return fmt.Errorf("create index file: %w", err)
}
defer f.Close()
// 写入头部
headerBuf := make([]byte, IndexHeaderSize)
binary.LittleEndian.PutUint32(headerBuf[0:4], ri.header.Magic)
binary.LittleEndian.PutUint32(headerBuf[4:8], ri.header.Version)
if _, err := f.Write(headerBuf); err != nil {
return fmt.Errorf("write header: %w", err)
}
// 写入所有索引条目
entryBuf := make([]byte, IndexEntrySize)
for _, offset := range ri.offsets {
binary.LittleEndian.PutUint64(entryBuf, uint64(offset))
if _, err := f.Write(entryBuf); err != nil {
return fmt.Errorf("write entry: %w", err)
}
}
return f.Sync()
}
// Append 追加一条索引(当写入新记录时调用)
func (ri *RecordIndex) Append(offset int64) error {
// 追加到索引文件(先写文件,后更新内存)
entryBuf := make([]byte, IndexEntrySize)
binary.LittleEndian.PutUint64(entryBuf, uint64(offset))
if _, err := ri.indexFile.Write(entryBuf); err != nil {
return fmt.Errorf("append index entry: %w", err)
}
// 更新内存索引
ri.offsets = append(ri.offsets, offset)
// 同步索引文件
// TODO 这里太频繁了
if err := ri.indexFile.Sync(); err != nil {
return fmt.Errorf("sync index file: %w", err)
}
return nil
}
// GetOffset 根据索引位置获取记录偏移
func (ri *RecordIndex) GetOffset(index int) (int64, error) {
if index < 0 || index >= len(ri.offsets) {
return 0, fmt.Errorf("index out of range: %d (total: %d)", index, len(ri.offsets))
}
return ri.offsets[index], nil
}
// FindIndex 根据偏移量查找索引位置(二分查找)
func (ri *RecordIndex) FindIndex(offset int64) int {
left, right := 0, len(ri.offsets)-1
result := -1
for left <= right {
mid := (left + right) / 2
if ri.offsets[mid] == offset {
return mid
} else if ri.offsets[mid] < offset {
result = mid
left = mid + 1
} else {
right = mid - 1
}
}
return result
}
// Count 返回记录总数
func (ri *RecordIndex) Count() int {
return len(ri.offsets)
}
// LastOffset 返回最后一条记录的偏移
func (ri *RecordIndex) LastOffset() int64 {
if len(ri.offsets) == 0 {
return 0
}
return ri.offsets[len(ri.offsets)-1]
}
// Close 关闭索引文件
func (ri *RecordIndex) Close() error {
if ri.indexFile != nil {
return ri.indexFile.Close()
}
return nil
}
// Sync 同步索引文件到磁盘
func (ri *RecordIndex) Sync() error {
if ri.indexFile != nil {
return ri.indexFile.Sync()
}
return nil
}

294
index_test.go Normal file
View File

@@ -0,0 +1,294 @@
package seqlog
import (
"os"
"path/filepath"
"testing"
)
// TestIndexBasicOperations 测试索引的基本操作
func TestIndexBasicOperations(t *testing.T) {
// 创建临时目录
tmpDir := t.TempDir()
logPath := filepath.Join(tmpDir, "test.log")
// 1. 创建索引
index, err := NewRecordIndex(logPath)
if err != nil {
t.Fatalf("创建索引失败: %v", err)
}
defer index.Close()
// 2. 创建写入器(使用共享索引)
writer, err := NewLogWriter(logPath, index)
if err != nil {
t.Fatalf("创建写入器失败: %v", err)
}
// 2. 写入测试数据
testData := []string{
"第一条日志",
"第二条日志",
"第三条日志",
"第四条日志",
"第五条日志",
}
offsets := make([]int64, 0, len(testData))
for _, data := range testData {
offset, err := writer.Append([]byte(data))
if err != nil {
t.Fatalf("写入失败: %v", err)
}
offsets = append(offsets, offset)
t.Logf("写入记录: offset=%d, data=%s", offset, data)
}
// 关闭写入器
if err := writer.Close(); err != nil {
t.Fatalf("关闭写入器失败: %v", err)
}
// 3. 验证索引文件存在
indexPath := logPath + ".idx"
if _, err := os.Stat(indexPath); os.IsNotExist(err) {
t.Fatalf("索引文件不存在: %s", indexPath)
}
t.Logf("索引文件已创建: %s", indexPath)
// 验证记录数量
if index.Count() != len(testData) {
t.Errorf("记录数量不匹配: got %d, want %d", index.Count(), len(testData))
}
// 验证每条记录的偏移量
for i, expectedOffset := range offsets {
actualOffset, err := index.GetOffset(i)
if err != nil {
t.Errorf("获取第 %d 条记录的偏移失败: %v", i, err)
continue
}
if actualOffset != expectedOffset {
t.Errorf("第 %d 条记录偏移量不匹配: got %d, want %d", i, actualOffset, expectedOffset)
}
}
// 验证二分查找
for i, offset := range offsets {
idx := index.FindIndex(offset)
if idx != i {
t.Errorf("FindIndex(%d) = %d, want %d", offset, idx, i)
}
}
t.Logf("索引基本操作测试通过")
}
// TestIndexRebuild 测试索引重建功能
func TestIndexRebuild(t *testing.T) {
tmpDir := t.TempDir()
logPath := filepath.Join(tmpDir, "test.log")
// 1. 创建索引和写入器,写入数据
index1, err := NewRecordIndex(logPath)
if err != nil {
t.Fatalf("创建索引失败: %v", err)
}
writer, err := NewLogWriter(logPath, index1)
if err != nil {
t.Fatalf("创建写入器失败: %v", err)
}
offsets := make([]int64, 0, 3)
for i := 0; i < 3; i++ {
offset, err := writer.Append([]byte("测试数据"))
if err != nil {
t.Fatalf("写入失败: %v", err)
}
offsets = append(offsets, offset)
}
writer.Close()
index1.Close()
// 2. 重新加载索引(测试索引加载功能)
index, err := NewRecordIndex(logPath)
if err != nil {
t.Fatalf("创建索引失败: %v", err)
}
defer index.Close()
// 验证重建的索引
if index.Count() != 3 {
t.Errorf("重建的索引记录数不正确: got %d, want 3", index.Count())
}
for i, expectedOffset := range offsets {
actualOffset, err := index.GetOffset(i)
if err != nil {
t.Errorf("获取偏移失败: %v", err)
continue
}
if actualOffset != expectedOffset {
t.Errorf("偏移量不匹配: got %d, want %d", actualOffset, expectedOffset)
}
}
t.Logf("索引重建测试通过")
}
// TestQueryWithIndex 测试带索引的查询
func TestQueryWithIndex(t *testing.T) {
tmpDir := t.TempDir()
logPath := filepath.Join(tmpDir, "test.log")
// 创建索引
index, err := NewRecordIndex(logPath)
if err != nil {
t.Fatalf("创建索引失败: %v", err)
}
defer index.Close()
// 创建写入器(使用共享索引)
writer, err := NewLogWriter(logPath, index)
if err != nil {
t.Fatalf("创建写入器失败: %v", err)
}
// 写入 10 条记录
for range 10 {
_, err := writer.Append([]byte("测试数据"))
if err != nil {
t.Fatalf("写入失败: %v", err)
}
}
writer.Close()
// 2. 创建查询器(使用共享索引)
query, err := NewRecordQuery(logPath, index)
if err != nil {
t.Fatalf("创建查询器失败: %v", err)
}
defer query.Close()
// 4. 测试获取记录总数
count, err := query.GetRecordCount()
if err != nil {
t.Fatalf("获取记录总数失败: %v", err)
}
if count != 10 {
t.Errorf("记录总数不正确: got %d, want 10", count)
}
// 5. 测试向后查询(需要索引)
// 从第 5 条记录向后查询 3 条
offset, _ := index.GetOffset(5)
results, err := query.QueryAt(offset, -1, 3, 0, 5) // startIdx=0, endIdx=5
if err != nil {
t.Fatalf("向后查询失败: %v", err)
}
if len(results) != 3 {
t.Errorf("查询结果数量不正确: got %d, want 3", len(results))
}
t.Logf("带索引的查询测试通过")
}
// TestIndexAppend 测试索引追加功能
func TestIndexAppend(t *testing.T) {
tmpDir := t.TempDir()
logPath := filepath.Join(tmpDir, "test.log")
// 1. 创建索引和写入器,写入初始数据
index1, err := NewRecordIndex(logPath)
if err != nil {
t.Fatalf("创建索引失败: %v", err)
}
writer, err := NewLogWriter(logPath, index1)
if err != nil {
t.Fatalf("创建写入器失败: %v", err)
}
for range 5 {
writer.Append([]byte("初始数据"))
}
writer.Close()
index1.Close()
// 2. 重新打开索引和写入器,追加新数据
index2, err := NewRecordIndex(logPath)
if err != nil {
t.Fatalf("重新打开索引失败: %v", err)
}
writer, err = NewLogWriter(logPath, index2)
if err != nil {
t.Fatalf("重新打开写入器失败: %v", err)
}
for range 3 {
writer.Append([]byte("追加数据"))
}
writer.Close()
index2.Close()
// 3. 验证索引
index, err := NewRecordIndex(logPath)
if err != nil {
t.Fatalf("加载索引失败: %v", err)
}
defer index.Close()
if index.Count() != 8 {
t.Errorf("索引记录数不正确: got %d, want 8", index.Count())
}
t.Logf("索引追加测试通过")
}
// TestIndexHeader 测试索引头部信息
func TestIndexHeader(t *testing.T) {
tmpDir := t.TempDir()
logPath := filepath.Join(tmpDir, "test.log")
// 创建索引
index, err := NewRecordIndex(logPath)
if err != nil {
t.Fatalf("创建索引失败: %v", err)
}
defer index.Close()
// 创建写入器(使用共享索引)
writer, err := NewLogWriter(logPath, index)
if err != nil {
t.Fatalf("创建写入器失败: %v", err)
}
lastOffset, _ := writer.Append([]byte("第一条"))
writer.Append([]byte("第二条"))
lastOffset, _ = writer.Append([]byte("第三条"))
writer.Close()
// 验证魔数和版本
if index.header.Magic != IndexMagic {
t.Errorf("Magic 不正确: got 0x%X, want 0x%X", index.header.Magic, IndexMagic)
}
if index.header.Version != IndexVersion {
t.Errorf("Version 不正确: got %d, want %d", index.header.Version, IndexVersion)
}
// 验证记录总数(从内存索引计算)
if index.Count() != 3 {
t.Errorf("Count 不正确: got %d, want 3", index.Count())
}
// 验证最后一条记录偏移(从内存索引获取)
if index.LastOffset() != lastOffset {
t.Errorf("LastOffset 不正确: got %d, want %d", index.LastOffset(), lastOffset)
}
t.Logf("索引头部信息测试通过")
}

305
query.go Normal file
View File

@@ -0,0 +1,305 @@
package seqlog
import (
"encoding/binary"
"fmt"
"io"
"os"
)
// RecordStatus 记录处理状态
type RecordStatus int
const (
StatusProcessed RecordStatus = iota // 已处理
StatusProcessing // 处理中(当前位置)
StatusPending // 待处理
StatusUnavailable // 不可用(尚未写入)
)
// String 返回状态的字符串表示
func (s RecordStatus) String() string {
switch s {
case StatusProcessed:
return "StatusProcessed"
case StatusProcessing:
return "StatusProcessing"
case StatusPending:
return "StatusPending"
case StatusUnavailable:
return "StatusUnavailable"
default:
return "StatusUnknown"
}
}
// RecordWithStatus 带状态的记录
type RecordWithStatus struct {
Record *Record
Status RecordStatus
}
// RecordQuery 记录查询器
type RecordQuery struct {
logPath string
fd *os.File
rbuf []byte // 复用读缓冲区
index *RecordIndex // 索引文件管理器(来自外部)
}
// NewRecordQuery 创建记录查询器
// index 参数必须由外部提供,确保所有组件使用同一个索引实例
func NewRecordQuery(logPath string, index *RecordIndex) (*RecordQuery, error) {
if index == nil {
return nil, fmt.Errorf("index cannot be nil")
}
fd, err := os.Open(logPath)
if err != nil {
return nil, fmt.Errorf("open log file: %w", err)
}
rq := &RecordQuery{
logPath: logPath,
fd: fd,
rbuf: make([]byte, 8<<20), // 8 MiB 缓冲区
index: index,
}
return rq, nil
}
// readRecordAtOffset 读取指定偏移位置的记录
func (rq *RecordQuery) readRecordAtOffset(offset int64) (*Record, error) {
if _, err := rq.fd.Seek(offset, 0); err != nil {
return nil, fmt.Errorf("seek to offset %d: %w", offset, err)
}
// 读取头部:[4B len][4B CRC][16B UUID] = 24 字节
hdr := rq.rbuf[:24]
if _, err := io.ReadFull(rq.fd, hdr); err != nil {
return nil, fmt.Errorf("read header: %w", err)
}
rec := &Record{
Len: binary.LittleEndian.Uint32(hdr[0:4]),
CRC: binary.LittleEndian.Uint32(hdr[4:8]),
}
copy(rec.UUID[:], hdr[8:24])
// 读取数据
rec.Data = make([]byte, rec.Len)
if _, err := io.ReadFull(rq.fd, rec.Data); err != nil {
return nil, fmt.Errorf("read data: %w", err)
}
return rec, nil
}
// readRecordsForward 从指定索引位置向前顺序读取记录
// startIndex: 起始记录索引
// count: 读取数量
// startIdx, endIdx: 游标窗口索引范围(用于状态判断)
func (rq *RecordQuery) readRecordsForward(startIndex, count int, startIdx, endIdx int) ([]*RecordWithStatus, error) {
// 获取起始 offset
startOffset, err := rq.index.GetOffset(startIndex)
if err != nil {
return nil, fmt.Errorf("get start offset: %w", err)
}
if _, err := rq.fd.Seek(startOffset, 0); err != nil {
return nil, fmt.Errorf("seek to offset %d: %w", startOffset, err)
}
results := make([]*RecordWithStatus, 0, count)
currentIndex := startIndex
currentOffset := startOffset
for len(results) < count {
// 读取头部:[4B len][4B CRC][16B UUID] = 24 字节
hdr := rq.rbuf[:24]
if _, err := io.ReadFull(rq.fd, hdr); err != nil {
if err == io.EOF {
break
}
return nil, fmt.Errorf("read header at offset %d: %w", currentOffset, err)
}
rec := &Record{
Len: binary.LittleEndian.Uint32(hdr[0:4]),
CRC: binary.LittleEndian.Uint32(hdr[4:8]),
}
copy(rec.UUID[:], hdr[8:24])
// 读取数据
rec.Data = make([]byte, rec.Len)
if _, err := io.ReadFull(rq.fd, rec.Data); err != nil {
return nil, fmt.Errorf("read data at offset %d: %w", currentOffset, err)
}
results = append(results, &RecordWithStatus{
Record: rec,
Status: rq.getRecordStatus(currentIndex, startIdx, endIdx),
})
currentIndex++
currentOffset += 24 + int64(rec.Len)
}
return results, nil
}
// getRecordStatus 根据游标窗口索引位置获取记录状态
func (rq *RecordQuery) getRecordStatus(recordIndex, startIdx, endIdx int) RecordStatus {
if recordIndex < startIdx {
return StatusProcessed
} else if recordIndex >= startIdx && recordIndex < endIdx {
return StatusProcessing
} else {
return StatusPending
}
}
// QueryOldest 从指定索引开始查询记录(向前读取)
// startIndex: 查询起始索引
// count: 查询数量
// startIdx, endIdx: 游标窗口索引范围(用于状态判断)
// 返回的记录按时间顺序(索引递增方向)
func (rq *RecordQuery) QueryOldest(startIndex, count int, startIdx, endIdx int) ([]*RecordWithStatus, error) {
if count <= 0 {
return nil, fmt.Errorf("count must be greater than 0")
}
totalCount := rq.index.Count()
if totalCount == 0 {
return []*RecordWithStatus{}, nil
}
// 校验起始索引
if startIndex < 0 {
startIndex = 0
}
if startIndex >= totalCount {
return []*RecordWithStatus{}, nil
}
// 限制查询数量
remainCount := totalCount - startIndex
if count > remainCount {
count = remainCount
}
return rq.readRecordsForward(startIndex, count, startIdx, endIdx)
}
// QueryNewest 从指定索引开始向后查询记录(索引递减方向)
// endIndex: 查询结束索引(包含,最新的记录)
// count: 查询数量
// startIdx, endIdx: 游标窗口索引范围(用于状态判断)
// 返回结果按时间倒序(最新在前,即 endIndex 对应的记录在最前)
func (rq *RecordQuery) QueryNewest(endIndex, count int, startIdx, endIdx int) ([]*RecordWithStatus, error) {
if count <= 0 {
return nil, fmt.Errorf("count must be greater than 0")
}
totalCount := rq.index.Count()
if totalCount == 0 {
return []*RecordWithStatus{}, nil
}
// 校验结束索引
if endIndex < 0 {
return []*RecordWithStatus{}, nil
}
if endIndex >= totalCount {
endIndex = totalCount - 1
}
// 计算实际起始索引(向前推 count-1 条)
queryStartIdx := endIndex - count + 1
if queryStartIdx < 0 {
queryStartIdx = 0
count = endIndex + 1 // 调整实际数量
}
// 向前读取
results, err := rq.readRecordsForward(queryStartIdx, count, startIdx, endIdx)
if err != nil {
return nil, err
}
// 反转结果,使最新的在前
for i, j := 0, len(results)-1; i < j; i, j = i+1, j-1 {
results[i], results[j] = results[j], results[i]
}
return results, nil
}
// QueryAt 从指定位置查询记录
// position: 查询起始位置(文件偏移量,通常是当前处理位置)
// direction: 查询方向负数向后0 当前,正数向前)
// count: 查询数量
// startIdx, endIdx: 游标窗口索引范围(用于状态判断)
// 返回结果按时间顺序排列
func (rq *RecordQuery) QueryAt(position int64, direction int, count int, startIdx, endIdx int) ([]*RecordWithStatus, error) {
// 将 position 转换为索引
idx := rq.index.FindIndex(position)
if idx < 0 {
return nil, fmt.Errorf("position not found in index")
}
if direction >= 0 {
// 向前查询或当前位置
if direction == 0 {
count = 1
} else {
// direction > 0跳过当前位置从下一条开始
idx++
}
return rq.readRecordsForward(idx, count, startIdx, endIdx)
}
// 向后查询:使用索引
results := make([]*RecordWithStatus, 0, count)
// 向后查询(更早的记录)
for i := idx - 1; i >= 0 && len(results) < count; i-- {
offset, err := rq.index.GetOffset(i)
if err != nil {
return nil, fmt.Errorf("get offset at index %d: %w", i, err)
}
rec, err := rq.readRecordAtOffset(offset)
if err != nil {
return nil, fmt.Errorf("read record at index %d: %w", i, err)
}
results = append(results, &RecordWithStatus{
Record: rec,
Status: rq.getRecordStatus(i, startIdx, endIdx),
})
}
// 反转结果,使其按时间顺序排列
for i, j := 0, len(results)-1; i < j; i, j = i+1, j-1 {
results[i], results[j] = results[j], results[i]
}
return results, nil
}
// GetRecordCount 获取记录总数
func (rq *RecordQuery) GetRecordCount() (int, error) {
return rq.index.Count(), nil
}
// Close 关闭查询器
// 注意:不关闭 index因为 index 是外部管理的
func (rq *RecordQuery) Close() error {
// 只关闭日志文件
if rq.fd != nil {
return rq.fd.Close()
}
return nil
}

82
seqlog.go Normal file
View File

@@ -0,0 +1,82 @@
package seqlog
import "github.com/google/uuid"
// seqlog 是一个 Go 语言日志收集和处理库
//
// 核心特性:
// - 单文件日志处理:专注于单个日志文件的读取和处理
// - 游标尺机制:通过游标跟踪日志文件的读取位置,支持断点续读
// - 自动恢复:程序重启时自动发现已存在的日志文件并从上次位置继续处理
// - 日志收集:提供高效的日志收集和解析能力
// - tail -f 模式:支持持续监控日志文件的新增内容
// - UUID 去重:每条日志自动生成唯一 UUID便于外部去重
// - slog 集成:内置 slog.Logger 支持,提供结构化日志记录
// - 统计功能:提供可恢复的统计信息,包括写入/处理次数、字节数、错误计数等
// - 双向查询:支持基于当前处理位置的向前/向后查询,自动标注记录状态
// - 事件通知:支持订阅各种事件(写入、处理、启动、停止等),实时状态变化通知
//
// 使用示例:
//
// // 写入日志
// writer, _ := seqlog.NewLogWriter("app.log")
// offset, _ := writer.Append([]byte("log message"))
//
// // 读取日志
// cursor, _ := seqlog.NewCursor("app.log")
// defer cursor.Close()
// for {
// rec, err := cursor.Next()
// if err != nil {
// break
// }
// // rec.UUID 是自动生成的唯一标识符,可用于去重
// fmt.Printf("UUID: %s, Data: %s\n", rec.UUID, string(rec.Data))
// }
//
// // tail -f 模式
// handler := func(rec *Record) error {
// fmt.Println(string(rec.Data))
// return nil
// }
// tailer, _ := seqlog.NewTailer("app.log", handler, nil)
// tailer.Start()
//
// // 使用 Seqlog 管理器(带 slog 支持和自动恢复)
// logger := slog.Default()
// handler := func(topic string, rec *seqlog.Record) error {
// fmt.Printf("[%s] %s\n", topic, string(rec.Data))
// return nil
// }
// seq := seqlog.NewSeqlog("/tmp/logs", logger, handler)
// seq.Start() // 自动发现并恢复已存在的日志文件
// seq.Write("app", []byte("application log"))
//
// // 获取统计信息
// stats, _ := seq.GetTopicStats("app")
// fmt.Printf("写入: %d 条, %d 字节\n", stats.WriteCount, stats.WriteBytes)
//
// // 查询记录
// query, _ := seq.NewTopicQuery("app")
// defer query.Close()
// current, _ := query.GetCurrent() // 获取当前处理位置
// backward, _ := query.QueryBackward(5) // 向后查询 5 条
// forward, _ := query.QueryForward(5) // 向前查询 5 条
//
// // 订阅事件
// seq.Subscribe("app", seqlog.EventWriteSuccess, func(event *seqlog.Event) {
// fmt.Printf("写入成功: offset=%d\n", event.Position)
// })
//
// seq.Stop()
// Record 日志记录
//
// 存储格式:[4B len][4B CRC][16B UUID][data]
// 注意Offset 不存储在数据文件中,而是由索引文件管理
type Record struct {
Len uint32 // 数据长度
CRC uint32 // CRC 校验和
UUID uuid.UUID // UUID用于去重
Data []byte // 实际数据
}

560
seqlog_manager.go Normal file
View File

@@ -0,0 +1,560 @@
package seqlog
import (
"fmt"
"log/slog"
"os"
"strings"
"sync"
)
// Seqlog 日志管理器,统一管理多个 topic 的日志分发
//
// 自动恢复机制:
// - Start() 时自动扫描 baseDir 中所有 .log 文件
// - 为每个发现的日志文件创建 processor
// - 使用 .pos 文件保存的游标位置恢复处理进度
// - 只处理上次中断后新增的日志,避免重复处理
type Seqlog struct {
baseDir string
processors map[string]*TopicProcessor
defaultHandler TopicRecordHandler
defaultConfig *TailConfig
logger *slog.Logger // 用于内部日志记录
globalEventBus *EventBus // 全局事件总线
pendingSubscribers map[EventType][]EventListener
mu sync.RWMutex
running bool
}
// NewSeqlog 创建一个新的日志管理器
// logger: 内部日志记录器,如果不需要可以传 slog.Default()
func NewSeqlog(baseDir string, logger *slog.Logger, defaultHandler TopicRecordHandler) *Seqlog {
if logger == nil {
logger = slog.Default()
}
return &Seqlog{
baseDir: baseDir,
processors: make(map[string]*TopicProcessor),
defaultHandler: defaultHandler,
globalEventBus: NewEventBus(),
pendingSubscribers: make(map[EventType][]EventListener),
defaultConfig: &TailConfig{
PollInterval: 100 * 1000000, // 100ms
SaveInterval: 1000 * 1000000, // 1s
},
logger: logger,
}
}
// SetDefaultTailConfig 设置默认的 tail 配置
func (s *Seqlog) SetDefaultTailConfig(config *TailConfig) {
s.mu.Lock()
defer s.mu.Unlock()
if config != nil {
s.defaultConfig = config
}
}
// RegisterHandler 为指定 topic 注册 handler
func (s *Seqlog) RegisterHandler(topic string, handler RecordHandler) error {
return s.RegisterHandlerWithConfig(topic, &TopicConfig{Handler: handler})
}
// RegisterHandlerWithConfig 为指定 topic 注册 handler 和配置
// 注意handler 为必填参数,如果 topic 已存在则返回错误
func (s *Seqlog) RegisterHandlerWithConfig(topic string, config *TopicConfig) error {
s.mu.Lock()
defer s.mu.Unlock()
processor := s.processors[topic]
if processor == nil {
// 创建新的 processor使用带 topic 属性的 logger
topicLogger := s.logger.With("topic", topic)
topicLogger.Debug("creating new processor")
var err error
processor, err = NewTopicProcessor(s.baseDir, topic, topicLogger, config)
if err != nil {
s.logger.Error("failed to create processor", "topic", topic, "error", err)
return fmt.Errorf("failed to create processor for topic %s: %w", topic, err)
}
s.processors[topic] = processor
// 订阅 processor 的所有事件,转发到全局事件总线
processor.SubscribeAll(func(event *Event) {
s.globalEventBus.Publish(event)
})
} else {
// Processor 已存在handler 不可更新
return fmt.Errorf("handler already registered for topic %s", topic)
}
s.logger.Info("handler registered", "topic", topic)
return nil
}
// Write 写入日志到指定 topic
func (s *Seqlog) Write(topic string, data []byte) (int64, error) {
processor, err := s.getOrCreateProcessor(topic)
if err != nil {
s.logger.Error("failed to get processor", "topic", topic, "error", err)
return 0, fmt.Errorf("failed to get processor for topic %s: %w", topic, err)
}
offset, err := processor.Write(data)
if err != nil {
s.logger.Error("failed to write", "topic", topic, "error", err)
return 0, err
}
s.logger.Debug("write success", "topic", topic, "offset", offset, "size", len(data))
return offset, nil
}
// Start 启动 Seqlog 和所有已注册的 processor
func (s *Seqlog) Start() error {
s.mu.Lock()
defer s.mu.Unlock()
if s.running {
return fmt.Errorf("seqlog is already running")
}
s.logger.Info("starting seqlog", "baseDir", s.baseDir, "processors", len(s.processors))
// 自动发现 baseDir 中已存在的日志文件
if err := s.discoverExistingTopics(); err != nil {
s.logger.Warn("failed to discover existing topics", "error", err)
}
// 启动所有已存在的 processor
for topic, processor := range s.processors {
if err := processor.Start(); err != nil {
// 忽略文件不存在的错误,因为可能还没写入
// processor 会在第一次写入时自动创建 writer 和 tailer
s.logger.Debug("processor start skipped", "topic", topic, "error", err)
} else {
s.logger.Debug("processor started", "topic", topic)
}
}
s.running = true
s.logger.Info("seqlog started successfully", "total_processors", len(s.processors))
return nil
}
// discoverExistingTopics 自动发现 baseDir 中已存在的日志文件并创建对应的 processor
// 注意:此方法需要在持有锁的情况下调用
func (s *Seqlog) discoverExistingTopics() error {
// 确保目录存在
if err := os.MkdirAll(s.baseDir, 0755); err != nil {
return fmt.Errorf("failed to create base directory: %w", err)
}
// 读取目录中的所有 .log 文件
entries, err := os.ReadDir(s.baseDir)
if err != nil {
return fmt.Errorf("failed to read base directory: %w", err)
}
discovered := 0
for _, entry := range entries {
if entry.IsDir() {
continue
}
name := entry.Name()
// 只处理 .log 文件,忽略 .pos 位置文件
if !strings.HasSuffix(name, ".log") {
continue
}
// 提取 topic 名称(去掉 .log 后缀)
topic := strings.TrimSuffix(name, ".log")
// 如果 processor 已存在,跳过
if _, exists := s.processors[topic]; exists {
continue
}
// 创建 processor使用默认配置和 handler
s.logger.Info("discovered existing topic", "topic", topic)
var config *TopicConfig
if s.defaultHandler != nil {
topicName := topic
handler := func(rec *Record) error {
return s.defaultHandler(topicName, rec)
}
config = &TopicConfig{
Handler: handler,
TailConfig: s.defaultConfig,
}
}
topicLogger := s.logger.With("topic", topic)
processor, err := NewTopicProcessor(s.baseDir, topic, topicLogger, config)
if err != nil {
s.logger.Error("failed to create processor for discovered topic", "topic", topic, "error", err)
continue
}
s.processors[topic] = processor
// 订阅 processor 的所有事件,转发到全局事件总线
processor.SubscribeAll(func(event *Event) {
s.globalEventBus.Publish(event)
})
discovered++
}
if discovered > 0 {
s.logger.Info("auto-discovered topics", "count", discovered)
}
return nil
}
// Stop 停止所有 processor
func (s *Seqlog) Stop() error {
s.mu.Lock()
if !s.running {
s.mu.Unlock()
return nil
}
s.logger.Info("stopping seqlog")
s.running = false
processors := make([]*TopicProcessor, 0, len(s.processors))
for _, p := range s.processors {
processors = append(processors, p)
}
s.mu.Unlock()
// 停止并清理所有 processor
for _, processor := range processors {
topic := processor.Topic()
// 先停止
if err := processor.Stop(); err != nil {
s.logger.Error("failed to stop processor", "topic", topic, "error", err)
return fmt.Errorf("failed to stop processor for topic %s: %w", topic, err)
}
s.logger.Debug("processor stopped", "topic", topic)
// 再清理资源
if err := processor.Close(); err != nil {
s.logger.Error("failed to close processor", "topic", topic, "error", err)
// 继续清理其他 processor不返回错误
}
}
s.logger.Info("seqlog stopped successfully")
return nil
}
// getOrCreateProcessor 获取或创建指定 topic 的 processor使用默认配置
// 如果没有 defaultHandler使用空 handlerno-op
func (s *Seqlog) getOrCreateProcessor(topic string) (*TopicProcessor, error) {
// 创建默认配置
var config *TopicConfig
if s.defaultHandler != nil {
// 使用默认 handler包装成 RecordHandler
topicName := topic
handler := func(rec *Record) error {
return s.defaultHandler(topicName, rec)
}
config = &TopicConfig{
Handler: handler,
TailConfig: s.defaultConfig,
}
} else {
// 没有 defaultHandler检查是否已存在
s.mu.RLock()
processor, exists := s.processors[topic]
s.mu.RUnlock()
if exists {
return processor, nil
}
// 使用空 handlerno-op允许只写入不处理
config = &TopicConfig{
Handler: func(rec *Record) error {
return nil // 空处理,什么都不做
},
TailConfig: s.defaultConfig,
}
}
return s.getOrCreateProcessorWithConfig(topic, config)
}
// getOrCreateProcessorWithConfig 获取或创建指定 topic 的 processor使用指定配置
func (s *Seqlog) getOrCreateProcessorWithConfig(topic string, config *TopicConfig) (*TopicProcessor, error) {
s.mu.RLock()
processor, exists := s.processors[topic]
s.mu.RUnlock()
if exists {
return processor, nil
}
s.mu.Lock()
defer s.mu.Unlock()
// 双重检查
if processor, exists := s.processors[topic]; exists {
return processor, nil
}
// 创建新的 processor使用带 topic 属性的 logger
topicLogger := s.logger.With("topic", topic)
topicLogger.Debug("auto-creating processor")
var err error
processor, err = NewTopicProcessor(s.baseDir, topic, topicLogger, config)
if err != nil {
topicLogger.Error("failed to create processor", "error", err)
return nil, fmt.Errorf("failed to create processor: %w", err)
}
s.processors[topic] = processor
// 订阅 processor 的所有事件,转发到全局事件总线
processor.SubscribeAll(func(event *Event) {
s.globalEventBus.Publish(event)
})
// 如果正在运行,立即启动 processor
if s.running {
if err := processor.Start(); err != nil {
topicLogger.Error("failed to start processor", "error", err)
return nil, fmt.Errorf("failed to start processor: %w", err)
}
topicLogger.Debug("processor auto-started")
}
return processor, nil
}
// GetTopics 获取所有已知的 topic
func (s *Seqlog) GetTopics() []string {
s.mu.RLock()
defer s.mu.RUnlock()
topics := make([]string, 0, len(s.processors))
for topic := range s.processors {
topics = append(topics, topic)
}
return topics
}
// IsRunning 检查 Seqlog 是否正在运行
func (s *Seqlog) IsRunning() bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.running
}
// UpdateTopicConfig 动态更新指定 topic 的 tail 配置
func (s *Seqlog) UpdateTopicConfig(topic string, config *TailConfig) error {
s.mu.RLock()
processor, exists := s.processors[topic]
s.mu.RUnlock()
if !exists {
return fmt.Errorf("topic %s not found", topic)
}
return processor.UpdateTailConfig(config)
}
// GetTopicConfig 获取指定 topic 的 tail 配置
func (s *Seqlog) GetTopicConfig(topic string) (*TailConfig, error) {
s.mu.RLock()
processor, exists := s.processors[topic]
s.mu.RUnlock()
if !exists {
return nil, fmt.Errorf("topic %s not found", topic)
}
return processor.GetTailConfig(), nil
}
// GetTopicStats 获取指定 topic 的统计信息
func (s *Seqlog) GetTopicStats(topic string) (Stats, error) {
s.mu.RLock()
processor, exists := s.processors[topic]
s.mu.RUnlock()
if !exists {
return Stats{}, fmt.Errorf("topic %s not found", topic)
}
return processor.GetStats(), nil
}
// GetAllStats 获取所有 topic 的统计信息
func (s *Seqlog) GetAllStats() map[string]Stats {
s.mu.RLock()
defer s.mu.RUnlock()
result := make(map[string]Stats, len(s.processors))
for topic, processor := range s.processors {
result[topic] = processor.GetStats()
}
return result
}
// NewTopicQuery 为指定 topic 获取查询器(返回共享实例)
func (s *Seqlog) NewTopicQuery(topic string) (*RecordQuery, error) {
s.mu.RLock()
processor, exists := s.processors[topic]
s.mu.RUnlock()
if !exists {
return nil, fmt.Errorf("topic %s not found", topic)
}
return processor.Query(), nil
}
// GetProcessingIndex 获取指定 topic 的当前处理索引
func (s *Seqlog) GetProcessingIndex(topic string) int {
s.mu.RLock()
processor, exists := s.processors[topic]
s.mu.RUnlock()
if !exists {
return 0
}
return processor.GetProcessingIndex()
}
// GetReadIndex 获取指定 topic 的当前读取索引
func (s *Seqlog) GetReadIndex(topic string) int {
s.mu.RLock()
processor, exists := s.processors[topic]
s.mu.RUnlock()
if !exists {
return 0
}
return processor.GetReadIndex()
}
// GetProcessor 获取指定 topic 的 processor
func (s *Seqlog) GetProcessor(topic string) (*TopicProcessor, error) {
s.mu.RLock()
processor, exists := s.processors[topic]
s.mu.RUnlock()
if !exists {
return nil, fmt.Errorf("topic %s not found", topic)
}
return processor, nil
}
// Subscribe 为指定 topic 订阅事件(如果 topic 不存在,会在创建时应用订阅)
func (s *Seqlog) Subscribe(topic string, eventType EventType, listener EventListener) error {
s.mu.Lock()
defer s.mu.Unlock()
processor, exists := s.processors[topic]
if exists {
processor.Subscribe(eventType, listener)
} else {
// topic 还不存在,保存待处理的订阅
// 使用包装函数,只转发给对应 topic 的事件
wrappedListener := func(event *Event) {
if event.Topic == topic {
listener(event)
}
}
s.pendingSubscribers[eventType] = append(s.pendingSubscribers[eventType], wrappedListener)
// 同时订阅到全局事件总线
s.globalEventBus.Subscribe(eventType, wrappedListener)
}
return nil
}
// SubscribeAll 为指定 topic 订阅所有事件
func (s *Seqlog) SubscribeAll(topic string, listener EventListener) error {
s.mu.Lock()
defer s.mu.Unlock()
processor, exists := s.processors[topic]
if exists {
processor.SubscribeAll(listener)
} else {
// topic 还不存在,为所有事件类型保存待处理的订阅
allTypes := []EventType{
EventWriteSuccess,
EventWriteError,
EventProcessSuccess,
EventProcessError,
EventProcessorStart,
EventProcessorStop,
EventProcessorReset,
EventPositionSaved,
}
for _, eventType := range allTypes {
wrappedListener := func(event *Event) {
if event.Topic == topic {
listener(event)
}
}
s.pendingSubscribers[eventType] = append(s.pendingSubscribers[eventType], wrappedListener)
s.globalEventBus.Subscribe(eventType, wrappedListener)
}
}
return nil
}
// SubscribeAllTopics 为所有 topic 订阅指定事件
func (s *Seqlog) SubscribeAllTopics(eventType EventType, listener EventListener) {
s.mu.Lock()
defer s.mu.Unlock()
// 为已存在的 processor 订阅
for _, processor := range s.processors {
processor.Subscribe(eventType, listener)
}
// 保存为全局订阅,新创建的 processor 也会自动订阅
s.pendingSubscribers[eventType] = append(s.pendingSubscribers[eventType], listener)
s.globalEventBus.Subscribe(eventType, listener)
}
// ResetTopic 重置指定 topic 的所有数据
// 注意:必须先停止 Seqlog 或至少停止该 topic 的 processor
func (s *Seqlog) ResetTopic(topic string) error {
s.mu.RLock()
processor, exists := s.processors[topic]
s.mu.RUnlock()
if !exists {
return fmt.Errorf("topic %s not found", topic)
}
// 先停止 processor
if err := processor.Stop(); err != nil {
return fmt.Errorf("failed to stop processor: %w", err)
}
// 执行重置
if err := processor.Reset(); err != nil {
return fmt.Errorf("failed to reset processor: %w", err)
}
// 如果 seqlog 正在运行,重新启动 processor
s.mu.RLock()
running := s.running
s.mu.RUnlock()
if running {
if err := processor.Start(); err != nil {
return fmt.Errorf("failed to restart processor: %w", err)
}
}
return nil
}

1843
seqlog_test.go Normal file

File diff suppressed because it is too large Load Diff

159
stats.go Normal file
View File

@@ -0,0 +1,159 @@
package seqlog
import (
"encoding/json"
"fmt"
"os"
"sync/atomic"
"time"
)
// Stats topic 统计信息
type Stats struct {
WriteCount int64 `json:"write_count"` // 写入次数
WriteBytes int64 `json:"write_bytes"` // 写入字节数
ProcessedCount int64 `json:"processed_count"` // 处理次数
ProcessedBytes int64 `json:"processed_bytes"` // 处理字节数
ErrorCount int64 `json:"error_count"` // 错误次数
FirstWriteTime time.Time `json:"first_write_time"` // 首次写入时间
LastWriteTime time.Time `json:"last_write_time"` // 最后写入时间
}
// TopicStats topic 统计管理器(支持原子操作和持久化)
type TopicStats struct {
writeCount atomic.Int64
writeBytes atomic.Int64
processedCount atomic.Int64
processedBytes atomic.Int64
errorCount atomic.Int64
firstWriteTime atomic.Value // time.Time
lastWriteTime atomic.Value // time.Time
statsPath string
}
// NewTopicStats 创建 topic 统计管理器
func NewTopicStats(statsPath string) *TopicStats {
ts := &TopicStats{
statsPath: statsPath,
}
// 尝试从文件加载统计信息
if err := ts.Load(); err != nil && !os.IsNotExist(err) {
// 忽略文件不存在错误,其他错误也忽略(使用默认值)
}
return ts
}
// IncWrite 增加写入计数
func (ts *TopicStats) IncWrite(bytes int64) {
ts.writeCount.Add(1)
ts.writeBytes.Add(bytes)
now := time.Now()
ts.lastWriteTime.Store(now)
// 如果是首次写入,设置首次写入时间
if ts.firstWriteTime.Load() == nil {
ts.firstWriteTime.Store(now)
}
}
// IncProcessed 增加处理计数
func (ts *TopicStats) IncProcessed(bytes int64) {
ts.processedCount.Add(1)
ts.processedBytes.Add(bytes)
}
// IncError 增加错误计数
func (ts *TopicStats) IncError() {
ts.errorCount.Add(1)
}
// Get 获取当前统计信息
func (ts *TopicStats) Get() Stats {
stats := Stats{
WriteCount: ts.writeCount.Load(),
WriteBytes: ts.writeBytes.Load(),
ProcessedCount: ts.processedCount.Load(),
ProcessedBytes: ts.processedBytes.Load(),
ErrorCount: ts.errorCount.Load(),
}
if t := ts.firstWriteTime.Load(); t != nil {
stats.FirstWriteTime = t.(time.Time)
}
if t := ts.lastWriteTime.Load(); t != nil {
stats.LastWriteTime = t.(time.Time)
}
return stats
}
// Save 保存统计信息到文件
func (ts *TopicStats) Save() error {
stats := ts.Get()
data, err := json.Marshal(stats)
if err != nil {
return fmt.Errorf("marshal stats: %w", err)
}
// 原子写入:先写临时文件,再重命名
tmpPath := ts.statsPath + ".tmp"
if err := os.WriteFile(tmpPath, data, 0644); err != nil {
return fmt.Errorf("write temp file: %w", err)
}
if err := os.Rename(tmpPath, ts.statsPath); err != nil {
return fmt.Errorf("rename temp file: %w", err)
}
return nil
}
// Load 从文件加载统计信息
func (ts *TopicStats) Load() error {
data, err := os.ReadFile(ts.statsPath)
if err != nil {
return err
}
var stats Stats
if err := json.Unmarshal(data, &stats); err != nil {
return fmt.Errorf("unmarshal stats: %w", err)
}
// 恢复统计数据
ts.writeCount.Store(stats.WriteCount)
ts.writeBytes.Store(stats.WriteBytes)
ts.processedCount.Store(stats.ProcessedCount)
ts.processedBytes.Store(stats.ProcessedBytes)
ts.errorCount.Store(stats.ErrorCount)
if !stats.FirstWriteTime.IsZero() {
ts.firstWriteTime.Store(stats.FirstWriteTime)
}
if !stats.LastWriteTime.IsZero() {
ts.lastWriteTime.Store(stats.LastWriteTime)
}
return nil
}
// Reset 重置所有统计信息并删除统计文件
func (ts *TopicStats) Reset() error {
// 重置所有计数器
ts.writeCount.Store(0)
ts.writeBytes.Store(0)
ts.processedCount.Store(0)
ts.processedBytes.Store(0)
ts.errorCount.Store(0)
ts.firstWriteTime = atomic.Value{}
ts.lastWriteTime = atomic.Value{}
// 删除统计文件
if err := os.Remove(ts.statsPath); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("remove stats file: %w", err)
}
return nil
}

149
tailer.go Normal file
View File

@@ -0,0 +1,149 @@
package seqlog
import (
"context"
"fmt"
"io"
"time"
)
// RecordHandler 日志记录处理函数类型
type RecordHandler func(*Record) error
// TopicRecordHandler 带 topic 信息的日志记录处理函数类型
type TopicRecordHandler func(topic string, rec *Record) error
// TailConfig tail 模式配置
type TailConfig struct {
PollInterval time.Duration // 轮询间隔,默认 100ms
SaveInterval time.Duration // 位置保存间隔,默认 1s
BatchSize int // 批量处理大小,默认 10
}
// LogTailer 持续监控处理器
type LogTailer struct {
cursor *LogCursor
handler RecordHandler
config TailConfig
configCh chan TailConfig // 用于动态更新配置
stopCh chan struct{}
doneCh chan struct{}
}
// NewTailer 创建一个新的 tail 处理器
// cursor: 外部提供的游标,用于读取和跟踪日志位置
func NewTailer(cursor *LogCursor, handler RecordHandler, config *TailConfig) (*LogTailer, error) {
if cursor == nil {
return nil, fmt.Errorf("cursor cannot be nil")
}
cfg := TailConfig{
PollInterval: 100 * time.Millisecond,
SaveInterval: 1 * time.Second,
BatchSize: 10,
}
if config != nil {
if config.PollInterval > 0 {
cfg.PollInterval = config.PollInterval
}
if config.SaveInterval > 0 {
cfg.SaveInterval = config.SaveInterval
}
if config.BatchSize > 0 {
cfg.BatchSize = config.BatchSize
}
}
return &LogTailer{
cursor: cursor,
handler: handler,
config: cfg,
configCh: make(chan TailConfig, 1),
stopCh: make(chan struct{}),
doneCh: make(chan struct{}),
}, nil
}
// Start 使用 context 控制的启动方式
func (t *LogTailer) Start(ctx context.Context) error {
defer close(t.doneCh)
defer t.cursor.savePosition() // 退出时保存位置
saveTicker := time.NewTicker(t.config.SaveInterval)
defer saveTicker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-t.stopCh:
return nil
case newConfig := <-t.configCh:
// 动态更新配置
t.config = newConfig
saveTicker.Reset(t.config.SaveInterval)
case <-saveTicker.C:
// 定期保存位置
t.cursor.savePosition()
default:
// 批量读取记录
records, err := t.cursor.NextRange(t.config.BatchSize)
if err != nil {
if err == io.EOF {
// 文件末尾,等待新数据
time.Sleep(t.config.PollInterval)
continue
}
return fmt.Errorf("read records error: %w", err)
}
// 批量处理记录
for _, rec := range records {
if err := t.handler(rec); err != nil {
// 处理失败,回滚窗口
t.cursor.Rollback()
return fmt.Errorf("handler error: %w", err)
}
}
// 全部处理成功,提交窗口
t.cursor.Commit()
}
}
}
// Stop 停止监控
func (t *LogTailer) Stop() {
close(t.stopCh)
<-t.doneCh // 等待完全停止
}
// UpdateConfig 动态更新配置
func (t *LogTailer) UpdateConfig(config TailConfig) {
select {
case t.configCh <- config:
// 配置已发送
default:
// channel 满了,丢弃旧配置,发送新配置
select {
case <-t.configCh:
default:
}
t.configCh <- config
}
}
// GetConfig 获取当前配置
func (t *LogTailer) GetConfig() TailConfig {
return t.config
}
// GetStartIndex 获取已处理索引(窗口开始索引)
func (t *LogTailer) GetStartIndex() int {
return t.cursor.StartIndex()
}
// GetEndIndex 获取当前读取索引(窗口结束索引)
func (t *LogTailer) GetEndIndex() int {
return t.cursor.EndIndex()
}

576
topic_processor.go Normal file
View File

@@ -0,0 +1,576 @@
package seqlog
import (
"context"
"fmt"
"log/slog"
"os"
"path/filepath"
"sync"
"time"
)
// TopicProcessor 作为聚合器,持有所有核心组件并提供统一的访问接口
type TopicProcessor struct {
topic string
logPath string
logger *slog.Logger
// 核心组件(聚合)
writer *LogWriter // 写入器
index *RecordIndex // 索引管理器
query *RecordQuery // 查询器
cursor *LogCursor // 游标
tailer *LogTailer // 持续处理器
// 配置和状态
handler RecordHandler
tailConfig *TailConfig
stats *TopicStats // 统计信息
eventBus *EventBus // 事件总线
// 并发控制
mu sync.RWMutex
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
running bool
}
// TopicConfig topic 配置
type TopicConfig struct {
Handler RecordHandler // 处理函数(必填)
TailConfig *TailConfig // tail 配置,可选
}
// NewTopicProcessor 创建一个新的 topic 处理器
// 在初始化时创建所有核心组件index 在组件间共享
// handler 为必填参数,如果 config 为 nil 或 config.Handler 为 nil 会返回错误
func NewTopicProcessor(baseDir, topic string, logger *slog.Logger, config *TopicConfig) (*TopicProcessor, error) {
// 验证必填参数
if config == nil || config.Handler == nil {
return nil, fmt.Errorf("config and config.Handler are required")
}
ctx, cancel := context.WithCancel(context.Background())
// 默认配置
tailConfig := &TailConfig{
PollInterval: 100 * 1000000, // 100ms
SaveInterval: 1000 * 1000000, // 1s
}
if config.TailConfig != nil {
tailConfig = config.TailConfig
}
if logger == nil {
logger = slog.Default()
}
logPath := filepath.Join(baseDir, topic+".log")
statsPath := filepath.Join(baseDir, topic+".stats")
tp := &TopicProcessor{
topic: topic,
logPath: logPath,
logger: logger,
handler: config.Handler,
tailConfig: tailConfig,
stats: NewTopicStats(statsPath),
eventBus: NewEventBus(),
ctx: ctx,
cancel: cancel,
}
// 初始化所有组件
if err := tp.initializeComponents(); err != nil {
cancel()
return nil, fmt.Errorf("failed to initialize components: %w", err)
}
return tp, nil
}
// initializeComponents 初始化所有核心组件
func (tp *TopicProcessor) initializeComponents() error {
// 1. 创建共享的索引管理器
index, err := NewRecordIndex(tp.logPath)
if err != nil {
return fmt.Errorf("create index: %w", err)
}
tp.index = index
// 2. 创建写入器(使用共享 index
writer, err := NewLogWriter(tp.logPath, tp.index)
if err != nil {
tp.index.Close()
return fmt.Errorf("create writer: %w", err)
}
tp.writer = writer
// 3. 创建查询器(使用共享 index
query, err := NewRecordQuery(tp.logPath, tp.index)
if err != nil {
tp.writer.Close()
tp.index.Close()
return fmt.Errorf("create query: %w", err)
}
tp.query = query
// 4. 创建游标(使用共享 index
cursor, err := NewCursor(tp.logPath, tp.index)
if err != nil {
tp.query.Close()
tp.writer.Close()
tp.index.Close()
return fmt.Errorf("create cursor: %w", err)
}
tp.cursor = cursor
// 5. 创建 tailerhandler 为必填,总是创建)
// 注意:只创建不启动,启动在 Start() 中进行
if err := tp.createTailer(); err != nil {
tp.cursor.Close()
tp.query.Close()
tp.writer.Close()
tp.index.Close()
return fmt.Errorf("create tailer: %w", err)
}
tp.logger.Debug("all components initialized")
return nil
}
// createTailer 创建 tailer不启动
func (tp *TopicProcessor) createTailer() error {
// 包装 handler添加统计功能和事件发布
wrappedHandler := func(rec *Record) error {
if err := tp.handler(rec); err != nil {
tp.stats.IncError()
// 发布处理错误事件
tp.eventBus.Publish(&Event{
Type: EventProcessError,
Topic: tp.topic,
Timestamp: time.Now(),
Record: rec,
Error: err,
Position: 0, // Position 在 tailer 模式下不可用
})
return err
}
// 处理成功,更新统计
tp.stats.IncProcessed(int64(len(rec.Data)))
// 发布处理成功事件
tp.eventBus.Publish(&Event{
Type: EventProcessSuccess,
Topic: tp.topic,
Timestamp: time.Now(),
Record: rec,
Position: 0, // Position 在 tailer 模式下不可用
})
return nil
}
tp.logger.Debug("creating tailer")
tailer, err := NewTailer(tp.cursor, wrappedHandler, tp.tailConfig)
if err != nil {
tp.logger.Error("failed to create tailer", "error", err)
return fmt.Errorf("failed to create tailer: %w", err)
}
tp.tailer = tailer
tp.logger.Debug("tailer created")
return nil
}
// Write 写入日志(统一接口)
func (tp *TopicProcessor) Write(data []byte) (int64, error) {
offset, err := tp.writer.Append(data)
if err != nil {
tp.logger.Error("failed to append", "error", err)
tp.stats.IncError()
// 发布写入错误事件
tp.eventBus.Publish(&Event{
Type: EventWriteError,
Topic: tp.topic,
Timestamp: time.Now(),
Error: err,
})
return 0, err
}
// 更新统计信息
tp.stats.IncWrite(int64(len(data)))
tp.logger.Debug("write success", "offset", offset, "size", len(data))
// 发布写入成功事件
tp.eventBus.Publish(&Event{
Type: EventWriteSuccess,
Topic: tp.topic,
Timestamp: time.Now(),
Position: offset,
})
return offset, nil
}
// Start 启动 tailer如果已创建
func (tp *TopicProcessor) Start() error {
tp.mu.Lock()
defer tp.mu.Unlock()
if tp.running {
return fmt.Errorf("topic processor for %s is already running", tp.topic)
}
tp.logger.Debug("starting processor")
// 重新创建 context如果之前被 cancel 了)
if tp.ctx.Err() != nil {
tp.ctx, tp.cancel = context.WithCancel(context.Background())
}
tp.running = true
// 如果 tailer 已创建,启动它
if tp.tailer != nil {
tp.logger.Debug("launching tailer goroutine")
tp.wg.Go(func() {
tp.logger.Debug("tailer goroutine started")
if err := tp.tailer.Start(tp.ctx); err != nil && err != context.Canceled {
tp.logger.Error("tailer error", "error", err)
}
tp.logger.Debug("tailer goroutine finished")
})
}
// 发布启动事件
tp.eventBus.Publish(&Event{
Type: EventProcessorStart,
Topic: tp.topic,
Timestamp: time.Now(),
})
return nil
}
// Stop 停止 tailer
func (tp *TopicProcessor) Stop() error {
tp.mu.Lock()
if !tp.running {
tp.mu.Unlock()
return nil
}
tp.logger.Debug("stopping processor")
tp.running = false
tp.cancel()
tp.mu.Unlock()
// 等待 tailer 停止
tp.wg.Wait()
tp.logger.Debug("processor stopped")
// 发布停止事件
tp.eventBus.Publish(&Event{
Type: EventProcessorStop,
Topic: tp.topic,
Timestamp: time.Now(),
})
return nil
}
// Topic 返回 topic 名称
func (tp *TopicProcessor) Topic() string {
return tp.topic
}
// IsRunning 检查是否正在运行
func (tp *TopicProcessor) IsRunning() bool {
tp.mu.RLock()
defer tp.mu.RUnlock()
return tp.running
}
// UpdateTailConfig 动态更新 tail 配置
func (tp *TopicProcessor) UpdateTailConfig(config *TailConfig) error {
tp.mu.Lock()
defer tp.mu.Unlock()
if config == nil {
return fmt.Errorf("config cannot be nil")
}
tp.tailConfig = config
// 如果 tailer 已经在运行,更新其配置
if tp.tailer != nil {
tp.tailer.UpdateConfig(*config)
}
return nil
}
// GetTailConfig 获取当前 tail 配置
func (tp *TopicProcessor) GetTailConfig() *TailConfig {
tp.mu.RLock()
defer tp.mu.RUnlock()
cfg := tp.tailConfig
return cfg
}
// GetStats 获取当前统计信息
func (tp *TopicProcessor) GetStats() Stats {
return tp.stats.Get()
}
// Query 获取共享的查询器
func (tp *TopicProcessor) Query() *RecordQuery {
return tp.query
}
// QueryOldest 从指定索引开始查询记录(向前读取)
// startIndex: 查询起始索引
// count: 查询数量
// 返回的记录包含状态信息(基于 tailer 的窗口索引),按时间顺序(索引递增方向)
func (tp *TopicProcessor) QueryOldest(startIndex, count int) ([]*RecordWithStatus, error) {
// 获取窗口索引范围(用于状态判断)
var startIdx, endIdx int
tp.mu.RLock()
if tp.tailer != nil {
startIdx = tp.tailer.GetStartIndex()
endIdx = tp.tailer.GetEndIndex()
}
tp.mu.RUnlock()
return tp.query.QueryOldest(startIndex, count, startIdx, endIdx)
}
// QueryNewest 从指定索引开始向后查询记录(索引递减方向)
// endIndex: 查询结束索引(最新的记录)
// count: 查询数量
// 返回的记录包含状态信息(基于 tailer 的窗口索引),按时间倒序(最新在前)
func (tp *TopicProcessor) QueryNewest(endIndex, count int) ([]*RecordWithStatus, error) {
// 获取窗口索引范围(用于状态判断)
var startIdx, endIdx int
tp.mu.RLock()
if tp.tailer != nil {
startIdx = tp.tailer.GetStartIndex()
endIdx = tp.tailer.GetEndIndex()
}
tp.mu.RUnlock()
return tp.query.QueryNewest(endIndex, count, startIdx, endIdx)
}
// GetRecordCount 获取记录总数(统一接口)
func (tp *TopicProcessor) GetRecordCount() int {
return tp.index.Count()
}
// Cursor 创建一个新的游标实例(使用共享的 index
// 注意:每次调用都会创建新实例,调用者需要负责关闭
// Tailer 内部有自己的游标,不会与此冲突
func (tp *TopicProcessor) Cursor() (*LogCursor, error) {
return NewCursor(tp.logPath, tp.index)
}
// Index 获取索引管理器
func (tp *TopicProcessor) Index() *RecordIndex {
return tp.index
}
// GetProcessingIndex 获取当前处理索引(窗口开始索引)
func (tp *TopicProcessor) GetProcessingIndex() int {
tp.mu.RLock()
defer tp.mu.RUnlock()
if tp.tailer == nil {
return 0
}
return tp.tailer.GetStartIndex()
}
// GetReadIndex 获取当前读取索引(窗口结束索引)
func (tp *TopicProcessor) GetReadIndex() int {
tp.mu.RLock()
defer tp.mu.RUnlock()
if tp.tailer == nil {
return 0
}
return tp.tailer.GetEndIndex()
}
// Subscribe 订阅事件
func (tp *TopicProcessor) Subscribe(eventType EventType, listener EventListener) {
tp.eventBus.Subscribe(eventType, listener)
}
// SubscribeAll 订阅所有事件
func (tp *TopicProcessor) SubscribeAll(listener EventListener) {
tp.eventBus.SubscribeAll(listener)
}
// Unsubscribe 取消订阅
func (tp *TopicProcessor) Unsubscribe(eventType EventType) {
tp.eventBus.Unsubscribe(eventType)
}
// Reset 清空 topic 的所有数据,包括日志文件、位置文件和统计文件
// 注意:必须在 Stop 之后调用
func (tp *TopicProcessor) Reset() error {
tp.mu.Lock()
defer tp.mu.Unlock()
if tp.running {
return fmt.Errorf("cannot reset while processor is running, please stop first")
}
tp.logger.Debug("resetting processor")
var errs []error
// 关闭 writer如果还未关闭
if tp.writer != nil {
if err := tp.writer.Close(); err != nil {
tp.logger.Error("failed to close writer during reset", "error", err)
errs = append(errs, fmt.Errorf("close writer: %w", err))
}
tp.writer = nil
}
// 删除日志文件
if err := os.Remove(tp.logPath); err != nil && !os.IsNotExist(err) {
tp.logger.Error("failed to remove log file", "error", err)
errs = append(errs, fmt.Errorf("remove log file: %w", err))
}
// 删除位置文件
posFile := tp.logPath + ".pos"
if err := os.Remove(posFile); err != nil && !os.IsNotExist(err) {
tp.logger.Error("failed to remove position file", "error", err)
errs = append(errs, fmt.Errorf("remove position file: %w", err))
}
// 删除索引文件
indexFile := tp.logPath + ".idx"
if err := os.Remove(indexFile); err != nil && !os.IsNotExist(err) {
tp.logger.Error("failed to remove index file", "error", err)
errs = append(errs, fmt.Errorf("remove index file: %w", err))
}
// 关闭所有组件
if tp.query != nil {
tp.query.Close()
tp.query = nil
}
if tp.index != nil {
tp.index.Close()
tp.index = nil
}
// 重新初始化所有组件(已持有锁)
// 这会重新创建 index, writer, query如果有 handler 也会创建 tailer
if err := tp.initializeComponents(); err != nil {
tp.logger.Error("failed to reinitialize components", "error", err)
errs = append(errs, fmt.Errorf("reinitialize components: %w", err))
}
// 重置统计信息
if tp.stats != nil {
tp.stats.Reset()
}
tp.logger.Debug("processor reset completed")
// 发布重置事件
tp.eventBus.Publish(&Event{
Type: EventProcessorReset,
Topic: tp.topic,
Timestamp: time.Now(),
})
// 如果有多个错误,返回第一个
if len(errs) > 0 {
return errs[0]
}
return nil
}
// Close 清理 processor 的所有资源
func (tp *TopicProcessor) Close() error {
tp.mu.Lock()
defer tp.mu.Unlock()
tp.logger.Debug("closing processor")
var errs []error
// 保存统计信息
if tp.stats != nil {
if err := tp.stats.Save(); err != nil {
tp.logger.Error("failed to save stats", "error", err)
errs = append(errs, fmt.Errorf("save stats: %w", err))
}
}
// 关闭 query
if tp.query != nil {
if err := tp.query.Close(); err != nil {
tp.logger.Error("failed to close query", "error", err)
errs = append(errs, fmt.Errorf("close query: %w", err))
}
tp.query = nil
}
// 关闭 cursor如果 tailer 未启动cursor 可能还未关闭)
if tp.cursor != nil {
if err := tp.cursor.Close(); err != nil {
tp.logger.Error("failed to close cursor", "error", err)
errs = append(errs, fmt.Errorf("close cursor: %w", err))
}
tp.cursor = nil
}
// 关闭 writer
if tp.writer != nil {
if err := tp.writer.Close(); err != nil {
tp.logger.Error("failed to close writer", "error", err)
errs = append(errs, fmt.Errorf("close writer: %w", err))
}
tp.writer = nil
}
// 关闭 index最后关闭因为其他组件可能依赖它
if tp.index != nil {
if err := tp.index.Close(); err != nil {
tp.logger.Error("failed to close index", "error", err)
errs = append(errs, fmt.Errorf("close index: %w", err))
}
tp.index = nil
}
// tailer 会通过 context cancel 和 Stop() 自动关闭
tp.tailer = nil
tp.logger.Debug("processor closed")
// 如果有多个错误,返回第一个
if len(errs) > 0 {
return errs[0]
}
return nil
}

84
writer.go Normal file
View File

@@ -0,0 +1,84 @@
package seqlog
import (
"encoding/binary"
"hash/crc32"
"os"
"github.com/google/uuid"
)
// LogWriter 日志写入器
type LogWriter struct {
fd *os.File
off int64 // 当前写入偏移
wbuf []byte // 8 MiB 复用
index *RecordIndex // 索引管理器(可选)
}
// NewLogWriter 创建一个新的日志写入器
// index: 外部提供的索引管理器,用于在多个组件间共享
func NewLogWriter(path string, index *RecordIndex) (*LogWriter, error) {
if index == nil {
return nil, os.ErrInvalid
}
fd, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
return nil, err
}
off, _ := fd.Seek(0, 2) // 跳到尾部
w := &LogWriter{
fd: fd,
off: off,
wbuf: make([]byte, 0, 8<<20),
index: index,
}
return w, nil
}
// Append 追加一条日志记录,返回该记录的偏移量
func (w *LogWriter) Append(data []byte) (int64, error) {
// 记录当前偏移(返回给调用者,用于索引)
offset := w.off
// 生成 UUID v4
id := uuid.New()
// 编码:[4B len][4B CRC][16B UUID][data]
buf := w.wbuf[:0]
buf = binary.LittleEndian.AppendUint32(buf, uint32(len(data)))
buf = binary.LittleEndian.AppendUint32(buf, crc32.ChecksumIEEE(data))
buf = append(buf, id[:]...)
buf = append(buf, data...)
// 落盘 + sync
if _, err := w.fd.Write(buf); err != nil {
return 0, err
}
if err := w.fd.Sync(); err != nil {
return 0, err
}
// 数据写入成功,立即更新偏移量(保证 w.off 和文件大小一致)
w.off += int64(len(buf))
// 更新索引(如果索引失败,数据已持久化,依赖启动时 rebuild 恢复)
if err := w.index.Append(offset); err != nil {
// 索引失败不影响 w.off因为数据已经写入
return 0, err
}
return offset, nil
}
// Close 关闭写入器
// 注意:不关闭 index因为 index 是外部管理的共享资源
func (w *LogWriter) Close() error {
if w.fd == nil {
return nil
}
return w.fd.Close()
}