forked from EvolvingLMMs-Lab/lmms-eval
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
316 lines (252 loc) · 10.9 KB
/
utils.py
File metadata and controls
316 lines (252 loc) · 10.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
import json
import os
import random
import re
from collections import defaultdict
from pathlib import Path
import torch
import yaml
from decord import VideoReader, cpu
from loguru import logger as eval_logger
from PIL import Image
from lmms_eval import utils as lmms_utils
from lmms_eval.tasks._task_utils.file_utils import generate_submission_file
def timestamp_to_seconds(timestamp):
    """Convert an ``"H:M:S"`` timestamp string into a number of seconds.

    The string is split on ``":"`` into exactly three fields. Hours and
    minutes are parsed as integers; the seconds field is parsed as a float so
    fractional seconds are preserved.
    """
    hours, minutes, seconds = timestamp.split(":")
    return int(hours) * 3600 + int(minutes) * 60 + float(seconds)
def load_video(video_file, duration, max_num_frames=16):
    """Sample evenly spaced frames from the first ``duration`` seconds of a video.

    Args:
        video_file: Path passed to ``VideoReader``.
        duration: Length, in seconds, of the span to sample from. It is
            multiplied by the reader's average FPS to get a frame count.
        max_num_frames: Upper bound on the number of frames returned.

    Returns:
        A list of RGB ``PIL.Image`` objects. The list holds
        ``min(max_num_frames, int(duration))`` frames, and is empty when that
        count is not positive.
    """
    num_frames = min(max_num_frames, int(duration))
    if num_frames <= 0:
        # Nothing to sample: skip opening the video and avoid asking the
        # decoder for an empty batch.
        return []

    vr = VideoReader(video_file, ctx=cpu(0), num_threads=1)
    fps = vr.get_avg_fps()
    total_valid_frames = int(duration * fps)

    # Constant spacing between sampled frames; computed once instead of per index.
    stride = int(total_valid_frames / num_frames)
    frame_indices = [stride * i for i in range(num_frames)]

    frames = vr.get_batch(frame_indices)
    # The batch may come back as a torch tensor or as an object exposing
    # ``asnumpy()``; normalise both to a numpy array.
    if isinstance(frames, torch.Tensor):
        frames = frames.numpy()
    else:
        frames = frames.asnumpy()

    return [Image.fromarray(fr).convert("RGB") for fr in frames]
def compute_frame_timestamps(duration, max_num_frames=16):
    """Return the timestamps (in seconds) at which frames are sampled.

    When ``duration`` does not exceed ``max_num_frames`` one timestamp per
    whole second is returned (``0, 1, ..., int(duration) - 1``). Otherwise
    ``max_num_frames`` timestamps are spread evenly from 0 with a spacing of
    ``duration / max_num_frames``.
    """
    if not duration > max_num_frames:
        return list(range(int(duration)))
    return [duration / max_num_frames * slot for slot in range(max_num_frames)]
def insert_subtitles_into_frames(frame_timestamps, subtitles, starting_timestamp_for_subtitles, duration):
    """Interleave ``<image>`` placeholders with subtitle text in time order.

    Each subtitle is either ``{"timestamp": [start, end], "text": ...}`` with
    numeric bounds, or ``{"start": "H:M:S", "end": "H:M:S", "line": ...}``.
    In the first form an ``end`` that is not a ``float`` is replaced by
    ``duration``. Both bounds are shifted by
    ``starting_timestamp_for_subtitles``.

    For every subtitle, one ``<image>`` is emitted for each not-yet-emitted
    frame whose timestamp is at or before the subtitle's midpoint. The
    subtitle text is emitted only if some frame timestamp lies strictly
    inside its window; windows shorter than one second are widened to one
    second around the midpoint. Frames left over after the last subtitle are
    appended at the end.

    Returns:
        The pieces joined with newlines.
    """
    pieces = []
    next_frame = 0
    frame_count = len(frame_timestamps)

    for entry in subtitles:
        if "timestamp" in entry:
            begin, finish = entry["timestamp"]
            if not isinstance(finish, float):
                # A non-float end falls back to the clip duration.
                finish = duration
            text_key = "text"
        else:
            begin, finish = entry["start"], entry["end"]
            begin = timestamp_to_seconds(begin)
            finish = timestamp_to_seconds(finish)
            text_key = "line"

        begin -= starting_timestamp_for_subtitles
        finish -= starting_timestamp_for_subtitles
        midpoint = (begin + finish) / 2
        text = entry[text_key]

        # Emit every pending frame that occurs no later than the subtitle midpoint.
        while next_frame < frame_count and frame_timestamps[next_frame] <= midpoint:
            pieces.append("<image>")
            next_frame += 1

        # Very short subtitles get a one-second window centred on the midpoint.
        if finish - begin < 1:
            finish = midpoint + 0.5
            begin = midpoint - 0.5

        # Keep the subtitle only if a sampled frame falls strictly inside its window.
        if any(begin < stamp < finish for stamp in frame_timestamps):
            pieces.append(text)

    # Frames after the final subtitle still need their placeholders.
    pieces.extend("<image>" for _ in frame_timestamps[next_frame:])
    return "\n".join(pieces)
