-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathutils.py
More file actions
177 lines (143 loc) · 5.38 KB
/
utils.py
File metadata and controls
177 lines (143 loc) · 5.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import re
import requests
from transformers import AutoTokenizer, AutoModel
import torch
import torch.nn.functional as F
import json
import subprocess
def simplify_answer(answer, convert_to_str=False):
    """Normalize a solver answer into a rounded number, string, or list.

    Sympy/numpy scalars are rounded to 2 decimal places, other floatable
    values to 4; falsy answers become the sentinel "[FAIL]".  Types are
    detected via their type-name string so numpy/sympy need not be imported
    here.  When *convert_to_str* is True, numeric results are returned as
    strings.
    """
    if 'relational' in str(type(answer)):
        # sympy relational (e.g. Eq, Lt) — keep its textual form
        return str(answer)
    elif 'numpy' in str(type(answer)):
        if answer.shape == ():
            # scalar value
            answer = round(float(answer), 2)
        else:
            # array value — keep only the first element
            answer = round(float(answer[0]), 2)
        return str(answer) if convert_to_str else answer
    elif not answer:
        # None / empty string / empty list / zero → treated as a failure
        return "[FAIL]"
    else:
        if type(answer) in [list, tuple]:
            if 'sympy' in str(type(answer[0])):
                try:
                    answer = [round(float(x), 2) for x in answer]
                except Exception:
                    # symbolic entries that cannot be coerced to float
                    answer = [str(x) for x in answer]
            else:
                answer = [str(x) for x in answer]
            if len(answer) == 1:
                # unwrap single-element collections
                answer = answer[0]
            return answer
        else:
            if 'sympy' in str(type(answer)):
                try:
                    answer = round(float(answer), 2)
                except Exception:
                    answer = str(answer)
                return answer
            elif 'int' in str(type(answer)):
                return str(answer) if convert_to_str else answer
            else:
                try:
                    answer = round(float(answer), 4)
                    return str(answer) if convert_to_str else answer
                # Bug fix: was a bare `except:`, which also swallowed
                # SystemExit/KeyboardInterrupt; float() raises only these.
                except (TypeError, ValueError):
                    return str(answer) if convert_to_str else answer
def find_max_position(scores):
    """Return the index of the largest non-None score.

    Ties keep the earliest index; a list of all Nones (or empty) yields 0.
    """
    valid_positions = [i for i, s in enumerate(scores) if s is not None]
    if not valid_positions:
        return 0
    # max() returns the first maximal element, matching strict-> scanning
    return max(valid_positions, key=lambda i: scores[i])
def extract_content_between_markers(file_path, start_marker, end_marker):
    """Read *file_path* and return every substring enclosed by the markers.

    Matching is non-greedy and spans newlines (DOTALL); the markers are
    escaped, so they are treated literally rather than as regex syntax.
    """
    with open(file_path, 'r', encoding='utf-8') as handle:
        text = handle.read()
    marker_pattern = re.compile(
        re.escape(start_marker) + '(.*?)' + re.escape(end_marker),
        re.DOTALL,
    )
    return marker_pattern.findall(text)
# Load model from HuggingFace Hub
# NOTE(review): hard-coded absolute path to a local checkpoint (name suggests a
# sentence-transformer copy — confirm). This runs at import time and will fail
# on any machine without /mnt/liao; TODO: make the path configurable.
tokenizer = AutoTokenizer.from_pretrained('/mnt/liao/planner/models/strans')
model = AutoModel.from_pretrained('/mnt/liao/planner/models/strans')
def mean_pooling(model_output, attention_mask):
    """Mean-pool token embeddings over the sequence, weighted by the mask.

    model_output[0] holds the per-token embeddings; padded positions
    (mask == 0) are excluded from both the sum and the divisor.
    """
    token_embeddings = model_output[0]
    mask = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
    weighted_sum = (token_embeddings * mask).sum(dim=1)
    # clamp avoids division by zero for all-padding rows
    token_counts = mask.sum(dim=1).clamp(min=1e-9)
    return weighted_sum / token_counts
