From 3d82a6845ed20f6e8011cd02487840af9ee94a5a Mon Sep 17 00:00:00 2001 From: bourdon Date: Sat, 4 Oct 2025 01:27:41 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84=EF=BC=9A=E5=B0=86=E7=A4=BA?= =?UTF-8?q?=E4=BE=8B=E6=96=87=E4=BB=B6=E7=BB=84=E7=BB=87=E5=88=B0=E5=AD=90?= =?UTF-8?q?=E7=9B=AE=E5=BD=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 修复 Go 规范问题:一个目录不能有多个 package main 文件结构调整: - example/concurrent_example.go → example/concurrent/main.go - example/index_example.go → example/index/main.go - example/topic_processor_example.go → example/topic_processor/main.go 修复 API 适配: - index/main.go: 更新为新的查询 API(移除 startIdx/endIdx 参数) - webapp/main.go: 使用 processor.Query 方法替代 RecordQuery - 移除 queryCache,直接使用 processor - 更新查询调用,移除状态参数 文档更新: - example/README.md: 更新所有示例的运行路径 - example/RUN_CONCURRENT.md: 更新运行命令 所有示例编译测试通过 ✅ --- example/README.md | 15 +++++---- example/RUN_CONCURRENT.md | 9 +++--- .../main.go} | 0 example/{index_example.go => index/main.go} | 18 ++++++----- .../main.go} | 0 example/webapp/main.go | 31 ++++++------------- 6 files changed, 34 insertions(+), 39 deletions(-) rename example/{concurrent_example.go => concurrent/main.go} (100%) rename example/{index_example.go => index/main.go} (87%) rename example/{topic_processor_example.go => topic_processor/main.go} (100%) diff --git a/example/README.md b/example/README.md index f63d114..b32ffac 100644 --- a/example/README.md +++ b/example/README.md @@ -4,7 +4,7 @@ ## 示例列表 -### 1. concurrent_example.go - 高并发示例 +### 1. concurrent/ - 高并发示例 展示 seqlog 在高并发场景下的性能表现。 @@ -19,7 +19,8 @@ **运行方式:** ```bash -go run concurrent_example.go +cd concurrent +go run main.go ``` **注意:** 这个示例需要运行约 5 分钟,请耐心等待。 @@ -97,7 +98,7 @@ go run concurrent_example.go - 多个 goroutine 可以并发查询同一或不同的 topic - 查询时可能遇到少量 EOF 错误(因为 tailer 正在处理文件) -### 2. topic_processor_example.go - TopicProcessor 基础示例 +### 2. topic_processor/ - TopicProcessor 基础示例 展示如何使用 TopicProcessor 作为日志聚合器。 @@ -109,16 +110,18 @@ go run concurrent_example.go **运行方式:** ```bash -go run topic_processor_example.go +cd topic_processor +go run main.go ``` -### 3. index_example.go - 索引功能示例 +### 3. index/ - 索引功能示例 展示索引文件的使用和管理。 **运行方式:** ```bash -go run index_example.go +cd index +go run main.go ``` ### 4. webapp/ - Web 应用示例 diff --git a/example/RUN_CONCURRENT.md b/example/RUN_CONCURRENT.md index a732e7e..61209c1 100644 --- a/example/RUN_CONCURRENT.md +++ b/example/RUN_CONCURRENT.md @@ -3,8 +3,8 @@ ## 快速开始 ```bash -cd example -go run concurrent_example.go +cd example/concurrent +go run main.go ``` ## 预计运行时间 @@ -22,7 +22,8 @@ go run concurrent_example.go 如果想在后台运行并保存日志: ```bash -go run concurrent_example.go > output.log 2>&1 & +cd example/concurrent +go run main.go > output.log 2>&1 & echo $! > pid.txt # 查看实时输出 @@ -74,7 +75,7 @@ rm pid.txt ## 自定义测试 -如果想调整测试参数,编辑 `concurrent_example.go`: +如果想调整测试参数,编辑 `concurrent/main.go`: ```go // 场景 1:每个 topic 写入的消息数 diff --git a/example/concurrent_example.go b/example/concurrent/main.go similarity index 100% rename from example/concurrent_example.go rename to example/concurrent/main.go diff --git a/example/index_example.go b/example/index/main.go similarity index 87% rename from example/index_example.go rename to example/index/main.go index 197547f..a6a0050 100644 --- a/example/index_example.go +++ b/example/index/main.go @@ -27,12 +27,14 @@ func main() { } // 写入日志时,索引会自动更新 + var lastOffset int64 for i := 1; i <= 10; i++ { data := fmt.Sprintf("日志记录 #%d", i) offset, err := writer.Append([]byte(data)) if err != nil { log.Fatal(err) } + lastOffset = offset fmt.Printf("写入: offset=%d, data=%s\n", offset, data) } @@ -68,23 +70,23 @@ func main() { fmt.Printf("从第 %d 条记录开始查询\n", startIndex) // 向后查询(查询更早的记录) - backward, err := query.QueryNewest(startIndex-1, 3, 0, startIndex) + backward, err := query.QueryNewest(startIndex-1, 3) if err != nil { log.Fatal(err) } fmt.Printf("向后查询 3 条记录:\n") - for i, rws := range backward { - fmt.Printf(" [%d] 状态=%s, 数据=%s\n", i, rws.Status, string(rws.Record.Data)) + for i, rec := range backward { + fmt.Printf(" [%d] 数据=%s\n", i, string(rec.Data)) } // 向前查询(查询更新的记录) - forward, err := query.QueryOldest(startIndex, 3, 0, startIndex) + forward, err := query.QueryOldest(startIndex, 3) if err != nil { log.Fatal(err) } fmt.Printf("向前查询 3 条记录:\n") - for i, rws := range forward { - fmt.Printf(" [%d] 状态=%s, 数据=%s\n", i, rws.Status, string(rws.Record.Data)) + for i, rec := range forward { + fmt.Printf(" [%d] 数据=%s\n", i, string(rec.Data)) } fmt.Println() @@ -103,8 +105,8 @@ func main() { fmt.Printf("最后一条记录偏移: %d\n", index3.LastOffset()) // 二分查找:根据偏移量查找索引位置 - idx := index3.FindIndex(offset) - fmt.Printf("偏移量 %d 对应的索引位置: %d\n\n", offset, idx) + idx := index3.FindIndex(lastOffset) + fmt.Printf("偏移量 %d 对应的索引位置: %d\n\n", lastOffset, idx) index3.Close() // ===== 示例 4:追加写入(索引自动更新)===== diff --git a/example/topic_processor_example.go b/example/topic_processor/main.go similarity index 100% rename from example/topic_processor_example.go rename to example/topic_processor/main.go diff --git a/example/webapp/main.go b/example/webapp/main.go index 3f5fcd2..d9825ae 100644 --- a/example/webapp/main.go +++ b/example/webapp/main.go @@ -8,17 +8,14 @@ import ( "net/http" "os" "strconv" - "sync" "time" "code.tczkiot.com/seqlog" ) var ( - seq *seqlog.Seqlog - logger *slog.Logger - queryCache = make(map[string]*seqlog.RecordQuery) - queryCacheMu sync.RWMutex + seq *seqlog.Seqlog + logger *slog.Logger ) func main() { @@ -522,20 +519,12 @@ func handleQuery(w http.ResponseWriter, r *http.Request) { forward = 10 } - // 从缓存中获取或创建 query 对象 - queryCacheMu.Lock() - query, exists := queryCache[topic] - if !exists { - var err error - query, err = seq.NewTopicQuery(topic) - if err != nil { - queryCacheMu.Unlock() - http.Error(w, err.Error(), http.StatusNotFound) - return - } - queryCache[topic] = query + // 获取 processor + processor, err := seq.GetProcessor(topic) + if err != nil { + http.Error(w, err.Error(), http.StatusNotFound) + return } - queryCacheMu.Unlock() // 获取当前处理索引和读取索引 startIdx := seq.GetProcessingIndex(topic) @@ -546,7 +535,7 @@ func handleQuery(w http.ResponseWriter, r *http.Request) { // 向后查询 if backward > 0 && startIdx > 0 { - backResults, err := query.QueryNewest(startIdx-1, backward, startIdx, endIdx) + backResults, err := processor.QueryNewest(startIdx-1, backward) if err == nil { results = append(results, backResults...) } @@ -554,7 +543,7 @@ func handleQuery(w http.ResponseWriter, r *http.Request) { // 当前位置 if startIdx < endIdx { - currentResults, err := query.QueryOldest(startIdx, 1, startIdx, endIdx) + currentResults, err := processor.QueryOldest(startIdx, 1) if err == nil { results = append(results, currentResults...) } @@ -562,7 +551,7 @@ func handleQuery(w http.ResponseWriter, r *http.Request) { // 向前查询 if forward > 0 { - forwardResults, err := query.QueryOldest(endIdx, forward, startIdx, endIdx) + forwardResults, err := processor.QueryOldest(endIdx, forward) if err == nil { results = append(results, forwardResults...) }