2025-10-08 16:42:31 +08:00
|
|
|
|
package srdb
|
|
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
|
"bytes"
|
|
|
|
|
|
"encoding/binary"
|
2025-10-10 16:38:19 +08:00
|
|
|
|
"encoding/json"
|
2025-10-08 16:42:31 +08:00
|
|
|
|
"fmt"
|
|
|
|
|
|
"os"
|
|
|
|
|
|
"path/filepath"
|
|
|
|
|
|
"sort"
|
|
|
|
|
|
"strings"
|
|
|
|
|
|
"sync"
|
2025-10-10 00:20:45 +08:00
|
|
|
|
"time"
|
2025-10-08 16:42:31 +08:00
|
|
|
|
|
|
|
|
|
|
"github.com/edsrzf/mmap-go"
|
2025-10-10 00:20:45 +08:00
|
|
|
|
"github.com/shopspring/decimal"
|
2025-10-08 16:42:31 +08:00
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
const (
|
|
|
|
|
|
// 文件格式
|
|
|
|
|
|
SSTableMagicNumber = 0x53535433 // "SST3"
|
|
|
|
|
|
SSTableVersion = 1
|
|
|
|
|
|
SSTableHeaderSize = 256 // 文件头大小
|
|
|
|
|
|
SSTableBlockSize = 64 * 1024 // 数据块大小 (64 KB)
|
|
|
|
|
|
|
|
|
|
|
|
// 二进制编码格式:
|
|
|
|
|
|
// [Magic: 4 bytes][Seq: 8 bytes][Time: 8 bytes][DataLen: 4 bytes][Data: variable]
|
2025-10-09 01:33:22 +08:00
|
|
|
|
SSTableRowMagic = 0x524F5731 // "ROW1"
|
2025-10-08 16:42:31 +08:00
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
// SSTableHeader SST 文件头 (256 bytes)
|
|
|
|
|
|
type SSTableHeader struct {
|
|
|
|
|
|
// 基础信息 (32 bytes)
|
|
|
|
|
|
Magic uint32 // Magic Number: 0x53535433
|
|
|
|
|
|
Version uint32 // 版本号
|
2025-10-09 01:33:22 +08:00
|
|
|
|
Compression uint8 // 压缩类型(保留字段用于向后兼容)
|
2025-10-08 16:42:31 +08:00
|
|
|
|
Reserved1 [3]byte
|
|
|
|
|
|
Flags uint32 // 标志位
|
|
|
|
|
|
Reserved2 [16]byte
|
|
|
|
|
|
|
|
|
|
|
|
// 索引信息 (32 bytes)
|
|
|
|
|
|
IndexOffset int64 // B+Tree 索引起始位置
|
|
|
|
|
|
IndexSize int64 // B+Tree 索引大小
|
|
|
|
|
|
RootOffset int64 // B+Tree 根节点位置
|
|
|
|
|
|
Reserved3 [8]byte
|
|
|
|
|
|
|
|
|
|
|
|
// 数据信息 (32 bytes)
|
|
|
|
|
|
DataOffset int64 // 数据块起始位置
|
|
|
|
|
|
DataSize int64 // 数据块总大小
|
|
|
|
|
|
RowCount int64 // 行数
|
|
|
|
|
|
Reserved4 [8]byte
|
|
|
|
|
|
|
|
|
|
|
|
// 统计信息 (32 bytes)
|
|
|
|
|
|
MinKey int64 // 最小 key (_seq)
|
|
|
|
|
|
MaxKey int64 // 最大 key (_seq)
|
|
|
|
|
|
MinTime int64 // 最小时间戳
|
|
|
|
|
|
MaxTime int64 // 最大时间戳
|
|
|
|
|
|
|
|
|
|
|
|
// CRC 校验 (8 bytes)
|
|
|
|
|
|
CRC32 uint32 // Header CRC32
|
|
|
|
|
|
Reserved5 [4]byte
|
|
|
|
|
|
|
|
|
|
|
|
// 预留空间 (120 bytes)
|
|
|
|
|
|
Reserved6 [120]byte
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Marshal 序列化 Header
|
|
|
|
|
|
func (h *SSTableHeader) Marshal() []byte {
|
|
|
|
|
|
buf := make([]byte, SSTableHeaderSize)
|
|
|
|
|
|
|
|
|
|
|
|
// 基础信息
|
|
|
|
|
|
binary.LittleEndian.PutUint32(buf[0:4], h.Magic)
|
|
|
|
|
|
binary.LittleEndian.PutUint32(buf[4:8], h.Version)
|
|
|
|
|
|
buf[8] = h.Compression
|
|
|
|
|
|
copy(buf[9:12], h.Reserved1[:])
|
|
|
|
|
|
binary.LittleEndian.PutUint32(buf[12:16], h.Flags)
|
|
|
|
|
|
copy(buf[16:32], h.Reserved2[:])
|
|
|
|
|
|
|
|
|
|
|
|
// 索引信息
|
|
|
|
|
|
binary.LittleEndian.PutUint64(buf[32:40], uint64(h.IndexOffset))
|
|
|
|
|
|
binary.LittleEndian.PutUint64(buf[40:48], uint64(h.IndexSize))
|
|
|
|
|
|
binary.LittleEndian.PutUint64(buf[48:56], uint64(h.RootOffset))
|
|
|
|
|
|
copy(buf[56:64], h.Reserved3[:])
|
|
|
|
|
|
|
|
|
|
|
|
// 数据信息
|
|
|
|
|
|
binary.LittleEndian.PutUint64(buf[64:72], uint64(h.DataOffset))
|
|
|
|
|
|
binary.LittleEndian.PutUint64(buf[72:80], uint64(h.DataSize))
|
|
|
|
|
|
binary.LittleEndian.PutUint64(buf[80:88], uint64(h.RowCount))
|
|
|
|
|
|
copy(buf[88:96], h.Reserved4[:])
|
|
|
|
|
|
|
|
|
|
|
|
// 统计信息
|
|
|
|
|
|
binary.LittleEndian.PutUint64(buf[96:104], uint64(h.MinKey))
|
|
|
|
|
|
binary.LittleEndian.PutUint64(buf[104:112], uint64(h.MaxKey))
|
|
|
|
|
|
binary.LittleEndian.PutUint64(buf[112:120], uint64(h.MinTime))
|
|
|
|
|
|
binary.LittleEndian.PutUint64(buf[120:128], uint64(h.MaxTime))
|
|
|
|
|
|
|
|
|
|
|
|
// CRC 校验
|
|
|
|
|
|
binary.LittleEndian.PutUint32(buf[128:132], h.CRC32)
|
|
|
|
|
|
copy(buf[132:136], h.Reserved5[:])
|
|
|
|
|
|
|
|
|
|
|
|
// 预留空间
|
|
|
|
|
|
copy(buf[136:256], h.Reserved6[:])
|
|
|
|
|
|
|
|
|
|
|
|
return buf
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Unmarshal 反序列化 Header
|
|
|
|
|
|
func UnmarshalSSTableHeader(data []byte) *SSTableHeader {
|
|
|
|
|
|
if len(data) < SSTableHeaderSize {
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
h := &SSTableHeader{}
|
|
|
|
|
|
|
|
|
|
|
|
// 基础信息
|
|
|
|
|
|
h.Magic = binary.LittleEndian.Uint32(data[0:4])
|
|
|
|
|
|
h.Version = binary.LittleEndian.Uint32(data[4:8])
|
|
|
|
|
|
h.Compression = data[8]
|
|
|
|
|
|
copy(h.Reserved1[:], data[9:12])
|
|
|
|
|
|
h.Flags = binary.LittleEndian.Uint32(data[12:16])
|
|
|
|
|
|
copy(h.Reserved2[:], data[16:32])
|
|
|
|
|
|
|
|
|
|
|
|
// 索引信息
|
|
|
|
|
|
h.IndexOffset = int64(binary.LittleEndian.Uint64(data[32:40]))
|
|
|
|
|
|
h.IndexSize = int64(binary.LittleEndian.Uint64(data[40:48]))
|
|
|
|
|
|
h.RootOffset = int64(binary.LittleEndian.Uint64(data[48:56]))
|
|
|
|
|
|
copy(h.Reserved3[:], data[56:64])
|
|
|
|
|
|
|
|
|
|
|
|
// 数据信息
|
|
|
|
|
|
h.DataOffset = int64(binary.LittleEndian.Uint64(data[64:72]))
|
|
|
|
|
|
h.DataSize = int64(binary.LittleEndian.Uint64(data[72:80]))
|
|
|
|
|
|
h.RowCount = int64(binary.LittleEndian.Uint64(data[80:88]))
|
|
|
|
|
|
copy(h.Reserved4[:], data[88:96])
|
|
|
|
|
|
|
|
|
|
|
|
// 统计信息
|
|
|
|
|
|
h.MinKey = int64(binary.LittleEndian.Uint64(data[96:104]))
|
|
|
|
|
|
h.MaxKey = int64(binary.LittleEndian.Uint64(data[104:112]))
|
|
|
|
|
|
h.MinTime = int64(binary.LittleEndian.Uint64(data[112:120]))
|
|
|
|
|
|
h.MaxTime = int64(binary.LittleEndian.Uint64(data[120:128]))
|
|
|
|
|
|
|
|
|
|
|
|
// CRC 校验
|
|
|
|
|
|
h.CRC32 = binary.LittleEndian.Uint32(data[128:132])
|
|
|
|
|
|
copy(h.Reserved5[:], data[132:136])
|
|
|
|
|
|
|
|
|
|
|
|
// 预留空间
|
|
|
|
|
|
copy(h.Reserved6[:], data[136:256])
|
|
|
|
|
|
|
|
|
|
|
|
return h
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Validate 验证 Header
|
|
|
|
|
|
func (h *SSTableHeader) Validate() bool {
|
|
|
|
|
|
return h.Magic == SSTableMagicNumber && h.Version == SSTableVersion
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// encodeSSTableRowBinary 使用二进制格式编码行数据(按字段压缩)
|
|
|
|
|
|
func encodeSSTableRowBinary(row *SSTableRow, schema *Schema) ([]byte, error) {
|
|
|
|
|
|
buf := new(bytes.Buffer)
|
|
|
|
|
|
|
|
|
|
|
|
// 写入 Magic Number (用于验证)
|
|
|
|
|
|
if err := binary.Write(buf, binary.LittleEndian, uint32(SSTableRowMagic)); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 写入 Seq
|
|
|
|
|
|
if err := binary.Write(buf, binary.LittleEndian, row.Seq); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 写入 Time
|
|
|
|
|
|
if err := binary.Write(buf, binary.LittleEndian, row.Time); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-09 01:33:22 +08:00
|
|
|
|
// 强制要求 Schema
|
2025-10-08 16:42:31 +08:00
|
|
|
|
if schema == nil {
|
2025-10-09 01:33:22 +08:00
|
|
|
|
return nil, fmt.Errorf("schema is required for encoding SSTable rows")
|
2025-10-08 16:42:31 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 按字段分别编码和压缩
|
|
|
|
|
|
fieldCount := uint16(len(schema.Fields))
|
|
|
|
|
|
if err := binary.Write(buf, binary.LittleEndian, fieldCount); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-09 01:33:22 +08:00
|
|
|
|
// 1. 先编码所有字段到各自的 buffer(无压缩)
|
|
|
|
|
|
fieldData := make([][]byte, len(schema.Fields))
|
2025-10-08 16:42:31 +08:00
|
|
|
|
|
|
|
|
|
|
for i, field := range schema.Fields {
|
|
|
|
|
|
fieldBuf := new(bytes.Buffer)
|
|
|
|
|
|
value, exists := row.Data[field.Name]
|
|
|
|
|
|
|
2025-10-10 14:00:34 +08:00
|
|
|
|
if !exists || value == nil {
|
|
|
|
|
|
// 字段不存在或值为 nil(nullable 字段),写入零值
|
2025-10-08 16:42:31 +08:00
|
|
|
|
if err := writeFieldZeroValue(fieldBuf, field.Type); err != nil {
|
|
|
|
|
|
return nil, fmt.Errorf("write zero value for field %s: %w", field.Name, err)
|
|
|
|
|
|
}
|
|
|
|
|
|
} else {
|
|
|
|
|
|
if err := writeFieldBinaryValue(fieldBuf, field.Type, value); err != nil {
|
|
|
|
|
|
return nil, fmt.Errorf("write field %s: %w", field.Name, err)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-09 01:33:22 +08:00
|
|
|
|
// 直接使用二进制数据(无压缩)
|
|
|
|
|
|
fieldData[i] = fieldBuf.Bytes()
|
2025-10-08 16:42:31 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 2. 写入字段偏移表(相对于数据区起始位置)
|
|
|
|
|
|
currentOffset := 0
|
|
|
|
|
|
|
2025-10-09 01:33:22 +08:00
|
|
|
|
for _, data := range fieldData {
|
2025-10-08 16:42:31 +08:00
|
|
|
|
// 写入字段偏移(相对于数据区)
|
|
|
|
|
|
if err := binary.Write(buf, binary.LittleEndian, uint32(currentOffset)); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
2025-10-09 01:33:22 +08:00
|
|
|
|
// 写入数据大小
|
|
|
|
|
|
if err := binary.Write(buf, binary.LittleEndian, uint32(len(data))); err != nil {
|
2025-10-08 16:42:31 +08:00
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
2025-10-09 01:33:22 +08:00
|
|
|
|
currentOffset += len(data)
|
2025-10-08 16:42:31 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-09 01:33:22 +08:00
|
|
|
|
// 3. 写入字段数据
|
|
|
|
|
|
for _, data := range fieldData {
|
|
|
|
|
|
if _, err := buf.Write(data); err != nil {
|
2025-10-08 16:42:31 +08:00
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return buf.Bytes(), nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// writeFieldBinaryValue 写入字段值(二进制格式)
|
|
|
|
|
|
func writeFieldBinaryValue(buf *bytes.Buffer, typ FieldType, value any) error {
|
|
|
|
|
|
switch typ {
|
2025-10-10 00:20:45 +08:00
|
|
|
|
// 有符号整数类型
|
|
|
|
|
|
case Int:
|
|
|
|
|
|
v, ok := value.(int)
|
|
|
|
|
|
if !ok {
|
|
|
|
|
|
return fmt.Errorf("expected int, got %T", value)
|
|
|
|
|
|
}
|
|
|
|
|
|
return binary.Write(buf, binary.LittleEndian, int64(v))
|
|
|
|
|
|
|
|
|
|
|
|
case Int8:
|
|
|
|
|
|
v, ok := value.(int8)
|
|
|
|
|
|
if !ok {
|
|
|
|
|
|
return fmt.Errorf("expected int8, got %T", value)
|
2025-10-08 16:42:31 +08:00
|
|
|
|
}
|
|
|
|
|
|
return binary.Write(buf, binary.LittleEndian, v)
|
|
|
|
|
|
|
2025-10-10 00:20:45 +08:00
|
|
|
|
case Int16:
|
|
|
|
|
|
v, ok := value.(int16)
|
|
|
|
|
|
if !ok {
|
|
|
|
|
|
return fmt.Errorf("expected int16, got %T", value)
|
2025-10-08 16:42:31 +08:00
|
|
|
|
}
|
|
|
|
|
|
return binary.Write(buf, binary.LittleEndian, v)
|
|
|
|
|
|
|
2025-10-10 00:20:45 +08:00
|
|
|
|
case Int32, Rune:
|
|
|
|
|
|
// rune 和 int32 底层类型相同(都是 int32)
|
|
|
|
|
|
v, ok := value.(int32)
|
|
|
|
|
|
if !ok {
|
|
|
|
|
|
return fmt.Errorf("expected int32 (or rune), got %T", value)
|
2025-10-08 16:42:31 +08:00
|
|
|
|
}
|
2025-10-10 00:20:45 +08:00
|
|
|
|
return binary.Write(buf, binary.LittleEndian, v)
|
|
|
|
|
|
|
|
|
|
|
|
case Int64:
|
|
|
|
|
|
v, ok := value.(int64)
|
|
|
|
|
|
if !ok {
|
|
|
|
|
|
return fmt.Errorf("expected int64, got %T", value)
|
|
|
|
|
|
}
|
|
|
|
|
|
return binary.Write(buf, binary.LittleEndian, v)
|
|
|
|
|
|
|
|
|
|
|
|
// 无符号整数类型
|
|
|
|
|
|
case Uint:
|
|
|
|
|
|
v, ok := value.(uint)
|
|
|
|
|
|
if !ok {
|
|
|
|
|
|
return fmt.Errorf("expected uint, got %T", value)
|
|
|
|
|
|
}
|
|
|
|
|
|
return binary.Write(buf, binary.LittleEndian, uint64(v))
|
|
|
|
|
|
|
|
|
|
|
|
case Uint8, Byte:
|
|
|
|
|
|
// byte 和 uint8 底层类型相同
|
|
|
|
|
|
v, ok := value.(uint8)
|
|
|
|
|
|
if !ok {
|
|
|
|
|
|
return fmt.Errorf("expected uint8 or byte, got %T", value)
|
|
|
|
|
|
}
|
|
|
|
|
|
return binary.Write(buf, binary.LittleEndian, v)
|
|
|
|
|
|
|
|
|
|
|
|
case Uint16:
|
|
|
|
|
|
v, ok := value.(uint16)
|
|
|
|
|
|
if !ok {
|
|
|
|
|
|
return fmt.Errorf("expected uint16, got %T", value)
|
|
|
|
|
|
}
|
|
|
|
|
|
return binary.Write(buf, binary.LittleEndian, v)
|
|
|
|
|
|
|
|
|
|
|
|
case Uint32:
|
|
|
|
|
|
v, ok := value.(uint32)
|
|
|
|
|
|
if !ok {
|
|
|
|
|
|
return fmt.Errorf("expected uint32, got %T", value)
|
|
|
|
|
|
}
|
|
|
|
|
|
return binary.Write(buf, binary.LittleEndian, v)
|
|
|
|
|
|
|
|
|
|
|
|
case Uint64:
|
|
|
|
|
|
v, ok := value.(uint64)
|
|
|
|
|
|
if !ok {
|
|
|
|
|
|
return fmt.Errorf("expected uint64, got %T", value)
|
|
|
|
|
|
}
|
|
|
|
|
|
return binary.Write(buf, binary.LittleEndian, v)
|
2025-10-08 16:42:31 +08:00
|
|
|
|
|
2025-10-10 00:20:45 +08:00
|
|
|
|
// 浮点类型
|
|
|
|
|
|
case Float32:
|
|
|
|
|
|
v, ok := value.(float32)
|
|
|
|
|
|
if !ok {
|
|
|
|
|
|
return fmt.Errorf("expected float32, got %T", value)
|
|
|
|
|
|
}
|
|
|
|
|
|
return binary.Write(buf, binary.LittleEndian, v)
|
|
|
|
|
|
|
|
|
|
|
|
case Float64:
|
|
|
|
|
|
v, ok := value.(float64)
|
|
|
|
|
|
if !ok {
|
|
|
|
|
|
return fmt.Errorf("expected float64, got %T", value)
|
|
|
|
|
|
}
|
|
|
|
|
|
return binary.Write(buf, binary.LittleEndian, v)
|
|
|
|
|
|
|
|
|
|
|
|
// 字符串类型
|
|
|
|
|
|
case String:
|
|
|
|
|
|
s, ok := value.(string)
|
|
|
|
|
|
if !ok {
|
|
|
|
|
|
return fmt.Errorf("expected string, got %T", value)
|
|
|
|
|
|
}
|
2025-10-08 16:42:31 +08:00
|
|
|
|
// 写入长度
|
|
|
|
|
|
if err := binary.Write(buf, binary.LittleEndian, uint32(len(s))); err != nil {
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
// 写入内容
|
|
|
|
|
|
_, err := buf.WriteString(s)
|
|
|
|
|
|
return err
|
|
|
|
|
|
|
2025-10-10 00:20:45 +08:00
|
|
|
|
// 布尔类型
|
|
|
|
|
|
case Bool:
|
|
|
|
|
|
v, ok := value.(bool)
|
|
|
|
|
|
if !ok {
|
|
|
|
|
|
return fmt.Errorf("expected bool, got %T", value)
|
|
|
|
|
|
}
|
|
|
|
|
|
var b byte
|
|
|
|
|
|
if v {
|
|
|
|
|
|
b = 1
|
|
|
|
|
|
}
|
|
|
|
|
|
return buf.WriteByte(b)
|
|
|
|
|
|
|
|
|
|
|
|
// Decimal 类型
|
|
|
|
|
|
case Decimal:
|
|
|
|
|
|
v, ok := value.(decimal.Decimal)
|
|
|
|
|
|
if !ok {
|
|
|
|
|
|
return fmt.Errorf("expected decimal.Decimal, got %T", value)
|
|
|
|
|
|
}
|
|
|
|
|
|
// 使用 MarshalBinary 序列化
|
|
|
|
|
|
data, err := v.MarshalBinary()
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return fmt.Errorf("marshal decimal: %w", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
// 写入长度
|
|
|
|
|
|
if err := binary.Write(buf, binary.LittleEndian, uint32(len(data))); err != nil {
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
// 写入数据
|
|
|
|
|
|
_, err = buf.Write(data)
|
|
|
|
|
|
return err
|
|
|
|
|
|
|
|
|
|
|
|
// 时间类型
|
|
|
|
|
|
case Time:
|
|
|
|
|
|
v, ok := value.(time.Time)
|
|
|
|
|
|
if !ok {
|
|
|
|
|
|
return fmt.Errorf("expected time.Time, got %T", value)
|
|
|
|
|
|
}
|
|
|
|
|
|
// 存储为 Unix 时间戳(秒,int64)
|
|
|
|
|
|
return binary.Write(buf, binary.LittleEndian, v.Unix())
|
|
|
|
|
|
|
|
|
|
|
|
// 时间间隔类型
|
|
|
|
|
|
case Duration:
|
|
|
|
|
|
v, ok := value.(time.Duration)
|
|
|
|
|
|
if !ok {
|
|
|
|
|
|
return fmt.Errorf("expected time.Duration, got %T", value)
|
|
|
|
|
|
}
|
|
|
|
|
|
// 存储为纳秒(int64)
|
|
|
|
|
|
return binary.Write(buf, binary.LittleEndian, int64(v))
|
|
|
|
|
|
|
2025-10-10 16:38:19 +08:00
|
|
|
|
// Object 类型(使用 JSON 编码)
|
|
|
|
|
|
case Object:
|
|
|
|
|
|
// 使用 JSON 序列化
|
|
|
|
|
|
data, err := json.Marshal(value)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return fmt.Errorf("marshal object: %w", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
// 写入长度
|
|
|
|
|
|
if err := binary.Write(buf, binary.LittleEndian, uint32(len(data))); err != nil {
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
// 写入数据
|
|
|
|
|
|
_, err = buf.Write(data)
|
|
|
|
|
|
return err
|
|
|
|
|
|
|
|
|
|
|
|
// Array 类型(使用 JSON 编码)
|
|
|
|
|
|
case Array:
|
|
|
|
|
|
// 使用 JSON 序列化
|
|
|
|
|
|
data, err := json.Marshal(value)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return fmt.Errorf("marshal array: %w", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
// 写入长度
|
|
|
|
|
|
if err := binary.Write(buf, binary.LittleEndian, uint32(len(data))); err != nil {
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
// 写入数据
|
|
|
|
|
|
_, err = buf.Write(data)
|
|
|
|
|
|
return err
|
|
|
|
|
|
|
2025-10-08 16:42:31 +08:00
|
|
|
|
default:
|
|
|
|
|
|
return fmt.Errorf("unsupported field type: %d", typ)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// writeFieldZeroValue 写入字段零值
|
|
|
|
|
|
func writeFieldZeroValue(buf *bytes.Buffer, typ FieldType) error {
|
|
|
|
|
|
switch typ {
|
2025-10-10 00:20:45 +08:00
|
|
|
|
// 有符号整数类型
|
|
|
|
|
|
case Int:
|
|
|
|
|
|
return binary.Write(buf, binary.LittleEndian, int64(0))
|
|
|
|
|
|
case Int8:
|
|
|
|
|
|
return binary.Write(buf, binary.LittleEndian, int8(0))
|
|
|
|
|
|
case Int16:
|
|
|
|
|
|
return binary.Write(buf, binary.LittleEndian, int16(0))
|
|
|
|
|
|
case Int32, Rune:
|
|
|
|
|
|
return binary.Write(buf, binary.LittleEndian, int32(0))
|
|
|
|
|
|
case Int64:
|
2025-10-08 16:42:31 +08:00
|
|
|
|
return binary.Write(buf, binary.LittleEndian, int64(0))
|
2025-10-10 00:20:45 +08:00
|
|
|
|
|
|
|
|
|
|
// 无符号整数类型
|
|
|
|
|
|
case Uint:
|
|
|
|
|
|
return binary.Write(buf, binary.LittleEndian, uint64(0))
|
|
|
|
|
|
case Uint8, Byte:
|
|
|
|
|
|
return binary.Write(buf, binary.LittleEndian, uint8(0))
|
|
|
|
|
|
case Uint16:
|
|
|
|
|
|
return binary.Write(buf, binary.LittleEndian, uint16(0))
|
|
|
|
|
|
case Uint32:
|
|
|
|
|
|
return binary.Write(buf, binary.LittleEndian, uint32(0))
|
|
|
|
|
|
case Uint64:
|
|
|
|
|
|
return binary.Write(buf, binary.LittleEndian, uint64(0))
|
|
|
|
|
|
|
|
|
|
|
|
// 浮点类型
|
|
|
|
|
|
case Float32:
|
|
|
|
|
|
return binary.Write(buf, binary.LittleEndian, float32(0))
|
|
|
|
|
|
case Float64:
|
2025-10-08 16:42:31 +08:00
|
|
|
|
return binary.Write(buf, binary.LittleEndian, float64(0))
|
2025-10-10 00:20:45 +08:00
|
|
|
|
|
|
|
|
|
|
// 字符串类型
|
|
|
|
|
|
case String:
|
2025-10-08 16:42:31 +08:00
|
|
|
|
return binary.Write(buf, binary.LittleEndian, uint32(0))
|
2025-10-10 00:20:45 +08:00
|
|
|
|
|
|
|
|
|
|
// 布尔类型
|
|
|
|
|
|
case Bool:
|
|
|
|
|
|
return buf.WriteByte(0)
|
|
|
|
|
|
|
|
|
|
|
|
// Decimal 类型(零值)
|
|
|
|
|
|
case Decimal:
|
|
|
|
|
|
zero := decimal.Zero
|
|
|
|
|
|
data, err := zero.MarshalBinary()
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return fmt.Errorf("marshal zero decimal: %w", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
if err := binary.Write(buf, binary.LittleEndian, uint32(len(data))); err != nil {
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
_, err = buf.Write(data)
|
|
|
|
|
|
return err
|
|
|
|
|
|
|
|
|
|
|
|
// 时间类型(零值:Unix epoch)
|
|
|
|
|
|
case Time:
|
|
|
|
|
|
return binary.Write(buf, binary.LittleEndian, int64(0))
|
|
|
|
|
|
|
|
|
|
|
|
// 时间间隔类型(零值)
|
|
|
|
|
|
case Duration:
|
|
|
|
|
|
return binary.Write(buf, binary.LittleEndian, int64(0))
|
|
|
|
|
|
|
2025-10-10 16:38:19 +08:00
|
|
|
|
// Object 类型(零值:空 JSON 对象 {})
|
|
|
|
|
|
case Object:
|
|
|
|
|
|
data := []byte("{}")
|
|
|
|
|
|
if err := binary.Write(buf, binary.LittleEndian, uint32(len(data))); err != nil {
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
_, err := buf.Write(data)
|
|
|
|
|
|
return err
|
|
|
|
|
|
|
|
|
|
|
|
// Array 类型(零值:空 JSON 数组 [])
|
|
|
|
|
|
case Array:
|
|
|
|
|
|
data := []byte("[]")
|
|
|
|
|
|
if err := binary.Write(buf, binary.LittleEndian, uint32(len(data))); err != nil {
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
_, err := buf.Write(data)
|
|
|
|
|
|
return err
|
|
|
|
|
|
|
2025-10-08 16:42:31 +08:00
|
|
|
|
default:
|
|
|
|
|
|
return fmt.Errorf("unsupported field type: %d", typ)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// decodeSSTableRowBinary 解码二进制格式的行数据(完整解码)
|
|
|
|
|
|
func decodeSSTableRowBinary(data []byte, schema *Schema) (*SSTableRow, error) {
|
|
|
|
|
|
return decodeSSTableRowBinaryPartial(data, schema, nil)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// decodeSSTableRowBinaryPartial 按需解码(只读取和解压指定字段)
|
|
|
|
|
|
func decodeSSTableRowBinaryPartial(data []byte, schema *Schema, fields []string) (*SSTableRow, error) {
|
|
|
|
|
|
buf := bytes.NewReader(data)
|
|
|
|
|
|
|
|
|
|
|
|
// 读取并验证 Magic Number
|
|
|
|
|
|
var magic uint32
|
|
|
|
|
|
if err := binary.Read(buf, binary.LittleEndian, &magic); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
if magic != SSTableRowMagic {
|
|
|
|
|
|
return nil, fmt.Errorf("invalid row magic: %x", magic)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
row := &SSTableRow{Data: make(map[string]any)}
|
|
|
|
|
|
|
|
|
|
|
|
// 读取 Seq
|
|
|
|
|
|
if err := binary.Read(buf, binary.LittleEndian, &row.Seq); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 读取 Time
|
|
|
|
|
|
if err := binary.Read(buf, binary.LittleEndian, &row.Time); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-09 01:33:22 +08:00
|
|
|
|
// 强制要求 Schema
|
2025-10-08 16:42:31 +08:00
|
|
|
|
if schema == nil {
|
2025-10-09 01:33:22 +08:00
|
|
|
|
return nil, fmt.Errorf("schema is required for decoding SSTable rows")
|
2025-10-08 16:42:31 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 读取字段数量
|
|
|
|
|
|
var fieldCount uint16
|
|
|
|
|
|
if err := binary.Read(buf, binary.LittleEndian, &fieldCount); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 读取字段偏移表
|
|
|
|
|
|
type fieldInfo struct {
|
|
|
|
|
|
offset uint32
|
|
|
|
|
|
size uint32
|
|
|
|
|
|
}
|
|
|
|
|
|
fieldInfos := make([]fieldInfo, fieldCount)
|
|
|
|
|
|
for i := range fieldInfos {
|
|
|
|
|
|
if err := binary.Read(buf, binary.LittleEndian, &fieldInfos[i].offset); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
if err := binary.Read(buf, binary.LittleEndian, &fieldInfos[i].size); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 构建需要读取的字段集合
|
|
|
|
|
|
needFields := make(map[string]bool)
|
|
|
|
|
|
if fields == nil {
|
|
|
|
|
|
// nil 表示读取所有字段
|
|
|
|
|
|
for _, f := range schema.Fields {
|
|
|
|
|
|
needFields[f.Name] = true
|
|
|
|
|
|
}
|
|
|
|
|
|
} else {
|
|
|
|
|
|
for _, f := range fields {
|
|
|
|
|
|
needFields[f] = true
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 数据区起始位置(当前 buf 位置)
|
|
|
|
|
|
dataStart := buf.Size() - int64(buf.Len())
|
|
|
|
|
|
|
|
|
|
|
|
// 按需读取和解压字段
|
|
|
|
|
|
for i, field := range schema.Fields {
|
|
|
|
|
|
if i >= int(fieldCount) {
|
|
|
|
|
|
break
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
need := needFields[field.Name]
|
|
|
|
|
|
if !need {
|
|
|
|
|
|
// 跳过不需要的字段(不读取,不解压)
|
|
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-09 01:33:22 +08:00
|
|
|
|
// 读取字段数据(无压缩)
|
2025-10-08 16:42:31 +08:00
|
|
|
|
info := fieldInfos[i]
|
2025-10-09 01:33:22 +08:00
|
|
|
|
fieldPos := dataStart + int64(info.offset)
|
2025-10-08 16:42:31 +08:00
|
|
|
|
|
|
|
|
|
|
// Seek 到字段位置
|
2025-10-09 01:33:22 +08:00
|
|
|
|
if _, err := buf.Seek(fieldPos, 0); err != nil {
|
2025-10-08 16:42:31 +08:00
|
|
|
|
return nil, fmt.Errorf("seek to field %s: %w", field.Name, err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-09 01:33:22 +08:00
|
|
|
|
fieldData := make([]byte, info.size)
|
|
|
|
|
|
if _, err := buf.Read(fieldData); err != nil {
|
2025-10-08 16:42:31 +08:00
|
|
|
|
return nil, fmt.Errorf("read field %s: %w", field.Name, err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-09 01:33:22 +08:00
|
|
|
|
// 解析字段值(直接从二进制数据)
|
|
|
|
|
|
fieldBuf := bytes.NewReader(fieldData)
|
2025-10-08 16:42:31 +08:00
|
|
|
|
value, err := readFieldBinaryValue(fieldBuf, field.Type, true)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return nil, fmt.Errorf("parse field %s: %w", field.Name, err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if value != nil {
|
|
|
|
|
|
row.Data[field.Name] = value
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return row, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// readFieldBinaryValue 读取字段值(二进制格式)
|
|
|
|
|
|
func readFieldBinaryValue(buf *bytes.Reader, typ FieldType, keep bool) (any, error) {
|
|
|
|
|
|
switch typ {
|
2025-10-10 00:20:45 +08:00
|
|
|
|
// 有符号整数类型
|
|
|
|
|
|
case Int:
|
|
|
|
|
|
var v int64
|
|
|
|
|
|
if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
if keep {
|
|
|
|
|
|
return int(v), nil
|
|
|
|
|
|
}
|
|
|
|
|
|
return nil, nil
|
|
|
|
|
|
|
|
|
|
|
|
case Int8:
|
|
|
|
|
|
var v int8
|
|
|
|
|
|
if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
if keep {
|
|
|
|
|
|
return v, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
return nil, nil
|
|
|
|
|
|
|
|
|
|
|
|
case Int16:
|
|
|
|
|
|
var v int16
|
|
|
|
|
|
if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
if keep {
|
|
|
|
|
|
return v, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
return nil, nil
|
|
|
|
|
|
|
|
|
|
|
|
case Int32:
|
|
|
|
|
|
var v int32
|
|
|
|
|
|
if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
if keep {
|
|
|
|
|
|
return v, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
return nil, nil
|
|
|
|
|
|
|
|
|
|
|
|
case Rune:
|
|
|
|
|
|
var v int32
|
|
|
|
|
|
if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
if keep {
|
|
|
|
|
|
return rune(v), nil
|
|
|
|
|
|
}
|
|
|
|
|
|
return nil, nil
|
|
|
|
|
|
|
|
|
|
|
|
case Int64:
|
2025-10-08 16:42:31 +08:00
|
|
|
|
var v int64
|
|
|
|
|
|
if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
if keep {
|
|
|
|
|
|
return v, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
return nil, nil
|
|
|
|
|
|
|
2025-10-10 00:20:45 +08:00
|
|
|
|
// 无符号整数类型
|
|
|
|
|
|
case Uint:
|
|
|
|
|
|
var v uint64
|
|
|
|
|
|
if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
if keep {
|
|
|
|
|
|
return uint(v), nil
|
|
|
|
|
|
}
|
|
|
|
|
|
return nil, nil
|
|
|
|
|
|
|
|
|
|
|
|
case Uint8:
|
|
|
|
|
|
var v uint8
|
|
|
|
|
|
if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
if keep {
|
|
|
|
|
|
return v, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
return nil, nil
|
|
|
|
|
|
|
|
|
|
|
|
case Byte:
|
|
|
|
|
|
var v uint8
|
|
|
|
|
|
if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
if keep {
|
|
|
|
|
|
return byte(v), nil
|
|
|
|
|
|
}
|
|
|
|
|
|
return nil, nil
|
|
|
|
|
|
|
|
|
|
|
|
case Uint16:
|
|
|
|
|
|
var v uint16
|
|
|
|
|
|
if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
if keep {
|
|
|
|
|
|
return v, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
return nil, nil
|
|
|
|
|
|
|
|
|
|
|
|
case Uint32:
|
|
|
|
|
|
var v uint32
|
|
|
|
|
|
if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
if keep {
|
|
|
|
|
|
return v, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
return nil, nil
|
|
|
|
|
|
|
|
|
|
|
|
case Uint64:
|
|
|
|
|
|
var v uint64
|
|
|
|
|
|
if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
if keep {
|
|
|
|
|
|
return v, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
return nil, nil
|
|
|
|
|
|
|
|
|
|
|
|
// 浮点类型
|
|
|
|
|
|
case Float32:
|
|
|
|
|
|
var v float32
|
|
|
|
|
|
if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
if keep {
|
|
|
|
|
|
return v, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
return nil, nil
|
|
|
|
|
|
|
|
|
|
|
|
case Float64:
|
2025-10-08 16:42:31 +08:00
|
|
|
|
var v float64
|
|
|
|
|
|
if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
if keep {
|
|
|
|
|
|
return v, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
return nil, nil
|
|
|
|
|
|
|
2025-10-10 00:20:45 +08:00
|
|
|
|
// 字符串类型
|
|
|
|
|
|
case String:
|
|
|
|
|
|
var length uint32
|
|
|
|
|
|
if err := binary.Read(buf, binary.LittleEndian, &length); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
2025-10-12 03:44:31 +08:00
|
|
|
|
if length == 0 {
|
|
|
|
|
|
// 空字符串,直接返回
|
|
|
|
|
|
if keep {
|
|
|
|
|
|
return "", nil
|
|
|
|
|
|
}
|
|
|
|
|
|
return nil, nil
|
|
|
|
|
|
}
|
2025-10-10 00:20:45 +08:00
|
|
|
|
str := make([]byte, length)
|
|
|
|
|
|
if _, err := buf.Read(str); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
if keep {
|
|
|
|
|
|
return string(str), nil
|
|
|
|
|
|
}
|
|
|
|
|
|
return nil, nil
|
|
|
|
|
|
|
|
|
|
|
|
// 布尔类型
|
|
|
|
|
|
case Bool:
|
2025-10-08 16:42:31 +08:00
|
|
|
|
b, err := buf.ReadByte()
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
if keep {
|
|
|
|
|
|
return b == 1, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
return nil, nil
|
|
|
|
|
|
|
2025-10-10 00:20:45 +08:00
|
|
|
|
// Decimal 类型
|
|
|
|
|
|
case Decimal:
|
2025-10-08 16:42:31 +08:00
|
|
|
|
var length uint32
|
|
|
|
|
|
if err := binary.Read(buf, binary.LittleEndian, &length); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
2025-10-12 03:44:31 +08:00
|
|
|
|
if length == 0 {
|
|
|
|
|
|
// 零值 Decimal
|
|
|
|
|
|
if keep {
|
|
|
|
|
|
return decimal.Zero, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
return nil, nil
|
|
|
|
|
|
}
|
2025-10-10 00:20:45 +08:00
|
|
|
|
data := make([]byte, length)
|
|
|
|
|
|
if _, err := buf.Read(data); err != nil {
|
2025-10-08 16:42:31 +08:00
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
if keep {
|
2025-10-10 00:20:45 +08:00
|
|
|
|
var d decimal.Decimal
|
|
|
|
|
|
if err := d.UnmarshalBinary(data); err != nil {
|
|
|
|
|
|
return nil, fmt.Errorf("unmarshal decimal: %w", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
return d, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
return nil, nil
|
|
|
|
|
|
|
|
|
|
|
|
// 时间类型
|
|
|
|
|
|
case Time:
|
|
|
|
|
|
var v int64
|
|
|
|
|
|
if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
if keep {
|
|
|
|
|
|
// 从 Unix 时间戳(秒)转换为 time.Time
|
|
|
|
|
|
return time.Unix(v, 0), nil
|
|
|
|
|
|
}
|
|
|
|
|
|
return nil, nil
|
|
|
|
|
|
|
|
|
|
|
|
// 时间间隔类型
|
|
|
|
|
|
case Duration:
|
|
|
|
|
|
var v int64
|
|
|
|
|
|
if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
if keep {
|
|
|
|
|
|
// 从纳秒转换为 time.Duration
|
|
|
|
|
|
return time.Duration(v), nil
|
2025-10-08 16:42:31 +08:00
|
|
|
|
}
|
|
|
|
|
|
return nil, nil
|
|
|
|
|
|
|
2025-10-10 16:38:19 +08:00
|
|
|
|
// Object 类型(使用 JSON 解码)
|
|
|
|
|
|
case Object:
|
|
|
|
|
|
var length uint32
|
|
|
|
|
|
if err := binary.Read(buf, binary.LittleEndian, &length); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
2025-10-12 03:44:31 +08:00
|
|
|
|
if length == 0 {
|
|
|
|
|
|
// 空对象
|
|
|
|
|
|
if keep {
|
|
|
|
|
|
return map[string]any{}, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
return nil, nil
|
|
|
|
|
|
}
|
2025-10-10 16:38:19 +08:00
|
|
|
|
data := make([]byte, length)
|
|
|
|
|
|
if _, err := buf.Read(data); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
if keep {
|
|
|
|
|
|
var obj map[string]any
|
|
|
|
|
|
if err := json.Unmarshal(data, &obj); err != nil {
|
|
|
|
|
|
return nil, fmt.Errorf("unmarshal object: %w", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
return obj, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
return nil, nil
|
|
|
|
|
|
|
|
|
|
|
|
// Array 类型(使用 JSON 解码)
|
|
|
|
|
|
case Array:
|
|
|
|
|
|
var length uint32
|
|
|
|
|
|
if err := binary.Read(buf, binary.LittleEndian, &length); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
2025-10-12 03:44:31 +08:00
|
|
|
|
if length == 0 {
|
|
|
|
|
|
// 空数组
|
|
|
|
|
|
if keep {
|
|
|
|
|
|
return []any{}, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
return nil, nil
|
|
|
|
|
|
}
|
2025-10-10 16:38:19 +08:00
|
|
|
|
data := make([]byte, length)
|
|
|
|
|
|
if _, err := buf.Read(data); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
if keep {
|
|
|
|
|
|
var arr []any
|
|
|
|
|
|
if err := json.Unmarshal(data, &arr); err != nil {
|
|
|
|
|
|
return nil, fmt.Errorf("unmarshal array: %w", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
return arr, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
return nil, nil
|
|
|
|
|
|
|
2025-10-08 16:42:31 +08:00
|
|
|
|
default:
|
|
|
|
|
|
return nil, fmt.Errorf("unsupported field type: %d", typ)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// SSTableWriter SST 文件写入器
|
|
|
|
|
|
type SSTableWriter struct {
|
2025-10-09 01:33:22 +08:00
|
|
|
|
file *os.File
|
|
|
|
|
|
builder *BTreeBuilder
|
|
|
|
|
|
dataOffset int64
|
|
|
|
|
|
dataStart int64 // 数据起始位置
|
|
|
|
|
|
rowCount int64
|
|
|
|
|
|
minKey int64
|
|
|
|
|
|
maxKey int64
|
|
|
|
|
|
minTime int64
|
|
|
|
|
|
maxTime int64
|
|
|
|
|
|
schema *Schema // Schema 用于优化编码
|
2025-10-08 16:42:31 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// NewSSTableWriter 创建 SST 写入器
|
|
|
|
|
|
func NewSSTableWriter(file *os.File, schema *Schema) *SSTableWriter {
|
|
|
|
|
|
return &SSTableWriter{
|
2025-10-09 01:33:22 +08:00
|
|
|
|
file: file,
|
|
|
|
|
|
builder: NewBTreeBuilder(file, SSTableHeaderSize),
|
|
|
|
|
|
dataOffset: 0, // 先写数据,后面会更新
|
|
|
|
|
|
minKey: -1,
|
|
|
|
|
|
maxKey: -1,
|
|
|
|
|
|
minTime: -1,
|
|
|
|
|
|
maxTime: -1,
|
|
|
|
|
|
schema: schema,
|
2025-10-08 16:42:31 +08:00
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// SSTableRow 表示一行数据
|
|
|
|
|
|
type SSTableRow struct {
|
|
|
|
|
|
Seq int64 // _seq
|
|
|
|
|
|
Time int64 // _time
|
|
|
|
|
|
Data map[string]any // 用户数据
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Add 添加一行数据
|
|
|
|
|
|
func (w *SSTableWriter) Add(row *SSTableRow) error {
|
|
|
|
|
|
// 更新统计信息
|
|
|
|
|
|
if w.minKey == -1 || row.Seq < w.minKey {
|
|
|
|
|
|
w.minKey = row.Seq
|
|
|
|
|
|
}
|
|
|
|
|
|
if w.maxKey == -1 || row.Seq > w.maxKey {
|
|
|
|
|
|
w.maxKey = row.Seq
|
|
|
|
|
|
}
|
|
|
|
|
|
if w.minTime == -1 || row.Time < w.minTime {
|
|
|
|
|
|
w.minTime = row.Time
|
|
|
|
|
|
}
|
|
|
|
|
|
if w.maxTime == -1 || row.Time > w.maxTime {
|
|
|
|
|
|
w.maxTime = row.Time
|
|
|
|
|
|
}
|
|
|
|
|
|
w.rowCount++
|
|
|
|
|
|
|
2025-10-09 01:33:22 +08:00
|
|
|
|
// 序列化数据(使用 Schema 优化的二进制格式,无压缩)
|
|
|
|
|
|
data, err := encodeSSTableRow(row, w.schema)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return fmt.Errorf("encode row: %w", err)
|
2025-10-08 16:42:31 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-09 01:33:22 +08:00
|
|
|
|
// 写入数据块(不压缩)
|
2025-10-08 16:42:31 +08:00
|
|
|
|
// 第一次写入时,确定数据起始位置
|
|
|
|
|
|
if w.dataStart == 0 {
|
|
|
|
|
|
// 预留足够空间给 B+Tree 索引
|
|
|
|
|
|
// 假设索引最多占用 10% 的空间,最少 1 MB
|
|
|
|
|
|
estimatedIndexSize := int64(10 * 1024 * 1024) // 10 MB
|
|
|
|
|
|
w.dataStart = SSTableHeaderSize + estimatedIndexSize
|
|
|
|
|
|
w.dataOffset = w.dataStart
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
offset := w.dataOffset
|
2025-10-09 01:33:22 +08:00
|
|
|
|
_, err = w.file.WriteAt(data, offset)
|
2025-10-08 16:42:31 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 添加到 B+Tree
|
2025-10-09 01:33:22 +08:00
|
|
|
|
err = w.builder.Add(row.Seq, offset, int32(len(data)))
|
2025-10-08 16:42:31 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 更新数据偏移
|
2025-10-09 01:33:22 +08:00
|
|
|
|
w.dataOffset += int64(len(data))
|
2025-10-08 16:42:31 +08:00
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Finish 完成写入
|
|
|
|
|
|
func (w *SSTableWriter) Finish() error {
|
|
|
|
|
|
// 1. 构建 B+Tree 索引
|
|
|
|
|
|
rootOffset, err := w.builder.Build()
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 2. 计算索引大小
|
|
|
|
|
|
indexSize := w.dataStart - SSTableHeaderSize
|
|
|
|
|
|
|
|
|
|
|
|
// 3. 创建 Header
|
|
|
|
|
|
header := &SSTableHeader{
|
|
|
|
|
|
Magic: SSTableMagicNumber,
|
|
|
|
|
|
Version: SSTableVersion,
|
2025-10-09 01:33:22 +08:00
|
|
|
|
Compression: 0, // 不使用压缩(保留字段用于向后兼容)
|
2025-10-08 16:42:31 +08:00
|
|
|
|
IndexOffset: SSTableHeaderSize,
|
|
|
|
|
|
IndexSize: indexSize,
|
|
|
|
|
|
RootOffset: rootOffset,
|
|
|
|
|
|
DataOffset: w.dataStart,
|
|
|
|
|
|
DataSize: w.dataOffset - w.dataStart,
|
|
|
|
|
|
RowCount: w.rowCount,
|
|
|
|
|
|
MinKey: w.minKey,
|
|
|
|
|
|
MaxKey: w.maxKey,
|
|
|
|
|
|
MinTime: w.minTime,
|
|
|
|
|
|
MaxTime: w.maxTime,
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 4. 写入 Header
|
|
|
|
|
|
headerData := header.Marshal()
|
|
|
|
|
|
_, err = w.file.WriteAt(headerData, 0)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 5. Sync 到磁盘
|
|
|
|
|
|
return w.file.Sync()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// encodeSSTableRow 编码行数据 (使用二进制格式)
|
2025-10-09 01:33:22 +08:00
|
|
|
|
func encodeSSTableRow(row *SSTableRow, schema *Schema) ([]byte, error) {
|
2025-10-08 16:42:31 +08:00
|
|
|
|
// 使用二进制格式编码
|
|
|
|
|
|
encoded, err := encodeSSTableRowBinary(row, schema)
|
|
|
|
|
|
if err != nil {
|
2025-10-09 01:33:22 +08:00
|
|
|
|
return nil, fmt.Errorf("failed to encode row: %w", err)
|
2025-10-08 16:42:31 +08:00
|
|
|
|
}
|
2025-10-09 01:33:22 +08:00
|
|
|
|
return encoded, nil
|
2025-10-08 16:42:31 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// SSTableReader SST 文件读取器
|
|
|
|
|
|
type SSTableReader struct {
|
|
|
|
|
|
path string
|
|
|
|
|
|
file *os.File
|
|
|
|
|
|
mmap mmap.MMap
|
|
|
|
|
|
header *SSTableHeader
|
|
|
|
|
|
btReader *BTreeReader
|
|
|
|
|
|
schema *Schema // Schema 用于优化解码
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// NewSSTableReader 创建 SST 读取器
|
|
|
|
|
|
func NewSSTableReader(path string) (*SSTableReader, error) {
|
|
|
|
|
|
// 1. 打开文件
|
|
|
|
|
|
file, err := os.Open(path)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 2. mmap 映射
|
|
|
|
|
|
mmapData, err := mmap.Map(file, mmap.RDONLY, 0)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
file.Close()
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 3. 读取 Header
|
|
|
|
|
|
if len(mmapData) < SSTableHeaderSize {
|
|
|
|
|
|
mmapData.Unmap()
|
|
|
|
|
|
file.Close()
|
|
|
|
|
|
return nil, fmt.Errorf("file too small")
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
header := UnmarshalSSTableHeader(mmapData[:SSTableHeaderSize])
|
|
|
|
|
|
if header == nil || !header.Validate() {
|
|
|
|
|
|
mmapData.Unmap()
|
|
|
|
|
|
file.Close()
|
|
|
|
|
|
return nil, fmt.Errorf("invalid header")
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 4. 创建 B+Tree Reader
|
|
|
|
|
|
btReader := NewBTreeReader(mmapData, header.RootOffset)
|
|
|
|
|
|
|
|
|
|
|
|
return &SSTableReader{
|
|
|
|
|
|
path: path,
|
|
|
|
|
|
file: file,
|
|
|
|
|
|
mmap: mmapData,
|
|
|
|
|
|
header: header,
|
|
|
|
|
|
btReader: btReader,
|
|
|
|
|
|
}, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Get 查询一行数据
|
|
|
|
|
|
func (r *SSTableReader) Get(key int64) (*SSTableRow, error) {
|
|
|
|
|
|
// 1. 检查范围
|
|
|
|
|
|
if key < r.header.MinKey || key > r.header.MaxKey {
|
|
|
|
|
|
return nil, fmt.Errorf("key out of range")
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 2. 在 B+Tree 中查找
|
|
|
|
|
|
dataOffset, dataSize, found := r.btReader.Get(key)
|
|
|
|
|
|
if !found {
|
|
|
|
|
|
return nil, fmt.Errorf("key not found")
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 3. 读取数据
|
|
|
|
|
|
if dataOffset+int64(dataSize) > int64(len(r.mmap)) {
|
|
|
|
|
|
return nil, fmt.Errorf("invalid data offset")
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-09 01:33:22 +08:00
|
|
|
|
data := r.mmap[dataOffset : dataOffset+int64(dataSize)]
|
2025-10-08 16:42:31 +08:00
|
|
|
|
|
2025-10-09 01:33:22 +08:00
|
|
|
|
// 4. 反序列化(无压缩)
|
2025-10-08 16:42:31 +08:00
|
|
|
|
row, err := decodeSSTableRow(data, r.schema)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return row, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// GetPartial 按需查询一行数据(只读取指定字段)
|
|
|
|
|
|
func (r *SSTableReader) GetPartial(key int64, fields []string) (*SSTableRow, error) {
|
|
|
|
|
|
// 1. 检查范围
|
|
|
|
|
|
if key < r.header.MinKey || key > r.header.MaxKey {
|
|
|
|
|
|
return nil, fmt.Errorf("key out of range")
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 2. 在 B+Tree 中查找
|
|
|
|
|
|
dataOffset, dataSize, found := r.btReader.Get(key)
|
|
|
|
|
|
if !found {
|
|
|
|
|
|
return nil, fmt.Errorf("key not found")
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 3. 读取数据
|
|
|
|
|
|
if dataOffset+int64(dataSize) > int64(len(r.mmap)) {
|
|
|
|
|
|
return nil, fmt.Errorf("invalid data offset")
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-09 01:33:22 +08:00
|
|
|
|
data := r.mmap[dataOffset : dataOffset+int64(dataSize)]
|
2025-10-08 16:42:31 +08:00
|
|
|
|
|
2025-10-09 01:33:22 +08:00
|
|
|
|
// 4. 按需反序列化(只解析需要的字段,无压缩)
|
2025-10-08 16:42:31 +08:00
|
|
|
|
row, err := decodeSSTableRowBinaryPartial(data, r.schema, fields)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return row, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// SetSchema 设置 Schema(用于优化编解码)
|
|
|
|
|
|
func (r *SSTableReader) SetSchema(schema *Schema) {
|
|
|
|
|
|
r.schema = schema
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// GetHeader 获取文件头信息
|
|
|
|
|
|
func (r *SSTableReader) GetHeader() *SSTableHeader {
|
|
|
|
|
|
return r.header
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// GetPath 获取文件路径
|
|
|
|
|
|
func (r *SSTableReader) GetPath() string {
|
|
|
|
|
|
return r.path
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// GetAllKeys 获取文件中所有的 key(按顺序)
|
|
|
|
|
|
func (r *SSTableReader) GetAllKeys() []int64 {
|
|
|
|
|
|
return r.btReader.GetAllKeys()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-11 13:19:26 +08:00
|
|
|
|
// ForEach 升序迭代所有 key-offset-size 对
|
|
|
|
|
|
// callback 返回 false 时停止迭代,支持提前终止
|
|
|
|
|
|
func (r *SSTableReader) ForEach(callback KeyCallback) {
|
|
|
|
|
|
r.btReader.ForEach(callback)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// ForEachDesc 降序迭代所有 key-offset-size 对
|
|
|
|
|
|
// callback 返回 false 时停止迭代,支持提前终止
|
|
|
|
|
|
func (r *SSTableReader) ForEachDesc(callback KeyCallback) {
|
|
|
|
|
|
r.btReader.ForEachDesc(callback)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-08 16:42:31 +08:00
|
|
|
|
// Close 关闭读取器
|
|
|
|
|
|
func (r *SSTableReader) Close() error {
|
|
|
|
|
|
if r.mmap != nil {
|
|
|
|
|
|
r.mmap.Unmap()
|
|
|
|
|
|
}
|
|
|
|
|
|
if r.file != nil {
|
|
|
|
|
|
return r.file.Close()
|
|
|
|
|
|
}
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-09 01:33:22 +08:00
|
|
|
|
// decodeSSTableRow 解码行数据(只支持二进制格式)
|
2025-10-08 16:42:31 +08:00
|
|
|
|
func decodeSSTableRow(data []byte, schema *Schema) (*SSTableRow, error) {
|
2025-10-09 01:33:22 +08:00
|
|
|
|
// 使用二进制格式解码
|
2025-10-08 16:42:31 +08:00
|
|
|
|
row, err := decodeSSTableRowBinary(data, schema)
|
|
|
|
|
|
if err != nil {
|
2025-10-09 01:33:22 +08:00
|
|
|
|
return nil, fmt.Errorf("failed to decode row: %w", err)
|
2025-10-08 16:42:31 +08:00
|
|
|
|
}
|
|
|
|
|
|
return row, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// SSTableManager SST 文件管理器
|
|
|
|
|
|
type SSTableManager struct {
|
|
|
|
|
|
dir string
|
|
|
|
|
|
readers []*SSTableReader
|
|
|
|
|
|
mu sync.RWMutex
|
|
|
|
|
|
schema *Schema // Schema 用于优化编解码
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// NewSSTableManager 创建 SST 管理器
|
|
|
|
|
|
func NewSSTableManager(dir string) (*SSTableManager, error) {
|
|
|
|
|
|
// 确保目录存在
|
|
|
|
|
|
err := os.MkdirAll(dir, 0755)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
mgr := &SSTableManager{
|
|
|
|
|
|
dir: dir,
|
|
|
|
|
|
readers: make([]*SSTableReader, 0),
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 恢复现有的 SST 文件
|
|
|
|
|
|
err = mgr.recover()
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return mgr, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// recover 恢复现有的 SST 文件
|
|
|
|
|
|
func (m *SSTableManager) recover() error {
|
|
|
|
|
|
// 查找所有 SST 文件
|
|
|
|
|
|
files, err := filepath.Glob(filepath.Join(m.dir, "*.sst"))
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
for _, file := range files {
|
|
|
|
|
|
// 跳过索引文件
|
|
|
|
|
|
filename := filepath.Base(file)
|
|
|
|
|
|
if strings.HasPrefix(filename, "idx_") {
|
|
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 打开 SST Reader
|
|
|
|
|
|
reader, err := NewSSTableReader(file)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 设置 Schema
|
|
|
|
|
|
if m.schema != nil {
|
|
|
|
|
|
reader.SetSchema(m.schema)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
m.readers = append(m.readers, reader)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// CreateSST 创建新的 SST 文件
|
|
|
|
|
|
// fileNumber: 文件编号(由 VersionSet 分配)
|
|
|
|
|
|
func (m *SSTableManager) CreateSST(fileNumber int64, rows []*SSTableRow) (*SSTableReader, error) {
|
|
|
|
|
|
return m.CreateSSTWithLevel(fileNumber, rows, 0) // 默认创建到 L0
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// CreateSSTWithLevel 创建新的 SST 文件到指定层级
|
|
|
|
|
|
// fileNumber: 文件编号(由 VersionSet 分配)
|
|
|
|
|
|
func (m *SSTableManager) CreateSSTWithLevel(fileNumber int64, rows []*SSTableRow, level int) (*SSTableReader, error) {
|
|
|
|
|
|
m.mu.Lock()
|
|
|
|
|
|
defer m.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
|
|
sstPath := filepath.Join(m.dir, fmt.Sprintf("%06d.sst", fileNumber))
|
|
|
|
|
|
|
|
|
|
|
|
// 创建文件
|
|
|
|
|
|
file, err := os.Create(sstPath)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
writer := NewSSTableWriter(file, m.schema)
|
|
|
|
|
|
|
|
|
|
|
|
// 写入所有行
|
|
|
|
|
|
for _, row := range rows {
|
|
|
|
|
|
err = writer.Add(row)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
file.Close()
|
|
|
|
|
|
os.Remove(sstPath)
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 完成写入
|
|
|
|
|
|
err = writer.Finish()
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
file.Close()
|
|
|
|
|
|
os.Remove(sstPath)
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
file.Close()
|
|
|
|
|
|
|
|
|
|
|
|
// 打开 SST Reader
|
|
|
|
|
|
reader, err := NewSSTableReader(sstPath)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 设置 Schema
|
|
|
|
|
|
if m.schema != nil {
|
|
|
|
|
|
reader.SetSchema(m.schema)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 添加到 readers 列表
|
|
|
|
|
|
m.readers = append(m.readers, reader)
|
|
|
|
|
|
|
|
|
|
|
|
return reader, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// SetSchema 设置 Schema(用于优化编解码)
|
|
|
|
|
|
func (m *SSTableManager) SetSchema(schema *Schema) {
|
|
|
|
|
|
m.mu.Lock()
|
|
|
|
|
|
defer m.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
|
|
m.schema = schema
|
|
|
|
|
|
|
|
|
|
|
|
// 为所有已存在的 readers 设置 schema
|
|
|
|
|
|
for _, reader := range m.readers {
|
|
|
|
|
|
reader.SetSchema(schema)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Get 从所有 SST 文件中查找数据
|
|
|
|
|
|
func (m *SSTableManager) Get(seq int64) (*SSTableRow, error) {
|
|
|
|
|
|
m.mu.RLock()
|
|
|
|
|
|
defer m.mu.RUnlock()
|
|
|
|
|
|
|
|
|
|
|
|
// 从后往前查找(新的文件优先)
|
|
|
|
|
|
for i := len(m.readers) - 1; i >= 0; i-- {
|
|
|
|
|
|
reader := m.readers[i]
|
|
|
|
|
|
row, err := reader.Get(seq)
|
|
|
|
|
|
if err == nil {
|
|
|
|
|
|
return row, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return nil, fmt.Errorf("key not found: %d", seq)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// GetPartial 从所有 SST 文件中按需查找数据(只读取指定字段)
|
|
|
|
|
|
func (m *SSTableManager) GetPartial(seq int64, fields []string) (*SSTableRow, error) {
|
|
|
|
|
|
m.mu.RLock()
|
|
|
|
|
|
defer m.mu.RUnlock()
|
|
|
|
|
|
|
|
|
|
|
|
// 从后往前查找(新的文件优先)
|
|
|
|
|
|
for i := len(m.readers) - 1; i >= 0; i-- {
|
|
|
|
|
|
reader := m.readers[i]
|
|
|
|
|
|
row, err := reader.GetPartial(seq, fields)
|
|
|
|
|
|
if err == nil {
|
|
|
|
|
|
return row, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return nil, fmt.Errorf("key not found: %d", seq)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// RemoveReader 移除指定文件编号的 reader(用于 compaction)
|
|
|
|
|
|
func (m *SSTableManager) RemoveReader(fileNumber int64) error {
|
|
|
|
|
|
m.mu.Lock()
|
|
|
|
|
|
defer m.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
|
|
// 查找并移除对应的 reader
|
|
|
|
|
|
for i, reader := range m.readers {
|
|
|
|
|
|
// 从文件名中提取文件编号
|
|
|
|
|
|
filename := filepath.Base(reader.path)
|
|
|
|
|
|
var readerFileNum int64
|
|
|
|
|
|
if _, err := fmt.Sscanf(filename, "%d.sst", &readerFileNum); err == nil {
|
|
|
|
|
|
if readerFileNum == fileNumber {
|
|
|
|
|
|
// 关闭 reader
|
|
|
|
|
|
reader.Close()
|
|
|
|
|
|
|
|
|
|
|
|
// 从列表中移除
|
|
|
|
|
|
m.readers = append(m.readers[:i], m.readers[i+1:]...)
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return fmt.Errorf("reader for file %d not found", fileNumber)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// AddReader 添加 reader 到管理器(用于 compaction 创建的新文件)
|
|
|
|
|
|
func (m *SSTableManager) AddReader(reader *SSTableReader) {
|
|
|
|
|
|
m.mu.Lock()
|
|
|
|
|
|
defer m.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
|
|
m.readers = append(m.readers, reader)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// GetReaders 获取所有 Readers(用于扫描)
|
|
|
|
|
|
func (m *SSTableManager) GetReaders() []*SSTableReader {
|
|
|
|
|
|
m.mu.RLock()
|
|
|
|
|
|
defer m.mu.RUnlock()
|
|
|
|
|
|
|
|
|
|
|
|
// 返回副本
|
|
|
|
|
|
readers := make([]*SSTableReader, len(m.readers))
|
|
|
|
|
|
copy(readers, m.readers)
|
|
|
|
|
|
|
|
|
|
|
|
// 按 MinKey 排序,确保查询时按 seq 顺序遍历
|
|
|
|
|
|
// 这对于 compaction 后的文件顺序至关重要:
|
|
|
|
|
|
// compaction 生成的新文件包含旧 seq,但被添加到 readers 末尾
|
|
|
|
|
|
// 排序后保证查询结果有序
|
|
|
|
|
|
sort.Slice(readers, func(i, j int) bool {
|
|
|
|
|
|
return readers[i].header.MinKey < readers[j].header.MinKey
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
return readers
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// GetMaxSeq 获取所有 SST 中的最大 seq
|
|
|
|
|
|
func (m *SSTableManager) GetMaxSeq() int64 {
|
|
|
|
|
|
m.mu.RLock()
|
|
|
|
|
|
defer m.mu.RUnlock()
|
|
|
|
|
|
|
|
|
|
|
|
maxSeq := int64(0)
|
|
|
|
|
|
for _, reader := range m.readers {
|
|
|
|
|
|
header := reader.GetHeader()
|
|
|
|
|
|
if header.MaxKey > maxSeq {
|
|
|
|
|
|
maxSeq = header.MaxKey
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return maxSeq
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Count 获取 SST 文件数量
|
|
|
|
|
|
func (m *SSTableManager) Count() int {
|
|
|
|
|
|
m.mu.RLock()
|
|
|
|
|
|
defer m.mu.RUnlock()
|
|
|
|
|
|
|
|
|
|
|
|
return len(m.readers)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// ListFiles 列出所有 SST 文件
|
|
|
|
|
|
func (m *SSTableManager) ListFiles() []string {
|
|
|
|
|
|
m.mu.RLock()
|
|
|
|
|
|
defer m.mu.RUnlock()
|
|
|
|
|
|
|
|
|
|
|
|
files := make([]string, 0, len(m.readers))
|
|
|
|
|
|
for _, reader := range m.readers {
|
|
|
|
|
|
files = append(files, reader.path)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return files
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Close 关闭所有 SST Readers
|
|
|
|
|
|
func (m *SSTableManager) Close() error {
|
|
|
|
|
|
m.mu.Lock()
|
|
|
|
|
|
defer m.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
|
|
for _, reader := range m.readers {
|
|
|
|
|
|
reader.Close()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
m.readers = nil
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// SSTableStats 统计信息
|
|
|
|
|
|
type SSTableStats struct {
|
|
|
|
|
|
FileCount int
|
|
|
|
|
|
TotalSize int64
|
|
|
|
|
|
MinSeq int64
|
|
|
|
|
|
MaxSeq int64
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// GetStats 获取统计信息
|
|
|
|
|
|
func (m *SSTableManager) GetStats() *SSTableStats {
|
|
|
|
|
|
m.mu.RLock()
|
|
|
|
|
|
defer m.mu.RUnlock()
|
|
|
|
|
|
|
|
|
|
|
|
stats := &SSTableStats{
|
|
|
|
|
|
FileCount: len(m.readers),
|
|
|
|
|
|
MinSeq: -1,
|
|
|
|
|
|
MaxSeq: -1,
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
for _, reader := range m.readers {
|
|
|
|
|
|
header := reader.GetHeader()
|
|
|
|
|
|
|
|
|
|
|
|
if stats.MinSeq == -1 || header.MinKey < stats.MinSeq {
|
|
|
|
|
|
stats.MinSeq = header.MinKey
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if stats.MaxSeq == -1 || header.MaxKey > stats.MaxSeq {
|
|
|
|
|
|
stats.MaxSeq = header.MaxKey
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 获取文件大小
|
|
|
|
|
|
if stat, err := os.Stat(reader.path); err == nil {
|
|
|
|
|
|
stats.TotalSize += stat.Size()
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return stats
|
|
|
|
|
|
}
|