@@ -160,7 +160,7 @@ async def query(self):
160
160
return self .status , rst , self .failed_regions
161
161
162
162
async def get_id_by_trace_id (self , trace_id , time_filter ):
163
- sql = f"SELECT toString(_id) AS `_id` FROM l7_flow_log WHERE trace_id='{ trace_id } ' AND { time_filter } limit 1"
163
+ sql = f"SELECT toString(_id) AS `_id` FROM l7_flow_log WHERE FastFilter( trace_id) ='{ trace_id } ' AND { time_filter } limit 1"
164
164
resp = await self .query_ck (sql )
165
165
self .status .append ("Query _id" , resp )
166
166
data = resp ["data" ]
@@ -217,8 +217,11 @@ async def trace_l7_flow(self,
217
217
filters = []
218
218
new_trace_id_flows = pd .DataFrame ()
219
219
new_trace_id_filters = []
220
+ new_trace_ids = set ()
220
221
# 主动注入的追踪信息
221
222
if not allow_multiple_trace_ids_in_tracing_result :
223
+ if trace_id :
224
+ new_trace_ids .add (trace_id )
222
225
delete_index = []
223
226
deleted_trace_ids = set ()
224
227
for index in range (len (dataframe_flowmetas .index )):
@@ -230,15 +233,13 @@ async def trace_l7_flow(self,
230
233
deleted_trace_ids .add (flow_trace_id )
231
234
if not trace_id :
232
235
trace_id = flow_trace_id
236
+ new_trace_ids .add (trace_id )
233
237
if trace_id and not query_simple_trace_id :
234
- new_trace_id_filters .append (f"trace_id='{ trace_id } '" )
238
+ new_trace_id_filters .append (
239
+ f"FastFilter(trace_id)='{ trace_id } '" )
235
240
# Trace id query separately
236
241
new_trace_id_flows = await self .query_flowmetas (
237
242
time_filter , ' OR ' .join (new_trace_id_filters ))
238
- if type (new_trace_id_flows ) != DataFrame :
239
- break
240
- new_trace_id_flows .rename (columns = {'_id_str' : '_id' },
241
- inplace = True )
242
243
query_simple_trace_id = True
243
244
if delete_index :
244
245
dataframe_flowmetas = dataframe_flowmetas .drop (
@@ -266,7 +267,6 @@ async def trace_l7_flow(self,
266
267
ignore_index = True ).drop_duplicates (
267
268
["_id" ]).reset_index (drop = True )
268
269
else :
269
- new_trace_ids = set ()
270
270
third_app_spans = []
271
271
for index in range (len (dataframe_flowmetas .index )):
272
272
if dataframe_flowmetas ['trace_id' ][index ] in [0 , '' ]:
@@ -299,18 +299,27 @@ async def trace_l7_flow(self,
299
299
new_trace_ids -= trace_ids
300
300
trace_ids |= new_trace_ids
301
301
if new_trace_ids :
302
- trace_ids_set = set ([nxrid [1 ] for nxrid in new_trace_ids ])
303
- new_trace_id_filters .append ('(' + ' OR ' .join ([
304
- "trace_id='{tid}'" .format (tid = tid )
305
- for tid in trace_ids_set
306
- ]) + ')' )
302
+ new_trace_id_filters .append (
303
+ f"FastFilter(trace_id) IN ({ ',' .join (new_trace_ids )} )" )
307
304
# Trace id query separately
308
305
new_trace_id_flows = await self .query_flowmetas (
309
306
time_filter , ' OR ' .join (new_trace_id_filters ))
310
- if type (new_trace_id_flows ) != DataFrame :
311
- break
312
- new_trace_id_flows .rename (columns = {'_id_str' : '_id' },
313
- inplace = True )
307
+
308
+ if type (new_trace_id_flows ) != DataFrame :
309
+ break
310
+ # Delete different trace id data
311
+ new_trace_id_flow_delete_index = []
312
+ deleted_trace_ids = set ()
313
+ for index in range (len (new_trace_id_flows .index )):
314
+ flow_trace_id = new_trace_id_flows ['trace_id' ][index ]
315
+ if flow_trace_id not in new_trace_ids :
316
+ new_trace_id_flow_delete_index .append (index )
317
+ deleted_trace_ids .add (flow_trace_id )
318
+ if new_trace_id_flow_delete_index :
319
+ new_trace_id_flows = new_trace_id_flows .drop (
320
+ new_trace_id_flow_delete_index )
321
+ new_trace_id_flows = new_trace_id_flows .reset_index (drop = True )
322
+ new_trace_id_flows .rename (columns = {'_id_str' : '_id' }, inplace = True )
314
323
315
324
# 新的网络追踪信息
316
325
new_network_metas = set ()
0 commit comments