Skip to content

Commit e96d2d9

Browse files
authored
Remove generations (#636)
1 parent 996a50d commit e96d2d9

20 files changed

+599
-1594
lines changed

abs/replica_client.go

Lines changed: 46 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"net/url"
88
"os"
99
"path"
10-
"strings"
1110
"sync"
1211
"time"
1312

@@ -90,53 +89,17 @@ func (c *ReplicaClient) Init(ctx context.Context) (err error) {
9089
return nil
9190
}
9291

93-
// Generations returns a list of available generation names.
94-
func (c *ReplicaClient) Generations(ctx context.Context) ([]string, error) {
95-
if err := c.Init(ctx); err != nil {
96-
return nil, err
97-
}
98-
99-
var generations []string
100-
var marker azblob.Marker
101-
for marker.NotDone() {
102-
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc()
103-
104-
resp, err := c.containerURL.ListBlobsHierarchySegment(ctx, marker, "/", azblob.ListBlobsSegmentOptions{
105-
Prefix: litestream.GenerationsPath(c.Path) + "/",
106-
})
107-
if err != nil {
108-
return nil, err
109-
}
110-
marker = resp.NextMarker
111-
112-
for _, prefix := range resp.Segment.BlobPrefixes {
113-
name := path.Base(strings.TrimSuffix(prefix.Name, "/"))
114-
if !litestream.IsGenerationName(name) {
115-
continue
116-
}
117-
generations = append(generations, name)
118-
}
119-
}
120-
121-
return generations, nil
122-
}
123-
124-
// DeleteGeneration deletes all snapshots & WAL segments within a generation.
125-
func (c *ReplicaClient) DeleteGeneration(ctx context.Context, generation string) error {
92+
// DeleteAll deletes all snapshots & WAL segments .
93+
func (c *ReplicaClient) DeleteAll(ctx context.Context) error {
12694
if err := c.Init(ctx); err != nil {
12795
return err
12896
}
12997

130-
dir, err := litestream.GenerationPath(c.Path, generation)
131-
if err != nil {
132-
return fmt.Errorf("cannot determine generation path: %w", err)
133-
}
134-
13598
var marker azblob.Marker
13699
for marker.NotDone() {
137100
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc()
138101

139-
resp, err := c.containerURL.ListBlobsFlatSegment(ctx, marker, azblob.ListBlobsSegmentOptions{Prefix: dir + "/"})
102+
resp, err := c.containerURL.ListBlobsFlatSegment(ctx, marker, azblob.ListBlobsSegmentOptions{Prefix: "/"})
140103
if err != nil {
141104
return err
142105
}
@@ -154,26 +117,26 @@ func (c *ReplicaClient) DeleteGeneration(ctx context.Context, generation string)
154117
}
155118
}
156119

157-
// log.Printf("%s(%s): retainer: deleting generation: %s", r.db.Path(), r.Name(), generation)
120+
// log.Printf("%s(%s): retainer: deleting: %s", r.db.Path(), r.Name())
158121

159122
return nil
160123
}
161124

162-
// Snapshots returns an iterator over all available snapshots for a generation.
163-
func (c *ReplicaClient) Snapshots(ctx context.Context, generation string) (litestream.SnapshotIterator, error) {
125+
// Snapshots returns an iterator over all available snapshots.
126+
func (c *ReplicaClient) Snapshots(ctx context.Context) (litestream.SnapshotIterator, error) {
164127
if err := c.Init(ctx); err != nil {
165128
return nil, err
166129
}
167-
return newSnapshotIterator(ctx, generation, c), nil
130+
return newSnapshotIterator(ctx, c), nil
168131
}
169132

170133
// WriteSnapshot writes LZ4 compressed data from rd to the object storage.
171-
func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, index int, rd io.Reader) (info litestream.SnapshotInfo, err error) {
134+
func (c *ReplicaClient) WriteSnapshot(ctx context.Context, index int, rd io.Reader) (info litestream.SnapshotInfo, err error) {
172135
if err := c.Init(ctx); err != nil {
173136
return info, err
174137
}
175138

176-
key, err := litestream.SnapshotPath(c.Path, generation, index)
139+
key, err := litestream.SnapshotPath(c.Path, index)
177140
if err != nil {
178141
return info, fmt.Errorf("cannot determine snapshot path: %w", err)
179142
}
@@ -192,23 +155,22 @@ func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, in
192155
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "PUT").Inc()
193156
internal.OperationBytesCounterVec.WithLabelValues(ReplicaClientType, "PUT").Add(float64(rc.N()))
194157

195-
// log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime).Truncate(time.Millisecond))
158+
// log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), index, time.Since(startTime).Truncate(time.Millisecond))
196159

197160
return litestream.SnapshotInfo{
198-
Generation: generation,
199-
Index: index,
200-
Size: rc.N(),
201-
CreatedAt: startTime.UTC(),
161+
Index: index,
162+
Size: rc.N(),
163+
CreatedAt: startTime.UTC(),
202164
}, nil
203165
}
204166

205-
// SnapshotReader returns a reader for snapshot data at the given generation/index.
206-
func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) {
167+
// SnapshotReader returns a reader for snapshot data at the given index.
168+
func (c *ReplicaClient) SnapshotReader(ctx context.Context, index int) (io.ReadCloser, error) {
207169
if err := c.Init(ctx); err != nil {
208170
return nil, err
209171
}
210172

211-
key, err := litestream.SnapshotPath(c.Path, generation, index)
173+
key, err := litestream.SnapshotPath(c.Path, index)
212174
if err != nil {
213175
return nil, fmt.Errorf("cannot determine snapshot path: %w", err)
214176
}
@@ -227,13 +189,13 @@ func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, i
227189
return resp.Body(azblob.RetryReaderOptions{}), nil
228190
}
229191

230-
// DeleteSnapshot deletes a snapshot with the given generation & index.
231-
func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, index int) error {
192+
// DeleteSnapshot deletes a snapshot with the given index.
193+
func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, index int) error {
232194
if err := c.Init(ctx); err != nil {
233195
return err
234196
}
235197

236-
key, err := litestream.SnapshotPath(c.Path, generation, index)
198+
key, err := litestream.SnapshotPath(c.Path, index)
237199
if err != nil {
238200
return fmt.Errorf("cannot determine snapshot path: %w", err)
239201
}
@@ -249,12 +211,12 @@ func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, i
249211
return nil
250212
}
251213