def _load_task_config(task_yaml_name):
    """Parse a task YAML file that sits next to this module.

    Lines containing ``!function`` are dropped before parsing because
    ``yaml.safe_load`` cannot handle that tag.

    Args:
        task_yaml_name: File name of the YAML, relative to this module's directory.

    Returns:
        The object produced by ``yaml.safe_load`` on the filtered text.
    """
    config_path = Path(__file__).parent / task_yaml_name
    with open(config_path, "r") as handle:
        kept_lines = [line for line in handle.readlines() if "!function" not in line]
    return yaml.safe_load("".join(kept_lines))
def _resolve_dataset_dir(task_yaml_name, subdir_key, default_subdir):
    """Locate a dataset sub-directory from a task config.

    The cache directory comes from the task YAML's ``dataset_kwargs["cache_dir"]``,
    resolved by ``lmms_utils.resolve_cache_dir`` against ``HF_HOME``
    (default ``~/.cache/huggingface/``).

    Args:
        task_yaml_name: Task YAML file name, loaded via ``_load_task_config``.
        subdir_key: Key in ``dataset_kwargs`` naming the sub-directory.
        default_subdir: Sub-directory used when ``subdir_key`` is absent.

    Returns:
        A ``(directory_path, dataset_kwargs)`` tuple.
    """
    dataset_kwargs = _load_task_config(task_yaml_name)["dataset_kwargs"]
    hf_home = os.path.expanduser(os.getenv("HF_HOME", "~/.cache/huggingface/"))
    cache_root = lmms_utils.resolve_cache_dir(dataset_kwargs["cache_dir"], base_dir=hf_home)
    subdir = dataset_kwargs.get(subdir_key, default_subdir)
    return os.path.join(cache_root, subdir), dataset_kwargs
def longvideobench_doc_to_text(doc, lmms_eval_specific_kwargs):
    """Build the text prompt for one LongVideoBench document.

    Options ``option0`` .. ``option4`` whose value is not ``"N/A"`` are
    lettered from ``A`` and listed under the question. The result is wrapped
    in ``pre_prompt`` / ``post_prompt`` from ``lmms_eval_specific_kwargs``.
    When ``insert_interleave_subtitles`` is truthy, the document's subtitle
    file is loaded and an interleaved frame/subtitle prefix is placed before
    the question.
    """
    question_stem = doc["question"]
    option_values = [doc.get(f"option{slot}") for slot in range(5)]
    candidates = [value for value in option_values if value != "N/A"]
    option_lines = [". ".join([chr(ord("A") + pos), text]) for pos, text in enumerate(candidates)]
    question = question_stem + "\n" + "\n".join(option_lines)

    pre_prompt = lmms_eval_specific_kwargs["pre_prompt"]
    post_prompt = lmms_eval_specific_kwargs["post_prompt"]

    if not lmms_eval_specific_kwargs.get("insert_interleave_subtitles", False):
        return f"{pre_prompt}{question}\n{post_prompt}"

    cache_dir, dataset_kwargs = _resolve_dataset_dir("longvideobench_val_i.yaml", "subtitle_subdir", "subtitles")
    subtitle_file = os.path.join(cache_dir, doc["subtitle_path"])
    with open(subtitle_file) as f:
        subtitles = json.load(f)

    max_num_frames = dataset_kwargs.get("max_num_frames", 16)
    frame_timestamps = compute_frame_timestamps(doc["duration"], max_num_frames)
    interleaved_prefix = insert_subtitles_into_frames(
        frame_timestamps,
        subtitles,
        doc["starting_timestamp_for_subtitles"],
        doc["duration"],
    )
    return f"{pre_prompt}{interleaved_prefix}\n{question}\n{post_prompt}"
