Skip to content

Commit dfab2cd

Browse files
committed
dm: wake subscription reporter at the next deadline, not every 4 s
The reporter loop slept on a fixed `Timer::after(4 s)`, with the only early-wake signal being `Subscriptions::notification`. When an event (or attribute change) was emitted while a subscription was still inside its `min_int` quiet period, the notification fired, the loop woke, `is_report_allowed` was false, the notification was consumed without sending a report, and the next wake-up had to come from the 4 s fallback timer. A 1 s `MinIntervalFloor` therefore observed as ~4 s between the first and second report in a burst — visible on Generic Switch endpoints as ~4 s between `InitialPress` and `ShortRelease`. Add `Subscriptions::next_wake`, which returns the earliest instant at which any subscription will become reportable: end of `min_int` if it has pending changes/events, or start of the `max_int / 2` liveness window otherwise. The reporter sleeps until that instant, clamped to a 4 s safety ceiling.
1 parent b23a6ab commit dfab2cd

2 files changed

Lines changed: 107 additions & 2 deletions

File tree

rs-matter/src/dm.rs

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -604,9 +604,29 @@ where
604604
/// Process all valid subscriptions in an endless loop, checking for changes
605605
/// and reporting them to the peers.
606606
async fn process_subscriptions(&self, matter: &Matter<'_>) -> Result<(), Error> {
607+
// Hard ceiling on how long the reporter can sleep without
608+
// re-evaluating. Bounds the latency of late notifications and acts
609+
// as the fallback when no subscription has an earlier deadline.
610+
const SLEEP_CEILING: embassy_time::Duration = embassy_time::Duration::from_secs(4);
611+
607612
loop {
608-
// TODO: Un-hardcode these 4 seconds of waiting when the more precise change detection logic is implemented
609-
let mut timeout = pin!(Timer::after(embassy_time::Duration::from_secs(4)));
613+
// Wake at the soonest of: a subscription's `min_int` quiet
614+
// period ending while it has pending data, a subscription's
615+
// `max_int / 2` liveness deadline, or the safety ceiling.
616+
// Without this, events emitted during a sub's quiet period
617+
// would `notify()` once, see `is_report_allowed == false`, and
618+
// then have to wait the full ceiling before being looked at
619+
// again — turning a 1 s min-interval into a ~4 s observed gap.
620+
let now = Instant::now();
621+
let event_numbers_watermark = self.events.watermark();
622+
let ceiling_at = now + SLEEP_CEILING;
623+
let deadline = self
624+
.subscriptions
625+
.next_wake(event_numbers_watermark)
626+
.map(|d| if d < ceiling_at { d } else { ceiling_at })
627+
.unwrap_or(ceiling_at);
628+
629+
let mut timeout = pin!(Timer::at(deadline));
610630
let mut notification = pin!(self.subscriptions.notification.wait());
611631
let mut session_removed = pin!(matter.session_removed.wait());
612632

rs-matter/src/dm/subscriptions.rs

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,57 @@ impl<const N: usize> Subscriptions<N> {
370370
})
371371
}
372372

373+
/// Earliest [`Instant`] at which the reporter loop next needs to wake to
374+
/// make forward progress, or `None` if no subscriptions exist.
375+
///
376+
/// Considered, per subscription:
377+
/// - If the subscription has pending changed attributes or new events,
378+
/// the end of its min-interval quiet period (`reported_at + min_int`).
379+
/// - The start of the max-interval liveness window
380+
/// (`reported_at + max_int / 2`), which is when `is_report_due` first
381+
/// flips true.
382+
///
383+
/// Callers should still clamp the returned instant against their own
384+
/// safety ceiling (e.g. the historical 4 s fallback) before sleeping.
385+
pub(crate) fn next_wake(&self, event_numbers_watermark: EventNumber) -> Option<Instant> {
386+
self.state.lock(|state| {
387+
let state = state.borrow();
388+
let mut earliest: Option<Instant> = None;
389+
390+
let mut consider = |inst: Instant| {
391+
earliest = Some(match earliest {
392+
Some(e) if e <= inst => e,
393+
_ => inst,
394+
});
395+
};
396+
397+
for sub in state.subscriptions.iter() {
398+
let has_pending = sub.is_affected_by_attr_changes(&[], &state.changed_attrs)
399+
|| sub.is_affected_by_new_events(&[], event_numbers_watermark);
400+
401+
if has_pending {
402+
if let Some(next_allowed) = sub
403+
.reported_at
404+
.checked_add(embassy_time::Duration::from_secs(sub.min_int_secs as _))
405+
{
406+
consider(next_allowed);
407+
}
408+
}
409+
410+
if let Some(due_at) =
411+
sub.reported_at
412+
.checked_add(embassy_time::Duration::from_secs(
413+
(sub.max_int_secs / 2) as _,
414+
))
415+
{
416+
consider(due_at);
417+
}
418+
}
419+
420+
earliest
421+
})
422+
}
423+
373424
/// Remove entries that every subscription has already reported on.
374425
pub(crate) fn purge_reported_changes(&self) {
375426
self.state
@@ -2141,6 +2192,40 @@ mod tests {
21412192
}
21422193
}
21432194

2195+
#[test]
2196+
fn next_wake_returns_min_int_when_events_pending() {
2197+
// With a pending event during the min-interval quiet period,
2198+
// `next_wake` returns the end of that quiet period — not the
2199+
// (much later) liveness deadline.
2200+
let subs: Subscriptions<1> = Subscriptions::new();
2201+
let pool = TestPool::<2>::new(0);
2202+
let subs_bufs: SubscriptionsBuffers<TestPool<2>, 1> = SubscriptionsBuffers::new();
2203+
2204+
let now = Instant::now();
2205+
// min_int = 1 s, max_int = 60 s.
2206+
{
2207+
let mut rctx = add_sub(&subs, &subs_bufs, &pool, now, 1, 10, 1, 60);
2208+
rctx.set_keep();
2209+
}
2210+
2211+
// No pending events → next wake is the liveness window
2212+
// (now + max_int/2 = now + 30 s).
2213+
assert_eq!(subs.next_wake(0), Some(now + Duration::from_secs(30)),);
2214+
2215+
// Pending event (watermark bumped past the sub's max_seen=0) →
2216+
// next wake collapses to the end of the min-interval quiet period
2217+
// (now + min_int = now + 1 s), which is much sooner than the
2218+
// liveness window. Regression guard for the historical 4 s gap
2219+
// between InitialPress and the next event report.
2220+
assert_eq!(subs.next_wake(7), Some(now + Duration::from_secs(1)),);
2221+
}
2222+
2223+
#[test]
2224+
fn next_wake_is_none_when_no_subscriptions() {
2225+
let subs: Subscriptions<1> = Subscriptions::new();
2226+
assert_eq!(subs.next_wake(0), None);
2227+
}
2228+
21442229
#[test]
21452230
fn is_expired_uses_max_int() {
21462231
// `Subscription::is_expired` returns true once `max_int` has elapsed

0 commit comments

Comments
 (0)