Files
seqlog/topic_processor.go
bourdon 90cc9e21c9 重构:重命名核心组件并增强查询功能
主要更改:

1. 核心重命名
   - Seqlog -> LogHub (更准确地反映其作为日志中枢的角色)
   - NewSeqlog() -> NewLogHub()
   - LogCursor -> ProcessCursor (更准确地反映其用于处理场景)
   - seqlog_manager.go -> loghub.go (文件名与结构体名对应)

2. TopicProcessor.Reset 增强
   - 如果正在运行且没有待处理的日志,会自动停止后重置
   - 如果有待处理的日志,返回详细错误(显示已处理/总记录数)
   - 简化了 LogHub.ResetTopic,移除显式 Stop 调用

3. 新增查询方法
   - TopicProcessor.QueryFromFirst(count) - 从第一条记录向索引递增方向查询
   - TopicProcessor.QueryFromLast(count) - 从最后一条记录向索引递减方向查询
   - LogHub.QueryFromFirst(topic, count)
   - LogHub.QueryFromLast(topic, count)

4. 测试覆盖
   - 添加 query_test.go - QueryFromProcessing 测试
   - 添加 TestQueryFromFirstAndLast - TopicProcessor 查询测试
   - 添加 TestLogHubQueryFromFirstAndLast - LogHub 查询测试
   - 添加 TestTopicResetWithPendingRecords - Reset 增强功能测试

5. 示例代码
   - 添加 example/get_record/ - 演示 QueryFromProcessing 用法
   - 更新所有示例以使用 LogHub 和新 API

所有测试通过 

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-04 13:26:21 +08:00