def longvideobench_doc_to_visual_v(doc):
    """Return the document's video path as a one-element list.

    The path is ``doc["video_path"]`` joined onto the video directory
    resolved from ``longvideobench_val_v.yaml``.
    """
    video_dir, _ = _resolve_dataset_dir("longvideobench_val_v.yaml", "video_subdir", "videos/")
    return [os.path.join(video_dir, doc["video_path"])]
def longvideobench_doc_to_visual_i(doc):
    """Return the document's video as a list of sampled frames.

    The video directory and ``max_num_frames`` (default 16) come from
    ``longvideobench_val_i.yaml``. Decoding is delegated to ``load_video``
    with the document's ``duration``.
    """
    video_dir, dataset_kwargs = _resolve_dataset_dir("longvideobench_val_i.yaml", "video_subdir", "videos/")
    full_path = os.path.join(video_dir, doc["video_path"])
    frame_budget = dataset_kwargs.get("max_num_frames", 16)
    return load_video(full_path, doc["duration"], frame_budget)
def get_multi_choice_info(options):
    """
    Letter the options of a multiple-choice question, starting at "A".
    Return ``(index2ans, all_choices)``: a letter -> option mapping and the
    list of letters in order.
    https://github.com/MMMU-Benchmark/MMMU/blob/51ce7f3e829c16bb44bc5445782686b4c3508794/eval/data_utils.py#L54
    """
    first_letter = ord("A")
    index2ans = {chr(first_letter + pos): option for pos, option in enumerate(options)}
    # Dict keys keep insertion order, so this is A, B, C, ... in option order.
    all_choices = list(index2ans)
    return index2ans, all_choices
def parse_multi_choice_response(response, all_choices, index2ans):
    """
    Simple parsing (changed from the MMMU-style complex parsing).
    Known answer lead-ins are removed, then the first capital letter A-E in
    what remains is returned, so 'D. A book' is parsed as D rather than A.
    Same as the original LongVideoBench paper (from author Haoning Wu): if no
    letter is found, a random element of ``all_choices`` is returned.
    ``index2ans`` is accepted but not used.
    """
    lead_ins = (
        "The best answer is",
        "The correct answer is",
        "The answer is",
        "The answer",
        "The best option is",
        "The correct option is",
        "Best answer:",
        "Best option:",
    )
    cleaned = response.strip()
    for phrase in lead_ins:
        cleaned = cleaned.replace(phrase, "")

    found = re.search(r"[ABCDE]", cleaned)
    if found is not None:
        return found[0]
    # Parsing failed: fall back to a random guess.
    return random.choice(all_choices)
def evaluate_longvideobench(samples):
    """Score a batch of parsed predictions.

    Args:
        samples: Sized collection of dicts with keys ``"id"``, ``"answer"``
            (the gold answer) and ``"parsed_pred"`` (the parsed prediction).

    Returns:
        A ``(judge_dict, metric_dict)`` tuple. ``judge_dict`` maps each sample
        id to ``"Correct"`` or ``"Wrong"``; ``metric_dict`` is
        ``{"acc": <fraction correct>}``. For empty input the result is
        ``({}, {"acc": 0})``, so the return shape is the same on every path
        and callers can always unpack two values.
    """
    if len(samples) == 0:
        return {}, {"acc": 0}

    judge_dict = {}
    pred_correct = 0
    for sample in samples:
        if eval_multi_choice(sample["answer"], sample["parsed_pred"]):
            judge_dict[sample["id"]] = "Correct"
            pred_correct += 1
        else:
            judge_dict[sample["id"]] = "Wrong"

    return judge_dict, {"acc": pred_correct / len(samples)}
