-
Notifications
You must be signed in to change notification settings - Fork 20
/
Copy pathexpressions.py
682 lines (532 loc) · 21.8 KB
/
expressions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
"""Grammar and transformer for Pavilion expression syntax.
.. code-block:: none
{}
"""
import ast
from typing import Dict, Callable, Any, List
import lark
import pavilion.errors
from pavilion import expression_functions as functions
from pavilion.utils import auto_type_convert
from .common import PavTransformer, ParserValueError
# Lark grammar for the expression syntax. It is handed to ``lark.Lark``
# (with the 'lalr' parser) by get_expr_parser() below. Rule names in the
# grammar correspond to the same-named methods on the transformer classes
# in this module, and the upper-case terminals with callbacks (INTEGER,
# FLOAT, BOOL, ESCAPED_STRING) are converted to Python values there.
# NOTE: this is a runtime string; the '//' lines inside it are grammar
# comments, not Python comments.
EXPR_GRAMMAR = r'''
// All expressions will resolve to the start expression.
start: expr _WS?
| // An empty string is valid
// Trailing whitespace is ignored. Whitespace between tokens is
// ignored below.
_WS: /\s+/
expr: or_expr
// These set order of operations.
// See https://en.wikipedia.org/wiki/Operator-precedence_parser
or_expr: and_expr ( OR and_expr )*
and_expr: not_expr ( AND not_expr )*
not_expr: NOT? compare_expr
compare_expr: add_expr ((EQ | NOT_EQ | LT | GT | LT_EQ | GT_EQ ) add_expr)*
add_expr: mult_expr ((PLUS | MINUS) mult_expr)*
mult_expr: pow_expr ((TIMES | DIVIDE | INT_DIV | MODULUS) pow_expr)*
pow_expr: primary ("^" primary)?
primary: literal
| var_ref
| negative
| "(" expr ")"
| function_call
| list_
// A function call can contain zero or more arguments.
function_call: NAME "(" (expr ("," expr)*)? ")"
negative: (MINUS|PLUS) primary
// A literal value is just what it appears to be.
literal: INTEGER
| FLOAT
| BOOL
| ESCAPED_STRING
// Allows for trailing commas
list_: L_BRACKET (expr ("," expr)* ","?)? R_BRACKET
// Variable references are kept generic. We'll use this both
// for Pavilion string variables and result calculation variables.
var_ref: NAME ("." var_key)*
var_key: NAME
| INTEGER
| TIMES
// Strings can contain anything as long as they don't end in an odd
// number of backslashes, as that would escape the closing quote.
_STRING_INNER: /.*?/
_STRING_ESC_INNER: _STRING_INNER /(?<!\\)(\\\\)*?/
ESCAPED_STRING : "\"" _STRING_ESC_INNER "\""
L_BRACKET: "["
R_BRACKET: "]"
PLUS: "+"
MINUS: "-"
TIMES: "*"
DIVIDE: "/"
INT_DIV: "//"
MODULUS: "%"
AND: /and(?![a-zA-Z_])/
OR: /or(?![a-zA-Z_])/
NOT.2: /not(?![a-zA-Z_])/
EQ: "=="
NOT_EQ: "!="
LT: "<"
GT: ">"
LT_EQ: "<="
GT_EQ: ">="
INTEGER: /\d+/
FLOAT: /\d+\.\d+/
// This will be prioritized over 'NAME' matches
BOOL.2: "True" | "False"
// Names can be lower-case or capitalized, but must start with a letter or
// underscore
NAME.1: /[a-zA-Z_][a-zA-Z0-9_]*/
// Ignore all whitespace between tokens.
%ignore / +(?=[^.])/
'''
# Module-level cache for the non-debug parser; filled in lazily by
# get_expr_parser().
_EXPR_PARSER = None
# Substitute the grammar text into the '{}' placeholder of the module
# docstring, so the generated documentation shows the grammar.
__doc__ = __doc__.format('\n '.join(EXPR_GRAMMAR.split('\n')))
def get_expr_parser(debug=False):
    """Return a Lark parser for the expression grammar.

    A debug parser is always built fresh and is never cached. A regular
    parser is built on first use, stored in the module-level cache, and
    reused on every later call.

    :param debug: Passed through to ``lark.Lark`` as its ``debug`` option.
    :return: The ``lark.Lark`` parser instance.
    """

    global _EXPR_PARSER  # pylint: disable=global-usage

    def _build():
        """Construct a new LALR parser from the module grammar."""
        return lark.Lark(
            grammar=EXPR_GRAMMAR,
            parser='lalr',
            debug=debug
        )

    # Debug parsers bypass the cache entirely.
    if debug:
        return _build()

    if _EXPR_PARSER is None:
        _EXPR_PARSER = _build()

    return _EXPR_PARSER
