Files
seqlog/webui.go
bourdon 810664eb12 重构:优化记录格式并修复核心功能
- 修改记录存储格式为 [4B len][8B offset][4B CRC][16B UUID][data]
- 修复 TopicProcessor 中 WaitGroup 使用错误导致 handler 不执行的问题
- 修复写入保护逻辑,避免 dirtyOffset=-1 时误判为写入中
- 添加统计信息定期持久化功能
- 改进 UTF-8 字符截断处理,防止 CJK 字符乱码
- 优化 Web UI:显示人类可读的文件大小,支持点击外部关闭弹窗
- 重构示例代码,添加 webui 和 webui_integration 示例

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-04 17:54:49 +08:00

452 lines
12 KiB
Go

package seqlog
import (
"embed"
"encoding/json"
"io/fs"
"net/http"
"strconv"
"github.com/google/uuid"
)
//go:embed ui/*
var uiFS embed.FS
// APIRecord API 响应的记录结构(包含完整数据)
type APIRecord struct {
Index int `json:"index"`
UUID uuid.UUID `json:"uuid"`
Data string `json:"data"` // 转换为字符串
Status string `json:"status"` // 状态文本
}
// APIRecordMetadata API 响应的记录元数据结构(不包含完整数据)
type APIRecordMetadata struct {
Index int `json:"index"`
UUID uuid.UUID `json:"uuid"`
DataSize uint32 `json:"dataSize"` // 数据大小(字节)
DataPreview string `json:"dataPreview"` // 数据预览
Status string `json:"status"` // 状态文本
Full bool `json:"full"` // 是否完整
}
// toAPIRecord 将 RecordWithStatus 转换为 APIRecord
func toAPIRecord(r *RecordWithStatus) APIRecord {
statusText := "pending"
switch r.Status {
case StatusPending:
statusText = "pending"
case StatusProcessing:
statusText = "processing"
case StatusProcessed:
statusText = "processed"
}
return APIRecord{
Index: r.Index,
UUID: r.Record.UUID,
Data: string(r.Record.Data), // 转换为字符串
Status: statusText,
}
}
// toAPIRecords 批量转换
func toAPIRecords(records []*RecordWithStatus) []APIRecord {
result := make([]APIRecord, len(records))
for i, r := range records {
result[i] = toAPIRecord(r)
}
return result
}
// toAPIRecordMetadata 将 RecordMetadataWithStatus 转换为 APIRecordMetadata
func toAPIRecordMetadata(r *RecordMetadataWithStatus) APIRecordMetadata {
statusText := "pending"
switch r.Status {
case StatusPending:
statusText = "pending"
case StatusProcessing:
statusText = "processing"
case StatusProcessed:
statusText = "processed"
}
return APIRecordMetadata{
Index: r.Metadata.Index,
UUID: r.Metadata.UUID,
DataSize: r.Metadata.DataSize,
DataPreview: r.Metadata.DataPreview,
Status: statusText,
Full: r.Metadata.Full,
}
}
// toAPIRecordMetadatas 批量转换
func toAPIRecordMetadatas(metadata []*RecordMetadataWithStatus) []APIRecordMetadata {
result := make([]APIRecordMetadata, len(metadata))
for i, m := range metadata {
result[i] = toAPIRecordMetadata(m)
}
return result
}
// RegisterWebUIRoutes 将 Web UI 路由注册到指定的 ServeMux
// 这允许你将 Web UI 集成到现有的 HTTP 服务器或其他框架中
func (hub *LogHub) RegisterWebUIRoutes(mux *http.ServeMux) error {
// 提供静态文件
uiContent, err := fs.Sub(uiFS, "ui")
if err != nil {
return err
}
mux.Handle("/", http.FileServer(http.FS(uiContent)))
// API 端点
mux.HandleFunc("/api/topics", hub.handleTopics)
mux.HandleFunc("/api/logs", hub.handleLogs)
mux.HandleFunc("/api/logs/first", hub.handleLogsFirst)
mux.HandleFunc("/api/logs/last", hub.handleLogsLast)
mux.HandleFunc("/api/logs/older", hub.handleLogsOlder) // 返回元数据
mux.HandleFunc("/api/logs/newer", hub.handleLogsNewer) // 返回元数据
mux.HandleFunc("/api/logs/record", hub.handleLogsByIndex) // 根据索引查询完整数据
mux.HandleFunc("/api/stats", hub.handleStats)
return nil
}
// ServeUI 启动 Web UI 服务器
// 这会创建一个新的 HTTP 服务器并监听指定地址
func (hub *LogHub) ServeUI(addr string) error {
mux := http.NewServeMux()
if err := hub.RegisterWebUIRoutes(mux); err != nil {
return err
}
return http.ListenAndServe(addr, mux)
}
// TopicInfo topic 信息
type TopicInfo struct {
Topic string `json:"topic"` // topic 名称
Title string `json:"title"` // 显示标题
}
// handleTopics 返回所有 topic 列表及其标题
func (hub *LogHub) handleTopics(w http.ResponseWriter, r *http.Request) {
hub.mu.RLock()
topics := make([]TopicInfo, 0, len(hub.processors))
for topic, processor := range hub.processors {
topics = append(topics, TopicInfo{
Topic: topic,
Title: processor.Title(),
})
}
hub.mu.RUnlock()
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(topics)
}
// handleLogs 查询最新的 N 条日志(默认行为,只返回元数据)
func (hub *LogHub) handleLogs(w http.ResponseWriter, r *http.Request) {
topic := r.URL.Query().Get("topic")
if topic == "" {
http.Error(w, "topic parameter required", http.StatusBadRequest)
return
}
count := 20
if countStr := r.URL.Query().Get("count"); countStr != "" {
if c, err := strconv.Atoi(countStr); err == nil && c > 0 {
count = c
}
}
hub.mu.RLock()
processor, exists := hub.processors[topic]
hub.mu.RUnlock()
if !exists {
http.Error(w, "topic not found", http.StatusNotFound)
return
}
// 查询最新的 N 条记录元数据(从最后一条记录开始向前查询)
metadata, err := processor.QueryFromLastMetadata(count)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// 获取统计信息
stats := processor.GetStats()
// 转换为 API 格式
apiMetadata := toAPIRecordMetadatas(metadata)
response := map[string]interface{}{
"records": apiMetadata,
"stats": stats,
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}
// handleLogsFirst 查询从第一条记录开始的 N 条日志(只返回元数据)
func (hub *LogHub) handleLogsFirst(w http.ResponseWriter, r *http.Request) {
topic := r.URL.Query().Get("topic")
if topic == "" {
http.Error(w, "topic parameter required", http.StatusBadRequest)
return
}
count := 20
if countStr := r.URL.Query().Get("count"); countStr != "" {
if c, err := strconv.Atoi(countStr); err == nil && c > 0 {
count = c
}
}
hub.mu.RLock()
processor, exists := hub.processors[topic]
hub.mu.RUnlock()
if !exists {
http.Error(w, "topic not found", http.StatusNotFound)
return
}
// 从第一条记录开始查询元数据(索引 0 开始向后)
metadata, err := processor.QueryFromFirstMetadata(count)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// 转换为 API 格式
apiMetadata := toAPIRecordMetadatas(metadata)
response := map[string]interface{}{
"records": apiMetadata,
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}
// handleLogsLast 查询最后的 N 条日志(只返回元数据)
func (hub *LogHub) handleLogsLast(w http.ResponseWriter, r *http.Request) {
topic := r.URL.Query().Get("topic")
if topic == "" {
http.Error(w, "topic parameter required", http.StatusBadRequest)
return
}
count := 20
if countStr := r.URL.Query().Get("count"); countStr != "" {
if c, err := strconv.Atoi(countStr); err == nil && c > 0 {
count = c
}
}
hub.mu.RLock()
processor, exists := hub.processors[topic]
hub.mu.RUnlock()
if !exists {
http.Error(w, "topic not found", http.StatusNotFound)
return
}
// 查询最后的 N 条记录元数据(从最后一条向前查询)
metadata, err := processor.QueryFromLastMetadata(count)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// 转换为 API 格式
apiMetadata := toAPIRecordMetadatas(metadata)
response := map[string]any{
"records": apiMetadata,
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}
// handleStats 返回指定 topic 的统计信息
func (hub *LogHub) handleStats(w http.ResponseWriter, r *http.Request) {
topic := r.URL.Query().Get("topic")
if topic == "" {
http.Error(w, "topic parameter required", http.StatusBadRequest)
return
}
hub.mu.RLock()
processor, exists := hub.processors[topic]
hub.mu.RUnlock()
if !exists {
http.Error(w, "topic not found", http.StatusNotFound)
return
}
stats := processor.GetStats()
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(stats)
}
// handleLogsOlder 从参考索引向前查询更早的日志(无限滚动支持,只返回元数据)
func (hub *LogHub) handleLogsOlder(w http.ResponseWriter, r *http.Request) {
topic := r.URL.Query().Get("topic")
if topic == "" {
http.Error(w, "topic parameter required", http.StatusBadRequest)
return
}
// 获取参考索引
refIndexStr := r.URL.Query().Get("refIndex")
if refIndexStr == "" {
http.Error(w, "refIndex parameter required", http.StatusBadRequest)
return
}
refIndex, err := strconv.Atoi(refIndexStr)
if err != nil {
http.Error(w, "invalid refIndex", http.StatusBadRequest)
return
}
count := 20
if countStr := r.URL.Query().Get("count"); countStr != "" {
if c, err := strconv.Atoi(countStr); err == nil && c > 0 {
count = c
}
}
hub.mu.RLock()
processor, exists := hub.processors[topic]
hub.mu.RUnlock()
if !exists {
http.Error(w, "topic not found", http.StatusNotFound)
return
}
// 从 refIndex 向前查询更早的记录元数据
metadata, err := processor.QueryOldestMetadata(refIndex, count)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// 转换为 API 格式
apiMetadata := toAPIRecordMetadatas(metadata)
response := map[string]any{
"records": apiMetadata,
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}
// handleLogsNewer 从参考索引向后查询更新的日志(无限滚动支持,只返回元数据)
func (hub *LogHub) handleLogsNewer(w http.ResponseWriter, r *http.Request) {
topic := r.URL.Query().Get("topic")
if topic == "" {
http.Error(w, "topic parameter required", http.StatusBadRequest)
return
}
// 获取参考索引
refIndexStr := r.URL.Query().Get("refIndex")
if refIndexStr == "" {
http.Error(w, "refIndex parameter required", http.StatusBadRequest)
return
}
refIndex, err := strconv.Atoi(refIndexStr)
if err != nil {
http.Error(w, "invalid refIndex", http.StatusBadRequest)
return
}
count := 20
if countStr := r.URL.Query().Get("count"); countStr != "" {
if c, err := strconv.Atoi(countStr); err == nil && c > 0 {
count = c
}
}
hub.mu.RLock()
processor, exists := hub.processors[topic]
hub.mu.RUnlock()
if !exists {
http.Error(w, "topic not found", http.StatusNotFound)
return
}
// 从 refIndex 向后查询更新的记录元数据
metadata, err := processor.QueryNewestMetadata(refIndex, count)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// 转换为 API 格式
apiMetadata := toAPIRecordMetadatas(metadata)
response := map[string]any{
"records": apiMetadata,
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}
// handleLogsByIndex 根据索引查询单条记录的完整数据
func (hub *LogHub) handleLogsByIndex(w http.ResponseWriter, r *http.Request) {
topic := r.URL.Query().Get("topic")
if topic == "" {
http.Error(w, "topic parameter required", http.StatusBadRequest)
return
}
// 获取索引
indexStr := r.URL.Query().Get("index")
if indexStr == "" {
http.Error(w, "index parameter required", http.StatusBadRequest)
return
}
index, err := strconv.Atoi(indexStr)
if err != nil {
http.Error(w, "invalid index", http.StatusBadRequest)
return
}
hub.mu.RLock()
processor, exists := hub.processors[topic]
hub.mu.RUnlock()
if !exists {
http.Error(w, "topic not found", http.StatusNotFound)
return
}
// 根据索引查询完整记录
record, err := processor.QueryByIndex(index)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// 转换为 API 格式
apiRecord := toAPIRecord(record)
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(apiRecord)
}