@@ -4,92 +4,15 @@ import (
44 "context"
55 "errors"
66 "fmt"
7- "path/filepath"
87 "time"
98
10- "golang.org/x/sync/errgroup"
11-
129 "github.com/stellar/go-stellar-sdk/ingest/ledgerbackend"
1310 supportlog "github.com/stellar/go-stellar-sdk/support/log"
1411 "github.com/stellar/go-stellar-sdk/xdr"
1512
1613 "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/chunk"
17- "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores/eventstore"
18- "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores/ledger"
19- "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores/txhash"
2014)
2115
22- // HotStores holds the long-lived, caller-owned hot stores injected into RunHot.
23- // The caller (the daemon) opens and closes these; RunHot only borrows them to
24- // build the per-type hot ingesters. A field left nil for an enabled data type is
25- // a configuration error caught by RunHot. Every hot store is chunk-bound (each
26- // instance accumulates exactly one chunk before being frozen into cold
27- // artifacts), so each injected store must already be bound to the chunk being
28- // ingested — RunHot rejects a mismatch up front.
29- type HotStores struct {
30- Ledgers * ledger.HotStore
31- Txhash * txhash.HotStore
32- Events * eventstore.HotStore
33- }
34-
35- // buildHotIngesters constructs one HotIngester per data type enabled in cfg, in
36- // canonical ledgers→txhash→events order, from the injected stores. It errors if
37- // an enabled type's store is nil.
38- func buildHotIngesters (stores HotStores , sink MetricSink , cfg Config ) ([]HotIngester , error ) {
39- var ings []HotIngester
40- if cfg .Ledgers {
41- if stores .Ledgers == nil {
42- return nil , errors .New ("ingest: Ledgers enabled but HotStores.Ledgers is nil" )
43- }
44- ings = append (ings , NewLedgerHotIngester (stores .Ledgers , sink ))
45- }
46- if cfg .Txhash {
47- if stores .Txhash == nil {
48- return nil , errors .New ("ingest: Txhash enabled but HotStores.Txhash is nil" )
49- }
50- ings = append (ings , NewTxhashHotIngester (stores .Txhash , sink ))
51- }
52- if cfg .Events {
53- if stores .Events == nil {
54- return nil , errors .New ("ingest: Events enabled but HotStores.Events is nil" )
55- }
56- ings = append (ings , NewEventsHotIngester (stores .Events , sink ))
57- }
58- return ings , nil
59- }
60-
61- // buildColdIngesters opens one ColdIngester per data type enabled in cfg,
62- // each opening its own per-chunk writer under coldDir/<type> (constructors
63- // create their own directories and freely overwrite any prior attempt's
64- // files — see the package doc's artifact model). The constructor table below
65- // is the single definition site of the canonical ledgers→txhash→events order
66- // (buildHotIngesters keeps its explicit if-ladder because its three injected
67- // store types differ). On any constructor error it closes the ingesters built
68- // so far and returns.
69- func buildColdIngesters (coldDir string , chunkID chunk.ID , sink MetricSink , cfg Config ) ([]ColdIngester , error ) {
70- ctors := []struct {
71- enabled bool
72- dataType string
73- open func (string , chunk.ID , MetricSink ) (ColdIngester , error )
74- }{
75- {cfg .Ledgers , dataTypeLedgers , NewLedgerColdIngester },
76- {cfg .Txhash , dataTypeTxhash , NewTxhashColdIngester },
77- {cfg .Events , dataTypeEvents , NewEventsColdIngester },
78- }
79- var ings []ColdIngester
80- for _ , c := range ctors {
81- if ! c .enabled {
82- continue
83- }
84- ing , err := c .open (filepath .Join (coldDir , c .dataType ), chunkID , sink )
85- if err != nil {
86- return nil , closeColdAll (ings , fmt .Errorf ("open %s cold ingester: %w" , c .dataType , err ))
87- }
88- ings = append (ings , ing )
89- }
90- return ings , nil
91- }
92-
9316// errColdBuildAborted is the synthetic error recorded against an
9417// already-built cold ingester's metric when a LATER constructor fails and the
9518// build is rolled back. Without it, closing a fully-built ingester would emit
@@ -122,68 +45,6 @@ func closeColdAll(ings []ColdIngester, err error) error {
12245 return err
12346}
12447
125- // RunHot opens one stream for chunkID from source and feeds each ledger (as a
126- // view) to a HotService over the enabled hot ingesters, built from the INJECTED,
127- // caller-owned stores in hotStores. Ingest errors abort fast; HotService.Ingest
128- // waits for all ingesters before the loop pulls again so the borrowed view is
129- // never read past its lifetime. The hot stores are NOT closed here — the caller
130- // owns their lifecycle.
131- func RunHot (
132- ctx context.Context ,
133- logger * supportlog.Entry ,
134- source ChunkSource ,
135- chunkID chunk.ID ,
136- hotStores HotStores ,
137- sink MetricSink ,
138- cfg Config ,
139- ) error {
140- if verr := cfg .validate (); verr != nil {
141- return verr
142- }
143- // Every hot store is chunk-bound — each instance accumulates exactly one
144- // chunk's data before being frozen into the chunk's cold artifacts — and
145- // records its chunk at open time. An injected store bound to a different
146- // chunk than we're ingesting would silently interleave two chunks' data
147- // (ledgers, txhash) or fail every per-ledger write with an out-of-range
148- // offset (events, whose LedgerOffsets are chunk-relative), so catch the
149- // mismatch up front with a clear message. Nil stores are skipped here:
150- // buildHotIngesters rejects a nil store for an enabled type with a more
151- // specific error.
152- checkBinding := func (name string , got chunk.ID ) error {
153- if got != chunkID {
154- return fmt .Errorf ("ingest: RunHot chunk %d but injected %s store is bound to chunk %d" ,
155- uint32 (chunkID ), name , uint32 (got ))
156- }
157- return nil
158- }
159- if cfg .Ledgers && hotStores .Ledgers != nil {
160- if err := checkBinding ("Ledgers" , hotStores .Ledgers .ChunkID ()); err != nil {
161- return err
162- }
163- }
164- if cfg .Txhash && hotStores .Txhash != nil {
165- if err := checkBinding ("Txhash" , hotStores .Txhash .ChunkID ()); err != nil {
166- return err
167- }
168- }
169- if cfg .Events && hotStores .Events != nil {
170- if err := checkBinding ("Events" , hotStores .Events .ChunkID ()); err != nil {
171- return err
172- }
173- }
174- ings , berr := buildHotIngesters (hotStores , sink , cfg )
175- if berr != nil {
176- return berr
177- }
178- stream , oerr := source .OpenStream (chunkID )
179- if oerr != nil {
180- return fmt .Errorf ("open stream for chunk %d: %w" , uint32 (chunkID ), oerr )
181- }
182- logger .Debugf ("RunHot: ingesting chunk %d [%d, %d]" , uint32 (chunkID ), chunkID .FirstLedger (), chunkID .LastLedger ())
183- service := NewHotService (ings , sink )
184- return drain (ctx , stream , chunkID , service )
185- }
186-
18748// drain pulls the chunk's raw ledgers and feeds each (as a view) to the service,
18849// then verifies the full [first,last] range was consumed. For the cold path this
18950// completeness check runs before Finalize, so a short stream never produces a
@@ -235,123 +96,28 @@ func drain(ctx context.Context, stream ledgerbackend.LedgerStream, chunkID chunk
23596 return nil
23697}
23798
238- // RunCold ingests numChunks consecutive chunks starting at startChunk into the
239- // cold stores under coldDir, processing up to chunkWorkers chunks concurrently.
240- // Each chunk worker opens its own stream via source.OpenStream(chunkID), builds
241- // the enabled cold ingesters (which open their own writers), drives the ledgers
242- // through a ColdService, then Finalizes. A deferred Close drops partials on the
243- // failure path.
244- func RunCold (
245- ctx context.Context ,
246- logger * supportlog.Entry ,
247- source ChunkSource ,
248- coldDir string ,
249- startChunk chunk.ID ,
250- numChunks , chunkWorkers int ,
251- sink MetricSink ,
252- cfg Config ,
253- ) error {
254- if verr := cfg .validate (); verr != nil {
255- return verr
256- }
257- if numChunks < 1 {
258- return fmt .Errorf ("ingest: numChunks must be >= 1, got %d" , numChunks )
259- }
260- if chunkWorkers < 1 {
261- return fmt .Errorf ("ingest: chunkWorkers must be >= 1, got %d" , chunkWorkers )
262- }
263- if chunkWorkers > numChunks {
264- logger .Infof ("chunkWorkers=%d > numChunks=%d; clamping to %d" , chunkWorkers , numChunks , numChunks )
265- chunkWorkers = numChunks
266- }
267-
268- g , gctx := errgroup .WithContext (ctx )
269- g .SetLimit (chunkWorkers )
270- for i := range numChunks {
271- chunkID := startChunk + chunk .ID (uint32 (i ))
272- g .Go (func () error {
273- if rerr := runOneChunkCold (gctx , source , coldDir , chunkID , sink , cfg ); rerr != nil {
274- return fmt .Errorf ("chunk %d: %w" , uint32 (chunkID ), rerr )
275- }
276- return nil
277- })
278- }
279- return g .Wait ()
280- }
281-
282- // runOneChunkCold processes a single chunk: opens its own stream, builds the
283- // enabled cold ingesters into a ColdService, drives the per-ledger Ingest, then
284- // Finalizes (explicit, error-checked). Close is deferred and idempotent. On any
285- // failure the chunk attempt is simply abandoned — leftover files under coldDir
286- // are inert scratch (see the package doc's artifact model) and the retry's
287- // overwrite is the cleanup.
288- func runOneChunkCold (
289- ctx context.Context ,
290- source ChunkSource ,
291- coldDir string ,
292- chunkID chunk.ID ,
293- sink MetricSink ,
294- cfg Config ,
295- ) (err error ) {
296- sink = orNop (sink )
297-
298- // Pre-service failures (ctx, OpenStream, and the constructor failure
299- // below) emit the chunk's single ColdChunkTotal here: the ColdService
300- // that normally owns that aggregate isn't built yet, but the invariant
301- // is "exactly one ColdChunkTotal per chunk attempt, including failures."
302- start := time .Now ()
303- if cerr := ctx .Err (); cerr != nil {
304- sink .ColdChunkTotal (time .Since (start ))
305- return cerr
306- }
307- stream , oerr := source .OpenStream (chunkID )
308- if oerr != nil {
309- sink .ColdChunkTotal (time .Since (start ))
310- return oerr
311- }
312-
313- ings , berr := buildColdIngesters (coldDir , chunkID , sink , cfg )
314- if berr != nil {
315- // A constructor failure is still a chunk attempt
316- // (closeColdAll only emitted the per-ingester aborts).
317- sink .ColdChunkTotal (time .Since (start ))
318- return berr
319- }
320- service := NewColdService (ings , sink )
321- defer func () {
322- if cerr := service .Close (); cerr != nil {
323- err = errors .Join (err , fmt .Errorf ("close: %w" , cerr ))
324- }
325- }()
326-
327- if derr := drain (ctx , stream , chunkID , service ); derr != nil {
328- return derr
329- }
330- // drain verified the full range was consumed, so Finalize never commits a
331- // truncated artifact.
332- return service .Finalize (ctx )
333- }
334-
33599// ColdDirs names the per-data-type output root for one chunk's cold artifacts.
336100// Each field is the directory under which the matching cold ingester composes
337101// its {bucketID:05d}/ subdirectory — the same `coldDir` the per-type constructor
338102// takes. A field left "" for a data type enabled in cfg is a configuration error
339103// caught by RunColdChunk.
340104//
341- // Where RunCold derives the three roots from one coldDir (coldDir/{ledgers,
342- // txhash,events}), ColdDirs lets a caller with a DIFFERENT layout (e.g. the
343- // streaming daemon, whose raw txhash runs live under txhash/raw) place each
344- // artifact at its own canonical path while reusing the same cold pipeline.
105+ // ColdDirs lets a caller with its own on-disk layout (e.g. the streaming daemon,
106+ // whose raw txhash runs live under txhash/raw, not txhash) place each artifact at
107+ // its own canonical path — passing an explicit per-type root instead of deriving
108+ // coldDir/<dataType> — while reusing the very same cold ingesters, ColdService,
109+ // and drain loop.
345110type ColdDirs struct {
346111 Ledgers string
347112 Txhash string
348113 Events string
349114}
350115
351- // buildColdIngestersIn is the ColdDirs counterpart of buildColdIngesters: same
352- // constructors, same canonical ledgers→txhash→events order, same
353- // rollback-on-constructor-error semantics, but each type's root comes from an
354- // explicit dirs field rather than a fixed coldDir/<dataType> subdirectory.
116+ // buildColdIngestersIn opens one ColdIngester per data type enabled in cfg, each
117+ // under its OWN root from dirs (rather than coldDir/<dataType>). The constructor
118+ // table below is the single definition site of the canonical
119+ // ledgers→txhash→events order; on any constructor error it closes the ingesters
120+ // built so far and returns (rollback).
355121func buildColdIngestersIn (dirs ColdDirs , chunkID chunk.ID , sink MetricSink , cfg Config ) ([]ColdIngester , error ) {
356122 ctors := []struct {
357123 enabled bool
@@ -381,11 +147,12 @@ func buildColdIngestersIn(dirs ColdDirs, chunkID chunk.ID, sink MetricSink, cfg
381147}
382148
383149// RunColdChunk ingests EXACTLY ONE chunk's cold artifacts from source into the
384- // per-data-type roots named by dirs, in a single streaming pass. It is the
385- // single-chunk, explicit-layout sibling of RunCold: same constructors,
386- // ColdService, and drain loop (sequence/overrun validation + full-range check
387- // before Finalize), differing only in producing one chunk and taking explicit
388- // per-type roots (via buildColdIngestersIn).
150+ // per-data-type roots named by dirs, in a single streaming pass over the
151+ // chunk's ledgers. It builds the enabled cold ingester constructors, drives them
152+ // through a ColdService over the shared drain loop (sequence/overrun validation,
153+ // full-range completeness check before Finalize), and takes explicit per-type
154+ // output roots so a caller whose layout is not coldDir/<dataType> can reuse the
155+ // cold pipeline verbatim.
389156//
390157// The cold ingesters overwrite any prior attempt's files at their canonical
391158// paths (the package doc's artifact model), so RunColdChunk is the
0 commit comments