Skip to content

Commit a8f2064

Browse files
committed
Adopt orphaned R1 streams on standalone-to-cluster promotion
Signed-off-by: Tilak Raj <tilak.raj94@gmail.com>
1 parent e3f7ac5 commit a8f2064

File tree

2 files changed

+2040
-16
lines changed

2 files changed

+2040
-16
lines changed

server/jetstream_cluster.go

Lines changed: 234 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,13 @@ type jetStreamCluster struct {
8080
// Track last meta snapshot time and duration for monitoring.
8181
lastMetaSnapTime int64 // Unix nanoseconds
8282
lastMetaSnapDuration int64 // Duration in nanoseconds
83+
// Signals that this node was promoted from standalone to clustered mode
84+
// with no prior meta state. Used to detect standalone-to-clustered transitions.
85+
ptc bool
86+
// Tracks streams that were preserved (not deleted) due to name conflicts
87+
// with existing cluster assignments during ptc promotion. This prevents
88+
// the ptc marker from being cleaned up while unresolved conflicts remain.
89+
ptcConflicts int
8390
}
8491

8592
// Used to track inflight stream create/update/delete requests that have been proposed but not yet applied.
@@ -362,6 +369,13 @@ const (
362369
defaultMetaGroupName = "_meta_"
363370
defaultMetaFSBlkSize = 1024 * 1024
364371
jsExcludePlacement = "!jetstream"
372+
373+
// ptcMarkerFile is a sentinel file written into the meta Raft group directory
374+
// when a standalone server is promoted to a cluster member and has R1 streams
375+
// pending adoption. Its presence tells checkForOrphans (and processStreamAssignment)
376+
// that local R1 stream data must be preserved, not deleted, until all orphans
377+
// have been successfully adopted.
378+
ptcMarkerFile = "ptc"
365379
)
366380

367381
// Returns information useful in mixed mode.
@@ -870,6 +884,52 @@ func (s *Server) enableJetStreamClustering() error {
870884
return js.setupMetaGroup()
871885
}
872886

