- 修改记录存储格式为 [4B len][8B offset][4B CRC][16B UUID][data] - 修复 TopicProcessor 中 WaitGroup 使用错误导致 handler 不执行的问题 - 修复写入保护逻辑,避免 dirtyOffset=-1 时误判为写入中 - 添加统计信息定期持久化功能 - 改进 UTF-8 字符截断处理,防止 CJK 字符乱码 - 优化 Web UI:显示人类可读的文件大小,支持点击外部关闭弹窗 - 重构示例代码,添加 webui 和 webui_integration 示例 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
452 lines
12 KiB
Go
452 lines
12 KiB
Go
package seqlog
|
|
|
|
import (
	"embed"
	"encoding/json"
	"io/fs"
	"net/http"
	"sort"
	"strconv"

	"github.com/google/uuid"
)
|
|
|
|
// uiFS holds the files matched by the ui/* embed pattern; it backs the static
// file server registered by RegisterWebUIRoutes.
//
//go:embed ui/*
var uiFS embed.FS
|
|
|
|
// APIRecord API 响应的记录结构(包含完整数据)
|
|
type APIRecord struct {
|
|
Index int `json:"index"`
|
|
UUID uuid.UUID `json:"uuid"`
|
|
Data string `json:"data"` // 转换为字符串
|
|
Status string `json:"status"` // 状态文本
|
|
}
|
|
|
|
// APIRecordMetadata API 响应的记录元数据结构(不包含完整数据)
|
|
type APIRecordMetadata struct {
|
|
Index int `json:"index"`
|
|
UUID uuid.UUID `json:"uuid"`
|
|
DataSize uint32 `json:"dataSize"` // 数据大小(字节)
|
|
DataPreview string `json:"dataPreview"` // 数据预览
|
|
Status string `json:"status"` // 状态文本
|
|
Full bool `json:"full"` // 是否完整
|
|
}
|
|
|
|
// toAPIRecord 将 RecordWithStatus 转换为 APIRecord
|
|
func toAPIRecord(r *RecordWithStatus) APIRecord {
|
|
statusText := "pending"
|
|
switch r.Status {
|
|
case StatusPending:
|
|
statusText = "pending"
|
|
case StatusProcessing:
|
|
statusText = "processing"
|
|
case StatusProcessed:
|
|
statusText = "processed"
|
|
}
|
|
|
|
return APIRecord{
|
|
Index: r.Index,
|
|
UUID: r.Record.UUID,
|
|
Data: string(r.Record.Data), // 转换为字符串
|
|
Status: statusText,
|
|
}
|
|
}
|
|
|
|
// toAPIRecords 批量转换
|
|
func toAPIRecords(records []*RecordWithStatus) []APIRecord {
|
|
result := make([]APIRecord, len(records))
|
|
for i, r := range records {
|
|
result[i] = toAPIRecord(r)
|
|
}
|
|
return result
|
|
}
|
|
|
|
// toAPIRecordMetadata 将 RecordMetadataWithStatus 转换为 APIRecordMetadata
|
|
func toAPIRecordMetadata(r *RecordMetadataWithStatus) APIRecordMetadata {
|
|
statusText := "pending"
|
|
switch r.Status {
|
|
case StatusPending:
|
|
statusText = "pending"
|
|
case StatusProcessing:
|
|
statusText = "processing"
|
|
case StatusProcessed:
|
|
statusText = "processed"
|
|
}
|
|
|
|
return APIRecordMetadata{
|
|
Index: r.Metadata.Index,
|
|
UUID: r.Metadata.UUID,
|
|
DataSize: r.Metadata.DataSize,
|
|
DataPreview: r.Metadata.DataPreview,
|
|
Status: statusText,
|
|
Full: r.Metadata.Full,
|
|
}
|
|
}
|
|
|
|
// toAPIRecordMetadatas 批量转换
|
|
func toAPIRecordMetadatas(metadata []*RecordMetadataWithStatus) []APIRecordMetadata {
|
|
result := make([]APIRecordMetadata, len(metadata))
|
|
for i, m := range metadata {
|
|
result[i] = toAPIRecordMetadata(m)
|
|
}
|
|
return result
|
|
}
|
|
|
|
// RegisterWebUIRoutes 将 Web UI 路由注册到指定的 ServeMux
|
|
// 这允许你将 Web UI 集成到现有的 HTTP 服务器或其他框架中
|
|
func (hub *LogHub) RegisterWebUIRoutes(mux *http.ServeMux) error {
|
|
// 提供静态文件
|
|
uiContent, err := fs.Sub(uiFS, "ui")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
mux.Handle("/", http.FileServer(http.FS(uiContent)))
|
|
|
|
// API 端点
|
|
mux.HandleFunc("/api/topics", hub.handleTopics)
|
|
mux.HandleFunc("/api/logs", hub.handleLogs)
|
|
mux.HandleFunc("/api/logs/first", hub.handleLogsFirst)
|
|
mux.HandleFunc("/api/logs/last", hub.handleLogsLast)
|
|
mux.HandleFunc("/api/logs/older", hub.handleLogsOlder) // 返回元数据
|
|
mux.HandleFunc("/api/logs/newer", hub.handleLogsNewer) // 返回元数据
|
|
mux.HandleFunc("/api/logs/record", hub.handleLogsByIndex) // 根据索引查询完整数据
|
|
mux.HandleFunc("/api/stats", hub.handleStats)
|
|
|
|
return nil
|
|
}
|
|
|
|
// ServeUI 启动 Web UI 服务器
|
|
// 这会创建一个新的 HTTP 服务器并监听指定地址
|
|
func (hub *LogHub) ServeUI(addr string) error {
|
|
mux := http.NewServeMux()
|
|
if err := hub.RegisterWebUIRoutes(mux); err != nil {
|
|
return err
|
|
}
|
|
return http.ListenAndServe(addr, mux)
|
|
}
|
|
|
|
// TopicInfo describes a topic in API responses.
type TopicInfo struct {
	// Topic is the topic name.
	Topic string `json:"topic"`
	// Title is the display title.
	Title string `json:"title"`
}
|
|
|
|
// handleTopics 返回所有 topic 列表及其标题
|
|
func (hub *LogHub) handleTopics(w http.ResponseWriter, r *http.Request) {
|
|
hub.mu.RLock()
|
|
topics := make([]TopicInfo, 0, len(hub.processors))
|
|
for topic, processor := range hub.processors {
|
|
topics = append(topics, TopicInfo{
|
|
Topic: topic,
|
|
Title: processor.Title(),
|
|
})
|
|
}
|
|
hub.mu.RUnlock()
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(topics)
|
|
}
|
|
|
|
// handleLogs 查询最新的 N 条日志(默认行为,只返回元数据)
|
|
func (hub *LogHub) handleLogs(w http.ResponseWriter, r *http.Request) {
|
|
topic := r.URL.Query().Get("topic")
|
|
if topic == "" {
|
|
http.Error(w, "topic parameter required", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
count := 20
|
|
if countStr := r.URL.Query().Get("count"); countStr != "" {
|
|
if c, err := strconv.Atoi(countStr); err == nil && c > 0 {
|
|
count = c
|
|
}
|
|
}
|
|
|
|
hub.mu.RLock()
|
|
processor, exists := hub.processors[topic]
|
|
hub.mu.RUnlock()
|
|
|
|
if !exists {
|
|
http.Error(w, "topic not found", http.StatusNotFound)
|
|
return
|
|
}
|
|
|
|
// 查询最新的 N 条记录元数据(从最后一条记录开始向前查询)
|
|
metadata, err := processor.QueryFromLastMetadata(count)
|
|
if err != nil {
|
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
// 获取统计信息
|
|
stats := processor.GetStats()
|
|
|
|
// 转换为 API 格式
|
|
apiMetadata := toAPIRecordMetadatas(metadata)
|
|
|
|
response := map[string]interface{}{
|
|
"records": apiMetadata,
|
|
"stats": stats,
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(response)
|
|
}
|
|
|
|
// handleLogsFirst 查询从第一条记录开始的 N 条日志(只返回元数据)
|
|
func (hub *LogHub) handleLogsFirst(w http.ResponseWriter, r *http.Request) {
|
|
topic := r.URL.Query().Get("topic")
|
|
if topic == "" {
|
|
http.Error(w, "topic parameter required", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
count := 20
|
|
if countStr := r.URL.Query().Get("count"); countStr != "" {
|
|
if c, err := strconv.Atoi(countStr); err == nil && c > 0 {
|
|
count = c
|
|
}
|
|
}
|
|
|
|
hub.mu.RLock()
|
|
processor, exists := hub.processors[topic]
|
|
hub.mu.RUnlock()
|
|
|
|
if !exists {
|
|
http.Error(w, "topic not found", http.StatusNotFound)
|
|
return
|
|
}
|
|
|
|
// 从第一条记录开始查询元数据(索引 0 开始向后)
|
|
metadata, err := processor.QueryFromFirstMetadata(count)
|
|
if err != nil {
|
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
// 转换为 API 格式
|
|
apiMetadata := toAPIRecordMetadatas(metadata)
|
|
|
|
response := map[string]interface{}{
|
|
"records": apiMetadata,
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(response)
|
|
}
|
|
|
|
// handleLogsLast 查询最后的 N 条日志(只返回元数据)
|
|
func (hub *LogHub) handleLogsLast(w http.ResponseWriter, r *http.Request) {
|
|
topic := r.URL.Query().Get("topic")
|
|
if topic == "" {
|
|
http.Error(w, "topic parameter required", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
count := 20
|
|
if countStr := r.URL.Query().Get("count"); countStr != "" {
|
|
if c, err := strconv.Atoi(countStr); err == nil && c > 0 {
|
|
count = c
|
|
}
|
|
}
|
|
|
|
hub.mu.RLock()
|
|
processor, exists := hub.processors[topic]
|
|
hub.mu.RUnlock()
|
|
|
|
if !exists {
|
|
http.Error(w, "topic not found", http.StatusNotFound)
|
|
return
|
|
}
|
|
|
|
// 查询最后的 N 条记录元数据(从最后一条向前查询)
|
|
metadata, err := processor.QueryFromLastMetadata(count)
|
|
if err != nil {
|
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
// 转换为 API 格式
|
|
apiMetadata := toAPIRecordMetadatas(metadata)
|
|
|
|
response := map[string]any{
|
|
"records": apiMetadata,
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(response)
|
|
}
|
|
|
|
// handleStats 返回指定 topic 的统计信息
|
|
func (hub *LogHub) handleStats(w http.ResponseWriter, r *http.Request) {
|
|
topic := r.URL.Query().Get("topic")
|
|
if topic == "" {
|
|
http.Error(w, "topic parameter required", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
hub.mu.RLock()
|
|
processor, exists := hub.processors[topic]
|
|
hub.mu.RUnlock()
|
|
|
|
if !exists {
|
|
http.Error(w, "topic not found", http.StatusNotFound)
|
|
return
|
|
}
|
|
|
|
stats := processor.GetStats()
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(stats)
|
|
}
|
|
|
|
// handleLogsOlder 从参考索引向前查询更早的日志(无限滚动支持,只返回元数据)
|
|
func (hub *LogHub) handleLogsOlder(w http.ResponseWriter, r *http.Request) {
|
|
topic := r.URL.Query().Get("topic")
|
|
if topic == "" {
|
|
http.Error(w, "topic parameter required", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
// 获取参考索引
|
|
refIndexStr := r.URL.Query().Get("refIndex")
|
|
if refIndexStr == "" {
|
|
http.Error(w, "refIndex parameter required", http.StatusBadRequest)
|
|
return
|
|
}
|
|
refIndex, err := strconv.Atoi(refIndexStr)
|
|
if err != nil {
|
|
http.Error(w, "invalid refIndex", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
count := 20
|
|
if countStr := r.URL.Query().Get("count"); countStr != "" {
|
|
if c, err := strconv.Atoi(countStr); err == nil && c > 0 {
|
|
count = c
|
|
}
|
|
}
|
|
|
|
hub.mu.RLock()
|
|
processor, exists := hub.processors[topic]
|
|
hub.mu.RUnlock()
|
|
|
|
if !exists {
|
|
http.Error(w, "topic not found", http.StatusNotFound)
|
|
return
|
|
}
|
|
|
|
// 从 refIndex 向前查询更早的记录元数据
|
|
metadata, err := processor.QueryOldestMetadata(refIndex, count)
|
|
if err != nil {
|
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
// 转换为 API 格式
|
|
apiMetadata := toAPIRecordMetadatas(metadata)
|
|
|
|
response := map[string]any{
|
|
"records": apiMetadata,
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(response)
|
|
}
|
|
|
|
// handleLogsNewer 从参考索引向后查询更新的日志(无限滚动支持,只返回元数据)
|
|
func (hub *LogHub) handleLogsNewer(w http.ResponseWriter, r *http.Request) {
|
|
topic := r.URL.Query().Get("topic")
|
|
if topic == "" {
|
|
http.Error(w, "topic parameter required", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
// 获取参考索引
|
|
refIndexStr := r.URL.Query().Get("refIndex")
|
|
if refIndexStr == "" {
|
|
http.Error(w, "refIndex parameter required", http.StatusBadRequest)
|
|
return
|
|
}
|
|
refIndex, err := strconv.Atoi(refIndexStr)
|
|
if err != nil {
|
|
http.Error(w, "invalid refIndex", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
count := 20
|
|
if countStr := r.URL.Query().Get("count"); countStr != "" {
|
|
if c, err := strconv.Atoi(countStr); err == nil && c > 0 {
|
|
count = c
|
|
}
|
|
}
|
|
|
|
hub.mu.RLock()
|
|
processor, exists := hub.processors[topic]
|
|
hub.mu.RUnlock()
|
|
|
|
if !exists {
|
|
http.Error(w, "topic not found", http.StatusNotFound)
|
|
return
|
|
}
|
|
|
|
// 从 refIndex 向后查询更新的记录元数据
|
|
metadata, err := processor.QueryNewestMetadata(refIndex, count)
|
|
if err != nil {
|
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
// 转换为 API 格式
|
|
apiMetadata := toAPIRecordMetadatas(metadata)
|
|
|
|
response := map[string]any{
|
|
"records": apiMetadata,
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(response)
|
|
}
|
|
|
|
// handleLogsByIndex 根据索引查询单条记录的完整数据
|
|
func (hub *LogHub) handleLogsByIndex(w http.ResponseWriter, r *http.Request) {
|
|
topic := r.URL.Query().Get("topic")
|
|
if topic == "" {
|
|
http.Error(w, "topic parameter required", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
// 获取索引
|
|
indexStr := r.URL.Query().Get("index")
|
|
if indexStr == "" {
|
|
http.Error(w, "index parameter required", http.StatusBadRequest)
|
|
return
|
|
}
|
|
index, err := strconv.Atoi(indexStr)
|
|
if err != nil {
|
|
http.Error(w, "invalid index", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
hub.mu.RLock()
|
|
processor, exists := hub.processors[topic]
|
|
hub.mu.RUnlock()
|
|
|
|
if !exists {
|
|
http.Error(w, "topic not found", http.StatusNotFound)
|
|
return
|
|
}
|
|
|
|
// 根据索引查询完整记录
|
|
record, err := processor.QueryByIndex(index)
|
|
if err != nil {
|
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
// 转换为 API 格式
|
|
apiRecord := toAPIRecord(record)
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(apiRecord)
|
|
}
|