diff --git a/INDEX_DESIGN.md b/INDEX_DESIGN.md deleted file mode 100644 index 1c6fa17..0000000 --- a/INDEX_DESIGN.md +++ /dev/null @@ -1,321 +0,0 @@ -# Seqlog 索引设计文档 - -## 概述 - -seqlog 现已支持**持久化索引文件**,实现高效的日志记录查询和检索。 - -## 设计原则 - -1. **职责分离**:数据文件只存储数据,索引文件负责 offset 管理 -2. **启动时重建**:每次启动都从日志文件重建索引,确保一致性 -3. **最小化存储**:移除冗余字段,优化存储空间 - -## 索引文件格式 - -### 文件命名 - -``` -{logfile}.idx -``` - -例如:`app.log` 对应的索引文件为 `app.log.idx` - -### 数据文件结构 - -``` -每条记录:[4B len][4B CRC][16B UUID][data] -头部大小:24 字节 - -示例: -00000000 0f 00 00 00 8b 54 b3 a5 a5 9b fb 59 dd d5 45 2c |.....T.....Y..E,| - ↑ Len=15 ↑ CRC ↑ UUID 开始... -00000010 a1 82 6f 16 5c 54 94 8d e6 97 a5 e5 bf 97 e8 ae |..o.\T..........| - ↑ ...UUID 继续 ↑ 数据开始 -``` - -### 索引文件结构 - -``` -┌─────────────────────────────────────────────────┐ -│ Header (8 字节) │ -│ ┌─────────────────────────────────────────────┐ │ -│ │ Magic: [4B] 0x53494458 ("SIDX") │ │ -│ │ Version: [4B] 1 │ │ -│ └─────────────────────────────────────────────┘ │ -├─────────────────────────────────────────────────┤ -│ Index Entries (每条 8 字节) │ -│ ┌─────────────────────────────────────────────┐ │ -│ │ Offset1: [8B] 第 1 条记录的偏移 │ │ -│ │ Offset2: [8B] 第 2 条记录的偏移 │ │ -│ │ ... │ │ -│ │ OffsetN: [8B] 第 N 条记录的偏移 │ │ -│ └─────────────────────────────────────────────┘ │ -└─────────────────────────────────────────────────┘ - -RecordCount = (文件大小 - 8) / 8 -LastOffset = 读取最后一条索引条目 -``` - -### 实际示例 - -15 条记录的文件: - -**数据文件** (591 字节): -``` -00000000 0f 00 00 00 8b 54 b3 a5 a5 9b fb 59 dd d5 45 2c |.....T.....Y..E,| -00000010 a1 82 6f 16 5c 54 94 8d e6 97 a5 e5 bf 97 e8 ae |..o.\T..........| - ↑ 数据:"日志记录 #1" -``` - -**索引文件** (128 字节): -``` -00000000 53 49 44 58 01 00 00 00 00 00 00 00 00 00 00 00 |SIDX............| - ↑ Magic="SIDX" ↑ Ver=1 ↑ Offset[0]=0 -00000010 27 00 00 00 00 00 00 00 4e 00 00 00 00 00 00 00 |'.......N.......| - ↑ Offset[1]=39 (0x27) ↑ Offset[2]=78 (0x4E) - -文件大小:128 字节 = 8B header + 15 × 8B entries -记录总数:(128 - 8) / 8 = 15 条 -``` - -## 核心组件 - -### 1. RecordIndex (index.go) - -索引文件管理器,负责索引的构建、加载、追加和查询。 - -#### 主要方法 - -```go -// 创建或加载索引(自动重建) -index, err := seqlog.NewRecordIndex(logPath) - -// 追加索引条目(写入时调用) -err := index.Append(offset) - -// 根据索引位置获取记录偏移 -offset, err := index.GetOffset(index) - -// 二分查找:根据偏移量查找索引位置 -idx := index.FindIndex(offset) - -// 获取记录总数 -count := index.Count() - -// 获取最后一条记录偏移 -lastOffset := index.LastOffset() - -// 关闭索引 -index.Close() -``` - -### 2. LogWriter 集成 - -写入器支持可选的索引自动更新。 - -```go -// 创建带索引的写入器 -writer, err := seqlog.NewLogWriterWithIndex(logPath, true) - -// 写入时自动更新索引 -offset, err := writer.Append([]byte("log data")) - -// 关闭写入器和索引 -writer.Close() -``` - -### 3. RecordQuery 集成 - -查询器优先使用索引文件进行高效查询。 - -```go -// 创建带索引的查询器 -query, err := seqlog.NewRecordQueryWithIndex(logPath, true) - -// 获取记录总数(从索引,O(1)) -count, err := query.GetRecordCount() - -// 向后查询(基于索引,O(log n) 定位 + O(n) 读取) -backward, err := query.QueryAt(offset, -1, count, startPos, endPos) - -query.Close() -``` - -## 性能优化 - -### 1. 启动时重建 - -- **每次启动都重建**:从日志文件扫描构建索引,确保索引和日志完全一致 -- **无损坏风险**:索引文件即使损坏也会自动重建 -- **简化设计**:无需在头部保存 RecordCount 和 LastOffset - -### 2. 增量更新 - -- 写入记录时同步追加索引条目 -- 避免每次查询都重新扫描日志文件 - -### 3. 二分查找 - -- `FindIndex()` 使用二分查找定位偏移量 -- 时间复杂度:O(log n) - -### 4. 自动恢复 - -- 索引文件损坏时自动重建 -- 写入器打开时检查并同步索引 - -## 使用场景 - -### 场景 1:高频查询 - -```go -// 使用索引,避免每次查询都扫描日志 -query, _ := seqlog.NewRecordQueryWithIndex(logPath, true) -for i := 0; i < 1000; i++ { - count, _ := query.GetRecordCount() // O(1) - // ... -} -``` - -### 场景 2:向后查询 - -```go -// 向后查询需要索引(否则需全文扫描) -backward, _ := query.QueryAt(currentPos, -1, 10, startPos, endPos) -``` - -### 场景 3:断点续传 - -```go -// 程序重启后,索引自动加载,无需重建 -index, _ := seqlog.NewRecordIndex(logPath) -count := index.Count() -lastOffset := index.LastOffset() -``` - -### 场景 4:大文件处理 - -```go -// 索引文件远小于日志文件,快速加载 -// 100 万条记录的索引文件仅 ~7.6 MB -// (24B header + 1,000,000 * 8B = 8,000,024 字节) -``` - -## API 兼容性 - -### 向后兼容 - -- 现有 API 保持不变(`NewLogWriter`, `NewRecordQuery`) -- 默认**不启用**索引,避免影响现有代码 - -### 选择性启用 - -```go -// 旧代码:不使用索引 -writer, _ := seqlog.NewLogWriter(logPath) - -// 新代码:启用索引 -writer, _ := seqlog.NewLogWriterWithIndex(logPath, true) -``` - -## 测试覆盖 - -所有索引功能均有完整测试覆盖: - -```bash -go test -v -run TestIndex -``` - -测试用例: -- `TestIndexBasicOperations` - 基本操作(构建、加载、查询) -- `TestIndexRebuild` - 索引重建 -- `TestQueryWithIndex` - 带索引的查询 -- `TestIndexAppend` - 索引追加 -- `TestIndexHeader` - 头部信息验证 - -## 文件示例 - -运行示例程序: - -```bash -cd example -go run index_example.go -``` - -示例输出: - -``` -=== 示例 1:带索引的写入器 === -写入: offset=0, data=日志记录 #1 -写入: offset=47, data=日志记录 #2 -... -索引文件已创建: test_seqlog/app.log.idx - -=== 示例 2:带索引的查询器 === -记录总数: 10 -第 5 条记录的偏移: 235 -向后查询 3 条记录: - [0] 状态=StatusProcessing, 数据=日志记录 #3 - ... -``` - -## 技术细节 - -### 存储开销 - -**数据文件**: -- 每条记录头部:24 字节(原 32 字节,节省 25%) -- 格式:`[4B len][4B CRC][16B UUID][data]` - -**索引文件**: -- 头部:8 字节(固定) -- 每条记录:8 字节 -- 总大小:`8 + recordCount * 8` 字节 - -**示例对比**(1 万条记录,每条 100 字节数据): - -| 组件 | 旧格式 (32B 头) | 新格式 (24B 头) | 节省 | -|------|----------------|----------------|------| -| 数据文件 | 1.32 MB | 1.24 MB | **80 KB (6%)** | -| 索引文件 | 128 字节 | 128 字节 | 0 | -| 总计 | 1.32 MB | 1.24 MB | **80 KB** | - -### 性能对比 - -| 操作 | 无索引 | 有索引 | -|------|--------|--------| -| 获取记录总数 | O(n) 全文扫描 | O(1) 读取头部 | -| 向后查询定位 | 不支持 | O(log n) 二分查找 | -| 启动时间 | 快(无需加载) | 中(加载索引) | -| 内存占用 | 低 | 中(索引数组) | - -### 数据一致性 - -- **启动时重建**:确保索引永远和日志文件一致 -- 运行时:写入日志后立即追加索引 -- 索引文件使用 `Sync()` 确保持久化 - -### 错误处理 - -- 日志文件不存在 → 返回错误 -- 写入失败 → 返回错误,不更新索引 -- 索引文件损坏 → 启动时自动重建(无影响) - -## 未来优化方向 - -1. **稀疏索引**:每 N 条记录建一个索引点,减少内存占用 -2. **分段索引**:大文件分段存储,支持并发查询 -3. **压缩索引**:使用差值编码减少存储空间 -4. **mmap 映射**:大索引文件使用内存映射优化加载 -5. **布隆过滤器**:快速判断记录是否存在 - -## 总结 - -索引文件设计要点: - -✅ **持久化**:索引保存到磁盘,重启后快速加载 -✅ **增量更新**:写入时自动追加,避免重建 -✅ **向后兼容**:不影响现有 API,可选启用 -✅ **自动恢复**:损坏时自动重建,确保可用性 -✅ **高效查询**:二分查找 + O(1) 元数据读取 -✅ **测试完备**:全面的单元测试覆盖 diff --git a/cursor.go b/cursor.go index e963e0e..ceff177 100644 --- a/cursor.go +++ b/cursor.go @@ -19,11 +19,13 @@ type ProcessCursor struct { startIdx int // 窗口开始索引(已处理的记录索引) endIdx int // 窗口结束索引(当前读到的记录索引) index *RecordIndex // 索引管理器(来自外部) + writer *LogWriter // 写入器引用(用于检查写入位置) } // NewCursor 创建一个新的日志游标 // index: 外部提供的索引管理器,用于快速定位记录 -func NewCursor(path string, index *RecordIndex) (*ProcessCursor, error) { +// writer: 外部提供的写入器引用,用于检查写入位置(可选,为 nil 时不进行写入保护检查) +func NewCursor(path string, index *RecordIndex, writer *LogWriter) (*ProcessCursor, error) { if index == nil { return nil, NewValidationError("index", "index cannot be nil", ErrNilParameter) } @@ -40,6 +42,7 @@ func NewCursor(path string, index *RecordIndex) (*ProcessCursor, error) { startIdx: 0, endIdx: 0, index: index, + writer: writer, } // 尝试恢复上次位置 c.loadPosition() @@ -64,31 +67,39 @@ func (c *ProcessCursor) Next() (*Record, error) { return nil, fmt.Errorf("get offset from index: %w", err) } + // 写入保护:检查读取位置是否超过当前写入位置 + dirtyOffset := c.writer.GetDirtyOffset() + // 如果正在写入(dirtyOffset >= 0)且记录起始位置 >= 写入位置,说明数据还未完全写入,返回 EOF 等待 + if dirtyOffset >= 0 && offset >= dirtyOffset { + return nil, io.EOF + } + // Seek 到记录位置 if _, err := c.fd.Seek(offset, 0); err != nil { return nil, fmt.Errorf("seek to offset %d: %w", offset, err) } - // 读取头部:[4B len][4B CRC][16B UUID] = 24 字节 - hdr := c.rbuf[:24] + // 读取头部:[4B len][8B offset][4B CRC][16B UUID] = 32 字节 + hdr := c.rbuf[:32] if _, err := io.ReadFull(c.fd, hdr); err != nil { return nil, err } var rec Record rec.Len = binary.LittleEndian.Uint32(hdr[0:4]) - rec.CRC = binary.LittleEndian.Uint32(hdr[4:8]) + // hdr[4:12] 是 offset,读取时不需要使用 + rec.CRC = binary.LittleEndian.Uint32(hdr[12:16]) // 读取并校验 UUID - copy(rec.UUID[:], hdr[8:24]) + copy(rec.UUID[:], hdr[16:32]) if _, err := uuid.FromBytes(rec.UUID[:]); err != nil { return nil, fmt.Errorf("%w: %v", ErrInvalidUUID, err) } // 如果数据大于缓冲区,分配新的 buffer var payload []byte - if int(rec.Len) <= len(c.rbuf)-24 { - payload = c.rbuf[24 : 24+rec.Len] + if int(rec.Len) <= len(c.rbuf)-32 { + payload = c.rbuf[32 : 32+rec.Len] } else { payload = make([]byte, rec.Len) } @@ -97,7 +108,7 @@ func (c *ProcessCursor) Next() (*Record, error) { return nil, err } if crc32.ChecksumIEEE(payload) != rec.CRC { - return nil, ErrCRCMismatch + return nil, fmt.Errorf("%w: offset=%d", ErrCRCMismatch, offset) } rec.Data = append([]byte(nil), payload...) // 复制出去,复用 buffer diff --git a/example/get_record/main.go b/example/get_record/main.go deleted file mode 100644 index a9dc462..0000000 --- a/example/get_record/main.go +++ /dev/null @@ -1,74 +0,0 @@ -package main - -import ( - "fmt" - "log" - "time" - - "code.tczkiot.com/seqlog" -) - -func main() { - // 创建 Seqlog 实例 - mgr := seqlog.NewLogHub("./logs", nil, nil) - - // 注册 topic handler - processedCount := 0 - err := mgr.RegisterHandler("app", func(record *seqlog.Record) error { - processedCount++ - fmt.Printf("处理记录 #%d: %s\n", processedCount, string(record.Data)) - time.Sleep(100 * time.Millisecond) // 模拟处理耗时 - return nil - }) - if err != nil { - log.Fatal(err) - } - - // 写入一些记录 - fmt.Println("=== 写入记录 ===") - for i := 0; i < 10; i++ { - data := fmt.Sprintf("日志消息 #%d", i) - offset, err := mgr.Write("app", []byte(data)) - if err != nil { - log.Fatal(err) - } - fmt.Printf("写入: offset=%d, data=%s\n", offset, data) - } - - // 启动处理 - fmt.Println("\n=== 启动日志处理 ===") - err = mgr.Start() - if err != nil { - log.Fatal(err) - } - - // 等待一段时间让处理器处理一些记录 - time.Sleep(500 * time.Millisecond) - - // 查询当前处理窗口的记录 - fmt.Println("\n=== 查询当前处理窗口记录 ===") - records, err := mgr.QueryFromProcessing("app", 5) - if err != nil { - log.Fatal(err) - } - - fmt.Printf("从处理窗口开始位置查询到 %d 条记录:\n", len(records)) - for _, rec := range records { - fmt.Printf(" [索引 %d] %s - 状态: %s\n", rec.Index, string(rec.Record.Data), rec.Status) - } - - // 查询更多记录 - fmt.Println("\n=== 查询后续记录 ===") - moreRecords, err := mgr.QueryFromProcessing("app", 10) - if err != nil { - log.Fatal(err) - } - fmt.Printf("查询到 %d 条记录:\n", len(moreRecords)) - for _, rec := range moreRecords { - fmt.Printf(" [索引 %d] %s - 状态: %s\n", rec.Index, string(rec.Record.Data), rec.Status) - } - - // 清理 - mgr.Stop() - fmt.Println("\n=== 示例完成 ===") -} diff --git a/example/index/main.go b/example/index/main.go index 627a7d7..cde277a 100644 --- a/example/index/main.go +++ b/example/index/main.go @@ -37,8 +37,8 @@ func main() { lastOffset = offset fmt.Printf("写入: offset=%d, data=%s\n", offset, data) } + defer writer.Close() - writer.Close() fmt.Printf("索引文件已创建: %s.idx\n\n", logPath) // ===== 示例 2:使用索引进行快速查询 ===== @@ -52,7 +52,7 @@ func main() { defer index2.Close() // 创建查询器(使用外部索引) - query, err := seqlog.NewRecordQuery(logPath, index2) + query, err := seqlog.NewRecordQuery(logPath, index2, writer) if err != nil { log.Fatal(err) } diff --git a/example/webapp/README.md b/example/webapp/README.md deleted file mode 100644 index b86e528..0000000 --- a/example/webapp/README.md +++ /dev/null @@ -1,65 +0,0 @@ -# Seqlog Web 演示 - -一个简单的 Web 应用,展示 Seqlog 的实际使用场景。 - -## 功能 - -### 后端模拟业务 -- 每 2 秒自动生成业务日志 -- 随机生成不同 topic(app、api、database、cache) -- 随机生成不同操作(查询、插入、更新、删除、备份、恢复、同步等) -- **随机日志大小**(2KB ~ 10MB): - - 80% 小日志(2KB - 100KB) - - 15% 中日志(100KB - 1MB) - - 5% 大日志(1MB - 10MB) - -### Web 查询界面 -- 查看所有 topics -- 查看每个 topic 的统计信息(显示实际字节数) -- 查询日志(支持向前/向后翻页) -- 实时自动刷新 -- 日志状态标注(已处理/处理中/待处理) - -## 快速启动 - -```bash -cd example/webapp -go run main.go -``` - -访问: http://localhost:8080 - -## 使用说明 - -1. **选择 Topic**: 点击左侧的 topic 列表 -2. **查看统计**: 左侧会显示该 topic 的统计信息(包括总字节数) -3. **查看日志**: 右侧显示日志内容,带状态标注 -4. **刷新**: 点击"刷新日志"按钮或等待自动刷新 -5. **翻页**: 使用"向前翻页"和"向后翻页"按钮 -6. **自定义范围**: 修改显示范围的数字,控制查询条数 - -## 界面说明 - -- **绿色边框**: 已处理的日志 -- **黄色边框**: 正在处理的日志 -- **灰色边框**: 待处理的日志 - -## 性能测试 - -由于日志大小范围很大(2KB ~ 10MB),可以观察到: -- 小日志处理速度很快 -- 大日志会占用更多存储空间 -- 统计信息会显示真实的字节数增长 - -## API 接口 - -- `GET /api/topics` - 获取所有 topics -- `GET /api/stats?topic=` - 获取统计信息 -- `GET /api/query?topic=&backward=10&forward=10` - 查询日志 -- `POST /api/write` - 手动写入日志 - -## 技术栈 - -- 后端: Go + Seqlog -- 前端: 原生 HTML/CSS/JavaScript -- 无需额外依赖 diff --git a/example/webapp/main.go b/example/webapp/main.go deleted file mode 100644 index af848a5..0000000 --- a/example/webapp/main.go +++ /dev/null @@ -1,648 +0,0 @@ -package main - -import ( - "encoding/json" - "fmt" - "log/slog" - "math/rand" - "net/http" - "os" - "strconv" - "time" - - "code.tczkiot.com/seqlog" -) - -var ( - seq *seqlog.LogHub - logger *slog.Logger -) - -func main() { - // 初始化 - logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ - Level: slog.LevelInfo, - })) - - // 创建 Seqlog - seq = seqlog.NewLogHub("logs", logger, func(topic string, rec *seqlog.Record) error { - // 简单的日志处理:只打印摘要信息 - dataPreview := string(rec.Data) - if len(dataPreview) > 100 { - dataPreview = dataPreview[:100] + "..." - } - logger.Info("处理日志", "topic", topic, "size", len(rec.Data), "preview", dataPreview) - return nil - }) - - if err := seq.Start(); err != nil { - logger.Error("启动失败", "error", err) - os.Exit(1) - } - defer seq.Stop() - - logger.Info("Seqlog 已启动") - - // 启动后台业务模拟 - go simulateBusiness() - - // 启动 Web 服务器 - http.HandleFunc("/", handleIndex) - http.HandleFunc("/api/topics", handleTopics) - http.HandleFunc("/api/stats", handleStats) - http.HandleFunc("/api/query", handleQuery) - http.HandleFunc("/api/write", handleWrite) - - addr := ":8080" - logger.Info("Web 服务器启动", "地址", "http://localhost"+addr) - if err := http.ListenAndServe(addr, nil); err != nil { - logger.Error("服务器错误", "error", err) - } -} - -// 模拟业务写日志 -func simulateBusiness() { - topics := []string{"app", "api", "database", "cache"} - actions := []string{"查询", "插入", "更新", "删除", "连接", "断开", "备份", "恢复", "同步"} - status := []string{"成功", "失败", "超时", "重试"} - - ticker := time.NewTicker(2 * time.Second) - defer ticker.Stop() - - for range ticker.C { - // 随机选择 topic 和内容 - topic := topics[rand.Intn(len(topics))] - action := actions[rand.Intn(len(actions))] - st := status[rand.Intn(len(status))] - - // 随机生成日志大小:2KB 到 10MB - // 80% 概率生成小日志(2KB-100KB) - // 15% 概率生成中日志(100KB-1MB) - // 5% 概率生成大日志(1MB-10MB) - var logSize int - prob := rand.Intn(100) - if prob < 80 { - // 2KB - 100KB - logSize = 2*1024 + rand.Intn(98*1024) - } else if prob < 95 { - // 100KB - 1MB - logSize = 100*1024 + rand.Intn(924*1024) - } else { - // 1MB - 10MB - logSize = 1024*1024 + rand.Intn(9*1024*1024) - } - - // 生成日志内容 - header := fmt.Sprintf("[%s] %s %s - 用时: %dms | 数据大小: %s | ", - time.Now().Format("15:04:05"), - action, - st, - rand.Intn(1000), - formatBytes(int64(logSize))) - - // 填充随机数据到指定大小 - data := make([]byte, logSize) - copy(data, []byte(header)) - - // 填充可读的模拟数据 - fillOffset := len(header) - patterns := []string{ - "user_id=%d, session=%x, ip=%d.%d.%d.%d, ", - "query_time=%dms, rows=%d, cached=%v, ", - "error_code=%d, retry_count=%d, ", - "request_id=%x, trace_id=%x, ", - } - - for fillOffset < logSize-100 { - pattern := patterns[rand.Intn(len(patterns))] - var chunk string - switch pattern { - case patterns[0]: - chunk = fmt.Sprintf(pattern, rand.Intn(10000), rand.Intn(0xFFFFFF), - rand.Intn(256), rand.Intn(256), rand.Intn(256), rand.Intn(256)) - case patterns[1]: - chunk = fmt.Sprintf(pattern, rand.Intn(1000), rand.Intn(10000), rand.Intn(2) == 1) - case patterns[2]: - chunk = fmt.Sprintf(pattern, rand.Intn(500), rand.Intn(5)) - case patterns[3]: - chunk = fmt.Sprintf(pattern, rand.Intn(0xFFFFFFFF), rand.Intn(0xFFFFFFFF)) - } - - remaining := logSize - fillOffset - if len(chunk) > remaining { - chunk = chunk[:remaining] - } - copy(data[fillOffset:], []byte(chunk)) - fillOffset += len(chunk) - } - - // 写入日志 - if _, err := seq.Write(topic, data); err != nil { - logger.Error("写入日志失败", "error", err, "size", logSize) - } else { - logger.Info("写入日志", "topic", topic, "size", formatBytes(int64(logSize))) - } - } -} - -func formatBytes(bytes int64) string { - if bytes < 1024 { - return fmt.Sprintf("%d B", bytes) - } - if bytes < 1024*1024 { - return fmt.Sprintf("%.1f KB", float64(bytes)/1024) - } - return fmt.Sprintf("%.2f MB", float64(bytes)/1024/1024) -} - -type Record struct { - Index int `json:"index"` - Status string `json:"status"` - Data string `json:"data"` -} - -// 首页 -func handleIndex(w http.ResponseWriter, r *http.Request) { - html := ` - - - - Seqlog 日志查询 - - - -
-

