diff --git a/impala/exec_summary.py b/impala/exec_summary.py new file mode 100755 index 000000000..96f06ced4 --- /dev/null +++ b/impala/exec_summary.py @@ -0,0 +1,176 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from impala._thrift_gen.ExecStats.ttypes import TExecStats + + +def build_exec_summary_table(summary, idx, indent_level, new_indent_level, output, + is_prettyprint=True, separate_prefix_column=False): + """Direct translation of Coordinator::PrintExecSummary() to recursively build a list + of rows of summary statistics, one per exec node + + summary: the TExecSummary object that contains all the summary data + + idx: the index of the node to print + + indent_level: the number of spaces to print before writing the node's label, to give + the appearance of a tree. The 0th child of a node has the same indent_level as its + parent. All other children have an indent_level of one greater than their parent. + + new_indent_level: If true, this indent level is different from the previous row's. + + output: the list of rows into which to append the rows produced for this node and its + children. + + is_prettyprint: Optional. If True, print time, units, and bytes columns in pretty + printed format. + + separate_prefix_column: Optional. If True, the prefix and operator name will be + returned as separate column. Otherwise, prefix and operater name will be concatenated + into single column. + + Returns the index of the next exec node in summary.exec_nodes that should be + processed, used internally to this method only. + """ + if not summary.nodes: + # Summary nodes is empty or None. Nothing to build. + return + assert idx < len(summary.nodes), ( + "Index ({0}) must be less than exec summary count ({1})").format( + idx, len(summary.nodes)) + + attrs = ["latency_ns", "cpu_time_ns", "cardinality", "memory_used"] + + # Initialise aggregate and maximum stats + agg_stats, max_stats = TExecStats(), TExecStats() + for attr in attrs: + setattr(agg_stats, attr, 0) + setattr(max_stats, attr, 0) + + node = summary.nodes[idx] + instances = 0 + if node.exec_stats: + # exec_stats is not None or an empty list. + instances = len(node.exec_stats) + for stats in node.exec_stats: + for attr in attrs: + val = getattr(stats, attr) + if val is not None: + setattr(agg_stats, attr, getattr(agg_stats, attr) + val) + setattr(max_stats, attr, max(getattr(max_stats, attr), val)) + avg_time = agg_stats.latency_ns / instances + else: + avg_time = 0 + + is_sink = node.node_id == -1 + # If the node is a broadcast-receiving exchange node, the cardinality of rows produced + # is the max over all instances (which should all have received the same number of + # rows). Otherwise, the cardinality is the sum over all instances which process + # disjoint partitions. + if is_sink: + cardinality = -1 + elif node.is_broadcast: + cardinality = max_stats.cardinality + else: + cardinality = agg_stats.cardinality + + est_stats = node.estimated_stats + label_prefix = "" + if indent_level > 0: + label_prefix = "|" + label_prefix += " |" * (indent_level - 1) + if new_indent_level: + label_prefix += "--" + else: + label_prefix += " " + + def prettyprint(val, units, divisor): + for unit in units: + if val < divisor: + if unit == units[0]: + return "%d%s" % (val, unit) + else: + return "%3.2f%s" % (val, unit) + val /= divisor + + def prettyprint_bytes(byte_val): + return prettyprint(byte_val, [' B', ' KB', ' MB', ' GB', ' TB'], 1024.0) + + def prettyprint_units(unit_val): + return prettyprint(unit_val, ["", "K", "M", "B"], 1000.0) + + def prettyprint_time(time_val): + return prettyprint(time_val, ["ns", "us", "ms", "s"], 1000.0) + + latency = max_stats.latency_ns + cardinality_est = est_stats.cardinality + memory_used = max_stats.memory_used + memory_est = est_stats.memory_used + if (is_prettyprint): + avg_time = prettyprint_time(avg_time) + latency = prettyprint_time(latency) + cardinality = "" if is_sink else prettyprint_units(cardinality) + cardinality_est = "" if is_sink else prettyprint_units(cardinality_est) + memory_used = prettyprint_bytes(memory_used) + memory_est = prettyprint_bytes(memory_est) + + row = list() + if separate_prefix_column: + row.append(label_prefix) + row.append(node.label) + else: + row.append(label_prefix + node.label) + + row.extend([ + node.num_hosts, + instances, + avg_time, + latency, + cardinality, + cardinality_est, + memory_used, + memory_est, + node.label_detail]) + + output.append(row) + try: + sender_idx = summary.exch_to_sender_map[idx] + # This is an exchange node or a join node with a separate builder, so the source + # is a fragment root, and should be printed next. + sender_indent_level = indent_level + node.num_children + sender_new_indent_level = node.num_children > 0 + build_exec_summary_table(summary, sender_idx, sender_indent_level, + sender_new_indent_level, output, is_prettyprint, + separate_prefix_column) + except (KeyError, TypeError): + # Fall through if idx not in map, or if exch_to_sender_map itself is not set + pass + + idx += 1 + if node.num_children > 0: + first_child_output = [] + idx = build_exec_summary_table(summary, idx, indent_level, False, first_child_output, + is_prettyprint, separate_prefix_column) + for _ in range(1, node.num_children): + # All other children are indented + idx = build_exec_summary_table(summary, idx, indent_level + 1, True, output, + is_prettyprint, separate_prefix_column) + output += first_child_output + return idx diff --git a/impala/hiveserver2.py b/impala/hiveserver2.py index 258c6f9dd..daab9d56c 100644 --- a/impala/hiveserver2.py +++ b/impala/hiveserver2.py @@ -29,6 +29,7 @@ from thrift.transport.TTransport import TTransportException from thrift.Thrift import TApplicationException from thrift.protocol.TBinaryProtocol import TBinaryProtocolAccelerated +from impala._thrift_gen.ExecStats.ttypes import TExecStats from impala._thrift_gen.TCLIService.ttypes import ( TOpenSessionReq, TFetchResultsReq, TCloseSessionReq, TExecuteStatementReq, TGetInfoReq, TGetInfoType, TTypeId, @@ -44,6 +45,7 @@ from impala.compat import (Decimal, _xrange as xrange) from impala.error import (NotSupportedError, OperationalError, ProgrammingError, HiveServer2Error, HttpError) +from impala.exec_summary import build_exec_summary_table from impala.interface import Connection, Cursor, _bind_parameters from impala.util import get_logger_and_init_null @@ -727,8 +729,9 @@ def get_summary(self): def build_summary_table(self, summary, output, idx=0, is_fragment_root=False, indent_level=0): - return build_summary_table(summary, idx, is_fragment_root, - indent_level, output) + return build_exec_summary_table( + summary, idx, indent_level, is_fragment_root, output, is_prettyprint=True, + separate_prefix_column=False) def get_databases(self): def op(): @@ -1550,122 +1553,3 @@ def get_result_schema(self): log.debug('get_result_schema: schema=%s', schema) return schema - - -def build_summary_table(summary, idx, is_fragment_root, indent_level, output): - """Direct translation of Coordinator::PrintExecSummary() to recursively - build a list of rows of summary statistics, one per exec node - - summary: the TExecSummary object that contains all the summary data - - idx: the index of the node to print - - is_fragment_root: true if the node to print is the root of a fragment (and - therefore feeds into an exchange) - - indent_level: the number of spaces to print before writing the node's - label, to give the appearance of a tree. The 0th child of a node has the - same indent_level as its parent. All other children have an indent_level - of one greater than their parent. - - output: the list of rows into which to append the rows produced for this - node and its children. - - Returns the index of the next exec node in summary.exec_nodes that should - be processed, used internally to this method only. - """ - # pylint: disable=too-many-locals - - attrs = ["latency_ns", "cpu_time_ns", "cardinality", "memory_used"] - - # Initialise aggregate and maximum stats - agg_stats, max_stats = TExecStats(), TExecStats() - for attr in attrs: - setattr(agg_stats, attr, 0) - setattr(max_stats, attr, 0) - - node = summary.nodes[idx] - for stats in node.exec_stats: - for attr in attrs: - val = getattr(stats, attr) - if val is not None: - setattr(agg_stats, attr, getattr(agg_stats, attr) + val) - setattr(max_stats, attr, max(getattr(max_stats, attr), val)) - - if len(node.exec_stats) > 0: - avg_time = agg_stats.latency_ns / len(node.exec_stats) - else: - avg_time = 0 - - # If the node is a broadcast-receiving exchange node, the cardinality of - # rows produced is the max over all instances (which should all have - # received the same number of rows). Otherwise, the cardinality is the sum - # over all instances which process disjoint partitions. - if node.is_broadcast and is_fragment_root: - cardinality = max_stats.cardinality - else: - cardinality = agg_stats.cardinality - - est_stats = node.estimated_stats - label_prefix = "" - if indent_level > 0: - label_prefix = "|" - if is_fragment_root: - label_prefix += " " * indent_level - else: - label_prefix += "--" * indent_level - - def prettyprint(val, units, divisor): - for unit in units: - if val < divisor: - if unit == units[0]: - return "%d%s" % (val, unit) - else: - return "%3.2f%s" % (val, unit) - val /= divisor - - def prettyprint_bytes(byte_val): - return prettyprint( - byte_val, [' B', ' KB', ' MB', ' GB', ' TB'], 1024.0) - - def prettyprint_units(unit_val): - return prettyprint(unit_val, ["", "K", "M", "B"], 1000.0) - - def prettyprint_time(time_val): - return prettyprint(time_val, ["ns", "us", "ms", "s"], 1000.0) - - row = [label_prefix + node.label, - len(node.exec_stats), - prettyprint_time(avg_time), - prettyprint_time(max_stats.latency_ns), - prettyprint_units(cardinality), - prettyprint_units(est_stats.cardinality), - prettyprint_bytes(max_stats.memory_used), - prettyprint_bytes(est_stats.memory_used), - node.label_detail] - - output.append(row) - try: - sender_idx = summary.exch_to_sender_map[idx] - # This is an exchange node, so the sender is a fragment root, and - # should be printed next. - build_summary_table(summary, sender_idx, True, indent_level, output) - except (KeyError, TypeError): - # Fall through if idx not in map, or if exch_to_sender_map itself is - # not set - pass - - idx += 1 - if node.num_children > 0: - first_child_output = [] - idx = build_summary_table(summary, idx, False, indent_level, - first_child_output) - # pylint: disable=unused-variable - # TODO: is child_idx supposed to be unused? See #120 - for child_idx in range(1, node.num_children): - # All other children are indented (we only have 0, 1 or 2 children - # for every exec node at the moment) - idx = build_summary_table(summary, idx, False, indent_level + 1, - output) - output += first_child_output - return idx diff --git a/impala/tests/test_impala.py b/impala/tests/test_impala.py index 0be7ef74e..e902d60ee 100644 --- a/impala/tests/test_impala.py +++ b/impala/tests/test_impala.py @@ -112,3 +112,67 @@ def validate_log(cur): assert len(node.exec_stats) >= node.num_hosts profile = cur.get_profile() assert profile is not None + +def test_build_summary_table(tmp_db, cur, empty_table): + """Test build_exec_summary function of impyla. + """ + tmp_db_lower = tmp_db.lower() + # Assert column Operator, #Host, #Inst, #Rows, Est. #Rows, Est. Peak Mem, and Detail. + # Skip column Avg Time, Max Time, and Peak Mem. + + def skip_cols(row): + assert len(row) == 10, row + output = list(row) + del output[7] + del output[4] + del output[3] + return output + + def validate_summary_table(table, expected): + for i in range(0, len(expected)): + row = skip_cols(table[i]) + assert expected[i] == row, 'Expect {0} but found {1}'.format( + str(expected[i]), str(row)) + + query = """SELECT * FROM {0} a INNER JOIN {1} b ON (a.i = b.i)""".format( + empty_table, empty_table) + cur.execute(query) + cur.fetchall() + summary = cur.get_summary() + output_dop_0 = list() + cur.build_summary_table(summary, output_dop_0) + assert len(output_dop_0) == 8, output_dop_0 + expected_dop_0 = [ + ['F02:ROOT', 1, 1, '', '', '4.00 MB', ''], + ['04:EXCHANGE', 1, 1, '0', '0', '16.00 KB', 'UNPARTITIONED'], + ['F00:EXCHANGE SENDER', 1, 1, '', '', '64.00 KB', ''], + ['02:HASH JOIN', 1, 1, '0', '0', '1.94 MB', 'INNER JOIN, BROADCAST'], + ['|--03:EXCHANGE', 1, 1, '0', '0', '16.00 KB', 'BROADCAST'], + ['| F01:EXCHANGE SENDER', 1, 1, '', '', '32.00 KB', ''], + ['| 01:SCAN HDFS', 1, 1, '0', '0', '0 B', + '{0}.{1} b'.format(tmp_db_lower, empty_table)], + ['00:SCAN HDFS', 1, 1, '0', '0', '0 B', + '{0}.{1} a'.format(tmp_db_lower, empty_table)], + ] + validate_summary_table(output_dop_0, expected_dop_0) + + cur.execute(query, configuration={'mt_dop': '2'}) + cur.fetchall() + summary = cur.get_summary() + output_dop_2 = list() + cur.build_summary_table(summary, output_dop_2) + assert len(output_dop_2) == 9, output_dop_2 + expected_dop_2 = [ + ['F02:ROOT', 1, 1, '', '', '4.00 MB', ''], + ['04:EXCHANGE', 1, 1, '0', '0', '16.00 KB', 'UNPARTITIONED'], + ['F00:EXCHANGE SENDER', 1, 1, '', '', '64.00 KB', ''], + ['02:HASH JOIN', 1, 1, '0', '0', '0 B', 'INNER JOIN, BROADCAST'], + ['|--F03:JOIN BUILD', 1, 1, '', '', '3.88 MB', ''], + ['| 03:EXCHANGE', 1, 1, '0', '0', '16.00 KB', 'BROADCAST'], + ['| F01:EXCHANGE SENDER', 1, 1, '', '', '32.00 KB', ''], + ['| 01:SCAN HDFS', 1, 1, '0', '0', '0 B', + '{0}.{1} b'.format(tmp_db_lower, empty_table)], + ['00:SCAN HDFS', 1, 1, '0', '0', '0 B', + '{0}.{1} a'.format(tmp_db_lower, empty_table)], + ] + validate_summary_table(output_dop_2, expected_dop_2)