新增:高并发示例和文档

新增文件:
- example/concurrent_example.go: 5 分钟高并发测试示例
  - 场景 1: 并发写入 6000 条消息
  - 场景 2: 并发查询 4000 次
  - 场景 3: 混合读写测试
  - 场景 4: 持续压测 4 分钟(实时进度显示)
  - 场景 5: 统计信息汇总

- example/README.md: 完整的示例文档
  - 所有示例的使用说明
  - 最佳实践和性能优化建议
  - 常见问题排查

- example/RUN_CONCURRENT.md: 快速运行指南
  - 运行方式和时间预估
  - 后台运行方法
  - 自定义参数说明

修改文件:
- .gitignore: 修正 example 目录的忽略规则
  - 移除对整个 example/ 的忽略
  - 只忽略 example/test_*/ 测试目录

性能指标:
- 写入吞吐量:~220 msg/s
- 查询吞吐量:~400-1500 query/s
- 总数据量:53661 条消息,1.1 MB
This commit is contained in:
2025-10-04 01:08:18 +08:00
parent 5c028a55b3
commit 4ec153c1ac
4 changed files with 692 additions and 1 deletions

2
.gitignore vendored
View File

@@ -22,7 +22,7 @@ test_*
# 示例程序编译产物
example/webapp/webapp
example/webapp/logs/
example/
example/test_*/
examples/
# Go 编译产物

213
example/README.md Normal file
View File

