From f8e64fc52d01ea9dd17c35a44f45aa3eacf4d4d4 Mon Sep 17 00:00:00 2001 From: Riza Suminto Date: Tue, 11 Mar 2025 14:18:16 -0700 Subject: [PATCH 1/3] Update build_summary_table function This patch update build_summary_table to match the same function in impala-shell. https://github.com/apache/impala/blob/a07bf84/shell/impala_client.py#L113 Testing: Run and pass following command ``` tox -- -ktest_build_summary_table ``` --- impala/hiveserver2.py | 114 ++++++++++++++++++++++++------------ impala/tests/test_impala.py | 66 +++++++++++++++++++++ 2 files changed, 143 insertions(+), 37 deletions(-) diff --git a/impala/hiveserver2.py b/impala/hiveserver2.py index 258c6f9dd..3cca1cdef 100644 --- a/impala/hiveserver2.py +++ b/impala/hiveserver2.py @@ -29,6 +29,7 @@ from thrift.transport.TTransport import TTransportException from thrift.Thrift import TApplicationException from thrift.protocol.TBinaryProtocol import TBinaryProtocolAccelerated +from impala._thrift_gen.ExecStats.ttypes import TExecStats from impala._thrift_gen.TCLIService.ttypes import ( TOpenSessionReq, TFetchResultsReq, TCloseSessionReq, TExecuteStatementReq, TGetInfoReq, TGetInfoType, TTypeId, @@ -727,8 +728,9 @@ def get_summary(self): def build_summary_table(self, summary, output, idx=0, is_fragment_root=False, indent_level=0): - return build_summary_table(summary, idx, is_fragment_root, - indent_level, output) + return build_exec_summary_table( + summary, idx, indent_level, is_fragment_root, output, is_prettyprint=True, + separate_prefix_column=False) def get_databases(self): def op(): @@ -1551,8 +1553,8 @@ def get_result_schema(self): return schema - -def build_summary_table(summary, idx, is_fragment_root, indent_level, output): +def build_exec_summary_table(summary, idx, indent_level, new_indent_level, output, + is_prettyprint=True, separate_prefix_column=False): """Direct translation of Coordinator::PrintExecSummary() to recursively build a list of rows of summary statistics, one per exec node @@ -1560,17 +1562,23 @@ def build_summary_table(summary, idx, is_fragment_root, indent_level, output): idx: the index of the node to print - is_fragment_root: true if the node to print is the root of a fragment (and - therefore feeds into an exchange) - indent_level: the number of spaces to print before writing the node's label, to give the appearance of a tree. The 0th child of a node has the same indent_level as its parent. All other children have an indent_level of one greater than their parent. + new_indent_level: If true, this indent level is different from the previous row's. + output: the list of rows into which to append the rows produced for this node and its children. + is_prettyprint: Optional. If True, print time, units, and bytes columns in pretty + printed format. + + separate_prefix_column: Optional. If True, the prefix and operator name will be + returned as separate column. Otherwise, prefix and operater name will be concatenated + into single column. + Returns the index of the next exec node in summary.exec_nodes that should be processed, used internally to this method only. 
""" @@ -1585,23 +1593,27 @@ def build_summary_table(summary, idx, is_fragment_root, indent_level, output): setattr(max_stats, attr, 0) node = summary.nodes[idx] - for stats in node.exec_stats: - for attr in attrs: - val = getattr(stats, attr) - if val is not None: - setattr(agg_stats, attr, getattr(agg_stats, attr) + val) - setattr(max_stats, attr, max(getattr(max_stats, attr), val)) - - if len(node.exec_stats) > 0: + if node.exec_stats is not None: + for stats in node.exec_stats: + for attr in attrs: + val = getattr(stats, attr) + if val is not None: + setattr(agg_stats, attr, getattr(agg_stats, attr) + val) + setattr(max_stats, attr, max(getattr(max_stats, attr), val)) + + if node.exec_stats is not None and node.exec_stats: avg_time = agg_stats.latency_ns / len(node.exec_stats) else: avg_time = 0 + is_sink = node.node_id == -1 # If the node is a broadcast-receiving exchange node, the cardinality of # rows produced is the max over all instances (which should all have # received the same number of rows). Otherwise, the cardinality is the sum # over all instances which process disjoint partitions. - if node.is_broadcast and is_fragment_root: + if is_sink: + cardinality = -1 + elif node.is_broadcast: cardinality = max_stats.cardinality else: cardinality = agg_stats.cardinality @@ -1610,10 +1622,11 @@ def build_summary_table(summary, idx, is_fragment_root, indent_level, output): label_prefix = "" if indent_level > 0: label_prefix = "|" - if is_fragment_root: - label_prefix += " " * indent_level + label_prefix += " |" * (indent_level - 1) + if new_indent_level: + label_prefix += "--" else: - label_prefix += "--" * indent_level + label_prefix += " " def prettyprint(val, units, divisor): for unit in units: @@ -1634,22 +1647,49 @@ def prettyprint_units(unit_val): def prettyprint_time(time_val): return prettyprint(time_val, ["ns", "us", "ms", "s"], 1000.0) - row = [label_prefix + node.label, - len(node.exec_stats), - prettyprint_time(avg_time), - prettyprint_time(max_stats.latency_ns), - prettyprint_units(cardinality), - prettyprint_units(est_stats.cardinality), - prettyprint_bytes(max_stats.memory_used), - prettyprint_bytes(est_stats.memory_used), - node.label_detail] + instances = 0 + if node.exec_stats is not None: + instances = len(node.exec_stats) + latency = max_stats.latency_ns + cardinality_est = est_stats.cardinality + memory_used = max_stats.memory_used + memory_est = est_stats.memory_used + if (is_prettyprint): + avg_time = prettyprint_time(avg_time) + latency = prettyprint_time(latency) + cardinality = "" if is_sink else prettyprint_units(cardinality) + cardinality_est = "" if is_sink else prettyprint_units(cardinality_est) + memory_used = prettyprint_bytes(memory_used) + memory_est = prettyprint_bytes(memory_est) + + row = list() + if separate_prefix_column: + row.append(label_prefix) + row.append(node.label) + else: + row.append(label_prefix + node.label) + + row.extend([ + node.num_hosts, + instances, + avg_time, + latency, + cardinality, + cardinality_est, + memory_used, + memory_est, + node.label_detail]) output.append(row) try: sender_idx = summary.exch_to_sender_map[idx] - # This is an exchange node, so the sender is a fragment root, and - # should be printed next. - build_summary_table(summary, sender_idx, True, indent_level, output) + # This is an exchange node or a join node with a separate builder, so the source + # is a fragment root, and should be printed next. 
+ sender_indent_level = indent_level + node.num_children + sender_new_indent_level = node.num_children > 0 + build_exec_summary_table(summary, sender_idx, sender_indent_level, + sender_new_indent_level, output, is_prettyprint, + separate_prefix_column) except (KeyError, TypeError): # Fall through if idx not in map, or if exch_to_sender_map itself is # not set @@ -1658,14 +1698,14 @@ def prettyprint_time(time_val): idx += 1 if node.num_children > 0: first_child_output = [] - idx = build_summary_table(summary, idx, False, indent_level, - first_child_output) + idx = build_exec_summary_table(summary, idx, indent_level, False, + first_child_output, is_prettyprint, + separate_prefix_column) # pylint: disable=unused-variable - # TODO: is child_idx supposed to be unused? See #120 - for child_idx in range(1, node.num_children): + for child_idx in xrange(1, node.num_children): # All other children are indented (we only have 0, 1 or 2 children # for every exec node at the moment) - idx = build_summary_table(summary, idx, False, indent_level + 1, - output) + idx = build_exec_summary_table(summary, idx, indent_level + 1, True, output, + is_prettyprint, separate_prefix_column) output += first_child_output return idx diff --git a/impala/tests/test_impala.py b/impala/tests/test_impala.py index 0be7ef74e..a766c0ec6 100644 --- a/impala/tests/test_impala.py +++ b/impala/tests/test_impala.py @@ -112,3 +112,69 @@ def validate_log(cur): assert len(node.exec_stats) >= node.num_hosts profile = cur.get_profile() assert profile is not None + +def test_build_summary_table(tmp_db, cur, empty_table): + """Test build_exec_summary function of impyla. + """ + tmp_db_lower = tmp_db.lower() + # Assert column Operator, #Host, #Inst, #Rows, Est. #Rows, Est. Peak Mem, and Detail. + # Skip column Avg Time, Max Time, and Peak Mem. 
+ + def skip_cols(row): + assert len(row) == 10, row + output = list(row) + del output[7] + del output[4] + del output[3] + return output + + def validate_summary_table(table, expected): + for i in range(0, len(expected)): + row = skip_cols(table[i]) + assert expected[i] == row, 'Expect {0} but found {1}'.format( + str(expected[i]), str(row)) + + query = """SELECT * FROM {0} a INNER JOIN {1} b ON (a.i = b.i)""".format( + empty_table, empty_table) + cur.execute(query) + cur.fetchall() + summary = cur.get_summary() + output_dop_0 = list() + cur.build_summary_table(summary, output_dop_0) + assert len(output_dop_0) == 8, output_dop_0 + expected_dop_0 = [ + ['F02:ROOT', 1, 1, '', '', '4.00 MB', ''], + ['04:EXCHANGE', 1, 1, '0', '0', '16.00 KB', 'UNPARTITIONED'], + ['F00:EXCHANGE SENDER', 1, 1, '', '', '64.00 KB', ''], + ['02:HASH JOIN', 1, 1, '0', '0', '1.94 MB', 'INNER JOIN, BROADCAST'], + ['|--03:EXCHANGE', 1, 1, '0', '0', '16.00 KB', 'BROADCAST'], + ['| F01:EXCHANGE SENDER', 1, 1, '', '', '32.00 KB', ''], + ['| 01:SCAN HDFS', 1, 1, '0', '0', '0 B', + '{0}.{1} b'.format(tmp_db_lower, empty_table)], + ['00:SCAN HDFS', 1, 1, '0', '0', '0 B', + '{0}.{1} a'.format(tmp_db_lower, empty_table)], + ] + validate_summary_table(output_dop_0, expected_dop_0) + cur.close_operation() + + cur.execute(query, configuration={'mt_dop': '2'}) + cur.fetchall() + summary = cur.get_summary() + output_dop_2 = list() + cur.build_summary_table(summary, output_dop_2) + assert len(output_dop_2) == 9, output_dop_2 + expected_dop_2 = [ + ['F02:ROOT', 1, 1, '', '', '4.00 MB', ''], + ['04:EXCHANGE', 1, 1, '0', '0', '16.00 KB', 'UNPARTITIONED'], + ['F00:EXCHANGE SENDER', 1, 1, '', '', '64.00 KB', ''], + ['02:HASH JOIN', 1, 1, '0', '0', '0 B', 'INNER JOIN, BROADCAST'], + ['|--F03:JOIN BUILD', 1, 1, '', '', '3.88 MB', ''], + ['| 03:EXCHANGE', 1, 1, '0', '0', '16.00 KB', 'BROADCAST'], + ['| F01:EXCHANGE SENDER', 1, 1, '', '', '32.00 KB', ''], + ['| 01:SCAN HDFS', 1, 1, '0', '0', '0 B', + '{0}.{1} b'.format(tmp_db_lower, empty_table)], + ['00:SCAN HDFS', 1, 1, '0', '0', '0 B', + '{0}.{1} a'.format(tmp_db_lower, empty_table)], + ] + validate_summary_table(output_dop_2, expected_dop_2) + cur.close_operation() From 2287e2469d7f0306ac75788070548116408a6e79 Mon Sep 17 00:00:00 2001 From: Riza Suminto Date: Fri, 28 Mar 2025 09:46:45 -0700 Subject: [PATCH 2/3] Copy exec_summary.py from https://github.com/apache/impala/commit/e73e2d40da115ed3804ffaecc0850c853b0e6330 --- impala/exec_summary.py | 176 ++++++++++++++++++++++++++++++++++++ impala/hiveserver2.py | 158 +------------------------------- impala/tests/test_impala.py | 1 - 3 files changed, 177 insertions(+), 158 deletions(-) create mode 100755 impala/exec_summary.py diff --git a/impala/exec_summary.py b/impala/exec_summary.py new file mode 100755 index 000000000..96f06ced4 --- /dev/null +++ b/impala/exec_summary.py @@ -0,0 +1,176 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from impala._thrift_gen.ExecStats.ttypes import TExecStats + + +def build_exec_summary_table(summary, idx, indent_level, new_indent_level, output, + is_prettyprint=True, separate_prefix_column=False): + """Direct translation of Coordinator::PrintExecSummary() to recursively build a list + of rows of summary statistics, one per exec node + + summary: the TExecSummary object that contains all the summary data + + idx: the index of the node to print + + indent_level: the number of spaces to print before writing the node's label, to give + the appearance of a tree. The 0th child of a node has the same indent_level as its + parent. All other children have an indent_level of one greater than their parent. + + new_indent_level: If true, this indent level is different from the previous row's. + + output: the list of rows into which to append the rows produced for this node and its + children. + + is_prettyprint: Optional. If True, print time, units, and bytes columns in pretty + printed format. + + separate_prefix_column: Optional. If True, the prefix and operator name will be + returned as separate column. Otherwise, prefix and operater name will be concatenated + into single column. + + Returns the index of the next exec node in summary.exec_nodes that should be + processed, used internally to this method only. + """ + if not summary.nodes: + # Summary nodes is empty or None. Nothing to build. + return + assert idx < len(summary.nodes), ( + "Index ({0}) must be less than exec summary count ({1})").format( + idx, len(summary.nodes)) + + attrs = ["latency_ns", "cpu_time_ns", "cardinality", "memory_used"] + + # Initialise aggregate and maximum stats + agg_stats, max_stats = TExecStats(), TExecStats() + for attr in attrs: + setattr(agg_stats, attr, 0) + setattr(max_stats, attr, 0) + + node = summary.nodes[idx] + instances = 0 + if node.exec_stats: + # exec_stats is not None or an empty list. + instances = len(node.exec_stats) + for stats in node.exec_stats: + for attr in attrs: + val = getattr(stats, attr) + if val is not None: + setattr(agg_stats, attr, getattr(agg_stats, attr) + val) + setattr(max_stats, attr, max(getattr(max_stats, attr), val)) + avg_time = agg_stats.latency_ns / instances + else: + avg_time = 0 + + is_sink = node.node_id == -1 + # If the node is a broadcast-receiving exchange node, the cardinality of rows produced + # is the max over all instances (which should all have received the same number of + # rows). Otherwise, the cardinality is the sum over all instances which process + # disjoint partitions. 
+ if is_sink: + cardinality = -1 + elif node.is_broadcast: + cardinality = max_stats.cardinality + else: + cardinality = agg_stats.cardinality + + est_stats = node.estimated_stats + label_prefix = "" + if indent_level > 0: + label_prefix = "|" + label_prefix += " |" * (indent_level - 1) + if new_indent_level: + label_prefix += "--" + else: + label_prefix += " " + + def prettyprint(val, units, divisor): + for unit in units: + if val < divisor: + if unit == units[0]: + return "%d%s" % (val, unit) + else: + return "%3.2f%s" % (val, unit) + val /= divisor + + def prettyprint_bytes(byte_val): + return prettyprint(byte_val, [' B', ' KB', ' MB', ' GB', ' TB'], 1024.0) + + def prettyprint_units(unit_val): + return prettyprint(unit_val, ["", "K", "M", "B"], 1000.0) + + def prettyprint_time(time_val): + return prettyprint(time_val, ["ns", "us", "ms", "s"], 1000.0) + + latency = max_stats.latency_ns + cardinality_est = est_stats.cardinality + memory_used = max_stats.memory_used + memory_est = est_stats.memory_used + if (is_prettyprint): + avg_time = prettyprint_time(avg_time) + latency = prettyprint_time(latency) + cardinality = "" if is_sink else prettyprint_units(cardinality) + cardinality_est = "" if is_sink else prettyprint_units(cardinality_est) + memory_used = prettyprint_bytes(memory_used) + memory_est = prettyprint_bytes(memory_est) + + row = list() + if separate_prefix_column: + row.append(label_prefix) + row.append(node.label) + else: + row.append(label_prefix + node.label) + + row.extend([ + node.num_hosts, + instances, + avg_time, + latency, + cardinality, + cardinality_est, + memory_used, + memory_est, + node.label_detail]) + + output.append(row) + try: + sender_idx = summary.exch_to_sender_map[idx] + # This is an exchange node or a join node with a separate builder, so the source + # is a fragment root, and should be printed next. 
+ sender_indent_level = indent_level + node.num_children + sender_new_indent_level = node.num_children > 0 + build_exec_summary_table(summary, sender_idx, sender_indent_level, + sender_new_indent_level, output, is_prettyprint, + separate_prefix_column) + except (KeyError, TypeError): + # Fall through if idx not in map, or if exch_to_sender_map itself is not set + pass + + idx += 1 + if node.num_children > 0: + first_child_output = [] + idx = build_exec_summary_table(summary, idx, indent_level, False, first_child_output, + is_prettyprint, separate_prefix_column) + for _ in range(1, node.num_children): + # All other children are indented + idx = build_exec_summary_table(summary, idx, indent_level + 1, True, output, + is_prettyprint, separate_prefix_column) + output += first_child_output + return idx diff --git a/impala/hiveserver2.py b/impala/hiveserver2.py index 3cca1cdef..daab9d56c 100644 --- a/impala/hiveserver2.py +++ b/impala/hiveserver2.py @@ -45,6 +45,7 @@ from impala.compat import (Decimal, _xrange as xrange) from impala.error import (NotSupportedError, OperationalError, ProgrammingError, HiveServer2Error, HttpError) +from impala.exec_summary import build_exec_summary_table from impala.interface import Connection, Cursor, _bind_parameters from impala.util import get_logger_and_init_null @@ -1552,160 +1553,3 @@ def get_result_schema(self): log.debug('get_result_schema: schema=%s', schema) return schema - -def build_exec_summary_table(summary, idx, indent_level, new_indent_level, output, - is_prettyprint=True, separate_prefix_column=False): - """Direct translation of Coordinator::PrintExecSummary() to recursively - build a list of rows of summary statistics, one per exec node - - summary: the TExecSummary object that contains all the summary data - - idx: the index of the node to print - - indent_level: the number of spaces to print before writing the node's - label, to give the appearance of a tree. The 0th child of a node has the - same indent_level as its parent. All other children have an indent_level - of one greater than their parent. - - new_indent_level: If true, this indent level is different from the previous row's. - - output: the list of rows into which to append the rows produced for this - node and its children. - - is_prettyprint: Optional. If True, print time, units, and bytes columns in pretty - printed format. - - separate_prefix_column: Optional. If True, the prefix and operator name will be - returned as separate column. Otherwise, prefix and operater name will be concatenated - into single column. - - Returns the index of the next exec node in summary.exec_nodes that should - be processed, used internally to this method only. 
- """ - # pylint: disable=too-many-locals - - attrs = ["latency_ns", "cpu_time_ns", "cardinality", "memory_used"] - - # Initialise aggregate and maximum stats - agg_stats, max_stats = TExecStats(), TExecStats() - for attr in attrs: - setattr(agg_stats, attr, 0) - setattr(max_stats, attr, 0) - - node = summary.nodes[idx] - if node.exec_stats is not None: - for stats in node.exec_stats: - for attr in attrs: - val = getattr(stats, attr) - if val is not None: - setattr(agg_stats, attr, getattr(agg_stats, attr) + val) - setattr(max_stats, attr, max(getattr(max_stats, attr), val)) - - if node.exec_stats is not None and node.exec_stats: - avg_time = agg_stats.latency_ns / len(node.exec_stats) - else: - avg_time = 0 - - is_sink = node.node_id == -1 - # If the node is a broadcast-receiving exchange node, the cardinality of - # rows produced is the max over all instances (which should all have - # received the same number of rows). Otherwise, the cardinality is the sum - # over all instances which process disjoint partitions. - if is_sink: - cardinality = -1 - elif node.is_broadcast: - cardinality = max_stats.cardinality - else: - cardinality = agg_stats.cardinality - - est_stats = node.estimated_stats - label_prefix = "" - if indent_level > 0: - label_prefix = "|" - label_prefix += " |" * (indent_level - 1) - if new_indent_level: - label_prefix += "--" - else: - label_prefix += " " - - def prettyprint(val, units, divisor): - for unit in units: - if val < divisor: - if unit == units[0]: - return "%d%s" % (val, unit) - else: - return "%3.2f%s" % (val, unit) - val /= divisor - - def prettyprint_bytes(byte_val): - return prettyprint( - byte_val, [' B', ' KB', ' MB', ' GB', ' TB'], 1024.0) - - def prettyprint_units(unit_val): - return prettyprint(unit_val, ["", "K", "M", "B"], 1000.0) - - def prettyprint_time(time_val): - return prettyprint(time_val, ["ns", "us", "ms", "s"], 1000.0) - - instances = 0 - if node.exec_stats is not None: - instances = len(node.exec_stats) - latency = max_stats.latency_ns - cardinality_est = est_stats.cardinality - memory_used = max_stats.memory_used - memory_est = est_stats.memory_used - if (is_prettyprint): - avg_time = prettyprint_time(avg_time) - latency = prettyprint_time(latency) - cardinality = "" if is_sink else prettyprint_units(cardinality) - cardinality_est = "" if is_sink else prettyprint_units(cardinality_est) - memory_used = prettyprint_bytes(memory_used) - memory_est = prettyprint_bytes(memory_est) - - row = list() - if separate_prefix_column: - row.append(label_prefix) - row.append(node.label) - else: - row.append(label_prefix + node.label) - - row.extend([ - node.num_hosts, - instances, - avg_time, - latency, - cardinality, - cardinality_est, - memory_used, - memory_est, - node.label_detail]) - - output.append(row) - try: - sender_idx = summary.exch_to_sender_map[idx] - # This is an exchange node or a join node with a separate builder, so the source - # is a fragment root, and should be printed next. 
- sender_indent_level = indent_level + node.num_children - sender_new_indent_level = node.num_children > 0 - build_exec_summary_table(summary, sender_idx, sender_indent_level, - sender_new_indent_level, output, is_prettyprint, - separate_prefix_column) - except (KeyError, TypeError): - # Fall through if idx not in map, or if exch_to_sender_map itself is - # not set - pass - - idx += 1 - if node.num_children > 0: - first_child_output = [] - idx = build_exec_summary_table(summary, idx, indent_level, False, - first_child_output, is_prettyprint, - separate_prefix_column) - # pylint: disable=unused-variable - for child_idx in xrange(1, node.num_children): - # All other children are indented (we only have 0, 1 or 2 children - # for every exec node at the moment) - idx = build_exec_summary_table(summary, idx, indent_level + 1, True, output, - is_prettyprint, separate_prefix_column) - output += first_child_output - return idx diff --git a/impala/tests/test_impala.py b/impala/tests/test_impala.py index a766c0ec6..87eceb65b 100644 --- a/impala/tests/test_impala.py +++ b/impala/tests/test_impala.py @@ -177,4 +177,3 @@ def validate_summary_table(table, expected): '{0}.{1} a'.format(tmp_db_lower, empty_table)], ] validate_summary_table(output_dop_2, expected_dop_2) - cur.close_operation() From 8f4015d8dc7642261ac28dc4c92fa5129cb105bc Mon Sep 17 00:00:00 2001 From: Riza Suminto Date: Fri, 28 Mar 2025 09:52:37 -0700 Subject: [PATCH 3/3] Remove one more cur.close_operation() --- impala/tests/test_impala.py | 1 - 1 file changed, 1 deletion(-) diff --git a/impala/tests/test_impala.py b/impala/tests/test_impala.py index 87eceb65b..e902d60ee 100644 --- a/impala/tests/test_impala.py +++ b/impala/tests/test_impala.py @@ -155,7 +155,6 @@ def validate_summary_table(table, expected): '{0}.{1} a'.format(tmp_db_lower, empty_table)], ] validate_summary_table(output_dop_0, expected_dop_0) - cur.close_operation() cur.execute(query, configuration={'mt_dop': '2'}) cur.fetchall()
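---

Usage sketch for the patched API, following the same flow as `test_build_summary_table` above (execute, fetchall, `get_summary()`, then `build_summary_table()`). This is an illustrative sketch only: the connection host/port, the queried table name, and the ad-hoc printing are assumptions, not part of the patches; the ten header names mirror the column order produced by `build_exec_summary_table` (prefix+operator, hosts, instances, avg/max time, actual/estimated rows, actual/estimated peak memory, detail).

```
# Hedged usage sketch -- host/port and table name below are placeholders, adjust
# to your environment. Only connect(), execute(), fetchall(), get_summary(), and
# build_summary_table() are taken from the code in the patches/tests above.
from impala.dbapi import connect

conn = connect(host='localhost', port=21050)   # assumed impalad endpoint
cur = conn.cursor()
cur.execute('SELECT * FROM functional.alltypes LIMIT 10')  # placeholder query
cur.fetchall()

summary = cur.get_summary()
rows = []
# Cursor wrapper around build_exec_summary_table(): pretty-printing on,
# prefix merged into the operator column (same layout the tests assert on).
cur.build_summary_table(summary, rows)

header = ['Operator', '#Hosts', '#Inst', 'Avg Time', 'Max Time', '#Rows',
          'Est. #Rows', 'Peak Mem', 'Est. Peak Mem', 'Detail']
print(' | '.join(header))
for row in rows:
    print(' | '.join(str(col) for col in row))

cur.close()
conn.close()
```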