Skip to content

Commit 8df415d

Browse files
Add utilities for plotting latency distribution for libp2p
1 parent 888d384 commit 8df415d

File tree

3 files changed

+111
-2
lines changed

3 files changed

+111
-2
lines changed

analysis/src/mesh_analysis/analyzers/nimlibp2p_analyzer.py

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
# Python Imports
22
import logging
33
from pathlib import Path
4-
54
import pandas as pd
65
from typing import List, Optional, Tuple
76
from result import Result, Err, Ok
@@ -10,7 +9,7 @@
109
from src.mesh_analysis.readers.tracers.nimlibp2p_tracer import Nimlibp2pTracer
1110
from src.mesh_analysis.stacks.vaclab_stack_analysis import VaclabStackAnalysis
1211
# Project Imports
13-
from src.utils import path_utils, file_utils
12+
1413

1514
logger = logging.getLogger(__name__)
1615

@@ -42,6 +41,21 @@ def _set_up_paths(self, dump_analysis_dir: str, local_folder_to_analyze: str):
4241
logger.error(result.err_value)
4342
exit(1)
4443

44+
def grab_latencies(self):
    """
    Collect latency values from the logs and dump them as a CSV summary.

    Builds a tracer configured with the latency pattern, reads every node's
    dataframes through the Vaclab stack analysis, merges them into a single
    latency dataframe and dumps it. A dump failure is logged as an error;
    nothing is returned or raised by this method itself.
    """
    latency_tracer = Nimlibp2pTracer(extra_fields=self._kwargs['extra_fields'])
    latency_tracer = latency_tracer.with_latency_pattern()

    reader_builder = VictoriaReaderBuilder(latency_tracer, ['milliseconds'], **self._kwargs)
    analysis = VaclabStackAnalysis(reader_builder, **self._kwargs)

    # NOTE(review): the literal 4 is presumably the number of parallel jobs - confirm
    # against VaclabStackAnalysis.get_all_node_dataframes.
    node_dfs = analysis.get_all_node_dataframes(4)
    latency_df = self._merge_latency_dfs(node_dfs)

    dump_result = self._dump_latency_df(latency_df)
    if dump_result.is_err():
        logger.error(f'Issue dumping message summary. {dump_result.err_value}')
