Skip to content

Commit b8c2a2b

Browse files
authored
Merge pull request #28 from kou/arrow-trace-log
Add support for trace log
2 parents 93a2563 + c654c67 commit b8c2a2b

2 files changed

Lines changed: 225 additions & 26 deletions

File tree

poyonga/result.py

Lines changed: 57 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -49,15 +49,29 @@ def __init__(self, data, output_type: OutputType = OutputType.JSON, encoding="ut
4949
else: # xml or other types...
5050
# TODO: not implement
5151
raise NotImplementedError(f"not implement output_type: {output_type}")
52-
self.status = _result[0][0]
53-
self.start_time = _result[0][1]
54-
self.elapsed = _result[0][2]
55-
if len(_result) == 1 and self.status != 0:
56-
self.body = _result[0][3]
57-
elif len(_result) == 1 and self.status == 0:
58-
self.body = "" # handling invalid result
52+
if isinstance(_result, dict):
53+
# command_version=3 or later
54+
header = _result["header"]
55+
self.status = header["return_code"]
56+
self.start_time = header["start_time"]
57+
self.elapsed = header["elapsed_time"]
58+
self.trace_logs = None
59+
trace_log = _result.get("trace_log")
60+
if trace_log:
61+
names = [column["name"] for column in trace_log["columns"]]
62+
self.trace_logs = [dict(zip(names, log)) for log in trace_log["logs"]]
63+
self.body = _result["body"]
5964
else:
60-
self.body = _result[1]
65+
self.status = _result[0][0]
66+
self.start_time = _result[0][1]
67+
self.elapsed = _result[0][2]
68+
self.trace_logs = None
69+
if len(_result) == 1 and self.status != 0:
70+
self.body = _result[0][3]
71+
elif len(_result) == 1 and self.status == 0:
72+
self.body = "" # handling invalid result
73+
else:
74+
self.body = _result[1]
6175

6276
def _is_apache_arrow(self, content_type):
6377
return content_type == "application/x-apache-arrow-streaming"
@@ -71,6 +85,15 @@ def is_metadata(schema):
7185
return False
7286
return schema.metadata.get(b"GROONGA:data_type") == b"metadata"
7387

88+
def is_trace_log(schema):
89+
if not schema.metadata:
90+
return False
91+
return schema.metadata.get(b"GROONGA:data_type") == b"trace_log"
92+
93+
self.hit_num = -1
94+
self.items = None
95+
self.body = None
96+
self.trace_logs = None
7497
source = pa.BufferReader(data)
7598
while source.tell() < source.size():
7699
with pa.RecordBatchStreamReader(source) as reader:
@@ -83,16 +106,16 @@ def is_metadata(schema):
83106
self.start_time = start_time_s
84107
self.elapsed = table["elapsed_time"][0].as_py()
85108
if self.status != 0:
86-
self.hit_num = -1
87-
self.items = None
88109
try:
89110
table.schema.field("error_message")
90111
self.body = table["error_message"][0].as_py()
91112
except KeyError:
92113
# For Groonga < 13.0.9.
93114
# Groonga < 13.0.9 doesn't provide the "error_message"
94115
# column.
95-
self.body = None
116+
pass
117+
elif is_trace_log(schema):
118+
self.trace_logs = table
96119
else:
97120
self._parse_apache_arrow_body(table)
98121

@@ -115,21 +138,29 @@ def __init__(self, data, output_type: OutputType = OutputType.JSON, encoding="ut
115138
return
116139
self.items = []
117140
self.hit_num = -1 # default is -1 (Error)
118-
if len(self.body) != 0:
119-
self.hit_num = self.body[0][0][0]
120-
if self.status == 0:
121-
keys = [k[0] for k in self.body[0][1]]
122-
self.items = [dict(list(zip(keys, item))) for item in self.body[0][2:]]
141+
if self.body is None:
142+
pass
143+
elif isinstance(self.body, dict):
144+
# command_version=3 or later
145+
if "n_hits" in self.body:
146+
self.hit_num = self.body["n_hits"]
147+
if "records" in self.body:
148+
keys = [column["name"] for column in self.body["columns"]]
149+
self.items = [dict(zip(keys, record)) for record in self.body["records"]]
150+
else:
151+
if len(self.body) != 0:
152+
self.hit_num = self.body[0][0][0]
153+
if self.status == 0:
154+
keys = [k[0] for k in self.body[0][1]]
155+
self.items = [dict(zip(keys, item)) for item in self.body[0][2:]]
123156

124157
def _parse_apache_arrow_body(self, table):
125158
self.body = table
126-
self.hit_num = -1
127-
if self.status == 0:
128-
metadata = table.schema.metadata
129-
if metadata:
130-
n_hits_raw = metadata.get(b"GROONGA:n_hits")
131-
if n_hits_raw:
132-
self.hit_num = int(n_hits_raw)
133-
self.items = table
134-
else:
135-
self.items = None
159+
if self.status != 0:
160+
return
161+
metadata = table.schema.metadata
162+
if metadata:
163+
n_hits_raw = metadata.get(b"GROONGA:n_hits")
164+
if n_hits_raw:
165+
self.hit_num = int(n_hits_raw)
166+
self.items = table

test/test_poyonga.py

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,174 @@ def test_select_apache_arrow(self, mock_urlopen):
187187
self.assertEqual(ret.hit_num, 29)
188188
self.assertEqual(ret.items, pa.Table.from_batches([result_set_record_batch]))
189189

190+
@patch("poyonga.client.urlopen")
191+
def test_select_json_trace_log(self, mock_urlopen):
192+
m = Mock()
193+
response = {
194+
"header": {
195+
"return_code": 0,
196+
"start_time": 1337566253.89858,
197+
"elapsed_time": 0.000354,
198+
},
199+
"trace_log": {
200+
"columns": [
201+
{"name": "depth"},
202+
{"name": "sequence"},
203+
{"name": "name"},
204+
{"name": "value"},
205+
{"name": "elapsed_time"},
206+
],
207+
"logs": [
208+
[1, 0, "ii.select.input", "Thas", 0],
209+
[1, 1, "ii.select.operator", "or", 1],
210+
[2, 0, "ii.select.exact.n_hits", 0, 2],
211+
[2, 0, "ii.select.fuzzy.input", "Thas", 3],
212+
[2, 1, "ii.select.fuzzy.input.actual", "that", 4],
213+
[2, 2, "ii.select.fuzzy.input.actual", "this", 5],
214+
[2, 3, "ii.select.fuzzy.n_hits", 2, 6],
215+
[1, 2, "ii.select.n_hits", 2, 7],
216+
[1, 0, "ii.select.input", "ere", 8],
217+
[1, 1, "ii.select.operator", "or", 9],
218+
[2, 0, "ii.select.exact.n_hits", 2, 10],
219+
[1, 2, "ii.select.n_hits", 2, 11],
220+
],
221+
},
222+
"body": {
223+
"n_hits": 2,
224+
"columns": [{"name": "content", "type": "ShortText"}, {"name": "_score", "type": "Float"}],
225+
"records": [["This is a pen", 1.0], ["That is a pen", 1.0]],
226+
},
227+
}
228+
m.read.side_effect = [json.dumps(response)]
229+
mock_urlopen.return_value = m
230+
ret = self.g.call("select", command_version="3", table="Site", output_trace_log="yes")
231+
self.assertEqual(ret.status, 0)
232+
self.assertEqual(ret.start_time, 1337566253.89858)
233+
self.assertEqual(ret.elapsed, 0.000354)
234+
self.assertEqual(ret.hit_num, 2)
235+
trace_log_column_names = ["depth", "sequence", "name", "value", "elapsed_time"]
236+
self.assertEqual(
237+
ret.trace_logs, [dict(zip(trace_log_column_names, log)) for log in response["trace_log"]["logs"]]
238+
)
239+
record_column_names = ["content", "_score"]
240+
self.assertEqual(ret.items, [dict(zip(record_column_names, record)) for record in response["body"]["records"]])
241+
242+
@unittest.skipUnless(pa, "require pyarrow")
243+
@patch("poyonga.client.urlopen")
244+
def test_select_apache_arrow_trace_log(self, mock_urlopen):
245+
m = Mock()
246+
247+
metadata_fields = [
248+
pa.field("return_code", pa.int32()),
249+
pa.field("start_time", pa.timestamp("ns")),
250+
pa.field("elapsed_time", pa.float64()),
251+
]
252+
metadata_metadata = {
253+
"GROONGA:data_type": "metadata",
254+
}
255+
metadata_schema = pa.schema(metadata_fields, metadata_metadata)
256+
sec_to_ns = 1_000_000_000
257+
metadata = [
258+
[0],
259+
[int(1337566253.89858 * sec_to_ns)],
260+
[0.000354],
261+
]
262+
metadata_record_batch = pa.record_batch(metadata, schema=metadata_schema)
263+
264+
value_type = pa.dense_union([pa.field("0", pa.uint32()), pa.field("1", pa.string())])
265+
value_type_dictionary = {
266+
int: 0,
267+
str: 1,
268+
}
269+
trace_log_fields = [
270+
pa.field("depth", pa.uint16()),
271+
pa.field("sequence", pa.uint16()),
272+
pa.field("name", pa.string()),
273+
pa.field("value", value_type),
274+
pa.field("elapsed_time", pa.uint64()),
275+
]
276+
trace_log_metadata = {
277+
"GROONGA:data_type": "trace_log",
278+
}
279+
trace_log_schema = pa.schema(trace_log_fields, trace_log_metadata)
280+
# Row-based for easy to maintain
281+
trace_logs = [
282+
[1, 0, "ii.select.input", "Thas", 0],
283+
[1, 1, "ii.select.operator", "or", 1],
284+
[2, 0, "ii.select.exact.n_hits", 0, 2],
285+
[2, 0, "ii.select.fuzzy.input", "Thas", 3],
286+
[2, 1, "ii.select.fuzzy.input.actual", "that", 4],
287+
[2, 2, "ii.select.fuzzy.input.actual", "this", 5],
288+
[2, 3, "ii.select.fuzzy.n_hits", 2, 6],
289+
[1, 2, "ii.select.n_hits", 2, 7],
290+
[1, 0, "ii.select.input", "ere", 8],
291+
[1, 1, "ii.select.operator", "or", 9],
292+
[2, 0, "ii.select.exact.n_hits", 2, 10],
293+
[1, 2, "ii.select.n_hits", 2, 11],
294+
]
295+
# Column-based for PyArrow
296+
trace_logs = list(zip(*trace_logs))
297+
# value: Row Python values to union array
298+
values = trace_logs[3]
299+
value_types = []
300+
value_offsets = []
301+
value_offset_dictionary = {
302+
int: 0,
303+
str: 0,
304+
}
305+
value_child_dictionary = {
306+
int: [],
307+
str: [],
308+
}
309+
for value in values:
310+
value_types.append(value_type_dictionary[type(value)])
311+
value_offsets.append(value_offset_dictionary[type(value)])
312+
value_offset_dictionary[type(value)] += 1
313+
value_child_dictionary[type(value)].append(value)
314+
value_children = [
315+
pa.array(child, type=value_type[i].type) for i, child in enumerate(value_child_dictionary.values())
316+
]
317+
value_type_field_names = [value_type.field(i).name for i in range(value_type.num_fields)]
318+
trace_logs[3] = pa.UnionArray.from_dense(
319+
pa.array(value_types, type=pa.int8()),
320+
pa.array(value_offsets, type=pa.int32()),
321+
value_children,
322+
value_type_field_names,
323+
value_type.type_codes,
324+
)
325+
trace_log_record_batch = pa.record_batch(trace_logs, schema=trace_log_schema)
326+
327+
result_set_fields = [
328+
pa.field("content", pa.string()),
329+
]
330+
result_set_metadata = {"GROONGA:n_hits": "2"}
331+
result_set_schema = pa.schema(result_set_fields, result_set_metadata)
332+
result_set = [
333+
["This is a pen", "That is a pen"],
334+
]
335+
result_set_record_batch = pa.record_batch(result_set, schema=result_set_schema)
336+
output = pa.BufferOutputStream()
337+
with pa.RecordBatchStreamWriter(output, metadata_schema) as writer:
338+
writer.write(metadata_record_batch)
339+
with pa.RecordBatchStreamWriter(output, trace_log_schema) as writer:
340+
writer.write(trace_log_record_batch)
341+
with pa.RecordBatchStreamWriter(output, result_set_schema) as writer:
342+
writer.write(result_set_record_batch)
343+
m.read.side_effect = [output.getvalue().to_pybytes()]
344+
m.headers = {
345+
"content-type": "application/x-apache-arrow-streaming",
346+
}
347+
mock_urlopen.return_value = m
348+
ret = self.g.call(
349+
"select", command_version="3", table="Site", output_type="apache-arrow", output_trace_log="yes"
350+
)
351+
self.assertEqual(ret.status, 0)
352+
self.assertEqual(ret.start_time, 1337566253.89858)
353+
self.assertEqual(ret.elapsed, 0.000354)
354+
self.assertEqual(ret.hit_num, 2)
355+
self.assertEqual(ret.trace_logs, pa.Table.from_batches([trace_log_record_batch]))
356+
self.assertEqual(ret.items, pa.Table.from_batches([result_set_record_batch]))
357+
190358

191359
class PoyongaHTTPSTestCase(unittest.TestCase):
192360
def setUp(self):

0 commit comments

Comments
 (0)