Skip to content

Commit d015d33

Browse files
committed
feat: implement directions validation to converters
Signed-off-by: Adria Montoto <75563346+adriamontoto@users.noreply.github.com>
1 parent f0d9a46 commit d015d33

11 files changed

+654
-9
lines changed

criteria_pattern/converters/criteria_to_mysql_converter.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from typing import Any, assert_never
77

88
from criteria_pattern import Criteria, Direction, Operator
9-
from criteria_pattern.errors import InvalidColumnError, InvalidOperatorError, InvalidTableError
9+
from criteria_pattern.errors import InvalidColumnError, InvalidDirectionError, InvalidOperatorError, InvalidTableError
1010
from criteria_pattern.models.criteria import AndCriteria, NotCriteria, OrCriteria
1111

1212

@@ -42,9 +42,11 @@ def convert(
4242
check_column_injection: bool = False,
4343
check_criteria_injection: bool = False,
4444
check_operator_injection: bool = False,
45+
check_direction_injection: bool = False,
4546
valid_tables: Sequence[str] | None = None,
4647
valid_columns: Sequence[str] | None = None,
4748
valid_operators: Sequence[Operator] | None = None,
49+
valid_directions: Sequence[Direction] | None = None,
4850
) -> tuple[str, list[Any]]:
4951
"""
5052
Convert the Criteria object to a MySQL query.
@@ -62,14 +64,18 @@ def convert(
6264
Default to False.
6365
check_operator_injection (bool, optional): Raise an error if the operator is not in the list of valid
6466
operators. Default to False.
67+
check_direction_injection (bool, optional): Raise an error if the direction is not in the list of valid
68+
directions. Default to False.
6569
valid_tables (Sequence[str], optional): List of valid tables to query. Default to empty list.
6670
valid_columns (Sequence[str], optional): List of valid columns to select. Default to empty list.
6771
valid_operators (Sequence[Operator], optional): List of valid operators to use. Default to empty list.
72+
valid_directions (Sequence[Direction], optional): List of valid directions to use. Default to empty list.
6873
6974
Raises:
7075
InvalidTableError: If the table is not in the list of valid tables (only if check_table_injection=True).
7176
InvalidColumnError: If the column is not in the list of valid columns (only if check_column_injection=True).
7277
InvalidOperatorError: If the operator is not in the list of valid operators (only if check_operator_injection=True).
78+
InvalidDirectionError: If the direction is not in the list of valid directions (only if check_direction_injection=True).
7379
7480
Returns:
7581
tuple[str, list[Any]]: The MySQL query string and the query parameters as a list.
@@ -95,6 +101,7 @@ def convert(
95101
valid_tables = valid_tables or []
96102
valid_columns = valid_columns or []
97103
valid_operators = valid_operators or []
104+
valid_directions = valid_directions or []
98105

99106
if check_table_injection:
100107
cls._validate_table(table=table, valid_tables=valid_tables)
@@ -108,6 +115,9 @@ def convert(
108115
if check_operator_injection:
109116
cls._validate_operators(criteria=criteria, valid_operators=valid_operators)
110117

118+
if check_direction_injection:
119+
cls._validate_directions(criteria=criteria, valid_directions=valid_directions)
120+
111121
query = f'SELECT {", ".join(columns)} FROM {table}' # noqa: S608 # nosec
112122
parameters: list[Any] = []
113123

@@ -205,6 +215,25 @@ def _validate_operators(cls, *, criteria: Criteria, valid_operators: Sequence[Op
205215
if filter.operator not in valid_operators:
206216
raise InvalidOperatorError(operator=Operator(value=filter.operator), valid_operators=valid_operators)
207217

218+
@classmethod
219+
def _validate_directions(cls, *, criteria: Criteria, valid_directions: Sequence[Direction]) -> None:
220+
"""
221+
Validate the Criteria object directions to prevent SQL injection.
222+
223+
Args:
224+
criteria (Criteria): Criteria to validate.
225+
valid_directions (Sequence[Direction]): List of valid directions to use.
226+
227+
Raises:
228+
InvalidDirectionError: If the direction is not in the list of valid directions.
229+
"""
230+
for order in criteria.orders:
231+
if order.direction not in valid_directions:
232+
raise InvalidDirectionError(
233+
direction=Direction(value=order.direction),
234+
valid_directions=valid_directions,
235+
)
236+
208237
@classmethod
209238
def _process_filters(cls, *, criteria: Criteria, columns_mapping: Mapping[str, str]) -> tuple[str, list[Any]]:
210239
"""