class BaseExprTransformer(PavTransformer):
    """Transforms the expression parse tree into an actual value. The
    resolved value will be one of the literal types.

    Each rule method receives the already-transformed children of that
    grammar rule as a list of tokens, and returns a single token whose
    ``value`` holds the computed result. Variable lookups (``var_ref``) are
    not defined here; subclasses provide them.
    """

    # pylint: disable=no-self-use,invalid-name

    NUM_TYPES = (
        int,
        float,
        bool
    )

    def _apply_op(self, op_func: Callable[[Any, Any], Any],
                  arg1: lark.Token, arg2: lark.Token, allow_strings=True):
        """Apply a binary operation to the values of two tokens.

        If either value is a list, the operation is applied element-wise:
        list/scalar pairs apply the scalar against every list item, and
        list/list pairs are combined item by item.

        :param op_func: The two-argument function to apply.
        :param arg1: Token holding the left-hand value.
        :param arg2: Token holding the right-hand value.
        :param allow_strings: When False, string values (directly or inside
            a list) are rejected.
        :return: The result of ``op_func``, or a list of results.
        :raises ParserValueError: On a disallowed string value, or when two
            lists of differing length are given.
        """

        # Verify that the arg value types are something numeric, or that
        # it's a string and strings are allowed.
        for arg in arg1, arg2:
            if isinstance(arg.value, list):
                for val in arg.value:
                    if (isinstance(val, str) and not allow_strings and
                            not isinstance(val, self.NUM_TYPES)):
                        raise ParserValueError(
                            token=arg,
                            message="Non-numeric value '{}' in list in math "
                                    "operation.".format(val))
            elif (isinstance(arg.value, str) and not allow_strings and
                  not isinstance(arg.value, self.NUM_TYPES)):
                # Report against the offending argument, not always the
                # first one.
                raise ParserValueError(
                    token=arg,
                    message="Non-numeric value '{}' in math operation."
                    .format(arg.value))

        val1 = arg1.value
        val2 = arg2.value
        val1_is_list = isinstance(val1, list)
        val2_is_list = isinstance(val2, list)

        if val1_is_list and val2_is_list:
            if len(val1) != len(val2):
                raise ParserValueError(
                    token=arg2,
                    message="List operations must be between two equal "
                            "length lists. Arg1 had {} values, arg2 had {}."
                    .format(len(val1), len(val2)))
            # Lengths are equal here, so zip() drops nothing.
            return [op_func(part1, part2)
                    for part1, part2 in zip(val1, val2)]

        if val1_is_list:
            return [op_func(part1, val2) for part1 in val1]

        if val2_is_list:
            return [op_func(val1, part2) for part2 in val2]

        return op_func(val1, val2)

    def start(self, items):
        """Returns the final value of the expression. An empty expression
        resolves to the empty string."""

        if not items:
            return ''

        return items[0].value

    def expr(self, items):
        """Simply pass up the expression result."""

        return items[0]

    def or_expr(self, items):
        """Pass a single item up. Otherwise, apply ``'or'`` logical operations.

        :param list[lark.Token] items: The operand tokens, with an ``'or'``
            terminal between each pair. The terminals are discarded here.
        :return: The single operand, or a token holding the boolean result.
        """

        # Every second item is an 'or' terminal; keep only the operands.
        operands = items[::2]

        base_tok = operands[0]
        for next_tok in operands[1:]:
            acc = self._apply_op(lambda a, b: bool(a or b), base_tok, next_tok)
            base_tok = self._merge_tokens([base_tok, next_tok], acc)

        return base_tok

    def and_expr(self, items):
        """Pass a single item up. Otherwise, apply ``'and'`` logical operations.

        :param list[lark.Token] items: The operand tokens, with an ``'and'``
            terminal between each pair. The terminals are discarded here.
        :return: The single operand, or a token holding the boolean result.
        """

        # Every second item is an 'and' terminal; keep only the operands.
        operands = items[::2]

        base_tok = operands[0]
        for next_tok in operands[1:]:
            acc = self._apply_op(lambda a, b: bool(a and b),
                                 base_tok, next_tok)
            base_tok = self._merge_tokens([base_tok, next_tok], acc)

        return base_tok

    def not_expr(self, items) -> lark.Token:
        """Apply a logical not, if ``'not'`` is present.

        :param list[lark.Token] items: One or two tokens
        """

        if items[0] == 'not':
            # Ok, this is weird. _apply op is written for binary operations,
            # but to retrofit it for unary ops we just pass the same token
            # as both ops (so types can be checked) and make sure our lambda
            # doesn't use the second argument.
            val = self._apply_op(lambda a, b: not a, items[1], items[1])
            return self._merge_tokens(items, val)

        return items[0]

    def compare_expr(self, items) -> lark.Token:
        """Pass a single item up. Otherwise, perform the chain of comparisons.
        Chained comparisons ``'3 < 7 < 10'`` will be evaluated as
        ``'3 < 7 and 7 < 10'``, just like in Python.

        :param list[lark.Token] items: An odd number of tokens. Every second
            token is an comparison operator (``'=='``, ``'!='``, ``'<'``,
            ``'>'``, ``'<='``, ``'>='``).
        """

        if len(items) == 1:
            return items[0]

        comp_items = items.copy()
        comp_items.reverse()

        left = comp_items.pop()
        # Start the running 'and' of all comparisons at True.
        base_tok = self._merge_tokens([left], True)

        while comp_items:
            comparator = comp_items.pop()
            right = comp_items.pop()

            if comparator == '==':
                op_func = lambda a, b: a == b  # NOQA
            elif comparator == '!=':
                op_func = lambda a, b: a != b  # NOQA
            elif comparator == '<':
                op_func = lambda a, b: a < b  # NOQA
            elif comparator == '>':
                op_func = lambda a, b: a > b  # NOQA
            elif comparator == '<=':
                op_func = lambda a, b: a <= b  # NOQA
            elif comparator == '>=':
                op_func = lambda a, b: a >= b  # NOQA
            else:
                raise RuntimeError("Invalid comparator '{}'".format(comparator))

            result = self._apply_op(op_func, left, right)
            next_tok = self._merge_tokens([left, right], result)
            acc = self._apply_op(lambda a, b: a and b, base_tok, next_tok)
            base_tok = self._merge_tokens([base_tok, right], acc)
            # The right operand becomes the left operand of the next link
            # in the chain.
            left = right

        return base_tok

    def math_expr(self, items) -> lark.Token:
        """Pass single items up, otherwise, perform the chain of
        math operations. This function will be used for all binary math
        operations with a tokenized operator.

        :param list[lark.Token] items: An odd number of tokens. Every second
            token is an operator.
        :raises ParserValueError: On division (or modulus) by zero, or on
            non-numeric operands.
        """

        math_items = items.copy()
        math_items.reverse()

        base_tok = math_items.pop()

        while math_items:
            operator = math_items.pop()
            next_tok = math_items.pop()

            if operator == '+':
                op_func = lambda a, b: a + b  # NOQA
            elif operator == '-':
                op_func = lambda a, b: a - b  # NOQA
            elif operator == '*':
                op_func = lambda a, b: a * b  # NOQA
            elif operator == '/':
                op_func = lambda a, b: a / b  # NOQA
            elif operator == '//':
                op_func = lambda a, b: a // b  # NOQA
            elif operator == '%':
                op_func = lambda a, b: a % b  # NOQA
            else:
                raise RuntimeError("Invalid operation '{}' in expression."
                                   .format(operator))

            try:
                acc = self._apply_op(op_func, base_tok, next_tok,
                                     allow_strings=False)
            except ZeroDivisionError as err:
                # This should obviously only occur for division operations.
                raise ParserValueError(
                    self._merge_tokens([operator, next_tok], None),
                    "Division by zero") from err

            base_tok = self._merge_tokens([base_tok, next_tok], acc)

        return base_tok

    # These have been generalized.
    add_expr = math_expr
    mult_expr = math_expr

    def pow_expr(self, items) -> lark.Token:
        """Pass single items up, otherwise raise the first item to the
        power of the second item.

        :param list[lark.Token] items: One or two tokens
        :raises ParserValueError: When the result is complex, when zero is
            raised to a negative power, or when the result overflows.
        """

        if len(items) != 2:
            return items[0]

        try:
            result = self._apply_op(lambda a, b: a ** b, items[0], items[1],
                                    allow_strings=False)
        except ZeroDivisionError as err:
            # Zero raised to a negative power.
            raise ParserValueError(
                self._merge_tokens(items, None),
                "Division by zero") from err
        except OverflowError as err:
            # Float powers too large to represent.
            raise ParserValueError(
                self._merge_tokens(items, None),
                "Power expression result is too large") from err

        # List operands give a list of results; check each of them.
        results = result if isinstance(result, list) else [result]
        if any(isinstance(val, complex) for val in results):
            raise ParserValueError(
                self._merge_tokens(items, None),
                "Power expression has complex result")

        return self._merge_tokens(items, result)

    def primary(self, items) -> lark.Token:
        """Simply pass the value up to the next layer.

        :param list[Token] items: Will only be a single item.
        """

        # Parenthetical expressions are handled implicitly, since
        # the parenthesis aren't captured as tokens.
        return items[0]

    def negative(self, items) -> lark.Token:
        """Apply a unary ``'-'`` or ``'+'`` to a value.

        :param list[lark.Token] items: The sign token followed by the
            operand token.
        :return: A token holding the signed value.
        """

        value = items[1].value
        # As in not_expr, the operand is passed twice so the binary
        # _apply_op can do its type checks; the lambdas ignore 'b'.
        if items[0].value == '-':
            value = self._apply_op(lambda a, b: -a, items[1], items[1],
                                   allow_strings=False)
        elif items[0].value == '+':
            value = self._apply_op(lambda a, b: a, items[1], items[1],
                                   allow_strings=False)

        return self._merge_tokens(items, value)

    def literal(self, items) -> lark.Token:
        """Just pass up the literal value.

        :param list[lark.Token] items: A single token.
        """

        return items[0]

    def list_(self, items) -> lark.Token:
        """Handle explicit lists.

        :param list[lark.Token] items: The list item tokens, enclosed by the
            opening and closing bracket tokens.
        """

        # Drop the bracket tokens at either end.
        return self._merge_tokens(items, [item.value for item in items[1:-1]])

    def function_call(self, items) -> lark.Token:
        """Look up the function call, and call it with the given argument
        values.

        :param list[lark.Token] items: A function name token and zero or more
            argument tokens.
        :raises ParserValueError: When the function doesn't exist, or when
            the function rejects its arguments or otherwise fails.
        """

        func_name = items[0].value
        args = [tok.value for tok in items[1:]]

        try:
            func = functions.get_plugin(func_name)
        except pavilion.errors.FunctionPluginError as err:
            raise ParserValueError(
                token=items[0],
                message="No such function '{}'".format(func_name)) from err

        try:
            result = func(*args)
        except pavilion.errors.FunctionArgError as err:
            raise ParserValueError(
                self._merge_tokens(items, None),
                "Invalid arguments: {}".format(err)) from err
        except pavilion.errors.FunctionPluginError as err:
            # The function plugins give a reasonable message.
            raise ParserValueError(
                self._merge_tokens(items, None), err.args[0]) from err

        return self._merge_tokens(items, result)

    def INTEGER(self, tok) -> lark.Token:
        """Convert to an int.

        :param lark.Token tok:
        """

        # Ints are a series of digits, so this can't fail
        tok.value = int(tok.value)
        return tok

    def FLOAT(self, tok: lark.Token) -> lark.Token:
        """Convert to a float.

        :param lark.Token tok:
        """

        # Similar to ints, this can't fail either.
        tok.value = float(tok.value)
        return tok

    def BOOL(self, tok: lark.Token) -> lark.Token:
        """Convert to a boolean."""

        # Assumes BOOL only matches 'True' or 'False'
        tok.value = tok.value == 'True'
        return tok

    def ESCAPED_STRING(self, tok: lark.Token) -> lark.Token:
        """Remove quotes from the given string."""

        # Evaluate the quoted text as a Python raw string literal, which
        # strips the quotes and leaves backslashes untouched.
        tok.value = ast.literal_eval('r' + tok.value)
        return tok
