package main import ( "encoding/json" "fmt" "log/slog" "math/rand" "net/http" "os" "strconv" "sync" "time" "code.tczkiot.com/seqlog" ) var ( seq *seqlog.Seqlog logger *slog.Logger queryCache = make(map[string]*seqlog.RecordQuery) queryCacheMu sync.RWMutex ) func main() { // 初始化 logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ Level: slog.LevelInfo, })) // 创建 Seqlog seq = seqlog.NewSeqlog("logs", logger, func(topic string, rec *seqlog.Record) error { // 简单的日志处理:只打印摘要信息 dataPreview := string(rec.Data) if len(dataPreview) > 100 { dataPreview = dataPreview[:100] + "..." } logger.Info("处理日志", "topic", topic, "size", len(rec.Data), "preview", dataPreview) return nil }) if err := seq.Start(); err != nil { logger.Error("启动失败", "error", err) os.Exit(1) } defer seq.Stop() logger.Info("Seqlog 已启动") // 启动后台业务模拟 go simulateBusiness() // 启动 Web 服务器 http.HandleFunc("/", handleIndex) http.HandleFunc("/api/topics", handleTopics) http.HandleFunc("/api/stats", handleStats) http.HandleFunc("/api/query", handleQuery) http.HandleFunc("/api/write", handleWrite) addr := ":8080" logger.Info("Web 服务器启动", "地址", "http://localhost"+addr) if err := http.ListenAndServe(addr, nil); err != nil { logger.Error("服务器错误", "error", err) } } // 模拟业务写日志 func simulateBusiness() { topics := []string{"app", "api", "database", "cache"} actions := []string{"查询", "插入", "更新", "删除", "连接", "断开", "备份", "恢复", "同步"} status := []string{"成功", "失败", "超时", "重试"} ticker := time.NewTicker(2 * time.Second) defer ticker.Stop() for range ticker.C { // 随机选择 topic 和内容 topic := topics[rand.Intn(len(topics))] action := actions[rand.Intn(len(actions))] st := status[rand.Intn(len(status))] // 随机生成日志大小:2KB 到 10MB // 80% 概率生成小日志(2KB-100KB) // 15% 概率生成中日志(100KB-1MB) // 5% 概率生成大日志(1MB-10MB) var logSize int prob := rand.Intn(100) if prob < 80 { // 2KB - 100KB logSize = 2*1024 + rand.Intn(98*1024) } else if prob < 95 { // 100KB - 1MB logSize = 100*1024 + rand.Intn(924*1024) } else { // 1MB - 10MB logSize = 1024*1024 + rand.Intn(9*1024*1024) } // 生成日志内容 header := fmt.Sprintf("[%s] %s %s - 用时: %dms | 数据大小: %s | ", time.Now().Format("15:04:05"), action, st, rand.Intn(1000), formatBytes(int64(logSize))) // 填充随机数据到指定大小 data := make([]byte, logSize) copy(data, []byte(header)) // 填充可读的模拟数据 fillOffset := len(header) patterns := []string{ "user_id=%d, session=%x, ip=%d.%d.%d.%d, ", "query_time=%dms, rows=%d, cached=%v, ", "error_code=%d, retry_count=%d, ", "request_id=%x, trace_id=%x, ", } for fillOffset < logSize-100 { pattern := patterns[rand.Intn(len(patterns))] var chunk string switch pattern { case patterns[0]: chunk = fmt.Sprintf(pattern, rand.Intn(10000), rand.Intn(0xFFFFFF), rand.Intn(256), rand.Intn(256), rand.Intn(256), rand.Intn(256)) case patterns[1]: chunk = fmt.Sprintf(pattern, rand.Intn(1000), rand.Intn(10000), rand.Intn(2) == 1) case patterns[2]: chunk = fmt.Sprintf(pattern, rand.Intn(500), rand.Intn(5)) case patterns[3]: chunk = fmt.Sprintf(pattern, rand.Intn(0xFFFFFFFF), rand.Intn(0xFFFFFFFF)) } remaining := logSize - fillOffset if len(chunk) > remaining { chunk = chunk[:remaining] } copy(data[fillOffset:], []byte(chunk)) fillOffset += len(chunk) } // 写入日志 if _, err := seq.Write(topic, data); err != nil { logger.Error("写入日志失败", "error", err, "size", logSize) } else { logger.Info("写入日志", "topic", topic, "size", formatBytes(int64(logSize))) } } } func formatBytes(bytes int64) string { if bytes < 1024 { return fmt.Sprintf("%d B", bytes) } if bytes < 1024*1024 { return fmt.Sprintf("%.1f KB", float64(bytes)/1024) } return fmt.Sprintf("%.2f MB", float64(bytes)/1024/1024) } // 首页 func handleIndex(w http.ResponseWriter, r *http.Request) { html := ` Seqlog 日志查询

Seqlog 日志查询系统

实时查看和管理应用日志
显示范围: 前 条, 后
选择一个 topic 开始查看日志
` w.Header().Set("Content-Type", "text/html; charset=utf-8") fmt.Fprint(w, html) } // API: 获取所有 topics func handleTopics(w http.ResponseWriter, r *http.Request) { topics := seq.GetTopics() json.NewEncoder(w).Encode(topics) } // API: 获取统计信息 func handleStats(w http.ResponseWriter, r *http.Request) { topic := r.URL.Query().Get("topic") if topic == "" { http.Error(w, "缺少 topic 参数", http.StatusBadRequest) return } stats, err := seq.GetTopicStats(topic) if err != nil { http.Error(w, err.Error(), http.StatusNotFound) return } json.NewEncoder(w).Encode(stats) } // API: 查询日志 func handleQuery(w http.ResponseWriter, r *http.Request) { topic := r.URL.Query().Get("topic") if topic == "" { http.Error(w, "缺少 topic 参数", http.StatusBadRequest) return } backward, _ := strconv.Atoi(r.URL.Query().Get("backward")) forward, _ := strconv.Atoi(r.URL.Query().Get("forward")) if backward == 0 { backward = 10 } if forward == 0 { forward = 10 } // 从缓存中获取或创建 query 对象 queryCacheMu.Lock() query, exists := queryCache[topic] if !exists { var err error query, err = seq.NewTopicQuery(topic) if err != nil { queryCacheMu.Unlock() http.Error(w, err.Error(), http.StatusNotFound) return } queryCache[topic] = query } queryCacheMu.Unlock() // 获取当前处理索引和读取索引 startIdx := seq.GetProcessingIndex(topic) endIdx := seq.GetReadIndex(topic) // 获取索引用于转换 processor, err := seq.GetProcessor(topic) if err != nil { http.Error(w, err.Error(), http.StatusNotFound) return } index := processor.Index() // 合并查询结果:向后 + 当前 + 向前 var results []*seqlog.RecordWithStatus // 向后查询 if backward > 0 && startIdx > 0 { startPos, err := index.GetOffset(startIdx) if err == nil { backResults, err := query.QueryAt(startPos, -1, backward, startIdx, endIdx) if err == nil { results = append(results, backResults...) } } } // 当前位置 if startIdx < endIdx { startPos, err := index.GetOffset(startIdx) if err == nil { currentResults, err := query.QueryAt(startPos, 0, 1, startIdx, endIdx) if err == nil { results = append(results, currentResults...) } } } // 向前查询 if forward > 0 { startPos, err := index.GetOffset(startIdx) if err == nil { forwardResults, err := query.QueryAt(startPos, 1, forward, startIdx, endIdx) if err == nil { results = append(results, forwardResults...) } } } type Record struct { Status string `json:"status"` Data string `json:"data"` } records := make([]Record, len(results)) for i, r := range results { records[i] = Record{ Status: r.Status.String(), Data: string(r.Record.Data), } } json.NewEncoder(w).Encode(map[string]interface{}{ "records": records, "total": len(records), }) } // API: 手动写入日志 func handleWrite(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { http.Error(w, "只支持 POST", http.StatusMethodNotAllowed) return } var req struct { Topic string `json:"topic"` Data string `json:"data"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } offset, err := seq.Write(req.Topic, []byte(req.Data)) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } json.NewEncoder(w).Encode(map[string]interface{}{ "success": true, "offset": offset, }) }