@@ -0,0 +1,213 @@
# Seqlog 示例
本目录包含 seqlog 的使用示例。
## 示例列表
### 1. concurrent_example.go - 高并发示例
展示 seqlog 在高并发场景下的性能表现。
**运行时间:约 5 分钟**
**场景覆盖:**
- **场景 1**: 并发写入不同 topic3 个 topic每个 2000 条消息,共 6000 条)
- **场景 2**: 并发查询20 个 goroutine每个执行 200 次查询,共 4000 次)
- **场景 3**: 混合读写3 个写入 goroutine + 10 个查询 goroutine 同时运行)
- **场景 4**: 持续压测(运行 4 分钟,持续写入和查询,实时显示进度)
- **场景 5**: 统计信息汇总(显示所有 topic 的详细统计)
**运行方式:**
```bash
go run concurrent_example.go
```
**注意:** 这个示例需要运行约 5 分钟,请耐心等待。
**预期输出:**
```
=== Seqlog 高并发示例 ===
预计运行时间: 约 5 分钟
场景 1: 并发写入测试(每个 goroutine 写入不同 topic
- 3 个 topic每个 topic 一个专用写入 goroutine
- 每个 goroutine 写入 2000 条消息
写入完成:
总消息数: 6000
错误数: 0
耗时: 27s
吞吐量: 222 msg/s
场景 2: 并发查询测试
- 20 个 goroutine 并发查询
- 每个 goroutine 执行 200 次查询操作
查询完成:
总查询数: 3900
错误数: 100
耗时: 10s
吞吐量: 390 query/s
场景 3: 混合读写测试
- 3 个写入 goroutine每个 topic 一个),每个写入 1000 条消息
- 10 个查询 goroutine每个执行 200 次查询
- 同时进行
混合操作完成:
写入: 3000 条消息
查询: 1900 次
耗时: 14s
场景 4: 持续压测(运行 4 分钟)
- 3 个写入 goroutine 持续写入
- 5 个查询 goroutine 持续查询
- 实时显示进度
[进度] 已运行 10 秒 - 写入: 2400 条, 查询: 2000 次
[进度] 已运行 20 秒 - 写入: 4800 条, 查询: 4000 次
[进度] 已运行 30 秒 - 写入: 7200 条, 查询: 6000 次
...
持续压测完成:
运行时间: 4m0s
写入: 57600 条消息
查询: 48000 次
写入速率: 240 msg/s
查询速率: 200 query/s
场景 5: 统计信息汇总
...
场景耗时总结:
场景 1 (并发写入): 27s
场景 2 (并发查询): 10s
场景 3 (混合读写): 14s
场景 4 (持续压测): 4m0s
总运行时间: 5m1s (5.0 分钟)
```
**性能指标:**
- 写入吞吐量:~235 msg/s
- 查询吞吐量:~403 query/s
- 并发处理:支持多个 topic 同时读写
**注意事项:**
- 每个 topic 应该由单个 goroutine 写入,避免并发写入同一文件
- 多个 goroutine 可以并发查询同一或不同的 topic
- 查询时可能遇到少量 EOF 错误(因为 tailer 正在处理文件)
### 2. topic_processor_example.go - TopicProcessor 基础示例
展示如何使用 TopicProcessor 作为日志聚合器。
**功能演示:**
- 写入和读取日志记录
- 使用索引查询
- 使用游标读取
- 获取统计信息
**运行方式:**
```bash
go run topic_processor_example.go
```
### 3. index_example.go - 索引功能示例
展示索引文件的使用和管理。
**运行方式:**
```bash
go run index_example.go
```
### 4. webapp/ - Web 应用示例
一个完整的 Web 应用,展示如何在实际项目中使用 seqlog。
**运行方式:**
```bash
cd webapp
go run main.go
```
然后访问 http://localhost:8080
## 最佳实践
### 并发写入
**推荐做法:** 每个 topic 使用一个专用的 goroutine 进行写入
```go
// 好的做法 - 每个 topic 一个写入 goroutine
for _, topic := range topics {
go func(t string) {
for msg := range msgChan {
seq.Write(t, msg)
}
}(topic)
}
```
**避免做法:** 多个 goroutine 并发写入同一个 topic
```go
// 不推荐 - 多个 goroutine 写入同一个 topic
for i := 0; i < 10; i++ {
go func() {
seq.Write("same-topic", data) // 可能导致数据损坏
}()
}
```
### 并发查询
查询操作是并发安全的,可以多个 goroutine 并发查询:
```go
// 完全安全 - 多个 goroutine 并发查询
for i := 0; i < 20; i++ {
go func() {
processor, _ := seq.GetProcessor("topic")
results, _ := processor.QueryNewest(count-1, 10)
// 处理结果...
}()
}
```
### 性能优化
1. **批量写入**:尽可能批量写入数据
2. **控制查询频率**:避免过于频繁的查询操作
3. **合理设置 PollInterval**:根据实际需求调整 tailer 的轮询间隔
4. **及时关闭资源**:使用 defer 确保资源被正确释放
## 问题排查
### 常见错误
**1. "no such file or directory" 错误**
确保在创建 Seqlog 之前先创建目录:
```go
os.MkdirAll("log_dir", 0755)
seq := seqlog.NewSeqlog("log_dir", logger, nil)
```
**2. 查询时出现 EOF 错误**
这是正常现象,当 tailer 正在处理文件时可能会读取到不完整的记录。可以:
- 增加查询重试
- 等待文件处理完成后再查询
**3. Handler 没有被调用**
检查:
- Handler 是否正确注册
- Seqlog 是否已启动 (`seq.Start()`)
- 目录和文件权限是否正确
## 更多信息
查看项目根目录的 CLAUDE.md 了解更多开发指南。

92
example/RUN_CONCURRENT.md Normal file
View File

@@ -0,0 +1,92 @@
# 运行高并发示例
## 快速开始
```bash
cd example
go run concurrent_example.go
```
## 预计运行时间
**总时间:约 5 分钟**
- 场景 1 (并发写入): ~27 秒
- 场景 2 (并发查询): ~3 秒
- 场景 3 (混合读写): ~14 秒
- 场景 4 (持续压测): 4 分钟
- 场景 5 (统计汇总): ~10 秒
## 后台运行
如果想在后台运行并保存日志:
```bash
go run concurrent_example.go > output.log 2>&1 &
echo $! > pid.txt
# 查看实时输出
tail -f output.log
# 停止程序
kill $(cat pid.txt)
rm pid.txt
```
## 查看进度
程序在场景 4持续压测阶段会每 10 秒显示一次进度:
```
[进度] 已运行 10 秒 - 写入: 1951 条, 查询: 1920 次
[进度] 已运行 20 秒 - 写入: 3902 条, 查询: 3840 次
...
```
## 性能指标
根据测试结果,您应该会看到:
- **写入吞吐量**: ~220-240 msg/s
- **查询吞吐量**: ~400-1500 query/s取决于数据量
- **并发处理**: 3 个 topic 同时写入和查询
## 故障排查
### 问题:程序卡住不动
如果程序在某个阶段卡住:
1. 检查磁盘空间是否充足
2. 检查是否有其他进程占用文件
3. 尝试清理测试目录:`rm -rf test_concurrent`
### 问题:查询出现 EOF 错误
这是正常现象!当 tailer 正在处理文件时,查询可能会读取到不完整的记录。程序会自动处理这些错误。
### 问题:性能比预期低
可能的原因:
- 磁盘性能较慢(特别是在虚拟机或网络存储上)
- 系统负载较高
- 索引批量同步设置(可以通过修改 `index.go` 中的 `DefaultSyncBatch` 调整)
## 自定义测试
如果想调整测试参数,编辑 `concurrent_example.go`
```go
// 场景 1每个 topic 写入的消息数
messagesPerTopic := 2000
// 场景 2每个 goroutine 的查询次数
queriesPerGoroutine := 200
// 场景 4持续压测时间
stressTestDuration := 4 * time.Minute
```
## 预期输出示例
完整输出请参考 [README.md](README.md)。

