为所有核心组件添加 Reset() 方法: - LogWriter.Reset(): 删除并重新创建日志文件,保持 index 和 wbuf 引用不变 - RecordIndex.Reset(): 清空索引数据并重新创建索引文件 - RecordQuery.Reset(): 关闭并重新打开日志文件 - ProcessCursor.Reset(): 删除位置文件并重置游标位置 - LogTailer.Reset(): 重置内部 channel 状态 优化 TopicProcessor.Reset() 实现: - 不再销毁和重建组件对象 - 通过调用各组件的 Reset() 方法重置状态 - 保持组件间引用关系稳定 - 减少代码行数约 20 行 - 避免空指针风险和内存分配开销 代码改进: - LogWriter 添加 path 字段用于重置 - 移除 topic_processor.go 中未使用的 os import - 职责分离更清晰,每个组件管理自己的重置逻辑 测试结果: - TestTopicReset: PASS - TestTopicResetWithPendingRecords: PASS - 所有 TopicProcessor 相关测试通过 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
993 lines
26 KiB
Go
993 lines
26 KiB
Go
package seqlog
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"log/slog"
|
||
"path/filepath"
|
||
"sync"
|
||
"time"
|
||
)
|
||
|
||
// ProcessorState 处理器状态
|
||
type ProcessorState int
|
||
|
||
const (
|
||
StateIdle ProcessorState = iota // 空闲(未启动)
|
||
StateStarting // 启动中
|
||
StateRunning // 运行中
|
||
StateStopping // 停止中
|
||
StateStopped // 已停止
|
||
StateResetting // 重置中(阻止所有操作)
|
||
StateError // 错误状态
|
||
)
|
||
|
||
// String 返回状态的字符串表示
|
||
func (s ProcessorState) String() string {
|
||
switch s {
|
||
case StateIdle:
|
||
return "idle"
|
||
case StateStarting:
|
||
return "starting"
|
||
case StateRunning:
|
||
return "running"
|
||
case StateStopping:
|
||
return "stopping"
|
||
case StateStopped:
|
||
return "stopped"
|
||
case StateResetting:
|
||
return "resetting"
|
||
case StateError:
|
||
return "error"
|
||
default:
|
||
return "unknown"
|
||
}
|
||
}
|
||
|
||
// CanWrite 判断当前状态是否允许写入
|
||
func (s ProcessorState) CanWrite() bool {
|
||
// 允许在 Idle、Starting、Running 状态下写入
|
||
// 不允许在 Stopping、Stopped、Resetting、Error 状态下写入
|
||
return s == StateIdle || s == StateStarting || s == StateRunning
|
||
}
|
||
|
||
// CanQuery 判断当前状态是否允许查询
|
||
func (s ProcessorState) CanQuery() bool {
|
||
return s != StateResetting
|
||
}
|
||
|
||
// CanProcess 判断当前状态是否允许处理
|
||
func (s ProcessorState) CanProcess() bool {
|
||
return s == StateRunning
|
||
}
|
||
|
||
// ProcessorStatus 处理器状态信息
|
||
type ProcessorStatus struct {
|
||
State ProcessorState // 当前状态
|
||
LastUpdated time.Time // 最后更新时间
|
||
Error error // 错误信息(仅在 StateError 时有效)
|
||
}
|
||
|
||
// TopicProcessor 作为聚合器,持有所有核心组件并提供统一的访问接口
|
||
type TopicProcessor struct {
|
||
topic string
|
||
title string // 显示标题,用于 UI 展示
|
||
logPath string
|
||
logger *slog.Logger
|
||
|
||
// 核心组件(聚合)
|
||
writer *LogWriter // 写入器
|
||
index *RecordIndex // 索引管理器
|
||
query *RecordQuery // 查询器
|
||
cursor *ProcessCursor // 游标
|
||
tailer *LogTailer // 持续处理器
|
||
|
||
// 配置和状态
|
||
handler RecordHandler
|
||
tailConfig *TailConfig
|
||
stats *TopicStats // 统计信息
|
||
eventBus *EventBus // 事件总线
|
||
status ProcessorStatus // 处理器状态
|
||
|
||
// 并发控制
|
||
mu sync.RWMutex
|
||
ctx context.Context
|
||
cancel context.CancelFunc
|
||
wg sync.WaitGroup
|
||
}
|
||
|
||
// TopicConfig topic 配置
|
||
type TopicConfig struct {
|
||
Title string // 显示标题,可选,默认为 topic 名称
|
||
Handler RecordHandler // 处理函数(必填)
|
||
TailConfig *TailConfig // tail 配置,可选
|
||
}
|
||
|
||
// NewTopicProcessor 创建一个新的 topic 处理器
|
||
// 在初始化时创建所有核心组件,index 在组件间共享
|
||
// handler 为必填参数,如果 config 为 nil 或 config.Handler 为 nil 会返回错误
|
||
func NewTopicProcessor(baseDir, topic string, logger *slog.Logger, config *TopicConfig) (*TopicProcessor, error) {
|
||
// 验证必填参数
|
||
if config == nil || config.Handler == nil {
|
||
return nil, NewValidationError("config", "config and config.Handler are required", ErrInvalidConfig)
|
||
}
|
||
|
||
ctx, cancel := context.WithCancel(context.Background())
|
||
|
||
// 默认配置
|
||
tailConfig := &TailConfig{
|
||
PollInterval: 100 * 1000000, // 100ms
|
||
SaveInterval: 1000 * 1000000, // 1s
|
||
}
|
||
|
||
if config.TailConfig != nil {
|
||
tailConfig = config.TailConfig
|
||
}
|
||
|
||
if logger == nil {
|
||
logger = slog.Default()
|
||
}
|
||
|
||
logPath := filepath.Join(baseDir, topic+".log")
|
||
statsPath := filepath.Join(baseDir, topic+".stats")
|
||
|
||
// 设置 title,如果未提供则使用 topic 名称
|
||
title := config.Title
|
||
if title == "" {
|
||
title = topic
|
||
}
|
||
|
||
tp := &TopicProcessor{
|
||
topic: topic,
|
||
title: title,
|
||
logPath: logPath,
|
||
logger: logger,
|
||
handler: config.Handler,
|
||
tailConfig: tailConfig,
|
||
stats: NewTopicStats(statsPath),
|
||
eventBus: NewEventBus(),
|
||
status: ProcessorStatus{
|
||
State: StateIdle,
|
||
LastUpdated: time.Now(),
|
||
},
|
||
ctx: ctx,
|
||
cancel: cancel,
|
||
}
|
||
|
||
// 初始化所有组件
|
||
if err := tp.initializeComponents(); err != nil {
|
||
cancel()
|
||
return nil, fmt.Errorf("failed to initialize components: %w", err)
|
||
}
|
||
|
||
return tp, nil
|
||
}
|
||
|
||
// initializeComponents 初始化所有核心组件
|
||
func (tp *TopicProcessor) initializeComponents() error {
|
||
// 1. 创建共享的索引管理器
|
||
index, err := NewRecordIndex(tp.logPath)
|
||
if err != nil {
|
||
return fmt.Errorf("create index: %w", err)
|
||
}
|
||
tp.index = index
|
||
|
||
// 2. 创建写入器(使用共享 index)
|
||
writer, err := NewLogWriter(tp.logPath, tp.index)
|
||
if err != nil {
|
||
tp.index.Close()
|
||
return fmt.Errorf("create writer: %w", err)
|
||
}
|
||
tp.writer = writer
|
||
|
||
// 3. 创建查询器(使用共享 index)
|
||
query, err := NewRecordQuery(tp.logPath, tp.index, tp.writer)
|
||
if err != nil {
|
||
tp.writer.Close()
|
||
tp.index.Close()
|
||
return fmt.Errorf("create query: %w", err)
|
||
}
|
||
tp.query = query
|
||
|
||
// 4. 创建游标(使用共享 index 和 writer)
|
||
cursor, err := NewCursor(tp.logPath, tp.index, tp.writer)
|
||
if err != nil {
|
||
tp.query.Close()
|
||
tp.writer.Close()
|
||
tp.index.Close()
|
||
return fmt.Errorf("create cursor: %w", err)
|
||
}
|
||
tp.cursor = cursor
|
||
|
||
// 5. 创建 tailer(handler 为必填,总是创建)
|
||
// 注意:只创建不启动,启动在 Start() 中进行
|
||
if err := tp.createTailer(); err != nil {
|
||
tp.cursor.Close()
|
||
tp.query.Close()
|
||
tp.writer.Close()
|
||
tp.index.Close()
|
||
return fmt.Errorf("create tailer: %w", err)
|
||
}
|
||
|
||
tp.logger.Debug("all components initialized")
|
||
return nil
|
||
}
|
||
|
||
// createTailer 创建 tailer(不启动)
|
||
func (tp *TopicProcessor) createTailer() error {
|
||
// 包装 handler,添加统计功能和事件发布
|
||
wrappedHandler := func(rec *Record) error {
|
||
if err := tp.handler(rec); err != nil {
|
||
tp.stats.IncError()
|
||
|
||
// 发布处理错误事件
|
||
tp.eventBus.Publish(&Event{
|
||
Type: EventProcessError,
|
||
Topic: tp.topic,
|
||
Timestamp: time.Now(),
|
||
Record: rec,
|
||
Error: err,
|
||
Position: 0, // Position 在 tailer 模式下不可用
|
||
})
|
||
|
||
return err
|
||
}
|
||
|
||
// 处理成功,更新统计
|
||
tp.stats.IncProcessed(int64(len(rec.Data)))
|
||
|
||
// 发布处理成功事件
|
||
tp.eventBus.Publish(&Event{
|
||
Type: EventProcessSuccess,
|
||
Topic: tp.topic,
|
||
Timestamp: time.Now(),
|
||
Record: rec,
|
||
Position: 0, // Position 在 tailer 模式下不可用
|
||
})
|
||
|
||
return nil
|
||
}
|
||
|
||
tp.logger.Debug("creating tailer")
|
||
tailer, err := NewTailer(tp.cursor, wrappedHandler, tp.tailConfig)
|
||
if err != nil {
|
||
tp.logger.Error("failed to create tailer", "error", err)
|
||
return fmt.Errorf("failed to create tailer: %w", err)
|
||
}
|
||
|
||
tp.tailer = tailer
|
||
tp.logger.Debug("tailer created")
|
||
return nil
|
||
}
|
||
|
||
// Write 写入日志(统一接口)
|
||
func (tp *TopicProcessor) Write(data []byte) (int64, error) {
|
||
// 检查状态
|
||
tp.mu.RLock()
|
||
state := tp.status.State
|
||
tp.mu.RUnlock()
|
||
|
||
if !state.CanWrite() {
|
||
if state == StateResetting {
|
||
return 0, ErrProcessorResetting
|
||
}
|
||
return 0, ErrNotRunning
|
||
}
|
||
|
||
offset, err := tp.writer.Append(data)
|
||
if err != nil {
|
||
tp.logger.Error("failed to append", "error", err)
|
||
tp.stats.IncError()
|
||
|
||
// 发布写入错误事件
|
||
tp.eventBus.Publish(&Event{
|
||
Type: EventWriteError,
|
||
Topic: tp.topic,
|
||
Timestamp: time.Now(),
|
||
Error: err,
|
||
})
|
||
|
||
return 0, err
|
||
}
|
||
|
||
// 更新统计信息
|
||
tp.stats.IncWrite(int64(len(data)))
|
||
|
||
tp.logger.Debug("write success", "offset", offset, "size", len(data))
|
||
|
||
// 发布写入成功事件
|
||
tp.eventBus.Publish(&Event{
|
||
Type: EventWriteSuccess,
|
||
Topic: tp.topic,
|
||
Timestamp: time.Now(),
|
||
Position: offset,
|
||
})
|
||
|
||
return offset, nil
|
||
}
|
||
|
||
// Start 启动 tailer(如果已创建)
|
||
func (tp *TopicProcessor) Start() error {
|
||
tp.mu.Lock()
|
||
defer tp.mu.Unlock()
|
||
|
||
// 检查状态
|
||
if tp.status.State != StateIdle && tp.status.State != StateStopped {
|
||
return NewTopicError(tp.topic, "start", ErrInvalidState)
|
||
}
|
||
|
||
tp.logger.Debug("starting processor")
|
||
|
||
// 设置为启动中状态
|
||
tp.setState(StateStarting, nil)
|
||
|
||
// 重新创建 context(如果之前被 cancel 了)
|
||
if tp.ctx.Err() != nil {
|
||
tp.ctx, tp.cancel = context.WithCancel(context.Background())
|
||
}
|
||
|
||
// 启动定期保存统计信息的 goroutine
|
||
tp.wg.Add(1)
|
||
go func() {
|
||
defer tp.wg.Done()
|
||
ticker := time.NewTicker(tp.tailConfig.SaveInterval)
|
||
defer ticker.Stop()
|
||
|
||
for {
|
||
select {
|
||
case <-tp.ctx.Done():
|
||
return
|
||
case <-ticker.C:
|
||
if err := tp.stats.Save(); err != nil {
|
||
tp.logger.Error("failed to save stats", "error", err)
|
||
}
|
||
}
|
||
}
|
||
}()
|
||
|
||
// 如果 tailer 已创建,启动它
|
||
tp.logger.Debug("launching tailer goroutine")
|
||
tp.wg.Add(1)
|
||
go func() {
|
||
defer tp.wg.Done()
|
||
tp.logger.Debug("tailer goroutine started")
|
||
if err := tp.tailer.Start(tp.ctx); err != nil && err != context.Canceled {
|
||
tp.logger.Error("tailer error", "error", err)
|
||
}
|
||
tp.logger.Debug("tailer goroutine finished")
|
||
}()
|
||
|
||
// 设置为运行中状态
|
||
tp.setState(StateRunning, nil)
|
||
|
||
// 发布启动事件
|
||
tp.eventBus.Publish(&Event{
|
||
Type: EventProcessorStart,
|
||
Topic: tp.topic,
|
||
Timestamp: time.Now(),
|
||
})
|
||
|
||
return nil
|
||
}
|
||
|
||
// Stop 停止 tailer
|
||
func (tp *TopicProcessor) Stop() error {
|
||
tp.mu.Lock()
|
||
if tp.status.State != StateRunning {
|
||
tp.mu.Unlock()
|
||
return nil
|
||
}
|
||
tp.logger.Debug("stopping processor")
|
||
|
||
// 设置为停止中状态
|
||
tp.setState(StateStopping, nil)
|
||
tp.cancel()
|
||
tp.mu.Unlock()
|
||
|
||
// 等待 tailer 停止
|
||
tp.wg.Wait()
|
||
|
||
tp.logger.Debug("processor stopped")
|
||
|
||
// 设置为已停止状态
|
||
tp.mu.Lock()
|
||
tp.setState(StateStopped, nil)
|
||
tp.mu.Unlock()
|
||
|
||
// 发布停止事件
|
||
tp.eventBus.Publish(&Event{
|
||
Type: EventProcessorStop,
|
||
Topic: tp.topic,
|
||
Timestamp: time.Now(),
|
||
})
|
||
|
||
return nil
|
||
}
|
||
|
||
// Topic 返回 topic 名称
|
||
func (tp *TopicProcessor) Topic() string {
|
||
return tp.topic
|
||
}
|
||
|
||
// Title 返回显示标题
|
||
func (tp *TopicProcessor) Title() string {
|
||
return tp.title
|
||
}
|
||
|
||
// GetState 获取当前状态
|
||
func (tp *TopicProcessor) GetState() ProcessorState {
|
||
tp.mu.RLock()
|
||
defer tp.mu.RUnlock()
|
||
return tp.status.State
|
||
}
|
||
|
||
// GetStatus 获取完整状态信息
|
||
func (tp *TopicProcessor) GetStatus() ProcessorStatus {
|
||
tp.mu.RLock()
|
||
defer tp.mu.RUnlock()
|
||
return tp.status
|
||
}
|
||
|
||
// setState 设置状态(需持有锁)
|
||
func (tp *TopicProcessor) setState(state ProcessorState, err error) {
|
||
tp.status.State = state
|
||
tp.status.LastUpdated = time.Now()
|
||
tp.status.Error = err
|
||
|
||
// 发布状态变更事件
|
||
tp.eventBus.Publish(&Event{
|
||
Type: EventStateChanged,
|
||
Topic: tp.topic,
|
||
Timestamp: time.Now(),
|
||
Error: err,
|
||
})
|
||
}
|
||
|
||
// IsRunning 检查是否正在运行
|
||
func (tp *TopicProcessor) IsRunning() bool {
|
||
return tp.GetState() == StateRunning
|
||
}
|
||
|
||
// UpdateTailConfig 动态更新 tail 配置
|
||
func (tp *TopicProcessor) UpdateTailConfig(config *TailConfig) error {
|
||
tp.mu.Lock()
|
||
defer tp.mu.Unlock()
|
||
|
||
if config == nil {
|
||
return fmt.Errorf("config cannot be nil")
|
||
}
|
||
|
||
tp.tailConfig = config
|
||
|
||
// 如果 tailer 已经在运行,更新其配置
|
||
if tp.tailer != nil {
|
||
tp.tailer.UpdateConfig(*config)
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// GetTailConfig 获取当前 tail 配置
|
||
func (tp *TopicProcessor) GetTailConfig() *TailConfig {
|
||
tp.mu.RLock()
|
||
defer tp.mu.RUnlock()
|
||
cfg := tp.tailConfig
|
||
return cfg
|
||
}
|
||
|
||
// GetStats 获取当前统计信息
|
||
func (tp *TopicProcessor) GetStats() Stats {
|
||
return tp.stats.Get()
|
||
}
|
||
|
||
// Query 获取共享的查询器
|
||
func (tp *TopicProcessor) Query() (*RecordQuery, error) {
|
||
tp.mu.RLock()
|
||
defer tp.mu.RUnlock()
|
||
|
||
if !tp.status.State.CanQuery() {
|
||
return nil, ErrProcessorResetting
|
||
}
|
||
|
||
return tp.query, nil
|
||
}
|
||
|
||
// addStatusToRecords 为记录添加状态信息(辅助方法)
|
||
func (tp *TopicProcessor) addStatusToRecords(records []*RecordWithIndex) []*RecordWithStatus {
|
||
// 获取窗口索引范围(用于状态判断)
|
||
var startIdx, endIdx int
|
||
tp.mu.RLock()
|
||
if tp.tailer != nil {
|
||
startIdx = tp.tailer.GetStartIndex()
|
||
endIdx = tp.tailer.GetEndIndex()
|
||
}
|
||
tp.mu.RUnlock()
|
||
|
||
// 为每条记录添加状态
|
||
results := make([]*RecordWithStatus, len(records))
|
||
for i, rec := range records {
|
||
results[i] = &RecordWithStatus{
|
||
Record: rec.Record,
|
||
Index: rec.Index,
|
||
Status: GetRecordStatus(rec.Index, startIdx, endIdx),
|
||
}
|
||
}
|
||
|
||
return results
|
||
}
|
||
|
||
// addStatusToMetadata 为元数据添加状态信息
|
||
func (tp *TopicProcessor) addStatusToMetadata(metadata []*RecordMetadata) []*RecordMetadataWithStatus {
|
||
// 获取窗口索引范围(用于状态判断)
|
||
var startIdx, endIdx int
|
||
tp.mu.RLock()
|
||
if tp.tailer != nil {
|
||
startIdx = tp.tailer.GetStartIndex()
|
||
endIdx = tp.tailer.GetEndIndex()
|
||
}
|
||
tp.mu.RUnlock()
|
||
|
||
// 为每个元数据添加状态
|
||
results := make([]*RecordMetadataWithStatus, len(metadata))
|
||
for i, meta := range metadata {
|
||
results[i] = &RecordMetadataWithStatus{
|
||
Metadata: meta,
|
||
Status: GetRecordStatus(meta.Index, startIdx, endIdx),
|
||
}
|
||
}
|
||
|
||
return results
|
||
}
|
||
|
||
// QueryOldest 从参考索引向索引递减方向查询记录(查询更早的记录)
|
||
// refIndex: 参考索引位置
|
||
// count: 查询数量
|
||
// 返回的记录包含索引和状态信息,按索引递增方向排序
|
||
// 例如:QueryOldest(5, 3) 查询索引 2, 3, 4(不包含 5)
|
||
func (tp *TopicProcessor) QueryOldest(refIndex, count int) ([]*RecordWithStatus, error) {
|
||
records, err := tp.query.QueryOldest(refIndex, count)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
return tp.addStatusToRecords(records), nil
|
||
}
|
||
|
||
// QueryNewest 从参考索引向索引递增方向查询记录(查询更新的记录)
|
||
// refIndex: 参考索引位置
|
||
// count: 查询数量
|
||
// 返回的记录包含索引和状态信息,按索引递增方向排序
|
||
// 例如:QueryNewest(5, 3) 查询索引 6, 7, 8(不包含 5)
|
||
func (tp *TopicProcessor) QueryNewest(refIndex, count int) ([]*RecordWithStatus, error) {
|
||
records, err := tp.query.QueryNewest(refIndex, count)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
return tp.addStatusToRecords(records), nil
|
||
}
|
||
|
||
// QueryOldestMetadata 从参考索引向索引递减方向查询记录元数据(查询更早的记录,不读取完整数据)
|
||
// refIndex: 参考索引位置
|
||
// count: 查询数量
|
||
// 返回的记录只包含元数据信息(索引、UUID、数据大小),按索引递增方向排序
|
||
// 例如:QueryOldestMetadata(5, 3) 查询索引 2, 3, 4(不包含 5)
|
||
func (tp *TopicProcessor) QueryOldestMetadata(refIndex, count int) ([]*RecordMetadataWithStatus, error) {
|
||
metadata, err := tp.query.QueryOldestMetadata(refIndex, count)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
return tp.addStatusToMetadata(metadata), nil
|
||
}
|
||
|
||
// QueryNewestMetadata 从参考索引向索引递增方向查询记录元数据(查询更新的记录,不读取完整数据)
|
||
// refIndex: 参考索引位置
|
||
// count: 查询数量
|
||
// 返回的记录只包含元数据信息(索引、UUID、数据大小),按索引递增方向排序
|
||
// 例如:QueryNewestMetadata(5, 3) 查询索引 6, 7, 8(不包含 5)
|
||
func (tp *TopicProcessor) QueryNewestMetadata(refIndex, count int) ([]*RecordMetadataWithStatus, error) {
|
||
metadata, err := tp.query.QueryNewestMetadata(refIndex, count)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
return tp.addStatusToMetadata(metadata), nil
|
||
}
|
||
|
||
// QueryByIndex 根据索引查询单条记录的完整数据
|
||
// index: 记录索引
|
||
// 返回完整的记录数据,包含状态信息
|
||
func (tp *TopicProcessor) QueryByIndex(index int) (*RecordWithStatus, error) {
|
||
record, err := tp.query.QueryByIndex(index)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// 获取当前处理窗口位置
|
||
var startIdx, endIdx int
|
||
tp.mu.RLock()
|
||
if tp.tailer != nil {
|
||
startIdx = tp.tailer.GetStartIndex()
|
||
endIdx = tp.tailer.GetEndIndex()
|
||
}
|
||
tp.mu.RUnlock()
|
||
|
||
status := GetRecordStatus(index, startIdx, endIdx)
|
||
return &RecordWithStatus{
|
||
Record: record,
|
||
Index: index,
|
||
Status: status,
|
||
}, nil
|
||
}
|
||
|
||
// GetRecordCount 获取记录总数(统一接口)
|
||
func (tp *TopicProcessor) GetRecordCount() int {
|
||
return tp.index.Count()
|
||
}
|
||
|
||
// QueryFromProcessing 从当前处理窗口的开始位置向索引递增方向查询记录
|
||
// count: 查询数量
|
||
// 返回从处理窗口开始位置(startIndex)向后的记录,包含状态信息
|
||
func (tp *TopicProcessor) QueryFromProcessing(count int) ([]*RecordWithStatus, error) {
|
||
// 获取当前处理窗口的开始位置
|
||
tp.mu.RLock()
|
||
var startIdx int
|
||
if tp.tailer != nil {
|
||
startIdx = tp.tailer.GetStartIndex()
|
||
}
|
||
tp.mu.RUnlock()
|
||
|
||
// 从 startIdx 开始向索引递增方向查询
|
||
// QueryNewest(refIndex, count) 查询从 refIndex+1 开始的记录
|
||
// 所以要从 startIdx 开始,应该调用 QueryNewest(startIdx - 1, count)
|
||
return tp.QueryNewest(startIdx-1, count)
|
||
}
|
||
|
||
// QueryFromFirst 从第一条记录向索引递增方向查询
|
||
// count: 查询数量
|
||
// 返回从第一条记录(索引 0)开始的记录,包含状态信息
|
||
// 例如:QueryFromFirst(3) 查询索引 0, 1, 2
|
||
func (tp *TopicProcessor) QueryFromFirst(count int) ([]*RecordWithStatus, error) {
|
||
// QueryNewest(-1, count) 会从索引 0 开始向后查询
|
||
return tp.QueryNewest(-1, count)
|
||
}
|
||
|
||
// QueryFromLast 从最后一条记录向索引递减方向查询
|
||
// count: 查询数量
|
||
// 返回从最后一条记录开始向前的记录,包含状态信息,按索引递增方向排序
|
||
// 例如:如果总共有 10 条记录,QueryFromLast(3) 查询索引 7, 8, 9
|
||
func (tp *TopicProcessor) QueryFromLast(count int) ([]*RecordWithStatus, error) {
|
||
// 获取记录总数
|
||
totalCount := tp.index.Count()
|
||
|
||
// 如果没有记录,返回空数组
|
||
if totalCount == 0 {
|
||
return []*RecordWithStatus{}, nil
|
||
}
|
||
|
||
// QueryOldest(totalCount, count) 会从最后一条记录开始向前查询
|
||
// totalCount 是记录总数,有效索引是 0 到 totalCount-1
|
||
// 所以传入 totalCount 作为 refIndex,会查询 totalCount-count 到 totalCount-1 的记录
|
||
return tp.QueryOldest(totalCount, count)
|
||
}
|
||
|
||
// QueryFromFirstMetadata 从第一条记录向索引递增方向查询元数据
|
||
// count: 查询数量
|
||
// 返回从第一条记录(索引 0)开始的记录元数据,包含状态信息
|
||
// 例如:QueryFromFirstMetadata(3) 查询索引 0, 1, 2 的元数据
|
||
func (tp *TopicProcessor) QueryFromFirstMetadata(count int) ([]*RecordMetadataWithStatus, error) {
|
||
// QueryNewestMetadata(-1, count) 会从索引 0 开始向后查询
|
||
return tp.QueryNewestMetadata(-1, count)
|
||
}
|
||
|
||
// QueryFromLastMetadata 从最后一条记录向索引递减方向查询元数据
|
||
// count: 查询数量
|
||
// 返回最后 N 条记录的元数据(按索引递增方向排序),包含状态信息
|
||
// 例如:QueryFromLastMetadata(3) 查询最后 3 条记录的元数据
|
||
func (tp *TopicProcessor) QueryFromLastMetadata(count int) ([]*RecordMetadataWithStatus, error) {
|
||
totalCount := tp.index.Count()
|
||
if totalCount == 0 {
|
||
return []*RecordMetadataWithStatus{}, nil
|
||
}
|
||
// QueryOldestMetadata(totalCount, count) 会从 totalCount 向前查询 count 条
|
||
return tp.QueryOldestMetadata(totalCount, count)
|
||
}
|
||
|
||
// GetProcessingIndex 获取当前处理索引(窗口开始索引)
|
||
func (tp *TopicProcessor) GetProcessingIndex() int {
|
||
tp.mu.RLock()
|
||
defer tp.mu.RUnlock()
|
||
|
||
if tp.tailer == nil {
|
||
return 0
|
||
}
|
||
|
||
return tp.tailer.GetStartIndex()
|
||
}
|
||
|
||
// GetReadIndex 获取当前读取索引(窗口结束索引)
|
||
func (tp *TopicProcessor) GetReadIndex() int {
|
||
tp.mu.RLock()
|
||
defer tp.mu.RUnlock()
|
||
|
||
if tp.tailer == nil {
|
||
return 0
|
||
}
|
||
|
||
return tp.tailer.GetEndIndex()
|
||
}
|
||
|
||
// Subscribe 订阅事件
|
||
func (tp *TopicProcessor) Subscribe(eventType EventType, listener EventListener) {
|
||
tp.eventBus.Subscribe(eventType, listener)
|
||
}
|
||
|
||
// SubscribeAll 订阅所有事件
|
||
func (tp *TopicProcessor) SubscribeAll(listener EventListener) {
|
||
tp.eventBus.SubscribeAll(listener)
|
||
}
|
||
|
||
// Unsubscribe 取消订阅
|
||
func (tp *TopicProcessor) Unsubscribe(eventType EventType) {
|
||
tp.eventBus.Unsubscribe(eventType)
|
||
}
|
||
|
||
// CanReset 检查是否可以执行重置操作
|
||
// 返回 true 表示可以重置,false 表示不能重置(正在重置中或有待处理的记录)
|
||
func (tp *TopicProcessor) CanReset() bool {
|
||
tp.mu.RLock()
|
||
defer tp.mu.RUnlock()
|
||
return tp.canResetLocked()
|
||
}
|
||
|
||
// canResetLocked 内部方法,检查是否可以重置(调用前必须已持有锁)
|
||
func (tp *TopicProcessor) canResetLocked() bool {
|
||
// 检查当前状态
|
||
if tp.status.State == StateResetting {
|
||
return false
|
||
}
|
||
|
||
// 检查是否有待处理的日志
|
||
processingIndex := 0
|
||
if tp.tailer != nil {
|
||
processingIndex = tp.tailer.GetStartIndex()
|
||
}
|
||
|
||
recordCount := 0
|
||
if tp.index != nil {
|
||
recordCount = tp.index.Count()
|
||
}
|
||
|
||
// 只有当所有记录都已处理完成时才能重置
|
||
return processingIndex >= recordCount
|
||
}
|
||
|
||
// Reset 清空 topic 的所有数据,包括日志文件、位置文件和统计文件
|
||
// 注意:Reset 不会停止 processor,只有在没有待处理的日志时才能执行
|
||
// 重置完成后,如果之前是 Running 状态,会自动恢复到 Running 状态
|
||
func (tp *TopicProcessor) Reset() error {
|
||
tp.mu.Lock()
|
||
defer tp.mu.Unlock()
|
||
|
||
// 检查是否可以执行重置操作
|
||
if !tp.canResetLocked() {
|
||
// 提供详细的错误信息
|
||
currentState := tp.status.State
|
||
if currentState == StateResetting {
|
||
return nil
|
||
}
|
||
|
||
// 获取待处理记录信息
|
||
processingIndex := 0
|
||
if tp.tailer != nil {
|
||
processingIndex = tp.tailer.GetStartIndex()
|
||
}
|
||
|
||
recordCount := 0
|
||
if tp.index != nil {
|
||
recordCount = tp.index.Count()
|
||
}
|
||
|
||
return fmt.Errorf("cannot reset with pending records (%d/%d processed), wait for processing to complete", processingIndex, recordCount)
|
||
}
|
||
|
||
// 记录之前是否在运行
|
||
currentState := tp.status.State
|
||
wasRunning := currentState == StateRunning
|
||
|
||
// 进入 Resetting 状态
|
||
tp.setState(StateResetting, nil)
|
||
tp.logger.Debug("entering resetting state", "wasRunning", wasRunning, "currentState", currentState)
|
||
|
||
// 如果之前在运行,停止 goroutines
|
||
if wasRunning && tp.cancel != nil {
|
||
tp.cancel()
|
||
|
||
// 释放锁以等待 goroutines 停止
|
||
tp.mu.Unlock()
|
||
tp.wg.Wait()
|
||
tp.mu.Lock()
|
||
}
|
||
|
||
var errs []error
|
||
|
||
// 重置所有组件(保持引用关系不变)
|
||
// 1. 重置索引(会删除索引文件并重新创建)
|
||
if tp.index != nil {
|
||
if err := tp.index.Reset(); err != nil {
|
||
tp.logger.Error("failed to reset index", "error", err)
|
||
errs = append(errs, fmt.Errorf("reset index: %w", err))
|
||
}
|
||
}
|
||
|
||
// 2. 重置写入器(会删除日志文件并重新创建)
|
||
if tp.writer != nil {
|
||
if err := tp.writer.Reset(); err != nil {
|
||
tp.logger.Error("failed to reset writer", "error", err)
|
||
errs = append(errs, fmt.Errorf("reset writer: %w", err))
|
||
}
|
||
}
|
||
|
||
// 3. 重置游标(会删除位置文件并重置位置)
|
||
if tp.cursor != nil {
|
||
if err := tp.cursor.Reset(); err != nil {
|
||
tp.logger.Error("failed to reset cursor", "error", err)
|
||
errs = append(errs, fmt.Errorf("reset cursor: %w", err))
|
||
}
|
||
}
|
||
|
||
// 4. 重置查询器(重新打开日志文件)
|
||
if tp.query != nil {
|
||
if err := tp.query.Reset(); err != nil {
|
||
tp.logger.Error("failed to reset query", "error", err)
|
||
errs = append(errs, fmt.Errorf("reset query: %w", err))
|
||
}
|
||
}
|
||
|
||
// 5. 重置 tailer(重置内部状态)
|
||
if tp.tailer != nil {
|
||
if err := tp.tailer.Reset(); err != nil {
|
||
tp.logger.Error("failed to reset tailer", "error", err)
|
||
errs = append(errs, fmt.Errorf("reset tailer: %w", err))
|
||
}
|
||
}
|
||
|
||
// 6. 重置统计信息(会删除统计文件并重置所有计数器)
|
||
if tp.stats != nil {
|
||
if err := tp.stats.Reset(); err != nil {
|
||
tp.logger.Error("failed to reset stats", "error", err)
|
||
errs = append(errs, fmt.Errorf("reset stats: %w", err))
|
||
}
|
||
}
|
||
|
||
// 如果有错误,设置为错误状态
|
||
if len(errs) > 0 {
|
||
tp.setState(StateError, errs[0])
|
||
return errs[0]
|
||
}
|
||
|
||
// 发布重置事件
|
||
tp.eventBus.Publish(&Event{
|
||
Type: EventProcessorReset,
|
||
Topic: tp.topic,
|
||
Timestamp: time.Now(),
|
||
})
|
||
|
||
// 如果之前在运行,恢复到 Running 状态
|
||
if wasRunning {
|
||
// 创建新的 context
|
||
tp.ctx, tp.cancel = context.WithCancel(context.Background())
|
||
|
||
// 启动 tailer goroutine(如果有 handler)
|
||
if tp.handler != nil && tp.tailer != nil {
|
||
tp.wg.Add(1)
|
||
go func() {
|
||
defer tp.wg.Done()
|
||
if err := tp.tailer.Start(tp.ctx); err != nil && err != context.Canceled {
|
||
tp.logger.Error("tailer error after reset", "error", err)
|
||
}
|
||
}()
|
||
}
|
||
|
||
// 启动统计保存 goroutine
|
||
tp.wg.Add(1)
|
||
go func() {
|
||
defer tp.wg.Done()
|
||
ticker := time.NewTicker(tp.tailConfig.SaveInterval)
|
||
defer ticker.Stop()
|
||
|
||
for {
|
||
select {
|
||
case <-tp.ctx.Done():
|
||
return
|
||
case <-ticker.C:
|
||
if tp.stats != nil {
|
||
if err := tp.stats.Save(); err != nil {
|
||
tp.logger.Error("failed to save stats", "error", err)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}()
|
||
|
||
tp.setState(StateRunning, nil)
|
||
tp.logger.Debug("processor reset completed, resumed to running state")
|
||
} else {
|
||
// 恢复到之前的状态(通常是 Idle 或 Stopped)
|
||
if currentState == StateStopped {
|
||
tp.setState(StateStopped, nil)
|
||
} else {
|
||
tp.setState(StateIdle, nil)
|
||
}
|
||
tp.logger.Debug("processor reset completed", "newState", tp.status.State)
|
||
}
|
||
|
||
// 如果有错误,返回第一个
|
||
if len(errs) > 0 {
|
||
return errs[0]
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// Close 清理 processor 的所有资源
|
||
func (tp *TopicProcessor) Close() error {
|
||
tp.mu.Lock()
|
||
defer tp.mu.Unlock()
|
||
|
||
tp.logger.Debug("closing processor")
|
||
|
||
var errs []error
|
||
|
||
// 保存统计信息
|
||
if tp.stats != nil {
|
||
if err := tp.stats.Save(); err != nil {
|
||
tp.logger.Error("failed to save stats", "error", err)
|
||
errs = append(errs, fmt.Errorf("save stats: %w", err))
|
||
}
|
||
}
|
||
|
||
// 关闭 query
|
||
if tp.query != nil {
|
||
if err := tp.query.Close(); err != nil {
|
||
tp.logger.Error("failed to close query", "error", err)
|
||
errs = append(errs, fmt.Errorf("close query: %w", err))
|
||
}
|
||
tp.query = nil
|
||
}
|
||
|
||
// 关闭 cursor(如果 tailer 未启动,cursor 可能还未关闭)
|
||
if tp.cursor != nil {
|
||
if err := tp.cursor.Close(); err != nil {
|
||
tp.logger.Error("failed to close cursor", "error", err)
|
||
errs = append(errs, fmt.Errorf("close cursor: %w", err))
|
||
}
|
||
tp.cursor = nil
|
||
}
|
||
|
||
// 关闭 writer
|
||
if tp.writer != nil {
|
||
if err := tp.writer.Close(); err != nil {
|
||
tp.logger.Error("failed to close writer", "error", err)
|
||
errs = append(errs, fmt.Errorf("close writer: %w", err))
|
||
}
|
||
tp.writer = nil
|
||
}
|
||
|
||
// 关闭 index(最后关闭,因为其他组件可能依赖它)
|
||
if tp.index != nil {
|
||
if err := tp.index.Close(); err != nil {
|
||
tp.logger.Error("failed to close index", "error", err)
|
||
errs = append(errs, fmt.Errorf("close index: %w", err))
|
||
}
|
||
tp.index = nil
|
||
}
|
||
|
||
// tailer 会通过 context cancel 和 Stop() 自动关闭
|
||
tp.tailer = nil
|
||
|
||
tp.logger.Debug("processor closed")
|
||
|
||
// 如果有多个错误,返回第一个
|
||
if len(errs) > 0 {
|
||
return errs[0]
|
||
}
|
||
return nil
|
||
}
|