class ExprTransformer(BaseExprTransformer):
    """Resolves Pavilion string expressions to their final values, looking
    up variable references through a variable manager."""

    def __init__(self, var_man):
        """Store the variable manager and set up the transformer.

        :param pavilion.test_config.variables.VariableSetManager var_man:
            The variable manager used to resolve variable references.
        """

        self.var_man = var_man
        super().__init__()

    def var_ref(self, items) -> lark.Token:
        """Look up a Pavilion variable reference in the variable manager.

        :param items: The tokens making up the dotted variable name.
        :return: A token whose value is the variable's value. String values
            are passed through ``auto_type_convert`` first.
        :raises ParserValueError: If the name has more than four parts, or
            the variable manager raises a KeyError for it.
        """

        name_parts = []
        for part_tok in items:
            name_parts.append(str(part_tok.value))
        full_name = '.'.join(name_parts)

        if len(name_parts) > 4:
            bad_tok = self._merge_tokens(items, full_name)
            raise ParserValueError(
                bad_tok,
                "Invalid variable '{}': too many name parts."
                .format(full_name))

        # The lookup may also raise a DeferredError; that is deliberately
        # left to propagate to the caller.
        try:
            resolved = self.var_man[full_name]
        except KeyError as err:
            raise ParserValueError(
                self._merge_tokens(items, full_name),
                err.args[0])

        # Strings are converted into the type they look most like.
        if isinstance(resolved, str):
            resolved = auto_type_convert(resolved)

        return self._merge_tokens(items, resolved)

    @staticmethod
    def var_key(items) -> lark.Token:
        """Pass the single key component token straight through."""

        return items[0]
