@@ -431,6 +431,115 @@ TEST(OnTest, NextSkipsOnAlignmentMismatch) {
431431 EXPECT_EQ (call_count, 0 );
432432}
433433
434+ // / @brief Test that Reset() advances the watermark so stale pre-reset data does not
435+ // / re-trigger the source node, while new data written after reset does.
436+ TEST (OnTest, ResetAdvancesHighWaterMarkPreventingStaleData) {
437+ ir::Param output_param;
438+ output_param.name = ir::default_output_param;
439+ output_param.type = types::Type (types::Kind::F32 );
440+
441+ ir::Node ir_node;
442+ ir_node.key = " source" ;
443+ ir_node.type = " on" ;
444+ ir_node.outputs .params .push_back (output_param);
445+
446+ ir::Param channel_config;
447+ channel_config.name = " channel" ;
448+ channel_config.value = static_cast <uint32_t >(10 );
449+ ir_node.config .params .push_back (channel_config);
450+
451+ ir::IR ir;
452+ ir.nodes .push_back (ir_node);
453+
454+ state::Config cfg{.ir = ir, .channels = {{10 , x::telem::FLOAT32_T , 11 }}};
455+ state::State s (cfg, errors::noop_handler);
456+
457+ Factory factory;
458+ auto state_node = ASSERT_NIL_P (s.node (" source" ));
459+ auto node = ASSERT_NIL_P (
460+ factory.create (node::Config (ir, ir_node, std::move (state_node)))
461+ );
462+
463+ // Ingest data at alignment 0.
464+ x::telem::Frame frame1 (2 );
465+ auto d1 = x::telem::Series (1 .0f );
466+ d1.alignment = x::telem::Alignment (0 );
467+ auto t1 = x::telem::Series (static_cast <int64_t >(100 ));
468+ t1.alignment = x::telem::Alignment (0 );
469+ frame1.emplace (10 , std::move (d1));
470+ frame1.emplace (11 , std::move (t1));
471+ s.ingest (frame1);
472+
473+ // Reset: advances the watermark to 1 (past alignment 0).
474+ node->reset ();
475+
476+ // Next should NOT fire: data at alignment 0 is now below the watermark.
477+ bool changed = false ;
478+ auto ctx = node::Context{
479+ .elapsed = x::telem::SECOND ,
480+ .mark_changed = [&changed](const std::string &) { changed = true ; },
481+ .report_error = [](const x::errors::Error &) {},
482+ .activate_stage = [] {},
483+ };
484+ ASSERT_NIL (node->next (ctx));
485+ EXPECT_FALSE (changed) << " stale pre-reset data should not trigger the source" ;
486+
487+ // Ingest new data at alignment 10 (well above the watermark of 1).
488+ x::telem::Frame frame2 (2 );
489+ auto d2 = x::telem::Series (2 .0f );
490+ d2.alignment = x::telem::Alignment (10 );
491+ auto t2 = x::telem::Series (static_cast <int64_t >(200 ));
492+ t2.alignment = x::telem::Alignment (10 );
493+ frame2.emplace (10 , std::move (d2));
494+ frame2.emplace (11 , std::move (t2));
495+ s.ingest (frame2);
496+
497+ ASSERT_NIL (node->next (ctx));
498+ EXPECT_TRUE (changed) << " data written after reset should trigger the source" ;
499+ }
500+
501+ // / @brief Test that Reset() is a no-op when the channel has no data.
502+ TEST (OnTest, ResetIsNoOpWithNoData) {
503+ ir::Param output_param;
504+ output_param.name = ir::default_output_param;
505+ output_param.type = types::Type (types::Kind::F32 );
506+
507+ ir::Node ir_node;
508+ ir_node.key = " source" ;
509+ ir_node.type = " on" ;
510+ ir_node.outputs .params .push_back (output_param);
511+
512+ ir::Param channel_config;
513+ channel_config.name = " channel" ;
514+ channel_config.value = static_cast <uint32_t >(10 );
515+ ir_node.config .params .push_back (channel_config);
516+
517+ ir::IR ir;
518+ ir.nodes .push_back (ir_node);
519+
520+ state::Config cfg{.ir = ir, .channels = {{10 , x::telem::FLOAT32_T , 11 }}};
521+ state::State s (cfg, errors::noop_handler);
522+
523+ Factory factory;
524+ auto state_node = ASSERT_NIL_P (s.node (" source" ));
525+ auto node = ASSERT_NIL_P (
526+ factory.create (node::Config (ir, ir_node, std::move (state_node)))
527+ );
528+
529+ // Reset with no ingested data should not crash.
530+ node->reset ();
531+
532+ bool changed = false ;
533+ auto ctx = node::Context{
534+ .elapsed = x::telem::SECOND ,
535+ .mark_changed = [&changed](const std::string &) { changed = true ; },
536+ .report_error = [](const x::errors::Error &) {},
537+ .activate_stage = [] {},
538+ };
539+ ASSERT_NIL (node->next (ctx));
540+ EXPECT_FALSE (changed);
541+ }
542+
434543// / @brief Test source node calls mark_changed callback.
435544TEST (OnTest, NextCallsMarkChanged) {
436545 ir::Param output_param;
0 commit comments