Skip to content

Update build_summary_table function #578

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 176 additions & 0 deletions impala/exec_summary.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from impala._thrift_gen.ExecStats.ttypes import TExecStats


def build_exec_summary_table(summary, idx, indent_level, new_indent_level, output,
                             is_prettyprint=True, separate_prefix_column=False):
    """Direct translation of Coordinator::PrintExecSummary() to recursively build a list
    of rows of summary statistics, one per exec node

    summary: the TExecSummary object that contains all the summary data

    idx: the index of the node to print

    indent_level: the number of spaces to print before writing the node's label, to give
    the appearance of a tree. The 0th child of a node has the same indent_level as its
    parent. All other children have an indent_level of one greater than their parent.

    new_indent_level: If true, this indent level is different from the previous row's.

    output: the list of rows into which to append the rows produced for this node and its
    children.

    is_prettyprint: Optional. If True, print time, units, and bytes columns in pretty
    printed format.

    separate_prefix_column: Optional. If True, the prefix and operator name will be
    returned as separate columns. Otherwise, prefix and operator name will be
    concatenated into a single column.

    Returns the index of the next exec node in summary.exec_nodes that should be
    processed, used internally to this method only.
    """
    if not summary.nodes:
        # Summary nodes is empty or None. Nothing to build. Return idx unchanged
        # so callers always receive an int (previously this branch returned None).
        return idx
    assert idx < len(summary.nodes), (
        "Index ({0}) must be less than exec summary count ({1})").format(
        idx, len(summary.nodes))

    attrs = ["latency_ns", "cpu_time_ns", "cardinality", "memory_used"]

    # Running totals and maxima across all instances of this node. Only these
    # four counters are needed, so plain dicts suffice (no TExecStats required).
    agg_stats = dict.fromkeys(attrs, 0)
    max_stats = dict.fromkeys(attrs, 0)

    node = summary.nodes[idx]
    instances = 0
    if node.exec_stats:
        # exec_stats is not None or an empty list.
        instances = len(node.exec_stats)
        for stats in node.exec_stats:
            for attr in attrs:
                val = getattr(stats, attr)
                # Thrift may leave individual fields unset (None); skip those.
                if val is not None:
                    agg_stats[attr] += val
                    max_stats[attr] = max(max_stats[attr], val)
        avg_time = agg_stats["latency_ns"] / instances
    else:
        avg_time = 0

    # Data sinks are marked with node_id == -1 and report no cardinality.
    is_sink = node.node_id == -1
    # If the node is a broadcast-receiving exchange node, the cardinality of rows produced
    # is the max over all instances (which should all have received the same number of
    # rows). Otherwise, the cardinality is the sum over all instances which process
    # disjoint partitions.
    if is_sink:
        cardinality = -1
    elif node.is_broadcast:
        cardinality = max_stats["cardinality"]
    else:
        cardinality = agg_stats["cardinality"]

    est_stats = node.estimated_stats

    # Build the tree-drawing prefix: a leading "|", one "  |" per additional
    # indent level, then "--" when this row starts a new indent level.
    label_prefix = ""
    if indent_level > 0:
        label_prefix = "|"
        label_prefix += "  |" * (indent_level - 1)
        if new_indent_level:
            label_prefix += "--"
        else:
            label_prefix += "  "

    def prettyprint(val, units, divisor):
        # Scale val down by divisor until it fits one of units. Values that
        # exceed the whole ladder are clamped to the largest unit (previously
        # such values fell off the end of the loop and returned None).
        last = len(units) - 1
        for i, unit in enumerate(units):
            if val < divisor or i == last:
                if i == 0:
                    return "%d%s" % (val, unit)
                return "%3.2f%s" % (val, unit)
            val /= divisor

    def prettyprint_bytes(byte_val):
        return prettyprint(byte_val, [' B', ' KB', ' MB', ' GB', ' TB'], 1024.0)

    def prettyprint_units(unit_val):
        return prettyprint(unit_val, ["", "K", "M", "B"], 1000.0)

    def prettyprint_time(time_val):
        return prettyprint(time_val, ["ns", "us", "ms", "s"], 1000.0)

    latency = max_stats["latency_ns"]
    cardinality_est = est_stats.cardinality
    memory_used = max_stats["memory_used"]
    memory_est = est_stats.memory_used
    if is_prettyprint:
        avg_time = prettyprint_time(avg_time)
        latency = prettyprint_time(latency)
        # Sinks produce no rows, so their cardinality columns are left blank.
        cardinality = "" if is_sink else prettyprint_units(cardinality)
        cardinality_est = "" if is_sink else prettyprint_units(cardinality_est)
        memory_used = prettyprint_bytes(memory_used)
        memory_est = prettyprint_bytes(memory_est)

    row = []
    if separate_prefix_column:
        row.append(label_prefix)
        row.append(node.label)
    else:
        row.append(label_prefix + node.label)

    row.extend([
        node.num_hosts,
        instances,
        avg_time,
        latency,
        cardinality,
        cardinality_est,
        memory_used,
        memory_est,
        node.label_detail])

    output.append(row)

    try:
        sender_idx = summary.exch_to_sender_map[idx]
        # This is an exchange node or a join node with a separate builder, so the source
        # is a fragment root, and should be printed next.
        sender_indent_level = indent_level + node.num_children
        sender_new_indent_level = node.num_children > 0
        build_exec_summary_table(summary, sender_idx, sender_indent_level,
                                 sender_new_indent_level, output, is_prettyprint,
                                 separate_prefix_column)
    except (KeyError, TypeError):
        # Fall through if idx not in map, or if exch_to_sender_map itself is not set
        pass

    idx += 1
    if node.num_children > 0:
        # The 0th child keeps this node's indent level; its rows are buffered
        # and appended after the other (indented) children's rows so the tree
        # renders in the expected order.
        first_child_output = []
        idx = build_exec_summary_table(summary, idx, indent_level, False,
                                       first_child_output, is_prettyprint,
                                       separate_prefix_column)
        for _ in range(1, node.num_children):
            # All other children are indented
            idx = build_exec_summary_table(summary, idx, indent_level + 1, True, output,
                                           is_prettyprint, separate_prefix_column)
        output += first_child_output
    return idx