252-
// WALSegments returns an iterator over all available WAL files for a generation.
253-
func (c *ReplicaClient) WALSegments(ctx context.Context, generation string) (litestream.WALSegmentIterator, error) {
214+
// WALSegments returns an iterator over all available WAL files.
215+
func (c *ReplicaClient) WALSegments(ctx context.Context) (litestream.WALSegmentIterator, error) {
254216
if err := c.Init(ctx); err != nil {
255217
return nil, err
256218
}
257-
return newWALSegmentIterator(ctx, generation, c), nil
219+
return newWALSegmentIterator(ctx, c), nil
258220
}
259221

260222
// WriteWALSegment writes LZ4 compressed data from rd into a file on disk.
@@ -263,7 +225,7 @@ func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos,
263225
return info, err
264226
}
265227

266-
key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset)
228+
key, err := litestream.WALSegmentPath(c.Path, pos.Index, pos.Offset)
267229
if err != nil {
268230
return info, fmt.Errorf("cannot determine wal segment path: %w", err)
269231
}
@@ -283,11 +245,10 @@ func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos,
283245
internal.OperationBytesCounterVec.WithLabelValues(ReplicaClientType, "PUT").Add(float64(rc.N()))
284246

285247
return litestream.WALSegmentInfo{
286-
Generation: pos.Generation,
287-
Index: pos.Index,
288-
Offset: pos.Offset,
289-
Size: rc.N(),
290-
CreatedAt: startTime.UTC(),
248+
Index: pos.Index,
249+
Offset: pos.Offset,
250+
Size: rc.N(),
251+
CreatedAt: startTime.UTC(),
291252
}, nil
292253
}
293254

@@ -298,7 +259,7 @@ func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos
298259
return nil, err
299260
}
300261

301-
key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset)
262+
key, err := litestream.WALSegmentPath(c.Path, pos.Index, pos.Offset)
302263
if err != nil {
303264
return nil, fmt.Errorf("cannot determine wal segment path: %w", err)
304265
}
@@ -324,7 +285,7 @@ func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Po
324285
}
325286

