Skip to content

Commit db179aa

Browse files
committed
Implement DDS close-streams using open-streams control
1 parent defc7dd commit db179aa

File tree

6 files changed

+97
-32
lines changed

6 files changed

+97
-32
lines changed

src/dds/rs-dds-sensor-proxy.cpp

Lines changed: 38 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -261,39 +261,28 @@ dds_sensor_proxy::find_profile( sid_index sidx, realdds::dds_motion_stream_profi
261261
}
262262

263263

264-
void dds_sensor_proxy::open( const stream_profiles & profiles )
264+
realdds::dds_stream_profiles dds_sensor_proxy::find_dds_profiles( const librealsense::stream_profiles & source_profiles ) const
265265
{
266-
_formats_converter.prepare_to_convert( profiles );
267-
_active_converted_profiles = profiles;
268-
const auto & source_profiles = _formats_converter.get_active_source_profiles();
269-
// TODO - register processing block options?
270-
271266
realdds::dds_stream_profiles realdds_profiles;
272267
for( size_t i = 0; i < source_profiles.size(); ++i )
273268
{
274269
auto & sp = source_profiles[i];
275270
sid_index sidx( sp->get_unique_id(), sp->get_stream_index() );
276271
if( auto const vsp = As< video_stream_profile >( sp ) )
277272
{
278-
auto video_profile = find_profile(
279-
sidx,
280-
realdds::dds_video_stream_profile( sp->get_framerate(),
281-
realdds::dds_video_encoding::from_rs2( sp->get_format() ),
282-
vsp->get_width(),
283-
vsp->get_height() ) );
273+
auto video_profile = find_profile( sidx,
274+
realdds::dds_video_stream_profile( sp->get_framerate(),
275+
realdds::dds_video_encoding::from_rs2( sp->get_format() ),
276+
vsp->get_width(),
277+
vsp->get_height() ) );
284278
if( video_profile )
285-
{
286279
realdds_profiles.push_back( video_profile );
287-
calculate_bandwidth( vsp );
288-
}
289280
else
290281
LOG_ERROR( "no profile found in stream for rs2 profile " << vsp );
291282
}
292283
else if( Is< motion_stream_profile >( sp ) )
293284
{
294-
auto motion_profile = find_profile(
295-
sidx,
296-
realdds::dds_motion_stream_profile( source_profiles[i]->get_framerate() ) );
285+
auto motion_profile = find_profile( sidx, realdds::dds_motion_stream_profile( source_profiles[i]->get_framerate() ) );
297286
if( motion_profile )
298287
realdds_profiles.push_back( motion_profile );
299288
else
@@ -305,23 +294,38 @@ void dds_sensor_proxy::open( const stream_profiles & profiles )
305294
}
306295
}
307296

297+
return realdds_profiles;
298+
}
299+
300+
void dds_sensor_proxy::open( const stream_profiles & profiles )
301+
{
302+
_formats_converter.prepare_to_convert( profiles );
303+
_active_converted_profiles = profiles;
304+
const auto & source_profiles = _formats_converter.get_active_source_profiles();
305+
// TODO - register processing block options?
306+
307+
for( size_t i = 0; i < source_profiles.size(); ++i )
308+
if( auto const vsp = As< video_stream_profile >( source_profiles[i] ) )
309+
log_bandwidth( vsp );
310+
308311
try
309312
{
313+
software_sensor::open( source_profiles ); // Call before send to device to check SDK conditions (not open/streaming/etc...)
310314
if( source_profiles.size() > 0 )
311315
{
316+
realdds::dds_stream_profiles realdds_profiles = find_dds_profiles( source_profiles );
312317
_dev->open( realdds_profiles );
313318
}
314-
315-
software_sensor::open( source_profiles );
316319
}
317320
catch( realdds::dds_runtime_error const & e )
318321
{
322+
software_sensor::close();
319323
throw invalid_value_exception( e.what() );
320324
}
321325
}
322326

323327