126 changes: 5 additions & 121 deletions impala/hiveserver2.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from thrift.transport.TTransport import TTransportException
from thrift.Thrift import TApplicationException
from thrift.protocol.TBinaryProtocol import TBinaryProtocolAccelerated
from impala._thrift_gen.ExecStats.ttypes import TExecStats
from impala._thrift_gen.TCLIService.ttypes import (
TOpenSessionReq, TFetchResultsReq, TCloseSessionReq,
TExecuteStatementReq, TGetInfoReq, TGetInfoType, TTypeId,
Expand All @@ -44,6 +45,7 @@
from impala.compat import (Decimal, _xrange as xrange)
from impala.error import (NotSupportedError, OperationalError,
ProgrammingError, HiveServer2Error, HttpError)
from impala.exec_summary import build_exec_summary_table
from impala.interface import Connection, Cursor, _bind_parameters
from impala.util import get_logger_and_init_null

Expand Down Expand Up @@ -727,8 +729,9 @@ def get_summary(self):

def build_summary_table(self, summary, output, idx=0,
is_fragment_root=False, indent_level=0):
return build_summary_table(summary, idx, is_fragment_root,
indent_level, output)
return build_exec_summary_table(
summary, idx, indent_level, is_fragment_root, output, is_prettyprint=True,
separate_prefix_column=False)

def get_databases(self):
def op():
Expand Down Expand Up @@ -1550,122 +1553,3 @@ def get_result_schema(self):
log.debug('get_result_schema: schema=%s', schema)

return schema


def build_summary_table(summary, idx, is_fragment_root, indent_level, output):
    """Direct translation of Coordinator::PrintExecSummary() to recursively
    build a list of rows of summary statistics, one per exec node

    summary: the TExecSummary object that contains all the summary data

    idx: the index of the node to print

    is_fragment_root: true if the node to print is the root of a fragment (and
    therefore feeds into an exchange)

    indent_level: the number of spaces to print before writing the node's
    label, to give the appearance of a tree. The 0th child of a node has the
    same indent_level as its parent. All other children have an indent_level
    of one greater than their parent.

    output: the list of rows into which to append the rows produced for this
    node and its children.

    Returns the index of the next exec node in summary.exec_nodes that should
    be processed, used internally to this method only.
    """
    # pylint: disable=too-many-locals

    # The four per-instance counters that are aggregated for the summary row.
    attrs = ["latency_ns", "cpu_time_ns", "cardinality", "memory_used"]

    # Initialise aggregate and maximum stats
    agg_stats, max_stats = TExecStats(), TExecStats()
    for attr in attrs:
        setattr(agg_stats, attr, 0)
        setattr(max_stats, attr, 0)

    # Fold every instance's stats into the running sum (agg_stats) and the
    # running maximum (max_stats). Thrift may leave individual fields unset
    # (None); those are skipped.
    node = summary.nodes[idx]
    for stats in node.exec_stats:
        for attr in attrs:
            val = getattr(stats, attr)
            if val is not None:
                setattr(agg_stats, attr, getattr(agg_stats, attr) + val)
                setattr(max_stats, attr, max(getattr(max_stats, attr), val))

    # Average latency across instances; guard against a node with no
    # exec_stats to avoid ZeroDivisionError.
    if len(node.exec_stats) > 0:
        avg_time = agg_stats.latency_ns / len(node.exec_stats)
    else:
        avg_time = 0

    # If the node is a broadcast-receiving exchange node, the cardinality of
    # rows produced is the max over all instances (which should all have
    # received the same number of rows). Otherwise, the cardinality is the sum
    # over all instances which process disjoint partitions.
    if node.is_broadcast and is_fragment_root:
        cardinality = max_stats.cardinality
    else:
        cardinality = agg_stats.cardinality

    est_stats = node.estimated_stats

    # Tree-drawing prefix: fragment roots are indented with spaces, other
    # nodes with "--" per indent level, both after a leading "|".
    label_prefix = ""
    if indent_level > 0:
        label_prefix = "|"
        if is_fragment_root:
            label_prefix += " " * indent_level
        else:
            label_prefix += "--" * indent_level

    def prettyprint(val, units, divisor):
        # Scale val down by divisor until it fits one of units. NOTE: values
        # exceeding the largest unit fall off the loop and return None.
        for unit in units:
            if val < divisor:
                if unit == units[0]:
                    return "%d%s" % (val, unit)
                else:
                    return "%3.2f%s" % (val, unit)
            val /= divisor

    def prettyprint_bytes(byte_val):
        # e.g. 2048 -> "2.00 KB"
        return prettyprint(
            byte_val, [' B', ' KB', ' MB', ' GB', ' TB'], 1024.0)

    def prettyprint_units(unit_val):
        # e.g. 1500 -> "1.50K"
        return prettyprint(unit_val, ["", "K", "M", "B"], 1000.0)

    def prettyprint_time(time_val):
        # e.g. 2000000 -> "2.00ms"
        return prettyprint(time_val, ["ns", "us", "ms", "s"], 1000.0)

    # One summary row per exec node: label, instance count, timings,
    # cardinalities (actual and estimated), memory (peak and estimated),
    # and the node's detail string.
    row = [label_prefix + node.label,
           len(node.exec_stats),
           prettyprint_time(avg_time),
           prettyprint_time(max_stats.latency_ns),
           prettyprint_units(cardinality),
           prettyprint_units(est_stats.cardinality),
           prettyprint_bytes(max_stats.memory_used),
           prettyprint_bytes(est_stats.memory_used),
           node.label_detail]

    output.append(row)
    try:
        sender_idx = summary.exch_to_sender_map[idx]
        # This is an exchange node, so the sender is a fragment root, and
        # should be printed next.
        build_summary_table(summary, sender_idx, True, indent_level, output)
    except (KeyError, TypeError):
        # Fall through if idx not in map, or if exch_to_sender_map itself is
        # not set
        pass

    idx += 1
    if node.num_children > 0:
        # The 0th child keeps this node's indent level; its rows are buffered
        # in first_child_output and appended after the other children's rows.
        first_child_output = []
        idx = build_summary_table(summary, idx, False, indent_level,
                                  first_child_output)
        # pylint: disable=unused-variable
        # TODO: is child_idx supposed to be unused? See #120
        for child_idx in range(1, node.num_children):
            # All other children are indented (we only have 0, 1 or 2 children
            # for every exec node at the moment)
            idx = build_summary_table(summary, idx, False, indent_level + 1,
                                      output)
        output += first_child_output
    return idx
64 changes: 64 additions & 0 deletions impala/tests/test_impala.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,67 @@ def validate_log(cur):
assert len(node.exec_stats) >= node.num_hosts
profile = cur.get_profile()
assert profile is not None

def test_build_summary_table(tmp_db, cur, empty_table):
    """Test build_exec_summary function of impyla.

    Runs a self-join over an empty table, fetches the exec summary through the
    cursor, and compares the formatted rows against expected values — first
    with the default plan, then with mt_dop=2, which adds a separate
    JOIN BUILD fragment to the plan.
    """
    tmp_db_lower = tmp_db.lower()
    # Assert column Operator, #Host, #Inst, #Rows, Est. #Rows, Est. Peak Mem, and Detail.
    # Skip column Avg Time, Max Time, and Peak Mem.

    def skip_cols(row):
        # Drop the timing and peak-memory columns (indices 3, 4, 7), which
        # vary from run to run. Delete from the highest index first so the
        # earlier deletions do not shift later positions.
        assert len(row) == 10, row
        output = list(row)
        del output[7]
        del output[4]
        del output[3]
        return output

    def validate_summary_table(table, expected):
        # Compare row by row so a failure reports the offending row.
        for i in range(0, len(expected)):
            row = skip_cols(table[i])
            assert expected[i] == row, 'Expect {0} but found {1}'.format(
                str(expected[i]), str(row))

    # Self-join on column i; the table is empty, so all row counts are 0.
    query = """SELECT * FROM {0} a INNER JOIN {1} b ON (a.i = b.i)""".format(
        empty_table, empty_table)
    cur.execute(query)
    cur.fetchall()
    summary = cur.get_summary()
    output_dop_0 = list()
    cur.build_summary_table(summary, output_dop_0)
    assert len(output_dop_0) == 8, output_dop_0
    expected_dop_0 = [
        ['F02:ROOT', 1, 1, '', '', '4.00 MB', ''],
        ['04:EXCHANGE', 1, 1, '0', '0', '16.00 KB', 'UNPARTITIONED'],
        ['F00:EXCHANGE SENDER', 1, 1, '', '', '64.00 KB', ''],
        ['02:HASH JOIN', 1, 1, '0', '0', '1.94 MB', 'INNER JOIN, BROADCAST'],
        ['|--03:EXCHANGE', 1, 1, '0', '0', '16.00 KB', 'BROADCAST'],
        ['| F01:EXCHANGE SENDER', 1, 1, '', '', '32.00 KB', ''],
        ['| 01:SCAN HDFS', 1, 1, '0', '0', '0 B',
         '{0}.{1} b'.format(tmp_db_lower, empty_table)],
        ['00:SCAN HDFS', 1, 1, '0', '0', '0 B',
         '{0}.{1} a'.format(tmp_db_lower, empty_table)],
    ]
    validate_summary_table(output_dop_0, expected_dop_0)

    # Re-run with multi-threaded degree of parallelism 2: the hash join's
    # build side becomes a separate F03:JOIN BUILD fragment (9 rows, not 8).
    cur.execute(query, configuration={'mt_dop': '2'})
    cur.fetchall()
    summary = cur.get_summary()
    output_dop_2 = list()
    cur.build_summary_table(summary, output_dop_2)
    assert len(output_dop_2) == 9, output_dop_2
    expected_dop_2 = [
        ['F02:ROOT', 1, 1, '', '', '4.00 MB', ''],
        ['04:EXCHANGE', 1, 1, '0', '0', '16.00 KB', 'UNPARTITIONED'],
        ['F00:EXCHANGE SENDER', 1, 1, '', '', '64.00 KB', ''],
        ['02:HASH JOIN', 1, 1, '0', '0', '0 B', 'INNER JOIN, BROADCAST'],
        ['|--F03:JOIN BUILD', 1, 1, '', '', '3.88 MB', ''],
        ['| 03:EXCHANGE', 1, 1, '0', '0', '16.00 KB', 'BROADCAST'],
        ['| F01:EXCHANGE SENDER', 1, 1, '', '', '32.00 KB', ''],
        ['| 01:SCAN HDFS', 1, 1, '0', '0', '0 B',
         '{0}.{1} b'.format(tmp_db_lower, empty_table)],
        ['00:SCAN HDFS', 1, 1, '0', '0', '0 B',
         '{0}.{1} a'.format(tmp_db_lower, empty_table)],
    ]
    validate_summary_table(output_dop_2, expected_dop_2)