Skip to content

Commit 2e10637

Browse files
committed
Merge branch 'Gjum-multisocket'
2 parents 2cbebd1 + 3e87135 commit 2e10637

File tree

3 files changed

+129
-84
lines changed

3 files changed

+129
-84
lines changed

spockbot/plugins/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
from spockbot.plugins.core import auth, event, net, taskmanager, ticker, timers
1+
from spockbot.plugins.core import auth, event, net, select, \
2+
taskmanager, ticker, timers
23
from spockbot.plugins.helpers import auxiliary, channels, chat, clientinfo, \
34
craft, entities, interact, inventory, movement, \
45
pathfinding, physics, start, world
@@ -7,6 +8,7 @@
78
('auth', auth.AuthPlugin),
89
('event', event.EventPlugin),
910
('net', net.NetPlugin),
11+
('select', select.SelectPlugin),
1012
('taskmanager', taskmanager.TaskManager),
1113
('ticker', ticker.TickerPlugin),
1214
('timers', timers.TimersPlugin),

spockbot/plugins/core/net.py

Lines changed: 56 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
"""
66

77
import logging
8-
import select
98
import socket
109
import time
1110

@@ -36,44 +35,11 @@ def decrypt(self, data):
3635
return self.decryptifier.update(data)
3736

3837

39-
class SelectSocket(socket.socket):
40-
"""
41-
Provides an asynchronous socket with a poll method built on
42-
top of select.select for cross-platform compatiability
43-
"""
44-
def __init__(self, timer):
45-
super(SelectSocket, self).__init__(socket.AF_INET, socket.SOCK_STREAM)
46-
self.sending = False
47-
self.timer = timer
48-
49-
def poll(self):
50-
flags = []
51-
if self.sending:
52-
self.sending = False
53-
slist = [(self,), (self,), (self,)]
54-
else:
55-
slist = [(self,), (), (self,)]
56-
timeout = self.timer.get_timeout()
57-
if timeout >= 0:
58-
slist.append(timeout)
59-
try:
60-
rlist, wlist, xlist = select.select(*slist)
61-
except select.error as e:
62-
logger.error("SELECTSOCKET: Socket Error: %s", str(e))
63-
rlist, wlist, xlist = [], [], []
64-
if rlist:
65-
flags.append('SOCKET_RECV')
66-
if wlist:
67-
flags.append('SOCKET_SEND')
68-
if xlist:
69-
flags.append('SOCKET_ERR')
70-
return flags
71-
72-
7338
class NetCore(object):
74-
def __init__(self, sock, event):
39+
def __init__(self, sock, event, select):
7540
self.sock = sock
7641
self.event = event
42+
self.select = select
7743
self.host = None
7844
self.port = None
7945
self.connected = False
@@ -84,21 +50,24 @@ def __init__(self, sock, event):
8450
self.sbuff = b''
8551
self.rbuff = BoundBuffer()
8652

53+
def reset(self, sock):
54+
self.__init__(sock, self.event, self.select)
55+
8756
def connect(self, host='localhost', port=25565):
8857
self.host = host
8958
self.port = port
9059
try:
91-
logger.debug("NETCORE: Attempting to connect to host: %s port: %s",
60+
logger.debug('NETCORE: Attempting to connect to host: %s port: %s',
9261
host, port)
9362
# Set the connect to be a blocking operation
9463
self.sock.setblocking(True)
95-
self.sock.connect((self.host, self.port))
64+
self.sock.connect((host, port))
9665
self.sock.setblocking(False)
9766
self.connected = True
98-
self.event.emit('net_connect', (self.host, self.port))
99-
logger.debug("NETCORE: Connected to host: %s port: %s", host, port)
67+
self.event.emit('net_connect', (host, port))
68+
logger.debug('NETCORE: Connected to host: %s port: %s', host, port)
10069
except socket.error as error:
101-
logger.error("NETCORE: Error on Connect")
70+
logger.error('NETCORE: Error on Connect')
10271
self.event.emit('SOCKET_ERR', error)
10372

10473
def set_proto_state(self, state):
@@ -115,7 +84,7 @@ def push(self, packet):
11584
self.sbuff += (self.cipher.encrypt(data) if self.encrypted else data)
11685
self.event.emit(packet.ident, packet)
11786
self.event.emit(packet.str_ident, packet)
118-
self.sock.sending = True
87+
self.select.schedule_sending(self.sock)
11988

12089
def push_packet(self, ident, data):
12190
self.push(mcpacket.Packet(ident, data))
@@ -152,21 +121,19 @@ def disable_crypto(self):
152121
self.cipher = None
153122
self.encrypted = False
154123

155-
def reset(self, sock):
156-
self.__init__(sock, self.event)
157-
158124

159125
@pl_announce('Net')
160126
class NetPlugin(PluginBase):
161-
requires = ('Event', 'Timers')
127+
requires = ('Event', 'Select', 'Timers')
162128
defaults = {
163129
'bufsize': 4096,
164130
'sock_quit': True,
165131
}
166132
events = {
167133
'event_tick': 'tick',
168-
'SOCKET_RECV': 'handle_recv',
169-
'SOCKET_SEND': 'handle_send',
134+
'select_recv': 'handle_recv',
135+
'select_send': 'handle_send',
136+
'select_err': 'handle_err',
170137
'SOCKET_ERR': 'handle_err',
171138
'SOCKET_HUP': 'handle_hup',
172139
'PLAY<Disconnect': 'handle_disconnect',
@@ -182,25 +149,25 @@ def __init__(self, ploader, settings):
182149
super(NetPlugin, self).__init__(ploader, settings)
183150
self.bufsize = self.settings['bufsize']
184151
self.sock_quit = self.settings['sock_quit']
185-
self.sock = SelectSocket(self.timers)
186-
self.net = NetCore(self.sock, self.event)
152+
self.sock = None
153+
self.net = NetCore(self.sock, self.event, self.select)
154+
self.reset_sock()
187155
self.sock_dead = False
188156
ploader.provides('Net', self.net)
189157

190158
def tick(self, name, data):
191159
if self.net.connected:
192-
for flag in self.sock.poll():
193-
self.event.emit(flag)
160+
self.net.select.poll()
194161
else:
195162
timeout = self.timers.get_timeout()
196163
if timeout == -1:
197164
time.sleep(1)
198165
else:
199166
time.sleep(timeout)
200167

201-
# SOCKET_RECV - Socket is ready to recieve data
202-
def handle_recv(self, name, data):
203-
if self.net.connected:
168+
def handle_recv(self, name, fileno):
169+
"""Socket is ready to recieve data"""
170+
if self.net.connected and fileno == self.net.sock.fileno():
204171
try:
205172
data = self.sock.recv(self.bufsize)
206173
if not data:
@@ -210,67 +177,73 @@ def handle_recv(self, name, data):
210177
except socket.error as error:
211178
self.event.emit('SOCKET_ERR', error)
212179

213-
# SOCKET_SEND - Socket is ready to send data and Send buffer contains
214-
# data to send
215-
def handle_send(self, name, data):
216-
if self.net.connected:
180+
def handle_send(self, name, fileno):
181+
"""Socket is ready to send data and send buffer has data to send"""
182+
if self.net.connected and fileno == self.net.sock.fileno():
217183
try:
218184
sent = self.sock.send(self.net.sbuff)
219185
self.net.sbuff = self.net.sbuff[sent:]
220186
if self.net.sbuff:
221-
self.sock.sending = True
187+
self.net.select.schedule_sending(self.sock)
222188
except socket.error as error:
223189
self.event.emit('SOCKET_ERR', error)
224190

225-
# SOCKET_ERR - Socket Error has occured
226-
def handle_err(self, name, data):
227-
self.sock.close()
228-
self.sock = SelectSocket(self.timers)
229-
self.net.reset(self.sock)
230-
logger.error("NETPLUGIN: Socket Error: %s", data)
231-
self.event.emit('net_disconnect', data)
232-
if self.sock_quit and not self.event.kill_event:
233-
self.sock_dead = True
234-
self.event.kill()
191+
def handle_select_err(self, name, fileno):
192+
if self.net.connected and fileno == self.net.sock.fileno():
193+
self.event.emit('SOCKET_ERR', 'select error')
194+
195+
def handle_err(self, name, error):
196+
"""Socket Error has occured"""
197+
logger.error('NETPLUGIN: Socket Error: %s', error)
198+
self.reset_sock()
199+
self.event.emit('net_disconnect', error)
200+
self.check_quit()
235201

236-
# SOCKET_HUP - Socket has hung up
237202
def handle_hup(self, name, data):
238-
self.sock.close()
239-
self.sock = SelectSocket(self.timers)
203+
"""Socket has hung up"""
204+
logger.error('NETPLUGIN: Socket has hung up')
205+
self.reset_sock()
206+
self.event.emit('net_disconnect', 'Socket Hung Up')
207+
self.check_quit()
208+
209+
def reset_sock(self):
210+
if self.sock:
211+
self.sock.close()
212+
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
213+
self.net.select.register_socket(self.sock)
240214
self.net.reset(self.sock)
241-
logger.error("NETPLUGIN: Socket has hung up")
242-
self.event.emit('net_disconnect', "Socket Hung Up")
215+
216+
def check_quit(self):
243217
if self.sock_quit and not self.event.kill_event:
244218
self.sock_dead = True
245219
self.event.kill()
246220

247-
# Handshake - Change to whatever the next state is going to be
248221
def handle_handshake(self, name, packet):
222+
"""Change to whatever the next state is going to be"""
249223
self.net.set_proto_state(packet.data['next_state'])
250224

251-
# Login Success - Change to Play state
252225
def handle_login_success(self, name, packet):
226+
"""Change to Play state"""
253227
self.net.set_proto_state(proto.PLAY_STATE)
254228

255-
# Handle Set Compression packets
256229
def handle_comp(self, name, packet):
230+
"""Handle Set Compression packets"""
257231
self.net.set_comp_state(packet.data['threshold'])
258232

259233
def handle_disconnect(self, name, packet):
260-
logger.debug("NETPLUGIN: Disconnected: %s", packet.data['reason'])
234+
logger.debug('NETPLUGIN: Disconnected: %s', packet.data['reason'])
261235
self.event.emit('net_disconnect', packet.data['reason'])
262236

263237
def handle_login_disconnect(self, name, packet):
264-
265238
reason = packet.data.get('json_data', {}).get('text', '???')
266-
267239
logger.debug("NETPLUGIN: Disconnected: %s", reason)
268240
self.event.emit('net_disconnect', reason)
269241

270-
# Kill event - Try to shutdown the socket politely
271242
def handle_kill(self, name, data):
243+
"""Try to shutdown the socket politely"""
272244
if self.net.connected:
273-
logger.debug("NETPLUGIN: Kill event received, closing socket")
245+
logger.debug('NETPLUGIN: Kill event received, closing socket')
274246
if not self.sock_dead:
275247
self.sock.shutdown(socket.SHUT_WR)
276248
self.sock.close()
249+
self.net.select.unregister_socket(self.net.sock)

spockbot/plugins/core/select.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
"""
2+
Provides an asynchronous multi-socket selector with a poll method
3+
built on top of select.select for cross-platform compatibility.
4+
5+
After polling select, two events are emitted for each socket and kind-of-ready,
6+
``select_<kind>`` and ``select_<kind>_<sock.fileno()>``, where
7+
``<kind>`` is one of ``recv, send, err``.
8+
9+
The event payload is always the fileno of the corresponding socket.
10+
(The event plugin deep-copies the payload, but sockets are not serializable)
11+
12+
Note that the event loop is stopped during selecting. This is good in that
13+
the loop does not consume 100% CPU, but it means you have to register
14+
at least a slow timer if you do stuff on ``event_tick`` and
15+
expect it to be emitted frequently.
16+
"""
17+
18+
import logging
19+
import select
20+
21+
from spockbot.plugins.base import PluginBase, pl_announce
22+
23+
logger = logging.getLogger('spockbot')
24+
25+
26+
@pl_announce('Select')
27+
class SelectPlugin(PluginBase):
28+
requires = ('Event', 'Timers')
29+
30+
def __init__(self, ploader, settings):
31+
super(SelectPlugin, self).__init__(ploader, settings)
32+
self.sockets = set()
33+
self.sending = set()
34+
ploader.provides('Select', self)
35+
36+
def register_socket(self, sock):
37+
"""``poll()``ing will emit events when this socket is ready."""
38+
self.sockets.add(sock)
39+
40+
def unregister_socket(self, sock):
41+
self.sockets.remove(sock)
42+
43+
def schedule_sending(self, sock):
44+
"""Emit one event the next time this socket is ready to send."""
45+
self.sending.add(sock)
46+
47+
def poll(self):
48+
timeout = self.timers.get_timeout()
49+
if timeout < 0:
50+
timeout = 5 # do not hang
51+
52+
select_args = [
53+
tuple(self.sockets),
54+
tuple(self.sending),
55+
tuple(self.sockets),
56+
timeout,
57+
]
58+
self.sending.clear()
59+
60+
try:
61+
ready_lists = select.select(*select_args)
62+
except select.error as e:
63+
logger.error('SELECTSOCKET: Socket Error: "%s" %s', str(e), e.args)
64+
return
65+
66+
for ready_socks, kind in zip(ready_lists, ('recv', 'send', 'err')):
67+
for sock in ready_socks:
68+
self.event.emit('select_%s' % kind, sock.fileno())
69+
self.event.emit('select_%s_%s' % (kind, sock.fileno()),
70+
sock.fileno())

0 commit comments

Comments
 (0)