-
Notifications
You must be signed in to change notification settings - Fork 347
Expand file tree
/
Copy pathreplica_client.go
More file actions
160 lines (134 loc) · 5.53 KB
/
replica_client.go
File metadata and controls
160 lines (134 loc) · 5.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package litestream
import (
"bufio"
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"log/slog"
"github.com/superfly/ltx"
)
var ErrStopIter = errors.New("stop iterator")
// ReplicaClient represents client to connect to a Replica.
type ReplicaClient interface {
// Type returns the type of client.
Type() string
// Init initializes the replica client connection.
// This may establish connections, validate configuration, etc.
// Implementations should be idempotent (no-op if already initialized).
Init(ctx context.Context) error
// LTXFiles returns an iterator of all LTX files on the replica for a given level.
// If seek is specified, the iterator start from the given TXID or the next available if not found.
// If useMetadata is true, the iterator fetches accurate timestamps from metadata for timestamp-based restore.
// When false, the iterator uses fast timestamps (LastModified/Created/ModTime) for normal operations.
LTXFiles(ctx context.Context, level int, seek ltx.TXID, useMetadata bool) (ltx.FileIterator, error)
// OpenLTXFile returns a reader that contains an LTX file at a given TXID.
// If seek is specified, the reader will start at the given offset.
// Returns an os.ErrNotFound error if the LTX file does not exist.
OpenLTXFile(ctx context.Context, level int, minTXID, maxTXID ltx.TXID, offset, size int64) (io.ReadCloser, error)
// WriteLTXFile writes an LTX file to the replica.
// Returns metadata for the written file.
WriteLTXFile(ctx context.Context, level int, minTXID, maxTXID ltx.TXID, r io.Reader) (*ltx.FileInfo, error)
// DeleteLTXFiles deletes one or more LTX files.
DeleteLTXFiles(ctx context.Context, a []*ltx.FileInfo) error
// DeleteAll deletes all files.
DeleteAll(ctx context.Context) error
// SetLogger sets the logger for the client.
SetLogger(logger *slog.Logger)
}
// FindLTXFiles returns a list of files that match filter.
// The useMetadata parameter is passed through to LTXFiles to control whether accurate timestamps
// are fetched from metadata. When true (timestamp-based restore), accurate timestamps are required.
// When false (normal operations), fast timestamps are sufficient.
func FindLTXFiles(ctx context.Context, client ReplicaClient, level int, useMetadata bool, filter func(*ltx.FileInfo) (bool, error)) ([]*ltx.FileInfo, error) {
itr, err := client.LTXFiles(ctx, level, 0, useMetadata)
if err != nil {
return nil, err
}
defer func() { _ = itr.Close() }()
var a []*ltx.FileInfo
for itr.Next() {
item := itr.Item()
match, err := filter(item)
if match {
a = append(a, item)
}
if errors.Is(err, ErrStopIter) {
break
} else if err != nil {
return a, err
}
}
if err := itr.Close(); err != nil {
return nil, err
}
return a, nil
}
// DefaultEstimatedPageIndexSize is size that is first fetched when fetching the page index.
// If the fetch was smaller than the actual page index, another call is made to fetch the rest.
const DefaultEstimatedPageIndexSize = 32 * 1024 // 32KB
func FetchPageIndex(ctx context.Context, client ReplicaClient, info *ltx.FileInfo) (map[uint32]ltx.PageIndexElem, error) {
rc, err := fetchPageIndexData(ctx, client, info)
if err != nil {
return nil, err
}
defer rc.Close()
return ltx.DecodePageIndex(bufio.NewReader(rc), info.Level, info.MinTXID, info.MaxTXID)
}
// FetchLTXHeader reads & returns the LTX header for the given file info.
func FetchLTXHeader(ctx context.Context, client ReplicaClient, info *ltx.FileInfo) (ltx.Header, error) {
rc, err := client.OpenLTXFile(ctx, info.Level, info.MinTXID, info.MaxTXID, 0, ltx.HeaderSize)
if err != nil {
return ltx.Header{}, fmt.Errorf("open ltx file: %w", err)
}
defer rc.Close()
hdr, _, err := ltx.PeekHeader(rc)
if err != nil {
return ltx.Header{}, fmt.Errorf("peek header: %w", err)
}
return hdr, nil
}
// fetchPageIndexData fetches a chunk of the end of the file to get the page index.
// If the fetch was smaller than the actual page index, another call is made to fetch the rest.
func fetchPageIndexData(ctx context.Context, client ReplicaClient, info *ltx.FileInfo) (io.ReadCloser, error) {
// Fetch the end of the file to get the page index.
offset := info.Size - DefaultEstimatedPageIndexSize
if offset < 0 {
offset = 0
}
f, err := client.OpenLTXFile(ctx, info.Level, info.MinTXID, info.MaxTXID, offset, 0)
if err != nil {
return nil, fmt.Errorf("open ltx file: %w", err)
}
defer f.Close()
// If we have read the full size of the page index, return the page index block as a reader.
b, err := io.ReadAll(f)
if err != nil {
return nil, fmt.Errorf("read ltx page index: %w", err)
}
size := binary.BigEndian.Uint64(b[len(b)-ltx.TrailerSize-8:])
if off := len(b) - int(size) - ltx.TrailerSize - 8; off > 0 {
return io.NopCloser(bytes.NewReader(b[off:])), nil
}
// Otherwise read the file from the start of the page index.
f, err = client.OpenLTXFile(ctx, info.Level, info.MinTXID, info.MaxTXID, info.Size-ltx.TrailerSize-8-int64(size), 0)
if err != nil {
return nil, fmt.Errorf("open ltx file: %w", err)
}
return f, nil
}
// FetchPage fetches and decodes a single page frame from an LTX file.
func FetchPage(ctx context.Context, client ReplicaClient, level int, minTXID, maxTXID ltx.TXID, offset, size int64) (ltx.PageHeader, []byte, error) {
f, err := client.OpenLTXFile(ctx, level, minTXID, maxTXID, offset, size)
if err != nil {
return ltx.PageHeader{}, nil, fmt.Errorf("open ltx file: %w", err)
}
defer f.Close()
b, err := io.ReadAll(f)
if err != nil {
return ltx.PageHeader{}, nil, fmt.Errorf("read ltx page frame: %w", err)
}
return ltx.DecodePageData(b)
}