Skip to content

Commit 40e2729

Browse files
committed
Adopt orphaned R1 streams on standalone-to-cluster promotion
Signed-off-by: Tilak Raj <tilak.raj94@gmail.com>
1 parent e3f7ac5 commit 40e2729

File tree

2 files changed

+2076
-16
lines changed

2 files changed

+2076
-16
lines changed

server/jetstream_cluster.go

Lines changed: 270 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,13 @@ type jetStreamCluster struct {
8080
// Track last meta snapshot time and duration for monitoring.
8181
lastMetaSnapTime int64 // Unix nanoseconds
8282
lastMetaSnapDuration int64 // Duration in nanoseconds
83+
// Signals that this node was promoted from standalone to clustered mode
84+
// with no prior meta state. Used to detect standalone-to-clustered transitions.
85+
ptc bool
86+
// Tracks streams that were preserved (not deleted) due to name conflicts
87+
// with existing cluster assignments during ptc promotion. This prevents
88+
// the ptc marker from being cleaned up while unresolved conflicts remain.
89+
ptcConflicts int
8390
}
8491

8592
// Used to track inflight stream create/update/delete requests that have been proposed but not yet applied.
@@ -362,6 +369,13 @@ const (
362369
defaultMetaGroupName = "_meta_"
363370
defaultMetaFSBlkSize = 1024 * 1024
364371
jsExcludePlacement = "!jetstream"
372+
373+
// ptcMarkerFile is a sentinel file written into the meta Raft group directory
374+
// when a standalone server is promoted to a cluster member and has R1 streams
375+
// pending adoption. Its presence tells checkForOrphans (and processStreamAssignment)
376+
// that local R1 stream data must be preserved, not deleted, until all orphans
377+
// have been successfully adopted.
378+
ptcMarkerFile = "ptc"
365379
)
366380

367381
// Returns information useful in mixed mode.
@@ -870,6 +884,51 @@ func (s *Server) enableJetStreamClustering() error {
870884
return js.setupMetaGroup()
871885
}
872886

