Skip to content

Commit a3653d0

Browse files
committed
supoport fetch pg_class and pg_statistic from videx-statistic-server
1 parent 4ca0bd6 commit a3653d0

10 files changed

Lines changed: 498 additions & 204 deletions

File tree

src/pg/videx/Makefile

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ OBJS = \
55
$(WIN32RES) \
66
stats.o \
77
videxam.o \
8+
videx_json_item.o \
89

910
CURRENT_DIR := $(shell pwd)
1011
$(info CURRENT_DIR = $(CURRENT_DIR))
@@ -23,6 +24,12 @@ REGRESS = videx
2324
CXX = g++
2425
CC = gcc
2526

27+
ifdef DEBUG
28+
CFLAGS += -O0 -g3
29+
CXXFLAGS += -O0 -g3
30+
PG_CPPFLAGS += -O0 -g3
31+
endif
32+
2633
TAP_TESTS = 1
2734

2835
ifdef USE_PGXS

src/pg/videx/stats.cc

Lines changed: 249 additions & 71 deletions
Large diffs are not rendered by default.

src/pg/videx/videx_json_item.cc

Lines changed: 27 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#include "videx_json_item.h"
2-
2+
#include <nlohmann/json.hpp>
33
/**
44
* A simple parsing function is written here instead,
55
* since rapid_json always encounters strange segmentation faults across platforms,
@@ -11,70 +11,39 @@
1111
* @return
1212
*/
1313
int videx_parse_simple_json(const std::string &json, int &code, std::string &message,
14-
std::map<std::string, std::string> &data_dict) {
15-
try {
16-
// find code and message
17-
std::size_t pos_code = json.find("\"code\":");
18-
std::size_t pos_message = json.find("\"message\":");
19-
std::size_t pos_data = json.find("\"data\":");
20-
21-
if (pos_code == std::string::npos || pos_message == std::string::npos || pos_data == std::string::npos) {
22-
throw std::invalid_argument("Missing essential components in JSON.");
23-
}
24-
25-
// parse code
26-
std::size_t start = json.find_first_of("0123456789", pos_code);
27-
std::size_t end = json.find(',', start);
28-
code = std::stoi(json.substr(start, end - start));
29-
30-
// parse message
31-
start = json.find('\"', pos_message + 10) + 1;
32-
end = json.find('\"', start);
33-
message = json.substr(start, end - start);
34-
35-
// parse data
36-
start = json.find('{', pos_data) + 1;
37-
end = json.find('}', start);
38-
std::string data_content = json.substr(start, end - start);
39-
std::istringstream data_stream(data_content);
40-
std::string line;
41-
42-
while (std::getline(data_stream, line, ',')) {
43-
std::size_t colon_pos = line.find(':');
44-
if (colon_pos == std::string::npos) {
45-
continue; // Skip malformed line
46-
}
47-
std::string key = line.substr(0, colon_pos);
48-
std::string value = line.substr(colon_pos + 1);
49-
50-
// clean key 和 value
51-
auto trim_quotes_and_space = [](std::string &str) {
52-
// Trim whitespace and surrounding quotes
53-
size_t first = str.find_first_not_of(" \t\n\"");
54-
size_t last = str.find_last_not_of(" \t\n\"");
55-
if (first == std::string::npos || last == std::string::npos) {
56-
str.clear(); // All whitespace or empty
57-
} else {
58-
str = str.substr(first, last - first + 1);
14+
std::map<std::string, std::string> &data_dict)
15+
{
16+
try
17+
{
18+
auto root = nlohmann::json::parse(json);
19+
code = root.at("code").get<int>();
20+
message = root.at("message").get<std::string>();
21+
22+
if (root.contains("data"))
23+
{
24+
const auto &data = root["data"];
25+
if (data.is_object())
26+
{
27+
for (auto it = data.begin(); it != data.end(); ++it)
28+
{
29+
if (it->is_string())
30+
data_dict[it.key()] = it->get<std::string>();
31+
else
32+
data_dict[it.key()] = it->dump();
5933
}
60-
};
61-
62-
trim_quotes_and_space(key);
63-
trim_quotes_and_space(value);
64-
65-
data_dict[key] = value;
34+
}
6635
}
67-
6836
return 0;
69-
} catch (std::exception &e) {
37+
}
38+
catch (const std::exception &e)
39+
{
7040
std::cerr << "Failed to parse JSON: " << e.what() << std::endl;
7141
message = e.what();
7242
code = -1;
7343
return 1;
7444
}
7545
}
7646

77-
7847
/**
7948
* This function is used to escape double quotes in a string.
8049
* @param input
@@ -127,21 +96,19 @@ size_t write_callback(void *contents, size_t size, size_t nmemb, std::string *ou
12796

12897
int ask_from_videx_http(VidexJsonItem &request, VidexStringMap &res_json){
12998
const char *host_ip = "127.0.0.1:5001";
130-
char value[1000];
13199

132100
//VIDEX_SERVER
133101
if(videx_server)
134-
host_ip = value;
102+
host_ip = videx_server;
135103
std::string url = std::string("http://") + host_ip + "/ask_videx";
136104
CURL *curl;
137105
CURLcode res_code;
138106
std::string readBuffer;
139-
curl = curl_easy_init(); // 初始化一个CURL easy handle。
107+
curl = curl_easy_init();
140108
if(curl) {
141109
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
142110
curl_easy_setopt(curl, CURLOPT_POST, 1);
143111

144-
145112
std::string request_str = request.to_json();
146113
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, request_str.c_str());
147114

@@ -171,6 +138,7 @@ int ask_from_videx_http(VidexJsonItem &request, VidexStringMap &res_json){
171138
std::string message;
172139
int error = videx_parse_simple_json(readBuffer.c_str(), code, message, res_json);
173140
if (error) {
141+
std::cout << "videx_server raw response: " << readBuffer << std::endl;
174142
std::cout << "!__!__!__!__!__! JSON parse error: " << message << '\n';
175143
return 1;
176144
} else {

src/pg/videx/videxam.cc

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -129,30 +129,36 @@ videx_table_block_relation_estimate_size(Relation rel, int32 *attr_widths,
129129
/** try to fetch relation infos (pg_class) from videx-statistic-server, else use local pg_class instead*/
130130
char *dbname = get_database_name(MyDatabaseId);
131131
char *nspname = get_namespace_name(rel->rd_rel->relnamespace);
132+
char *relname = rel->rd_rel->relname.data;
133+
std::string ns_relname = std::string(nspname) + "." + std::string(relname);
132134
VidexStringMap res_json;
133135
VidexJsonItem request_item = construct_request(
134-
dbname, rel->rd_rel->relname.data, nspname, __PRETTY_FUNCTION__);
136+
dbname, nspname, ns_relname.c_str(), __PRETTY_FUNCTION__);
135137
if (!ask_from_videx_http(request_item, res_json)) {
136138
/**fetch from videx-statistic-sever success, update local cache*/
139+
curpages = (BlockNumber) std::stoi(res_json["relpages"]);
140+
relpages = (BlockNumber) std::stoi(res_json["relpages"]);
141+
reltuples = (double) std::stof(res_json["reltuples"]);
142+
relallvisible = (BlockNumber) (res_json["relallvisible"] == "True" ? 1 : 0);
137143
vac_update_relstats(rel,
138-
std::stoi(res_json["relpages"]),
139-
std::stof(res_json["reltuples"]),
140-
std::stoi(res_json["relallvisible"]),
141-
std::stoi(res_json["relhasindex"]),
144+
curpages,
145+
reltuples,
146+
relallvisible,
147+
res_json["relhasindex"] == "True" ? true : false,
142148
InvalidTransactionId,
143149
InvalidMultiXactId,
144150
NULL,
145151
NULL,
146152
false);
147-
}
153+
} else {
154+
/* it should have storage, so we can call the smgr */
155+
curpages = (BlockNumber) rel->rd_rel->relpages;
148156

149-
/* it should have storage, so we can call the smgr */
150-
curpages = (BlockNumber) rel->rd_rel->relpages;
151-
152-
/* coerce values in pg_class to more desirable types */
153-
relpages = (BlockNumber) rel->rd_rel->relpages;
154-
reltuples = (double) rel->rd_rel->reltuples;
155-
relallvisible = (BlockNumber) rel->rd_rel->relallvisible;
157+
/* coerce values in pg_class to more desirable types */
158+
relpages = (BlockNumber) rel->rd_rel->relpages;
159+
reltuples = (double) rel->rd_rel->reltuples;
160+
relallvisible = (BlockNumber) rel->rd_rel->relallvisible;
161+
}
156162

157163
/*
158164
* HACK: if the relation has never yet been vacuumed, use a minimum size
@@ -247,7 +253,7 @@ void videx_relation_set_new_filelocator (Relation rel,
247253
const RelFileLocator *newrlocator,
248254
char persistence,
249255
TransactionId *freezeXid,
250-
MultiXactId *minmulti){
256+
MultiXactId *minmulti){
251257
return;
252258
}
253259

src/sub_platforms/sql_opt/databases/pg/pg_command.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ def get_table_meta(self, db_name, schema_table_name):
166166
table = PGTable(
167167
dbname = db_name,
168168
table_schema = schema_name,
169-
table_name = table_name,
169+
table_name = schema_table_name,
170170
relpages = df['relpages'].values[0],
171171
reltuples = df['reltuples'].values[0],
172172
relallvisible = df['relallvisible'].values[0],
Lines changed: 81 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,112 @@
1+
import logging
2+
import json
13
from typing import List
24

35
from sub_platforms.sql_opt.videx.videx_metadata import VidexTableStats
46
from sub_platforms.sql_opt.videx.model.videx_strategy import VidexModelBase, VidexStrategy
57
from sub_platforms.sql_opt.videx.model.videx_model_innodb import VidexModelInnoDB
6-
from sub_platforms.sql_opt.videx.videx_metadata import VidexTableStats, VidexDBTaskStats
8+
from sub_platforms.sql_opt.videx.videx_metadata import VidexDBTaskStats
9+
from sub_platforms.sql_opt.videx.videx_pg_metadata import PGVidexTableStats
710
from sub_platforms.sql_opt.videx.model.videx_strategy import VidexStrategy
811
from sub_platforms.sql_opt.videx.videx_utils import IndexRangeCond
912
from sub_platforms.sql_opt.pg_meta import PGTable
1013

1114
class VidexModelPG(VidexModelBase):
12-
def __init__(self, db_stats: VidexDBTaskStats, **kwargs):
15+
def __init__(self, table_stats: PGVidexTableStats, **kwargs):
1316
super().__init__(None, VidexStrategy.postgresql)
14-
self.videx_db_task_stats: VidexDBTaskStats = db_stats
17+
self.table_stats: PGVidexTableStats = table_stats
1518

1619
def scan_time(self, req_json_item: dict) -> float:
20+
# no used in pg
1721
return 0.0
22+
1823
def get_memory_buffer_size(self, req_json_item: dict) -> int:
24+
# no used in pg
1925
return -1
2026

27+
def info_low(self, req_json_item: dict) -> int:
28+
# no used in pg
29+
return 0
30+
2131
def cardinality(self, idx_range_cond: IndexRangeCond) -> int:
2232
return 0
2333

2434
def ndv(self, index_name, field_list: List[str]) -> int:
25-
return 0
35+
if len(field_list) == 1:
36+
colname = field_list[0]
37+
col_stats_info = self.table_stats.table_statistic.statistic_dict.get(colname)
38+
if col_stats_info is not None:
39+
return int(col_stats_info.stadistinct)
40+
else:
41+
return 0
42+
else:
43+
#TODO: try to fetch from pg_statistic_ext for multi-column NDV,
44+
# moreover, support ndv learned model (eg: PLM4NDV)
45+
return 0
2646

2747
def get_relation_stats(self, req_json_item: dict) -> dict:
28-
return None
48+
if self.table_stats.table_statistic is None:
49+
logging.warning(f"Table statistic is None for "
50+
f"db_name: {self.table_stats.table_meta.dbname}, "
51+
f"table_name: {self.table_stats.table_meta.table_name}")
52+
return {}
53+
data_items = req_json_item.get("data")
54+
if isinstance(data_items, list):
55+
for item in data_items:
56+
if item.get("item_type") == "colname":
57+
colname = (item.get("properties") or {}).get("name")
58+
if colname:
59+
break
60+
if not colname:
61+
logging.warning("Column name missing in request: %s", req_json_item)
62+
return {}
63+
col_stats_info = self.table_stats.table_statistic.statistic_dict.get(colname)
64+
if col_stats_info is None:
65+
logging.warning(f"Column statistic not found for "
66+
f"db_name: {self.table_stats.table_meta.dbname}, "
67+
f"table_name: {self.table_stats.table_meta.table_name}, "
68+
f"req_json_item {req_json_item}")
69+
return {}
70+
ndv_value = self.ndv(None, [colname])
71+
slots_payload = []
72+
for slot in (col_stats_info.slots or []):
73+
if hasattr(slot, "model_dump"):
74+
slots_payload.append(slot.model_dump(exclude_none=True))
75+
else:
76+
slots_payload.append({
77+
"kind": getattr(slot, "kind", None),
78+
"op": getattr(slot, "op", None),
79+
"coll": getattr(slot, "coll", None),
80+
"numbers": getattr(slot, "numbers", None),
81+
"values": getattr(slot, "values", None),
82+
})
83+
res = {
84+
"stanullfrac": col_stats_info.stanullfrac,
85+
"stawidth": col_stats_info.stawidth,
86+
"stainherit": col_stats_info.stainherit,
87+
"stadistinct": ndv_value,
88+
"slots": slots_payload,
89+
}
90+
logging.info(f"Get pg column statistic for "
91+
f"db_name: {self.table_stats.table_meta.dbname}, "
92+
f"table_name: {self.table_stats.table_meta.table_name}, "
93+
f"column {colname}: {res}")
94+
return res
2995

3096
def table_block_relation_estimate_size(self, req_json_item: dict) -> dict:
31-
properties = req_json_item['properties']
32-
videx_db = properties['dbname'].lower()
33-
table_name = properties['table_name'].lower()
34-
table: PGTable = self.videx_db_task_stats.get_table_meta(videx_db, table_name)
97+
table : PGTable = self.table_stats.table_meta
98+
logging.info(f"Start to get pg table block relation estimate size for "
99+
f"db_name: {table.dbname}, "
100+
f"table_name: {table.table_name}")
35101
res = {
36102
"relpages": table.relpages,
37103
"reltuples": table.reltuples,
38104
"relallvisible": table.relallvisible,
39105
"relhasindex": len(table.indexes) > 0,
40106
}
41-
return res
107+
logging.info(f"Get pg table block relation estimate size for "
108+
f"db_name: {table.dbname}, "
109+
f"table_name: {table.table_name}: {res}")
110+
return res
111+
def get_index_stats(self, req_json_item: dict) -> dict:
112+
return NotImplementedError("get_index_stats is not implemented yet.")

src/sub_platforms/sql_opt/videx/model/videx_strategy.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -155,12 +155,7 @@ def records_in_range(self, req_json_item: dict) -> int:
155155
}
156156
"""
157157
return self.cardinality(idx_range_cond)
158-
159-
def get_relation_stats(self, req_json_item: dict) -> dict:
160-
pass
161158

162-
def table_block_relation_estimate_size(self, req_json_item: dict) -> dict:
163-
pass
164159

165160
def record_range_request_to_str(min_key: dict, max_key: dict) -> str:
166161
"""

0 commit comments

Comments
 (0)