Seqlog 日志查询系统

-
实时查看和管理应用日志
-
- -
- - -
-
- - - - 显示范围: 前 条, 后 -
- -
-
选择一个 topic 开始查看日志
-
-
-
- - - -` - - w.Header().Set("Content-Type", "text/html; charset=utf-8") - fmt.Fprint(w, html) -} - -// API: 获取所有 topics -func handleTopics(w http.ResponseWriter, r *http.Request) { - topics := seq.GetTopics() - json.NewEncoder(w).Encode(topics) -} - -// API: 获取统计信息 -func handleStats(w http.ResponseWriter, r *http.Request) { - topic := r.URL.Query().Get("topic") - if topic == "" { - http.Error(w, "缺少 topic 参数", http.StatusBadRequest) - return - } - - stats, err := seq.GetTopicStats(topic) - if err != nil { - http.Error(w, err.Error(), http.StatusNotFound) - return - } - - json.NewEncoder(w).Encode(stats) -} - -// API: 查询日志 -func handleQuery(w http.ResponseWriter, r *http.Request) { - topic := r.URL.Query().Get("topic") - if topic == "" { - http.Error(w, "缺少 topic 参数", http.StatusBadRequest) - return - } - - // 获取查询参数 - indexParam := r.URL.Query().Get("index") - direction := r.URL.Query().Get("direction") - count, _ := strconv.Atoi(r.URL.Query().Get("count")) - - if count <= 0 { - count = 10 - } - - // 获取 processor - processor, err := seq.GetProcessor(topic) - if err != nil { - http.Error(w, err.Error(), http.StatusNotFound) - return - } - - // 获取记录总数 - totalCount := processor.GetRecordCount() - - // 执行查询 - var results []*seqlog.RecordWithStatus - - if direction == "" { - results, err = processor.QueryFromProcessing(count) - if err != nil { - http.Error(w, err.Error(), http.StatusNotFound) - return - } - if len(results) == 0 { - results, err = processor.QueryFromLast(count) - if err != nil { - http.Error(w, err.Error(), http.StatusNotFound) - return - } - } - } else { - var refIndex int - if indexParam == "" { - http.Error(w, "参数错误", http.StatusNotFound) - return - } else { - refIndex, err = strconv.Atoi(indexParam) - if err != nil { - http.Error(w, err.Error(), http.StatusNotFound) - return - } - } - - if direction == "backward" { - var queryErr error - results, queryErr = processor.QueryNewest(refIndex, count) - if queryErr != nil { - http.Error(w, queryErr.Error(), http.StatusInternalServerError) - return - } - } else if direction == "forward" { - var queryErr error - results, queryErr = processor.QueryOldest(refIndex, count) - if queryErr != nil { - http.Error(w, queryErr.Error(), http.StatusInternalServerError) - return - } - } else { - http.Error(w, "参数错误", http.StatusNotFound) - return - } - } - - records := make([]Record, len(results)) - for i, result := range results { - records[i] = Record{ - Index: result.Index, - Status: result.Status.String(), - Data: string(result.Record.Data), - } - } - - json.NewEncoder(w).Encode(map[string]interface{}{ - "records": records, - "total": len(records), - "totalCount": totalCount, - }) -} - -// API: 手动写入日志 -func handleWrite(w http.ResponseWriter, r *http.Request) { - if r.Method != "POST" { - http.Error(w, "只支持 POST", http.StatusMethodNotAllowed) - return - } - - var req struct { - Topic string `json:"topic"` - Data string `json:"data"` - } - - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - offset, err := seq.Write(req.Topic, []byte(req.Data)) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - json.NewEncoder(w).Encode(map[string]interface{}{ - "success": true, - "offset": offset, - }) -} diff --git a/example/webui/README.md b/example/webui/README.md new file mode 100644 index 0000000..27ea8b3 --- /dev/null +++ b/example/webui/README.md @@ -0,0 +1,140 @@ +# Seqlog Web UI 示例 + +这个示例展示了如何使用 seqlog 的内置 Web UI 功能。 + +## 特性 + +- **零编译前端**:使用 Vue 3 和 Tailwind CSS CDN,无需前端构建步骤 +- **实时日志查看**:查看多个 topic 的日志记录 +- **统计信息**:实时显示每个 topic 的统计数据 +- **日志查询**:支持查询最早、最新的日志记录 +- **状态追踪**:显示日志的处理状态(待处理、处理中、已处理) +- **灵活集成**:可以集成到现有的 HTTP 服务器或其他框架 + +## 运行示例 + +```bash +cd example/webui +go run main.go +``` + +然后在浏览器中访问 http://localhost:8080 + +## API 端点 + +Web UI 提供以下 API 端点: + +- `GET /api/topics` - 获取所有 topic 列表 +- `GET /api/logs?topic=xxx&count=N` - 查询最新的 N 条日志(从最后一条向前查询) +- `GET /api/logs/first?topic=xxx&count=N` - 查询从第一条开始的 N 条日志(从索引 0 向后查询) +- `GET /api/logs/last?topic=xxx&count=N` - 查询最后的 N 条日志(从最后一条向前查询) +- `GET /api/stats?topic=xxx` - 获取指定 topic 的统计信息 + +## 使用方式 + +### 方式 1:独立 Web UI 服务器 + +最简单的方式,直接启动一个独立的 Web UI 服务器: + +```go +import "code.tczkiot.com/seqlog" + +// 创建 LogHub +hub := seqlog.NewLogHub("./logs", logger, handler) + +// 启动服务 +hub.Start() + +// 启动 Web UI(会阻塞) +hub.ServeUI(":8080") +``` + +如果需要在后台运行 Web UI: + +```go +go func() { + if err := hub.ServeUI(":8080"); err != nil { + logger.Error("Web UI 错误", "error", err) + } +}() +``` + +### 方式 2:集成到现有 HTTP 服务器 + +如果你已经有一个 HTTP 服务器,可以将 Web UI 集成到现有的 ServeMux: + +```go +import ( + "net/http" + "code.tczkiot.com/seqlog" +) + +// 创建 LogHub +hub := seqlog.NewLogHub("./logs", logger, handler) +hub.Start() + +// 创建自己的 ServeMux +mux := http.NewServeMux() + +// 注册业务端点 +mux.HandleFunc("/api/users", handleUsers) +mux.HandleFunc("/health", handleHealth) + +// 注册 seqlog Web UI 到根路径 +hub.RegisterWebUIRoutes(mux) + +// 启动服务器 +http.ListenAndServe(":8080", mux) +``` + +### 方式 3:集成到子路径 + +如果你想将 Web UI 放在子路径下(比如 `/logs/`): + +```go +// 创建主 ServeMux +mux := http.NewServeMux() + +// 注册业务端点 +mux.HandleFunc("/health", handleHealth) + +// 创建 Web UI 的 ServeMux +logsMux := http.NewServeMux() +hub.RegisterWebUIRoutes(logsMux) + +// 挂载到 /logs/ 路径 +mux.Handle("/logs/", http.StripPrefix("/logs", logsMux)) + +// 启动服务器 +http.ListenAndServe(":8080", mux) +``` + +访问 http://localhost:8080/logs/ 查看 Web UI。 + +完整的集成示例请参考 [example/webui_integration](../webui_integration/main.go)。 + +### 方式 4:集成到其他 Web 框架 + +对于 gin、echo 等框架,可以通过适配器集成: + +```go +// Gin 框架示例 +import ( + "github.com/gin-gonic/gin" + "net/http" +) + +r := gin.Default() + +// 业务路由 +r.GET("/api/users", handleUsers) + +// 创建 seqlog Web UI 的 ServeMux +logsMux := http.NewServeMux() +hub.RegisterWebUIRoutes(logsMux) + +// 使用 gin.WrapH 包装 http.Handler +r.Any("/logs/*path", gin.WrapH(http.StripPrefix("/logs", logsMux))) + +r.Run(":8080") +``` diff --git a/example/webui/main.go b/example/webui/main.go new file mode 100644 index 0000000..19bd1c9 --- /dev/null +++ b/example/webui/main.go @@ -0,0 +1,114 @@ +package main + +import ( + "fmt" + "log/slog" + "math/rand" + "os" + "strings" + "sync" + "time" + "unicode/utf8" + + "code.tczkiot.com/seqlog" +) + +func main() { + // 创建日志目录 + baseDir := "./logs" + if err := os.MkdirAll(baseDir, 0755); err != nil { + panic(err) + } + + // 创建 LogHub + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ + Level: slog.LevelInfo, + })) + + // 自定义处理器:打印处理的记录 + handler := func(topic string, record *seqlog.Record) error { + previewSize := min(int(record.Len), 100) + validPreviewSize := previewSize + if previewSize > 0 && previewSize < int(record.Len) { + // 只有在截断的情况下才需要检查 + // 从后往前最多检查 3 个字节,找到最后一个完整的 UTF-8 字符边界 + for i := 0; i < 3 && validPreviewSize > 0; i++ { + if utf8.Valid(record.Data[:validPreviewSize]) { + break + } + validPreviewSize-- + } + } + + fmt.Printf("[%s] 处理记录: %s\n", topic, string(record.Data[:validPreviewSize])) + return nil + } + + hub := seqlog.NewLogHub(baseDir, logger, handler) + + // 启动 LogHub(会自动发现和启动所有 topic) + if err := hub.Start(); err != nil { + panic(err) + } + defer hub.Stop() + + // topic 列表(会在第一次写入时自动创建) + topics := []string{"app", "system", "access"} + + // 在后台启动 Web UI 服务器 + go func() { + fmt.Println("启动 Web UI 服务器: http://localhost:8080") + if err := hub.ServeUI(":8080"); err != nil { + fmt.Printf("Web UI 服务器错误: %v\n", err) + } + }() + + // 生成随机大小的数据(2KB 到 10MB) + generateRandomData := func(minSize, maxSize int) []byte { + size := minSize + rand.Intn(maxSize-minSize) + // 使用重复字符填充,模拟实际日志内容 + return []byte(strings.Repeat("X", size)) + } + + // 启动多个并发写入器(提高并发数) + var wg sync.WaitGroup + concurrentWriters := 10 // 10 个并发写入器 + + for i := range concurrentWriters { + wg.Add(1) + go func(writerID int) { + defer wg.Done() + count := 0 + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + for range ticker.C { + count++ + // 随机选择 topic + topic := topics[rand.Intn(len(topics))] + + // 生成随机大小的数据(2KB 到 2MB) + minSize := 2 * 1024 // 2KB + maxSize := 2 * 1024 * 1024 // 2MB + randomData := generateRandomData(minSize, maxSize) + + // 组合消息头和随机数据 + message := fmt.Sprintf("[Writer-%d] 日志 #%d - %s - 大小: %d bytes\n", + writerID, count, time.Now().Format(time.RFC3339), len(randomData)) + fullData := append([]byte(message), randomData...) + + if _, err := hub.Write(topic, fullData); err != nil { + fmt.Printf("写入失败: %v\n", err) + } + } + }(i) + } + + // 等待用户中断 + fmt.Println("\n==== Seqlog Web UI 示例 ====") + fmt.Println("访问 http://localhost:8080 查看 Web UI") + fmt.Println("按 Ctrl+C 退出") + + // 阻塞主线程 + select {} +} diff --git a/example/webui_integration/main.go b/example/webui_integration/main.go new file mode 100644 index 0000000..d30f6c5 --- /dev/null +++ b/example/webui_integration/main.go @@ -0,0 +1,91 @@ +package main + +import ( + "fmt" + "log/slog" + "net/http" + "os" + "time" + + "code.tczkiot.com/seqlog" +) + +func main() { + // 创建日志目录 + baseDir := "./logs" + if err := os.MkdirAll(baseDir, 0755); err != nil { + panic(err) + } + + // 创建 LogHub + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ + Level: slog.LevelInfo, + })) + + handler := func(topic string, record *seqlog.Record) error { + fmt.Printf("[%s] 处理记录: %s\n", topic, string(record.Data)) + return nil + } + + hub := seqlog.NewLogHub(baseDir, logger, handler) + + // 启动 LogHub + if err := hub.Start(); err != nil { + panic(err) + } + defer hub.Stop() + + // 创建自己的 ServeMux + mux := http.NewServeMux() + + // 注册自己的业务端点 + mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + fmt.Fprintf(w, `{"status":"ok","service":"my-app"}`) + }) + + mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/plain") + fmt.Fprintf(w, "# HELP my_app_requests_total Total requests\n") + fmt.Fprintf(w, "my_app_requests_total 12345\n") + }) + + // 在 /logs 路径下集成 seqlog Web UI + // 方法 1:使用子路径(需要创建一个包装 ServeMux) + logsMux := http.NewServeMux() + if err := hub.RegisterWebUIRoutes(logsMux); err != nil { + panic(err) + } + mux.Handle("/logs/", http.StripPrefix("/logs", logsMux)) + + // 启动模拟写日志 + go func() { + topics := []string{"app", "system", "access"} + count := 0 + ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() + + for range ticker.C { + count++ + topic := topics[count%len(topics)] + message := fmt.Sprintf("日志消息 %d - %s", count, time.Now().Format(time.RFC3339)) + + if _, err := hub.Write(topic, []byte(message)); err != nil { + fmt.Printf("写入失败: %v\n", err) + } + } + }() + + // 启动服务器 + fmt.Println("\n==== Seqlog 集成示例 ====") + fmt.Println("业务端点:") + fmt.Println(" - http://localhost:8080/health") + fmt.Println(" - http://localhost:8080/metrics") + fmt.Println("Web UI:") + fmt.Println(" - http://localhost:8080/logs/") + fmt.Println("按 Ctrl+C 退出") + + if err := http.ListenAndServe(":8080", mux); err != nil { + fmt.Printf("服务器错误: %v\n", err) + } +} diff --git a/example/webui_integration/webui_integration b/example/webui_integration/webui_integration new file mode 100755 index 0000000..beedb7a Binary files /dev/null and b/example/webui_integration/webui_integration differ diff --git a/index_test.go b/index_test.go index a8ea3bb..ed610b5 100644 --- a/index_test.go +++ b/index_test.go @@ -162,10 +162,10 @@ func TestQueryWithIndex(t *testing.T) { t.Fatalf("写入失败: %v", err) } } - writer.Close() + defer writer.Close() // 2. 创建查询器(使用共享索引) - query, err := NewRecordQuery(logPath, index) + query, err := NewRecordQuery(logPath, index, writer) if err != nil { t.Fatalf("创建查询器失败: %v", err) } diff --git a/query.go b/query.go index 0747755..1b84b93 100644 --- a/query.go +++ b/query.go @@ -5,6 +5,9 @@ import ( "fmt" "io" "os" + "unicode/utf8" + + "github.com/google/uuid" ) // RecordStatus 记录处理状态 @@ -46,20 +49,39 @@ type RecordWithIndex struct { Index int // 记录在日志文件中的索引位置 } +// RecordMetadata 记录元数据(不包含完整数据) +type RecordMetadata struct { + Index int // 记录索引 + UUID uuid.UUID // UUID + DataSize uint32 // 数据大小(字节) + DataPreview string // 数据预览(前 200 个字符) + Full bool +} + +// RecordMetadataWithStatus 带状态的记录元数据 +type RecordMetadataWithStatus struct { + Metadata *RecordMetadata + Status RecordStatus // 记录的处理状态 +} + // RecordQuery 记录查询器 type RecordQuery struct { logPath string fd *os.File rbuf []byte // 复用读缓冲区 index *RecordIndex // 索引文件管理器(来自外部) + writer *LogWriter // 日志写入器(来自外部) } // NewRecordQuery 创建记录查询器 // index 参数必须由外部提供,确保所有组件使用同一个索引实例 -func NewRecordQuery(logPath string, index *RecordIndex) (*RecordQuery, error) { +func NewRecordQuery(logPath string, index *RecordIndex, writer *LogWriter) (*RecordQuery, error) { if index == nil { return nil, NewValidationError("index", "index cannot be nil", ErrNilParameter) } + if writer == nil { + return nil, NewValidationError("writer", "writer cannot be nil", ErrNilParameter) + } fd, err := os.Open(logPath) if err != nil { @@ -71,11 +93,100 @@ func NewRecordQuery(logPath string, index *RecordIndex) (*RecordQuery, error) { fd: fd, rbuf: make([]byte, 8<<20), // 8 MiB 缓冲区 index: index, + writer: writer, } return rq, nil } +// readRecordsMetadataForward 从指定索引位置向前顺序读取记录元数据(不读取完整 Data,但读取预览) +// startIndex: 起始记录索引 +// count: 读取数量 +func (rq *RecordQuery) readRecordsMetadataForward(startIndex, count int) ([]*RecordMetadata, error) { + // 获取起始 offset + startOffset, err := rq.index.GetOffset(startIndex) + if err != nil { + return nil, fmt.Errorf("get start offset: %w", err) + } + + if _, err := rq.fd.Seek(startOffset, 0); err != nil { + return nil, fmt.Errorf("seek to offset %d: %w", startOffset, err) + } + + results := make([]*RecordMetadata, 0, count) + currentIndex := startIndex + + for len(results) < count { + // 读取头部:[4B len][8B offset][4B CRC][16B UUID] = 32 字节 + hdr := rq.rbuf[:32] + if _, err := io.ReadFull(rq.fd, hdr); err != nil { + if err == io.EOF { + break + } + return nil, fmt.Errorf("read header at index %d: %w", currentIndex, err) + } + + dataOffset := binary.LittleEndian.Uint64(hdr[4:12]) + dirtyOffset := rq.writer.GetDirtyOffset() + // 如果正在写入(dirtyOffset >= 0)且记录位置 >= 写入位置,等待写入完成 + if dirtyOffset >= 0 && dataOffset >= uint64(dirtyOffset) { + break + } + + dataLen := binary.LittleEndian.Uint32(hdr[0:4]) + var uuidBytes [16]byte + copy(uuidBytes[:], hdr[16:32]) + + // 读取数据预览(最多 200 字节) + previewSize := min(int(dataLen), 200) + + previewData := make([]byte, previewSize) + if _, err := io.ReadFull(rq.fd, previewData); err != nil { + if err == io.EOF { + break + } + return nil, fmt.Errorf("read preview at index %d: %w", currentIndex, err) + } + + // 确保预览数据不会在 UTF-8 字符中间截断 + validPreviewSize := previewSize + if previewSize > 0 && previewSize < int(dataLen) { + // 只有在截断的情况下才需要检查 + // 从后往前最多检查 3 个字节,找到最后一个完整的 UTF-8 字符边界 + for i := 0; i < 3 && validPreviewSize > 0; i++ { + if utf8.Valid(previewData[:validPreviewSize]) { + break + } + validPreviewSize-- + } + } + + metadata := &RecordMetadata{ + Index: currentIndex, + UUID: uuidBytes, + DataSize: dataLen, + DataPreview: string(previewData[:validPreviewSize]), + Full: previewSize == int(dataLen), + } + + // 跳过剩余数据部分 + remainingSize := int64(dataLen) - int64(previewSize) + if remainingSize > 0 { + if _, err := rq.fd.Seek(remainingSize, 1); err != nil { + if err == io.EOF { + break + } + return nil, fmt.Errorf("skip remaining data at index %d: %w", currentIndex, err) + } + } + + results = append(results, metadata) + currentIndex++ + } + + return results, nil +} + // readRecordsForward 从指定索引位置向前顺序读取记录 // startIndex: 起始记录索引 // count: 读取数量 @@ -94,8 +205,8 @@ func (rq *RecordQuery) readRecordsForward(startIndex, count int) ([]*Record, err currentOffset := startOffset for len(results) < count { - // 读取头部:[4B len][4B CRC][16B UUID] = 24 字节 - hdr := rq.rbuf[:24] + // 读取头部:[4B len][8B offset][4B CRC][16B UUID] = 32 字节 + hdr := rq.rbuf[:32] if _, err := io.ReadFull(rq.fd, hdr); err != nil { if err == io.EOF { break @@ -105,18 +216,24 @@ func (rq *RecordQuery) readRecordsForward(startIndex, count int) ([]*Record, err rec := &Record{ Len: binary.LittleEndian.Uint32(hdr[0:4]), - CRC: binary.LittleEndian.Uint32(hdr[4:8]), + // hdr[4:12] 是 offset,读取时不需要使用 + CRC: binary.LittleEndian.Uint32(hdr[12:16]), } - copy(rec.UUID[:], hdr[8:24]) + copy(rec.UUID[:], hdr[16:32]) // 读取数据 rec.Data = make([]byte, rec.Len) if _, err := io.ReadFull(rq.fd, rec.Data); err != nil { + // 如果遇到 EOF,说明文件可能不完整(被截断或索引不一致) + // 返回已读取的记录,而不是报错 + if err == io.EOF || err == io.ErrUnexpectedEOF { + break + } return nil, fmt.Errorf("read data at offset %d: %w", currentOffset, err) } results = append(results, rec) - currentOffset += 24 + int64(rec.Len) + currentOffset += 32 + int64(rec.Len) } return results, nil @@ -222,6 +339,100 @@ func (rq *RecordQuery) QueryNewest(refIndex, count int) ([]*RecordWithIndex, err return results, nil } +// QueryOldestMetadata 从参考索引向索引递减方向查询记录元数据(查询更早的记录,不读取完整数据) +// refIndex: 参考索引位置 +// count: 查询数量 +// 返回的记录按索引递增方向排序,只包含元数据信息 +// 例如:QueryOldestMetadata(5, 3) 查询索引 2, 3, 4(不包含 5),返回 [2, 3, 4] +func (rq *RecordQuery) QueryOldestMetadata(refIndex, count int) ([]*RecordMetadata, error) { + if count <= 0 { + return nil, NewValidationError("count", "count must be greater than 0", ErrInvalidCount) + } + + totalCount := rq.index.Count() + if totalCount == 0 { + return []*RecordMetadata{}, nil + } + + // 验证参考索引范围(严格模式) + if refIndex < 0 || refIndex > totalCount { + return nil, NewValidationError("refIndex", fmt.Sprintf("refIndex %d out of range [0, %d]", refIndex, totalCount), ErrInvalidRange) + } + + // 计算实际起始索引(向索引递减方向) + startIndex := refIndex - count + if startIndex < 0 { + startIndex = 0 + count = refIndex // 调整实际数量 + } + + if count <= 0 { + return []*RecordMetadata{}, nil + } + + // 读取元数据 + return rq.readRecordsMetadataForward(startIndex, count) +} + +// QueryNewestMetadata 从参考索引向索引递增方向查询记录元数据(查询更新的记录,不读取完整数据) +// refIndex: 参考索引位置 +// count: 查询数量 +// 返回的记录按索引递增方向排序,只包含元数据信息 +// 例如:QueryNewestMetadata(5, 3) 查询索引 6, 7, 8(不包含 5),返回 [6, 7, 8] +func (rq *RecordQuery) QueryNewestMetadata(refIndex, count int) ([]*RecordMetadata, error) { + if count <= 0 { + return nil, NewValidationError("count", "count must be greater than 0", ErrInvalidCount) + } + + totalCount := rq.index.Count() + if totalCount == 0 { + return []*RecordMetadata{}, nil + } + + // 验证参考索引范围(严格模式) + // QueryNewestMetadata 允许 refIndex = -1(从头开始查询) + if refIndex < -1 || refIndex >= totalCount { + return nil, NewValidationError("refIndex", fmt.Sprintf("refIndex %d out of range [-1, %d)", refIndex, totalCount), ErrInvalidRange) + } + + // 计算实际起始索引(向索引递增方向) + startIndex := refIndex + 1 + if startIndex >= totalCount { + return []*RecordMetadata{}, nil + } + + // 限制查询数量 + remainCount := totalCount - startIndex + if count > remainCount { + count = remainCount + } + + // 读取元数据 + return rq.readRecordsMetadataForward(startIndex, count) +} + +// QueryByIndex 根据索引查询单条记录的完整数据 +// index: 记录索引 +// 返回完整的记录数据 +func (rq *RecordQuery) QueryByIndex(index int) (*Record, error) { + totalCount := rq.index.Count() + if index < 0 || index >= totalCount { + return nil, NewValidationError("index", fmt.Sprintf("index %d out of range [0, %d)", index, totalCount), ErrInvalidRange) + } + + // 读取单条记录 + records, err := rq.readRecordsForward(index, 1) + if err != nil { + return nil, err + } + + if len(records) == 0 { + return nil, fmt.Errorf("record at index %d not found", index) + } + + return records[0], nil +} + // GetRecordCount 获取记录总数 func (rq *RecordQuery) GetRecordCount() (int, error) { return rq.index.Count(), nil diff --git a/seqlog.go b/seqlog.go index f8f9c18..c0189ee 100644 --- a/seqlog.go +++ b/seqlog.go @@ -72,8 +72,7 @@ import "github.com/google/uuid" // Record 日志记录 // -// 存储格式:[4B len][4B CRC][16B UUID][data] -// 注意:Offset 不存储在数据文件中,而是由索引文件管理 +// 存储格式:[4B len][8B offset][4B CRC][16B UUID][data] type Record struct { Len uint32 // 数据长度 CRC uint32 // CRC 校验和 diff --git a/seqlog_test.go b/seqlog_test.go index 45d0d23..b03bf55 100644 --- a/seqlog_test.go +++ b/seqlog_test.go @@ -44,7 +44,7 @@ func TestBasicWriteAndRead(t *testing.T) { } // 读取数据(使用共享的 index) - cursor, err := NewCursor(tmpFile, index) + cursor, err := NewCursor(tmpFile, index, nil) if err != nil { t.Fatalf("创建 cursor 失败: %v", err) } @@ -90,7 +90,7 @@ func TestCursorNextRange(t *testing.T) { writer.Close() // 测试范围读取 - cursor, err := NewCursor(tmpFile, index) + cursor, err := NewCursor(tmpFile, index, nil) if err != nil { t.Fatal(err) } @@ -176,7 +176,7 @@ func TestCursorWindow(t *testing.T) { writer.Close() // 测试窗口模式 - cursor, err := NewCursor(tmpFile, index) + cursor, err := NewCursor(tmpFile, index, nil) if err != nil { t.Fatal(err) } @@ -280,19 +280,19 @@ func TestCursorPersistence(t *testing.T) { } // 读取前两条记录 - cursor1, err := NewCursor(tmpFile, index) + cursor1, err := NewCursor(tmpFile, index, nil) if err != nil { t.Fatalf("创建 cursor 失败: %v", err) } - cursor1.Next() // 读取第一条 + cursor1.Next() // 读取第一条 cursor1.Commit() // 提交 - cursor1.Next() // 读取第二条 + cursor1.Next() // 读取第二条 cursor1.Commit() // 提交 cursor1.Close() // 重新打开 cursor,应该从第三条开始读取 - cursor2, err := NewCursor(tmpFile, index) + cursor2, err := NewCursor(tmpFile, index, nil) if err != nil { t.Fatalf("重新创建 cursor 失败: %v", err) } @@ -337,7 +337,7 @@ func TestTailer(t *testing.T) { } // 创建 cursor - cursor, err := NewCursor(tmpFile, index) + cursor, err := NewCursor(tmpFile, index, nil) if err != nil { t.Fatalf("创建 cursor 失败: %v", err) } @@ -407,7 +407,7 @@ func TestTailerStop(t *testing.T) { return nil } - cursor, err := NewCursor(tmpFile, index) + cursor, err := NewCursor(tmpFile, index, nil) if err != nil { t.Fatalf("创建 cursor 失败: %v", err) } @@ -650,7 +650,7 @@ func TestUUIDUniqueness(t *testing.T) { } // 读取并验证 UUID 唯一性 - cursor, err := NewCursor(tmpFile, index) + cursor, err := NewCursor(tmpFile, index, nil) if err != nil { t.Fatalf("创建 cursor 失败: %v", err) } @@ -705,7 +705,7 @@ func TestUUIDValidation(t *testing.T) { } // 读取并验证 UUID - cursor, err := NewCursor(tmpFile, index) + cursor, err := NewCursor(tmpFile, index, nil) if err != nil { t.Fatalf("创建 cursor 失败: %v", err) } @@ -1208,7 +1208,7 @@ func TestRecordQuery(t *testing.T) { } offsets[i] = offset } - writer.Close() + defer writer.Close() // 模拟处理到第 5 条记录 // 窗口范围:[索引 5, 索引 6) @@ -1223,7 +1223,7 @@ func TestRecordQuery(t *testing.T) { defer index.Close() // 创建查询器 - query, err := NewRecordQuery(tmpFile, index) + query, err := NewRecordQuery(tmpFile, index, writer) if err != nil { t.Fatalf("failed to create query: %v", err) } @@ -1821,7 +1821,7 @@ func TestQueryOldestNewest(t *testing.T) { t.Fatal(err) } defer processor.Close() - + // 写入测试数据 for i := 0; i < 10; i++ { data := fmt.Sprintf("message %d", i) @@ -1829,7 +1829,7 @@ func TestQueryOldestNewest(t *testing.T) { t.Fatal(err) } } - + // 测试 QueryNewest - 查询索引 0, 1, 2(向索引递增方向) // QueryNewest(-1, 3) 从 -1 向后查询,得到索引 0, 1, 2 oldest, err := processor.QueryNewest(-1, 3) diff --git a/topic_processor.go b/topic_processor.go index 1767317..9fcb329 100644 --- a/topic_processor.go +++ b/topic_processor.go @@ -13,15 +13,16 @@ import ( // TopicProcessor 作为聚合器,持有所有核心组件并提供统一的访问接口 type TopicProcessor struct { topic string + title string // 显示标题,用于 UI 展示 logPath string logger *slog.Logger // 核心组件(聚合) - writer *LogWriter // 写入器 - index *RecordIndex // 索引管理器 - query *RecordQuery // 查询器 - cursor *ProcessCursor // 游标 - tailer *LogTailer // 持续处理器 + writer *LogWriter // 写入器 + index *RecordIndex // 索引管理器 + query *RecordQuery // 查询器 + cursor *ProcessCursor // 游标 + tailer *LogTailer // 持续处理器 // 配置和状态 handler RecordHandler @@ -39,6 +40,7 @@ type TopicProcessor struct { // TopicConfig topic 配置 type TopicConfig struct { + Title string // 显示标题,可选,默认为 topic 名称 Handler RecordHandler // 处理函数(必填) TailConfig *TailConfig // tail 配置,可选 } @@ -71,8 +73,15 @@ func NewTopicProcessor(baseDir, topic string, logger *slog.Logger, config *Topic logPath := filepath.Join(baseDir, topic+".log") statsPath := filepath.Join(baseDir, topic+".stats") + // 设置 title,如果未提供则使用 topic 名称 + title := config.Title + if title == "" { + title = topic + } + tp := &TopicProcessor{ topic: topic, + title: title, logPath: logPath, logger: logger, handler: config.Handler, @@ -110,7 +119,7 @@ func (tp *TopicProcessor) initializeComponents() error { tp.writer = writer // 3. 创建查询器(使用共享 index) - query, err := NewRecordQuery(tp.logPath, tp.index) + query, err := NewRecordQuery(tp.logPath, tp.index, tp.writer) if err != nil { tp.writer.Close() tp.index.Close() @@ -118,8 +127,8 @@ func (tp *TopicProcessor) initializeComponents() error { } tp.query = query - // 4. 创建游标(使用共享 index) - cursor, err := NewCursor(tp.logPath, tp.index) + // 4. 创建游标(使用共享 index 和 writer) + cursor, err := NewCursor(tp.logPath, tp.index, tp.writer) if err != nil { tp.query.Close() tp.writer.Close() @@ -241,17 +250,36 @@ func (tp *TopicProcessor) Start() error { tp.running = true - // 如果 tailer 已创建,启动它 - if tp.tailer != nil { - tp.logger.Debug("launching tailer goroutine") - tp.wg.Go(func() { - tp.logger.Debug("tailer goroutine started") - if err := tp.tailer.Start(tp.ctx); err != nil && err != context.Canceled { - tp.logger.Error("tailer error", "error", err) + // 启动定期保存统计信息的 goroutine + tp.wg.Add(1) + go func() { + defer tp.wg.Done() + ticker := time.NewTicker(tp.tailConfig.SaveInterval) + defer ticker.Stop() + + for { + select { + case <-tp.ctx.Done(): + return + case <-ticker.C: + if err := tp.stats.Save(); err != nil { + tp.logger.Error("failed to save stats", "error", err) + } } - tp.logger.Debug("tailer goroutine finished") - }) - } + } + }() + + // 如果 tailer 已创建,启动它 + tp.logger.Debug("launching tailer goroutine") + tp.wg.Add(1) + go func() { + defer tp.wg.Done() + tp.logger.Debug("tailer goroutine started") + if err := tp.tailer.Start(tp.ctx); err != nil && err != context.Canceled { + tp.logger.Error("tailer error", "error", err) + } + tp.logger.Debug("tailer goroutine finished") + }() // 发布启动事件 tp.eventBus.Publish(&Event{ @@ -295,6 +323,11 @@ func (tp *TopicProcessor) Topic() string { return tp.topic } +// Title 返回显示标题 +func (tp *TopicProcessor) Title() string { + return tp.title +} + // IsRunning 检查是否正在运行 func (tp *TopicProcessor) IsRunning() bool { tp.mu.RLock() @@ -363,6 +396,29 @@ func (tp *TopicProcessor) addStatusToRecords(records []*RecordWithIndex) []*Reco return results } +// addStatusToMetadata 为元数据添加状态信息 +func (tp *TopicProcessor) addStatusToMetadata(metadata []*RecordMetadata) []*RecordMetadataWithStatus { + // 获取窗口索引范围(用于状态判断) + var startIdx, endIdx int + tp.mu.RLock() + if tp.tailer != nil { + startIdx = tp.tailer.GetStartIndex() + endIdx = tp.tailer.GetEndIndex() + } + tp.mu.RUnlock() + + // 为每个元数据添加状态 + results := make([]*RecordMetadataWithStatus, len(metadata)) + for i, meta := range metadata { + results[i] = &RecordMetadataWithStatus{ + Metadata: meta, + Status: GetRecordStatus(meta.Index, startIdx, endIdx), + } + } + + return results +} + // QueryOldest 从参考索引向索引递减方向查询记录(查询更早的记录) // refIndex: 参考索引位置 // count: 查询数量 @@ -389,6 +445,58 @@ func (tp *TopicProcessor) QueryNewest(refIndex, count int) ([]*RecordWithStatus, return tp.addStatusToRecords(records), nil } +// QueryOldestMetadata 从参考索引向索引递减方向查询记录元数据(查询更早的记录,不读取完整数据) +// refIndex: 参考索引位置 +// count: 查询数量 +// 返回的记录只包含元数据信息(索引、UUID、数据大小),按索引递增方向排序 +// 例如:QueryOldestMetadata(5, 3) 查询索引 2, 3, 4(不包含 5) +func (tp *TopicProcessor) QueryOldestMetadata(refIndex, count int) ([]*RecordMetadataWithStatus, error) { + metadata, err := tp.query.QueryOldestMetadata(refIndex, count) + if err != nil { + return nil, err + } + return tp.addStatusToMetadata(metadata), nil +} + +// QueryNewestMetadata 从参考索引向索引递增方向查询记录元数据(查询更新的记录,不读取完整数据) +// refIndex: 参考索引位置 +// count: 查询数量 +// 返回的记录只包含元数据信息(索引、UUID、数据大小),按索引递增方向排序 +// 例如:QueryNewestMetadata(5, 3) 查询索引 6, 7, 8(不包含 5) +func (tp *TopicProcessor) QueryNewestMetadata(refIndex, count int) ([]*RecordMetadataWithStatus, error) { + metadata, err := tp.query.QueryNewestMetadata(refIndex, count) + if err != nil { + return nil, err + } + return tp.addStatusToMetadata(metadata), nil +} + +// QueryByIndex 根据索引查询单条记录的完整数据 +// index: 记录索引 +// 返回完整的记录数据,包含状态信息 +func (tp *TopicProcessor) QueryByIndex(index int) (*RecordWithStatus, error) { + record, err := tp.query.QueryByIndex(index) + if err != nil { + return nil, err + } + + // 获取当前处理窗口位置 + var startIdx, endIdx int + tp.mu.RLock() + if tp.tailer != nil { + startIdx = tp.tailer.GetStartIndex() + endIdx = tp.tailer.GetEndIndex() + } + tp.mu.RUnlock() + + status := GetRecordStatus(index, startIdx, endIdx) + return &RecordWithStatus{ + Record: record, + Index: index, + Status: status, + }, nil +} + // GetRecordCount 获取记录总数(统一接口) func (tp *TopicProcessor) GetRecordCount() int { return tp.index.Count() @@ -429,10 +537,39 @@ func (tp *TopicProcessor) QueryFromLast(count int) ([]*RecordWithStatus, error) // 获取记录总数 totalCount := tp.index.Count() + // 如果没有记录,返回空数组 + if totalCount == 0 { + return []*RecordWithStatus{}, nil + } + // QueryOldest(totalCount, count) 会从最后一条记录开始向前查询 + // totalCount 是记录总数,有效索引是 0 到 totalCount-1 + // 所以传入 totalCount 作为 refIndex,会查询 totalCount-count 到 totalCount-1 的记录 return tp.QueryOldest(totalCount, count) } +// QueryFromFirstMetadata 从第一条记录向索引递增方向查询元数据 +// count: 查询数量 +// 返回从第一条记录(索引 0)开始的记录元数据,包含状态信息 +// 例如:QueryFromFirstMetadata(3) 查询索引 0, 1, 2 的元数据 +func (tp *TopicProcessor) QueryFromFirstMetadata(count int) ([]*RecordMetadataWithStatus, error) { + // QueryNewestMetadata(-1, count) 会从索引 0 开始向后查询 + return tp.QueryNewestMetadata(-1, count) +} + +// QueryFromLastMetadata 从最后一条记录向索引递减方向查询元数据 +// count: 查询数量 +// 返回最后 N 条记录的元数据(按索引递增方向排序),包含状态信息 +// 例如:QueryFromLastMetadata(3) 查询最后 3 条记录的元数据 +func (tp *TopicProcessor) QueryFromLastMetadata(count int) ([]*RecordMetadataWithStatus, error) { + totalCount := tp.index.Count() + if totalCount == 0 { + return []*RecordMetadataWithStatus{}, nil + } + // QueryOldestMetadata(totalCount, count) 会从 totalCount 向前查询 count 条 + return tp.QueryOldestMetadata(totalCount, count) +} + // GetProcessingIndex 获取当前处理索引(窗口开始索引) func (tp *TopicProcessor) GetProcessingIndex() int { tp.mu.RLock() diff --git a/ui/index.html b/ui/index.html new file mode 100644 index 0000000..4705c3b --- /dev/null +++ b/ui/index.html @@ -0,0 +1,568 @@ + + + + + + Seqlog Dashboard + + + + + +
+ +
+
+
+
+ + + +