def eval_multi_choice(gold_i, pred_i):
    """Return True when the prediction exactly matches the gold answer.

    ``gold_i`` may be a single answer or a list of acceptable answers; with a
    list, matching any one element counts as correct.
    """
    if isinstance(gold_i, list):
        return any(answer == pred_i for answer in gold_i)
    # Single gold answer: require exact equality.
    return True if gold_i == pred_i else False
def calculate_ins_level_acc(results):
    """Compute the example-weighted accuracy across all categories in ``results``.

    Each value in ``results`` must provide ``"acc"`` and ``"num_example"``.
    Returns 0 when the total number of examples is 0.
    https://github.com/MMMU-Benchmark/MMMU/blob/51ce7f3e829c16bb44bc5445782686b4c3508794/eval/eval_utils.py#L246
    """
    weighted_correct = 0
    total_examples = 0
    for entry in results.values():
        count = entry["num_example"]
        weighted_correct += entry["acc"] * count
        total_examples += count
    return weighted_correct / total_examples if total_examples != 0 else 0
def longvideobench_process_results(doc, results):
    """Turn one raw model output into per-sample metric records.

    Args:
        doc: Dataset row providing ``option0`` .. ``option4``, ``id``,
            ``duration_group``, ``question_category`` and ``correct_choice``
            (an integer offset from ``"A"``).
        results: Model outputs; only ``results[0]`` is used.

    Returns:
        A dict with ``"lvb_acc"`` (id, grouping fields, gold letter and parsed
        prediction) and ``"submission"`` (``{id: raw prediction}``).
    """
    pred = results[0]

    # Letter the options from "A", stopping at the first "N/A" placeholder.
    index2ans = {}
    for slot in range(5):
        option = doc.get(f"option{slot}")
        if option == "N/A":
            break
        index2ans[chr(ord("A") + slot)] = option
    all_choices = list(index2ans)

    parsed_pred = parse_multi_choice_response(pred, all_choices, index2ans)
    doc_id = doc["id"]
    lvb_acc = {
        "id": doc_id,
        "duration_group": doc["duration_group"],
        "question_category": doc["question_category"],
        "answer": chr(ord("A") + doc["correct_choice"]),
        "parsed_pred": parsed_pred,
    }
    return {"lvb_acc": lvb_acc, "submission": {doc_id: pred}}
def longvideobench_aggregate_results(results):
    """Aggregate per-sample records into subset accuracies and an overall score.

    Every record is filed under both its ``duration_group`` and its
    ``question_category``; each resulting subset is scored with
    ``evaluate_longvideobench``. The per-subset table plus an ``"Overall"``
    row (example-weighted accuracy over all subsets, with ``num`` being the
    sum of the subset sizes) is logged.

    Returns:
        The overall accuracy, rounded to 5 decimal places.
    """
    samples_by_subset = defaultdict(list)
    for record in results:
        for grouping_key in ("duration_group", "question_category"):
            samples_by_subset[record[grouping_key]].append(record)

    evaluation_result = {}
    for subset, subset_samples in samples_by_subset.items():
        _, metrics = evaluate_longvideobench(subset_samples)
        metrics["num_example"] = len(subset_samples)
        evaluation_result[subset] = metrics

    printable_results = {
        name: {"num": int(stats["num_example"]), "acc": round(stats["acc"], 5)}
        for name, stats in evaluation_result.items()
    }
    overall_acc = calculate_ins_level_acc(evaluation_result)
    printable_results["Overall"] = {
        "num": sum(stats["num_example"] for stats in evaluation_result.values()),
        "acc": round(overall_acc, 5),
    }

    eval_logger.info(printable_results)
    return printable_results["Overall"]["acc"]
def longvideobench_aggregate_results_for_submission(results, args):
    """Write all raw predictions to a single JSON submission file.

    Each item in ``results`` is a dict; only its first key/value pair is
    kept. The merged mapping is dumped to the path returned by
    ``generate_submission_file`` and the location is logged.
    """
    path = generate_submission_file("longvideobench_test_for_submission.json", args)

    results_dict = {}
    for item in results:
        first_key = list(item.keys())[0]
        results_dict[first_key] = list(item.values())[0]

    with open(path, "w") as f:
        json.dump(results_dict, f)
    eval_logger.info(f"Results saved to {path}.")