@@ -162,8 +162,21 @@ struct RcvTopicsPubs {
162162 uORB :: Publication < @ (sub[' simple_base_type' ])_s> @ (sub[' topic_simple' ])_pub{ORB_ID (@ (sub[' topic_simple' ]))};
163163@ [ end for ]@
164164
165- @ [ for sub in subscriptions_multi]@
166- uORB :: PublicationMulti < @ (sub[' simple_base_type' ])_s> @ (sub[' topic_simple' ])_pub{ORB_ID (@ (sub[' topic_simple' ]))};
165+ @ # Group subscriptions_multi by topic to create arrays
166+ @ {
167+ multi_topic_groups = {}
168+ for sub in subscriptions_multi :
169+ topic_simple = sub[' topic_simple' ]
170+ if topic_simple not in multi_topic_groups :
171+ multi_topic_groups[topic_simple] = []
172+ multi_topic_groups[topic_simple].append (sub)
173+ }@
174+ @ [ for topic_simple, subs in multi_topic_groups .items ()]@
175+ uORB :: PublicationMulti < @ (subs[0 ][' simple_base_type' ])_s> @ (topic_simple)_pubs[@ (len (subs))] {
176+ @ [ for idx, sub in enumerate (subs)]@
177+ {ORB_ID (@ (topic_simple))}@ (' ' if idx == len (subs)- 1 else ' ,' )
178+ @ [ end for ]@
179+ };
167180@ [ end for ]@
168181
169182 uint32_t num_payload_received{};
@@ -179,7 +192,7 @@ static void on_topic_update(uxrSession *session, uxrObjectId object_id, uint16_t
179192 pubs-> num_payload_received += length;
180193
181194 switch (object_id .id ) {
182- @ [ for idx, sub in enumerate (subscriptions + subscriptions_multi )]@
195+ @ [ for idx, sub in enumerate (subscriptions)]@
183196 case @ (idx)+ (65535U / 32U ) + 1 : {
184197 @ (sub[' simple_base_type' ])_s data;
185198
@@ -190,6 +203,18 @@ static void on_topic_update(uxrSession *session, uxrObjectId object_id, uint16_t
190203 }
191204 break ;
192205
206+ @ [ end for ]@
207+ @ [ for idx, sub in enumerate (subscriptions_multi)]@
208+ case @ (idx + len (subscriptions))+ (65535U / 32U ) + 1 : {
209+ @ (sub[' simple_base_type' ])_s data;
210+
211+ if (ucdr_deserialize_@ (sub[' simple_base_type' ])(* ub, data, time_offset_us)) {
212+ // print_message (ORB_ID (@ (sub[' simple_base_type' ])), data);
213+ pubs-> @ (sub[' topic_simple' ])_pubs[@ (sub[' instance' ])].publish (data);
214+ }
215+ }
216+ break ;
217+
193218@ [ end for ]@
194219
195220 default :
@@ -200,13 +225,20 @@ static void on_topic_update(uxrSession *session, uxrObjectId object_id, uint16_t
200225
201226bool RcvTopicsPubs :: init (uxrSession * session, uxrStreamId reliable_out_stream_id, uxrStreamId reliable_in_stream_id, uxrStreamId best_effort_in_stream_id, uxrObjectId participant_id, const char * client_namespace)
202227{
203- @ [ for idx, sub in enumerate (subscriptions + subscriptions_multi )]@
228+ @ [ for idx, sub in enumerate (subscriptions)]@
204229 {
205230 uint16_t queue_depth = orb_get_queue_size (ORB_ID (@ (sub[' simple_base_type' ]))) * 2 ; // use a bit larger queue size than internal
206231 uint32_t message_version = get_message_version< @ (sub[' simple_base_type' ])_s> ();
207232 create_data_reader (session, reliable_out_stream_id, best_effort_in_stream_id, participant_id, @ (idx), client_namespace, " @(sub['topic'])" , message_version, " @(sub['dds_type'])" , queue_depth);
208233 }
209234@ [ end for ]@
235+ @ [ for idx, sub in enumerate (subscriptions_multi)]@
236+ {
237+ uint16_t queue_depth = orb_get_queue_size (ORB_ID (@ (sub[' topic_simple' ]))) * 2 ; // use a bit larger queue size than internal
238+ uint32_t message_version = get_message_version< @ (sub[' simple_base_type' ])_s> ();
239+ create_data_reader (session, reliable_out_stream_id, best_effort_in_stream_id, participant_id, @ (idx + len (subscriptions)), client_namespace, " @(sub['topic'])" , message_version, " @(sub['dds_type'])" , queue_depth);
240+ }
241+ @ [ end for ]@
210242
211243 uxr_set_topic_callback (session, on_topic_update, this );
212244
0 commit comments