Seqlog Dashboard

+
+
+ 实时日志查询系统 +
+
+
+
+ +
+
+ +
+
+

Topics

+ +
+ 暂无 Topic +
+ +
+ +
+
+ + +
+

统计信息

+
+
+ 写入次数: + {{ stats.write_count }} +
+
+ 写入大小: + {{ formatBytes(stats.write_bytes) }} +
+
+ 已处理: + {{ stats.processed_count }} +
+
+ 处理大小: + {{ formatBytes(stats.processed_bytes) }} +
+
+ 错误次数: + {{ stats.error_count }} +
+
+
+
+ + +
+
+ +
+
+ + + + +
+ + + +
+
+ +
+ Topic: {{ selectedTopic }} +
+
+ + +
+
+ + + +

请选择一个 Topic 开始查看日志

+
+ +
+
+

加载中...

+
+ +
+ + + +

暂无日志记录

+
+ +
+
+
+
+ #{{ log.index }} + + {{ getStatusText(log.status) }} + + {{ formatBytes(log.dataSize || (log.data ? log.data.length : 0)) }} +
+ {{ formatUUID(log.uuid) }} +
+
+ {{ log.dataPreview || log.data }} +
+
+
+
+
+
+
+
+ + + +
+ +
+
+
+ + + + diff --git a/webui.go b/webui.go new file mode 100644 index 0000000..466701f --- /dev/null +++ b/webui.go @@ -0,0 +1,451 @@ +package seqlog + +import ( + "embed" + "encoding/json" + "io/fs" + "net/http" + "strconv" + + "github.com/google/uuid" +) + +//go:embed ui/* +var uiFS embed.FS + +// APIRecord API 响应的记录结构(包含完整数据) +type APIRecord struct { + Index int `json:"index"` + UUID uuid.UUID `json:"uuid"` + Data string `json:"data"` // 转换为字符串 + Status string `json:"status"` // 状态文本 +} + +// APIRecordMetadata API 响应的记录元数据结构(不包含完整数据) +type APIRecordMetadata struct { + Index int `json:"index"` + UUID uuid.UUID `json:"uuid"` + DataSize uint32 `json:"dataSize"` // 数据大小(字节) + DataPreview string `json:"dataPreview"` // 数据预览 + Status string `json:"status"` // 状态文本 + Full bool `json:"full"` // 是否完整 +} + +// toAPIRecord 将 RecordWithStatus 转换为 APIRecord +func toAPIRecord(r *RecordWithStatus) APIRecord { + statusText := "pending" + switch r.Status { + case StatusPending: + statusText = "pending" + case StatusProcessing: + statusText = "processing" + case StatusProcessed: + statusText = "processed" + } + + return APIRecord{ + Index: r.Index, + UUID: r.Record.UUID, + Data: string(r.Record.Data), // 转换为字符串 + Status: statusText, + } +} + +// toAPIRecords 批量转换 +func toAPIRecords(records []*RecordWithStatus) []APIRecord { + result := make([]APIRecord, len(records)) + for i, r := range records { + result[i] = toAPIRecord(r) + } + return result +} + +// toAPIRecordMetadata 将 RecordMetadataWithStatus 转换为 APIRecordMetadata +func toAPIRecordMetadata(r *RecordMetadataWithStatus) APIRecordMetadata { + statusText := "pending" + switch r.Status { + case StatusPending: + statusText = "pending" + case StatusProcessing: + statusText = "processing" + case StatusProcessed: + statusText = "processed" + } + + return APIRecordMetadata{ + Index: r.Metadata.Index, + UUID: r.Metadata.UUID, + DataSize: r.Metadata.DataSize, + DataPreview: r.Metadata.DataPreview, + Status: statusText, + Full: r.Metadata.Full, + } +} + +// toAPIRecordMetadatas 批量转换 +func toAPIRecordMetadatas(metadata []*RecordMetadataWithStatus) []APIRecordMetadata { + result := make([]APIRecordMetadata, len(metadata)) + for i, m := range metadata { + result[i] = toAPIRecordMetadata(m) + } + return result +} + +// RegisterWebUIRoutes 将 Web UI 路由注册到指定的 ServeMux +// 这允许你将 Web UI 集成到现有的 HTTP 服务器或其他框架中 +func (hub *LogHub) RegisterWebUIRoutes(mux *http.ServeMux) error { + // 提供静态文件 + uiContent, err := fs.Sub(uiFS, "ui") + if err != nil { + return err + } + mux.Handle("/", http.FileServer(http.FS(uiContent))) + + // API 端点 + mux.HandleFunc("/api/topics", hub.handleTopics) + mux.HandleFunc("/api/logs", hub.handleLogs) + mux.HandleFunc("/api/logs/first", hub.handleLogsFirst) + mux.HandleFunc("/api/logs/last", hub.handleLogsLast) + mux.HandleFunc("/api/logs/older", hub.handleLogsOlder) // 返回元数据 + mux.HandleFunc("/api/logs/newer", hub.handleLogsNewer) // 返回元数据 + mux.HandleFunc("/api/logs/record", hub.handleLogsByIndex) // 根据索引查询完整数据 + mux.HandleFunc("/api/stats", hub.handleStats) + + return nil +} + +// ServeUI 启动 Web UI 服务器 +// 这会创建一个新的 HTTP 服务器并监听指定地址 +func (hub *LogHub) ServeUI(addr string) error { + mux := http.NewServeMux() + if err := hub.RegisterWebUIRoutes(mux); err != nil { + return err + } + return http.ListenAndServe(addr, mux) +} + +// TopicInfo topic 信息 +type TopicInfo struct { + Topic string `json:"topic"` // topic 名称 + Title string `json:"title"` // 显示标题 +} + +// handleTopics 返回所有 topic 列表及其标题 +func (hub *LogHub) handleTopics(w http.ResponseWriter, r *http.Request) { + hub.mu.RLock() + topics := make([]TopicInfo, 0, len(hub.processors)) + for topic, processor := range hub.processors { + topics = append(topics, TopicInfo{ + Topic: topic, + Title: processor.Title(), + }) + } + hub.mu.RUnlock() + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(topics) +} + +// handleLogs 查询最新的 N 条日志(默认行为,只返回元数据) +func (hub *LogHub) handleLogs(w http.ResponseWriter, r *http.Request) { + topic := r.URL.Query().Get("topic") + if topic == "" { + http.Error(w, "topic parameter required", http.StatusBadRequest) + return + } + + count := 20 + if countStr := r.URL.Query().Get("count"); countStr != "" { + if c, err := strconv.Atoi(countStr); err == nil && c > 0 { + count = c + } + } + + hub.mu.RLock() + processor, exists := hub.processors[topic] + hub.mu.RUnlock() + + if !exists { + http.Error(w, "topic not found", http.StatusNotFound) + return + } + + // 查询最新的 N 条记录元数据(从最后一条记录开始向前查询) + metadata, err := processor.QueryFromLastMetadata(count) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + // 获取统计信息 + stats := processor.GetStats() + + // 转换为 API 格式 + apiMetadata := toAPIRecordMetadatas(metadata) + + response := map[string]interface{}{ + "records": apiMetadata, + "stats": stats, + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) +} + +// handleLogsFirst 查询从第一条记录开始的 N 条日志(只返回元数据) +func (hub *LogHub) handleLogsFirst(w http.ResponseWriter, r *http.Request) { + topic := r.URL.Query().Get("topic") + if topic == "" { + http.Error(w, "topic parameter required", http.StatusBadRequest) + return + } + + count := 20 + if countStr := r.URL.Query().Get("count"); countStr != "" { + if c, err := strconv.Atoi(countStr); err == nil && c > 0 { + count = c + } + } + + hub.mu.RLock() + processor, exists := hub.processors[topic] + hub.mu.RUnlock() + + if !exists { + http.Error(w, "topic not found", http.StatusNotFound) + return + } + + // 从第一条记录开始查询元数据(索引 0 开始向后) + metadata, err := processor.QueryFromFirstMetadata(count) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + // 转换为 API 格式 + apiMetadata := toAPIRecordMetadatas(metadata) + + response := map[string]interface{}{ + "records": apiMetadata, + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) +} + +// handleLogsLast 查询最后的 N 条日志(只返回元数据) +func (hub *LogHub) handleLogsLast(w http.ResponseWriter, r *http.Request) { + topic := r.URL.Query().Get("topic") + if topic == "" { + http.Error(w, "topic parameter required", http.StatusBadRequest) + return + } + + count := 20 + if countStr := r.URL.Query().Get("count"); countStr != "" { + if c, err := strconv.Atoi(countStr); err == nil && c > 0 { + count = c + } + } + + hub.mu.RLock() + processor, exists := hub.processors[topic] + hub.mu.RUnlock() + + if !exists { + http.Error(w, "topic not found", http.StatusNotFound) + return + } + + // 查询最后的 N 条记录元数据(从最后一条向前查询) + metadata, err := processor.QueryFromLastMetadata(count) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + // 转换为 API 格式 + apiMetadata := toAPIRecordMetadatas(metadata) + + response := map[string]any{ + "records": apiMetadata, + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) +} + +// handleStats 返回指定 topic 的统计信息 +func (hub *LogHub) handleStats(w http.ResponseWriter, r *http.Request) { + topic := r.URL.Query().Get("topic") + if topic == "" { + http.Error(w, "topic parameter required", http.StatusBadRequest) + return + } + + hub.mu.RLock() + processor, exists := hub.processors[topic] + hub.mu.RUnlock() + + if !exists { + http.Error(w, "topic not found", http.StatusNotFound) + return + } + + stats := processor.GetStats() + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(stats) +} + +// handleLogsOlder 从参考索引向前查询更早的日志(无限滚动支持,只返回元数据) +func (hub *LogHub) handleLogsOlder(w http.ResponseWriter, r *http.Request) { + topic := r.URL.Query().Get("topic") + if topic == "" { + http.Error(w, "topic parameter required", http.StatusBadRequest) + return + } + + // 获取参考索引 + refIndexStr := r.URL.Query().Get("refIndex") + if refIndexStr == "" { + http.Error(w, "refIndex parameter required", http.StatusBadRequest) + return + } + refIndex, err := strconv.Atoi(refIndexStr) + if err != nil { + http.Error(w, "invalid refIndex", http.StatusBadRequest) + return + } + + count := 20 + if countStr := r.URL.Query().Get("count"); countStr != "" { + if c, err := strconv.Atoi(countStr); err == nil && c > 0 { + count = c + } + } + + hub.mu.RLock() + processor, exists := hub.processors[topic] + hub.mu.RUnlock() + + if !exists { + http.Error(w, "topic not found", http.StatusNotFound) + return + } + + // 从 refIndex 向前查询更早的记录元数据 + metadata, err := processor.QueryOldestMetadata(refIndex, count) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + // 转换为 API 格式 + apiMetadata := toAPIRecordMetadatas(metadata) + + response := map[string]any{ + "records": apiMetadata, + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) +} + +// handleLogsNewer 从参考索引向后查询更新的日志(无限滚动支持,只返回元数据) +func (hub *LogHub) handleLogsNewer(w http.ResponseWriter, r *http.Request) { + topic := r.URL.Query().Get("topic") + if topic == "" { + http.Error(w, "topic parameter required", http.StatusBadRequest) + return + } + + // 获取参考索引 + refIndexStr := r.URL.Query().Get("refIndex") + if refIndexStr == "" { + http.Error(w, "refIndex parameter required", http.StatusBadRequest) + return + } + refIndex, err := strconv.Atoi(refIndexStr) + if err != nil { + http.Error(w, "invalid refIndex", http.StatusBadRequest) + return + } + + count := 20 + if countStr := r.URL.Query().Get("count"); countStr != "" { + if c, err := strconv.Atoi(countStr); err == nil && c > 0 { + count = c + } + } + + hub.mu.RLock() + processor, exists := hub.processors[topic] + hub.mu.RUnlock() + + if !exists { + http.Error(w, "topic not found", http.StatusNotFound) + return + } + + // 从 refIndex 向后查询更新的记录元数据 + metadata, err := processor.QueryNewestMetadata(refIndex, count) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + // 转换为 API 格式 + apiMetadata := toAPIRecordMetadatas(metadata) + + response := map[string]any{ + "records": apiMetadata, + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) +} + +// handleLogsByIndex 根据索引查询单条记录的完整数据 +func (hub *LogHub) handleLogsByIndex(w http.ResponseWriter, r *http.Request) { + topic := r.URL.Query().Get("topic") + if topic == "" { + http.Error(w, "topic parameter required", http.StatusBadRequest) + return + } + + // 获取索引 + indexStr := r.URL.Query().Get("index") + if indexStr == "" { + http.Error(w, "index parameter required", http.StatusBadRequest) + return + } + index, err := strconv.Atoi(indexStr) + if err != nil { + http.Error(w, "invalid index", http.StatusBadRequest) + return + } + + hub.mu.RLock() + processor, exists := hub.processors[topic] + hub.mu.RUnlock() + + if !exists { + http.Error(w, "topic not found", http.StatusNotFound) + return + } + + // 根据索引查询完整记录 + record, err := processor.QueryByIndex(index) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + // 转换为 API 格式 + apiRecord := toAPIRecord(record) + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(apiRecord) +} diff --git a/writer.go b/writer.go index c4d2ac5..10e2bd4 100644 --- a/writer.go +++ b/writer.go @@ -4,16 +4,19 @@ import ( "encoding/binary" "hash/crc32" "os" + "sync" "github.com/google/uuid" ) // LogWriter 日志写入器 type LogWriter struct { - fd *os.File - off int64 // 当前写入偏移 - wbuf []byte // 8 MiB 复用 - index *RecordIndex // 索引管理器(可选) + fd *os.File + off int64 // 当前写入偏移 + dirtyOff int64 // 最后一次写入偏移 + wbuf []byte // 8 MiB 复用 + index *RecordIndex // 索引管理器(可选) + mu sync.RWMutex // 保护 off 字段 } // NewLogWriter 创建一个新的日志写入器 @@ -30,10 +33,11 @@ func NewLogWriter(path string, index *RecordIndex) (*LogWriter, error) { off, _ := fd.Seek(0, 2) // 跳到尾部 w := &LogWriter{ - fd: fd, - off: off, - wbuf: make([]byte, 0, 8<<20), - index: index, + fd: fd, + off: off, + dirtyOff: -1, + wbuf: make([]byte, 0, 8<<20), + index: index, } return w, nil @@ -41,15 +45,24 @@ func NewLogWriter(path string, index *RecordIndex) (*LogWriter, error) { // Append 追加一条日志记录,返回该记录的偏移量 func (w *LogWriter) Append(data []byte) (int64, error) { + w.mu.Lock() + defer w.mu.Unlock() + // 记录当前偏移(返回给调用者,用于索引) offset := w.off + w.dirtyOff = offset + defer func() { + w.dirtyOff = -1 + }() + // 生成 UUID v4 id := uuid.New() - // 编码:[4B len][4B CRC][16B UUID][data] + // 编码:[4B len][8B offset][4B CRC][16B UUID][data] buf := w.wbuf[:0] buf = binary.LittleEndian.AppendUint32(buf, uint32(len(data))) + buf = binary.LittleEndian.AppendUint64(buf, uint64(offset)) buf = binary.LittleEndian.AppendUint32(buf, crc32.ChecksumIEEE(data)) buf = append(buf, id[:]...) buf = append(buf, data...) @@ -74,6 +87,19 @@ func (w *LogWriter) Append(data []byte) (int64, error) { return offset, nil } +// GetWriteOffset 获取当前写入偏移量(线程安全) +func (w *LogWriter) GetWriteOffset() int64 { + w.mu.RLock() + defer w.mu.RUnlock() + return w.off +} + +func (w *LogWriter) GetDirtyOffset() int64 { + w.mu.RLock() + defer w.mu.RUnlock() + return w.dirtyOff +} + // Close 关闭写入器 // 注意:不关闭 index,因为 index 是外部管理的共享资源 func (w *LogWriter) Close() error {