326287
for _, pos := range a {
327-
key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset)
288+
key, err := litestream.WALSegmentPath(c.Path, pos.Index, pos.Offset)
328289
if err != nil {
329290
return fmt.Errorf("cannot determine wal segment path: %w", err)
330291
}
@@ -343,8 +304,7 @@ func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Po
343304
}
344305

345306
type snapshotIterator struct {
346-
client *ReplicaClient
347-
generation string
307+
client *ReplicaClient
348308

349309
ch chan litestream.SnapshotInfo
350310
g errgroup.Group
@@ -355,11 +315,10 @@ type snapshotIterator struct {
355315
err error
356316
}
357317

358-
func newSnapshotIterator(ctx context.Context, generation string, client *ReplicaClient) *snapshotIterator {
318+
func newSnapshotIterator(ctx context.Context, client *ReplicaClient) *snapshotIterator {
359319
itr := &snapshotIterator{
360-
client: client,
361-
generation: generation,
362-
ch: make(chan litestream.SnapshotInfo),
320+
client: client,
321+
ch: make(chan litestream.SnapshotInfo),
363322
}
364323

365324
itr.ctx, itr.cancel = context.WithCancel(ctx)
@@ -372,7 +331,7 @@ func newSnapshotIterator(ctx context.Context, generation string, client *Replica
372331
func (itr *snapshotIterator) fetch() error {
373332
defer close(itr.ch)
374333

375-
dir, err := litestream.SnapshotsPath(itr.client.Path, itr.generation)
334+
dir, err := litestream.SnapshotsPath(itr.client.Path)
376335
if err != nil {
377336
return fmt.Errorf("cannot determine snapshots path: %w", err)
378337
}
@@ -395,10 +354,9 @@ func (itr *snapshotIterator) fetch() error {
395354
}
396355

397356
info := litestream.SnapshotInfo{
398-
Generation: itr.generation,
399-
Index: index,
400-
Size: *item.Properties.ContentLength,
401-
CreatedAt: item.Properties.CreationTime.UTC(),
357+
Index: index,
358+
Size: *item.Properties.ContentLength,
359+
CreatedAt: item.Properties.CreationTime.UTC(),
402360
}
403361

404362
select {
@@ -449,8 +407,7 @@ func (itr *snapshotIterator) Snapshot() litestream.SnapshotInfo {
449407
}
450408

451409
type walSegmentIterator struct {
452-
client *ReplicaClient
453-
generation string
410+
client *ReplicaClient
454411

455412
ch chan litestream.WALSegmentInfo
456413
g errgroup.Group
@@ -461,11 +418,10 @@ type walSegmentIterator struct {
461418
err error
462419
}
463420

464-
func newWALSegmentIterator(ctx context.Context, generation string, client *ReplicaClient) *walSegmentIterator {
421+
func newWALSegmentIterator(ctx context.Context, client *ReplicaClient) *walSegmentIterator {
465422
itr := &walSegmentIterator{
466-
client: client,
467-
generation: generation,
468-
ch: make(chan litestream.WALSegmentInfo),
423+
client: client,
424+
ch: make(chan litestream.WALSegmentInfo),
469425
}
470426

471427
itr.ctx, itr.cancel = context.WithCancel(ctx)
@@ -478,7 +434,7 @@ func newWALSegmentIterator(ctx context.Context, generation string, client *Repli
478434
func (itr *walSegmentIterator) fetch() error {
479435
defer close(itr.ch)
480436

481-
dir, err := litestream.WALPath(itr.client.Path, itr.generation)
437+
dir, err := litestream.WALPath(itr.client.Path)
482438
if err != nil {
483439
return fmt.Errorf("cannot determine wal path: %w", err)
484440
}
@@ -501,11 +457,10 @@ func (itr *walSegmentIterator) fetch() error {
501457
}
502458

503459
info := litestream.WALSegmentInfo{
504-
Generation: itr.generation,
505-
Index: index,
506-
Offset: offset,
507-
Size: *item.Properties.ContentLength,
508-
CreatedAt: item.Properties.CreationTime.UTC(),
460+
Index: index,
461+
Offset: offset,
462+
Size: *item.Properties.ContentLength,
463+
CreatedAt: item.Properties.CreationTime.UTC(),
509464
}
510465

511466
select {

0 commit comments

Comments
 (0)