-
Notifications
You must be signed in to change notification settings - Fork 248
Expand file tree
/
Copy pathinterface.py
More file actions
125 lines (112 loc) · 3.59 KB
/
interface.py
File metadata and controls
125 lines (112 loc) · 3.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
from __future__ import annotations
import logging
from gql import Client, gql
from gql.transport.aiohttp import AIOHTTPTransport, log
import config
log.setLevel(logging.WARNING)  # only let WARNING and above through: the gql aiohttp transport logger is noisy below that
# GraphQL document for searching the ``file_object`` table.
# Variables: $where (filter), $limit and $offset (pagination).
# Returns the total match count (aliased ``totalCount``) and the ``uid`` of each selected row.
FO_QUERY = """
query file_object($where: file_object_bool_exp, $limit: Int, $offset: Int) {
file_object_aggregate(where: $where) {
aggregate {
totalCount: count
}
}
file_object(where: $where, limit: $limit, offset: $offset) {
uid
}
}
"""
# GraphQL document for searching the ``firmware`` table.
# Variables: $where (filter), $limit and $offset (pagination).
# Returns the total match count (aliased ``totalCount``) and the ``uid`` of each selected row;
# rows are ordered by ``vendor`` ascending.
FW_QUERY = """
query firmware($where: firmware_bool_exp, $limit: Int, $offset: Int) {
firmware_aggregate(where: $where) {
aggregate {
totalCount: count
}
}
firmware(where: $where, limit: $limit, offset: $offset, order_by: {vendor: asc}) {
uid
}
}
"""
# GraphQL document for searching the ``analysis`` table.
# Variables: $where (filter), $limit and $offset (pagination).
# Both the aggregate count (aliased ``totalCount``) and the row selection use ``distinct_on: uid``,
# so each uid is counted and returned once.
ANALYSIS_QUERY = """
query analysis($where: analysis_bool_exp, $limit: Int, $offset: Int) {
analysis_aggregate(where: $where, distinct_on: uid) {
aggregate {
totalCount: count
}
}
analysis(where: $where, limit: $limit, offset: $offset, distinct_on: uid) {
uid
}
}
"""
# Lookup from searchable table name to the paginated GraphQL document executed for it.
TABLE_TO_QUERY = dict(
    file_object=FO_QUERY,
    firmware=FW_QUERY,
    analysis=ANALYSIS_QUERY,
)
# Simplified counterparts of the full search queries (no aggregate, no pagination variables);
# these are the versions displayed in the web interface.
TEMPLATE_QUERIES = {
    'file_object': '\n'.join(
        [
            'query file_object_query($where: file_object_bool_exp) {',
            ' file_object(where: $where) {',
            ' uid',
            ' }',
            '}',
        ]
    ),
    'firmware': '\n'.join(
        [
            'query firmware_query($where: firmware_bool_exp) {',
            ' firmware(where: $where, order_by: {vendor: asc}) {',
            ' uid',
            ' }',
            '}',
        ]
    ),
    'analysis': '\n'.join(
        [
            'query analysis_query($where: analysis_bool_exp) {',
            ' analysis(where: $where, distinct_on: uid) {',
            ' uid',
            ' }',
            '}',
        ]
    ),
}
class GraphQLSearchError(Exception):
    """Signals that a GraphQL search could not be carried out or evaluated."""
class GraphQlInterface:
    """Client that runs uid searches against the configured Hasura GraphQL endpoint."""

    def __init__(self):
        """Build the GraphQL client from the frontend's Hasura configuration."""
        # The configuration may not have been loaded yet when this class is instantiated.
        if config.frontend is None:
            config.load()
        url = f'http://{config.frontend.hasura.host}:{config.frontend.hasura.port}/v1/graphql'
        headers = {
            'Content-Type': 'application/json',
            'X-Hasura-Role': 'admin',
            'X-Hasura-Admin-Secret': config.frontend.hasura.admin_secret,
        }
        transport = AIOHTTPTransport(url=url, headers=headers)
        self.client = Client(transport=transport)

    def search_gql(
        self,
        where: dict,
        table: str,
        offset: int | None = None,
        limit: int | None = None,
    ) -> tuple[list[str], int]:
        """
        Search the database using a GraphQL query.

        :param where: $where part of the query as dict.
        :param table: name of the table we are searching. Must be one of "file_object", "firmware", "analysis".
        :param offset: number of items to skip.
        :param limit: number of items to return.
        :return: Tuple with a list of matching uids and the total number of matches.
        :raises GraphQLSearchError: if ``table`` is unknown or the response contains no total count.
        """
        # Use ``.get`` rather than indexing: an unknown table has to surface as
        # GraphQLSearchError; plain indexing would raise KeyError before the check runs.
        if not (query := TABLE_TO_QUERY.get(table)):
            raise GraphQLSearchError(f'Unknown table {table}')

        # Pagination variables are only sent when the caller actually set them.
        variables = {'where': where}
        if offset is not None:
            variables['offset'] = offset
        if limit is not None:
            variables['limit'] = limit

        response = self.client.execute(gql(query), variable_values=variables)

        # The count sits at <table>_aggregate -> aggregate -> totalCount in the response.
        total = response.get(f'{table}_aggregate', {}).get('aggregate', {}).get('totalCount')
        if total is None:
            raise GraphQLSearchError('Could not determine total result count.')
        return [entry['uid'] for entry in response.get(table, [])], total