Skip to content

Commit 8809635

Browse files
author
Venkatesh Duggirala
committed
Bug#23581389: SEMI-SYNC CAN'T RUN NORMALLY WHEN MANY CONNECTIONS
Problem & Analysis: The semi-sync ack receiver thread uses select() to check whether sockets are ready for reading. But select() relies on fd_set, which is limited by FD_SETSIZE (1024 by default). The select() man page says: ####################### "An fd_set is a fixed size buffer. Executing FD_CLR() or FD_SET() with a value of fd that is negative or is equal to or larger than FD_SETSIZE will result in undefined behavior." ######################## Fix: The semi-sync ack receiver thread is changed to use poll() instead of select(). On some systems poll() may not be available, hence the code is wrapped in a HAVE_POLL define. On systems that still use select(), the ack receiver thread will generate an error and semi-sync will be switched off when a slave socket fd exceeds the FD_SETSIZE limit. Windows systems are an exception because this limitation does not exist on Windows.
1 parent 05bea2b commit 8809635

File tree

3 files changed

+150
-48
lines changed

3 files changed

+150
-48
lines changed

plugin/semisync/semisync_master_ack_receiver.cc

Lines changed: 13 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
#include "semisync_master.h"
1717
#include "semisync_master_ack_receiver.h"
18+
#include "semisync_master_socket_listener.h"
1819

1920
extern ReplSemiSyncMaster repl_semisync;
2021

@@ -186,22 +187,6 @@ inline void Ack_receiver::wait_for_slave_connection()
186187
mysql_cond_wait(&m_cond, &m_mutex);
187188
}
188189

