Skip to content

Commit c2398ad

Browse files
committed
streaming(fullhistory): Phase 2 layer 2 — live ingestion + daemon wiring (closes #816, closes #808)
Wires the hot tier into the daemon: the full startStreaming (open the resume hot DB, launch the lifecycle goroutine, run the live ingestion loop following captive core), the production boundaries (captive-core opener, bulk-backend tip adapter), the lifecycle-config/HotProbe assembly, and the hot watermark refinement (startup passes the real probe so resume re-derives from the last synced batch). Completes the full-history streaming daemon — backfill + live ingest + freeze/discard/prune. Verified by the lifecycle E2E (first-start -> ingest -> freeze -> lookup -> restart-resume re-derivation -> prune). Closes #816. Closes #808.
1 parent df8ec80 commit c2398ad

6 files changed

Lines changed: 1064 additions & 146 deletions

File tree

cmd/stellar-rpc/internal/fullhistory/streaming/daemon.go

Lines changed: 100 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
"github.com/stellar/go-stellar-sdk/ingest/ledgerbackend"
1313
supportlog "github.com/stellar/go-stellar-sdk/support/log"
14+
"github.com/stellar/go-stellar-sdk/xdr"
1415

1516
"github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/daemon/interfaces"
1617
"github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/ingest"
@@ -107,6 +108,10 @@ type Boundaries struct {
107108
// deployment that never backfills.
108109
Backend ingest.ChunkSource
109110

111+
// Core starts captive core at the resume ledger and yields the live getter
112+
// the ingestion loop polls. Required.
113+
Core CoreOpener
114+
110115
// ServeReads launches the RPC read server (it must return promptly, not block
111116
// until shutdown). Required.
112117
//
@@ -122,6 +127,9 @@ func (b Boundaries) validate() error {
122127
if b.NetworkTip == nil {
123128
return errors.New("streaming: Boundaries.NetworkTip is nil")
124129
}
130+
if b.Core == nil {
131+
return errors.New("streaming: Boundaries.Core is nil")
132+
}
125133
if b.ServeReads == nil {
126134
return errors.New("streaming: Boundaries.ServeReads is nil")
127135
}
@@ -218,7 +226,10 @@ func RunDaemonWith(ctx context.Context, configPath string, opts DaemonOptions) e
218226
}
219227

220228
// startConfig threads the loaded Config, the bound catalog/logger, and the
221-
// assembled boundaries into the StartConfig startStreaming consumes.
229+
// assembled boundaries into the StartConfig startStreaming consumes. The Exec
230+
// and Lifecycle bundles share ONE catalog, worker pool, and retention floor (the
231+
// design's "catch-up and the lifecycle goroutine share one set of
232+
// postconditions"), so Lifecycle embeds the same ExecConfig.
222233
func startConfig(
223234
cfg Config, cat *Catalog, logger *supportlog.Entry, b Boundaries, metrics Metrics,
224235
sink ingest.MetricSink, tipBackoff time.Duration, tipMaxAttempts int,
@@ -230,18 +241,24 @@ func startConfig(
230241
Workers: derefInt(cfg.Backfill.Workers),
231242
MaxRetries: derefInt(cfg.Backfill.MaxRetries),
232243
Process: ProcessConfig{
244+
HotProbe: NewRocksHotProbe(cat.Layout().HotChunkPath, logger),
233245
Backend: b.Backend,
234246
BackendWaiter: b.BackendWaiter,
235247
Sink: sink,
236248
},
237249
}
238-
return StartConfig{
239-
Exec: exec,
250+
life := LifecycleConfig{
251+
ExecConfig: exec,
240252
RetentionChunks: derefU32(cfg.Streaming.RetentionChunks),
241-
NetworkTip: b.NetworkTip,
242-
ServeReads: b.ServeReads,
243-
TipBackoff: tipBackoff,
244-
TipMaxAttempts: tipMaxAttempts,
253+
}
254+
return StartConfig{
255+
Exec: exec,
256+
Lifecycle: life,
257+
NetworkTip: b.NetworkTip,
258+
Core: b.Core,
259+
ServeReads: b.ServeReads,
260+
TipBackoff: tipBackoff,
261+
TipMaxAttempts: tipMaxAttempts,
245262
}
246263
}
247264

@@ -251,10 +268,10 @@ func startConfig(
251268
// only does on ctx cancellation) returns nil. A cancelled ctx during the backoff
252269
// also returns nil — no restart after a shutdown request.
253270
//
254-
// It does NOT swallow the fatal sentinel ErrFirstStartNoTip: it is returned UP so
255-
// an operator/supervisor sees it. The retry here is for transient restartable
256-
// failures (a backfill hiccup) where a fresh start converges; the unrecoverable
257-
// one surfaces.
271+
// It does NOT swallow the fatal sentinels (ErrHotVolumeLost, ErrFirstStartNoTip):
272+
// those are returned UP so an operator/supervisor sees them. The retry here is
273+
// for transient restartable failures (a backfill/ingest hiccup, a captive core
274+
// crash) where a fresh start converges; the unrecoverable ones surface.
258275
func superviseStreaming(
259276
ctx context.Context, start StartConfig, logger *supportlog.Entry, backoff time.Duration,
260277
) error {
@@ -268,7 +285,7 @@ func superviseStreaming(
268285
}
269286
// Unrecoverable: surface up rather than spin restarting on a condition a
270287
// fresh start cannot heal.
271-
if errors.Is(err, ErrFirstStartNoTip) {
288+
if errors.Is(err, ErrHotVolumeLost) || errors.Is(err, ErrFirstStartNoTip) {
272289
return err
273290
}
274291
logger.WithError(err).Warnf("streaming: daemon run failed; restarting in %s", backoff)
@@ -289,6 +306,11 @@ func superviseStreaming(
289306
// buildProductionBoundaries assembles the real external boundaries from the
290307
// loaded config:
291308
//
309+
// - Core: captive stellar-core via newCaptiveCoreOpener. Its OpenCore prepares
310+
// the backend from the resume ledger and hands the ingestion loop a
311+
// LedgerGetter plus a closer, so this builder constructs the opener without
312+
// sequencing PrepareRange/Close itself. Until the #772 cutover,
313+
// newCaptiveCoreOpener always returns an error.
292314
// - Backend: the bulk datastore ChunkSource (NewDataStoreSource) when a bucket
293315
// path is configured; nil for a frontfill-only deployment.
294316
// - NetworkTip / BackendWaiter: an adapter over the bulk backend's tip.
@@ -299,16 +321,22 @@ func superviseStreaming(
299321
// tip-resolution the v1 path performs differently. Until #772 lands the cutover,
300322
// a deployment that needs catch-up against a real lake must wire NetworkTip/
301323
// BackendWaiter/Backend through DaemonOptions.BuildBoundaries; buildProduction-
302-
// Boundaries supplies a tip adapter that errors clearly when no bulk backend is
303-
// configured, so a frontfill ("genesis" or "now" with no backfill) deployment
304-
// runs unchanged.
324+
// Boundaries supplies a tip adapter that errors clearly when no bulk backend is
325+
// configured; the captive-core Core is NOT yet wired here (newCaptiveCoreOpener
326+
// errors until #772), so even a frontfill deployment must inject a CoreOpener.
305327
func buildProductionBoundaries(
306-
_ context.Context, _ Config, _ Paths, _ *Catalog, _ *supportlog.Entry,
328+
ctx context.Context, cfg Config, _ Paths, _ *Catalog, logger *supportlog.Entry,
307329
) (Boundaries, error) {
330+
core, err := newCaptiveCoreOpener(cfg.Streaming.CaptiveCoreConfig, logger)
331+
if err != nil {
332+
return Boundaries{}, err
333+
}
334+
308335
b := Boundaries{
336+
Core: core,
309337
// TODO(#772): wire the full-history RPC read server. The SQLite read path
310338
// is still the v1 daemon's; until the #772 cutover, serving is a no-op here
311-
// so the streaming daemon catches up + freezes without double-serving reads.
339+
// so the streaming daemon ingests + freezes without double-serving reads.
312340
ServeReads: func(context.Context) error { return nil },
313341
}
314342

@@ -322,6 +350,59 @@ func buildProductionBoundaries(
322350
return b, nil
323351
}
324352

353+
// captiveCoreOpener is the production CoreOpener: it prepares captive core at the
354+
// resume ledger and hands back a LedgerGetter the ingestion loop polls by
355+
// sequence (the design's core.GetLedger(ctx, seq)) plus a closer.
356+
type captiveCoreOpener struct {
357+
backend ledgerbackend.LedgerBackend
358+
}
359+
360+
func newCaptiveCoreOpener(captiveCoreConfigPath string, logger *supportlog.Entry) (*captiveCoreOpener, error) {
361+
if captiveCoreConfigPath == "" {
362+
return nil, errors.New("streaming: [streaming].captive_core_config is required")
363+
}
364+
// TODO(#772): the captive-core CaptiveCoreConfig (binary path, network
365+
// passphrase, history-archive URLs, storage path) is assembled from the v1
366+
// daemon config today; threading those through the streaming Config is part
367+
// of the cutover. The factory below is the wiring point — once the fields are
368+
// in Config, build a ledgerbackend.CaptiveCoreConfig from
369+
// NewCaptiveCoreTomlFromFile(captiveCoreConfigPath, ...) and NewCaptive, then
370+
// PrepareRange(UnboundedRange(resume)) in OpenCore. The seam (a LedgerGetter
371+
// behind CoreOpener) is final; only the config plumbing is deferred.
372+
return nil, fmt.Errorf("streaming: production captive-core wiring is deferred to #772 "+
373+
"(config %q parsed; pass a CoreOpener via DaemonOptions.BuildBoundaries to run today)",
374+
captiveCoreConfigPath)
375+
}
376+
377+
// OpenCore prepares the backend over the unbounded range from resumeLedger and
378+
// returns a getter wrapping GetLedger plus the backend's Close.
379+
func (c *captiveCoreOpener) OpenCore(
380+
ctx context.Context, resumeLedger uint32,
381+
) (LedgerGetter, func() error, error) {
382+
if err := c.backend.PrepareRange(ctx, ledgerbackend.UnboundedRange(resumeLedger)); err != nil {
383+
return nil, nil, fmt.Errorf("streaming: captive core prepare range from %d: %w", resumeLedger, err)
384+
}
385+
return backendGetter{backend: c.backend}, c.backend.Close, nil
386+
}
387+
388+
// backendGetter adapts a ledgerbackend.LedgerBackend to LedgerGetter: GetLedger
389+
// blocks until the ledger is available and returns its raw wire bytes.
390+
type backendGetter struct {
391+
backend ledgerbackend.LedgerBackend
392+
}
393+
394+
func (g backendGetter) GetLedger(ctx context.Context, seq uint32) (xdr.LedgerCloseMetaView, error) {
395+
lcm, err := g.backend.GetLedger(ctx, seq)
396+
if err != nil {
397+
return nil, err
398+
}
399+
raw, err := lcm.MarshalBinary()
400+
if err != nil {
401+
return nil, fmt.Errorf("streaming: marshal ledger %d: %w", seq, err)
402+
}
403+
return xdr.LedgerCloseMetaView(raw), nil
404+
}
405+
325406
// notConfiguredTip is the NetworkTipBackend for a deployment with no bulk
326407
// backend configured: every sample returns a clear not-configured error. It is
327408
// the honest placeholder until the #772 cutover wires the real lake tip.
@@ -419,6 +500,8 @@ func newLogger(cfg LoggingConfig) (*supportlog.Entry, error) {
419500
// compile-time assertions: the production adapters satisfy the injected
420501
// interfaces startStreaming/processChunk consume.
421502
var (
503+
_ CoreOpener = (*captiveCoreOpener)(nil)
504+
_ LedgerGetter = backendGetter{}
422505
_ NetworkTipBackend = (*backendTip)(nil)
423506
_ BackendWaiter = (*backendTip)(nil)
424507
_ NetworkTipBackend = notConfiguredTip{}

0 commit comments

Comments
 (0)