Skip to content

Enforce CLIENT_DEPRECATE_EOF capability match for fast-forward sessions #4819

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: v3.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/MySQL_Session.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ class MySQL_Session: public Base_Session<MySQL_Session, MySQL_Data_Stream, MySQL
void GPFC_DetectedMultiPacket_SetDDS();
int GPFC_WaitingClientData_FastForwardSession(PtrSize_t&);
void GPFC_PreparedStatements(PtrSize_t&, unsigned char);
void GPFC_Replication_SwitchToFastForward(PtrSize_t&, unsigned char);
int GPFC_Replication_SwitchToFastForward(PtrSize_t&, unsigned char);
bool GPFC_QueryUSE(PtrSize_t&, int&);

void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_RESET(PtrSize_t&);
Expand Down
1 change: 1 addition & 0 deletions include/mysql_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ class MySQL_Connection {
bool get_gtid(char *buff, uint64_t *trx_id);
void reduce_auto_increment_delay_token() { if (auto_increment_delay_token) auto_increment_delay_token--; };

bool match_ff_req_options(const MySQL_Connection *c);
bool match_tracked_options(const MySQL_Connection *c);
bool requires_CHANGE_USER(const MySQL_Connection *client_conn);
unsigned int number_of_matching_session_variables(const MySQL_Connection *client_conn, unsigned int& not_matching);
Expand Down
1 change: 1 addition & 0 deletions include/proxysql_structs.h
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,7 @@ enum PROXYSQL_MYSQL_ERR {
ER_PROXYSQL_SRV_NULL_REPLICATION_LAG = 9019,
ER_PROXYSQL_CONNECT_TIMEOUT = 9020,
ER_PROXYSQL_READONLY_TIMEOUT = 9021,
ER_PROXYSQL_FAST_FORWARD_CONN_CREATE = 9022,
};

enum proxysql_session_type {
Expand Down
111 changes: 104 additions & 7 deletions lib/MySQL_Session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,7 @@ MySQL_Session::MySQL_Session() {

current_hostgroup=-1;
default_hostgroup=-1;
previous_hostgroup=-1;
locked_on_hostgroup=-1;
locked_on_hostgroup_and_all_variables_set=false;
next_query_flagIN=-1;
Expand Down Expand Up @@ -1089,6 +1090,7 @@ void MySQL_Session::generate_proxysql_internal_session_json(json &j) {
j["default_schema"] = ( default_schema ? default_schema : "" );
j["user_attributes"] = ( user_attributes ? user_attributes : "" );
j["transaction_persistent"] = transaction_persistent;
j["fast_forward"] = session_fast_forward;
if (client_myds != NULL) { // only if client_myds is defined
client_myds->get_client_myds_info_json(j);
}
Expand Down Expand Up @@ -2786,9 +2788,7 @@ bool MySQL_Session::handler_again___status_CHANGING_SCHEMA(int *_rc) {
return false;
}


bool MySQL_Session::handler_again___status_CONNECTING_SERVER(int *_rc) {
//fprintf(stderr,"CONNECTING_SERVER\n");
unsigned long long curtime=monotonic_time();
thread->atomic_curtime=curtime;
if (mirror) {
Expand Down Expand Up @@ -2898,7 +2898,69 @@ bool MySQL_Session::handler_again___status_CONNECTING_SERVER(int *_rc) {
st=previous_status.top();
previous_status.pop();
myds->wait_until=0;
if (session_fast_forward) {

// NOTE: Even if a connection has correctly been created, since the CLIENT_DEPRECATE_EOF
// capability isn't always enforced to match for backend conns (no direct propagation), a
// mismatch can take place after the creation. Right now this is only true for
// 'CLIENT_DEPRECATE_EOF' since the other capabilities are propagated from client.
if (!client_myds->myconn->match_ff_req_options(mybe->server_myds->myconn)) {
if (myds->connect_retries_on_failure > 0) {
proxy_info(
"Failed to obtain suitable connection for fast-forward; server lacks the required capabilities"
" hostgroup=%d client_flags=%u server_capabilities=%lu\n",
current_hostgroup,
client_myds->myconn->options.client_flag,
mybe->server_myds->myconn->mysql->server_capabilities
);

const MySrvC* parent { myconn->parent };
MyHGM->p_update_mysql_error_counter(
p_mysql_error_type::proxysql, parent->myhgc->hid, parent->address, parent->port,
ER_PROXYSQL_FAST_FORWARD_CONN_CREATE
);
myds->connect_retries_on_failure--;
myds->destroy_MySQL_Connection_From_Pool(false);

// We are still in 'FAST_FORWARD' and we require a new connection, since we are
// moving to 'CONNECTING_SERVER' the previous status shouldn't be consumed.
previous_status.push(st);

// NOTE-connect_retries_delay: In case of failure to connect, if
// 'mysql_thread___connect_retries_delay' is set, we impose a delay in the session
// processing via 'pause_until'. Complementary NOTE above.
if (mysql_thread___connect_retries_delay) {
pause_until = thread->curtime + mysql_thread___connect_retries_delay*1000;
set_status(CONNECTING_SERVER);

return false;
}

NEXT_IMMEDIATE_NEW(CONNECTING_SERVER);
} else {
char buf[256] = { 0 };
cstr_format(buf,
"Fast-forward connection attempt failed; server lacks the required capabilities"
" hostgroup=%d client_flags=%u server_capabilities=%lu\n",
current_hostgroup,
client_myds->myconn->options.client_flag,
mybe->server_myds->myconn->mysql->server_capabilities
);

client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1815, (char *)"HY000", buf, true);

while (previous_status.size()) {
st=previous_status.top();
previous_status.pop();
}

myds->destroy_MySQL_Connection_From_Pool(true);
myds->max_connect_time=0;

NEXT_IMMEDIATE_NEW(WAITING_CLIENT_DATA);
}
}

if (session_fast_forward==true) {
// we have a successful connection and session_fast_forward enabled
// set DSS=STATE_SLEEP or it will believe it have to use MARIADB client library
myds->DSS=STATE_SLEEP;
Expand Down Expand Up @@ -3804,7 +3866,16 @@ void MySQL_Session::GPFC_PreparedStatements(PtrSize_t& pkt, unsigned char c) {
}
}

void MySQL_Session::GPFC_Replication_SwitchToFastForward(PtrSize_t& pkt, unsigned char c) {
int MySQL_Session::GPFC_Replication_SwitchToFastForward(PtrSize_t& pkt, unsigned char c) {
if (session_type != PROXYSQL_SESSION_MYSQL) { // only MySQL module supports replication!!
l_free(pkt.size,pkt.ptr);
client_myds->setDSS_STATE_QUERY_SENT_NET();
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"28000",(char *)"Command not supported");
client_myds->DSS=STATE_SLEEP;
status=WAITING_CLIENT_DATA;
return 0;
}

// In this switch we handle commands that download binlog events from MySQL
// servers. For these commands a lot of the features provided by ProxySQL
// aren't useful, like multiplexing, query parsing, etc. For this reason,
Expand Down Expand Up @@ -3850,6 +3921,7 @@ void MySQL_Session::GPFC_Replication_SwitchToFastForward(PtrSize_t& pkt, unsigne
// We reinitialize the 'wait_until' since this session shouldn't wait for processing as
// we are now transitioning to 'FAST_FORWARD'.
mybe->server_myds->wait_until = 0;

if (mybe->server_myds->DSS==STATE_NOT_INITIALIZED) {
// NOTE: This section is entirely borrowed from 'STATE_SLEEP' for 'session_fast_forward'.
// Check comments there for extra information.
Expand All @@ -3868,6 +3940,30 @@ void MySQL_Session::GPFC_Replication_SwitchToFastForward(PtrSize_t& pkt, unsigne
previous_status.push(FAST_FORWARD); // next status will be FAST_FORWARD
set_status(CONNECTING_SERVER); // now we need a connection
} else {
bool match_tracked {
// If DSS **IS** initialized, we **MUST** have a connection
mybe->server_myds->DSS != STATE_NOT_INITIALIZED
// Due to the first condition it's safe check the conn tracked options. Matching capabilities are
// mandatory for fast-forward transitions, otherwise session should be terminated.
&& client_myds->myconn->match_ff_req_options(mybe->server_myds->myconn)
};

// If a connection has been already acquired, but it doesn't match the required capabilities, the
// mismatch should be reported, and session should be killed.
if (!match_tracked) {
proxy_info(
"Failed to switch to fast-forward; session connection lacks the required capabilities"
" hostgroup=%d client_flags=%u server_capabilities=%lu\n",
current_hostgroup,
client_myds->myconn->options.client_flag,
mybe->server_myds->myconn->mysql->server_capabilities
);
mybe->server_myds->destroy_MySQL_Connection_From_Pool(false);
mybe->server_myds->fd=0;

return -1;
}

// In case of having a connection, we need to make user to reset the state machine
// for current server 'MySQL_Data_Stream', setting it outside of any state handled
// by 'mariadb' library. Otherwise 'MySQL_Thread' will threat this
Expand Down Expand Up @@ -3902,6 +3998,8 @@ void MySQL_Session::GPFC_Replication_SwitchToFastForward(PtrSize_t& pkt, unsigne
}
set_status(FAST_FORWARD); // we can set status to FAST_FORWARD
}

return 0;
}

bool MySQL_Session::GPFC_QueryUSE(PtrSize_t& pkt, int& handler_ret) {
Expand Down Expand Up @@ -4218,7 +4316,8 @@ int MySQL_Session::get_pkts_from_client(bool& wrong_pass, PtrSize_t& pkt) {
case _MYSQL_COM_BINLOG_DUMP:
case _MYSQL_COM_BINLOG_DUMP_GTID:
case _MYSQL_COM_REGISTER_SLAVE:
GPFC_Replication_SwitchToFastForward(pkt, c);
handler_ret = GPFC_Replication_SwitchToFastForward(pkt, c);
if (handler_ret) { return handler_ret; }
break;
case _MYSQL_COM_QUIT:
proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Got COM_QUIT packet\n");
Expand Down Expand Up @@ -5184,8 +5283,6 @@ int MySQL_Session::handler() {
int rc=0;
if (handler_again___status_CONNECTING_SERVER(&rc))
goto handler_again; // we changed status
//if (rc==1) //handler_again___status_CONNECTING_SERVER returns 1
// goto __exit_DSS__STATE_NOT_INITIALIZED;
}
break;
case session_status___NONE:
Expand Down
2 changes: 1 addition & 1 deletion lib/MySQL_Thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4007,7 +4007,7 @@ void MySQL_Thread::process_all_sessions() {
if (sess->to_process==1) {
if (sess->pause_until <= curtime) {
rc=sess->handler();
//total_active_transactions_+=sess->active_transactions;

if (rc==-1 || sess->killed==true) {
char _buf[1024];
if (sess->client_myds && sess->killed)
Expand Down
15 changes: 15 additions & 0 deletions lib/mysql_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,8 @@ MySQL_Connection::MySQL_Connection() {
options.ldap_user_variable_value=NULL;
options.ldap_user_variable_sent=false;
options.session_track_gtids_int=0;
options.server_capabilities=0;

compression_pkt_id=0;
mysql_result=NULL;
query.ptr=NULL;
Expand Down Expand Up @@ -729,6 +731,19 @@ unsigned int MySQL_Connection::number_of_matching_session_variables(const MySQL_
return ret;
}

bool MySQL_Connection::match_ff_req_options(const MySQL_Connection *c) {
// 'server_capabilities' is empty for backend connections
const MySQL_Connection* backend { !c->options.server_capabilities ? c : this };
const MySQL_Connection* frontend { c->options.server_capabilities ? c : this };

// Only required to be checked for fast_forward sessions
if (frontend->myds && frontend->myds->sess->session_fast_forward) {
return (frontend->options.client_flag & CLIENT_DEPRECATE_EOF) ==
(backend->mysql->server_capabilities & CLIENT_DEPRECATE_EOF);
} else {
return true;
}
}

bool MySQL_Connection::match_tracked_options(const MySQL_Connection *c) {
uint32_t cf1 = options.client_flag; // own client flags
Expand Down
5 changes: 3 additions & 2 deletions src/SQLite3_Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -966,7 +966,8 @@ void SQLite3_Server_session_handler(MySQL_Session* sess, void *_pa, PtrSize_t *p
if ((*proxy_sqlite3_get_autocommit)(db)==0) {
in_trans = true;
}
sess->SQLite3_to_MySQL(resultset, error, affected_rows, &sess->client_myds->myprot, in_trans);
bool deprecate_eof = sess->client_myds->myconn->options.client_flag & CLIENT_DEPRECATE_EOF;
sess->SQLite3_to_MySQL(resultset, error, affected_rows, &sess->client_myds->myprot, in_trans, deprecate_eof);
delete resultset;
#ifdef TEST_READONLY
if (strncasecmp("SELECT",query_no_space,6)) {
Expand Down Expand Up @@ -1058,7 +1059,7 @@ static void *child_mysql(void *arg) {
fds[0].revents=0;
fds[0].events=POLLIN|POLLOUT;
free(arg);
sess->client_myds->myprot.generate_pkt_initial_handshake(true,NULL,NULL, &sess->thread_session_id, false);
sess->client_myds->myprot.generate_pkt_initial_handshake(true,NULL,NULL, &sess->thread_session_id, true);

while (__sync_fetch_and_add(&glovars.shutdown,0)==0) {
if (myds->available_data_out()) {
Expand Down
83 changes: 70 additions & 13 deletions test/tap/tap/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ std::size_t count_matches(const string& str, const string& substr) {
}

int mysql_query_t__(MYSQL* mysql, const char* query, const char* f, int ln, const char* fn) {
diag("%s:%d:%s(): Issuing query '%s' to ('%s':%d)", f, ln, fn, query, mysql->host, mysql->port);
diag("%s:%d:%s(): Issuing query \"%s\" to ('%s':%d)", f, ln, fn, query, mysql->host, mysql->port);
return mysql_query(mysql, query);
}

Expand Down Expand Up @@ -656,9 +656,9 @@ ext_val_t<int32_t> ext_single_row_val(const mysql_res_row& row, const int32_t& d
if (row.empty() || row.front().empty()) {
return { -1, def_val, {} };
} else {
errno = 0;
char* p_end {};
const int32_t val = std::strtol(row.front().c_str(), &p_end, 10);
errno = 0;
char* p_end {};
const int32_t val = std::strtol(row.front().c_str(), &p_end, 10);

if (row[0] == p_end || errno == ERANGE) {
return { -2, def_val, string { row[0] } };
Expand All @@ -672,9 +672,9 @@ ext_val_t<uint32_t> ext_single_row_val(const mysql_res_row& row, const uint32_t&
if (row.empty() || row.front().empty()) {
return { -1, def_val, {} };
} else {
errno = 0;
char* p_end {};
const uint32_t val = std::strtoul(row.front().c_str(), &p_end, 10);
errno = 0;
char* p_end {};
const uint32_t val = std::strtoul(row.front().c_str(), &p_end, 10);

if (row[0] == p_end || errno == ERANGE) {
return { -2, def_val, string { row[0] } };
Expand All @@ -689,9 +689,9 @@ ext_val_t<int64_t> ext_single_row_val(const mysql_res_row& row, const int64_t& d
if (row.empty() || row.front().empty()) {
return { -1, def_val, {} };
} else {
errno = 0;
char* p_end {};
const int64_t val = std::strtoll(row.front().c_str(), &p_end, 10);
errno = 0;
char* p_end {};
const int64_t val = std::strtoll(row.front().c_str(), &p_end, 10);

if (row[0] == p_end || errno == ERANGE) {
return { -2, def_val, string { row[0] } };
Expand All @@ -705,9 +705,9 @@ ext_val_t<uint64_t> ext_single_row_val(const mysql_res_row& row, const uint64_t&
if (row.empty() || row.front().empty()) {
return { -1, def_val, {} };
} else {
errno = 0;
char* p_end {};
const uint64_t val = std::strtoull(row.front().c_str(), &p_end, 10);
errno = 0;
char* p_end {};
const uint64_t val = std::strtoull(row.front().c_str(), &p_end, 10);

if (row[0] == p_end || errno == ERANGE) {
return { -2, def_val, string { row[0] } };
Expand Down Expand Up @@ -1933,6 +1933,61 @@ int dump_conn_stats(MYSQL* admin, const vector<uint32_t> hgs) {
return EXIT_SUCCESS;
}

string row_to_str(const mysql_res_row& row) {
string res { "[" };

for (const auto& e : row) {
res += "\"" + e + "\"";

if (&e != &row.back()) {
res += ",";
}
}

res += "]";

return res;
}

ext_val_t<hg_pool_st_t> ext_single_row_val(const mysql_res_row& row, const hg_pool_st_t& def_val) {
if (row.empty() || row.size() != sizeof(hg_pool_st_t)/sizeof(uint32_t)) {
return { -1, def_val, {} };
} else {
for (int i = 0; i < sizeof(hg_pool_st_t)/sizeof(uint32_t); i++) {
if (row[i].empty()) {
return { -1, def_val, {} };
}
}

errno = 0;
char* p_end { nullptr };
hg_pool_st_t res {};
const string row_str { row_to_str(row) };

res.hostgroup = std::strtoull(row.front().c_str(), &p_end, 10);
if (row[0].c_str() == p_end || errno == ERANGE) { return { -2, def_val, row_str }; }
res.conn_used = std::strtoull(row[1].c_str(), &p_end, 10);
if (row[1].c_str() == p_end || errno == ERANGE) { return { -2, def_val, row_str }; }
res.conn_free = std::strtoull(row[2].c_str(), &p_end, 10);
if (row[2].c_str() == p_end || errno == ERANGE) { return { -2, def_val, row_str }; }
res.conn_ok = std::strtoull(row[3].c_str(), &p_end, 10);
if (row[3].c_str() == p_end || errno == ERANGE) { return { -2, def_val, row_str }; }
res.conn_err = std::strtoull(row[4].c_str(), &p_end, 10);
if (row[4].c_str() == p_end || errno == ERANGE) { return { -2, def_val, row_str }; }
res.max_conn_used = std::strtoull(row[5].c_str(), &p_end, 10);
if (row[5].c_str() == p_end || errno == ERANGE) { return { -2, def_val, row_str }; }
res.max_conn_used = std::strtoull(row[6].c_str(), &p_end, 10);
if (row[6].c_str() == p_end || errno == ERANGE) { return { -2, def_val, row_str }; }

return { EXIT_SUCCESS, res, row_str };
}
}

ext_val_t<hg_pool_st_t> get_conn_pool_hg_stats(MYSQL* admin, uint32_t hg) {
const string HG_STATS_QUERY { gen_conn_stats_query({ hg }) };
return mysql_query_ext_val(admin, HG_STATS_QUERY, hg_pool_st_t {});
}

pair<int,pool_state_t> fetch_conn_stats(MYSQL* admin, const vector<uint32_t> hgs) {
const string stats_query { gen_conn_stats_query(hgs) };
const pair<int,vector<mysql_row_t>> conn_pool_stats { exec_dql_query(admin, stats_query, true) };
Expand Down Expand Up @@ -1985,6 +2040,8 @@ int check_cond(MYSQL* mysql, const string& q) {
res = 0;
}
}

mysql_free_result(myres);
}
} else {
diag("Check failed with error '%s'", mysql_error(mysql));
Expand Down
Loading