189-
my_socket Ack_receiver::get_slave_sockets(fd_set *fds)
190-
{
191-
my_socket max_fd= INVALID_SOCKET;
192-
unsigned int i;
193-
194-
FD_ZERO(fds);
195-
for (i= 0; i < m_slaves.size(); i++)
196-
{
197-
my_socket fd= m_slaves[i].sock_fd();
198-
max_fd= (fd > max_fd ? fd : max_fd);
199-
FD_SET(fd, fds);
200-
}
201-
202-
return max_fd;
203-
}
204-
205190
/* Auxiliary function to initialize a NET object with given net buffer. */
206191
static void init_net(NET *net, unsigned char *buff, unsigned int buff_len)
207192
{
@@ -216,10 +201,12 @@ void Ack_receiver::run()
216201
{
217202
NET net;
218203
unsigned char net_buff[REPLY_MESSAGE_MAX_LENGTH];
219-
220-
fd_set read_fds;
221-
my_socket max_fd= INVALID_SOCKET;
222204
uint i;
205+
#ifdef HAVE_POLL
206+
Poll_socket_listener listener(m_slaves);
207+
#else
208+
Select_socket_listener listener(m_slaves);
209+
#endif //HAVE_POLL
223210

224211
sql_print_information("Starting ack receiver thread");
225212

@@ -231,7 +218,6 @@ void Ack_receiver::run()
231218

232219
while (1)
233220
{
234-
fd_set fds;
235221
Slave_vector_it it;
236222
int ret;
237223

@@ -248,25 +234,19 @@ void Ack_receiver::run()
248234
mysql_mutex_unlock(&m_mutex);
249235
continue;
250236
}
251-
252-
max_fd= get_slave_sockets(&read_fds);
237+
if (!listener.init_slave_sockets())
238+
goto end;
253239
m_slaves_changed= false;
254-
DBUG_PRINT("info", ("fd count %lu, max_fd %d", (ulong)m_slaves.size(),
255-
max_fd));
256240
}
257-
258-
struct timeval tv= {1, 0};
259-
fds= read_fds;
260-
/* select requires max fd + 1 for the first argument */
261-
ret= select(max_fd+1, &fds, NULL, NULL, &tv);
241+
ret= listener.listen_on_sockets();
262242
if (ret <= 0)
263243
{
264244
mysql_mutex_unlock(&m_mutex);
265245

266246
ret= DBUG_EVALUATE_IF("rpl_semisync_simulate_select_error", -1, ret);
267247

268-
if (ret == -1)
269-
sql_print_information("Failed to select() on semi-sync dump sockets, "
248+
if (ret == -1 && errno != EINTR)
249+
sql_print_information("Failed to wait on semi-sync dump sockets, "
270250
"error: errno=%d", socket_errno);
271251
/* Sleep 1us, so other threads can catch the m_mutex easily. */
272252
my_sleep(1);
@@ -277,7 +257,7 @@ void Ack_receiver::run()
277257
i= 0;
278258
while (i < m_slaves.size())
279259
{
280-
if (FD_ISSET(m_slaves[i].sock_fd(), &fds))
260+
if (listener.is_socket_active(i))
281261
{
282262
ulong len;
283263

@@ -289,7 +269,7 @@ void Ack_receiver::run()
289269
repl_semisync.reportReplyPacket(m_slaves[i].server_id(),
290270
net.read_pos, len);
291271
else if (net.last_errno == ER_NET_READ_ERROR)
292-
FD_CLR(m_slaves[i].sock_fd(), &read_fds);
272+
listener.clear_socket_info(i);
293273
}
294274
i++;
295275
}

plugin/semisync/semisync_master_ack_receiver.h

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/* Copyright (c) 2014, 2015, Oracle and/or its affiliates. All rights reserved.
1+
/* Copyright (c) 2014, 2016, Oracle and/or its affiliates. All rights reserved.
22
33
This program is free software; you can redistribute it and/or modify
44
it under the terms of the GNU General Public License as published by
@@ -21,6 +21,18 @@
2121
#include "my_thread.h"
2222
#include "sql_class.h"
2323

24+
struct Slave
25+
{
26+
THD *thd;
27+
Vio vio;
28+
29+
my_socket sock_fd() const { return vio.mysql_socket.fd; }
30+
uint server_id() const { return thd->server_id; }
31+
};
32+
33+
typedef std::vector<Slave> Slave_vector;
34+
typedef Slave_vector::iterator Slave_vector_it;
35+
2436
/**
2537
Ack_receiver is responsible to control ack receive thread and maintain
2638
slave information used by ack receive thread.
@@ -100,20 +112,7 @@ class Ack_receiver : public ReplSemiSyncBase
100112
mysql_cond_t m_cond;
101113
/* If slave list is updated(add or remove). */
102114
bool m_slaves_changed;
103-
104-
struct Slave
105-
{
106-
THD *thd;
107-
Vio vio;
108-
109-
my_socket sock_fd() { return vio.mysql_socket.fd; }
110-
uint server_id() { return thd->server_id; }
111-
};
112-
113-
typedef std::vector<Slave> Slave_vector;
114-
typedef Slave_vector::iterator Slave_vector_it;
115115
Slave_vector m_slaves;
116-
117116
my_thread_handle m_pid;
118117

119118
/* Declare them private, so no one can copy the object. */
@@ -122,7 +121,6 @@ class Ack_receiver : public ReplSemiSyncBase
122121

123122
void set_stage_info(const PSI_stage_info &stage);
124123
void wait_for_slave_connection();
125-
my_socket get_slave_sockets(fd_set *fds);
126124
};
127125

128126
extern Ack_receiver ack_receiver;
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/* Copyright (c) 2016 Oracle and/or its affiliates. All rights reserved.
2+
3+
This program is free software; you can redistribute it and/or modify
4+
it under the terms of the GNU General Public License as published by
5+
the Free Software Foundation; version 2 of the License.
6+
7+
This program is distributed in the hope that it will be useful,
8+
but WITHOUT ANY WARRANTY; without even the implied warranty of
9+
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10+
GNU General Public License for more details.
11+
12+
You should have received a copy of the GNU General Public License
13+
along with this program; if not, write to the Free Software
14+
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
15+
16+
#ifndef SEMISYNC_MASTER_SOCKET_LISTENER
17+
#define SEMISYNC_MASTER_SOCKET_LISTENER
18+
#include "semisync_master_ack_receiver.h"
19+
20+
#ifdef HAVE_POLL
21+
#include <sys/poll.h>
22+
#include <vector>
23+
24+
class Poll_socket_listener
25+
{
26+
public:
27+
Poll_socket_listener(const Slave_vector &slaves)
28+
:m_slaves(slaves)
29+
{
30+
}
31+
32+
bool listen_on_sockets()
33+
{
34+
return poll(m_fds.data(), m_fds.size(), 1000 /*1 Second timeout*/);
35+
}
36+
37+
bool is_socket_active(int index)
38+
{
39+
return m_fds[index].revents & POLLIN;
40+
}
41+
42+
void clear_socket_info(int index)
43+
{
44+
m_fds[index].fd= -1;
45+
m_fds[index].events= 0;
46+
}
47+
48+
bool init_slave_sockets()
49+
{
50+
m_fds.clear();
51+
for (uint i= 0; i < m_slaves.size(); i++)
52+
{
53+
pollfd poll_fd;
54+
poll_fd.fd= m_slaves[i].sock_fd();
55+
poll_fd.events= POLLIN;
56+
m_fds.push_back(poll_fd);
57+
}
58+
return true;
59+
}
60+
61+
private:
62+
const Slave_vector &m_slaves;
63+
std::vector<pollfd> m_fds;
64+
};
65+
66+
#else //NO POLL
67+
68+
class Select_socket_listener
69+
{
70+
public:
71+
Select_socket_listener(const Slave_vector &slaves)
72+
:m_slaves(slaves), m_max_fd(INVALID_SOCKET)
73+
{
74+
}
75+
76+
bool listen_on_sockets()
77+
{
78+
/* Reinitialze the fds with active fds before calling select */
79+
m_fds= m_init_fds;
80+
struct timeval tv= {1,0};
81+
/* select requires max fd + 1 for the first argument */
82+
return select(m_max_fd+1, &m_fds, NULL, NULL, &tv);
83+
}
84+
85+
bool is_socket_active(int index)
86+
{
87+
return FD_ISSET(m_slaves[index].sock_fd(), &m_fds);
88+
}
89+
90+
void clear_socket_info(int index)
91+
{
92+
FD_CLR(m_slaves[index].sock_fd(), &m_init_fds);
93+
}
94+
95+
bool init_slave_sockets()
96+
{
97+
FD_ZERO(&m_init_fds);
98+
for (uint i= 0; i < m_slaves.size(); i++)
99+
{
100+
my_socket socket_id= m_slaves[i].sock_fd();
101+
m_max_fd= (socket_id > m_max_fd ? socket_id : m_max_fd);
102+
#ifndef WINDOWS
103+
if (socket_id > FD_SETSIZE)
104+
{
105+
sql_print_error("Semisync slave socket fd is %u. "
106+
"select() cannot handle if the socket fd is "
107+
"bigger than %u (FD_SETSIZE).", socket_id, FD_SETSIZE);
108+
return false;
109+
}
110+
#endif //WINDOWS
111+
FD_SET(socket_id, &m_init_fds);
112+
}
113+
return true;
114+
}
115+
116+
private:
117+
const Slave_vector &m_slaves;
118+
my_socket m_max_fd;
119+
fd_set m_init_fds;
120+
fd_set m_fds;
121+
};
122+
123+
#endif //HAVE_POLL
124+
#endif //SEMISYNC_MASTER_SOCKET_LISTENER

0 commit comments

Comments
 (0)