Files
seqlog/example/concurrent/main.go

383 lines
10 KiB
Go
Raw Permalink Normal View History

package main
import (
"context"
"fmt"
"log/slog"
"os"
"sync"
"sync/atomic"
"time"
"code.tczkiot.com/seqlog"
)
// main runs the seqlog concurrency example end to end.
//
// It creates a hub rooted at the scratch directory "test_concurrent",
// registers one counting handler for the topics "app", "access" and "error",
// and then executes five scenarios in order:
//
//  1. one writer goroutine per topic, 2000 messages each;
//  2. 20 goroutines issuing 200 newest-record queries each;
//  3. writers and queriers running at the same time;
//  4. a 4-minute paced stress run with progress output every 10 seconds;
//  5. a per-topic and aggregate statistics summary.
//
// The scratch directory is removed on exit. Setup failures are logged and
// cause an early return.
func main() {
	logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
		Level: slog.LevelInfo,
	}))

	fmt.Println("=== Seqlog 高并发示例 ===")
	fmt.Println("预计运行时间: 约 5 分钟")
	fmt.Println()

	totalStartTime := time.Now()

	// Start from a clean scratch directory; the initial removal is best-effort
	// because MkdirAll below reports any real problem with the path.
	testDir := "test_concurrent"
	os.RemoveAll(testDir)
	if err := os.MkdirAll(testDir, 0755); err != nil {
		logger.Error("failed to create test directory", "error", err)
		return
	}
	defer os.RemoveAll(testDir)

	// Handler shared by all topics: it only counts processed records, with no
	// simulated processing delay.
	var processedCount atomic.Int64
	handler := func(_ *seqlog.Record) error {
		processedCount.Add(1)
		return nil
	}

	// Create the hub. The third argument is passed as nil; the original
	// comment describes it as the default handler — confirm against the
	// seqlog API.
	seq := seqlog.NewLogHub(testDir, logger, nil)

	topics := []string{"app", "access", "error"}
	for _, topic := range topics {
		seq.RegisterHandlerWithConfig(topic, &seqlog.TopicConfig{
			Handler: handler,
		})
	}

	if err := seq.Start(); err != nil {
		logger.Error("failed to start seqlog", "error", err)
		return
	}
	// Deferred calls run last-in-first-out, so the hub is stopped before the
	// scratch directory is removed.
	defer seq.Stop()

	// ===== Scenario 1: concurrent writes, one goroutine per topic =====
	fmt.Println("场景 1: 并发写入测试(每个 goroutine 写入不同 topic")
	fmt.Println(" - 3 个 topic每个 topic 一个专用写入 goroutine")
	fmt.Println(" - 每个 goroutine 写入 2000 条消息")
	fmt.Println()

	startTime := time.Now()
	var writeWg sync.WaitGroup
	messagesPerTopic := 2000
	var totalWritten atomic.Int64
	var writeErrors atomic.Int64

	for _, topic := range topics {
		writeWg.Add(1)
		go func(topicName string) {
			defer writeWg.Done()
			for j := 0; j < messagesPerTopic; j++ {
				data := fmt.Sprintf("topic-%s-msg-%d", topicName, j)
				if _, err := seq.Write(topicName, []byte(data)); err != nil {
					writeErrors.Add(1)
					logger.Error("write failed", "topic", topicName, "msg", j, "error", err)
					continue
				}
				totalWritten.Add(1)
			}
		}(topic)
	}
	writeWg.Wait()
	writeDuration := time.Since(startTime)

	fmt.Printf("写入完成:\n")
	fmt.Printf(" 总消息数: %d\n", totalWritten.Load())
	fmt.Printf(" 错误数: %d\n", writeErrors.Load())
	fmt.Printf(" 耗时: %v\n", writeDuration)
	fmt.Printf(" 吞吐量: %.0f msg/s\n\n", float64(totalWritten.Load())/writeDuration.Seconds())

	// Give the handlers some time to work through the written messages.
	time.Sleep(2 * time.Second)

	// ===== Scenario 2: concurrent queries =====
	fmt.Println("场景 2: 并发查询测试")
	fmt.Println(" - 20 个 goroutine 并发查询")
	fmt.Println(" - 每个 goroutine 执行 200 次查询操作")
	fmt.Println()

	startTime = time.Now()
	var queryWg sync.WaitGroup
	queryCount := 20
	queriesPerGoroutine := 200
	var totalQueries atomic.Int64
	var queryErrors atomic.Int64

	for i := range queryCount {
		queryWg.Add(1)
		go func(queryID int) {
			defer queryWg.Done()
			for j := range queriesPerGoroutine {
				// Rotate through the topics (round-robin, not random).
				topic := topics[j%len(topics)]
				processor, err := seq.GetProcessor(topic)
				if err != nil {
					queryErrors.Add(1)
					continue
				}

				// Nothing to query while the topic has no records.
				count := processor.GetRecordCount()
				if count == 0 {
					continue
				}

				// Query up to 10 records, starting from the last index.
				querySize := min(count, 10)
				if _, err = processor.QueryNewest(count-1, querySize); err != nil {
					queryErrors.Add(1)
					logger.Error("query failed", "query", queryID, "error", err)
					continue
				}
				totalQueries.Add(1)
			}
		}(i)
	}
	queryWg.Wait()
	queryDuration := time.Since(startTime)

	fmt.Printf("查询完成:\n")
	fmt.Printf(" 总查询数: %d\n", totalQueries.Load())
	fmt.Printf(" 错误数: %d\n", queryErrors.Load())
	fmt.Printf(" 耗时: %v\n", queryDuration)
	fmt.Printf(" 吞吐量: %.0f query/s\n\n", float64(totalQueries.Load())/queryDuration.Seconds())

	// ===== Scenario 3: mixed reads and writes =====
	fmt.Println("场景 3: 混合读写测试")
	fmt.Println(" - 3 个写入 goroutine每个 topic 一个),每个写入 1000 条消息")
	fmt.Println(" - 10 个查询 goroutine每个执行 200 次查询")
	fmt.Println(" - 同时进行")
	fmt.Println()

	startTime = time.Now()
	var mixWg sync.WaitGroup
	var mixWriteCount atomic.Int64
	var mixQueryCount atomic.Int64

	// Writers: one dedicated goroutine per topic. Only successful writes are
	// counted; failures are intentionally not reported in this scenario.
	for _, topic := range topics {
		mixWg.Add(1)
		go func(topicName string) {
			defer mixWg.Done()
			for j := range 1000 {
				data := fmt.Sprintf("mix-%s-msg-%d", topicName, j)
				if _, err := seq.Write(topicName, []byte(data)); err == nil {
					mixWriteCount.Add(1)
				}
				// Slightly throttle the write rate.
				time.Sleep(100 * time.Microsecond)
			}
		}(topic)
	}

	// Queriers: run alongside the writers and count successful queries only.
	for range 10 {
		mixWg.Add(1)
		go func() {
			defer mixWg.Done()
			for j := range 200 {
				topic := topics[j%len(topics)]
				processor, err := seq.GetProcessor(topic)
				if err != nil {
					continue
				}
				if count := processor.GetRecordCount(); count > 0 {
					if _, err := processor.QueryNewest(count-1, 5); err == nil {
						mixQueryCount.Add(1)
					}
				}
				time.Sleep(200 * time.Microsecond)
			}
		}()
	}
	mixWg.Wait()
	mixDuration := time.Since(startTime)

	fmt.Printf("混合操作完成:\n")
	fmt.Printf(" 写入: %d 条消息\n", mixWriteCount.Load())
	fmt.Printf(" 查询: %d 次\n", mixQueryCount.Load())
	fmt.Printf(" 耗时: %v\n\n", mixDuration)

	// ===== Scenario 4: sustained stress test =====
	fmt.Println("场景 4: 持续压测(运行 4 分钟)")
	fmt.Println(" - 3 个写入 goroutine 持续写入")
	fmt.Println(" - 5 个查询 goroutine 持续查询")
	fmt.Println(" - 实时显示进度")
	fmt.Println()

	stressTestDuration := 4 * time.Minute
	stressTestStart := time.Now()
	// Every stress goroutine exits once this context times out.
	stressCtx, stressCancel := context.WithTimeout(context.Background(), stressTestDuration)
	defer stressCancel()

	var stressWriteCount atomic.Int64
	var stressQueryCount atomic.Int64
	var stressWg sync.WaitGroup

	// Sustained writers: one per topic, paced at roughly one write per 10ms.
	for _, topic := range topics {
		stressWg.Add(1)
		go func(topicName string) {
			defer stressWg.Done()
			msgCounter := 0
			for {
				select {
				case <-stressCtx.Done():
					return
				default:
				}

				data := fmt.Sprintf("stress-%s-msg-%d", topicName, msgCounter)
				if _, err := seq.Write(topicName, []byte(data)); err == nil {
					stressWriteCount.Add(1)
					msgCounter++
				}
				time.Sleep(10 * time.Millisecond) // throttle the write rate
			}
		}(topic)
	}

	// Sustained queriers: each one is pinned to a single topic.
	for i := range 5 {
		stressWg.Add(1)
		go func(queryID int) {
			defer stressWg.Done()
			topic := topics[queryID%len(topics)]
			for {
				select {
				case <-stressCtx.Done():
					return
				default:
				}

				if processor, err := seq.GetProcessor(topic); err == nil {
					if count := processor.GetRecordCount(); count > 10 {
						if _, err := processor.QueryNewest(count-1, 10); err == nil {
							stressQueryCount.Add(1)
						}
					}
				}
				// Pace every iteration, including the failure path, so a
				// persistent GetProcessor error cannot turn into a busy loop.
				time.Sleep(20 * time.Millisecond)
			}
		}(i)
	}

	// Progress reporter. It uses the same Add/go/Done pattern as the other
	// goroutines: WaitGroup.Go already calls Done itself, so pairing it with
	// an extra deferred Done would drive the counter negative.
	stressWg.Add(1)
	go func() {
		defer stressWg.Done()
		ticker := time.NewTicker(10 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-stressCtx.Done():
				return
			case <-ticker.C:
				elapsed := time.Since(stressTestStart)
				fmt.Printf(" [进度] 已运行 %.0f 秒 - 写入: %d 条, 查询: %d 次\n",
					elapsed.Seconds(),
					stressWriteCount.Load(),
					stressQueryCount.Load())
			}
		}
	}()

	stressWg.Wait()
	stressDuration := time.Since(stressTestStart)

	fmt.Printf("\n持续压测完成:\n")
	fmt.Printf(" 运行时间: %v\n", stressDuration)
	fmt.Printf(" 写入: %d 条消息\n", stressWriteCount.Load())
	fmt.Printf(" 查询: %d 次\n", stressQueryCount.Load())
	fmt.Printf(" 写入速率: %.0f msg/s\n", float64(stressWriteCount.Load())/stressDuration.Seconds())
	fmt.Printf(" 查询速率: %.0f query/s\n\n", float64(stressQueryCount.Load())/stressDuration.Seconds())

	// ===== Scenario 5: statistics summary =====
	fmt.Println("场景 5: 统计信息汇总")
	fmt.Println()

	var totalWriteCount, totalWriteBytes, totalProcessedCount, totalErrorCount int64
	for _, topic := range topics {
		processor, err := seq.GetProcessor(topic)
		if err != nil {
			// Skip this topic instead of dereferencing an unusable processor.
			logger.Error("failed to get processor", "topic", topic, "error", err)
			continue
		}
		stats := processor.GetStats()

		fmt.Printf("Topic [%s]:\n", topic)
		fmt.Printf(" 记录总数: %d\n", processor.GetRecordCount())
		fmt.Printf(" 写入: %d 条, %d 字节\n", stats.WriteCount, stats.WriteBytes)
		fmt.Printf(" 已处理: %d 条\n", stats.ProcessedCount)
		fmt.Printf(" 处理错误: %d 次\n", stats.ErrorCount)
		if !stats.LastWriteTime.IsZero() {
			fmt.Printf(" 最后写入: %v\n", stats.LastWriteTime.Format("15:04:05"))
		}
		fmt.Println()

		totalWriteCount += stats.WriteCount
		totalWriteBytes += stats.WriteBytes
		totalProcessedCount += stats.ProcessedCount
		totalErrorCount += stats.ErrorCount
	}

	fmt.Printf("总计:\n")
	fmt.Printf(" 写入: %d 条, %d 字节\n", totalWriteCount, totalWriteBytes)
	fmt.Printf(" 已处理: %d 条\n", totalProcessedCount)
	fmt.Printf(" 处理错误: %d 次\n", totalErrorCount)
	fmt.Println()

	// Poll for up to maxWait until the handler has seen every written record.
	fmt.Println("等待消息处理...")
	maxWait := 5 * time.Second
	checkInterval := 100 * time.Millisecond
	for waited := time.Duration(0); waited < maxWait; waited += checkInterval {
		if processedCount.Load() >= totalWriteCount {
			break
		}
		time.Sleep(checkInterval)
	}
	fmt.Printf("处理完成: %d/%d 条消息\n", processedCount.Load(), totalWriteCount)

	totalDuration := time.Since(totalStartTime)
	fmt.Printf("\n=== 所有测试完成 ===\n\n")
	fmt.Println("场景耗时总结:")
	fmt.Printf(" 场景 1 (并发写入): %v\n", writeDuration)
	fmt.Printf(" 场景 2 (并发查询): %v\n", queryDuration)
	fmt.Printf(" 场景 3 (混合读写): %v\n", mixDuration)
	fmt.Printf(" 场景 4 (持续压测): %v\n", stressDuration)
	fmt.Printf(" 总运行时间: %v (%.1f 分钟)\n", totalDuration, totalDuration.Minutes())
}