Skip to content

Commit bfbfae3

Browse files
committed
BUG#24595459 - XPLUGIN: SERVER DOES NOT SEEM TO RESPOND TO "0-SIZED" MESSAGES CORRECTLY
Description =========== When client sends zero length message, X Plugin doesn't respond to that. After next following messages client receives an error E_X_BAD_MESSAGE and gets disconnected (it depends on next message header content). Analyzis ======== The X Protocol message consist from header and payload. The header contains only the payload lenght (4 bytes). First byte in payload is message type, after which comes the protobuf binary content. X Plugin always reads 5 bytes, thus sending message with "0-payload" transmits only the 4 bytes what causes out-of-sync on X Protocol packet layer. Fix === * X Plugin is going to read first 4 bytes, in case of zero lenght it is going to send back `Mysqlx.Error` message with error ER_X_BAD_MESSAGE (5000) and text "Messages without payload are not supported", in case of valid lenght it is going to read whole payload and take the message type from the first byte. * renamed status variable 'Mysqlx_unknown_message_type' to 'Mysqlx_errors_unknown_message_type' which was introduced in BUG#24611754 Reviewed-by: Grzegorz Szwarc <[email protected]> Reviewed by: Alfredo Kojima <[email protected]> RB: 14231
1 parent da2b6dd commit bfbfae3

22 files changed

+219
-151
lines changed

rapid/plugin/x/mysqlxtest_src/mysqlxtest.cc

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -892,9 +892,9 @@ class Command
892892
m_commands["assert_eq "] = &Command::cmd_assert_eq;
893893
m_commands["assert_gt "] = &Command::cmd_assert_gt;
894894
m_commands["assert_ge "] = &Command::cmd_assert_ge;
895-
m_commands["query_result"] = &Command::cmd_query;
896-
m_commands["noquery_result"] = &Command::cmd_noquery;
897-
m_commands["wait_for "] = &Command::cmd_wait_for;
895+
m_commands["query_result"] = &Command::cmd_query;
896+
m_commands["noquery_result"] = &Command::cmd_noquery;
897+
m_commands["wait_for "] = &Command::cmd_wait_for;
898898
}
899899

