Files
seqlog/topic_processor.go
bourdon 5c028a55b3 重构:简化查询接口
- RecordQuery.QueryOldest 和 QueryNewest 不再接收 startIdx/endIdx 参数
- 查询方法返回纯 Record 列表,状态判断移到调用方
- TopicProcessor 的查询方法负责添加状态信息
- 更新所有测试文件以适配新接口
2025-10-04 00:23:09 +08:00

607 lines
15 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package seqlog
import (
"context"
"fmt"
"log/slog"
"os"
"path/filepath"
"sync"
"time"
)
// TopicProcessor 作为聚合器,持有所有核心组件并提供统一的访问接口
type TopicProcessor struct {
topic string
logPath string
logger *slog.Logger
// 核心组件(聚合)
writer *LogWriter // 写入器
index *RecordIndex // 索引管理器
query *RecordQuery // 查询器
cursor *LogCursor // 游标
tailer *LogTailer // 持续处理器
// 配置和状态
handler RecordHandler
tailConfig *TailConfig
stats *TopicStats // 统计信息
eventBus *EventBus // 事件总线
// 并发控制
mu sync.RWMutex
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
running bool
}
// TopicConfig topic 配置
type TopicConfig struct {
Handler RecordHandler // 处理函数(必填)
TailConfig *TailConfig // tail 配置,可选
}
// NewTopicProcessor 创建一个新的 topic 处理器
// 在初始化时创建所有核心组件index 在组件间共享
// handler 为必填参数,如果 config 为 nil 或 config.Handler 为 nil 会返回错误
func NewTopicProcessor(baseDir, topic string, logger *slog.Logger, config *TopicConfig) (*TopicProcessor, error) {
// 验证必填参数
if config == nil || config.Handler == nil {
return nil, fmt.Errorf("config and config.Handler are required")
}
ctx, cancel := context.WithCancel(context.Background())
// 默认配置
tailConfig := &TailConfig{
PollInterval: 100 * 1000000, // 100ms
SaveInterval: 1000 * 1000000, // 1s
}
if config.TailConfig != nil {
tailConfig = config.TailConfig
}
if logger == nil {
logger = slog.Default()
}
logPath := filepath.Join(baseDir, topic+".log")
statsPath := filepath.Join(baseDir, topic+".stats")
tp := &TopicProcessor{
topic: topic,
logPath: logPath,
logger: logger,
handler: config.Handler,
tailConfig: tailConfig,
stats: NewTopicStats(statsPath),
eventBus: NewEventBus(),
ctx: ctx,
cancel: cancel,
}
// 初始化所有组件
if err := tp.initializeComponents(); err != nil {
cancel()
return nil, fmt.Errorf("failed to initialize components: %w", err)
}
return tp, nil
}
// initializeComponents 初始化所有核心组件
func (tp *TopicProcessor) initializeComponents() error {
// 1. 创建共享的索引管理器
index, err := NewRecordIndex(tp.logPath)
if err != nil {
return fmt.Errorf("create index: %w", err)
}
tp.index = index
// 2. 创建写入器(使用共享 index
writer, err := NewLogWriter(tp.logPath, tp.index)
if err != nil {
tp.index.Close()
return fmt.Errorf("create writer: %w", err)
}
tp.writer = writer
// 3. 创建查询器(使用共享 index
query, err := NewRecordQuery(tp.logPath, tp.index)
if err != nil {
tp.writer.Close()
tp.index.Close()
return fmt.Errorf("create query: %w", err)
}
tp.query = query
// 4. 创建游标(使用共享 index
cursor, err := NewCursor(tp.logPath, tp.index)
if err != nil {
tp.query.Close()
tp.writer.Close()
tp.index.Close()
return fmt.Errorf("create cursor: %w", err)
}
tp.cursor = cursor
// 5. 创建 tailerhandler 为必填,总是创建)
// 注意:只创建不启动,启动在 Start() 中进行
if err := tp.createTailer(); err != nil {
tp.cursor.Close()
tp.query.Close()
tp.writer.Close()
tp.index.Close()
return fmt.Errorf("create tailer: %w", err)
}
tp.logger.Debug("all components initialized")
return nil
}
// createTailer 创建 tailer不启动
func (tp *TopicProcessor) createTailer() error {
// 包装 handler添加统计功能和事件发布
wrappedHandler := func(rec *Record) error {
if err := tp.handler(rec); err != nil {
tp.stats.IncError()
// 发布处理错误事件
tp.eventBus.Publish(&Event{
Type: EventProcessError,
Topic: tp.topic,
Timestamp: time.Now(),
Record: rec,
Error: err,
Position: 0, // Position 在 tailer 模式下不可用
})
return err
}
// 处理成功,更新统计
tp.stats.IncProcessed(int64(len(rec.Data)))
// 发布处理成功事件
tp.eventBus.Publish(&Event{
Type: EventProcessSuccess,
Topic: tp.topic,
Timestamp: time.Now(),
Record: rec,
Position: 0, // Position 在 tailer 模式下不可用
})
return nil
}
tp.logger.Debug("creating tailer")
tailer, err := NewTailer(tp.cursor, wrappedHandler, tp.tailConfig)
if err != nil {
tp.logger.Error("failed to create tailer", "error", err)
return fmt.Errorf("failed to create tailer: %w", err)
}
tp.tailer = tailer
tp.logger.Debug("tailer created")
return nil
}
// Write 写入日志(统一接口)
func (tp *TopicProcessor) Write(data []byte) (int64, error) {
offset, err := tp.writer.Append(data)
if err != nil {
tp.logger.Error("failed to append", "error", err)
tp.stats.IncError()
// 发布写入错误事件
tp.eventBus.Publish(&Event{
Type: EventWriteError,
Topic: tp.topic,
Timestamp: time.Now(),
Error: err,
})
return 0, err
}
// 更新统计信息
tp.stats.IncWrite(int64(len(data)))
tp.logger.Debug("write success", "offset", offset, "size", len(data))
// 发布写入成功事件
tp.eventBus.Publish(&Event{
Type: EventWriteSuccess,
Topic: tp.topic,
Timestamp: time.Now(),
Position: offset,
})
return offset, nil
}
// Start 启动 tailer如果已创建
func (tp *TopicProcessor) Start() error {
tp.mu.Lock()
defer tp.mu.Unlock()
if tp.running {
return fmt.Errorf("topic processor for %s is already running", tp.topic)
}
tp.logger.Debug("starting processor")
// 重新创建 context如果之前被 cancel 了)
if tp.ctx.Err() != nil {
tp.ctx, tp.cancel = context.WithCancel(context.Background())
}
tp.running = true
// 如果 tailer 已创建,启动它
if tp.tailer != nil {
tp.logger.Debug("launching tailer goroutine")
tp.wg.Go(func() {
tp.logger.Debug("tailer goroutine started")
if err := tp.tailer.Start(tp.ctx); err != nil && err != context.Canceled {
tp.logger.Error("tailer error", "error", err)
}
tp.logger.Debug("tailer goroutine finished")
})
}
// 发布启动事件
tp.eventBus.Publish(&Event{
Type: EventProcessorStart,
Topic: tp.topic,
Timestamp: time.Now(),
})
return nil
}
// Stop 停止 tailer
func (tp *TopicProcessor) Stop() error {
tp.mu.Lock()
if !tp.running {
tp.mu.Unlock()
return nil
}
tp.logger.Debug("stopping processor")
tp.running = false
tp.cancel()
tp.mu.Unlock()
// 等待 tailer 停止
tp.wg.Wait()
tp.logger.Debug("processor stopped")
// 发布停止事件
tp.eventBus.Publish(&Event{
Type: EventProcessorStop,
Topic: tp.topic,
Timestamp: time.Now(),
})
return nil
}
// Topic 返回 topic 名称
func (tp *TopicProcessor) Topic() string {
return tp.topic
}
// IsRunning 检查是否正在运行
func (tp *TopicProcessor) IsRunning() bool {
tp.mu.RLock()
defer tp.mu.RUnlock()
return tp.running
}
// UpdateTailConfig 动态更新 tail 配置
func (tp *TopicProcessor) UpdateTailConfig(config *TailConfig) error {
tp.mu.Lock()
defer tp.mu.Unlock()
if config == nil {
return fmt.Errorf("config cannot be nil")
}
tp.tailConfig = config
// 如果 tailer 已经在运行,更新其配置
if tp.tailer != nil {
tp.tailer.UpdateConfig(*config)
}
return nil
}
// GetTailConfig 获取当前 tail 配置
func (tp *TopicProcessor) GetTailConfig() *TailConfig {
tp.mu.RLock()
defer tp.mu.RUnlock()
cfg := tp.tailConfig
return cfg
}
// GetStats 获取当前统计信息
func (tp *TopicProcessor) GetStats() Stats {
return tp.stats.Get()
}
// Query 获取共享的查询器
func (tp *TopicProcessor) Query() *RecordQuery {
return tp.query
}
// QueryOldest 从指定索引开始查询记录(向前读取)
// startIndex: 查询起始索引
// count: 查询数量
// 返回的记录包含状态信息(基于 tailer 的窗口索引),按时间顺序(索引递增方向)
func (tp *TopicProcessor) QueryOldest(startIndex, count int) ([]*RecordWithStatus, error) {
// 查询记录
records, err := tp.query.QueryOldest(startIndex, count)
if err != nil {
return nil, err
}
// 获取窗口索引范围(用于状态判断)
var startIdx, endIdx int
tp.mu.RLock()
if tp.tailer != nil {
startIdx = tp.tailer.GetStartIndex()
endIdx = tp.tailer.GetEndIndex()
}
tp.mu.RUnlock()
// 为每条记录添加状态
results := make([]*RecordWithStatus, len(records))
for i, rec := range records {
results[i] = &RecordWithStatus{
Record: rec,
Status: GetRecordStatus(startIndex+i, startIdx, endIdx),
}
}
return results, nil
}
// QueryNewest 从指定索引开始向后查询记录(索引递减方向)
// endIndex: 查询结束索引(最新的记录)
// count: 查询数量
// 返回的记录包含状态信息(基于 tailer 的窗口索引),按时间倒序(最新在前)
func (tp *TopicProcessor) QueryNewest(endIndex, count int) ([]*RecordWithStatus, error) {
// 查询记录
records, err := tp.query.QueryNewest(endIndex, count)
if err != nil {
return nil, err
}
// 获取窗口索引范围(用于状态判断)
var startIdx, endIdx int
tp.mu.RLock()
if tp.tailer != nil {
startIdx = tp.tailer.GetStartIndex()
endIdx = tp.tailer.GetEndIndex()
}
tp.mu.RUnlock()
// 为每条记录添加状态倒序endIndex, endIndex-1, ...
results := make([]*RecordWithStatus, len(records))
for i, rec := range records {
results[i] = &RecordWithStatus{
Record: rec,
Status: GetRecordStatus(endIndex-i, startIdx, endIdx),
}
}
return results, nil
}
// GetRecordCount 获取记录总数(统一接口)
func (tp *TopicProcessor) GetRecordCount() int {
return tp.index.Count()
}
// Cursor 创建一个新的游标实例(使用共享的 index
// 注意:每次调用都会创建新实例,调用者需要负责关闭
// Tailer 内部有自己的游标,不会与此冲突
func (tp *TopicProcessor) Cursor() (*LogCursor, error) {
return NewCursor(tp.logPath, tp.index)
}
// Index 获取索引管理器
func (tp *TopicProcessor) Index() *RecordIndex {
return tp.index
}
// GetProcessingIndex 获取当前处理索引(窗口开始索引)
func (tp *TopicProcessor) GetProcessingIndex() int {
tp.mu.RLock()
defer tp.mu.RUnlock()
if tp.tailer == nil {
return 0
}
return tp.tailer.GetStartIndex()
}
// GetReadIndex 获取当前读取索引(窗口结束索引)
func (tp *TopicProcessor) GetReadIndex() int {
tp.mu.RLock()
defer tp.mu.RUnlock()
if tp.tailer == nil {
return 0
}
return tp.tailer.GetEndIndex()
}
// Subscribe 订阅事件
func (tp *TopicProcessor) Subscribe(eventType EventType, listener EventListener) {
tp.eventBus.Subscribe(eventType, listener)
}
// SubscribeAll 订阅所有事件
func (tp *TopicProcessor) SubscribeAll(listener EventListener) {
tp.eventBus.SubscribeAll(listener)
}
// Unsubscribe 取消订阅
func (tp *TopicProcessor) Unsubscribe(eventType EventType) {
tp.eventBus.Unsubscribe(eventType)
}
// Reset 清空 topic 的所有数据,包括日志文件、位置文件和统计文件
// 注意:必须在 Stop 之后调用
func (tp *TopicProcessor) Reset() error {
tp.mu.Lock()
defer tp.mu.Unlock()
if tp.running {
return fmt.Errorf("cannot reset while processor is running, please stop first")
}
tp.logger.Debug("resetting processor")
var errs []error
// 关闭 writer如果还未关闭
if tp.writer != nil {
if err := tp.writer.Close(); err != nil {
tp.logger.Error("failed to close writer during reset", "error", err)
errs = append(errs, fmt.Errorf("close writer: %w", err))
}
tp.writer = nil
}
// 删除日志文件
if err := os.Remove(tp.logPath); err != nil && !os.IsNotExist(err) {
tp.logger.Error("failed to remove log file", "error", err)
errs = append(errs, fmt.Errorf("remove log file: %w", err))
}
// 删除位置文件
posFile := tp.logPath + ".pos"
if err := os.Remove(posFile); err != nil && !os.IsNotExist(err) {
tp.logger.Error("failed to remove position file", "error", err)
errs = append(errs, fmt.Errorf("remove position file: %w", err))
}
// 删除索引文件
indexFile := tp.logPath + ".idx"
if err := os.Remove(indexFile); err != nil && !os.IsNotExist(err) {
tp.logger.Error("failed to remove index file", "error", err)
errs = append(errs, fmt.Errorf("remove index file: %w", err))
}
// 关闭所有组件
if tp.query != nil {
tp.query.Close()
tp.query = nil
}
if tp.index != nil {
tp.index.Close()
tp.index = nil
}
// 重新初始化所有组件(已持有锁)
// 这会重新创建 index, writer, query如果有 handler 也会创建 tailer
if err := tp.initializeComponents(); err != nil {
tp.logger.Error("failed to reinitialize components", "error", err)
errs = append(errs, fmt.Errorf("reinitialize components: %w", err))
}
// 重置统计信息
if tp.stats != nil {
tp.stats.Reset()
}
tp.logger.Debug("processor reset completed")
// 发布重置事件
tp.eventBus.Publish(&Event{
Type: EventProcessorReset,
Topic: tp.topic,
Timestamp: time.Now(),
})
// 如果有多个错误,返回第一个
if len(errs) > 0 {
return errs[0]
}
return nil
}
// Close 清理 processor 的所有资源
func (tp *TopicProcessor) Close() error {
tp.mu.Lock()
defer tp.mu.Unlock()
tp.logger.Debug("closing processor")
var errs []error
// 保存统计信息
if tp.stats != nil {
if err := tp.stats.Save(); err != nil {
tp.logger.Error("failed to save stats", "error", err)
errs = append(errs, fmt.Errorf("save stats: %w", err))
}
}
// 关闭 query
if tp.query != nil {
if err := tp.query.Close(); err != nil {
tp.logger.Error("failed to close query", "error", err)
errs = append(errs, fmt.Errorf("close query: %w", err))
}
tp.query = nil
}
// 关闭 cursor如果 tailer 未启动cursor 可能还未关闭)
if tp.cursor != nil {
if err := tp.cursor.Close(); err != nil {
tp.logger.Error("failed to close cursor", "error", err)
errs = append(errs, fmt.Errorf("close cursor: %w", err))
}
tp.cursor = nil
}
// 关闭 writer
if tp.writer != nil {
if err := tp.writer.Close(); err != nil {
tp.logger.Error("failed to close writer", "error", err)
errs = append(errs, fmt.Errorf("close writer: %w", err))
}
tp.writer = nil
}
// 关闭 index最后关闭因为其他组件可能依赖它
if tp.index != nil {
if err := tp.index.Close(); err != nil {
tp.logger.Error("failed to close index", "error", err)
errs = append(errs, fmt.Errorf("close index: %w", err))
}
tp.index = nil
}
// tailer 会通过 context cancel 和 Stop() 自动关闭
tp.tailer = nil
tp.logger.Debug("processor closed")
// 如果有多个错误,返回第一个
if len(errs) > 0 {
return errs[0]
}
return nil
}