-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathfile_format.go
More file actions
178 lines (141 loc) · 5.12 KB
/
Copy pathfile_format.go
File metadata and controls
178 lines (141 loc) · 5.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
package bloomsearch
/**
This package should probably make lowerJSON versions of structs to protect internal fields, but that's for a later optimization.
*/
import (
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"io"
"hash/crc32"
"github.com/bits-and-blooms/bloom/v3"
)
// ErrInvalidHash is the sentinel error wrapped by the decoding functions when
// a stored CRC32C checksum does not match the checksum computed over the
// payload. Match it with errors.Is.
var ErrInvalidHash = errors.New("invalid hash")
// Constants describing the on-disk file format.
const (
	// FileVersion is the file format version, typed as uint32.
	FileVersion uint32 = 1
	// MagicBytes is the format's magic string.
	MagicBytes = "BLOMSRCH"
	// LengthPrefixSize is presumably the byte width of a length prefix
	// (its use is not visible in this file; confirm against the writer).
	LengthPrefixSize = 4
	// VersionPrefixSize is presumably the byte width of the version prefix
	// (its use is not visible in this file; confirm against the writer).
	VersionPrefixSize = 4
	// HashSize is the byte width of an encoded CRC32C checksum.
	HashSize = 4
)
// crc32cTable is the CRC-32 lookup table for the Castagnoli polynomial
// (CRC32C). It is built once when the package is initialized and is the table
// passed to crc32.Checksum by the serialization and verification functions
// below.
var crc32cTable = crc32.MakeTable(crc32.Castagnoli)
// FileMetadata is the file-level metadata record. It is serialized to JSON by
// Bytes and decoded (after checksum verification) by
// FileMetadataFromBytesWithHash. All fields are exported and untagged, so
// they are encoded under their Go field names.
type FileMetadata struct {
	// BloomFilters is the set of bloom filters stored with the file metadata
	// (presumably covering the whole file; confirm against the writer).
	BloomFilters BloomFilters
	// BloomExpectedItems is presumably the expected item count the bloom
	// filters were sized for (inferred from the name; confirm).
	BloomExpectedItems uint
	// BloomFalsePositiveRate is presumably the target false positive rate of
	// the bloom filters (inferred from the name; confirm).
	BloomFalsePositiveRate float64
	// DataBlocks holds one metadata entry per data block.
	DataBlocks []DataBlockMetadata
}
// Bytes encodes the file metadata as JSON and returns two slices: the JSON
// payload, and the CRC32C checksum of that payload written as HashSize
// little-endian bytes.
//
// It panics if the metadata cannot be marshaled to JSON.
func (f *FileMetadata) Bytes() ([]byte, []byte) {
	payload, marshalErr := json.Marshal(f)
	if marshalErr != nil {
		panic(marshalErr)
	}

	checksum := make([]byte, HashSize)
	binary.LittleEndian.PutUint32(checksum, crc32.Checksum(payload, crc32cTable))

	return payload, checksum
}
// FileMetadataFromBytesWithHash verifies the CRC32C checksum of bytes against
// expectedHashBytes and, when it matches, decodes bytes as JSON into a
// FileMetadata.
//
// expectedHashBytes must carry the checksum as a little-endian uint32 in its
// first 4 bytes; any bytes beyond that are ignored.
//
// The returned error wraps ErrInvalidHash when expectedHashBytes is shorter
// than 4 bytes or when the checksums differ. A JSON decoding failure is
// returned wrapped with additional context.
func FileMetadataFromBytesWithHash(bytes []byte, expectedHashBytes []byte) (*FileMetadata, error) {
	// binary.LittleEndian.Uint32 panics on a slice shorter than 4 bytes, so a
	// truncated hash is reported as an error instead of crashing the caller.
	if len(expectedHashBytes) < crc32.Size {
		return nil, fmt.Errorf("%w: expected %d hash bytes, got %d", ErrInvalidHash, crc32.Size, len(expectedHashBytes))
	}

	// Compare the stored checksum with the one computed over the payload.
	expectedHash := binary.LittleEndian.Uint32(expectedHashBytes)
	actualHash := crc32.Checksum(bytes, crc32cTable)
	if actualHash != expectedHash {
		return nil, fmt.Errorf("%w: expected %x, got %x", ErrInvalidHash, expectedHash, actualHash)
	}

	// Only decode once the payload is known to be intact.
	var metadata FileMetadata
	if err := json.Unmarshal(bytes, &metadata); err != nil {
		return nil, fmt.Errorf("failed to unmarshal metadata: %w", err)
	}
	return &metadata, nil
}
// BloomFilters contains the bloom filters for a data block.
// This struct is serialized (as JSON, by Bytes) and stored at the beginning of
// each data block. FileMetadata also holds one in its BloomFilters field.
type BloomFilters struct {
	// FieldBloomFilter presumably holds field names (inferred from the name;
	// confirm against the code that populates it).
	FieldBloomFilter *bloom.BloomFilter
	// TokenBloomFilter presumably holds value tokens (inferred from the name;
	// confirm against the code that populates it).
	TokenBloomFilter *bloom.BloomFilter
	// FieldTokenBloomFilter presumably holds field/token combinations
	// (inferred from the name; confirm against the code that populates it).
	FieldTokenBloomFilter *bloom.BloomFilter
}
// Bytes encodes the bloom filters as JSON and returns two slices: the JSON
// payload, and the CRC32C checksum of that payload written as HashSize
// little-endian bytes.
//
// It panics if the bloom filters cannot be marshaled to JSON.
func (b *BloomFilters) Bytes() ([]byte, []byte) {
	payload, marshalErr := json.Marshal(b)
	if marshalErr != nil {
		panic(marshalErr)
	}

	checksum := make([]byte, HashSize)
	binary.LittleEndian.PutUint32(checksum, crc32.Checksum(payload, crc32cTable))

	return payload, checksum
}
// DataBlockBloomFiltersFromBytesWithHash verifies the CRC32C checksum of bytes
// against expectedHashBytes and, when it matches, decodes bytes as JSON into a
// BloomFilters.
//
// expectedHashBytes must carry the checksum as a little-endian uint32 in its
// first 4 bytes; any bytes beyond that are ignored.
//
// The returned error wraps ErrInvalidHash when expectedHashBytes is shorter
// than 4 bytes or when the checksums differ. A JSON decoding failure is
// returned wrapped with additional context.
func DataBlockBloomFiltersFromBytesWithHash(bytes []byte, expectedHashBytes []byte) (*BloomFilters, error) {
	// binary.LittleEndian.Uint32 panics on a slice shorter than 4 bytes, so a
	// truncated hash is reported as an error instead of crashing the caller.
	if len(expectedHashBytes) < crc32.Size {
		return nil, fmt.Errorf("%w: expected %d hash bytes, got %d", ErrInvalidHash, crc32.Size, len(expectedHashBytes))
	}

	// Compare the stored checksum with the one computed over the payload.
	expectedHash := binary.LittleEndian.Uint32(expectedHashBytes)
	actualHash := crc32.Checksum(bytes, crc32cTable)
	if actualHash != expectedHash {
		return nil, fmt.Errorf("%w: expected %x, got %x", ErrInvalidHash, expectedHash, actualHash)
	}

	// Only decode once the payload is known to be intact.
	var bloomFilters BloomFilters
	if err := json.Unmarshal(bytes, &bloomFilters); err != nil {
		return nil, fmt.Errorf("failed to unmarshal bloom filters: %w", err)
	}
	return &bloomFilters, nil
}
// ReadDataBlockBloomFilters reads and verifies the bloom filters stored at the
// start of a data block.
//
// It seeks file to blockMetadata.Offset (relative to the start of the file),
// reads blockMetadata.BloomFiltersSize-HashSize bytes of serialized bloom
// filters followed by the HashSize-byte checksum, and passes both to
// DataBlockBloomFiltersFromBytesWithHash for verification and decoding.
//
// An error is returned when BloomFiltersSize is too small to contain the
// checksum, when seeking or reading fails (a short read counts as a failure),
// or when verification or decoding fails.
func ReadDataBlockBloomFilters(file io.ReadSeeker, blockMetadata DataBlockMetadata) (*BloomFilters, error) {
	// BloomFiltersSize includes the trailing hash. A smaller value (e.g. from
	// corrupt metadata) would give a negative length and make would panic, so
	// reject it up front.
	payloadSize := blockMetadata.BloomFiltersSize - HashSize
	if payloadSize < 0 {
		return nil, fmt.Errorf("invalid bloom filters size %d: must be at least %d", blockMetadata.BloomFiltersSize, HashSize)
	}

	// Seek to the beginning of the data block.
	if _, err := file.Seek(int64(blockMetadata.Offset), io.SeekStart); err != nil {
		return nil, fmt.Errorf("failed to seek to block offset: %w", err)
	}

	// Read the serialized bloom filters (everything except the hash).
	bloomFiltersBytes := make([]byte, payloadSize)
	if _, err := io.ReadFull(file, bloomFiltersBytes); err != nil {
		return nil, fmt.Errorf("failed to read bloom filters: %w", err)
	}

	// Read the checksum that immediately follows the bloom filters.
	bloomFiltersHashBytes := make([]byte, HashSize)
	if _, err := io.ReadFull(file, bloomFiltersHashBytes); err != nil {
		return nil, fmt.Errorf("failed to read bloom filters hash: %w", err)
	}

	// Verify and parse the bloom filters.
	return DataBlockBloomFiltersFromBytesWithHash(bloomFiltersBytes, bloomFiltersHashBytes)
}
// CompressionType names the compression algorithm applied to the row data of
// a data block.
type CompressionType string