57+
58+
4559
def analyze_reliability(self, n_jobs: int):
4660
"""
4761
This function automatically assumes two scenarios, analysis from online data, and analysis from local data.
@@ -151,6 +165,15 @@ def _merge_mix_dfs(self, dfs: List[List[pd.DataFrame]]) -> List[pd.DataFrame]:
151165

152166
return [received_df, sent_df, mix_df]
153167

168+
def _merge_latency_dfs(self, dfs: List[List[pd.DataFrame]]) -> pd.DataFrame:
    """
    Merge the per-node latency dataframes into one dataframe sorted by latency.

    For every entry in ``dfs`` the first element is concatenated, and the
    per-entry results are then concatenated again into a single dataframe
    indexed and sorted by the 'latency' column.

    NOTE(review): ``group[0]`` is handed straight to ``pd.concat``, so it has to be
    an iterable of dataframes; the ``List[List[pd.DataFrame]]`` annotation looks one
    nesting level short - confirm against the caller.

    :param dfs: Per-node dataframe groups; only element 0 of each group is used.
    :return: Dataframe indexed by 'latency', in ascending index order.
    """
    logger.info("Merging and sorting information")

    per_group = []
    for group in dfs:
        per_group.append(pd.concat(group[0], ignore_index=True))

    latencies = pd.concat(per_group, ignore_index=True)
    latencies = latencies.set_index(['latency']).sort_index()

    return latencies
176+
154177
def _dump_dfs(self, dfs: List[pd.DataFrame]) -> Result:
155178
received = dfs[0].reset_index()
156179
received = received.astype(str)
@@ -197,6 +220,17 @@ def _dump_mix_dfs(self, dfs: List[pd.DataFrame]) -> Result:
197220

198221
return Ok(None)
199222

223+
def _dump_latency_df(self, df: pd.DataFrame) -> Result:
    """
    Write the latency dataframe to 'summary/latencies.csv' under the dump analysis path.

    The index is reset back into a column and every value is converted to a
    string before dumping.

    :param df: Latency dataframe to dump.
    :return: Ok(None) on success, Err carrying the dump error otherwise.
    """
    stringified = df.reset_index().astype(str)

    logger.info("Dumping received information")
    destination = self._dump_analysis_path / 'summary' / 'latencies.csv'
    outcome = file_utils.dump_df_as_csv(stringified, destination, False)

    if not outcome.is_err():
        return Ok(None)

    logger.warning(outcome.err_value)
    return Err(outcome.err_value)
233+
200234
def _has_message_reliability_issues(self, msg_identifier: str, peer_identifier: str,
201235
received_df: pd.DataFrame, sent_df: pd.DataFrame,
202236
issue_dump_location: Path) -> Optional[List[str]]:

analysis/src/mesh_analysis/analyzers/waku/waku_plots.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,57 @@ def get_mixnet_and_outside_time(group):
137137

138138
return Ok(None)
139139

140+
def plot_message_latency_distributions(
        received_summary_paths: list[Path],
        plot_title: str,
        dump_path: Path
) -> Result[None, str]:
    """
    Draw one latency boxplot per summary CSV, side by side, and save the figure.

    Each CSV is read into a dataframe and labelled with its file stem in a
    'source' column; the boxplot uses 'source' on the x axis and the 'latency'
    column on the y axis, with outliers hidden. Min, max and median labels are
    added to every box.

    :param received_summary_paths: CSV files to compare. Each must exist.
    :param plot_title: Title of the figure.
    :param dump_path: Where the figure is saved.
    :return: Ok(None) on success; Err with a message when no paths are given
             or when one of the files does not exist.
    """
    # pd.concat raises ValueError on an empty list; report it through the Result
    # channel like the other input problems instead of crashing the caller.
    if not received_summary_paths:
        error = "No received summary files were provided"
        logger.error(error)
        return Err(error)

    # Validate input files
    for p in received_summary_paths:
        if not p.exists():
            error = f"Received summary file {p} does not exist"
            logger.error(error)
            return Err(error)

    sns.set_theme()

    # Load each file separately and label its rows with the dataset name
    labelled = [
        pd.read_csv(path).assign(source=path.stem)
        for path in received_summary_paths
    ]

    # Merge all datasets into one
    merged = pd.concat(labelled, ignore_index=True)

    plt.figure(figsize=(14, 6))
    ax = sns.boxplot(
        x='source',
        y='latency',
        data=merged,
        showfliers=False
    )

    for stat in ("min", "max", "median"):
        add_boxplot_stat_labels(ax, value_type=stat)

    plt.ylabel("Latency distribution (ms)")
    plt.xlabel("Dataset")
    plt.title(plot_title)

    plt.savefig(dump_path)
    plt.show()

    return Ok(None)
189+
190+
140191
def plot_message_distribution(received_summary_path: Path, plot_title: str, dump_path: Path) -> Result[
141192
None, str]:
142193
"""

analysis/src/mesh_analysis/readers/tracers/nimlibp2p_tracer.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,20 @@ def get_num_patterns_group(self) -> int:
3030
def get_patterns(self) -> List[List[str]]:
3131
return self._patterns
3232

33+
def with_latency_pattern(self):
34+
patterns = [
35+
r'(\d+)$'
36+
]
37+
38+
tracers = [
39+
self._trace_latencies_in_logs,
40+
]
41+
42+
self._patterns.append(patterns)
43+
self._tracings.append(tracers)
44+
45+
return self
46+
3347
def with_received_pattern_group(self) -> Self:
3448
patterns = [
3549
r'Received message.*?msgId=([\w*]+).*?sentAt=([\w*]+).*?current=([\w*]+).*?delayMs=([\w*]+)'
@@ -101,6 +115,16 @@ def _trace_received_in_logs(self, parsed_logs: List) -> pd.DataFrame:
101115

102116
return df
103117

118+
def _trace_latencies_in_logs(self, parsed_logs: List) -> pd.DataFrame:
    """
    Build a dataframe of latencies from the parsed log matches.

    The first column is 'latency', followed by the configured extra fields
    (if any). Latency values that cannot be parsed as numbers are replaced
    by -1, and the column is cast to int.

    :param parsed_logs: Rows extracted from the logs, one entry per match.
    :return: Dataframe with an integer 'latency' column plus the extra fields.
    """
    extra = self._extra_fields
    header = ['latency'] if extra is None else ['latency', *extra]

    frame = pd.DataFrame(parsed_logs, columns=header)

    # Unparseable values become NaN first, then -1, so the int cast cannot fail on them.
    numeric = pd.to_numeric(frame['latency'], errors='coerce')
    frame['latency'] = numeric.fillna(-1).astype(int)

    return frame
127+
104128
def _trace_sent_in_logs(self, parsed_logs: List) -> pd.DataFrame:
105129
columns = ['msgId', 'timestamp']
106130
if self._extra_fields is not None:

0 commit comments

Comments
 (0)