
Commit 696dc62

Some bug fix
1 parent 310cc3a commit 696dc62

4 files changed: +117 −175 lines

cpp/arcticdb/util/query_stats.cpp (+4 −3)
@@ -15,11 +15,12 @@ namespace arcticdb::util::query_stats {
 std::shared_ptr<StatsGroupLayer> QueryStats::current_layer(){
     // current_layer_ != nullptr && root_layer_ != nullptr -> stats has been setup; Nothing to do
     // current_layer_ == nullptr && root_layer_ == nullptr -> clean slate; Need to setup
-    // current_layer_ != nullptr && root_layer_ == nullptr -> stats have been reset; Need to setup
+    // current_layer_ != nullptr && root_layer_ == nullptr -> Something is off
     // current_layer_ == nullptr && root_layer_ != nullptr -> Something is off
     if (!thread_local_var_.current_layer_ || !thread_local_var_.root_layer_) {
         check(!async::is_folly_thread, "Folly thread should have its StatsGroupLayer passed by caller only");
-        check(thread_local_var_.current_layer_ || !thread_local_var_.root_layer_, "QueryStats root_layer_ should be null if current_layer_ is null");
+        check(thread_local_var_.current_layer_.operator bool() == thread_local_var_.root_layer_.operator bool(),
+              "QueryStats root_layer_ and current_layer_ should be either both null or both non-null");
         thread_local_var_.root_layer_ = std::make_shared<StatsGroupLayer>();
         {
             std::lock_guard<std::mutex> lock(root_layer_mutex_);
@@ -44,7 +45,7 @@ void QueryStats::reset_stats() {
     check(!async::TaskScheduler::instance()->tasks_pending(), "Folly tasks are still running");
     std::lock_guard<std::mutex> lock(root_layer_mutex_);
     for (auto& layer : root_layers_) {
-        layer.reset();
+        layer->reset_stats();
     }
 }
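
The second hunk is the substantive fix here: root_layers_ holds std::shared_ptr<StatsGroupLayer> values, so layer.reset() was calling shared_ptr::reset() and nulling the stored pointer instead of clearing the recorded counters, while layer->reset_stats() clears the pointed-to layer in place. The contract this restores, sketched against the Python toolbox API elsewhere in this commit (lib is an assumed, already-configured store object, not part of the commit):

    QueryStatsTool.enable()
    lib.list_symbols()                           # record some storage operations
    QueryStatsTool.disable()
    QueryStatsTool.reset_stats()                 # clears each layer's counters in place
    assert not QueryStatsTool.get_query_stats()  # accumulated stats are gone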

python/arcticdb/toolbox/query_stats.py (+68 −109)
@@ -1,130 +1,89 @@
-import time
-import pandas as pd
 from contextlib import contextmanager
 import numpy as np
 
-from arcticdb.exceptions import UserInputException
 from arcticdb_ext.tools import QueryStats
+from arcticdb_ext.tools.QueryStats import StatsGroupName, StatsName
 
 class QueryStatsTool:
-    def __init__(self):
-        self._create_time = time.time_ns()
-        self._is_context_manager = False
-        QueryStats.register_new_query_stat_tool()
+    # Define enum values as lists since pybind11 enums are not iterable
+    _STATS_NAME_VALUES = [StatsName.result_count, StatsName.total_time_ms, StatsName.count]
+    _STATS_GROUP_NAME_VALUES = [StatsGroupName.arcticdb_call, StatsGroupName.key_type, StatsGroupName.storage_ops]
 
-    def __del__(self):
-        QueryStats.deregister_query_stat_tool()
-
-    def __sub__(self, other):
-        return self._populate_stats(other._create_time, self._create_time)
+    @classmethod
+    def context_manager(cls):
+        @contextmanager
+        def _func():
+            cls.enable()
+            yield
+            cls.disable()
+        return _func()
 
-    def _populate_stats(self, start_time, end_time):
-        df = pd.DataFrame(QueryStats.get_stats())
-        if df.empty:
-            return {}
+    @classmethod
+    def get_query_stats(cls):
+        # Get raw stats from C++ layer
+        raw_stats = QueryStats.root_layers()
 
-        df["exec_time"] = pd.to_numeric(df["exec_time"], errors="coerce")
-        df = df[df["exec_time"].between(start_time, end_time)]
-        df = df.drop(columns=["exec_time"])
+        # Transform raw stats into structured dictionary
+        result = {}
 
-        if "result_count" in df.columns:
-            df["result_count"] = pd.to_numeric(df["result_count"], errors="coerce")
+        # Process each layer
+        for layer in raw_stats:
+            if layer:
+                cls._process_layer(layer, result)
+
+        return result
+
+    @classmethod
+    def _process_layer(cls, layer, current_dict):
+        def _get_enum_name(enum_value):
+            return str(enum_value).split('.')[-1]
 
-        groupby_cols = ["arcticdb_call", "stage", "key_type", "storage_op"]
+        # Process stats array
+        stats_array = layer.stats
+        for stat_enum in cls._STATS_NAME_VALUES:
+            stat_idx = int(stat_enum)
+            if stats_array[stat_idx] > 0:
+                stat_name = _get_enum_name(stat_enum)
+                if stat_name not in current_dict:
+                    current_dict[stat_name] = stats_array[stat_idx]
+                else:
+                    current_dict[stat_name] += stats_array[stat_idx]
 
-        for col in groupby_cols:
-            if col not in df.columns:
-                df[col] = pd.Series(dtype='object')
-
-        def process_time_values(time_values):
-            time_buckets = {}
-            for time_val in time_values:
-                bucket = (time_val // 10) * 10
-                time_buckets[str(bucket)] = time_buckets.get(str(bucket), 0) + 1
-            return time_buckets
-
-        def get_non_grouped_times(data, current_level):
-            # Only process NaN values for the current grouping level
-            mask = data[current_level].isna()
-            if not mask.any():
-                return {}
+        # Process next_layer_maps
+        next_layer_maps = layer.next_layer_maps
+        for group_enum in cls._STATS_GROUP_NAME_VALUES:
+            group_idx = int(group_enum)
 
-            time_values = pd.to_numeric(data.loc[mask, "time"].dropna(), errors="coerce")
-            if not time_values.empty:
-                time_buckets = process_time_values(time_values)
-                if time_buckets:
-                    return {"time": time_buckets}
-            return {}
-
-        def process_group(group_data, is_leaf):
-            result = {}
-
-            if is_leaf:
-                numeric_cols = [col for col in group_data.columns if col not in groupby_cols and col != "time"]
-                for col in numeric_cols:
-                    values = pd.to_numeric(group_data[col].dropna(), errors="coerce")
-                    if not values.empty:
-                        total = values.sum()
-                        if not np.isnan(total):
-                            result[col] = int(total)
+            if not next_layer_maps[group_idx]:
+                continue
 
-                time_values = pd.to_numeric(group_data["time"].dropna(), errors="coerce")
-                if not time_values.empty:
-                    time_buckets = process_time_values(time_values)
-                    if time_buckets:
-                        result["time"] = time_buckets
+            next_layer_map = next_layer_maps[group_idx]
 
-            return result
+            # top level
+            if group_enum == StatsGroupName.arcticdb_call:
+                for op_name, op_layer in next_layer_map.items():
+                    if op_name not in current_dict:
+                        current_dict[op_name] = {}
+                    cls._process_layer(op_layer, current_dict[op_name])
+            else:
+                layer_type = _get_enum_name(group_enum)
 
-        def group_by_level(data, columns):
-            if not columns:
-                return process_group(data, True)
-
-            result = {}
-            current_col = columns[0]
-
-            non_grouped = get_non_grouped_times(data, current_col)
-            result.update(non_grouped)
-
-            grouped = data[~data[current_col].isna()].groupby(current_col)
-            nested = {}
-
-            for name, group in grouped:
-                sub_result = group_by_level(group, columns[1:])
-                if sub_result:
-                    nested[str(name)] = sub_result
-
-            if nested:
-                result[f"{current_col}s"] = nested
-
-            return result
-
-        result = {}
-        for call_name, call_group in df.groupby("arcticdb_call"):
-            if pd.isna(call_name):
-                continue
-            call_result = group_by_level(call_group, groupby_cols[1:])
-            if call_result:
-                result[str(call_name)] = call_result
-
-        return result
+                if layer_type not in current_dict:
+                    current_dict[layer_type] = {}
+                for sub_name, sub_layer in next_layer_map.items():
+                    if sub_name not in current_dict[layer_type]:
+                        current_dict[layer_type][sub_name] = {}
+                    cls._process_layer(sub_layer, current_dict[layer_type][sub_name])
 
     @classmethod
-    def context_manager(cls):
-        @contextmanager
-        def _func():
-            query_stats_tools = cls()
-            query_stats_tools._is_context_manager = True
-            yield query_stats_tools
-            query_stats_tools._end_time = time.time_ns()
-        return _func()
+    def reset_stats(cls):
+        QueryStats.reset_stats()
 
-    def get_query_stats(self):
-        if self._is_context_manager:
-            return self._populate_stats(self._create_time, self._end_time)
-        else:
-            raise UserInputException("get_query_stats should be used with a context manager initialized QueryStatsTools")
+    @classmethod
+    def enable(cls):
+        QueryStats.enable()
 
     @classmethod
-    def reset_stats(cls):
-        QueryStats.reset_stats()
+    def disable(cls):
+        QueryStats.disable()
+
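
The rewrite above drops the pandas/time-window post-processing in favour of walking the StatsGroupLayer tree exposed by the C++ extension, and turns every entry point into a classmethod. A minimal usage sketch of the new API, assuming an already-configured library object lib (lib itself is not part of this diff):

    from arcticdb.toolbox.query_stats import QueryStatsTool

    QueryStatsTool.enable()                    # start recording
    lib.list_symbols()
    QueryStatsTool.disable()                   # stop recording
    stats = QueryStatsTool.get_query_stats()   # nested dict keyed by API call
    QueryStatsTool.reset_stats()               # discard accumulated counters

    # Equivalent scoped form:
    with QueryStatsTool.context_manager():
        lib.list_symbols()
    stats = QueryStatsTool.get_query_stats()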

python/tests/conftest.py (+7 −0)
@@ -55,6 +55,7 @@
     SSL_TEST_SUPPORTED,
 )
 from arcticdb.storage_fixtures.utils import safer_rmtree
+from arcticdb.toolbox.query_stats import QueryStatsTool
 
 
 # region =================================== Misc. Constants & Setup ====================================
@@ -1089,3 +1090,9 @@ def in_memory_version_store_tiny_segment(in_memory_store_factory):
 @pytest.fixture(params=["lmdb_version_store_tiny_segment", "in_memory_version_store_tiny_segment"])
 def lmdb_or_in_memory_version_store_tiny_segment(request):
     return request.getfixturevalue(request.param)
+
+
+@pytest.fixture
+def clear_query_stats():
+    yield
+    QueryStatsTool.reset_stats()
(+38 −63)
@@ -1,46 +1,33 @@
 from arcticdb.toolbox.query_stats import QueryStatsTool
-from arcticdb_ext.tools import QueryStats
 
-def test_query_stats(s3_version_store_v1):
-    query_stats_tools_write = QueryStatsTool() # For testing whether stats has been filtered
+def test_query_stats(s3_version_store_v1, clear_query_stats):
     s3_version_store_v1.write("a", 1)
-    query_stats_tools_start = QueryStatsTool()
+    QueryStatsTool.enable()
     s3_version_store_v1.list_symbols()
-    query_stats_tools_end = QueryStatsTool()
-    stats = query_stats_tools_end - query_stats_tools_start
+    QueryStatsTool.disable()
+    stats = QueryStatsTool.get_query_stats()
     """
     Sample output:
     {
         "list_symbols": {
-            "time": {
-                "500": 1
-            },
-            "stages": {
-                "list": {
-                    "time": {
-                        "500": 1
-                    },
-                    "key_types": {
-                        "l": {
-                            "storage_ops": {
-                                "ListObjectsV2": {
-                                    "result_count": 1,
-                                    "time": {
-                                        "20": 1,
-                                        "10": 1
-                                    }
-                                }
-                            }
-                        },
-                        "r": {
-                            "storage_ops": {
-                                "ListObjectsV2": {
-                                    "result_count": 1,
-                                    "time": {
-                                        "10": 1
-                                    }
-                                }
-                            }
-                        }
+            "total_time_ms": 476,
+            "count": 1,
+            "key_type": {
+                "l": {
+                    "storage_ops": {
+                        "ListObjectsV2": {
+                            "result_count": 1,
+                            "total_time_ms": 48,
+                            "count": 2
+                        }
+                    }
+                },
+                "r": {
+                    "storage_ops": {
+                        "ListObjectsV2": {
+                            "result_count": 1,
+                            "total_time_ms": 21,
+                            "count": 1
                         }
                     }
                 }
@@ -49,10 +36,8 @@ def test_query_stats(s3_version_store_v1):
     }
     """
     assert "list_symbols" in stats
-    assert "stages" in stats["list_symbols"]
-    assert "list" in stats["list_symbols"]["stages"]
-    assert "key_types" in stats["list_symbols"]["stages"]["list"]
-    key_types = stats["list_symbols"]["stages"]["list"]["key_types"]
+    assert "key_type" in stats["list_symbols"]
+    key_types = stats["list_symbols"]["key_type"]
     assert "l" in key_types
    assert "r" in key_types
 
@@ -63,32 +48,22 @@ def test_query_stats(s3_version_store_v1):
         assert key_types[key_type]["storage_ops"]["ListObjectsV2"]["result_count"] == 1
     # Not asserting the time values as they are non-deterministic
 
-def test_query_stats_context(s3_version_store_v1):
-    with QueryStatsTool.context_manager(): # For testing whether stats has been filtered
-        s3_version_store_v1.write("a", 1)
-    with QueryStatsTool.context_manager() as query_stats_tools:
-        s3_version_store_v1.list_symbols()
-    stats = query_stats_tools.get_query_stats()
-    key_types = stats["list_symbols"]["stages"]["list"]["key_types"]
-    for key_type in ["l", "r"]:
-        assert key_types[key_type]["storage_ops"]["ListObjectsV2"]["result_count"] == 1
-
-
-def test_query_stats_clear(s3_version_store_v1):
+def test_query_stats_context(s3_version_store_v1, clear_query_stats):
     s3_version_store_v1.write("a", 1)
-    query_stats_tools_start = QueryStatsTool()
-    s3_version_store_v1.list_symbols()
-    query_stats_tools_end = QueryStatsTool()
-    QueryStats.reset()
-    assert not (query_stats_tools_end - query_stats_tools_start)
+    with QueryStatsTool.context_manager():
+        s3_version_store_v1.list_symbols()
+    stats = QueryStatsTool.get_query_stats()
+    key_types = stats["list_symbols"]["key_type"]
+    for key_type in ["l", "r"]:
+        assert key_types[key_type]["storage_ops"]["ListObjectsV2"]["result_count"] == 1
 
 
-def test_query_stats_tool_counter(s3_version_store_v1):
-    query_stats_tools_start = QueryStatsTool()
+def test_query_stats_clear(s3_version_store_v1, clear_query_stats):
+    s3_version_store_v1.write("a", 1)
+    QueryStatsTool.enable()
     s3_version_store_v1.list_symbols()
-    query_stats_tools_end = QueryStatsTool()
-    del query_stats_tools_start
-    del query_stats_tools_end
-
-    assert not QueryStats.get_stats()
+    QueryStatsTool.disable()
+    QueryStatsTool.get_query_stats()
+    QueryStatsTool.reset_stats()
+    assert not QueryStatsTool.get_query_stats()
 
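
As the sample output in the test above shows, the returned mapping mixes counter keys (result_count, total_time_ms, count) with grouping keys (the API call name, key_type, storage_ops) at each level. A hedged sketch of a recursive helper that rolls one counter up across all levels (sum_counter is hypothetical, not part of this commit):

    def sum_counter(stats_dict, counter="result_count"):
        # Recursively sum one counter key across the nested stats dict.
        total = 0
        for key, value in stats_dict.items():
            if key == counter and isinstance(value, (int, float)):
                total += value                        # counter at this level
            elif isinstance(value, dict):
                total += sum_counter(value, counter)  # descend into sub-groups
        return total

    # For the sample output above, sum_counter(stats) == 2:
    # one ListObjectsV2 result for each of the "l" and "r" key types.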
