44# "numpy",
55# "pandas",
66# "tabulate",
7+ # "orjson"
78# ]
89# ///
910
1819from typing import Any
1920
2021import numpy as np
22+ import orjson
2123import pandas as pd
2224
2325# Analysis overview:
@@ -59,9 +61,7 @@ def extract_dataset_key(df: pd.DataFrame) -> pd.DataFrame:
5961 if "dataset" not in df .columns :
6062 df ["dataset_key" ] = pd .NA
6163 else :
62- df ["dataset_key" ] = df ["dataset" ].apply (
63- lambda x : str (sorted (x .items ())) if pd .notna (x ) and isinstance (x , dict ) else pd .NA
64- )
64+ df ["dataset_key" ] = df ["dataset" ].apply (dataset_key )
6565 return df
6666
6767
@@ -77,6 +77,126 @@ def split_file_size_rows(df: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
7777 return df [mask ].copy (), df [~ mask ].copy ()
7878
7979
def identity_value(value: Any) -> Any:
    """Normalize missing values so benchmark identities compare reliably.

    Scalar missing markers (``None``, ``float("nan")``, ``pd.NA``, ``pd.NaT``)
    are collapsed to ``None``. Every other value is returned unchanged.

    Args:
        value: One component of a benchmark identity.

    Returns:
        ``None`` when *value* is a scalar missing marker, otherwise *value*.
    """

    # pd.isna() answers element-wise for list/array input; using that result
    # in a boolean context raises "truth value is ambiguous". Only scalars can
    # be a missing marker, so containers are passed through untouched.
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return None
    return value
84+
85+
86+ def dataset_key (value : Any ) -> str | None :
87+ """Normalize dataset metadata into the join-key representation."""
88+
89+ if isinstance (value , dict ):
90+ return str (sorted (value .items ()))
91+ return None
92+
93+
def benchmark_identity(row: Any) -> tuple[Any, Any, Any] | None:
    """Compute the identity used to match a timing row against a baseline.

    Args:
        row: A mapping-like record exposing ``.get`` (for example a parsed
            JSONL object).

    Returns:
        A ``(name, storage, dataset_key)`` tuple, or ``None`` when the row is
        a file-size row (its ``metric`` equals ``FILE_SIZE_METRIC`` or it has
        a non-``None`` ``file_size``) or has no ``name``.
    """

    is_file_size_row = (
        row.get("metric") == FILE_SIZE_METRIC or row.get("file_size") is not None
    )
    if is_file_size_row:
        return None

    benchmark_name = row.get("name")
    if benchmark_name is None:
        return None

    name_part = identity_value(benchmark_name)
    storage_part = identity_value(row.get("storage"))
    dataset_part = dataset_key(row.get("dataset"))
    return (name_part, storage_part, dataset_part)
109+
110+
def benchmark_identity_rows(df: pd.DataFrame) -> pd.DataFrame:
    """Return timing rows with the identity used to match a PR benchmark.

    Args:
        df: Benchmark rows. Only the second frame returned by
            ``split_file_size_rows`` (the timing rows) is considered.

    Returns:
        A frame with exactly two columns: ``commit_id`` and
        ``benchmark_identity``, the latter holding
        ``(name, storage, dataset_key)`` tuples with missing values
        normalized to ``None``. An empty frame with those two columns is
        returned when there are no timing rows or no ``name`` column.
    """

    _file_size_rows, timing_rows = split_file_size_rows(df)
    if timing_rows.empty or "name" not in timing_rows.columns:
        return pd.DataFrame(columns=["commit_id", "benchmark_identity"])

    # Work on a private copy so the added columns never leak into the input.
    timing_rows = timing_rows.copy()
    for optional_column in ("storage", "commit_id"):
        if optional_column not in timing_rows.columns:
            timing_rows[optional_column] = pd.NA

    timing_rows = extract_dataset_key(timing_rows)

    # Walk the three identity columns in lockstep rather than via iterrows(),
    # which builds a Series per row and coerces each row to a single dtype.
    identity_columns = (
        timing_rows["name"],
        timing_rows["storage"],
        timing_rows["dataset_key"],
    )
    timing_rows["benchmark_identity"] = [
        tuple(identity_value(part) for part in parts)
        for parts in zip(*identity_columns)
    ]

    return timing_rows[["commit_id", "benchmark_identity"]]
131+
132+
def read_jsonl_rows_for_commit(path: str, commit_id: str) -> pd.DataFrame:
    """Read only the rows matching a commit from a JSONL benchmark history.

    Args:
        path: Path to the JSONL history file, read as UTF-8.
        commit_id: Commit identifier a record's ``commit_id`` must equal.

    Returns:
        A frame built from the matching records, in file order. The frame is
        empty, with no columns, when nothing matches.
    """

    # Cheap substring prefilter so most lines are never JSON-decoded. The
    # needle is the quoted commit id with no padding inside the quotes; a
    # padded needle can never occur in compact JSON and would drop every row.
    quoted_commit_id = f'"{commit_id}"'

    rows = []
    with open(path, encoding="utf-8") as lines:
        for line in lines:
            if '"commit_id"' not in line or quoted_commit_id not in line:
                continue
            record = orjson.loads(line)
            # The substring hit may come from another field, so confirm it.
            if record.get("commit_id") == commit_id:
                rows.append(record)
    return pd.DataFrame(rows)
145+
146+
def read_latest_baseline_rows(path: str, pr: pd.DataFrame) -> pd.DataFrame:
    """Load the rows of the last history commit that ran the PR's benchmark.

    Args:
        path: Path to the JSONL benchmark history.
        pr: Benchmark rows produced for the PR under test.

    Returns:
        All history rows of the last commit in file order that has a row whose
        identity matches one of the PR's timing rows. When the PR yields no
        identities, the whole history is returned instead.

    Raises:
        ValueError: If no history row matches any PR benchmark identity.
    """

    wanted_identities = set(benchmark_identity_rows(pr)["benchmark_identity"])
    if len(wanted_identities) == 0:
        return pd.read_json(path, lines=True)

    latest_commit_id = None
    with open(path, encoding="utf-8") as history:
        for raw_line in history:
            # Skip decoding lines that cannot carry both required fields.
            if '"name"' in raw_line and '"commit_id"' in raw_line:
                entry = orjson.loads(raw_line)
                if benchmark_identity(entry) not in wanted_identities:
                    continue
                candidate = entry.get("commit_id")
                if candidate is not None:
                    # Later lines overwrite earlier ones: the last match wins.
                    latest_commit_id = candidate

    if latest_commit_id is None:
        raise ValueError("No baseline rows found for the benchmark under test")

    return read_jsonl_rows_for_commit(path, latest_commit_id)
169+
170+
def select_latest_baseline_rows(base: pd.DataFrame, pr: pd.DataFrame) -> pd.DataFrame:
    """Pick the rows of the newest baseline commit that ran this benchmark.

    The persisted benchmark history is append-only, and a row only appears
    once its benchmark job has uploaded results. The last commit with a
    matching row identity is therefore the latest successful baseline for
    the benchmark under test.

    Args:
        base: Baseline history rows, possibly spanning several commits.
        pr: Benchmark rows produced for the PR under test.

    Returns:
        *base* itself when it is empty, has no ``commit_id`` column, holds at
        most one distinct non-null commit, or the PR yields no identities.
        Otherwise a copy of the rows belonging to the selected commit.

    Raises:
        ValueError: If no baseline row with a commit id matches the PR.
    """

    if base.empty or "commit_id" not in base.columns:
        return base

    # Nothing to choose between when there is at most one real commit.
    if base["commit_id"].nunique() <= 1:
        return base

    wanted_identities = set(benchmark_identity_rows(pr)["benchmark_identity"])
    if len(wanted_identities) == 0:
        return base

    candidates = benchmark_identity_rows(base)
    is_match = candidates["benchmark_identity"].isin(wanted_identities)
    has_commit = candidates["commit_id"].notna()
    matching_commits = candidates.loc[is_match & has_commit, "commit_id"]
    if matching_commits.empty:
        raise ValueError("No baseline rows found for the benchmark under test")

    # Append-only history: the last matching row belongs to the newest commit.
    latest_commit_id = matching_commits.iloc[-1]
    selected = base["commit_id"] == latest_commit_id
    return base[selected].copy()
198+
199+
80200def extract_target_fields (name : str ) -> pd .Series :
81201 """Parse query, engine, and format from the benchmark name."""
82202
@@ -702,8 +822,8 @@ def main() -> None:
702822
703823 benchmark_name = sys .argv [3 ] if len (sys .argv ) > 3 else ""
704824
705- base = pd .read_json (sys .argv [1 ], lines = True )
706825 pr = pd .read_json (sys .argv [2 ], lines = True )
826+ base = read_latest_baseline_rows (sys .argv [1 ], pr )
707827
708828 base_commit_id = set (base ["commit_id" ].unique ())
709829 pr_commit_id = set (pr ["commit_id" ].unique ())
0 commit comments