324-
void dds_sensor_proxy::calculate_bandwidth( const std::shared_ptr< video_stream_profile > & vsp )
328+
void dds_sensor_proxy::log_bandwidth( const std::shared_ptr< video_stream_profile > & vsp ) const
325329
{
326330
size_t width = vsp->get_width();
327331
size_t height = vsp->get_height();
@@ -632,8 +636,19 @@ void dds_sensor_proxy::stop()
632636

633637
void dds_sensor_proxy::close()
634638
{
635-
software_sensor::close();
636-
_active_converted_profiles.clear();
639+
const auto & source_profiles = _formats_converter.get_active_source_profiles();
640+
realdds::dds_stream_profiles realdds_profiles = find_dds_profiles( source_profiles );
641+
642+
try
643+
{
644+
software_sensor::close();
645+
_dev->close( realdds_profiles );
646+
_active_converted_profiles.clear();
647+
}
648+
catch( realdds::dds_runtime_error const & e )
649+
{
650+
throw invalid_value_exception( e.what() );
651+
}
637652
}
638653

639654

src/dds/rs-dds-sensor-proxy.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include <realdds/dds-defines.h>
1212
#include <realdds/dds-metadata-syncer.h>
1313
#include <realdds/dds-embedded-filter.h>
14+
#include <realdds/dds-stream-profile.h>
1415

1516
#include <rsutils/json-fwd.h>
1617
#include <memory>
@@ -98,14 +99,16 @@ class dds_sensor_proxy : public software_sensor
9899
protected:
99100
void register_converters();
100101
stream_profiles init_stream_profiles() override;
101-
void calculate_bandwidth( const std::shared_ptr< librealsense::video_stream_profile > & vsp );
102+
void log_bandwidth( const std::shared_ptr< librealsense::video_stream_profile > & vsp ) const;
102103

103104
std::shared_ptr< realdds::dds_video_stream_profile >
104105
find_profile( sid_index sidx, realdds::dds_video_stream_profile const & profile ) const;
105106

106107
std::shared_ptr< realdds::dds_motion_stream_profile >
107108
find_profile( sid_index sidx, realdds::dds_motion_stream_profile const & profile ) const;
108109

110+
realdds::dds_stream_profiles find_dds_profiles( const librealsense::stream_profiles & source_profiles ) const;
111+
109112
void handle_video_data( std::vector< uint8_t > &&,
110113
realdds::dds_time &&,
111114
realdds::dds_sample &&,

third-party/realdds/include/realdds/dds-device.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ class dds_device : public dds_discovery_sink
7878
size_t foreach_option( std::function< void( std::shared_ptr< dds_option > option ) > fn ) const;
7979

8080
void open( const dds_stream_profiles & profiles );
81+
void close( const dds_stream_profiles & profiles );
8182

8283
void set_option_value( const std::shared_ptr< dds_option > & option, rsutils::json new_value );
8384
rsutils::json query_option_value( const std::shared_ptr< dds_option > & option );

third-party/realdds/src/dds-device-impl.cpp

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -489,32 +489,68 @@ void dds_device::impl::on_log( json const & j, dds_sample const & )
489489
}
490490

491491

492-
void dds_device::impl::open( const dds_stream_profiles & profiles )
492+
void dds_device::impl::add_profiles_to_json( const realdds::dds_stream_profiles & profiles, rsutils::json & profiles_as_json ) const
493493
{
494-
if( profiles.empty() )
495-
DDS_THROW( runtime_error, "must provide at least one profile" );
496-
497-
json stream_profiles;
498494
for( auto & profile : profiles )
499495
{
500496
auto stream = profile->stream();
501497
if( ! stream )
502498
DDS_THROW( runtime_error, "profile '" << profile->to_string() << "' is not part of any stream" );
503-
if( stream_profiles.nested( stream->name() ) )
499+
if( profiles_as_json.nested( stream->name() ) )
504500
DDS_THROW( runtime_error, "more than one profile found for stream '" << stream->name() << "'" );
505501

506-
stream_profiles[stream->name()] = profile->to_json();
502+
profiles_as_json[stream->name()] = profile->to_json();
507503
}
504+
}
505+
506+
void dds_device::impl::open( const dds_stream_profiles & profiles )
507+
{
508+
if( profiles.empty() )
509+
DDS_THROW( runtime_error, "must provide at least one profile" );
510+
511+
json profiles_to_open;
512+
add_profiles_to_json( profiles, profiles_to_open );
513+
// Not needed, already open streams are kept open by FW
514+
// add_profiles_to_json( _open_profiles_list, profiles_to_open ); // Add already open profiles to the list
508515

509516
json j = {
510517
{ topics::control::key::id, topics::control::open_streams::id },
511-
{ topics::control::open_streams::key::stream_profiles, std::move( stream_profiles ) },
518+
// D555 initial FW treats reset field as implicitly true, so we explicitly mention it here
519+
{ topics::control::open_streams::key::reset, false }
512520
};
521+
if( ! profiles_to_open.empty() )
522+
j[topics::control::open_streams::key::stream_profiles] = std::move( profiles_to_open );
513523

514524
json reply;
515525
write_control_message( j, &reply );
526+
527+
// If no exception writing to the device then save profiles in open profiles list
528+
_open_profiles_list.insert( _open_profiles_list.end(), profiles.begin(), profiles.end() );
516529
}
517530

531+
void dds_device::impl::close( const dds_stream_profiles & profiles )
532+
{
533+
// Remove profiles from open profiles list. Not using erase-remove idiom but for a small number of profiles it does not really matter...
534+
for( auto & profile : profiles )
535+
{
536+
auto it = find( _open_profiles_list.begin(), _open_profiles_list.end(), profile );
537+
if( it != _open_profiles_list.end() )
538+
_open_profiles_list.erase( it );
539+
}
540+
541+
json keep_open_profiles;
542+
add_profiles_to_json( _open_profiles_list, keep_open_profiles );
543+
544+
json j = {
545+
{ topics::control::key::id, topics::control::open_streams::id },
546+
{ topics::control::open_streams::key::reset, true }
547+
};
548+
if( ! keep_open_profiles.empty() )
549+
j[topics::control::open_streams::key::stream_profiles] = std::move( keep_open_profiles );
550+
551+
json reply;
552+
write_control_message( j, &reply );
553+
}
518554

519555
void dds_device::impl::set_option_value( const std::shared_ptr< dds_option > & option, json new_value )
520556
{

third-party/realdds/src/dds-device-impl.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ class dds_device::impl
8080
bool is_ready() const { return state_t::READY == _state; }
8181

8282
void open( const dds_stream_profiles & profiles );
83+
void close( const dds_stream_profiles & profiles );
8384

8485
void write_control_message( rsutils::json const &, rsutils::json * reply = nullptr );
8586

@@ -140,10 +141,13 @@ class dds_device::impl
140141
void on_set_filter(rsutils::json const&, dds_sample const&);
141142
void on_query_filter(rsutils::json const&, dds_sample const&);
142143

144+
void add_profiles_to_json( const realdds::dds_stream_profiles & profiles, rsutils::json & profiles_as_json ) const;
145+
143146
on_metadata_available_signal _on_metadata_available;
144147
on_device_log_signal _on_device_log;
145148
on_notification_signal _on_notification;
146149
on_calibration_changed_signal _on_calibration_changed;
150+
dds_stream_profiles _open_profiles_list;
147151
};
148152

149153

third-party/realdds/src/dds-device.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,12 @@ void dds_device::open( const dds_stream_profiles & profiles )
192192
_impl->open( profiles );
193193
}
194194

195+
void dds_device::close( const dds_stream_profiles & profiles )
196+
{
197+
wait_until_ready( 0 ); // throw if not
198+
_impl->close( profiles );
199+
}
200+
195201
void dds_device::set_option_value( const std::shared_ptr< dds_option > & option, json new_value )
196202
{
197203
wait_until_ready( 0 ); // throw if not

0 commit comments

Comments
 (0)