// CompressionType values defined by this package.
const (
	CompressionNone   = CompressionType("none")
	CompressionSnappy = CompressionType("snappy")
	CompressionZstd   = CompressionType("zstd")
)
// DataBlockMetadata describes a single data block: where it sits in the file,
// how large its sections are, and how its row data is encoded. A slice of
// these is stored in FileMetadata.DataBlocks. Fields tagged omitempty are left
// out of the JSON encoding when they hold their zero value.
type DataBlockMetadata struct {
	// Absolute file offset (includes bloom filters at the beginning)
	Offset int
	// Size includes the bloom filters, their hash, and row data (no trailing hash)
	Size int
	// Rows is presumably the number of rows stored in the block (inferred from
	// the name; confirm against the writer).
	Rows int
	// Size of the bloom filters section (bloom filters + hash)
	BloomFiltersSize int
	// MinMaxIndexes maps a string key to a MinMaxIndex (keys are presumably
	// field names; confirm against the writer).
	MinMaxIndexes map[string]MinMaxIndex `json:",omitempty"`
	// PartitionID presumably identifies the partition this block belongs to
	// (inferred from the name; confirm).
	PartitionID string `json:",omitempty"`
	// Compression algorithm used for the row data in this block
	Compression CompressionType `json:",omitempty"`
	// Uncompressed size of row data (for decompression buffer allocation)
	UncompressedSize int `json:",omitempty"`
	// Hash of the compressed row data (for integrity verification)
	RowDataHash uint32 `json:",omitempty"`
	// BloomExpectedItems is presumably the expected item count this block's
	// bloom filters were sized for (inferred from the name; confirm).
	BloomExpectedItems uint
	// BloomFalsePositiveRate is presumably the target false positive rate of
	// this block's bloom filters (inferred from the name; confirm).
	BloomFalsePositiveRate float64
}