Skip to content

Commit c0f1af3

Browse files
committed
Use readers/writers instead of fdmap
1 parent 9d5aa7c commit c0f1af3

File tree

2 files changed

+23
-10
lines changed

2 files changed

+23
-10
lines changed

celery/worker/consumer.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@
8484
from Queue import Empty
8585

8686
from kombu.utils.encoding import safe_repr
87+
from kombu.utils.eventio import READ, WRITE, ERR
8788

8889
from celery.app import app_or_default
8990
from celery.datastructures import AttributeDict
@@ -370,7 +371,7 @@ def consume_messages(self, sleep=sleep, min=min, Empty=Empty):
370371
qos = self.qos
371372
update_qos = qos.update
372373
update_readers = hub.update_readers
373-
fdmap = hub.fdmap
374+
readers, writers = hub.readers, hub.writers
374375
poll = hub.poller.poll
375376
fire_timers = hub.fire_timers
376377
scheduled = hub.timer._queue
@@ -431,12 +432,18 @@ def on_task_received(body, message):
431432
update_qos()
432433

433434
update_readers(on_poll_start())
434-
if fdmap:
435+
if readers or writers:
435436
connection.more_to_read = True
436437
while connection.more_to_read:
437438
for fileno, event in poll(time_to_sleep) or ():
438439
try:
439-
fdmap[fileno](fileno, event)
440+
if event & READ:
441+
readers[fileno](fileno, event)
442+
if event & WRITE:
443+
writers[fileno](fileno, event)
444+
if event & ERR:
445+
readers[fileno](fileno, event)
446+
writers[fileno](fileno, event)
440447
except Empty:
441448
break
442449
except socket.error:

celery/worker/hub.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,11 @@ class Hub(object):
5151
READ, WRITE, ERR = READ, WRITE, ERR
5252

5353
def __init__(self, timer=None):
54-
self.fdmap = {}
54+
self.readers = {}
55+
self.writers = {}
5556
self.timer = Schedule() if timer is None else timer
5657
self.on_init = []
58+
self.on_close = []
5759
self.on_task = []
5860

5961
def start(self):
@@ -85,11 +87,12 @@ def fire_timers(self, min_delay=1, max_delay=10, max_timers=10):
8587

8688
def add(self, fd, callback, flags):
8789
self.poller.register(fd, flags)
88-
try:
89-
fileno = fd.fileno()
90-
except AttributeError:
91-
fileno = fd
92-
self.fdmap[fileno] = callback
90+
if not isinstance(fd, int):
91+
fd = fd.fileno()
92+
if flags & READ:
93+
self.readers[fd] = callback
94+
if flags & WRITE:
95+
self.writers[fd] = callback
9396

9497
def add_reader(self, fd, callback):
9598
return self.add(fd, callback, READ | ERR)
@@ -110,7 +113,10 @@ def remove(self, fd):
110113
pass
111114

112115
def close(self):
113-
[self.remove(fd) for fd in self.fdmap.keys()]
116+
[self.remove(fd) for fd in self.readers.keys()]
117+
[self.remove(fd) for fd in self.writers.keys()]
118+
for callback in self.on_close:
119+
callback(self)
114120

115121
@cached_property
116122
def scheduler(self):

0 commit comments

Comments
 (0)