criteria_pattern/converters/criteria_to_postgresql_converter.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from typing import Any, assert_never
77

88
from criteria_pattern import Criteria, Direction, Operator
9-
from criteria_pattern.errors import InvalidColumnError, InvalidOperatorError, InvalidTableError
9+
from criteria_pattern.errors import InvalidColumnError, InvalidDirectionError, InvalidOperatorError, InvalidTableError
1010
from criteria_pattern.models.criteria import AndCriteria, NotCriteria, OrCriteria
1111

1212

@@ -42,9 +42,11 @@ def convert(
4242
check_column_injection: bool = False,
4343
check_criteria_injection: bool = False,
4444
check_operator_injection: bool = False,
45+
check_direction_injection: bool = False,
4546
valid_tables: Sequence[str] | None = None,
4647
valid_columns: Sequence[str] | None = None,
4748
valid_operators: Sequence[Operator] | None = None,
49+
valid_directions: Sequence[Direction] | None = None,
4850
) -> tuple[str, dict[str, Any]]:
4951
"""
5052
Convert the Criteria object to a Postgresql query.
@@ -62,14 +64,18 @@ def convert(
6264
Default to False.
6365
check_operator_injection (bool, optional): Raise an error if the operator is not in the list of valid operators.
6466
Default to False.
67+
check_direction_injection (bool, optional): Raise an error if the direction is not in the list of valid
68+
directions. Default to False.
6569
valid_tables (Sequence[str], optional): List of valid tables to query. Default to empty list.
6670
valid_columns (Sequence[str], optional): List of valid columns to select. Default to empty list.
6771
valid_operators (Sequence[Operator], optional): List of valid operators to use. Default to empty list.
72+
valid_directions (Sequence[Direction], optional): List of valid directions to use. Default to empty list.
6873
6974
Raises:
7075
InvalidTableError: If the table is not in the list of valid tables (only if check_table_injection=True).
7176
InvalidColumnError: If the column is not in the list of valid columns (only if check_column_injection=True).
7277
InvalidOperatorError: If the operator is not in the list of valid operators (only if check_operator_injection=True).
78+
InvalidDirectionError: If the direction is not in the list of valid directions (only if check_direction_injection=True).
7379
7480
Returns:
7581
tuple[str, dict[str, Any]]: The Postgresql query string and the query parameters.
@@ -95,6 +101,7 @@ def convert(
95101
valid_tables = valid_tables or []
96102
valid_columns = valid_columns or []
97103
valid_operators = valid_operators or []
104+
valid_directions = valid_directions or []
98105

99106
if check_table_injection:
100107
cls._validate_table(table=table, valid_tables=valid_tables)
@@ -108,6 +115,9 @@ def convert(
108115
if check_operator_injection:
109116
cls._validate_operators(criteria=criteria, valid_operators=valid_operators)
110117

118+
if check_direction_injection:
119+
cls._validate_directions(criteria=criteria, valid_directions=valid_directions)
120+
111121
quoted_columns = ['*' if column == '*' else f'"{column}"' for column in columns]
112122
quoted_table = '.'.join(f'"{part}"' for part in table.split('.'))
113123
query = f'SELECT {", ".join(quoted_columns)} FROM {quoted_table}' # noqa: S608 # nosec
@@ -207,6 +217,25 @@ def _validate_operators(cls, *, criteria: Criteria, valid_operators: Sequence[Op
207217
if filter.operator not in valid_operators:
208218
raise InvalidOperatorError(operator=Operator(value=filter.operator), valid_operators=valid_operators)
209219

220+
@classmethod
221+
def _validate_directions(cls, *, criteria: Criteria, valid_directions: Sequence[Direction]) -> None:
222+
"""
223+
Validate the Criteria object directions to prevent SQL injection.
224+
225+
Args:
226+
criteria (Criteria): Criteria to validate.
227+
valid_directions (Sequence[Direction]): List of valid directions to use.
228+
229+
Raises:
230+
InvalidDirectionError: If the direction is not in the list of valid directions.
231+
"""
232+
for order in criteria.orders:
233+
if order.direction not in valid_directions:
234+
raise InvalidDirectionError(
235+
direction=Direction(value=order.direction),
236+
valid_directions=valid_directions,
237+
)
238+
210239
@classmethod
211240
def _process_filters(cls, *, criteria: Criteria, columns_mapping: Mapping[str, str]) -> tuple[str, dict[str, Any]]:
212241
"""

