package seqlog import ( "embed" "encoding/json" "io/fs" "net/http" "strconv" "github.com/google/uuid" ) //go:embed ui/* var uiFS embed.FS // APIRecord API 响应的记录结构(包含完整数据) type APIRecord struct { Index int `json:"index"` UUID uuid.UUID `json:"uuid"` Data string `json:"data"` // 转换为字符串 Status string `json:"status"` // 状态文本 } // APIRecordMetadata API 响应的记录元数据结构(不包含完整数据) type APIRecordMetadata struct { Index int `json:"index"` UUID uuid.UUID `json:"uuid"` DataSize uint32 `json:"dataSize"` // 数据大小(字节) DataPreview string `json:"dataPreview"` // 数据预览 Status string `json:"status"` // 状态文本 Full bool `json:"full"` // 是否完整 } // toAPIRecord 将 RecordWithStatus 转换为 APIRecord func toAPIRecord(r *RecordWithStatus) APIRecord { statusText := "pending" switch r.Status { case StatusPending: statusText = "pending" case StatusProcessing: statusText = "processing" case StatusProcessed: statusText = "processed" } return APIRecord{ Index: r.Index, UUID: r.Record.UUID, Data: string(r.Record.Data), // 转换为字符串 Status: statusText, } } // toAPIRecords 批量转换 func toAPIRecords(records []*RecordWithStatus) []APIRecord { result := make([]APIRecord, len(records)) for i, r := range records { result[i] = toAPIRecord(r) } return result } // toAPIRecordMetadata 将 RecordMetadataWithStatus 转换为 APIRecordMetadata func toAPIRecordMetadata(r *RecordMetadataWithStatus) APIRecordMetadata { statusText := "pending" switch r.Status { case StatusPending: statusText = "pending" case StatusProcessing: statusText = "processing" case StatusProcessed: statusText = "processed" } return APIRecordMetadata{ Index: r.Metadata.Index, UUID: r.Metadata.UUID, DataSize: r.Metadata.DataSize, DataPreview: r.Metadata.DataPreview, Status: statusText, Full: r.Metadata.Full, } } // toAPIRecordMetadatas 批量转换 func toAPIRecordMetadatas(metadata []*RecordMetadataWithStatus) []APIRecordMetadata { result := make([]APIRecordMetadata, len(metadata)) for i, m := range metadata { result[i] = toAPIRecordMetadata(m) } return result } // RegisterWebUIRoutes 将 Web UI 路由注册到指定的 ServeMux // 这允许你将 Web UI 集成到现有的 HTTP 服务器或其他框架中 func (hub *LogHub) RegisterWebUIRoutes(mux *http.ServeMux) error { // 提供静态文件 uiContent, err := fs.Sub(uiFS, "ui") if err != nil { return err } mux.Handle("/", http.FileServer(http.FS(uiContent))) // API 端点 mux.HandleFunc("/api/topics", hub.handleTopics) mux.HandleFunc("/api/logs", hub.handleLogs) mux.HandleFunc("/api/logs/first", hub.handleLogsFirst) mux.HandleFunc("/api/logs/last", hub.handleLogsLast) mux.HandleFunc("/api/logs/older", hub.handleLogsOlder) // 返回元数据 mux.HandleFunc("/api/logs/newer", hub.handleLogsNewer) // 返回元数据 mux.HandleFunc("/api/logs/record", hub.handleLogsByIndex) // 根据索引查询完整数据 mux.HandleFunc("/api/stats", hub.handleStats) return nil } // ServeUI 启动 Web UI 服务器 // 这会创建一个新的 HTTP 服务器并监听指定地址 func (hub *LogHub) ServeUI(addr string) error { mux := http.NewServeMux() if err := hub.RegisterWebUIRoutes(mux); err != nil { return err } return http.ListenAndServe(addr, mux) } // TopicInfo topic 信息 type TopicInfo struct { Topic string `json:"topic"` // topic 名称 Title string `json:"title"` // 显示标题 } // handleTopics 返回所有 topic 列表及其标题 func (hub *LogHub) handleTopics(w http.ResponseWriter, r *http.Request) { hub.mu.RLock() topics := make([]TopicInfo, 0, len(hub.processors)) for topic, processor := range hub.processors { topics = append(topics, TopicInfo{ Topic: topic, Title: processor.Title(), }) } hub.mu.RUnlock() w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(topics) } // handleLogs 查询最新的 N 条日志(默认行为,只返回元数据) func (hub *LogHub) handleLogs(w http.ResponseWriter, r *http.Request) { topic := r.URL.Query().Get("topic") if topic == "" { http.Error(w, "topic parameter required", http.StatusBadRequest) return } count := 20 if countStr := r.URL.Query().Get("count"); countStr != "" { if c, err := strconv.Atoi(countStr); err == nil && c > 0 { count = c } } hub.mu.RLock() processor, exists := hub.processors[topic] hub.mu.RUnlock() if !exists { http.Error(w, "topic not found", http.StatusNotFound) return } // 查询最新的 N 条记录元数据(从最后一条记录开始向前查询) metadata, err := processor.QueryFromLastMetadata(count) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } // 获取统计信息 stats := processor.GetStats() // 转换为 API 格式 apiMetadata := toAPIRecordMetadatas(metadata) response := map[string]interface{}{ "records": apiMetadata, "stats": stats, } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(response) } // handleLogsFirst 查询从第一条记录开始的 N 条日志(只返回元数据) func (hub *LogHub) handleLogsFirst(w http.ResponseWriter, r *http.Request) { topic := r.URL.Query().Get("topic") if topic == "" { http.Error(w, "topic parameter required", http.StatusBadRequest) return } count := 20 if countStr := r.URL.Query().Get("count"); countStr != "" { if c, err := strconv.Atoi(countStr); err == nil && c > 0 { count = c } } hub.mu.RLock() processor, exists := hub.processors[topic] hub.mu.RUnlock() if !exists { http.Error(w, "topic not found", http.StatusNotFound) return } // 从第一条记录开始查询元数据(索引 0 开始向后) metadata, err := processor.QueryFromFirstMetadata(count) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } // 转换为 API 格式 apiMetadata := toAPIRecordMetadatas(metadata) response := map[string]interface{}{ "records": apiMetadata, } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(response) } // handleLogsLast 查询最后的 N 条日志(只返回元数据) func (hub *LogHub) handleLogsLast(w http.ResponseWriter, r *http.Request) { topic := r.URL.Query().Get("topic") if topic == "" { http.Error(w, "topic parameter required", http.StatusBadRequest) return } count := 20 if countStr := r.URL.Query().Get("count"); countStr != "" { if c, err := strconv.Atoi(countStr); err == nil && c > 0 { count = c } } hub.mu.RLock() processor, exists := hub.processors[topic] hub.mu.RUnlock() if !exists { http.Error(w, "topic not found", http.StatusNotFound) return } // 查询最后的 N 条记录元数据(从最后一条向前查询) metadata, err := processor.QueryFromLastMetadata(count) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } // 转换为 API 格式 apiMetadata := toAPIRecordMetadatas(metadata) response := map[string]any{ "records": apiMetadata, } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(response) } // handleStats 返回指定 topic 的统计信息 func (hub *LogHub) handleStats(w http.ResponseWriter, r *http.Request) { topic := r.URL.Query().Get("topic") if topic == "" { http.Error(w, "topic parameter required", http.StatusBadRequest) return } hub.mu.RLock() processor, exists := hub.processors[topic] hub.mu.RUnlock() if !exists { http.Error(w, "topic not found", http.StatusNotFound) return } stats := processor.GetStats() w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(stats) } // handleLogsOlder 从参考索引向前查询更早的日志(无限滚动支持,只返回元数据) func (hub *LogHub) handleLogsOlder(w http.ResponseWriter, r *http.Request) { topic := r.URL.Query().Get("topic") if topic == "" { http.Error(w, "topic parameter required", http.StatusBadRequest) return } // 获取参考索引 refIndexStr := r.URL.Query().Get("refIndex") if refIndexStr == "" { http.Error(w, "refIndex parameter required", http.StatusBadRequest) return } refIndex, err := strconv.Atoi(refIndexStr) if err != nil { http.Error(w, "invalid refIndex", http.StatusBadRequest) return } count := 20 if countStr := r.URL.Query().Get("count"); countStr != "" { if c, err := strconv.Atoi(countStr); err == nil && c > 0 { count = c } } hub.mu.RLock() processor, exists := hub.processors[topic] hub.mu.RUnlock() if !exists { http.Error(w, "topic not found", http.StatusNotFound) return } // 从 refIndex 向前查询更早的记录元数据 metadata, err := processor.QueryOldestMetadata(refIndex, count) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } // 转换为 API 格式 apiMetadata := toAPIRecordMetadatas(metadata) response := map[string]any{ "records": apiMetadata, } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(response) } // handleLogsNewer 从参考索引向后查询更新的日志(无限滚动支持,只返回元数据) func (hub *LogHub) handleLogsNewer(w http.ResponseWriter, r *http.Request) { topic := r.URL.Query().Get("topic") if topic == "" { http.Error(w, "topic parameter required", http.StatusBadRequest) return } // 获取参考索引 refIndexStr := r.URL.Query().Get("refIndex") if refIndexStr == "" { http.Error(w, "refIndex parameter required", http.StatusBadRequest) return } refIndex, err := strconv.Atoi(refIndexStr) if err != nil { http.Error(w, "invalid refIndex", http.StatusBadRequest) return } count := 20 if countStr := r.URL.Query().Get("count"); countStr != "" { if c, err := strconv.Atoi(countStr); err == nil && c > 0 { count = c } } hub.mu.RLock() processor, exists := hub.processors[topic] hub.mu.RUnlock() if !exists { http.Error(w, "topic not found", http.StatusNotFound) return } // 从 refIndex 向后查询更新的记录元数据 metadata, err := processor.QueryNewestMetadata(refIndex, count) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } // 转换为 API 格式 apiMetadata := toAPIRecordMetadatas(metadata) response := map[string]any{ "records": apiMetadata, } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(response) } // handleLogsByIndex 根据索引查询单条记录的完整数据 func (hub *LogHub) handleLogsByIndex(w http.ResponseWriter, r *http.Request) { topic := r.URL.Query().Get("topic") if topic == "" { http.Error(w, "topic parameter required", http.StatusBadRequest) return } // 获取索引 indexStr := r.URL.Query().Get("index") if indexStr == "" { http.Error(w, "index parameter required", http.StatusBadRequest) return } index, err := strconv.Atoi(indexStr) if err != nil { http.Error(w, "invalid index", http.StatusBadRequest) return } hub.mu.RLock() processor, exists := hub.processors[topic] hub.mu.RUnlock() if !exists { http.Error(w, "topic not found", http.StatusNotFound) return } // 根据索引查询完整记录 record, err := processor.QueryByIndex(index) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } // 转换为 API 格式 apiRecord := toAPIRecord(record) w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(apiRecord) }