def semb(sentence, model, tokenizer):
    """Return L2-normalized sentence embeddings for a string or list of strings.

    Pipeline: tokenize (padded/truncated) -> forward pass without gradients
    -> masked mean pooling -> unit-norm rows.
    """
    encoded = tokenizer(sentence, padding=True, truncation=True, return_tensors='pt')
    with torch.no_grad():
        outputs = model(**encoded)
    pooled = mean_pooling(outputs, encoded['attention_mask'])
    return F.normalize(pooled, p=2, dim=1)
def query_desc_embd(query, descriptions, model, tokenizer):
    """Embed the query and the descriptions, concatenated along dim 1.

    Returns a single tensor of [query embedding | description embeddings]
    suitable as MLP input features.
    """
    q_embd = semb(query, model, tokenizer)
    d_embd = semb(descriptions, model, tokenizer)
    return torch.cat((q_embd, d_embd), 1)
def score(query, descriptions, model, tokenizer, MLP):
    """Score descriptions against a query by feeding joint embeddings to *MLP*."""
    # renamed local to avoid shadowing the function's own name
    features = query_desc_embd(query, descriptions, model, tokenizer)
    return MLP(features)
def sim(s1, s2, model = model, tokenizer = tokenizer):
    """Cosine similarity between the embeddings of two sentences (1-elem tensor)."""
    pair = semb([s1, s2], model, tokenizer)
    first, second = pair[0].unsqueeze(0), pair[1].unsqueeze(0)
    return F.cosine_similarity(first, second)
def agent_rep(rep, threshold = 0.7, model = model, tokenizer = tokenizer):
    """Deduplicate a list of sentences by embedding similarity.

    Keeps the first sentence, then each subsequent sentence only if its
    cosine similarity to every already-kept sentence is <= *threshold*.
    Returns the (order-preserving) list of kept sentences.
    """
    # Bug fix: the original indexed rep[0] unconditionally and raised
    # IndexError on an empty input list.
    if not rep:
        return []
    kept = [rep[0]]
    for candidate in rep[1:]:
        # keep only if not too similar to anything already kept
        if all(sim(candidate, existing, model, tokenizer) <= threshold
               for existing in kept):
            kept.append(candidate)
    return kept
def query_subtasks(query, matches):
    """Return the parsed subtask list whose original_query occurs in *query*.

    Each element of *matches* is a JSON string decoding to a list of dicts;
    a candidate matches when its first dict's 'original_query' value is a
    substring of *query*.  Returns None when nothing matches.
    """
    for match in matches:
        match_json = json.loads(match)
        if match_json[0]['original_query'] in query:
            return match_json
    # Bug fix: the original returned None inside the loop's else branch,
    # so only the FIRST candidate was ever examined.
    return None
def is_valid_json(variable):
    """Return True when *variable* parses as JSON, False on a parse error.

    Only ValueError (which JSONDecodeError subclasses) is treated as
    "invalid"; other exceptions (e.g. TypeError for non-strings) propagate,
    as in the original.
    """
    try:
        json.loads(variable)
    except ValueError:
        return False
    else:
        return True
# Rank a (correctness, relevance, completeness) triple of levels (1 = low,
# 2 = high) as a single score 1..8.  Equivalent to the explicit table
# {(2,2,2): 8, (2,1,2): 7, ..., (1,1,1): 1}: correctness is the most
# significant factor, then completeness, then relevance.
score_map = {
    (c, r, p): 4 * (c - 1) + 2 * (p - 1) + (r - 1) + 1
    for c in (1, 2)
    for r in (1, 2)
    for p in (1, 2)
}


def level_score(correctness, relevance, completeness):
    """Return the 1..8 rank of the level triple, or 0 for unknown levels."""
    return score_map.get((correctness, relevance, completeness), 0)
def query_ollama_model(prompt, model_name="qwen2-math"):
    """Run *prompt* through a local Ollama model via the CLI.

    Returns the model's stripped stdout, or None when the subprocess exits
    non-zero or cannot be launched at all.
    """
    try:
        completed = subprocess.run(
            ["ollama", "run", model_name],
            input=prompt,
            capture_output=True,
            text=True,
        )
    except Exception as e:
        # e.g. the ollama binary is not installed / not on PATH
        print(f"Error querying model: {e}")
        return None
    if completed.returncode != 0:
        return None
    return completed.stdout.strip()