criteria_pattern/converters/criteria_to_sqlite_converter.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from typing import Any, assert_never
77

88
from criteria_pattern import Criteria, Direction, Operator
9-
from criteria_pattern.errors import InvalidColumnError, InvalidOperatorError, InvalidTableError
9+
from criteria_pattern.errors import InvalidColumnError, InvalidDirectionError, InvalidOperatorError, InvalidTableError
1010
from criteria_pattern.models.criteria import AndCriteria, NotCriteria, OrCriteria
1111

1212

@@ -42,9 +42,11 @@ def convert(
4242
check_column_injection: bool = False,
4343
check_criteria_injection: bool = False,
4444
check_operator_injection: bool = False,
45+
check_direction_injection: bool = False,
4546
valid_tables: Sequence[str] | None = None,
4647
valid_columns: Sequence[str] | None = None,
4748
valid_operators: Sequence[Operator] | None = None,
49+
valid_directions: Sequence[Direction] | None = None,
4850
) -> tuple[str, dict[str, Any]]:
4951
"""
5052
Convert the Criteria object to a SQLite query.
@@ -62,14 +64,18 @@ def convert(
6264
Default to False.
6365
check_operator_injection (bool, optional): Raise an error if the operator is not in the list of valid
6466
operators. Default to False.
67+
check_direction_injection (bool, optional): Raise an error if the direction is not in the list of valid
68+
directions. Default to False.
6569
valid_tables (Sequence[str], optional): List of valid tables to query. Default to empty list.
6670
valid_columns (Sequence[str], optional): List of valid columns to select. Default to empty list.
6771
valid_operators (Sequence[Operator], optional): List of valid operators to use. Default to empty list.
72+
valid_directions (Sequence[Direction], optional): List of valid directions to use. Default to empty list.
6873
6974
Raises:
7075
InvalidTableError: If the table is not in the list of valid tables (only if check_table_injection=True).
7176
InvalidColumnError: If the column is not in the list of valid columns (only if check_column_injection=True).
7277
InvalidOperatorError: If the operator is not in the list of valid operators (only if check_operator_injection=True).
78+
InvalidDirectionError: If the direction is not in the list of valid directions (only if check_direction_injection=True).
7379
7480
Returns:
7581
tuple[str, dict[str, Any]]: The SQLite query string and the query parameters.
@@ -95,6 +101,7 @@ def convert(
95101
valid_tables = valid_tables or []
96102
valid_columns = valid_columns or []
97103
valid_operators = valid_operators or []
104+
valid_directions = valid_directions or []
98105

99106
if check_table_injection:
100107
cls._validate_table(table=table, valid_tables=valid_tables)
@@ -108,6 +115,9 @@ def convert(
108115
if check_operator_injection:
109116
cls._validate_operators(criteria=criteria, valid_operators=valid_operators)
110117

118+
if check_direction_injection:
119+
cls._validate_directions(criteria=criteria, valid_directions=valid_directions)
120+
111121
quoted_columns = ['*' if column == '*' else f'"{column}"' for column in columns]
112122
quoted_table = '.'.join(f'"{part}"' for part in table.split('.'))
113123
query = f'SELECT {", ".join(quoted_columns)} FROM {quoted_table}' # noqa: S608 # nosec
@@ -207,6 +217,25 @@ def _validate_operators(cls, *, criteria: Criteria, valid_operators: Sequence[Op
207217
if filter.operator not in valid_operators:
208218
raise InvalidOperatorError(operator=Operator(value=filter.operator), valid_operators=valid_operators)
209219

220+
@classmethod
221+
def _validate_directions(cls, *, criteria: Criteria, valid_directions: Sequence[Direction]) -> None:
222+
"""
223+
Validate the Criteria object directions to prevent SQL injection.
224+
225+
Args:
226+
criteria (Criteria): Criteria to validate.
227+
valid_directions (Sequence[Direction]): List of valid directions to use.
228+
229+
Raises:
230+
InvalidDirectionError: If the direction is not in the list of valid directions.
231+
"""
232+
for order in criteria.orders:
233+
if order.direction not in valid_directions:
234+
raise InvalidDirectionError(
235+
direction=Direction(value=order.direction),
236+
valid_directions=valid_directions,
237+
)
238+
210239
@classmethod
211240
def _process_filters(cls, *, criteria: Criteria, columns_mapping: Mapping[str, str]) -> tuple[str, dict[str, Any]]:
212241
"""