900900
bool is_command_syntax(const std::string &cmd) const
@@ -1056,7 +1056,21 @@ class Command
10561056
{
10571057
bool was_set = false;
10581058

1059-
cmd_recvresult(context, CMD_ARG_BE_QUIET, ngs::bind(&Command::set_variable, ngs::ref(was_set), args, ngs::placeholders::_1));
1059+
std::string args_cmd = args;
1060+
std::vector<std::string> args_array;
1061+
boost::algorithm::trim(args_cmd);
1062+
1063+
boost::split(args_array, args_cmd, boost::is_any_of(" "), boost::token_compress_off);
1064+
1065+
args_cmd = CMD_ARG_BE_QUIET;
1066+
1067+
if (args_array.size() > 1)
1068+
{
1069+
args_cmd += " ";
1070+
args_cmd += args_array.at(1);
1071+
}
1072+
1073+
cmd_recvresult(context, args_cmd, ngs::bind(&Command::set_variable, ngs::ref(was_set), args_array.at(0), ngs::placeholders::_1));
10601074

10611075
if (!was_set)
10621076
{
@@ -1560,10 +1574,11 @@ class Command
15601574
return Stop_with_failure;
15611575
}
15621576

1577+
context.m_cm->active()->set_closed();
1578+
15631579
if (context.m_cm->is_default_active())
15641580
return Stop_with_success;
15651581

1566-
context.m_cm->active()->set_closed();
15671582
context.m_cm->close_active(false);
15681583

15691584
return Continue;
@@ -1817,12 +1832,7 @@ class Command
18171832
std::string::size_type p = args.find(' ');
18181833
if (p == std::string::npos)
18191834
{
1820-
if (variables.find(args) == variables.end())
1821-
{
1822-
std::cerr << "Invalid variable " << args << "\n";
1823-
return Stop_with_failure;
1824-
}
1825-
variables.erase(args);
1835+
variables[args] = "";
18261836
}
18271837
else
18281838
{
@@ -2998,8 +3008,9 @@ class My_command_line_options : public Command_line_options
29983008
std::cout << " Read and print (if not quiet) one message from the server\n";
29993009
std::cout << "-->recvresult [print-columnsinfo] [" << CMD_ARG_BE_QUIET << "]\n";
30003010
std::cout << " Read and print one resultset from the server; if print-columnsinfo is present also print short columns status\n";
3001-
std::cout << "-->recvtovar <varname>\n";
3002-
std::cout << " Read and print one resultset from the server and sets the variable from first row\n";
3011+
std::cout << "-->recvtovar <varname> [COLUMN_NAME]\n";
3012+
std::cout << " Read first row and first column (or column with name COLUMN_NAME) of resultset\n";
3013+
std::cout << " and set the variable <varname>\n";
30033014
std::cout << "-->recverror <errno>\n";
30043015
std::cout << " Read a message and ensure that it's an error of the expected type\n";
30053016
std::cout << "-->recvtype <msgtype>\n";

rapid/plugin/x/ngs/include/ngs/protocol_monitor.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,14 @@ class Protocol_monitor_interface
3232

3333
virtual void on_notice_warning_send() = 0;
3434
virtual void on_notice_other_send() = 0;
35-
virtual void on_error_send() = 0;
3635
virtual void on_fatal_error_send() = 0;
3736
virtual void on_init_error_send() = 0;
3837
virtual void on_row_send() = 0;
3938
virtual void on_send(long bytes_transferred) = 0;
4039
virtual void on_receive(long bytes_transferred) = 0;
41-
virtual void on_unknown_msg_type() = 0;
40+
41+
virtual void on_error_send() = 0;
42+
virtual void on_error_unknown_msg_type() = 0;
4243
};
4344

4445

rapid/plugin/x/ngs/src/client.cc

Lines changed: 36 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ void Client::handle_message(Request &request)
200200

201201
default:
202202
// invalid message at this time
203-
m_protocol_monitor.on_unknown_msg_type();
203+
m_protocol_monitor.on_error_unknown_msg_type();
204204
log_info("%s: Invalid message %i received during client initialization", client_id(), request.get_type());
205205
m_encoder->send_result(ngs::Fatal(ER_X_BAD_MESSAGE, "Invalid message"));
206206
m_close_reason = Close_error;
@@ -417,7 +417,7 @@ Request *Client::read_one_message(Error_code &ret_error)
417417
{
418418
union
419419
{
420-
char buffer[5]; // Must be properly aligned
420+
char buffer[4]; // Must be properly aligned
421421
longlong dummy;
422422
};
423423
uint32_t msg_size;
@@ -431,7 +431,7 @@ Request *Client::read_one_message(Error_code &ret_error)
431431
// untill we get another message to process we mark the connection as idle (for PSF)
432432
m_connection->mark_idle();
433433
// read the frame
434-
ssize_t nread = m_connection->read(buffer, 5);
434+
ssize_t nread = m_connection->read(buffer, 4);
435435
m_connection->mark_active();
436436

437437
if (nread == 0) // EOF
@@ -460,7 +460,6 @@ Request *Client::read_one_message(Error_code &ret_error)
460460
#endif
461461
const uint32_t* pdata = (uint32_t*)(buffer);
462462
msg_size = *pdata;
463-
int8_t type = (int8_t)buffer[4];
464463

465464
if (msg_size > m_server.get_config()->max_message_size)
466465
{
@@ -471,49 +470,47 @@ Request *Client::read_one_message(Error_code &ret_error)
471470
return NULL;
472471
}
473472

474-
Request_unique_ptr request(ngs::allocate_object<Request>(type));
475-
476-
if (msg_size > 1)
473+
if (0 == msg_size)
477474
{
478-
if (m_msg_buffer_size < msg_size-1)
479-
{
480-
m_msg_buffer_size = msg_size - 1;
481-
ngs::reallocate_array(m_msg_buffer, m_msg_buffer_size, KEY_memory_x_recv_buffer);
482-
}
483-
484-
nread = m_connection->read(&m_msg_buffer[0], msg_size-1);
485-
if (nread == 0) // EOF
486-
{
487-
log_info("%s: peer disconnected while reading message body", client_id());
488-
on_network_error(0);
489-
return NULL;
490-
}
491-
if (nread < 0)
492-
{
493-
int err;
494-
std::string strerr;
495-
Connection_vio::get_error(err, strerr);
496-
log_info("%s: ERROR reading from socket %s (%i)", client_id(), strerr.c_str(), err);
497-
on_network_error(err);
498-
return NULL;
499-
}
500-
m_protocol_monitor.on_receive(static_cast<long>(nread));
501-
502-
request->buffer(m_msg_buffer, msg_size-1);
475+
ret_error = Error(ER_X_BAD_MESSAGE, "Messages without payload are not supported");
476+
return NULL;
477+
}
503478

504-
ret_error = m_decoder.parse(*request);
505-
return request.release();
479+
if (m_msg_buffer_size < msg_size)
480+
{
481+
m_msg_buffer_size = msg_size;
482+
ngs::reallocate_array(m_msg_buffer, m_msg_buffer_size, KEY_memory_x_recv_buffer);
506483
}
507-
else if (msg_size == 1)
484+
485+
nread = m_connection->read(&m_msg_buffer[0], msg_size);
486+
if (nread == 0) // EOF
508487
{
509-
ret_error = m_decoder.parse(*request);
510-
return request.release();
488+
log_info("%s: peer disconnected while reading message body", client_id());
489+
on_network_error(0);
490+
return NULL;
511491
}
512-
else
492+
493+
if (nread < 0)
513494
{
514-
ret_error = Error_code(ER_X_BAD_MESSAGE, "Invalid message");
495+
int err;
496+
std::string strerr;
497+
Connection_vio::get_error(err, strerr);
498+
log_info("%s: ERROR reading from socket %s (%i)", client_id(), strerr.c_str(), err);
499+
on_network_error(err);
515500
return NULL;
516501
}
502+
503+
m_protocol_monitor.on_receive(static_cast<long>(nread));
504+
505+
int8_t type = (int8_t)m_msg_buffer[0];
506+
Request_unique_ptr request(ngs::allocate_object<Request>(type));
507+
508+
if (msg_size > 1)
509+
request->buffer(&m_msg_buffer[1], msg_size - 1);
510+
511+
ret_error = m_decoder.parse(*request);
512+
513+
return request.release();
517514
}
518515

519516

rapid/plugin/x/ngs/src/client_session.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ bool Session::handle_auth_message(ngs::Request &command)
182182
}
183183
else
184184
{
185-
m_encoder->get_protocol_monitor().on_unknown_msg_type();
185+
m_encoder->get_protocol_monitor().on_error_unknown_msg_type();
186186
log_info("%s: Unexpected message of type %i received during authentication", m_client.client_id(), type);
187187
m_encoder->send_init_error(ngs::Fatal(ER_X_BAD_MESSAGE, "Invalid message"));
188188
stop_auth();

rapid/plugin/x/src/xpl_client.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ void Protocol_monitor::on_receive(long bytes_transferred)
278278
}
279279

