@@ -162,8 +162,16 @@ struct RcvTopicsPubs {
162162 uORB :: Publication < @ (sub[' simple_base_type' ])_s> @ (sub[' topic_simple' ])_pub{ORB_ID (@ (sub[' topic_simple' ]))};
163163@ [ end for ]@
164164
165- @ [ for sub in subscriptions_multi]@
166- uORB :: PublicationMulti < @ (sub[' simple_base_type' ])_s> @ (sub[' topic_simple' ])_pub{ORB_ID (@ (sub[' topic_simple' ]))};
165+ @ [ for sub in subscriptions_demux]@
166+ uORB :: PublicationMulti < @ (sub[' simple_base_type' ])_s> @ (sub[' topic_simple' ])_pubs[@ (sub[' max_instances' ])] {
167+ @ [ for idx in range (sub[' max_instances' ])]@
168+ {ORB_ID (@ (sub[' topic_simple' ]))}@ (' ' if idx == sub[' max_instances' ]- 1 else ' ,' )
169+ @ [ end for ]@
170+ };
171+ struct {
172+ uint8_t assigned_ids[@ (sub[' max_instances' ])] {};
173+ uint8_t num_assigned {0 };
174+ } @ (sub[' topic_simple' ])_demux;
167175@ [ end for ]@
168176
169177 uint32_t num_payload_received{};
@@ -179,7 +187,7 @@ static void on_topic_update(uxrSession *session, uxrObjectId object_id, uint16_t
179187 pubs-> num_payload_received += length;
180188
181189 switch (object_id .id ) {
182- @ [ for idx, sub in enumerate (subscriptions + subscriptions_multi )]@
190+ @ [ for idx, sub in enumerate (subscriptions)]@
183191 case @ (idx)+ (65535U / 32U ) + 1 : {
184192 @ (sub[' simple_base_type' ])_s data;
185193
@@ -190,6 +198,34 @@ static void on_topic_update(uxrSession *session, uxrObjectId object_id, uint16_t
190198 }
191199 break ;
192200
201+ @ [ end for ]@
202+ @ [ for idx, sub in enumerate (subscriptions_demux)]@
203+ case @ (idx + len (subscriptions))+ (65535U / 32U ) + 1 : {
204+ @ (sub[' simple_base_type' ])_s data;
205+
206+ if (ucdr_deserialize_@ (sub[' simple_base_type' ])(* ub, data, time_offset_us)) {
207+ // print_message (ORB_ID (@ (sub[' simple_base_type' ])), data);
208+ int instance = - 1 ;
209+
210+ for (uint8_t i = 0 ; i < pubs-> @ (sub[' topic_simple' ])_demux .num_assigned ; i++ ) {
211+ if (pubs-> @ (sub[' topic_simple' ])_demux .assigned_ids [i] == data.@ (sub[' route_field' ])) {
212+ instance = i;
213+ break ;
214+ }
215+ }
216+
217+ if (instance < 0 && pubs-> @ (sub[' topic_simple' ])_demux .num_assigned < @ (sub[' max_instances' ])) {
218+ instance = pubs-> @ (sub[' topic_simple' ])_demux .num_assigned ++ ;
219+ pubs-> @ (sub[' topic_simple' ])_demux .assigned_ids [instance] = data.@ (sub[' route_field' ]);
220+ }
221+
222+ if (instance >= 0 ) {
223+ pubs-> @ (sub[' topic_simple' ])_pubs[instance].publish (data);
224+ }
225+ }
226+ }
227+ break ;
228+
193229@ [ end for ]@
194230
195231 default :
@@ -200,13 +236,20 @@ static void on_topic_update(uxrSession *session, uxrObjectId object_id, uint16_t
200236
201237bool RcvTopicsPubs :: init (uxrSession * session, uxrStreamId reliable_out_stream_id, uxrStreamId reliable_in_stream_id, uxrStreamId best_effort_in_stream_id, uxrObjectId participant_id, const char * client_namespace)
202238{
203- @ [ for idx, sub in enumerate (subscriptions + subscriptions_multi )]@
239+ @ [ for idx, sub in enumerate (subscriptions)]@
204240 {
205241 uint16_t queue_depth = orb_get_queue_size (ORB_ID (@ (sub[' simple_base_type' ]))) * 2 ; // use a bit larger queue size than internal
206242 uint32_t message_version = get_message_version< @ (sub[' simple_base_type' ])_s> ();
207243 create_data_reader (session, reliable_out_stream_id, best_effort_in_stream_id, participant_id, @ (idx), client_namespace, " @(sub['topic'])" , message_version, " @(sub['dds_type'])" , queue_depth);
208244 }
209245@ [ end for ]@
246+ @ [ for idx, sub in enumerate (subscriptions_demux)]@
247+ {
248+ uint16_t queue_depth = orb_get_queue_size (ORB_ID (@ (sub[' topic_simple' ]))) * @ (sub[' max_instances' ]); // scale queue for multiple sources
249+ uint32_t message_version = get_message_version< @ (sub[' simple_base_type' ])_s> ();
250+ create_data_reader (session, reliable_out_stream_id, best_effort_in_stream_id, participant_id, @ (idx + len (subscriptions)), client_namespace, " @(sub['topic'])" , message_version, " @(sub['dds_type'])" , queue_depth);
251+ }
252+ @ [ end for ]@
210253
211254 uxr_set_topic_callback (session, on_topic_update, this );
212255
0 commit comments