criteria_pattern/converters/url_to_criteria_converter.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from urllib.parse import parse_qs, unquote_plus, urlparse
99

1010
from criteria_pattern import Criteria, Direction, Filter, Operator, Order
11-
from criteria_pattern.errors import InvalidColumnError, InvalidOperatorError
11+
from criteria_pattern.errors import InvalidColumnError, InvalidDirectionError, InvalidOperatorError
1212

1313

1414
class UrlToCriteriaConverter:
@@ -66,8 +66,10 @@ def convert(
6666
fields_mapping: Mapping[str, str] | None = None,
6767
check_field_injection: bool = False,
6868
check_operator_injection: bool = False,
69+
check_direction_injection: bool = False,
6970
valid_fields: Sequence[str] | None = None,
7071
valid_operators: Sequence[Operator] | None = None,
72+
valid_directions: Sequence[Direction] | None = None,
7173
) -> Criteria:
7274
"""
7375
Converts an URL query string into a Criteria object.
@@ -77,8 +79,10 @@ def convert(
7779
fields_mapping (Mapping[str, str], optional): Mapping of field names to aliases. Default to empty dict.
7880
check_field_injection (bool, optional): Whether to check for field injection.
7981
check_operator_injection (bool, optional): Whether to check for operator injection.
82+
check_direction_injection (bool, optional): Whether to check for direction injection.
8083
valid_fields (Sequence[str], optional): A list of valid field names. Default to empty list.
8184
valid_operators (Sequence[Operator], optional): A list of valid operators. Default to empty list.
85+
valid_directions (Sequence[Direction], optional): A list of valid directions. Default to empty list.
8286
8387
Raises:
8488
TypeError: If the filter index is not an integer.
@@ -93,6 +97,7 @@ def convert(
9397
InvalidColumnError: If an invalid field name is found in filters.
9498
InvalidColumnError: If an invalid field name is found in orders.
9599
InvalidOperatorError: If an invalid operator is found in filters.
100+
InvalidDirectionError: If an invalid direction is found in orders.
96101
97102
Example:
98103
```python
@@ -107,6 +112,7 @@ def convert(
107112
valid_fields = valid_fields or []
108113
fields_mapping = fields_mapping or {}
109114
valid_operators = valid_operators or []
115+
valid_directions = valid_directions or []
110116

111117
query_params = parse_qs(qs=urlparse(url=url).query, keep_blank_values=True)
112118

@@ -128,6 +134,9 @@ def convert(
128134
if check_operator_injection:
129135
cls._validate_operators(criteria=criteria, valid_operators=valid_operators)
130136

137+
if check_direction_injection:
138+
cls._validate_directions(criteria=criteria, valid_directions=valid_directions)
139+
131140
return criteria
132141

133142
@classmethod
@@ -417,3 +426,22 @@ def _validate_operators(cls, *, criteria: Criteria, valid_operators: Sequence[Op
417426
for filter in criteria.filters:
418427
if filter.operator not in valid_operators:
419428
raise InvalidOperatorError(operator=Operator(value=filter.operator), valid_operators=valid_operators)
429+
430+
@classmethod
431+
def _validate_directions(cls, *, criteria: Criteria, valid_directions: Sequence[Direction]) -> None:
432+
"""
433+
Validate the Criteria object directions to prevent injection.
434+
435+
Args:
436+
criteria (Criteria): Criteria to validate.
437+
valid_directions (Sequence[Direction]): List of valid directions to use.
438+
439+
Raises:
440+
InvalidDirectionError: If the direction is not in the list of valid directions.
441+
"""
442+
for order in criteria.orders:
443+
if order.direction not in valid_directions:
444+
raise InvalidDirectionError(
445+
direction=Direction(value=order.direction),
446+
valid_directions=valid_directions,
447+
)

criteria_pattern/errors/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
from .invalid_column_error import InvalidColumnError
2+
from .invalid_direction_error import InvalidDirectionError
23
from .invalid_operator_error import InvalidOperatorError
34
from .invalid_table_error import InvalidTableError
45
from .sql_converter_error import SqlConverterError
56

67
__all__ = (
78
'InvalidColumnError',
9+
'InvalidDirectionError',
810
'InvalidOperatorError',
911
'InvalidTableError',
1012
'SqlConverterError',

0 commit comments

Comments
 (0)