Skip to content

Commit 53bc170

Browse files
committed
New implementation of fast forward , #528
1 parent 2987d0b commit 53bc170

File tree

4 files changed

+40
-4
lines changed

4 files changed

+40
-4
lines changed

include/MySQL_Data_Stream.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ class MySQL_Data_Stream
192192
MyHGM->destroy_MyConn_from_pool(mc);
193193
}
194194
void free_mysql_real_query();
195-
195+
void reinit_queues();
196196
// void destroy_MySQL_Connection() {
197197
// MySQL_Connection *mc=myconn;
198198
// detach_connection();

lib/MySQL_Session.cpp

+24-3
Original file line numberDiff line numberDiff line change
@@ -542,6 +542,7 @@ bool MySQL_Session::handler_special_queries(PtrSize_t *pkt) {
542542
return false;
543543
}
544544

545+
#define NEXT_IMMEDIATE(new_st) do { set_status(new_st); goto handler_again; } while (0)
545546

546547
int MySQL_Session::handler() {
547548
bool wrong_pass=false;
@@ -654,6 +655,13 @@ int MySQL_Session::handler() {
654655
current_hostgroup=default_hostgroup;
655656
}
656657
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Statuses: WAITING_CLIENT_DATA - STATE_SLEEP\n");
658+
if (session_fast_forward==true) { // if it is fast forward
659+
mybe=find_or_create_backend(current_hostgroup); // set a backend
660+
mybe->server_myds->reinit_queues(); // reinitialize the queues in the myds . By default, they are not active
661+
mybe->server_myds->PSarrayOUT->add(pkt.ptr, pkt.size); // move the first packet
662+
previous_status.push(FAST_FORWARD); // next status will be FAST_FORWARD . Now we need a connection
663+
NEXT_IMMEDIATE(CONNECTING_SERVER); // we create a connection . next status will be FAST_FORWARD
664+
}
657665
//unsigned char c;
658666
c=*((unsigned char *)pkt.ptr+sizeof(mysql_hdr));
659667
switch ((enum_mysql_command)c) {
@@ -824,14 +832,21 @@ int MySQL_Session::handler() {
824832
}
825833

826834

827-
#define NEXT_IMMEDIATE(new_st) do { set_status(new_st); goto handler_again; } while (0)
828835

829836
handler_again:
830837

831838
switch (status) {
832839
case FAST_FORWARD:
833-
fprintf(stderr,"FAST_FORWARD\n");
834-
// FIXME: to implement
840+
if (mybe->server_myds->mypolls==NULL) {
841+
// register the mysql_data_stream
842+
thread->mypolls.add(POLLIN|POLLOUT, mybe->server_myds->fd, mybe->server_myds, thread->curtime);
843+
}
844+
// copy all packets from backend to frontend
845+
for (unsigned int k=0; k < mybe->server_myds->PSarrayIN->len; k++) {
846+
PtrSize_t pkt;
847+
mybe->server_myds->PSarrayIN->remove_index(0,&pkt);
848+
client_myds->PSarrayOUT->add(pkt.ptr, pkt.size);
849+
}
835850
break;
836851
case CONNECTING_CLIENT:
837852
//fprintf(stderr,"CONNECTING_CLIENT\n");
@@ -1421,6 +1436,11 @@ int MySQL_Session::handler() {
14211436
st=previous_status.top();
14221437
previous_status.pop();
14231438
myds->wait_until=0;
1439+
if (session_fast_forward==true) {
1440+
// we have a successful connection and session_fast_forward enabled
1441+
// set DSS=STATE_SLEEP or it will believe it have to use MARIADB client library
1442+
myds->DSS=STATE_SLEEP;
1443+
}
14241444
NEXT_IMMEDIATE(st);
14251445
break;
14261446
case -1:
@@ -1923,6 +1943,7 @@ void MySQL_Session::handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED
19231943
mybe->server_myds->DSS=STATE_READY;
19241944
if (session_fast_forward==true) {
19251945
status=FAST_FORWARD;
1946+
mybe->server_myds->myconn->reusable=false; // the connection cannot be usable anymore
19261947
}
19271948
}
19281949
}

lib/MySQL_Thread.cpp

+8
Original file line numberDiff line numberDiff line change
@@ -1673,6 +1673,14 @@ void MySQL_Thread::process_data_on_data_stream(MySQL_Data_Stream *myds, unsigned
16731673
if (myds->sess->client_myds==myds) {
16741674
proxy_debug(PROXY_DEBUG_NET,1, "Session=%p, DataStream=%p -- Deleting FD %d\n", myds->sess, myds, myds->fd);
16751675
myds->sess->set_unhealthy();
1676+
} else {
1677+
// if this is a backend with fast_forward, set unhealthy
1678+
// if this is a backend without fast_forward, do not set unhealthy: it will be handled by client library
1679+
if (myds->sess->session_fast_forward) { // if fast forward
1680+
if (myds->myds_type==MYDS_BACKEND) { // and backend
1681+
myds->sess->set_unhealthy(); // set unhealthy
1682+
}
1683+
}
16761684
}
16771685
}
16781686
}

lib/mysql_data_stream.cpp

+7
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,13 @@ void MySQL_Data_Stream::init() {
237237
}
238238
}
239239

240+
void MySQL_Data_Stream::reinit_queues() {
241+
if (queueIN.buffer==NULL)
242+
queue_init(queueIN,QUEUE_T_DEFAULT_SIZE);
243+
if (queueOUT.buffer==NULL)
244+
queue_init(queueOUT,QUEUE_T_DEFAULT_SIZE);
245+
}
246+
240247
// this function initializes a MySQL_Data_Stream with arguments
241248
void MySQL_Data_Stream::init(enum MySQL_DS_type _type, MySQL_Session *_sess, int _fd) {
242249
myds_type=_type;

0 commit comments

Comments
 (0)