Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DONOTMERGE] switch_to_fast_forward query rules attribute #4449

Draft
wants to merge 1 commit into
base: v2.x
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/query_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class QP_query_digest_stats {
struct _Query_Processor_rule_t {
int rule_id;
bool active;
bool switch_to_fast_forward;
char *username;
char *schemaname;
int flagIN;
Expand Down Expand Up @@ -145,6 +146,7 @@ class Query_Processor_Output {
char *comment; // #643
char *min_gtid;
bool create_new_conn;
bool switch_to_fast_forward;
std::string *new_query;
void * operator new(size_t size) {
return l_alloc(size);
Expand Down
108 changes: 108 additions & 0 deletions lib/MySQL_Session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4007,6 +4007,114 @@ int MySQL_Session::get_pkts_from_client(bool& wrong_pass, PtrSize_t& pkt) {
(begint.tv_sec*1000000000+begint.tv_nsec);
}
assert(qpo); // GloQPro->process_mysql_query() should always return a qpo



if (qpo->switch_to_fast_forward == true) {
// In this switch we handle commands that download binlog events from MySQL
// servers. For these commands a lot of the features provided by ProxySQL
// aren't useful, like multiplexing, query parsing, etc. For this reason,
// ProxySQL enables fast_forward when it receives these commands. 
{
// we use a switch to write the command in the info message
std::string q = "";
/*
switch ((enum_mysql_command)c) {
case _MYSQL_COM_BINLOG_DUMP:
q += "MYSQL_COM_BINLOG_DUMP";
break;
case _MYSQL_COM_BINLOG_DUMP_GTID:
q += "MYSQL_COM_BINLOG_DUMP_GTID";
break;
case _MYSQL_COM_REGISTER_SLAVE:
q += "MYSQL_COM_REGISTER_SLAVE";
break;
default:
assert(0);
break;
};
*/
// we add the client details in the info message
if (client_myds && client_myds->addr.addr) {
q += "Client " + std::string(client_myds->addr.addr) + ":" + std::to_string(client_myds->addr.port);
}
q += " : changing session fast_forward to true";
proxy_info("%s\n", q.c_str());
}
session_fast_forward = true;

if (client_myds->PSarrayIN->len) {
proxy_error("UNEXPECTED PACKET FROM CLIENT -- PLEASE REPORT A BUG\n");
assert(0);
}
client_myds->PSarrayIN->add(pkt.ptr, pkt.size);

// The following code prepares the session as if it was configured with fast
// forward before receiving the command. This way the state machine will
// handle the command automatically.
current_hostgroup = previous_hostgroup;
mybe = find_or_create_backend(current_hostgroup); // set a backend
mybe->server_myds->reinit_queues(); // reinitialize the queues in the myds . By default, they are not active
// We reinitialize the 'wait_until' since this session shouldn't wait for processing as
// we are now transitioning to 'FAST_FORWARD'.
mybe->server_myds->wait_until = 0;
if (mybe->server_myds->DSS==STATE_NOT_INITIALIZED) {
// NOTE: This section is entirely borrowed from 'STATE_SLEEP' for 'session_fast_forward'.
// Check comments there for extra information.
// =============================================================================
if (mybe->server_myds->max_connect_time == 0) {
uint64_t connect_timeout =
mysql_thread___connect_timeout_server < mysql_thread___connect_timeout_server_max ?
mysql_thread___connect_timeout_server_max : mysql_thread___connect_timeout_server;
mybe->server_myds->max_connect_time = thread->curtime + connect_timeout * 1000;
}
mybe->server_myds->connect_retries_on_failure = mysql_thread___connect_retries_on_failure;
CurrentQuery.start_time=thread->curtime;
// =============================================================================

// we don't have a connection
previous_status.push(FAST_FORWARD); // next status will be FAST_FORWARD
set_status(CONNECTING_SERVER); // now we need a connection
} else {
// In case of having a connection, we need to make user to reset the state machine
// for current server 'MySQL_Data_Stream', setting it outside of any state handled
// by 'mariadb' library. Otherwise 'MySQL_Thread' will threat this
// 'MySQL_Data_Stream' as library handled.
mybe->server_myds->DSS = STATE_READY;
// myds needs to have encrypted value set correctly
{
MySQL_Data_Stream * myds = mybe->server_myds;
MySQL_Connection * myconn = myds->myconn;
assert(myconn != NULL);
// PMC-10005
// if backend connection uses SSL we will set
// encrypted = true and we will start using the SSL structure
// directly from P_MARIADB_TLS structure.
MYSQL *mysql = myconn->mysql;
if (mysql && myconn->ret_mysql) {
if (mysql->options.use_ssl == 1) {
P_MARIADB_TLS * matls = (P_MARIADB_TLS *)mysql->net.pvio->ctls;
if (matls != NULL) {
myds->encrypted = true;
myds->ssl = (SSL *)matls->ssl;
myds->rbio_ssl = BIO_new(BIO_s_mem());
myds->wbio_ssl = BIO_new(BIO_s_mem());
SSL_set_bio(myds->ssl, myds->rbio_ssl, myds->wbio_ssl);
} else {
// if mysql->options.use_ssl == 1 but matls == NULL
// it means that ProxySQL tried to use SSL to connect to the backend
// but the backend didn't support SSL
}
}
}
}
set_status(FAST_FORWARD); // we can set status to FAST_FORWARD
}
break;
}



// This block was moved from 'handler_special_queries' to support
// handling of 'USE' statements which are preceded by a comment.
// For more context check issue: #3493.
Expand Down
44 changes: 43 additions & 1 deletion lib/Query_Processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -745,9 +745,48 @@ QP_rule_t * Query_Processor::new_query_rule(int rule_id, bool active, char *user
newQR->flagOUT_weights_total = 0;
newQR->flagOUT_ids = NULL;
newQR->flagOUT_weights = NULL;
newQR->switch_to_fast_forward = false;
if (newQR->attributes != NULL) {
if (strlen(newQR->attributes)) {
nlohmann::json j_attributes = nlohmann::json::parse(newQR->attributes);
nlohmann::json j_attributes = nlohmann::json::parse(newQR->attributes);
if ( j_attributes.find("switch_to_fast_forward") != j_attributes.end() ) {
bool parsed = false;
const nlohmann::json& j = j_attributes;
if (j["switch_to_fast_forward"].type() == nlohmann::json::value_t::number_unsigned) {
if (j["switch_to_fast_forward"] == 0 || j["switch_to_fast_forward"] == 1) {
if (j["switch_to_fast_forward"] == 1) {
newQR->switch_to_fast_forward = true;
}
parsed = true;
}
}
if (parsed == false) {
if (j["switch_to_fast_forward"].type() == nlohmann::json::value_t::boolean) {
if (j["switch_to_fast_forward"] == true) {
newQR->switch_to_fast_forward = true;
}
parsed = true;
}
}
if (parsed == false) {
if (j["switch_to_fast_forward"].type() == nlohmann::json::value_t::string) {
string s = j["switch_to_fast_forward"];
const char *a = s.c_str();
if (
(strcasecmp(a,"yes") == 0) || (strcasecmp(a,"true") == 0) || (strcasecmp(a,"1") == 0)
||
(strcasecmp(a,"no") == 0) || (strcasecmp(a,"false") == 0) || (strcasecmp(a,"0") == 0)
) {
if (
(strcasecmp(a,"yes") == 0) || (strcasecmp(a,"true") == 0) || (strcasecmp(a,"1") == 0)
) {
newQR->switch_to_fast_forward = true;
}
}
parsed = true;
}
}
}
if ( j_attributes.find("flagOUTs") != j_attributes.end() ) {
newQR->flagOUT_ids = new vector<int>;
newQR->flagOUT_weights = new vector<int>;
Expand Down Expand Up @@ -1978,6 +2017,9 @@ Query_Processor_Output * Query_Processor::process_mysql_query(MySQL_Session *ses
// if we arrived here, we have a match
qr->hits++; // this is done without atomic function because it updates only the local variables
bool set_flagOUT=false;
if (qr->switch_to_fast_forward == true) {
ret->switch_to_fast_forward = true;
}
if (qr->flagOUT_weights_total > 0) {
int rnd = random() % qr->flagOUT_weights_total;
for (unsigned int i=0; i< qr->flagOUT_weights->size(); i++) {
Expand Down
Loading