44
44
"resp_tcp_seq" ,
45
45
"start_time_us" ,
46
46
"end_time_us" ,
47
- "vtap_id " ,
47
+ "agent_id " ,
48
48
"tap_port" ,
49
49
"tap_port_name" ,
50
50
"tap_port_type" ,
@@ -402,7 +402,7 @@ async def trace_l7_flow(self,
402
402
if syscall_trace_id_request > 0 or syscall_trace_id_response > 0 :
403
403
new_syscall_metas .add ((
404
404
dataframe_flowmetas ['_id' ][index ],
405
- dataframe_flowmetas ['vtap_id ' ][index ],
405
+ dataframe_flowmetas ['agent_id ' ][index ],
406
406
dataframe_flowmetas ['syscall_trace_id_request' ]
407
407
[index ],
408
408
dataframe_flowmetas ['syscall_trace_id_response' ]
@@ -486,7 +486,7 @@ async def trace_l7_flow(self,
486
486
id_to_related_tag = dict ()
487
487
for index in new_flows .index :
488
488
_id = new_flows .at [index , '_id_str' ]
489
- vtap_id = new_flows .at [index , 'vtap_id ' ]
489
+ agent_id = new_flows .at [index , 'agent_id ' ]
490
490
req_tcp_seq = new_flows .at [index , 'req_tcp_seq' ]
491
491
resp_tcp_seq = new_flows .at [index , 'resp_tcp_seq' ]
492
492
tap_side = new_flows .at [index , 'tap_side' ]
@@ -505,7 +505,7 @@ async def trace_l7_flow(self,
505
505
506
506
id_to_related_tag [_id ] = {
507
507
'_id' : _id ,
508
- 'vtap_id ' : vtap_id ,
508
+ 'agent_id ' : agent_id ,
509
509
'req_tcp_seq' : req_tcp_seq ,
510
510
'resp_tcp_seq' : resp_tcp_seq ,
511
511
'tap_side' : tap_side ,
@@ -630,7 +630,7 @@ async def query_flowmetas(self, time_filter: str,
630
630
通过tcp_seq及流日志的时间追踪
631
631
632
632
系统调用追踪信息:
633
- vtap_id , syscall_trace_id_request, syscall_trace_id_response
633
+ agent_id , syscall_trace_id_request, syscall_trace_id_response
634
634
通过eBPF获取到的coroutine_trace_id追踪
635
635
636
636
主动注入的追踪信息:
@@ -640,7 +640,7 @@ async def query_flowmetas(self, time_filter: str,
640
640
"""
641
641
sql = """SELECT
642
642
type, req_tcp_seq, resp_tcp_seq, toUnixTimestamp64Micro(start_time) AS start_time_us, toUnixTimestamp64Micro(end_time) AS end_time_us,
643
- vtap_id , syscall_trace_id_request, syscall_trace_id_response, span_id, parent_span_id, l7_protocol,
643
+ agent_id , syscall_trace_id_request, syscall_trace_id_response, span_id, parent_span_id, l7_protocol,
644
644
trace_id, x_request_id_0, x_request_id_1, toString(_id) AS `_id_str`, tap_side, auto_instance_0, auto_instance_1
645
645
FROM `l7_flow_log`
646
646
WHERE (({time_filter}) AND ({base_filter})) limit {l7_tracing_limit}
@@ -710,7 +710,7 @@ def set_all_relate(dataframe_flowmetas, related_map, network_delay_us):
710
710
resp_tcp_seq = dataframe_flowmetas .at [index , 'resp_tcp_seq' ]
711
711
tap_side = dataframe_flowmetas .at [index , 'tap_side' ]
712
712
_id = dataframe_flowmetas .at [index , '_id' ]
713
- vtap_id = dataframe_flowmetas .at [index , 'vtap_id ' ]
713
+ agent_id = dataframe_flowmetas .at [index , 'agent_id ' ]
714
714
_type = dataframe_flowmetas .at [index , 'type' ]
715
715
start_time_us = dataframe_flowmetas .at [index , 'start_time_us' ]
716
716
end_time_us = dataframe_flowmetas .at [index , 'end_time_us' ]
@@ -726,7 +726,7 @@ def set_all_relate(dataframe_flowmetas, related_map, network_delay_us):
726
726
727
727
id_to_related_tag [_id ] = {
728
728
'_id' : _id ,
729
- 'vtap_id ' : vtap_id ,
729
+ 'agent_id ' : agent_id ,
730
730
'req_tcp_seq' : req_tcp_seq ,
731
731
'resp_tcp_seq' : resp_tcp_seq ,
732
732
'tap_side' : tap_side ,
@@ -760,9 +760,9 @@ def set_all_relate(dataframe_flowmetas, related_map, network_delay_us):
760
760
syscall_trace_id_response = dataframe_flowmetas .at [
761
761
index , 'syscall_trace_id_response' ]
762
762
_id = dataframe_flowmetas .at [index , '_id' ]
763
- vtap_id = dataframe_flowmetas .at [index , 'vtap_id ' ]
763
+ agent_id = dataframe_flowmetas .at [index , 'agent_id ' ]
764
764
if syscall_trace_id_request > 0 or syscall_trace_id_response > 0 :
765
- new_syscall_metas .add ((_id , vtap_id , syscall_trace_id_request ,
765
+ new_syscall_metas .add ((_id , agent_id , syscall_trace_id_request ,
766
766
syscall_trace_id_response ))
767
767
if syscall_trace_id_request :
768
768
syscall_req_to_ids [syscall_trace_id_request ].add (_id )
@@ -932,30 +932,30 @@ def set_relate(self, _ids, related_map, id_to_related_tag):
932
932
class L7SyscallMeta :
933
933
"""
934
934
系统调用追踪信息:
935
- vtap_id , syscall_trace_id_request, syscall_trace_id_response, tap_side, start_time_us, end_time_us
935
+ agent_id , syscall_trace_id_request, syscall_trace_id_response, tap_side, start_time_us, end_time_us
936
936
"""
937
937
938
938
def __init__ (self , flow_metas : Tuple ):
939
939
self ._id = flow_metas [0 ]
940
- self .vtap_id = flow_metas [1 ]
940
+ self .agent_id = flow_metas [1 ]
941
941
self .syscall_trace_id_request = flow_metas [2 ]
942
942
self .syscall_trace_id_response = flow_metas [3 ]
943
943
944
944
def __eq__ (self , rhs ):
945
- return (self .vtap_id == rhs .vtap_id and self .syscall_trace_id_request
945
+ return (self .agent_id == rhs .agent_id and self .syscall_trace_id_request
946
946
== rhs .syscall_trace_id_request
947
947
and self .syscall_trace_id_response
948
948
== rhs .syscall_trace_id_response )
949
949
950
950
def set_relate (self , _ids , related_map , id_to_related_tag ):
951
951
for _id in _ids :
952
952
_id_df = id_to_related_tag [_id ]['_id' ]
953
- vtap_id_df = id_to_related_tag [_id ]['vtap_id ' ]
953
+ agent_id_df = id_to_related_tag [_id ]['agent_id ' ]
954
954
syscall_trace_id_request_df = id_to_related_tag [_id ][
955
955
'syscall_trace_id_request' ]
956
956
syscall_trace_id_response_df = id_to_related_tag [_id ][
957
957
'syscall_trace_id_response' ]
958
- if _id_df == self ._id or self .vtap_id != vtap_id_df :
958
+ if _id_df == self ._id or self .agent_id != agent_id_df :
959
959
continue
960
960
if self .syscall_trace_id_request > 0 :
961
961
if self .syscall_trace_id_request == syscall_trace_id_request_df or self .syscall_trace_id_request == syscall_trace_id_response_df :
@@ -1096,8 +1096,8 @@ def sort_and_set_parent(self):
1096
1096
1097
1097
class Service :
1098
1098
1099
- def __init__ (self , vtap_id : int , process_id : int ):
1100
- self .vtap_id = vtap_id
1099
+ def __init__ (self , agent_id : int , process_id : int ):
1100
+ self .agent_id = agent_id
1101
1101
self .process_id = process_id
1102
1102
1103
1103
self .direct_flows = []
@@ -1154,7 +1154,7 @@ def parent_set(self):
1154
1154
def check_client_process_flow (self , flow : dict ):
1155
1155
"""检查该flow是否与service有关联关系,s-p的时间范围需要覆盖c-p,否则拆分为两个service"""
1156
1156
if self .process_id != flow ["process_id_0" ] \
1157
- or self .vtap_id != flow ["vtap_id " ]:
1157
+ or self .agent_id != flow ["agent_id " ]:
1158
1158
return False
1159
1159
if self .start_time_us > flow ["start_time_us" ] \
1160
1160
or self .end_time_us < flow ["end_time_us" ]:
@@ -1164,7 +1164,7 @@ def check_client_process_flow(self, flow: dict):
1164
1164
def add_direct_flow (self , flow : dict ):
1165
1165
"""direct_flow是指该服务直接接收到的,或直接发出的flow"""
1166
1166
#assert (
1167
- # self.vtap_id == flow.get('vtap_id ')
1167
+ # self.agent_id == flow.get('agent_id ')
1168
1168
# and self.process_id == flow.get('process_id')
1169
1169
#)
1170
1170
if flow ['tap_side' ] == TAP_SIDE_SERVER_PROCESS :
@@ -1246,7 +1246,7 @@ def merge_flow(flows: list, flow: dict) -> bool:
1246
1246
if flow ['type' ] == L7_FLOW_TYPE_SESSION \
1247
1247
and flow ['tap_side' ] not in [TAP_SIDE_SERVER_PROCESS , TAP_SIDE_CLIENT_PROCESS ]:
1248
1248
return False
1249
- # vtap_id , l7_protocol, flow_id, request_id
1249
+ # agent_id , l7_protocol, flow_id, request_id
1250
1250
for i in range (len (flows )):
1251
1251
if flow ['_id' ] == flows [i ]['_id' ]:
1252
1252
continue
@@ -1281,7 +1281,7 @@ def merge_flow(flows: list, flow: dict) -> bool:
1281
1281
if not request_flow or not response_flow :
1282
1282
continue
1283
1283
for key in [
1284
- 'vtap_id ' , 'tap_port' , 'tap_port_type' , 'l7_protocol' ,
1284
+ 'agent_id ' , 'tap_port' , 'tap_port_type' , 'l7_protocol' ,
1285
1285
'request_id' , 'tap_side'
1286
1286
]:
1287
1287
if _get_df_key (request_flow , key ) != _get_df_key (
@@ -1428,37 +1428,37 @@ def sort_all_flows(dataframe_flows: DataFrame, network_delay_us: int,
1428
1428
f"{ id_map [_id ]} -{ ',' .join (related_types )} -{ _id } " )
1429
1429
flow ["related_ids" ] = list (related_ids )
1430
1430
1431
- # 从Flow中提取Service:一个<vtap_id , local_process_id>二元组认为是一个Service。
1431
+ # 从Flow中提取Service:一个<agent_id , local_process_id>二元组认为是一个Service。
1432
1432
service_map = defaultdict (Service )
1433
1433
for flow in syscall_flows :
1434
1434
if flow ['tap_side' ] != TAP_SIDE_SERVER_PROCESS :
1435
1435
continue
1436
1436
local_process_id = flow ['process_id_1' ]
1437
- vtap_id = flow ['vtap_id ' ]
1438
- if (vtap_id , local_process_id , 0 ) not in service_map :
1439
- service = Service (vtap_id , local_process_id )
1440
- service_map [(vtap_id , local_process_id , 0 )] = service
1437
+ agent_id = flow ['agent_id ' ]
1438
+ if (agent_id , local_process_id , 0 ) not in service_map :
1439
+ service = Service (agent_id , local_process_id )
1440
+ service_map [(agent_id , local_process_id , 0 )] = service
1441
1441
# Service直接接收或发送的Flows_
1442
1442
service .add_direct_flow (flow )
1443
1443
else :
1444
1444
index = 0
1445
1445
for key in service_map .keys ():
1446
- if key [0 ] == vtap_id and key [1 ] == local_process_id :
1446
+ if key [0 ] == agent_id and key [1 ] == local_process_id :
1447
1447
index += 1
1448
- service = Service (vtap_id , local_process_id )
1449
- service_map [(vtap_id , local_process_id , index )] = service
1448
+ service = Service (agent_id , local_process_id )
1449
+ service_map [(agent_id , local_process_id , index )] = service
1450
1450
service .add_direct_flow (flow )
1451
1451
1452
1452
for flow in syscall_flows :
1453
1453
if flow ['tap_side' ] != TAP_SIDE_CLIENT_PROCESS :
1454
1454
continue
1455
1455
local_process_id = flow ['process_id_0' ]
1456
- vtap_id = flow ['vtap_id ' ]
1456
+ agent_id = flow ['agent_id ' ]
1457
1457
index = 0
1458
1458
max_start_time_service = None
1459
- if (vtap_id , local_process_id , 0 ) in service_map :
1459
+ if (agent_id , local_process_id , 0 ) in service_map :
1460
1460
for key , service in service_map .items ():
1461
- if key [0 ] == vtap_id and key [1 ] == local_process_id :
1461
+ if key [0 ] == agent_id and key [1 ] == local_process_id :
1462
1462
index += 1
1463
1463
if service .check_client_process_flow (flow ):
1464
1464
if not max_start_time_service :
@@ -1470,8 +1470,8 @@ def sort_all_flows(dataframe_flows: DataFrame, network_delay_us: int,
1470
1470
max_start_time_service .add_direct_flow (flow )
1471
1471
continue
1472
1472
# 没有attach到service上的flow生成一个新的service
1473
- service = Service (vtap_id , local_process_id )
1474
- service_map [(vtap_id , local_process_id , index )] = service
1473
+ service = Service (agent_id , local_process_id )
1474
+ service_map [(agent_id , local_process_id , index )] = service
1475
1475
# Service直接接收或发送的Flow
1476
1476
service .add_direct_flow (flow )
1477
1477
@@ -1976,8 +1976,8 @@ def _get_flow_dict(flow: DataFrame):
1976
1976
flow .get ("childs" , []),
1977
1977
"process_id" :
1978
1978
flow .get ("process_id" , None ),
1979
- "vtap_id " :
1980
- flow .get ("vtap_id " , None ),
1979
+ "agent_id " :
1980
+ flow .get ("agent_id " , None ),
1981
1981
"service_uid" :
1982
1982
flow .get ("service_uid" , None ),
1983
1983
"service_uname" :
@@ -2078,18 +2078,18 @@ def network_flow_sort(traces):
2078
2078
sorted_traces += local_rest_traces
2079
2079
else :
2080
2080
for trace in local_rest_traces :
2081
- vtap_index = - 1
2081
+ agent_index = - 1
2082
2082
for i , sorted_trace in enumerate (sorted_traces ):
2083
- if vtap_index > 0 and sorted_trace ['vtap_id ' ] != trace [
2084
- 'vtap_id ' ]:
2083
+ if agent_index > 0 and sorted_trace ['agent_id ' ] != trace [
2084
+ 'agent_id ' ]:
2085
2085
break
2086
- if sorted_trace ['vtap_id ' ] == trace ['vtap_id ' ]:
2086
+ if sorted_trace ['agent_id ' ] == trace ['agent_id ' ]:
2087
2087
if sorted_trace ['start_time_us' ] < trace ['start_time_us' ]:
2088
- vtap_index = i + 1
2089
- elif vtap_index == - 1 :
2090
- vtap_index = i
2091
- if vtap_index >= 0 :
2092
- sorted_traces .insert (vtap_index , trace )
2088
+ agent_index = i + 1
2089
+ elif agent_index == - 1 :
2090
+ agent_index = i
2091
+ if agent_index >= 0 :
2092
+ sorted_traces .insert (agent_index , trace )
2093
2093
else :
2094
2094
for i , sorted_trace in enumerate (sorted_traces ):
2095
2095
if trace ['start_time_us' ] < sorted_trace ['start_time_us' ]:
0 commit comments