280280

281-
void Protocol_monitor::on_unknown_msg_type()
281+
void Protocol_monitor::on_error_unknown_msg_type()
282282
{
283-
update_status<&Common_status_variables::m_unknown_message_type>(m_client->get_session());
283+
update_status<&Common_status_variables::m_errors_unknown_message_type>(m_client->get_session());
284284
}

rapid/plugin/x/src/xpl_client.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ namespace xpl
4646
virtual void on_row_send();
4747
virtual void on_send(long bytes_transferred);
4848
virtual void on_receive(long bytes_transferred);
49-
virtual void on_unknown_msg_type();
49+
virtual void on_error_unknown_msg_type();
5050

5151
private:
5252
Client *m_client;

rapid/plugin/x/src/xpl_common_status_variables.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ class Common_status_variables
7070
Variable m_rows_sent;
7171
Variable m_notice_warning_sent;
7272
Variable m_notice_other_sent;
73-
Variable m_unknown_message_type;
73+
Variable m_errors_unknown_message_type;
7474

7575
private:
7676
Common_status_variables(const Common_status_variables &);

rapid/plugin/x/src/xpl_dispatcher.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ ngs::Error_code do_dispatch_command(xpl::Session &session, xpl::Crud_command_han
193193
return on_expect_close(session, expect, static_cast<const Mysqlx::Expect::Close&>(*command.message()));
194194
}
195195

