@@ -102,92 +102,188 @@ void MsgBusImpl_redis::update_Peer(obj_bgp_peer &peer, obj_peer_up_event *up, ob
102102/* *
103103 * Abstract method Implementation - See MsgBusInterface.hpp for details
104104 */
105- void MsgBusImpl_redis::update_unicastPrefix (obj_bgp_peer &peer, vector<obj_rib> &rib,
106- obj_path_attr *attr, unicast_prefix_action_code code) {
107- if (attr == NULL )
105+ void MsgBusImpl_redis::update_unicastPrefix (obj_bgp_peer &peer,
106+ std::vector<obj_rib> &rib,
107+ obj_path_attr *attr,
108+ unicast_prefix_action_code code)
109+ {
110+ // Only enforce attr for ADD; DEL should proceed without it.
111+ if (code == UNICAST_PREFIX_ACTION_ADD && attr == nullptr ) {
112+ LOG_INFO (" MsgBusImpl_redis update_unicastPrefix: ADD requested but attr==NULL (peer=%s) — ignoring" , peer.peer_addr );
108113 return ;
114+ }
115+
116+ const char * actionStr = (code == UNICAST_PREFIX_ACTION_ADD) ? " add" : " del" ;
117+
118+ // Timestamp + hashes (to match Kafka line contents)
119+ std::string ts, path_hash_str, peer_hash_str, router_hash_str;
120+ getTimestamp (peer.timestamp_secs , peer.timestamp_us , ts);
121+ if (attr) hash_toStr (attr->hash_id , path_hash_str);
122+ hash_toStr (peer.hash_id , peer_hash_str);
123+ hash_toStr (peer.router_hash_id , router_hash_str);
124+
125+ LOG_INFO (" MsgBusImpl_redis update_unicastPrefix[start] action=%s peer=%s as=%u ts=%s rib_size=%zu isAdjIn=%d isPrePolicy=%d" ,
126+ actionStr, peer.peer_addr , peer.peer_as , ts.c_str (), rib.size (),
127+ (int )peer.isAdjIn , (int )peer.isPrePolicy );
128+
129+ std::vector<std::string> del_keys;
130+ del_keys.reserve (rib.size ());
131+
132+ size_t add_count = 0 , del_count = 0 ;
109133
110- vector<string> del_keys;
111- string neigh = peer.peer_addr ;
112-
113- for (size_t i = 0 ; i < rib.size (); i++) {
114- // Loop through the vector array of rib entries
115- vector<swss::FieldValueTuple> addFieldValues;
116- addFieldValues.reserve (MAX_ATTRIBUTES_COUNT);
117-
118- // rib table schema as BGP_RIB_OUT_TABLE|192.181.168.0/25|10.0.0.59
119- vector<string> keys;
120- string redisMgr_pfx = rib[i].prefix ;
121- redisMgr_pfx += " /" ;
122- redisMgr_pfx += to_string (rib[i].prefix_len );
123- keys.reserve (MAX_ATTRIBUTES_COUNT);
124- keys.emplace_back (redisMgr_pfx);
134+ for (size_t i = 0 ; i < rib.size (); ++i) {
135+ // Compose "prefix/len"
136+ std::string pfx_len = std::string (rib[i].prefix ) + " /" + std::to_string (rib[i].prefix_len );
137+
138+ // Redis key parts (prefix/len, neighbor)
139+ std::vector<std::string> keys;
140+ keys.reserve (2 );
141+ keys.emplace_back (pfx_len);
125142 keys.emplace_back (peer.peer_addr );
126143
127- switch (code) {
128-
129- case UNICAST_PREFIX_ACTION_ADD:
130- {
131- addFieldValues.emplace_back (make_pair (" origin" , string (attr->origin )));
132- addFieldValues.emplace_back (make_pair (" as_path" , attr->as_path ));
133- stringstream as_path_count;
134- as_path_count << attr->as_path_count ;
135- addFieldValues.emplace_back (make_pair (" as_path_count" , as_path_count.str ()));
136- stringstream origin_as;
137- origin_as << attr->origin_as ;
138- addFieldValues.emplace_back (make_pair (" origin_as" , origin_as.str ()));
139- addFieldValues.emplace_back (make_pair (" next_hop" , string (attr->next_hop )));
140- stringstream local_pref;
141- local_pref << attr->local_pref ;
142- addFieldValues.emplace_back (make_pair (" local_pref" , local_pref.str ()));
143- addFieldValues.emplace_back (make_pair (" community_list" , attr->community_list ));
144- addFieldValues.emplace_back (make_pair (" ext_community_list" , attr->ext_community_list ));
145- addFieldValues.emplace_back (make_pair (" large_community_list" , attr->large_community_list ));
146- addFieldValues.emplace_back (make_pair (" originator_id" , string (attr->originator_id )));
147-
148- for (const auto & fieldValue : addFieldValues) {
149- const std::string& field = std::get<0 >(fieldValue);
150- const std::string& value = std::get<1 >(fieldValue);
151- DEBUG (" MsgBusImpl_redis update_unicastPrefix field = %s, value = %s" , field.c_str (), value.c_str ());
152- }
153- if (peer.isAdjIn )
154- {
155- redisMgr_.WriteBMPTable (BMP_TABLE_RIB_IN, keys, addFieldValues);
156- }
157- else
158- {
159- redisMgr_.WriteBMPTable (BMP_TABLE_RIB_OUT, keys, addFieldValues);
160- }
161- }
162- break ;
163-
164- case UNICAST_PREFIX_ACTION_DEL:
165- {
166- string com_key;
167- if (peer.isAdjIn )
168- {
169- com_key = BMP_TABLE_RIB_IN;
170- }
171- else
172- {
173- com_key = BMP_TABLE_RIB_OUT;
174- }
175- com_key += redisMgr_.GetKeySeparator ();
176- com_key += redisMgr_pfx;
177- com_key += redisMgr_.GetKeySeparator ();
178- com_key += neigh;
179- del_keys.push_back (com_key);
180- }
181- break ;
144+ // RIB hash (already computed by producer of obj_rib)
145+ std::string rib_hash_str; hash_toStr (rib[i].hash_id , rib_hash_str);
146+
147+ LOG_INFO (" MsgBusImpl_redis RIB[%zu] prefix=%s/%d isIPv4=%d path_id=%u labels=%s rib_hash=%s" ,
148+ i, rib[i].prefix , rib[i].prefix_len , (int )rib[i].isIPv4 ,
149+ (unsigned )rib[i].path_id , rib[i].labels , rib_hash_str.c_str ());
150+
151+ if (code == UNICAST_PREFIX_ACTION_ADD) {
152+ // Convert all fields to std::string safely (works for char[] or std::string sources)
153+ // NOTE: adjust these initializations if your obj_path_attr uses different types.
154+ const std::string origin = attr->origin ; // char[] or std::string
155+ const std::string as_path = attr->as_path ; // std::string
156+ const uint32_t as_path_count = attr->as_path_count ;
157+ const uint32_t origin_as = attr->origin_as ;
158+ const std::string next_hop = attr->next_hop ; // char[] or std::string
159+ const uint32_t med = attr->med ;
160+ const uint32_t local_pref = attr->local_pref ;
161+ const std::string aggregator = attr->aggregator ; // char[] or std::string
162+ const std::string community_list = attr->community_list ; // std::string
163+ const std::string ext_comm_list = attr->ext_community_list ; // std::string
164+ const std::string cluster_list = attr->cluster_list ; // std::string
165+ const int atomic_agg = (int )attr->atomic_agg ;
166+ const int nexthop_isIPv4 = (int )attr->nexthop_isIPv4 ;
167+ const std::string originator_id = attr->originator_id ; // char[] or std::string
168+ const uint32_t path_id = rib[i].path_id ;
169+ const std::string labels = rib[i].labels ; // char[] -> std::string
170+ const int isPrePolicy = (int )peer.isPrePolicy ;
171+ const int isAdjIn = (int )peer.isAdjIn ;
172+ const std::string large_comm_list = attr->large_community_list ; // std::string
173+
174+
175+ char raw[4096 ];
176+ std::snprintf (raw, sizeof (raw),
177+ " %s\t %llu\t %s\t %s\t %s\t %s\t %u\t %s\t %s\t %d\t %d\t %s\t %s\t %u\t %u\t %s\t %u\t %u\t %s\t %s\t %s\t %s\t %d\t %d\t %s\t %u\t %s\t %d\t %d\t %s" ,
178+ actionStr,
179+ (unsigned long long )0 , /* seq if you track one */
180+ rib_hash_str.c_str (),
181+ router_hash_str.c_str (),
182+ /* router_ip column removed */
183+ path_hash_str.c_str (),
184+ peer_hash_str.c_str (),
185+ (unsigned )peer.peer_as ,
186+ ts.c_str (),
187+ rib[i].prefix ,
188+ rib[i].prefix_len ,
189+ (int )rib[i].isIPv4 ,
190+ origin.c_str (),
191+ as_path.c_str (),
192+ (unsigned )as_path_count,
193+ (unsigned )origin_as,
194+ next_hop.c_str (),
195+ (unsigned )med,
196+ (unsigned )local_pref,
197+ aggregator.c_str (),
198+ community_list.c_str (),
199+ ext_comm_list.c_str (),
200+ cluster_list.c_str (),
201+ atomic_agg,
202+ nexthop_isIPv4,
203+ originator_id.c_str (),
204+ (unsigned )path_id,
205+ labels.c_str (),
206+ isPrePolicy,
207+ isAdjIn,
208+ large_comm_list.c_str ());
209+ LOG_INFO (" MsgBusImpl_redis RAW_LINE[%zu]=%s" , i, raw);
210+
211+ const char * table = peer.isAdjIn ? BMP_TABLE_RIB_IN : BMP_TABLE_RIB_OUT;
212+ LOG_INFO (" MsgBusImpl_redis RAW_LINE ADD -> table=%s key=%s|%s" , table, pfx_len.c_str (), peer.peer_addr );
213+
214+ LOG_INFO (" MsgBusImpl_redis RAW_LINE ADD fields: origin='%s' as_path='%s' as_path_count=%u origin_as=%u next_hop='%s' "
215+ " med=%u local_pref=%u aggregator='%s' comm.len=%zu ext_comm.len=%zu cluster.len=%zu "
216+ " atomic_agg=%d nexthop_isIPv4=%d originator_id='%s' path_id=%u labels.len=%zu "
217+ " peer_as=%u isPrePolicy=%d isAdjIn=%d large_comm.len=%zu" ,
218+ origin.c_str (), as_path.c_str (), as_path_count, origin_as, next_hop.c_str (),
219+ med, local_pref, aggregator.c_str (), community_list.size (), ext_comm_list.size (),
220+ cluster_list.size (), atomic_agg, nexthop_isIPv4, originator_id.c_str (), path_id,
221+ labels.size (), peer.peer_as , isPrePolicy, isAdjIn, large_comm_list.size ());
222+
223+ // Populate Redis fields with the same mapped values
224+ std::vector<swss::FieldValueTuple> fvs;
225+ fvs.reserve (32 );
226+ fvs.emplace_back (std::make_pair (" origin" , origin));
227+ fvs.emplace_back (std::make_pair (" as_path" , as_path));
228+ fvs.emplace_back (std::make_pair (" as_path_count" , std::to_string (as_path_count)));
229+ fvs.emplace_back (std::make_pair (" origin_as" , std::to_string (origin_as)));
230+ fvs.emplace_back (std::make_pair (" next_hop" , next_hop));
231+ fvs.emplace_back (std::make_pair (" med" , std::to_string (med)));
232+ fvs.emplace_back (std::make_pair (" local_pref" , std::to_string (local_pref)));
233+ fvs.emplace_back (std::make_pair (" aggregator" , aggregator));
234+ fvs.emplace_back (std::make_pair (" community_list" , community_list));
235+ fvs.emplace_back (std::make_pair (" ext_community_list" , ext_comm_list));
236+ fvs.emplace_back (std::make_pair (" cluster_list" , cluster_list));
237+ fvs.emplace_back (std::make_pair (" atomic_agg" , std::to_string (atomic_agg)));
238+ fvs.emplace_back (std::make_pair (" nexthop_isIPv4" , std::to_string (nexthop_isIPv4)));
239+ fvs.emplace_back (std::make_pair (" originator_id" , originator_id));
240+ fvs.emplace_back (std::make_pair (" large_community_list" , large_comm_list));
241+
242+ // RIB/peer context + hashes + timestamp (Kafka includes them; store for parity)
243+ fvs.emplace_back (std::make_pair (" path_id" , std::to_string (path_id)));
244+ fvs.emplace_back (std::make_pair (" labels" , labels));
245+ fvs.emplace_back (std::make_pair (" isIPv4" , std::to_string ((int )rib[i].isIPv4 )));
246+ fvs.emplace_back (std::make_pair (" peer_as" , std::to_string (peer.peer_as )));
247+ fvs.emplace_back (std::make_pair (" isPrePolicy" , std::to_string (isPrePolicy)));
248+ fvs.emplace_back (std::make_pair (" isAdjIn" , std::to_string (isAdjIn)));
249+ fvs.emplace_back (std::make_pair (" rib_hash" , rib_hash_str));
250+ fvs.emplace_back (std::make_pair (" path_hash" , path_hash_str));
251+ fvs.emplace_back (std::make_pair (" peer_hash" , peer_hash_str));
252+ fvs.emplace_back (std::make_pair (" router_hash" , router_hash_str));
253+ fvs.emplace_back (std::make_pair (" ts" , ts));
254+
255+ const char * table = peer.isAdjIn ? BMP_TABLE_RIB_IN : BMP_TABLE_RIB_OUT;
256+ LOG_INFO (" MsgBusImpl_redis ADD -> table=%s key=%s|%s fields_count=%zu" ,
257+ table, pfx_len.c_str (), peer.peer_addr , fvs.size ());
258+
259+ if (peer.isAdjIn )
260+ redisMgr_.WriteBMPTable (BMP_TABLE_RIB_IN, keys, fvs);
261+ else
262+ redisMgr_.WriteBMPTable (BMP_TABLE_RIB_OUT, keys, fvs);
263+
264+ ++add_count;
265+ } else {
266+ std::string com_key = peer.isAdjIn ? BMP_TABLE_RIB_IN : BMP_TABLE_RIB_OUT;
267+ com_key += redisMgr_.GetKeySeparator () + pfx_len;
268+ com_key += redisMgr_.GetKeySeparator () + peer.peer_addr ;
269+ LOG_INFO (" DEL -> key=%s" , com_key.c_str ());
270+ del_keys.push_back (com_key);
271+ ++del_count;
182272 }
183273 }
184274
185275 if (!del_keys.empty ()) {
276+ LOG_INFO (" MsgBusImpl_redis RemoveEntityFromBMPTable count=%zu" , del_keys.size ());
186277 redisMgr_.RemoveEntityFromBMPTable (del_keys);
187278 }
279+
280+ LOG_INFO (" MsgBusImpl_redis update_unicastPrefix[done] action=%s adds=%zu dels=%zu" ,
281+ actionStr, add_count, del_count);
188282}
189283
190284
285+
286+
191287/* *
192288 * Abstract method Implementation - See MsgBusInterface.hpp for details
193289 */
0 commit comments