1
+ import bisect
1
2
import logging
2
3
import re
3
4
import textwrap
12
13
13
14
from seer .automation .agent .client import GeminiProvider , LlmClient
14
15
from seer .automation .agent .embeddings import GoogleProviderEmbeddings
15
- from seer .automation .codebase .models import StaticAnalysisWarning
16
+ from seer .automation .codebase .models import Location , StaticAnalysisWarning
16
17
from seer .automation .codegen .codegen_context import CodegenContext
17
18
from seer .automation .codegen .models import (
18
19
AssociateWarningsWithIssuesOutput ,
@@ -66,8 +67,35 @@ def _left_truncated_paths(path: Path, max_num_paths: int = 2) -> list[str]:
66
67
result .append (Path (* parts ).as_posix ())
67
68
return result
68
69
69
- def _get_pr_changed_lines (self , pr_file : PrFile ) -> list [int ]:
70
- """Returns the 1-indexed changed line numbers in the updated pr file.
70
def _build_filepath_mapping(self, pr_files: list[PrFile]) -> dict[str, PrFile]:
    """Build mapping of possible filepaths to PR files, including truncated variations.

    Each PR file is keyed both by its full posix path and by its left-truncated
    variants so warnings encoded with fewer parent directories still match.
    """
    mapping: dict[str, PrFile] = {}
    for candidate in pr_files:
        candidate_path = Path(candidate.filename)
        variants = [candidate_path.as_posix()]
        variants += self._left_truncated_paths(candidate_path, max_num_paths=1)
        for variant in variants:
            mapping[variant] = candidate
    return mapping
79
+
80
def _is_warning_in_diff(
    self,
    warning: StaticAnalysisWarning,
    filepath_to_pr_file: dict[str, PrFile],
) -> bool:
    """Return True if the warning's line range overlaps a diff hunk of any matching PR file."""
    candidates = self._get_matching_pr_files(warning, filepath_to_pr_file)
    location = Location.from_encoded(warning.encoded_location)
    warning_span = (int(location.start_line), int(location.end_line))
    # any() short-circuits like the original early return; hunk ranges are only
    # computed for candidates examined before the first overlap.
    return any(
        self._do_ranges_overlap(warning_span, self._get_sorted_hunk_ranges(candidate))
        for candidate in candidates
    )
96
+
97
+ def _get_sorted_hunk_ranges (self , pr_file : PrFile ) -> list [tuple [int , int ]]:
98
+ """Returns sorted tuples of 1-indexed line numbers (start_inclusive, end_exclusive) in the updated pr file.
71
99
72
100
Determined by parsing git diff hunk headers of the form:
73
101
@@ -n,m +p,q @@ where:
@@ -83,46 +111,43 @@ def hello():
83
111
+ print("world") # Line 3 is added
84
112
print("goodbye")
85
113
86
- This would return [1,2,3,4] since these are all the lines in the updated file
114
+ @@ -20,3 +21,4 @@
115
+ print("end")
116
+ + print("new end") # Line 22 is added
117
+ return
118
+
119
+ This would return [(1,5), (21,25)] representing the modified file's hunk ranges.
87
120
88
121
Args:
89
- pr_file: PrFile object containing the patch/diff
122
+ pr_file: PrFile object containing the patch/diff (sorted by line number)
90
123
91
124
Returns:
92
- List of 1-indexed line numbers in the updated file
125
+ List of sorted tuples containing 1-indexed line numbers (start_inclusive, end_exclusive) in the updated file
93
126
"""
94
127
patch_lines = pr_file .patch .split ("\n " )
95
- changed_lines : list [int ] = []
128
+ hunk_ranges : list [tuple [ int , int ] ] = []
96
129
97
130
for line in patch_lines :
98
131
if line .startswith ("@@" ):
99
132
try :
100
133
match = re .match (r"@@ -(\d+),(\d+) \+(\d+),(\d+) @@" , line )
101
134
if match :
102
135
_ , _ , new_start , num_lines = map (int , match .groups ())
103
- changed_lines . extend ( range (new_start , new_start + num_lines ))
136
+ hunk_ranges . append ( (new_start , new_start + num_lines ))
104
137
except Exception :
105
138
self .logger .warning (f"Could not parse hunk header: { line } " )
106
139
continue
107
140
108
- return changed_lines
141
+ return hunk_ranges
109
142
110
def _get_matching_pr_files(
    self, warning: StaticAnalysisWarning, filepath_to_pr_file: dict[str, PrFile]
) -> list[PrFile]:
    """Find PR files that may match a warning's location.

    This handles cases where the warning location and PR file paths may be specified differently:
    - With different numbers of parent directories
    - With or without a repo prefix
    - With relative vs absolute paths
    """
    # Encoded location format is "file:line[:col]"; keep only the file part.
    filename = warning.encoded_location.split(":")[0]
    path = Path(filename)

    # NOTE(review): the validation below was only partially visible in this
    # excerpt — it rejects `..` appearing past the first path component with
    # the message shown; confirm the exact condition against the full source.
    if ".." in path.parts[1:]:
        raise ValueError(
            f"Found `..` in the middle of path. Encoded location: {warning.encoded_location}"
        )

    warning_filepath_variations = {
        path.as_posix(),
        *self._left_truncated_paths(path, max_num_paths=2),
    }
    return [
        filepath_to_pr_file[filepath]
        for filepath in warning_filepath_variations & set(filepath_to_pr_file)
    ]
168
172
169
- def _is_warning_line_in_pr_files (
170
- self , warning : StaticAnalysisWarning , matching_pr_files : list [PrFile ]
173
+ def _do_ranges_overlap (
174
+ self , warning_range : tuple [ int , int ], sorted_hunk_ranges : list [tuple [ int , int ] ]
171
175
) -> bool :
172
- # Encoded location format: "file:line:col"
173
- location_parts = warning .encoded_location .split (":" )
174
- if len (location_parts ) < 2 :
175
- self .logger .warning (
176
- f"Invalid warning location format - missing line number: { warning .encoded_location } "
177
- )
176
+ if not sorted_hunk_ranges or not warning_range :
178
177
return False
179
-
180
- warning_line = int (location_parts [1 ])
181
- return any (
182
- warning_line in self ._get_pr_changed_lines (pr_file ) for pr_file in matching_pr_files
178
+ target_start , target_end = warning_range
179
+ # Handle special case of single line warning by making end inclusive
180
+ if target_start == target_end :
181
+ target_end += 1
182
+ index = bisect .bisect_left (sorted_hunk_ranges , (target_start ,))
183
+ return (index > 0 and sorted_hunk_ranges [index - 1 ][1 ] > target_start ) or (
184
+ index < len (sorted_hunk_ranges ) and sorted_hunk_ranges [index ][0 ] < target_end
183
185
)
184
186
185
187
@observe(name="Codegen - Relevant Warnings - Filter Warnings Component")
@ai_track(description="Codegen - Relevant Warnings - Filter Warnings Component")
def invoke(self, request: FilterWarningsRequest) -> FilterWarningsOutput:
    """Keep only the warnings whose locations overlap the PR's diff hunks."""
    filepath_to_pr_file = self._build_filepath_mapping(request.pr_files)

    kept: list[StaticAnalysisWarning] = []
    for warning in request.warnings:
        try:
            overlaps = self._is_warning_in_diff(warning, filepath_to_pr_file)
        except Exception as e:
            # Best-effort: a single malformed warning must not abort the batch.
            self.logger.warning(
                f"Failed to evaluate warning, skipping: {warning.id} ({warning.encoded_location})",
                exc_info=e,
            )
            continue
        if overlaps:
            kept.append(warning)

    return FilterWarningsOutput(warnings=kept)
196
205
0 commit comments