196-
session.proto().get_protocol_monitor().on_unknown_msg_type();
196+
session.proto().get_protocol_monitor().on_error_unknown_msg_type();
197197
return ngs::Error(ER_UNKNOWN_COM_ERROR, "Unexpected message received");
198198
}
199199

rapid/plugin/x/src/xpl_plugin.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ static struct st_mysql_show_var xpl_plugin_status[]=
245245
{ MYSQLX_STATUS_VARIABLE_PREFIX("rows_sent"), xpl_func_ptr(xpl::Server::common_status_variable<long long, &xpl::Common_status_variables::m_rows_sent>), SHOW_FUNC, SHOW_SCOPE_GLOBAL },
246246
{ MYSQLX_STATUS_VARIABLE_PREFIX("notice_warning_sent"), xpl_func_ptr(xpl::Server::common_status_variable<long long, &xpl::Common_status_variables::m_notice_warning_sent>), SHOW_FUNC, SHOW_SCOPE_GLOBAL },
247247
{ MYSQLX_STATUS_VARIABLE_PREFIX("notice_other_sent"), xpl_func_ptr(xpl::Server::common_status_variable<long long, &xpl::Common_status_variables::m_notice_other_sent>), SHOW_FUNC, SHOW_SCOPE_GLOBAL },
248-
{ MYSQLX_STATUS_VARIABLE_PREFIX("unknown_message_type"), xpl_func_ptr(xpl::Server::common_status_variable<long long, &xpl::Common_status_variables::m_unknown_message_type>), SHOW_FUNC, SHOW_SCOPE_GLOBAL },
248+
{ MYSQLX_STATUS_VARIABLE_PREFIX("errors_unknown_message_type"), xpl_func_ptr(xpl::Server::common_status_variable<long long, &xpl::Common_status_variables::m_errors_unknown_message_type>), SHOW_FUNC, SHOW_SCOPE_GLOBAL },
249249
{ MYSQLX_STATUS_VARIABLE_PREFIX("sessions"), xpl_func_ptr(xpl::Server::global_status_variable_server<long long, &xpl::Global_status_variables::m_sessions_count>), SHOW_FUNC, SHOW_SCOPE_GLOBAL },
250250
{ MYSQLX_STATUS_VARIABLE_PREFIX("sessions_closed"), xpl_func_ptr(xpl::Server::global_status_variable_server<long long, &xpl::Global_status_variables::m_closed_sessions_count>), SHOW_FUNC, SHOW_SCOPE_GLOBAL },
251251
{ MYSQLX_STATUS_VARIABLE_PREFIX("sessions_fatal_error"), xpl_func_ptr(xpl::Server::global_status_variable_server<long long, &xpl::Global_status_variables::m_sessions_fatal_errors_count>), SHOW_FUNC, SHOW_SCOPE_GLOBAL },

rapid/plugin/x/tests/mtr/r/largedata.result

Lines changed: 0 additions & 70 deletions
This file was deleted.

0 commit comments

Comments
 (0)