887+
// Used during setupMetaGroup before js.cluster is initialised.
888+
func hasPTCMarker(storeDir string) bool {
889+
_, err := os.Stat(filepath.Join(storeDir, ptcMarkerFile))
890+
return err == nil
891+
}
892+
893+
func (js *jetStream) ptcMarkerPath() string {
894+
s := js.srv
895+
sysAcc := s.SystemAccount()
896+
if sysAcc == nil {
897+
return _EMPTY_
898+
}
899+
return filepath.Join(js.config.StoreDir, sysAcc.Name, defaultStoreDirName, defaultMetaGroupName, ptcMarkerFile)
900+
}
901+
902+
func (js *jetStream) writePTCMarker() error {
903+
p := js.ptcMarkerPath()
904+
if p != _EMPTY_ {
905+
return os.WriteFile(p, []byte{}, defaultFilePerms)
906+
}
907+
908+
return nil
909+
}
910+
911+
func (js *jetStream) removePTCMarker() error {
912+
var err error
913+
p := js.ptcMarkerPath()
914+
if p != _EMPTY_ {
915+
err = os.Remove(p)
916+
if errors.Is(err, os.ErrNotExist) {
917+
return nil
918+
}
919+
}
920+
921+
return err
922+
}
923+
924+
func (js *jetStream) hasPTCMarker() bool {
925+
p := js.ptcMarkerPath()
926+
if p == _EMPTY_ {
927+
return false
928+
}
929+
_, err := os.Stat(p)
930+
return err == nil
931+
}
932+
873933
// isClustered returns if we are clustered.
874934
// Lock should not be held.
875935
func (js *jetStream) isClustered() bool {
@@ -972,6 +1032,11 @@ func (js *jetStream) setupMetaGroup() error {
9721032
// If we are bootstrapped with no state, start campaign early.
9731033
if bootstrap {
9741034
n.Campaign()
1035+
1036+
// Write the PTC marker immediately on bootstrap
1037+
if err = os.WriteFile(filepath.Join(storeDir, ptcMarkerFile), []byte{}, defaultFilePerms); err != nil {
1038+
s.Fatalf("Failed to write PTC marker on bootstrap: %v", err)
1039+
}
9751040
}
9761041

9771042
c := s.createInternalJetStreamClient()
@@ -985,6 +1050,7 @@ func (js *jetStream) setupMetaGroup() error {
9851050
c: c,
9861051
qch: make(chan struct{}),
9871052
stopped: make(chan struct{}),
1053+
ptc: bootstrap || hasPTCMarker(storeDir),
9881054
}
9891055
atomic.StoreInt32(&js.clustered, 1)
9901056
c.registerWithAccount(sysAcc)
@@ -1390,50 +1456,181 @@ func (js *jetStream) checkForOrphans() {
13901456
s, cc := js.srv, js.cluster
13911457
s.Debugf("JetStream cluster checking for orphans")
13921458

1393-
// We only want to cleanup any orphans if we know we are current with the meta-leader.
1459+
// We only want to cleanup any orphans if we know there is a meta-leader.
1460+
// ForwardProposal works on any node — followers forward to the leader automatically.
13941461
meta := cc.meta
13951462
if meta == nil || meta.Leaderless() {
13961463
js.mu.Unlock()
13971464
s.Debugf("JetStream cluster skipping check for orphans, no meta-leader")
13981465
return
13991466
}
1400-
if !meta.Healthy() {
1401-
js.mu.Unlock()
1402-
s.Debugf("JetStream cluster skipping check for orphans, not current with the meta-leader")
1403-
return
1404-
}
1467+
ourPeerID := meta.ID()
14051468
streams, consumers := js.getOrphans()
1469+
ptc := cc.ptc
1470+
// A non-zero conflict counter means preserved streams still need operator
1471+
// attention, so we must keep the PTC marker alive.
1472+
ptcConflicts := cc.ptcConflicts
14061473
js.mu.Unlock()
14071474

1475+
// hasUnresolved tracks whether any R1 stream still needs attention after this
1476+
// run. It is used to decide whether the PTC marker can be cleaned up.
1477+
hasUnresolved := ptcConflicts > 0
1478+
14081479
for _, mset := range streams {
14091480
mset.mu.RLock()
1410-
accName, stream := mset.acc.Name, mset.cfg.Name
1481+
accName := mset.acc.Name
1482+
cfg := mset.cfg
1483+
created := mset.created
14111484
mset.mu.RUnlock()
1412-
s.Warnf("Detected orphaned stream '%s > %s', will cleanup", accName, stream)
1485+
1486+
if cfg.Replicas == 1 {
1487+
if !ptc {
1488+
s.Warnf("Detected R1 orphan stream '%s > %s' without promotion marker; "+
1489+
"data preserved on disk — investigate or restart to retry", accName, cfg.Name)
1490+
hasUnresolved = true
1491+
continue
1492+
}
1493+
cfgCopy := cfg
1494+
sa := &streamAssignment{
1495+
Client: &ClientInfo{Account: accName},
1496+
Config: &cfgCopy,
1497+
Created: created,
1498+
Sync: syncSubjForStream(),
1499+
Group: &raftGroup{
1500+
Name: groupNameForStream([]string{ourPeerID}, cfg.Storage),
1501+
Peers: []string{ourPeerID},
1502+
Storage: cfg.Storage,
1503+
},
1504+
}
1505+
sa.ConfigJSON, _ = json.Marshal(cfgCopy)
1506+
s.Noticef("Adopting orphaned R1 stream '%s > %s' into cluster assignments", accName, cfg.Name)
1507+
if err := cc.meta.ForwardProposal(encodeAddStreamAssignment(sa)); err != nil {
1508+
s.Warnf("Failed to propose adoption of R1 stream '%s > %s': %v", accName, cfg.Name, err)
1509+
hasUnresolved = true
1510+
continue
1511+
}
1512+
1513+
// Propose assignments for durable consumers on this stream.
1514+
// Ephemeral consumers are transient; they do not need adoption.
1515+
for _, o := range mset.getConsumers() {
1516+
if !o.isDurable() {
1517+
continue
1518+
}
1519+
oCfg := o.config()
1520+
oCreated := o.createdTime()
1521+
oStorage := cfg.Storage
1522+
if oCfg.MemoryStorage {
1523+
oStorage = MemoryStorage
1524+
}
1525+
ca := &consumerAssignment{
1526+
Client: &ClientInfo{Account: accName},
1527+
Name: o.String(),
1528+
Stream: cfg.Name,
1529+
Config: &oCfg,
1530+
Created: oCreated,
1531+
Group: &raftGroup{
1532+
Name: groupNameForConsumer([]string{ourPeerID}, oStorage),
1533+
Peers: []string{ourPeerID},
1534+
Storage: oStorage,
1535+
},
1536+
}
1537+
ca.ConfigJSON, _ = json.Marshal(oCfg)
1538+
s.Noticef("Adopting orphaned consumer '%s > %s > %s' into cluster assignments", accName, cfg.Name, o.String())
1539+
if err := cc.meta.ForwardProposal(encodeAddConsumerAssignment(ca)); err != nil {
1540+
s.Warnf("Failed to propose adoption of consumer '%s > %s > %s': %v", accName, cfg.Name, o.String(), err)
1541+
hasUnresolved = true
1542+
}
1543+
}
1544+
continue
1545+
}
1546+
1547+
// R>1 orphans: Raft group exists but meta assignment is gone — clean up.
1548+
s.Warnf("Detected orphaned stream '%s > %s', will cleanup", accName, cfg.Name)
14131549
if err := mset.delete(); err != nil {
14141550
s.Warnf("Deleting stream encountered an error: %v", err)
14151551
}
14161552
}
1553+
14171554
for _, o := range consumers {
14181555
o.mu.RLock()
1419-
accName, mset, consumer := o.acc.Name, o.mset, o.name
1556+
accName, mset, oName := o.acc.Name, o.mset, o.name
14201557
o.mu.RUnlock()
1421-
stream := "N/A"
1558+
1559+
streamName := "N/A"
1560+
var streamCfg StreamConfig
14221561
if mset != nil {
14231562
mset.mu.RLock()
1424-
stream = mset.cfg.Name
1563+
streamName = mset.cfg.Name
1564+
streamCfg = mset.cfg
14251565
mset.mu.RUnlock()
14261566
}
1567+
1568+
// If the parent stream is R1 and this is a PTC node, adopt the durable consumer
1569+
if streamCfg.Replicas == 1 && o.isDurable() && ptc {
1570+
oCfg := o.config()
1571+
oCreated := o.createdTime()
1572+
oStorage := streamCfg.Storage
1573+
if oCfg.MemoryStorage {
1574+
oStorage = MemoryStorage
1575+
}
1576+
ca := &consumerAssignment{
1577+
Client: &ClientInfo{Account: accName},
1578+
Name: oName,
1579+
Stream: streamName,
1580+
Config: &oCfg,
1581+
Created: oCreated,
1582+
Group: &raftGroup{
1583+
Name: groupNameForConsumer([]string{ourPeerID}, oStorage),
1584+
Peers: []string{ourPeerID},
1585+
Storage: oStorage,
1586+
},
1587+
}
1588+
ca.ConfigJSON, _ = json.Marshal(oCfg)
1589+
s.Noticef("Adopting orphaned consumer '%s > %s > %s' into cluster assignments", accName, streamName, oName)
1590+
if err := cc.meta.ForwardProposal(encodeAddConsumerAssignment(ca)); err != nil {
1591+
s.Warnf("Failed to propose adoption of consumer '%s > %s > %s': %v", accName, streamName, oName, err)
1592+
hasUnresolved = true
1593+
}
1594+
continue
1595+
}
1596+
14271597
if o.isDurable() {
1428-
s.Warnf("Detected orphaned durable consumer '%s > %s > %s', will cleanup", accName, stream, consumer)
1598+
s.Warnf("Detected orphaned durable consumer '%s > %s > %s', will cleanup", accName, streamName, oName)
14291599
} else {
1430-
s.Debugf("Detected orphaned consumer '%s > %s > %s', will cleanup", accName, stream, consumer)
1600+
s.Debugf("Detected orphaned consumer '%s > %s > %s', will cleanup", accName, streamName, oName)
14311601
}
1432-
14331602
if err := o.delete(); err != nil {
14341603
s.Warnf("Deleting consumer encountered an error: %v", err)
14351604
}
14361605
}
1606+
1607+
// PTC marker lifecycle:
1608+
// - Keep it if there are still unresolved conflicts or remaining orphans
1609+
// (they will be retried / resolved on the next orphan sweep).
1610+
// - Remove it once everything has been adopted and no conflicts remain.
1611+
if !ptc {
1612+
return
1613+
}
1614+
1615+
if hasUnresolved {
1616+
err := js.writePTCMarker()
1617+
if err != nil {
1618+
s.Warnf("Failed to write ptc marker: %v", err)
1619+
}
1620+
} else {
1621+
if err := js.removePTCMarker(); err != nil {
1622+
s.Warnf("Failed to remove ptc marker: %v", err)
1623+
}
1624+
1625+
// Clear the in-memory flag so future orphan sweeps don't re-enter
1626+
// the PTC adoption path unnecessarily.
1627+
js.mu.Lock()
1628+
if js.cluster != nil {
1629+
js.cluster.ptc = false
1630+
js.cluster.ptcConflicts = 0
1631+
}
1632+
js.mu.Unlock()
1633+
}
14371634
}
14381635

14391636
// Returns orphaned streams and consumers that were recovered from disk, but don't
@@ -2027,6 +2224,14 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove
20272224
deleteStreams, deleteConsumers := js.getOrphans()
20282225
js.mu.RUnlock()
20292226
for _, mset := range deleteStreams {
2227+
mset.mu.RLock()
2228+
cfg := mset.cfg
2229+
mset.mu.RUnlock()
2230+
// Preserve R1 orphans: the meta-leader's snapshot may not yet include the
2231+
// adoption proposal. checkForOrphans will handle them 30 s after startup.
2232+
if cfg.Replicas == 1 {
2233+
continue
2234+
}
20302235
mset.stop(true, false)
20312236
}
20322237
for _, o := range deleteConsumers {
@@ -4565,6 +4770,7 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) {
45654770
if sa.Group != nil && ourID != _EMPTY_ {
45664771
isMember = sa.Group.isMember(ourID)
45674772
}
4773+
ptc := !noMeta && cc.ptc
45684774

45694775
if s == nil || noMeta {
45704776
js.mu.Unlock()
@@ -4648,8 +4854,20 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) {
46484854
if isMember {
46494855
js.processClusterCreateStream(acc, sa)
46504856
} else if mset, _ := acc.lookupStream(sa.Config.Name); mset != nil {
4651-
// We have one here even though we are not a member. This can happen on re-assignment.
4652-
s.removeStream(mset, sa)
4857+
if ptc {
4858+
// On a node promoted from standalone to clustered, do not delete
4859+
// conflicting streams. Stop the stream to free resources but preserve
4860+
// data on disk so the operator can recover it (e.g. rename the stream, and re-promote).
4861+
s.Warnf("JetStream preserving local stream '%s > %s' data on disk: conflicts with existing cluster assignment",
4862+
accName, sa.Config.Name)
4863+
mset.stop(false, false)
4864+
js.mu.Lock()
4865+
cc.ptcConflicts++
4866+
js.mu.Unlock()
4867+
} else {
4868+
// We have one here even though we are not a member. This can happen on re-assignment.
4869+
s.removeStream(mset, sa)
4870+
}
46534871
}
46544872

46554873
// If this stream assignment does not have a sync subject (bug) set that the meta-leader should check when elected.

0 commit comments

Comments
 (0)