667 lines
17 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package seqlog
import (
"context"
"fmt"
"log/slog"
"os"
"path/filepath"
"sync"
"time"
)
// TopicProcessor 作为聚合器,持有所有核心组件并提供统一的访问接口
type TopicProcessor struct {
topic string
logPath string
logger *slog.Logger
// 核心组件(聚合)
writer *LogWriter // 写入器
index *RecordIndex // 索引管理器
query *RecordQuery // 查询器
cursor *ProcessCursor // 游标
tailer *LogTailer // 持续处理器
// 配置和状态
handler RecordHandler
tailConfig *TailConfig
stats *TopicStats // 统计信息
eventBus *EventBus // 事件总线
// 并发控制
mu sync.RWMutex
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
running bool
}
// TopicConfig topic 配置
type TopicConfig struct {
Handler RecordHandler // 处理函数(必填)
TailConfig *TailConfig // tail 配置,可选
}
// NewTopicProcessor 创建一个新的 topic 处理器
// 在初始化时创建所有核心组件index 在组件间共享
// handler 为必填参数,如果 config 为 nil 或 config.Handler 为 nil 会返回错误
func NewTopicProcessor(baseDir, topic string, logger *slog.Logger, config *TopicConfig) (*TopicProcessor, error) {
// 验证必填参数
if config == nil || config.Handler == nil {
return nil, NewValidationError("config", "config and config.Handler are required", ErrInvalidConfig)
}
ctx, cancel := context.WithCancel(context.Background())
// 默认配置
tailConfig := &TailConfig{
PollInterval: 100 * 1000000, // 100ms
SaveInterval: 1000 * 1000000, // 1s
}
if config.TailConfig != nil {
tailConfig = config.TailConfig
}
if logger == nil {
logger = slog.Default()
}
logPath := filepath.Join(baseDir, topic+".log")
statsPath := filepath.Join(baseDir, topic+".stats")
tp := &TopicProcessor{
topic: topic,
logPath: logPath,
logger: logger,
handler: config.Handler,
tailConfig: tailConfig,
stats: NewTopicStats(statsPath),
eventBus: NewEventBus(),
ctx: ctx,
cancel: cancel,
}
// 初始化所有组件
if err := tp.initializeComponents(); err != nil {
cancel()
return nil, fmt.Errorf("failed to initialize components: %w", err)
}
return tp, nil
}
// initializeComponents 初始化所有核心组件
func (tp *TopicProcessor) initializeComponents() error {
// 1. 创建共享的索引管理器
index, err := NewRecordIndex(tp.logPath)
if err != nil {
return fmt.Errorf("create index: %w", err)
}
tp.index = index
// 2. 创建写入器(使用共享 index
writer, err := NewLogWriter(tp.logPath, tp.index)
if err != nil {
tp.index.Close()
return fmt.Errorf("create writer: %w", err)
}
tp.writer = writer
// 3. 创建查询器(使用共享 index
query, err := NewRecordQuery(tp.logPath, tp.index)
if err != nil {
tp.writer.Close()
tp.index.Close()
return fmt.Errorf("create query: %w", err)
}
tp.query = query
// 4. 创建游标(使用共享 index
cursor, err := NewCursor(tp.logPath, tp.index)
if err != nil {
tp.query.Close()
tp.writer.Close()
tp.index.Close()
return fmt.Errorf("create cursor: %w", err)
}
tp.cursor = cursor
// 5. 创建 tailerhandler 为必填,总是创建)
// 注意:只创建不启动,启动在 Start() 中进行
if err := tp.createTailer(); err != nil {
tp.cursor.Close()
tp.query.Close()
tp.writer.Close()
tp.index.Close()
return fmt.Errorf("create tailer: %w", err)
}
tp.logger.Debug("all components initialized")
return nil
}
// createTailer 创建 tailer不启动
func (tp *TopicProcessor) createTailer() error {
// 包装 handler添加统计功能和事件发布
wrappedHandler := func(rec *Record) error {
if err := tp.handler(rec); err != nil {
tp.stats.IncError()
// 发布处理错误事件
tp.eventBus.Publish(&Event{
Type: EventProcessError,
Topic: tp.topic,
Timestamp: time.Now(),
Record: rec,
Error: err,
Position: 0, // Position 在 tailer 模式下不可用
})
return err
}
// 处理成功,更新统计
tp.stats.IncProcessed(int64(len(rec.Data)))
// 发布处理成功事件
tp.eventBus.Publish(&Event{
Type: EventProcessSuccess,
Topic: tp.topic,
Timestamp: time.Now(),
Record: rec,
Position: 0, // Position 在 tailer 模式下不可用
})
return nil
}
tp.logger.Debug("creating tailer")
tailer, err := NewTailer(tp.cursor, wrappedHandler, tp.tailConfig)
if err != nil {
tp.logger.Error("failed to create tailer", "error", err)
return fmt.Errorf("failed to create tailer: %w", err)
}
tp.tailer = tailer
tp.logger.Debug("tailer created")
return nil
}
// Write 写入日志(统一接口)
func (tp *TopicProcessor) Write(data []byte) (int64, error) {
offset, err := tp.writer.Append(data)
if err != nil {
tp.logger.Error("failed to append", "error", err)
tp.stats.IncError()
// 发布写入错误事件
tp.eventBus.Publish(&Event{
Type: EventWriteError,
Topic: tp.topic,
Timestamp: time.Now(),
Error: err,
})
return 0, err
}
// 更新统计信息
tp.stats.IncWrite(int64(len(data)))
tp.logger.Debug("write success", "offset", offset, "size", len(data))
// 发布写入成功事件
tp.eventBus.Publish(&Event{
Type: EventWriteSuccess,
Topic: tp.topic,
Timestamp: time.Now(),
Position: offset,
})
return offset, nil
}
// Start 启动 tailer如果已创建
func (tp *TopicProcessor) Start() error {
tp.mu.Lock()
defer tp.mu.Unlock()
if tp.running {
return NewTopicError(tp.topic, "start", ErrAlreadyRunning)
}
tp.logger.Debug("starting processor")
// 重新创建 context如果之前被 cancel 了)
if tp.ctx.Err() != nil {
tp.ctx, tp.cancel = context.WithCancel(context.Background())
}
tp.running = true
// 如果 tailer 已创建,启动它
if tp.tailer != nil {
tp.logger.Debug("launching tailer goroutine")
tp.wg.Go(func() {
tp.logger.Debug("tailer goroutine started")
if err := tp.tailer.Start(tp.ctx); err != nil && err != context.Canceled {
tp.logger.Error("tailer error", "error", err)
}
tp.logger.Debug("tailer goroutine finished")
})
}
// 发布启动事件
tp.eventBus.Publish(&Event{
Type: EventProcessorStart,
Topic: tp.topic,
Timestamp: time.Now(),
})
return nil
}
// Stop 停止 tailer
func (tp *TopicProcessor) Stop() error {
tp.mu.Lock()
if !tp.running {
tp.mu.Unlock()
return nil
}
tp.logger.Debug("stopping processor")
tp.running = false
tp.cancel()
tp.mu.Unlock()
// 等待 tailer 停止
tp.wg.Wait()
tp.logger.Debug("processor stopped")
// 发布停止事件
tp.eventBus.Publish(&Event{
Type: EventProcessorStop,
Topic: tp.topic,
Timestamp: time.Now(),
})
return nil
}
// Topic 返回 topic 名称
func (tp *TopicProcessor) Topic() string {
return tp.topic
}
// IsRunning 检查是否正在运行
func (tp *TopicProcessor) IsRunning() bool {
tp.mu.RLock()
defer tp.mu.RUnlock()
return tp.running
}
// UpdateTailConfig 动态更新 tail 配置
func (tp *TopicProcessor) UpdateTailConfig(config *TailConfig) error {
tp.mu.Lock()
defer tp.mu.Unlock()
if config == nil {
return fmt.Errorf("config cannot be nil")
}
tp.tailConfig = config
// 如果 tailer 已经在运行,更新其配置
if tp.tailer != nil {
tp.tailer.UpdateConfig(*config)
}
return nil
}
// GetTailConfig 获取当前 tail 配置
func (tp *TopicProcessor) GetTailConfig() *TailConfig {
tp.mu.RLock()
defer tp.mu.RUnlock()
cfg := tp.tailConfig
return cfg
}
// GetStats 获取当前统计信息
func (tp *TopicProcessor) GetStats() Stats {
return tp.stats.Get()
}
// Query 获取共享的查询器
func (tp *TopicProcessor) Query() *RecordQuery {
return tp.query
}
// QueryOldest 从参考索引向索引递减方向查询记录(查询更早的记录)
// refIndex: 参考索引位置
// count: 查询数量
// 返回的记录包含索引和状态信息,按索引递增方向排序
// 例如QueryOldest(5, 3) 查询索引 2, 3, 4不包含 5
func (tp *TopicProcessor) QueryOldest(refIndex, count int) ([]*RecordWithStatus, error) {
// 查询记录(包含索引信息)
records, err := tp.query.QueryOldest(refIndex, count)
if err != nil {
return nil, err
}
// 获取窗口索引范围(用于状态判断)
var startIdx, endIdx int
tp.mu.RLock()
if tp.tailer != nil {
startIdx = tp.tailer.GetStartIndex()
endIdx = tp.tailer.GetEndIndex()
}
tp.mu.RUnlock()
// 为每条记录添加状态
results := make([]*RecordWithStatus, len(records))
for i, rec := range records {
results[i] = &RecordWithStatus{
Record: rec.Record,
Index: rec.Index,
Status: GetRecordStatus(rec.Index, startIdx, endIdx),
}
}
return results, nil
}
// QueryNewest 从参考索引向索引递增方向查询记录(查询更新的记录)
// refIndex: 参考索引位置
// count: 查询数量
// 返回的记录包含索引和状态信息,按索引递增方向排序
// 例如QueryNewest(5, 3) 查询索引 6, 7, 8不包含 5
func (tp *TopicProcessor) QueryNewest(refIndex, count int) ([]*RecordWithStatus, error) {
// 查询记录(包含索引信息)
records, err := tp.query.QueryNewest(refIndex, count)
if err != nil {
return nil, err
}
// 获取窗口索引范围(用于状态判断)
var startIdx, endIdx int
tp.mu.RLock()
if tp.tailer != nil {
startIdx = tp.tailer.GetStartIndex()
endIdx = tp.tailer.GetEndIndex()
}
tp.mu.RUnlock()
// 为每条记录添加状态
results := make([]*RecordWithStatus, len(records))
for i, rec := range records {
results[i] = &RecordWithStatus{
Record: rec.Record,
Index: rec.Index,
Status: GetRecordStatus(rec.Index, startIdx, endIdx),
}
}
return results, nil
}
// GetRecordCount 获取记录总数(统一接口)
func (tp *TopicProcessor) GetRecordCount() int {
return tp.index.Count()
}
// QueryFromProcessing 从当前处理窗口的开始位置向索引递增方向查询记录
// count: 查询数量
// 返回从处理窗口开始位置startIndex向后的记录包含状态信息
func (tp *TopicProcessor) QueryFromProcessing(count int) ([]*RecordWithStatus, error) {
// 获取当前处理窗口的开始位置
tp.mu.RLock()
var startIdx int
if tp.tailer != nil {
startIdx = tp.tailer.GetStartIndex()
}
tp.mu.RUnlock()
// 从 startIdx 开始向索引递增方向查询
// QueryNewest(refIndex, count) 查询从 refIndex+1 开始的记录
// 所以要从 startIdx 开始,应该调用 QueryNewest(startIdx - 1, count)
return tp.QueryNewest(startIdx-1, count)
}
// QueryFromFirst 从第一条记录向索引递增方向查询
// count: 查询数量
// 返回从第一条记录(索引 0开始的记录包含状态信息
// 例如QueryFromFirst(3) 查询索引 0, 1, 2
func (tp *TopicProcessor) QueryFromFirst(count int) ([]*RecordWithStatus, error) {
// QueryNewest(-1, count) 会从索引 0 开始向后查询
return tp.QueryNewest(-1, count)
}
// QueryFromLast 从最后一条记录向索引递减方向查询
// count: 查询数量
// 返回从最后一条记录开始向前的记录,包含状态信息,按索引递增方向排序
// 例如:如果总共有 10 条记录QueryFromLast(3) 查询索引 7, 8, 9
func (tp *TopicProcessor) QueryFromLast(count int) ([]*RecordWithStatus, error) {
// 获取记录总数
totalCount := tp.index.Count()
// QueryOldest(totalCount, count) 会从最后一条记录开始向前查询
return tp.QueryOldest(totalCount, count)
}
// GetProcessingIndex 获取当前处理索引(窗口开始索引)
func (tp *TopicProcessor) GetProcessingIndex() int {
tp.mu.RLock()
defer tp.mu.RUnlock()
if tp.tailer == nil {
return 0
}
return tp.tailer.GetStartIndex()
}
// GetReadIndex 获取当前读取索引(窗口结束索引)
func (tp *TopicProcessor) GetReadIndex() int {
tp.mu.RLock()
defer tp.mu.RUnlock()
if tp.tailer == nil {
return 0
}
return tp.tailer.GetEndIndex()
}
// Subscribe 订阅事件
func (tp *TopicProcessor) Subscribe(eventType EventType, listener EventListener) {
tp.eventBus.Subscribe(eventType, listener)
}
// SubscribeAll 订阅所有事件
func (tp *TopicProcessor) SubscribeAll(listener EventListener) {
tp.eventBus.SubscribeAll(listener)
}
// Unsubscribe 取消订阅
func (tp *TopicProcessor) Unsubscribe(eventType EventType) {
tp.eventBus.Unsubscribe(eventType)
}
// Reset 清空 topic 的所有数据,包括日志文件、位置文件和统计文件
// 如果 processor 正在运行且没有待处理的日志,会自动停止
// 如果有待处理的日志,则返回错误,需要等待处理完成或手动停止
func (tp *TopicProcessor) Reset() error {
tp.mu.Lock()
defer tp.mu.Unlock()
if tp.running {
// 检查是否有待处理的日志
processingIndex := 0
if tp.tailer != nil {
processingIndex = tp.tailer.GetStartIndex()
}
recordCount := tp.index.Count()
hasPendingRecords := processingIndex < recordCount
if hasPendingRecords {
return fmt.Errorf("cannot reset while processor is running with pending records (%d/%d processed), please stop first or wait for processing to complete", processingIndex, recordCount)
}
// 没有待处理的日志,自动停止
tp.logger.Debug("auto-stopping processor before reset (no pending records)")
tp.running = false
tp.cancel()
// 释放锁以等待 tailer 停止
tp.mu.Unlock()
tp.wg.Wait()
tp.mu.Lock()
// 发布停止事件
tp.eventBus.Publish(&Event{
Type: EventProcessorStop,
Topic: tp.topic,
Timestamp: time.Now(),
})
}
tp.logger.Debug("resetting processor")
var errs []error
// 关闭 writer如果还未关闭
if tp.writer != nil {
if err := tp.writer.Close(); err != nil {
tp.logger.Error("failed to close writer during reset", "error", err)
errs = append(errs, fmt.Errorf("close writer: %w", err))
}
tp.writer = nil
}
// 删除日志文件
if err := os.Remove(tp.logPath); err != nil && !os.IsNotExist(err) {
tp.logger.Error("failed to remove log file", "error", err)
errs = append(errs, fmt.Errorf("remove log file: %w", err))
}
// 删除位置文件
posFile := tp.logPath + ".pos"
if err := os.Remove(posFile); err != nil && !os.IsNotExist(err) {
tp.logger.Error("failed to remove position file", "error", err)
errs = append(errs, fmt.Errorf("remove position file: %w", err))
}
// 删除索引文件
indexFile := tp.logPath + ".idx"
if err := os.Remove(indexFile); err != nil && !os.IsNotExist(err) {
tp.logger.Error("failed to remove index file", "error", err)
errs = append(errs, fmt.Errorf("remove index file: %w", err))
}
// 关闭所有组件
if tp.query != nil {
tp.query.Close()
tp.query = nil
}
if tp.index != nil {
tp.index.Close()
tp.index = nil
}
// 重新初始化所有组件(已持有锁)
// 这会重新创建 index, writer, query如果有 handler 也会创建 tailer
if err := tp.initializeComponents(); err != nil {
tp.logger.Error("failed to reinitialize components", "error", err)
errs = append(errs, fmt.Errorf("reinitialize components: %w", err))
}
// 重置统计信息
if tp.stats != nil {
tp.stats.Reset()
}
tp.logger.Debug("processor reset completed")
// 发布重置事件
tp.eventBus.Publish(&Event{
Type: EventProcessorReset,
Topic: tp.topic,
Timestamp: time.Now(),
})
// 如果有多个错误,返回第一个
if len(errs) > 0 {
return errs[0]
}
return nil
}
// Close 清理 processor 的所有资源
func (tp *TopicProcessor) Close() error {
tp.mu.Lock()
defer tp.mu.Unlock()
tp.logger.Debug("closing processor")
var errs []error
// 保存统计信息
if tp.stats != nil {
if err := tp.stats.Save(); err != nil {
tp.logger.Error("failed to save stats", "error", err)
errs = append(errs, fmt.Errorf("save stats: %w", err))
}
}
// 关闭 query
if tp.query != nil {
if err := tp.query.Close(); err != nil {
tp.logger.Error("failed to close query", "error", err)
errs = append(errs, fmt.Errorf("close query: %w", err))
}
tp.query = nil
}
// 关闭 cursor如果 tailer 未启动cursor 可能还未关闭)
if tp.cursor != nil {
if err := tp.cursor.Close(); err != nil {
tp.logger.Error("failed to close cursor", "error", err)
errs = append(errs, fmt.Errorf("close cursor: %w", err))
}
tp.cursor = nil
}
// 关闭 writer
if tp.writer != nil {
if err := tp.writer.Close(); err != nil {
tp.logger.Error("failed to close writer", "error", err)
errs = append(errs, fmt.Errorf("close writer: %w", err))
}
tp.writer = nil
}
// 关闭 index最后关闭因为其他组件可能依赖它
if tp.index != nil {
if err := tp.index.Close(); err != nil {
tp.logger.Error("failed to close index", "error", err)
errs = append(errs, fmt.Errorf("close index: %w", err))
}
tp.index = nil
}
// tailer 会通过 context cancel 和 Stop() 自动关闭
tp.tailer = nil
tp.logger.Debug("processor closed")
// 如果有多个错误,返回第一个
if len(errs) > 0 {
return errs[0]
}
return nil
}