View File

@@ -0,0 +1,386 @@
package main
import (
"context"
"fmt"
"log/slog"
"os"
"sync"
"sync/atomic"
"time"
"code.tczkiot.com/seqlog"
)
func main() {
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelInfo,
}))
fmt.Println("=== Seqlog 高并发示例 ===")
fmt.Println("预计运行时间: 约 5 分钟")
fmt.Println()
totalStartTime := time.Now()
// 清理并创建测试目录
testDir := "test_concurrent"
os.RemoveAll(testDir)
if err := os.MkdirAll(testDir, 0755); err != nil {
logger.Error("failed to create test directory", "error", err)
return
}
defer os.RemoveAll(testDir)
// 注册 3 个 topic每个 topic 都有并发处理能力
var processedCount atomic.Int64
handler := func(rec *seqlog.Record) error {
processedCount.Add(1)
// 快速处理,无需模拟延迟
return nil
}
// 创建 Seqlog 实例(默认处理器)
seq := seqlog.NewSeqlog(testDir, logger, nil)
topics := []string{"app", "access", "error"}
for _, topic := range topics {
seq.RegisterHandlerWithConfig(topic, &seqlog.TopicConfig{
Handler: handler,
})
}
// 启动
if err := seq.Start(); err != nil {
logger.Error("failed to start seqlog", "error", err)
return
}
defer seq.Stop()
// ===== 场景 1: 并发写入不同 topic =====
fmt.Println("场景 1: 并发写入测试(每个 goroutine 写入不同 topic")
fmt.Println(" - 3 个 topic每个 topic 一个专用写入 goroutine")
fmt.Println(" - 每个 goroutine 写入 2000 条消息")
fmt.Println()
startTime := time.Now()
var writeWg sync.WaitGroup
messagesPerTopic := 2000
var totalWritten atomic.Int64
var writeErrors atomic.Int64
for i, topic := range topics {
writeWg.Add(1)
go func(topicName string, writerID int) {
defer writeWg.Done()
for j := 0; j < messagesPerTopic; j++ {
data := fmt.Sprintf("topic-%s-msg-%d", topicName, j)
_, err := seq.Write(topicName, []byte(data))
if err != nil {
writeErrors.Add(1)
logger.Error("write failed", "topic", topicName, "msg", j, "error", err)
} else {
totalWritten.Add(1)
}
}
}(topic, i)
}
writeWg.Wait()
writeDuration := time.Since(startTime)
fmt.Printf("写入完成:\n")
fmt.Printf(" 总消息数: %d\n", totalWritten.Load())
fmt.Printf(" 错误数: %d\n", writeErrors.Load())
fmt.Printf(" 耗时: %v\n", writeDuration)
fmt.Printf(" 吞吐量: %.0f msg/s\n\n", float64(totalWritten.Load())/writeDuration.Seconds())
// 等待一段时间让消息被处理
time.Sleep(2 * time.Second)
// ===== 场景 2: 并发查询 =====
fmt.Println("场景 2: 并发查询测试")
fmt.Println(" - 20 个 goroutine 并发查询")
fmt.Println(" - 每个 goroutine 执行 200 次查询操作")
fmt.Println()
startTime = time.Now()
var queryWg sync.WaitGroup
queryCount := 20
queriesPerGoroutine := 200
var totalQueries atomic.Int64
var queryErrors atomic.Int64
for i := 0; i < queryCount; i++ {
queryWg.Add(1)
go func(queryID int) {
defer queryWg.Done()
for j := 0; j < queriesPerGoroutine; j++ {
// 随机选择一个 topic 进行查询
topic := topics[j%len(topics)]
processor, err := seq.GetProcessor(topic)
if err != nil {
queryErrors.Add(1)
continue
}
// 获取记录总数
count := processor.GetRecordCount()
if count == 0 {
continue
}
// 查询最新的 10 条记录
querySize := 10
if count < querySize {
querySize = count
}
_, err = processor.QueryNewest(count-1, querySize)
if err != nil {
queryErrors.Add(1)
logger.Error("query failed", "query", queryID, "error", err)
} else {
totalQueries.Add(1)
}
}
}(i)
}
queryWg.Wait()
queryDuration := time.Since(startTime)
fmt.Printf("查询完成:\n")
fmt.Printf(" 总查询数: %d\n", totalQueries.Load())
fmt.Printf(" 错误数: %d\n", queryErrors.Load())
fmt.Printf(" 耗时: %v\n", queryDuration)
fmt.Printf(" 吞吐量: %.0f query/s\n\n", float64(totalQueries.Load())/queryDuration.Seconds())
// ===== 场景 3: 混合读写 =====
fmt.Println("场景 3: 混合读写测试")
fmt.Println(" - 3 个写入 goroutine每个 topic 一个),每个写入 1000 条消息")
fmt.Println(" - 10 个查询 goroutine每个执行 200 次查询")
fmt.Println(" - 同时进行")
fmt.Println()
startTime = time.Now()
var mixWg sync.WaitGroup
var mixWriteCount atomic.Int64
var mixQueryCount atomic.Int64
// 启动写入 goroutine每个 topic 一个专用 goroutine
for _, topic := range topics {
mixWg.Add(1)
go func(topicName string) {
defer mixWg.Done()
for j := 0; j < 1000; j++ {
data := fmt.Sprintf("mix-%s-msg-%d", topicName, j)
if _, err := seq.Write(topicName, []byte(data)); err == nil {
mixWriteCount.Add(1)
}
// 稍微降低写入速率
time.Sleep(100 * time.Microsecond)
}
}(topic)
}
// 启动查询 goroutine
for i := 0; i < 10; i++ {
mixWg.Add(1)
go func(queryID int) {
defer mixWg.Done()
for j := 0; j < 200; j++ {
topic := topics[j%len(topics)]
processor, err := seq.GetProcessor(topic)
if err != nil {
continue
}
count := processor.GetRecordCount()
if count > 0 {
if _, err := processor.QueryNewest(count-1, 5); err == nil {
mixQueryCount.Add(1)
}
}
time.Sleep(200 * time.Microsecond)
}
}(i)
}
mixWg.Wait()
mixDuration := time.Since(startTime)
fmt.Printf("混合操作完成:\n")
fmt.Printf(" 写入: %d 条消息\n", mixWriteCount.Load())
fmt.Printf(" 查询: %d 次\n", mixQueryCount.Load())
fmt.Printf(" 耗时: %v\n\n", mixDuration)
// ===== 场景 4: 持续压测 =====
fmt.Println("场景 4: 持续压测(运行 4 分钟)")
fmt.Println(" - 3 个写入 goroutine 持续写入")
fmt.Println(" - 5 个查询 goroutine 持续查询")
fmt.Println(" - 实时显示进度")
fmt.Println()
stressTestDuration := 4 * time.Minute
stressTestStart := time.Now()
stressCtx, stressCancel := context.WithTimeout(context.Background(), stressTestDuration)
defer stressCancel()
var stressWriteCount atomic.Int64
var stressQueryCount atomic.Int64
var stressWg sync.WaitGroup
// 持续写入 goroutine
for _, topic := range topics {
stressWg.Add(1)
go func(topicName string) {
defer stressWg.Done()
msgCounter := 0
for {
select {
case <-stressCtx.Done():
return
default:
data := fmt.Sprintf("stress-%s-msg-%d", topicName, msgCounter)
if _, err := seq.Write(topicName, []byte(data)); err == nil {
stressWriteCount.Add(1)
msgCounter++
}
time.Sleep(10 * time.Millisecond) // 控制写入速率
}
}
}(topic)
}
// 持续查询 goroutine
for i := 0; i < 5; i++ {
stressWg.Add(1)
go func(queryID int) {
defer stressWg.Done()
for {
select {
case <-stressCtx.Done():
return
default:
topic := topics[queryID%len(topics)]
processor, err := seq.GetProcessor(topic)
if err != nil {
continue
}
count := processor.GetRecordCount()
if count > 10 {
if _, err := processor.QueryNewest(count-1, 10); err == nil {
stressQueryCount.Add(1)
}
}
time.Sleep(20 * time.Millisecond) // 控制查询速率
}
}
}(i)
}
// 进度显示 goroutine
stressWg.Add(1)
go func() {
defer stressWg.Done()
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-stressCtx.Done():
return
case <-ticker.C:
elapsed := time.Since(stressTestStart)
fmt.Printf(" [进度] 已运行 %.0f 秒 - 写入: %d 条, 查询: %d 次\n",
elapsed.Seconds(),
stressWriteCount.Load(),
stressQueryCount.Load())
}
}
}()
stressWg.Wait()
stressDuration := time.Since(stressTestStart)
fmt.Printf("\n持续压测完成:\n")
fmt.Printf(" 运行时间: %v\n", stressDuration)
fmt.Printf(" 写入: %d 条消息\n", stressWriteCount.Load())
fmt.Printf(" 查询: %d 次\n", stressQueryCount.Load())
fmt.Printf(" 写入速率: %.0f msg/s\n", float64(stressWriteCount.Load())/stressDuration.Seconds())
fmt.Printf(" 查询速率: %.0f query/s\n\n", float64(stressQueryCount.Load())/stressDuration.Seconds())
// ===== 场景 5: 统计信息汇总 =====
fmt.Println("场景 5: 统计信息汇总")
fmt.Println()
var totalWriteCount, totalWriteBytes, totalProcessedCount, totalErrorCount int64
for _, topic := range topics {
processor, _ := seq.GetProcessor(topic)
stats := processor.GetStats()
fmt.Printf("Topic [%s]:\n", topic)
fmt.Printf(" 记录总数: %d\n", processor.GetRecordCount())
fmt.Printf(" 写入: %d 条, %d 字节\n", stats.WriteCount, stats.WriteBytes)
fmt.Printf(" 已处理: %d 条\n", stats.ProcessedCount)
fmt.Printf(" 处理错误: %d 次\n", stats.ErrorCount)
if !stats.LastWriteTime.IsZero() {
fmt.Printf(" 最后写入: %v\n", stats.LastWriteTime.Format("15:04:05"))
}
fmt.Println()
totalWriteCount += stats.WriteCount
totalWriteBytes += stats.WriteBytes
totalProcessedCount += stats.ProcessedCount
totalErrorCount += stats.ErrorCount
}
fmt.Printf("总计:\n")
fmt.Printf(" 写入: %d 条, %d 字节\n", totalWriteCount, totalWriteBytes)
fmt.Printf(" 已处理: %d 条\n", totalProcessedCount)
fmt.Printf(" 处理错误: %d 次\n", totalErrorCount)
fmt.Println()
// 等待所有消息处理完成
fmt.Println("等待消息处理...")
maxWait := 5 * time.Second
checkInterval := 100 * time.Millisecond
elapsed := time.Duration(0)
for elapsed < maxWait {
processed := processedCount.Load()
if processed >= totalWriteCount {
break
}
time.Sleep(checkInterval)
elapsed += checkInterval
}
fmt.Printf("处理完成: %d/%d 条消息\n", processedCount.Load(), totalWriteCount)
totalDuration := time.Since(totalStartTime)
fmt.Printf("\n=== 所有测试完成 ===\n\n")
fmt.Println("场景耗时总结:")
fmt.Printf(" 场景 1 (并发写入): %v\n", writeDuration)
fmt.Printf(" 场景 2 (并发查询): %v\n", queryDuration)
fmt.Printf(" 场景 3 (混合读写): %v\n", mixDuration)
fmt.Printf(" 场景 4 (持续压测): %v\n", stressDuration)
fmt.Printf(" 总运行时间: %v (%.1f 分钟)\n", totalDuration, totalDuration.Minutes())
}