diff --git a/.gitignore b/.gitignore index b1916c0..4827158 100644 --- a/.gitignore +++ b/.gitignore @@ -22,7 +22,7 @@ test_* # 示例程序编译产物 example/webapp/webapp example/webapp/logs/ -example/ +example/test_*/ examples/ # Go 编译产物 diff --git a/example/README.md b/example/README.md new file mode 100644 index 0000000..f63d114 --- /dev/null +++ b/example/README.md @@ -0,0 +1,213 @@ +# Seqlog 示例 + +本目录包含 seqlog 的使用示例。 + +## 示例列表 + +### 1. concurrent_example.go - 高并发示例 + +展示 seqlog 在高并发场景下的性能表现。 + +**运行时间:约 5 分钟** + +**场景覆盖:** +- **场景 1**: 并发写入不同 topic(3 个 topic,每个 2000 条消息,共 6000 条) +- **场景 2**: 并发查询(20 个 goroutine,每个执行 200 次查询,共 4000 次) +- **场景 3**: 混合读写(3 个写入 goroutine + 10 个查询 goroutine 同时运行) +- **场景 4**: 持续压测(运行 4 分钟,持续写入和查询,实时显示进度) +- **场景 5**: 统计信息汇总(显示所有 topic 的详细统计) + +**运行方式:** +```bash +go run concurrent_example.go +``` + +**注意:** 这个示例需要运行约 5 分钟,请耐心等待。 + +**预期输出:** +``` +=== Seqlog 高并发示例 === +预计运行时间: 约 5 分钟 + +场景 1: 并发写入测试(每个 goroutine 写入不同 topic) + - 3 个 topic,每个 topic 一个专用写入 goroutine + - 每个 goroutine 写入 2000 条消息 + +写入完成: + 总消息数: 6000 + 错误数: 0 + 耗时: 27s + 吞吐量: 222 msg/s + +场景 2: 并发查询测试 + - 20 个 goroutine 并发查询 + - 每个 goroutine 执行 200 次查询操作 + +查询完成: + 总查询数: 3900 + 错误数: 100 + 耗时: 10s + 吞吐量: 390 query/s + +场景 3: 混合读写测试 + - 3 个写入 goroutine(每个 topic 一个),每个写入 1000 条消息 + - 10 个查询 goroutine,每个执行 200 次查询 + - 同时进行 + +混合操作完成: + 写入: 3000 条消息 + 查询: 1900 次 + 耗时: 14s + +场景 4: 持续压测(运行 4 分钟) + - 3 个写入 goroutine 持续写入 + - 5 个查询 goroutine 持续查询 + - 实时显示进度 + + [进度] 已运行 10 秒 - 写入: 2400 条, 查询: 2000 次 + [进度] 已运行 20 秒 - 写入: 4800 条, 查询: 4000 次 + [进度] 已运行 30 秒 - 写入: 7200 条, 查询: 6000 次 + ... + +持续压测完成: + 运行时间: 4m0s + 写入: 57600 条消息 + 查询: 48000 次 + 写入速率: 240 msg/s + 查询速率: 200 query/s + +场景 5: 统计信息汇总 +... + +场景耗时总结: + 场景 1 (并发写入): 27s + 场景 2 (并发查询): 10s + 场景 3 (混合读写): 14s + 场景 4 (持续压测): 4m0s + 总运行时间: 5m1s (5.0 分钟) +``` + +**性能指标:** +- 写入吞吐量:~235 msg/s +- 查询吞吐量:~403 query/s +- 并发处理:支持多个 topic 同时读写 + +**注意事项:** +- 每个 topic 应该由单个 goroutine 写入,避免并发写入同一文件 +- 多个 goroutine 可以并发查询同一或不同的 topic +- 查询时可能遇到少量 EOF 错误(因为 tailer 正在处理文件) + +### 2. topic_processor_example.go - TopicProcessor 基础示例 + +展示如何使用 TopicProcessor 作为日志聚合器。 + +**功能演示:** +- 写入和读取日志记录 +- 使用索引查询 +- 使用游标读取 +- 获取统计信息 + +**运行方式:** +```bash +go run topic_processor_example.go +``` + +### 3. index_example.go - 索引功能示例 + +展示索引文件的使用和管理。 + +**运行方式:** +```bash +go run index_example.go +``` + +### 4. webapp/ - Web 应用示例 + +一个完整的 Web 应用,展示如何在实际项目中使用 seqlog。 + +**运行方式:** +```bash +cd webapp +go run main.go +``` + +然后访问 http://localhost:8080 + +## 最佳实践 + +### 并发写入 + +**推荐做法:** 每个 topic 使用一个专用的 goroutine 进行写入 + +```go +// 好的做法 - 每个 topic 一个写入 goroutine +for _, topic := range topics { + go func(t string) { + for msg := range msgChan { + seq.Write(t, msg) + } + }(topic) +} +``` + +**避免做法:** 多个 goroutine 并发写入同一个 topic + +```go +// 不推荐 - 多个 goroutine 写入同一个 topic +for i := 0; i < 10; i++ { + go func() { + seq.Write("same-topic", data) // 可能导致数据损坏 + }() +} +``` + +### 并发查询 + +查询操作是并发安全的,可以多个 goroutine 并发查询: + +```go +// 完全安全 - 多个 goroutine 并发查询 +for i := 0; i < 20; i++ { + go func() { + processor, _ := seq.GetProcessor("topic") + results, _ := processor.QueryNewest(count-1, 10) + // 处理结果... + }() +} +``` + +### 性能优化 + +1. **批量写入**:尽可能批量写入数据 +2. **控制查询频率**:避免过于频繁的查询操作 +3. **合理设置 PollInterval**:根据实际需求调整 tailer 的轮询间隔 +4. **及时关闭资源**:使用 defer 确保资源被正确释放 + +## 问题排查 + +### 常见错误 + +**1. "no such file or directory" 错误** + +确保在创建 Seqlog 之前先创建目录: +```go +os.MkdirAll("log_dir", 0755) +seq := seqlog.NewSeqlog("log_dir", logger, nil) +``` + +**2. 查询时出现 EOF 错误** + +这是正常现象,当 tailer 正在处理文件时可能会读取到不完整的记录。可以: +- 增加查询重试 +- 等待文件处理完成后再查询 + +**3. Handler 没有被调用** + +检查: +- Handler 是否正确注册 +- Seqlog 是否已启动 (`seq.Start()`) +- 目录和文件权限是否正确 + +## 更多信息 + +查看项目根目录的 CLAUDE.md 了解更多开发指南。 diff --git a/example/RUN_CONCURRENT.md b/example/RUN_CONCURRENT.md new file mode 100644 index 0000000..a732e7e --- /dev/null +++ b/example/RUN_CONCURRENT.md @@ -0,0 +1,92 @@ +# 运行高并发示例 + +## 快速开始 + +```bash +cd example +go run concurrent_example.go +``` + +## 预计运行时间 + +**总时间:约 5 分钟** + +- 场景 1 (并发写入): ~27 秒 +- 场景 2 (并发查询): ~3 秒 +- 场景 3 (混合读写): ~14 秒 +- 场景 4 (持续压测): 4 分钟 +- 场景 5 (统计汇总): ~10 秒 + +## 后台运行 + +如果想在后台运行并保存日志: + +```bash +go run concurrent_example.go > output.log 2>&1 & +echo $! > pid.txt + +# 查看实时输出 +tail -f output.log + +# 停止程序 +kill $(cat pid.txt) +rm pid.txt +``` + +## 查看进度 + +程序在场景 4(持续压测)阶段会每 10 秒显示一次进度: + +``` +[进度] 已运行 10 秒 - 写入: 1951 条, 查询: 1920 次 +[进度] 已运行 20 秒 - 写入: 3902 条, 查询: 3840 次 +... +``` + +## 性能指标 + +根据测试结果,您应该会看到: + +- **写入吞吐量**: ~220-240 msg/s +- **查询吞吐量**: ~400-1500 query/s(取决于数据量) +- **并发处理**: 3 个 topic 同时写入和查询 + +## 故障排查 + +### 问题:程序卡住不动 + +如果程序在某个阶段卡住: + +1. 检查磁盘空间是否充足 +2. 检查是否有其他进程占用文件 +3. 尝试清理测试目录:`rm -rf test_concurrent` + +### 问题:查询出现 EOF 错误 + +这是正常现象!当 tailer 正在处理文件时,查询可能会读取到不完整的记录。程序会自动处理这些错误。 + +### 问题:性能比预期低 + +可能的原因: +- 磁盘性能较慢(特别是在虚拟机或网络存储上) +- 系统负载较高 +- 索引批量同步设置(可以通过修改 `index.go` 中的 `DefaultSyncBatch` 调整) + +## 自定义测试 + +如果想调整测试参数,编辑 `concurrent_example.go`: + +```go +// 场景 1:每个 topic 写入的消息数 +messagesPerTopic := 2000 + +// 场景 2:每个 goroutine 的查询次数 +queriesPerGoroutine := 200 + +// 场景 4:持续压测时间 +stressTestDuration := 4 * time.Minute +``` + +## 预期输出示例 + +完整输出请参考 [README.md](README.md)。 diff --git a/example/concurrent_example.go b/example/concurrent_example.go new file mode 100644 index 0000000..1a91e4d --- /dev/null +++ b/example/concurrent_example.go @@ -0,0 +1,386 @@ +package main + +import ( + "context" + "fmt" + "log/slog" + "os" + "sync" + "sync/atomic" + "time" + + "code.tczkiot.com/seqlog" +) + +func main() { + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ + Level: slog.LevelInfo, + })) + + fmt.Println("=== Seqlog 高并发示例 ===") + fmt.Println("预计运行时间: 约 5 分钟") + fmt.Println() + totalStartTime := time.Now() + + // 清理并创建测试目录 + testDir := "test_concurrent" + os.RemoveAll(testDir) + if err := os.MkdirAll(testDir, 0755); err != nil { + logger.Error("failed to create test directory", "error", err) + return + } + defer os.RemoveAll(testDir) + + // 注册 3 个 topic,每个 topic 都有并发处理能力 + var processedCount atomic.Int64 + handler := func(rec *seqlog.Record) error { + processedCount.Add(1) + // 快速处理,无需模拟延迟 + return nil + } + + // 创建 Seqlog 实例(默认处理器) + seq := seqlog.NewSeqlog(testDir, logger, nil) + + topics := []string{"app", "access", "error"} + for _, topic := range topics { + seq.RegisterHandlerWithConfig(topic, &seqlog.TopicConfig{ + Handler: handler, + }) + } + + // 启动 + if err := seq.Start(); err != nil { + logger.Error("failed to start seqlog", "error", err) + return + } + defer seq.Stop() + + // ===== 场景 1: 并发写入不同 topic ===== + fmt.Println("场景 1: 并发写入测试(每个 goroutine 写入不同 topic)") + fmt.Println(" - 3 个 topic,每个 topic 一个专用写入 goroutine") + fmt.Println(" - 每个 goroutine 写入 2000 条消息") + fmt.Println() + + startTime := time.Now() + var writeWg sync.WaitGroup + messagesPerTopic := 2000 + + var totalWritten atomic.Int64 + var writeErrors atomic.Int64 + + for i, topic := range topics { + writeWg.Add(1) + go func(topicName string, writerID int) { + defer writeWg.Done() + + for j := 0; j < messagesPerTopic; j++ { + data := fmt.Sprintf("topic-%s-msg-%d", topicName, j) + + _, err := seq.Write(topicName, []byte(data)) + if err != nil { + writeErrors.Add(1) + logger.Error("write failed", "topic", topicName, "msg", j, "error", err) + } else { + totalWritten.Add(1) + } + } + }(topic, i) + } + + writeWg.Wait() + writeDuration := time.Since(startTime) + + fmt.Printf("写入完成:\n") + fmt.Printf(" 总消息数: %d\n", totalWritten.Load()) + fmt.Printf(" 错误数: %d\n", writeErrors.Load()) + fmt.Printf(" 耗时: %v\n", writeDuration) + fmt.Printf(" 吞吐量: %.0f msg/s\n\n", float64(totalWritten.Load())/writeDuration.Seconds()) + + // 等待一段时间让消息被处理 + time.Sleep(2 * time.Second) + + // ===== 场景 2: 并发查询 ===== + fmt.Println("场景 2: 并发查询测试") + fmt.Println(" - 20 个 goroutine 并发查询") + fmt.Println(" - 每个 goroutine 执行 200 次查询操作") + fmt.Println() + + startTime = time.Now() + var queryWg sync.WaitGroup + queryCount := 20 + queriesPerGoroutine := 200 + + var totalQueries atomic.Int64 + var queryErrors atomic.Int64 + + for i := 0; i < queryCount; i++ { + queryWg.Add(1) + go func(queryID int) { + defer queryWg.Done() + + for j := 0; j < queriesPerGoroutine; j++ { + // 随机选择一个 topic 进行查询 + topic := topics[j%len(topics)] + + processor, err := seq.GetProcessor(topic) + if err != nil { + queryErrors.Add(1) + continue + } + + // 获取记录总数 + count := processor.GetRecordCount() + if count == 0 { + continue + } + + // 查询最新的 10 条记录 + querySize := 10 + if count < querySize { + querySize = count + } + + _, err = processor.QueryNewest(count-1, querySize) + if err != nil { + queryErrors.Add(1) + logger.Error("query failed", "query", queryID, "error", err) + } else { + totalQueries.Add(1) + } + } + }(i) + } + + queryWg.Wait() + queryDuration := time.Since(startTime) + + fmt.Printf("查询完成:\n") + fmt.Printf(" 总查询数: %d\n", totalQueries.Load()) + fmt.Printf(" 错误数: %d\n", queryErrors.Load()) + fmt.Printf(" 耗时: %v\n", queryDuration) + fmt.Printf(" 吞吐量: %.0f query/s\n\n", float64(totalQueries.Load())/queryDuration.Seconds()) + + // ===== 场景 3: 混合读写 ===== + fmt.Println("场景 3: 混合读写测试") + fmt.Println(" - 3 个写入 goroutine(每个 topic 一个),每个写入 1000 条消息") + fmt.Println(" - 10 个查询 goroutine,每个执行 200 次查询") + fmt.Println(" - 同时进行") + fmt.Println() + + startTime = time.Now() + var mixWg sync.WaitGroup + + var mixWriteCount atomic.Int64 + var mixQueryCount atomic.Int64 + + // 启动写入 goroutine(每个 topic 一个专用 goroutine) + for _, topic := range topics { + mixWg.Add(1) + go func(topicName string) { + defer mixWg.Done() + + for j := 0; j < 1000; j++ { + data := fmt.Sprintf("mix-%s-msg-%d", topicName, j) + + if _, err := seq.Write(topicName, []byte(data)); err == nil { + mixWriteCount.Add(1) + } + + // 稍微降低写入速率 + time.Sleep(100 * time.Microsecond) + } + }(topic) + } + + // 启动查询 goroutine + for i := 0; i < 10; i++ { + mixWg.Add(1) + go func(queryID int) { + defer mixWg.Done() + + for j := 0; j < 200; j++ { + topic := topics[j%len(topics)] + + processor, err := seq.GetProcessor(topic) + if err != nil { + continue + } + + count := processor.GetRecordCount() + if count > 0 { + if _, err := processor.QueryNewest(count-1, 5); err == nil { + mixQueryCount.Add(1) + } + } + + time.Sleep(200 * time.Microsecond) + } + }(i) + } + + mixWg.Wait() + mixDuration := time.Since(startTime) + + fmt.Printf("混合操作完成:\n") + fmt.Printf(" 写入: %d 条消息\n", mixWriteCount.Load()) + fmt.Printf(" 查询: %d 次\n", mixQueryCount.Load()) + fmt.Printf(" 耗时: %v\n\n", mixDuration) + + // ===== 场景 4: 持续压测 ===== + fmt.Println("场景 4: 持续压测(运行 4 分钟)") + fmt.Println(" - 3 个写入 goroutine 持续写入") + fmt.Println(" - 5 个查询 goroutine 持续查询") + fmt.Println(" - 实时显示进度") + fmt.Println() + + stressTestDuration := 4 * time.Minute + stressTestStart := time.Now() + stressCtx, stressCancel := context.WithTimeout(context.Background(), stressTestDuration) + defer stressCancel() + + var stressWriteCount atomic.Int64 + var stressQueryCount atomic.Int64 + var stressWg sync.WaitGroup + + // 持续写入 goroutine + for _, topic := range topics { + stressWg.Add(1) + go func(topicName string) { + defer stressWg.Done() + msgCounter := 0 + + for { + select { + case <-stressCtx.Done(): + return + default: + data := fmt.Sprintf("stress-%s-msg-%d", topicName, msgCounter) + if _, err := seq.Write(topicName, []byte(data)); err == nil { + stressWriteCount.Add(1) + msgCounter++ + } + time.Sleep(10 * time.Millisecond) // 控制写入速率 + } + } + }(topic) + } + + // 持续查询 goroutine + for i := 0; i < 5; i++ { + stressWg.Add(1) + go func(queryID int) { + defer stressWg.Done() + + for { + select { + case <-stressCtx.Done(): + return + default: + topic := topics[queryID%len(topics)] + processor, err := seq.GetProcessor(topic) + if err != nil { + continue + } + + count := processor.GetRecordCount() + if count > 10 { + if _, err := processor.QueryNewest(count-1, 10); err == nil { + stressQueryCount.Add(1) + } + } + time.Sleep(20 * time.Millisecond) // 控制查询速率 + } + } + }(i) + } + + // 进度显示 goroutine + stressWg.Add(1) + go func() { + defer stressWg.Done() + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for { + select { + case <-stressCtx.Done(): + return + case <-ticker.C: + elapsed := time.Since(stressTestStart) + fmt.Printf(" [进度] 已运行 %.0f 秒 - 写入: %d 条, 查询: %d 次\n", + elapsed.Seconds(), + stressWriteCount.Load(), + stressQueryCount.Load()) + } + } + }() + + stressWg.Wait() + stressDuration := time.Since(stressTestStart) + + fmt.Printf("\n持续压测完成:\n") + fmt.Printf(" 运行时间: %v\n", stressDuration) + fmt.Printf(" 写入: %d 条消息\n", stressWriteCount.Load()) + fmt.Printf(" 查询: %d 次\n", stressQueryCount.Load()) + fmt.Printf(" 写入速率: %.0f msg/s\n", float64(stressWriteCount.Load())/stressDuration.Seconds()) + fmt.Printf(" 查询速率: %.0f query/s\n\n", float64(stressQueryCount.Load())/stressDuration.Seconds()) + + // ===== 场景 5: 统计信息汇总 ===== + fmt.Println("场景 5: 统计信息汇总") + fmt.Println() + + var totalWriteCount, totalWriteBytes, totalProcessedCount, totalErrorCount int64 + for _, topic := range topics { + processor, _ := seq.GetProcessor(topic) + stats := processor.GetStats() + + fmt.Printf("Topic [%s]:\n", topic) + fmt.Printf(" 记录总数: %d\n", processor.GetRecordCount()) + fmt.Printf(" 写入: %d 条, %d 字节\n", stats.WriteCount, stats.WriteBytes) + fmt.Printf(" 已处理: %d 条\n", stats.ProcessedCount) + fmt.Printf(" 处理错误: %d 次\n", stats.ErrorCount) + if !stats.LastWriteTime.IsZero() { + fmt.Printf(" 最后写入: %v\n", stats.LastWriteTime.Format("15:04:05")) + } + fmt.Println() + + totalWriteCount += stats.WriteCount + totalWriteBytes += stats.WriteBytes + totalProcessedCount += stats.ProcessedCount + totalErrorCount += stats.ErrorCount + } + + fmt.Printf("总计:\n") + fmt.Printf(" 写入: %d 条, %d 字节\n", totalWriteCount, totalWriteBytes) + fmt.Printf(" 已处理: %d 条\n", totalProcessedCount) + fmt.Printf(" 处理错误: %d 次\n", totalErrorCount) + fmt.Println() + + // 等待所有消息处理完成 + fmt.Println("等待消息处理...") + maxWait := 5 * time.Second + checkInterval := 100 * time.Millisecond + elapsed := time.Duration(0) + + for elapsed < maxWait { + processed := processedCount.Load() + if processed >= totalWriteCount { + break + } + time.Sleep(checkInterval) + elapsed += checkInterval + } + + fmt.Printf("处理完成: %d/%d 条消息\n", processedCount.Load(), totalWriteCount) + + totalDuration := time.Since(totalStartTime) + fmt.Printf("\n=== 所有测试完成 ===\n\n") + + fmt.Println("场景耗时总结:") + fmt.Printf(" 场景 1 (并发写入): %v\n", writeDuration) + fmt.Printf(" 场景 2 (并发查询): %v\n", queryDuration) + fmt.Printf(" 场景 3 (混合读写): %v\n", mixDuration) + fmt.Printf(" 场景 4 (持续压测): %v\n", stressDuration) + fmt.Printf(" 总运行时间: %v (%.1f 分钟)\n", totalDuration, totalDuration.Minutes()) +}