@@ -23,8 +23,9 @@ use nativelink_config::stores::{RedisMode, RedisSpec};
2323use nativelink_error:: { Code , Error , ErrorContext , ResultExt , make_err} ;
2424use nativelink_macro:: nativelink_test;
2525use nativelink_redis_tester:: {
26- ReadOnlyRedis , add_lua_script, add_to_response, fake_redis_sentinel_master_stream,
27- fake_redis_sentinel_stream, fake_redis_stream, make_fake_redis_with_responses,
26+ ReadOnlyRedis , SubscriptionManagerNotify , add_lua_script, add_to_response,
27+ fake_redis_sentinel_master_stream, fake_redis_sentinel_stream, fake_redis_stream,
28+ make_fake_redis_with_responses,
2829} ;
2930use nativelink_store:: cas_utils:: ZERO_BYTE_DIGESTS ;
3031use nativelink_store:: redis_store:: {
@@ -36,8 +37,9 @@ use nativelink_util::common::DigestInfo;
3637use nativelink_util:: health_utils:: HealthStatus ;
3738use nativelink_util:: store_trait:: {
3839 FalseValue , SchedulerCurrentVersionProvider , SchedulerIndexProvider , SchedulerStore ,
39- SchedulerStoreDataProvider , SchedulerStoreDecodeTo , SchedulerStoreKeyProvider , StoreKey ,
40- StoreLike , TrueValue , UploadSizeInfo ,
40+ SchedulerStoreDataProvider , SchedulerStoreDecodeTo , SchedulerStoreKeyProvider ,
41+ SchedulerSubscription , SchedulerSubscriptionManager , StoreKey , StoreLike , TrueValue ,
42+ UploadSizeInfo ,
4143} ;
4244use pretty_assertions:: assert_eq;
4345use redis:: { PushInfo , RedisError , Value , make_extension_error} ;
@@ -1734,3 +1736,128 @@ async fn test_update_data_versioned_with_expiry() {
17341736 . await
17351737 . expect ( "working update" ) ;
17361738}
1739+
1740+ /// Test key provider that just wraps a string. Reused across the
1741+ /// subscription regression tests below.
1742+ #[ derive( Clone ) ]
1743+ struct TestSubKey ( String ) ;
1744+
1745+ impl SchedulerStoreKeyProvider for TestSubKey {
1746+ type Versioned = FalseValue ;
1747+ fn get_key ( & self ) -> StoreKey < ' static > {
1748+ StoreKey :: Str ( std:: borrow:: Cow :: Owned ( self . 0 . clone ( ) ) )
1749+ }
1750+ }
1751+
1752+ /// Sanity: a single subscriber that drops cleanly produces no warning
1753+ /// and no error.
1754+ #[ nativelink_test]
1755+ async fn redis_subscription_single_drop_is_silent ( ) -> Result < ( ) , Error > {
1756+ let ( _tx, rx) = tokio:: sync:: mpsc:: unbounded_channel ( ) ;
1757+ let manager = RedisSubscriptionManager :: new ( rx) ;
1758+
1759+ let sub = manager. subscribe ( TestSubKey ( "solo-key" . to_string ( ) ) ) ?;
1760+ drop ( sub) ;
1761+ sleep ( Duration :: from_millis ( 10 ) ) . await ;
1762+
1763+ assert ! (
1764+ !logs_contain( "key absent from subscribed_keys under write lock" ) ,
1765+ "single-subscriber drop unexpectedly logged the absence warning" ,
1766+ ) ;
1767+ assert ! ( !logs_contain( "ERROR" ) ) ;
1768+
1769+ drop ( manager) ;
1770+ Ok ( ( ) )
1771+ }
1772+
1773+ #[ nativelink_test]
1774+ async fn redis_subscription_drop_one_of_two_keeps_publisher ( ) -> Result < ( ) , Error > {
1775+ let ( _tx, rx) = tokio:: sync:: mpsc:: unbounded_channel ( ) ;
1776+ let manager = RedisSubscriptionManager :: new ( rx) ;
1777+
1778+ let key = "shared-key" ;
1779+ let sub_a = manager. subscribe ( TestSubKey ( key. to_string ( ) ) ) ?;
1780+ let mut sub_b = manager. subscribe ( TestSubKey ( key. to_string ( ) ) ) ?;
1781+
1782+ // Drop the first; the second's subscription must still resolve
1783+ // when we notify on the same key.
1784+ drop ( sub_a) ;
1785+
1786+ manager. notify_for_test ( key. to_string ( ) ) ;
1787+ timeout ( Duration :: from_secs ( 2 ) , sub_b. changed ( ) )
1788+ . await
1789+ . expect ( "sub_b.changed() did not fire — publisher entry was dropped prematurely" ) ?;
1790+
1791+ assert ! (
1792+ !logs_contain( "key absent from subscribed_keys under write lock" ) ,
1793+ "absence warning fired during single drop with another receiver alive" ,
1794+ ) ;
1795+ drop ( sub_b) ;
1796+ drop ( manager) ;
1797+ Ok ( ( ) )
1798+ }
1799+
1800+ #[ nativelink_test]
1801+ async fn redis_subscription_concurrent_drops_no_absence_warn ( ) -> Result < ( ) , Error > {
1802+ let ( _tx, rx) = tokio:: sync:: mpsc:: unbounded_channel ( ) ;
1803+ let manager = RedisSubscriptionManager :: new ( rx) ;
1804+
1805+ const ITERATIONS : usize = 200 ;
1806+ for i in 0 ..ITERATIONS {
1807+ let key = format ! ( "race-key-{i}" ) ;
1808+ let sub_a = manager. subscribe ( TestSubKey ( key. clone ( ) ) ) ?;
1809+ let sub_b = manager. subscribe ( TestSubKey ( key. clone ( ) ) ) ?;
1810+
1811+ // `spawn_blocking` puts each Drop on its own thread; with the
1812+ // pre-fix code's "drop receiver, then take lock" sequence,
1813+ // both threads can race into the lock with already-decremented
1814+ // counts. With the post-fix "take lock, then evaluate, then
1815+ // drop receiver" sequence, the lock serialises the decision
1816+ // and the warning never fires.
1817+ let h_a = tokio:: task:: spawn_blocking ( move || drop ( sub_a) ) ;
1818+ let h_b = tokio:: task:: spawn_blocking ( move || drop ( sub_b) ) ;
1819+ h_a. await . unwrap ( ) ;
1820+ h_b. await . unwrap ( ) ;
1821+ }
1822+ sleep ( Duration :: from_millis ( 50 ) ) . await ;
1823+
1824+ assert ! (
1825+ !logs_contain( "key absent from subscribed_keys under write lock" ) ,
1826+ "concurrent drops produced the absence warning at least once across {ITERATIONS} \
1827+ iterations — the Drop ordering regressed",
1828+ ) ;
1829+ assert ! ( !logs_contain( "ERROR" ) ) ;
1830+
1831+ drop ( manager) ;
1832+ Ok ( ( ) )
1833+ }
1834+
1835+ #[ nativelink_test]
1836+ async fn redis_subscription_resubscribe_after_drop_creates_fresh_publisher ( ) -> Result < ( ) , Error > {
1837+ let ( _tx, rx) = tokio:: sync:: mpsc:: unbounded_channel ( ) ;
1838+ let manager = RedisSubscriptionManager :: new ( rx) ;
1839+
1840+ let key = "cycle-key" ;
1841+ let sub_a = manager. subscribe ( TestSubKey ( key. to_string ( ) ) ) ?;
1842+ let sub_b = manager. subscribe ( TestSubKey ( key. to_string ( ) ) ) ?;
1843+ drop ( sub_a) ;
1844+ drop ( sub_b) ;
1845+
1846+ // Re-subscribe to the same key. If the previous drops left the
1847+ // map in an inconsistent state (stale publisher kept, or a
1848+ // partially-deconstructed entry), this either reuses a dead
1849+ // publisher (changed() never fires) or panics inside the
1850+ // patricia map.
1851+ let mut sub_c = manager. subscribe ( TestSubKey ( key. to_string ( ) ) ) ?;
1852+ manager. notify_for_test ( key. to_string ( ) ) ;
1853+ timeout ( Duration :: from_secs ( 2 ) , sub_c. changed ( ) )
1854+ . await
1855+ . expect ( "re-subscribe after drops produced a dead publisher" ) ?;
1856+
1857+ assert ! ( !logs_contain(
1858+ "key absent from subscribed_keys under write lock"
1859+ ) ) ;
1860+ drop ( sub_c) ;
1861+ drop ( manager) ;
1862+ Ok ( ( ) )
1863+ }
0 commit comments