887+
func hasPTCMarker(storeDir string) bool {
888+
_, err := os.Stat(filepath.Join(storeDir, ptcMarkerFile))
889+
return err == nil
890+
}
891+
892+
func (js *jetStream) ptcMarkerPath() string {
893+
s := js.srv
894+
sysAcc := s.SystemAccount()
895+
if sysAcc == nil {
896+
return _EMPTY_
897+
}
898+
return filepath.Join(js.config.StoreDir, sysAcc.Name, defaultStoreDirName, defaultMetaGroupName, ptcMarkerFile)
899+
}
900+
901+
func (js *jetStream) writePTCMarker() error {
902+
p := js.ptcMarkerPath()
903+
if p != _EMPTY_ {
904+
return os.WriteFile(p, []byte{}, defaultFilePerms)
905+
}
906+
907+
return nil
908+
}
909+
910+
func (js *jetStream) removePTCMarker() error {
911+
var err error
912+
p := js.ptcMarkerPath()
913+
if p != _EMPTY_ {
914+
err = os.Remove(p)
915+
if errors.Is(err, os.ErrNotExist) {
916+
return nil
917+
}
918+
}
919+
920+
return err
921+
}
922+
923+
func (js *jetStream) hasPTCMarker() bool {
924+
p := js.ptcMarkerPath()
925+
if p == _EMPTY_ {
926+
return false
927+
}
928+
_, err := os.Stat(p)
929+
return err == nil
930+
}
931+
873932
// isClustered returns if we are clustered.
874933
// Lock should not be held.
875934
func (js *jetStream) isClustered() bool {
@@ -972,6 +1031,11 @@ func (js *jetStream) setupMetaGroup() error {
9721031
// If we are bootstrapped with no state, start campaign early.
9731032
if bootstrap {
9741033
n.Campaign()
1034+
1035+
// Write the PTC marker immediately on bootstrap
1036+
if err = os.WriteFile(filepath.Join(storeDir, ptcMarkerFile), []byte{}, defaultFilePerms); err != nil {
1037+
s.Fatalf("Failed to write PTC marker on bootstrap: %v", err)
1038+
}
9751039
}
9761040

9771041
c := s.createInternalJetStreamClient()
@@ -985,6 +1049,7 @@ func (js *jetStream) setupMetaGroup() error {
9851049
c: c,
9861050
qch: make(chan struct{}),
9871051
stopped: make(chan struct{}),
1052+
ptc: bootstrap || hasPTCMarker(storeDir),
9881053
}
9891054
atomic.StoreInt32(&js.clustered, 1)
9901055
c.registerWithAccount(sysAcc)
@@ -1390,50 +1455,190 @@ func (js *jetStream) checkForOrphans() {
13901455
s, cc := js.srv, js.cluster
13911456
s.Debugf("JetStream cluster checking for orphans")
13921457

1393-
// We only want to cleanup any orphans if we know we are current with the meta-leader.
1458+
// Skip if there is no leader or if this node has not yet applied all committed Raft entries;
13941459
meta := cc.meta
13951460
if meta == nil || meta.Leaderless() {
13961461
js.mu.Unlock()
1397-
s.Debugf("JetStream cluster skipping check for orphans, no meta-leader")
1462+
s.Debugf("JetStream cluster skipping orphan check, no meta-leader")
13981463
return
13991464
}
1400-
if !meta.Healthy() {
1465+
if !meta.Current() {
14011466
js.mu.Unlock()
1402-
s.Debugf("JetStream cluster skipping check for orphans, not current with the meta-leader")
1467+
s.Debugf("JetStream cluster skipping orphan check, meta not current yet will retry")
1468+
time.AfterFunc(5*time.Second, js.checkForOrphans)
14031469
return
14041470
}
1471+
ourPeerID := meta.ID()
14051472
streams, consumers := js.getOrphans()
1473+
ptc := cc.ptc
1474+
// the counter tells us here that they still need operator attention so we must
1475+
// keep the PTC marker alive.
1476+
ptcConflicts := cc.ptcConflicts
14061477
js.mu.Unlock()
14071478

1479+
// hasUnresolved tracks whether any R1 stream still needs attention after this
1480+
// run. It is used to decide whether the PTC marker can be cleaned up.
1481+
hasUnresolved := ptcConflicts > 0
1482+
14081483
for _, mset := range streams {
14091484
mset.mu.RLock()
1410-
accName, stream := mset.acc.Name, mset.cfg.Name
1485+
accName := mset.acc.Name
1486+
cfg := mset.cfg
1487+
created := mset.created
14111488
mset.mu.RUnlock()
1412-
s.Warnf("Detected orphaned stream '%s > %s', will cleanup", accName, stream)
1489+
1490+
// R1 streams on a PTC node are adopted into the cluster instead of deleted.
1491+
// Any other orphan (R>1, or R1 without the PTC flag) is cleaned up normally.
1492+
if cfg.Replicas == 1 && ptc {
1493+
cfgCopy := cfg
1494+
sa := &streamAssignment{
1495+
Client: &ClientInfo{Account: accName},
1496+
Config: &cfgCopy,
1497+
Created: created,
1498+
Sync: syncSubjForStream(),
1499+
Group: &raftGroup{
1500+
Name: groupNameForStream([]string{ourPeerID}, cfg.Storage),
1501+
Peers: []string{ourPeerID},
1502+
Storage: cfg.Storage,
1503+
},
1504+
}
1505+
sa.ConfigJSON, _ = json.Marshal(cfgCopy)
1506+
s.Noticef("Adopting orphaned R1 stream '%s > %s' into cluster assignments", accName, cfg.Name)
1507+
if err := meta.ForwardProposal(encodeAddStreamAssignment(sa)); err != nil {
1508+
s.Warnf("Failed to propose adoption of R1 stream '%s > %s': %v", accName, cfg.Name, err)
1509+
hasUnresolved = true
1510+
continue
1511+
}
1512+
// Proposal was sent but Raft commit is async. Keep the marker so that a
1513+
// crash between here and the commit does not lose the resume signal. The
1514+
// next sweep will find no orphans once the entry is committed and will
1515+
// then remove the marker cleanly.
1516+
hasUnresolved = true
1517+
1518+
// Propose assignments for durable consumers on this stream.
1519+
// Ephemeral consumers are transient; they do not need adoption.
1520+
for _, o := range mset.getConsumers() {
1521+
if !o.isDurable() {
1522+
continue
1523+
}
1524+
oCfg := o.config()
1525+
oCreated := o.createdTime()
1526+
oStorage := cfg.Storage
1527+
if oCfg.MemoryStorage {
1528+
oStorage = MemoryStorage
1529+
}
1530+
ca := &consumerAssignment{
1531+
Client: &ClientInfo{Account: accName},
1532+
Name: o.String(),
1533+
Stream: cfg.Name,
1534+
Config: &oCfg,
1535+
Created: oCreated,
1536+
Group: &raftGroup{
1537+
Name: groupNameForConsumer([]string{ourPeerID}, oStorage),
1538+
Peers: []string{ourPeerID},
1539+
Storage: oStorage,
1540+
},
1541+
}
1542+
ca.ConfigJSON, _ = json.Marshal(oCfg)
1543+
s.Noticef("Adopting orphaned consumer '%s > %s > %s' into cluster assignments", accName, cfg.Name, o.String())
1544+
if err := meta.ForwardProposal(encodeAddConsumerAssignment(ca)); err != nil {
1545+
s.Warnf("Failed to propose adoption of consumer '%s > %s > %s': %v", accName, cfg.Name, o.String(), err)
1546+
}
1547+
// Consumer proposal is also async; hasUnresolved is already true from
1548+
// the stream proposal above, so no additional set is needed here.
1549+
}
1550+
continue
1551+
}
1552+
1553+
// Orphaned stream with no valid adoption path: clean up locally.
1554+
s.Warnf("Detected orphaned stream '%s > %s', will cleanup", accName, cfg.Name)
14131555
if err := mset.delete(); err != nil {
14141556
s.Warnf("Deleting stream encountered an error: %v", err)
14151557
}
14161558
}
1559+
14171560
for _, o := range consumers {
14181561
o.mu.RLock()
1419-
accName, mset, consumer := o.acc.Name, o.mset, o.name
1562+
accName, mset, oName := o.acc.Name, o.mset, o.name
14201563
o.mu.RUnlock()
1421-
stream := "N/A"
1564+
1565+
streamName := "N/A"
1566+
var streamCfg StreamConfig
14221567
if mset != nil {
14231568
mset.mu.RLock()
1424-
stream = mset.cfg.Name
1569+
streamName = mset.cfg.Name
1570+
streamCfg = mset.cfg
14251571
mset.mu.RUnlock()
14261572
}
1573+
1574+
// If the parent stream is R1 and this is a PTC node, adopt the durable consumer
1575+
if streamCfg.Replicas == 1 && o.isDurable() && ptc {
1576+
oCfg := o.config()
1577+
oCreated := o.createdTime()
1578+
oStorage := streamCfg.Storage
1579+
if oCfg.MemoryStorage {
1580+
oStorage = MemoryStorage
1581+
}
1582+
ca := &consumerAssignment{
1583+
Client: &ClientInfo{Account: accName},
1584+
Name: oName,
1585+
Stream: streamName,
1586+
Config: &oCfg,
1587+
Created: oCreated,
1588+
Group: &raftGroup{
1589+
Name: groupNameForConsumer([]string{ourPeerID}, oStorage),
1590+
Peers: []string{ourPeerID},
1591+
Storage: oStorage,
1592+
},
1593+
}
1594+
ca.ConfigJSON, _ = json.Marshal(oCfg)
1595+
s.Noticef("Adopting orphaned consumer '%s > %s > %s' into cluster assignments", accName, streamName, oName)
1596+
if err := meta.ForwardProposal(encodeAddConsumerAssignment(ca)); err != nil {
1597+
s.Warnf("Failed to propose adoption of consumer '%s > %s > %s': %v", accName, streamName, oName, err)
1598+
}
1599+
// Proposal is async; keep the marker until the next sweep confirms the
1600+
// consumer is no longer an orphan after the Raft entry is committed.
1601+
hasUnresolved = true
1602+
continue
1603+
}
1604+
14271605
if o.isDurable() {
1428-
s.Warnf("Detected orphaned durable consumer '%s > %s > %s', will cleanup", accName, stream, consumer)
1606+
s.Warnf("Detected orphaned durable consumer '%s > %s > %s', will cleanup", accName, streamName, oName)
14291607
} else {
1430-
s.Debugf("Detected orphaned consumer '%s > %s > %s', will cleanup", accName, stream, consumer)
1608+
s.Debugf("Detected orphaned consumer '%s > %s > %s', will cleanup", accName, streamName, oName)
14311609
}
1432-
14331610
if err := o.delete(); err != nil {
14341611
s.Warnf("Deleting consumer encountered an error: %v", err)
14351612
}
14361613
}
1614+
1615+
// PTC marker lifecycle:
1616+
// - Keep it if there are still unresolved conflicts or remaining orphans
1617+
// (they will be retried / resolved on the next orphan sweep).
1618+
// - Remove it once everything has been adopted and no conflicts remain.
1619+
if !ptc {
1620+
return
1621+
}
1622+
1623+
if hasUnresolved {
1624+
err := js.writePTCMarker()
1625+
if err != nil {
1626+
s.Warnf("Failed to write ptc marker: %v", err)
1627+
}
1628+
} else {
1629+
if err := js.removePTCMarker(); err != nil {
1630+
s.Warnf("Failed to remove ptc marker: %v", err)
1631+
}
1632+
1633+
// Clear the in-memory flag so future orphan sweeps don't re-enter
1634+
// the PTC adoption path unnecessarily.
1635+
js.mu.Lock()
1636+
if js.cluster != nil {
1637+
js.cluster.ptc = false
1638+
js.cluster.ptcConflicts = 0
1639+
}
1640+
js.mu.Unlock()
1641+
}
14371642
}
14381643

14391644
// Returns orphaned streams and consumers that were recovered from disk, but don't
@@ -2025,11 +2230,34 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove
20252230
// logging and sending out advisories.
20262231
js.mu.RLock()
20272232
deleteStreams, deleteConsumers := js.getOrphans()
2233+
cc := js.cluster
2234+
ptc := cc != nil && cc.ptc
20282235
js.mu.RUnlock()
20292236
for _, mset := range deleteStreams {
2237+
mset.mu.RLock()
2238+
cfg := mset.cfg
2239+
mset.mu.RUnlock()
2240+
// During active PTC promotion, preserve R1 orphans: the meta-leader's snapshot
2241+
// may not yet include the adoption proposal. checkForOrphans will handle them.
2242+
if cfg.Replicas == 1 && ptc {
2243+
continue
2244+
}
20302245
mset.stop(true, false)
20312246
}
20322247
for _, o := range deleteConsumers {
2248+
o.mu.RLock()
2249+
mset := o.mset
2250+
o.mu.RUnlock()
2251+
if mset != nil {
2252+
mset.mu.RLock()
2253+
replicas := mset.cfg.Replicas
2254+
mset.mu.RUnlock()
2255+
// During active PTC promotion, preserve consumers whose parent stream is R1:
2256+
// the consumer adoption proposal may still be in-flight. checkForOrphans will handle them.
2257+
if replicas == 1 && ptc {
2258+
continue
2259+
}
2260+
}
20332261
o.deleteWithoutAdvisory()
20342262
}
20352263
}
@@ -4565,6 +4793,7 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) {
45654793
if sa.Group != nil && ourID != _EMPTY_ {
45664794
isMember = sa.Group.isMember(ourID)
45674795
}
4796+
ptc := !noMeta && cc.ptc
45684797

45694798
if s == nil || noMeta {
45704799
js.mu.Unlock()
@@ -4648,8 +4877,23 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) {
46484877
if isMember {
46494878
js.processClusterCreateStream(acc, sa)
46504879
} else if mset, _ := acc.lookupStream(sa.Config.Name); mset != nil {
4651-
// We have one here even though we are not a member. This can happen on re-assignment.
4652-
s.removeStream(mset, sa)
4880+
mset.mu.RLock()
4881+
replicas := mset.cfg.Replicas
4882+
mset.mu.RUnlock()
4883+
if ptc && replicas == 1 {
4884+
// On a node promoted from standalone to clustered, do not delete
4885+
// conflicting R1 streams. Stop the stream to free resources but preserve
4886+
// data on disk so the operator can recover it (e.g. rename the stream, and re-promote).
4887+
s.Warnf("JetStream preserving local stream '%s > %s' data on disk: conflicts with existing cluster assignment",
4888+
accName, sa.Config.Name)
4889+
mset.stop(false, false)
4890+
js.mu.Lock()
4891+
cc.ptcConflicts++
4892+
js.mu.Unlock()
4893+
} else {
4894+
// We have one here even though we are not a member. This can happen on re-assignment.
4895+
s.removeStream(mset, sa)
4896+
}
46534897
}
46544898

46554899
// If this stream assignment does not have a sync subject (bug) set that the meta-leader should check when elected.
@@ -5388,6 +5632,7 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
53885632
if ca.Group != nil && ourID != _EMPTY_ {
53895633
isMember = ca.Group.isMember(ourID)
53905634
}
5635+
ptc := !noMeta && cc.ptc
53915636
js.mu.RUnlock()
53925637

53935638
if s == nil || noMeta || shuttingDown {
@@ -5495,8 +5740,17 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
54955740
js.processClusterCreateConsumer(oca, ca, state, wasExisting)
54965741
} else if mset, _ := acc.lookupStream(sa.Config.Name); mset != nil {
54975742
if o := mset.lookupConsumer(ca.Name); o != nil {
5498-
// We have one here even though we are not a member. This can happen on re-assignment.
5499-
s.removeConsumer(o, ca)
5743+
mset.mu.RLock()
5744+
replicas := mset.cfg.Replicas
5745+
mset.mu.RUnlock()
5746+
if ptc && replicas == 1 {
5747+
// On a node promoted from standalone to clustered, preserve local consumers
5748+
// of R1 streams. Data is on disk; checkForOrphans will adopt them.
5749+
s.Debugf("JetStream preserving local consumer '%s > %s > %s' during PTC promotion", accName, stream, ca.Name)
5750+
} else {
5751+
// We have one here even though we are not a member. This can happen on re-assignment.
5752+
s.removeConsumer(o, ca)
5753+
}
55005754
}
55015755
}
55025756
}

0 commit comments

Comments
 (0)