重构:将示例文件组织到子目录
修复 Go 规范问题:一个目录不能有多个 package main
文件结构调整:
- example/concurrent_example.go → example/concurrent/main.go
- example/index_example.go → example/index/main.go
- example/topic_processor_example.go → example/topic_processor/main.go
修复 API 适配:
- index/main.go: 更新为新的查询 API(移除 startIdx/endIdx 参数)
- webapp/main.go: 使用 processor.Query 方法替代 RecordQuery
- 移除 queryCache,直接使用 processor
- 更新查询调用,移除状态参数
文档更新:
- example/README.md: 更新所有示例的运行路径
- example/RUN_CONCURRENT.md: 更新运行命令
所有示例编译测试通过 ✅
This commit is contained in:
386
example/concurrent/main.go
Normal file
386
example/concurrent/main.go
Normal file
@@ -0,0 +1,386 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"os"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"code.tczkiot.com/seqlog"
|
||||
)
|
||||
|
||||
func main() {
|
||||
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
|
||||
Level: slog.LevelInfo,
|
||||
}))
|
||||
|
||||
fmt.Println("=== Seqlog 高并发示例 ===")
|
||||
fmt.Println("预计运行时间: 约 5 分钟")
|
||||
fmt.Println()
|
||||
totalStartTime := time.Now()
|
||||
|
||||
// 清理并创建测试目录
|
||||
testDir := "test_concurrent"
|
||||
os.RemoveAll(testDir)
|
||||
if err := os.MkdirAll(testDir, 0755); err != nil {
|
||||
logger.Error("failed to create test directory", "error", err)
|
||||
return
|
||||
}
|
||||
defer os.RemoveAll(testDir)
|
||||
|
||||
// 注册 3 个 topic,每个 topic 都有并发处理能力
|
||||
var processedCount atomic.Int64
|
||||
handler := func(rec *seqlog.Record) error {
|
||||
processedCount.Add(1)
|
||||
// 快速处理,无需模拟延迟
|
||||
return nil
|
||||
}
|
||||
|
||||
// 创建 Seqlog 实例(默认处理器)
|
||||
seq := seqlog.NewSeqlog(testDir, logger, nil)
|
||||
|
||||
topics := []string{"app", "access", "error"}
|
||||
for _, topic := range topics {
|
||||
seq.RegisterHandlerWithConfig(topic, &seqlog.TopicConfig{
|
||||
Handler: handler,
|
||||
})
|
||||
}
|
||||
|
||||
// 启动
|
||||
if err := seq.Start(); err != nil {
|
||||
logger.Error("failed to start seqlog", "error", err)
|
||||
return
|
||||
}
|
||||
defer seq.Stop()
|
||||
|
||||
// ===== 场景 1: 并发写入不同 topic =====
|
||||
fmt.Println("场景 1: 并发写入测试(每个 goroutine 写入不同 topic)")
|
||||
fmt.Println(" - 3 个 topic,每个 topic 一个专用写入 goroutine")
|
||||
fmt.Println(" - 每个 goroutine 写入 2000 条消息")
|
||||
fmt.Println()
|
||||
|
||||
startTime := time.Now()
|
||||
var writeWg sync.WaitGroup
|
||||
messagesPerTopic := 2000
|
||||
|
||||
var totalWritten atomic.Int64
|
||||
var writeErrors atomic.Int64
|
||||
|
||||
for i, topic := range topics {
|
||||
writeWg.Add(1)
|
||||
go func(topicName string, writerID int) {
|
||||
defer writeWg.Done()
|
||||
|
||||
for j := 0; j < messagesPerTopic; j++ {
|
||||
data := fmt.Sprintf("topic-%s-msg-%d", topicName, j)
|
||||
|
||||
_, err := seq.Write(topicName, []byte(data))
|
||||
if err != nil {
|
||||
writeErrors.Add(1)
|
||||
logger.Error("write failed", "topic", topicName, "msg", j, "error", err)
|
||||
} else {
|
||||
totalWritten.Add(1)
|
||||
}
|
||||
}
|
||||
}(topic, i)
|
||||
}
|
||||
|
||||
writeWg.Wait()
|
||||
writeDuration := time.Since(startTime)
|
||||
|
||||
fmt.Printf("写入完成:\n")
|
||||
fmt.Printf(" 总消息数: %d\n", totalWritten.Load())
|
||||
fmt.Printf(" 错误数: %d\n", writeErrors.Load())
|
||||
fmt.Printf(" 耗时: %v\n", writeDuration)
|
||||
fmt.Printf(" 吞吐量: %.0f msg/s\n\n", float64(totalWritten.Load())/writeDuration.Seconds())
|
||||
|
||||
// 等待一段时间让消息被处理
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
// ===== 场景 2: 并发查询 =====
|
||||
fmt.Println("场景 2: 并发查询测试")
|
||||
fmt.Println(" - 20 个 goroutine 并发查询")
|
||||
fmt.Println(" - 每个 goroutine 执行 200 次查询操作")
|
||||
fmt.Println()
|
||||
|
||||
startTime = time.Now()
|
||||
var queryWg sync.WaitGroup
|
||||
queryCount := 20
|
||||
queriesPerGoroutine := 200
|
||||
|
||||
var totalQueries atomic.Int64
|
||||
var queryErrors atomic.Int64
|
||||
|
||||
for i := 0; i < queryCount; i++ {
|
||||
queryWg.Add(1)
|
||||
go func(queryID int) {
|
||||
defer queryWg.Done()
|
||||
|
||||
for j := 0; j < queriesPerGoroutine; j++ {
|
||||
// 随机选择一个 topic 进行查询
|
||||
topic := topics[j%len(topics)]
|
||||
|
||||
processor, err := seq.GetProcessor(topic)
|
||||
if err != nil {
|
||||
queryErrors.Add(1)
|
||||
continue
|
||||
}
|
||||
|
||||
// 获取记录总数
|
||||
count := processor.GetRecordCount()
|
||||
if count == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// 查询最新的 10 条记录
|
||||
querySize := 10
|
||||
if count < querySize {
|
||||
querySize = count
|
||||
}
|
||||
|
||||
_, err = processor.QueryNewest(count-1, querySize)
|
||||
if err != nil {
|
||||
queryErrors.Add(1)
|
||||
logger.Error("query failed", "query", queryID, "error", err)
|
||||
} else {
|
||||
totalQueries.Add(1)
|
||||
}
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
|
||||
queryWg.Wait()
|
||||
queryDuration := time.Since(startTime)
|
||||
|
||||
fmt.Printf("查询完成:\n")
|
||||
fmt.Printf(" 总查询数: %d\n", totalQueries.Load())
|
||||
fmt.Printf(" 错误数: %d\n", queryErrors.Load())
|
||||
fmt.Printf(" 耗时: %v\n", queryDuration)
|
||||
fmt.Printf(" 吞吐量: %.0f query/s\n\n", float64(totalQueries.Load())/queryDuration.Seconds())
|
||||
|
||||
// ===== 场景 3: 混合读写 =====
|
||||
fmt.Println("场景 3: 混合读写测试")
|
||||
fmt.Println(" - 3 个写入 goroutine(每个 topic 一个),每个写入 1000 条消息")
|
||||
fmt.Println(" - 10 个查询 goroutine,每个执行 200 次查询")
|
||||
fmt.Println(" - 同时进行")
|
||||
fmt.Println()
|
||||
|
||||
startTime = time.Now()
|
||||
var mixWg sync.WaitGroup
|
||||
|
||||
var mixWriteCount atomic.Int64
|
||||
var mixQueryCount atomic.Int64
|
||||
|
||||
// 启动写入 goroutine(每个 topic 一个专用 goroutine)
|
||||
for _, topic := range topics {
|
||||
mixWg.Add(1)
|
||||
go func(topicName string) {
|
||||
defer mixWg.Done()
|
||||
|
||||
for j := 0; j < 1000; j++ {
|
||||
data := fmt.Sprintf("mix-%s-msg-%d", topicName, j)
|
||||
|
||||
if _, err := seq.Write(topicName, []byte(data)); err == nil {
|
||||
mixWriteCount.Add(1)
|
||||
}
|
||||
|
||||
// 稍微降低写入速率
|
||||
time.Sleep(100 * time.Microsecond)
|
||||
}
|
||||
}(topic)
|
||||
}
|
||||
|
||||
// 启动查询 goroutine
|
||||
for i := 0; i < 10; i++ {
|
||||
mixWg.Add(1)
|
||||
go func(queryID int) {
|
||||
defer mixWg.Done()
|
||||
|
||||
for j := 0; j < 200; j++ {
|
||||
topic := topics[j%len(topics)]
|
||||
|
||||
processor, err := seq.GetProcessor(topic)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
count := processor.GetRecordCount()
|
||||
if count > 0 {
|
||||
if _, err := processor.QueryNewest(count-1, 5); err == nil {
|
||||
mixQueryCount.Add(1)
|
||||
}
|
||||
}
|
||||
|
||||
time.Sleep(200 * time.Microsecond)
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
|
||||
mixWg.Wait()
|
||||
mixDuration := time.Since(startTime)
|
||||
|
||||
fmt.Printf("混合操作完成:\n")
|
||||
fmt.Printf(" 写入: %d 条消息\n", mixWriteCount.Load())
|
||||
fmt.Printf(" 查询: %d 次\n", mixQueryCount.Load())
|
||||
fmt.Printf(" 耗时: %v\n\n", mixDuration)
|
||||
|
||||
// ===== 场景 4: 持续压测 =====
|
||||
fmt.Println("场景 4: 持续压测(运行 4 分钟)")
|
||||
fmt.Println(" - 3 个写入 goroutine 持续写入")
|
||||
fmt.Println(" - 5 个查询 goroutine 持续查询")
|
||||
fmt.Println(" - 实时显示进度")
|
||||
fmt.Println()
|
||||
|
||||
stressTestDuration := 4 * time.Minute
|
||||
stressTestStart := time.Now()
|
||||
stressCtx, stressCancel := context.WithTimeout(context.Background(), stressTestDuration)
|
||||
defer stressCancel()
|
||||
|
||||
var stressWriteCount atomic.Int64
|
||||
var stressQueryCount atomic.Int64
|
||||
var stressWg sync.WaitGroup
|
||||
|
||||
// 持续写入 goroutine
|
||||
for _, topic := range topics {
|
||||
stressWg.Add(1)
|
||||
go func(topicName string) {
|
||||
defer stressWg.Done()
|
||||
msgCounter := 0
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-stressCtx.Done():
|
||||
return
|
||||
default:
|
||||
data := fmt.Sprintf("stress-%s-msg-%d", topicName, msgCounter)
|
||||
if _, err := seq.Write(topicName, []byte(data)); err == nil {
|
||||
stressWriteCount.Add(1)
|
||||
msgCounter++
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond) // 控制写入速率
|
||||
}
|
||||
}
|
||||
}(topic)
|
||||
}
|
||||
|
||||
// 持续查询 goroutine
|
||||
for i := 0; i < 5; i++ {
|
||||
stressWg.Add(1)
|
||||
go func(queryID int) {
|
||||
defer stressWg.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-stressCtx.Done():
|
||||
return
|
||||
default:
|
||||
topic := topics[queryID%len(topics)]
|
||||
processor, err := seq.GetProcessor(topic)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
count := processor.GetRecordCount()
|
||||
if count > 10 {
|
||||
if _, err := processor.QueryNewest(count-1, 10); err == nil {
|
||||
stressQueryCount.Add(1)
|
||||
}
|
||||
}
|
||||
time.Sleep(20 * time.Millisecond) // 控制查询速率
|
||||
}
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
|
||||
// 进度显示 goroutine
|
||||
stressWg.Add(1)
|
||||
go func() {
|
||||
defer stressWg.Done()
|
||||
ticker := time.NewTicker(10 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-stressCtx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
elapsed := time.Since(stressTestStart)
|
||||
fmt.Printf(" [进度] 已运行 %.0f 秒 - 写入: %d 条, 查询: %d 次\n",
|
||||
elapsed.Seconds(),
|
||||
stressWriteCount.Load(),
|
||||
stressQueryCount.Load())
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
stressWg.Wait()
|
||||
stressDuration := time.Since(stressTestStart)
|
||||
|
||||
fmt.Printf("\n持续压测完成:\n")
|
||||
fmt.Printf(" 运行时间: %v\n", stressDuration)
|
||||
fmt.Printf(" 写入: %d 条消息\n", stressWriteCount.Load())
|
||||
fmt.Printf(" 查询: %d 次\n", stressQueryCount.Load())
|
||||
fmt.Printf(" 写入速率: %.0f msg/s\n", float64(stressWriteCount.Load())/stressDuration.Seconds())
|
||||
fmt.Printf(" 查询速率: %.0f query/s\n\n", float64(stressQueryCount.Load())/stressDuration.Seconds())
|
||||
|
||||
// ===== 场景 5: 统计信息汇总 =====
|
||||
fmt.Println("场景 5: 统计信息汇总")
|
||||
fmt.Println()
|
||||
|
||||
var totalWriteCount, totalWriteBytes, totalProcessedCount, totalErrorCount int64
|
||||
for _, topic := range topics {
|
||||
processor, _ := seq.GetProcessor(topic)
|
||||
stats := processor.GetStats()
|
||||
|
||||
fmt.Printf("Topic [%s]:\n", topic)
|
||||
fmt.Printf(" 记录总数: %d\n", processor.GetRecordCount())
|
||||
fmt.Printf(" 写入: %d 条, %d 字节\n", stats.WriteCount, stats.WriteBytes)
|
||||
fmt.Printf(" 已处理: %d 条\n", stats.ProcessedCount)
|
||||
fmt.Printf(" 处理错误: %d 次\n", stats.ErrorCount)
|
||||
if !stats.LastWriteTime.IsZero() {
|
||||
fmt.Printf(" 最后写入: %v\n", stats.LastWriteTime.Format("15:04:05"))
|
||||
}
|
||||
fmt.Println()
|
||||
|
||||
totalWriteCount += stats.WriteCount
|
||||
totalWriteBytes += stats.WriteBytes
|
||||
totalProcessedCount += stats.ProcessedCount
|
||||
totalErrorCount += stats.ErrorCount
|
||||
}
|
||||
|
||||
fmt.Printf("总计:\n")
|
||||
fmt.Printf(" 写入: %d 条, %d 字节\n", totalWriteCount, totalWriteBytes)
|
||||
fmt.Printf(" 已处理: %d 条\n", totalProcessedCount)
|
||||
fmt.Printf(" 处理错误: %d 次\n", totalErrorCount)
|
||||
fmt.Println()
|
||||
|
||||
// 等待所有消息处理完成
|
||||
fmt.Println("等待消息处理...")
|
||||
maxWait := 5 * time.Second
|
||||
checkInterval := 100 * time.Millisecond
|
||||
elapsed := time.Duration(0)
|
||||
|
||||
for elapsed < maxWait {
|
||||
processed := processedCount.Load()
|
||||
if processed >= totalWriteCount {
|
||||
break
|
||||
}
|
||||
time.Sleep(checkInterval)
|
||||
elapsed += checkInterval
|
||||
}
|
||||
|
||||
fmt.Printf("处理完成: %d/%d 条消息\n", processedCount.Load(), totalWriteCount)
|
||||
|
||||
totalDuration := time.Since(totalStartTime)
|
||||
fmt.Printf("\n=== 所有测试完成 ===\n\n")
|
||||
|
||||
fmt.Println("场景耗时总结:")
|
||||
fmt.Printf(" 场景 1 (并发写入): %v\n", writeDuration)
|
||||
fmt.Printf(" 场景 2 (并发查询): %v\n", queryDuration)
|
||||
fmt.Printf(" 场景 3 (混合读写): %v\n", mixDuration)
|
||||
fmt.Printf(" 场景 4 (持续压测): %v\n", stressDuration)
|
||||
fmt.Printf(" 总运行时间: %v (%.1f 分钟)\n", totalDuration, totalDuration.Minutes())
|
||||
}
|
||||
Reference in New Issue
Block a user