class EvaluationExprTransformer(BaseExprTransformer):
    """Transform result evaluation expressions into their final value.
    The result dictionary referenced for values will be updated in place,
    so subsequent uses of this will have the cumulative results.
    """

    def __init__(self, results: Dict):
        """Initialize the transformer.

        :param results: The results structure that variable references are
            resolved against.
        """

        super().__init__()
        self.results = results

    def var_ref(self, items) -> lark.Token:
        """Iteratively traverse the results structure to find a value
        given a key. A '*' in the key will return a list of all values
        located by the remaining key. ('foo.*.bar' will return a list
        of all 'bar' elements under the 'foo' key.).

        :param items: The tokens making up the dotted key.
        :return: A token holding the value found.
        :raises ParserValueError: When the key can't be resolved.
        """

        key_parts = [item.value for item in items]

        try:
            value = self._resolve_ref(self.results, key_parts)
        except ValueError as err:
            raise ParserValueError(
                token=self._merge_tokens(items, None),
                message=err.args[0]) from err

        return self._merge_tokens(items, value)

    @staticmethod
    def _join_parts(parts) -> str:
        """Join key parts into a dotted name for error messages. Parts may
        be ints (list indexes), so each is converted to a string first."""

        return '.'.join(str(part) for part in parts)

    def _resolve_ref(self, base, key_parts: list, seen_parts: tuple = tuple(),
                     allow_listing: bool = True):
        """Recursively resolve a variable reference by navigating dicts and
        lists using the key parts until we reach the final value. If a
        '*' is given, a list of the value found from looking up the
        remainder of the key are returned. For example, for a dict
        of lists of dicts, we might have a key 'a.*.b', which would return
        the value of the 'b' key for each item in the list at 'a'.

        :param base: The next item to apply a key lookup too.
        :param key_parts: The remaining parts of the key.
        :param seen_parts: The parts of the key we've seen so far.
        :param allow_listing: Allow '*' in the key_parts. This is turned off
            once we've seen one.
        :return: The value found, passed through ``auto_type_convert``, or
            a list of such values when the key contained a '*'.
        :raises ValueError: When the key doesn't match the structure.
        """

        # Work on a copy so the caller's list isn't consumed.
        key_parts = key_parts.copy()

        if not key_parts:
            return auto_type_convert(base)

        key_part = key_parts.pop(0)
        seen_parts = seen_parts + (key_part,)
        seen_name = self._join_parts(seen_parts)

        if key_part == '*':
            if not allow_listing:
                raise ValueError(
                    "References can only contain a single '*'.")

            if isinstance(base, dict):
                # The 'sorted' here is important, as it ensures the values
                # are always in the same order.
                return [self._resolve_ref(base[sub_base], key_parts,
                                          seen_parts, False)
                        for sub_base in sorted(base.keys())]

            if isinstance(base, list):
                return [self._resolve_ref(sub_base, key_parts,
                                          seen_parts, False)
                        for sub_base in base]

            raise ValueError(
                "Used a '*' in a variable name, but the "
                "component at that point '{}' isn't a list or dict."
                .format(seen_name))

        if isinstance(base, list):
            try:
                idx = int(key_part)
            except ValueError as err:
                raise ValueError(
                    "Invalid key component '{}'. The results structure at "
                    "'{}' is a list (so that component must be an integer)."
                    .format(key_part, seen_name)) from err

            if idx >= len(base):
                raise ValueError(
                    "Key component '{}' is out of range for the list "
                    "at '{}'.".format(idx, seen_name)
                )

            return self._resolve_ref(base[idx], key_parts, seen_parts,
                                     allow_listing)

        if isinstance(base, dict):
            if key_part not in base:
                raise ValueError(
                    "Results dict does not have the key '{}'."
                    .format(seen_name)
                )

            return self._resolve_ref(base[key_part], key_parts, seen_parts,
                                     allow_listing)

        # There are key parts left, but nothing left to index into.
        raise ValueError("Key component '{}' given, but value '{}' at '{}' "
                         "is a '{}' not a dict or list."
                         .format(key_part, base, seen_name, type(base)))

    @staticmethod
    def var_key(items) -> lark.Token:
        """Just return the key component."""

        return items[0]
class VarRefVisitor(lark.Visitor):
    """Collects every variable reference found in an expression parse
    tree."""

    def __default__(self, tree):
        """Fallback for subtrees without a dedicated handler. Returns None,
        which ``visit`` treats as 'no variable references here'."""

        return None

    def visit(self, tree) -> List[str]:
        """Walk every subtree (in the order ``iter_subtrees`` yields them)
        and gather the variable references reported for each.

        :param tree: The parse tree to search.
        :return: The unique references, in the order first encountered.
        """

        found = []

        for node in tree.iter_subtrees():
            node_refs = self._call_userfunc(node)
            if node_refs is not None:
                for name in node_refs:
                    # Keep first-seen order while dropping duplicates.
                    if name in found:
                        continue
                    found.append(name)

        return found

    # We're not supporting this method (always just use .visit())
    visit_topdown = None

    @staticmethod
    def var_ref(tree: lark.Tree) -> List[str]:
        """Build the dotted variable name from all values under a
        ``var_ref`` subtree.

        :param tree: The ``var_ref`` subtree.
        :return: A one-item list holding the dotted name.
        """

        # The predicate accepts every value, so all parts are joined.
        dotted_name = '.'.join(tree.scan_values(lambda c: True))

        return [dotted_name]