Skip to content

Commit 337c8cb

Browse files
committed
Eventlet/gevent can now recover from broker connection loss. Closes celery#959
1 parent e3ef841 commit 337c8cb

File tree

1 file changed

+12
-5
lines changed

1 file changed

+12
-5
lines changed

celery/worker/consumer.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
from time import sleep
8181
from Queue import Empty
8282

83+
from kombu.syn import _detect_environment
8384
from kombu.utils.encoding import safe_repr
8485
from kombu.utils.eventio import READ, WRITE, ERR
8586

@@ -356,6 +357,12 @@ def __init__(self, ready_queue,
356357
if not hub:
357358
self.amqheartbeat = 0
358359

360+
if _detect_environment() == 'gevent':
361+
# there's a gevent bug that causes timeouts to not be reset,
362+
# so if the connection timeout is exceeded once, it can NEVER
363+
# connect again.
364+
self.app.conf.BROKER_CONNECTION_TIMEOUT = None
365+
359366
def update_strategies(self):
360367
S = self.strategies
361368
app = self.app
@@ -605,7 +612,7 @@ def close_connection(self):
605612
debug('Closing broker connection...')
606613
self.maybe_conn_error(connection.close)
607614

608-
def stop_consumers(self, close_connection=True):
615+
def stop_consumers(self, close_connection=True, join=True):
609616
"""Stop consuming tasks and broadcast commands, also stops
610617
the heartbeat thread and event dispatcher.
611618
@@ -622,7 +629,7 @@ def stop_consumers(self, close_connection=True):
622629
self.heart = self.heart.stop()
623630

624631
debug('Cancelling task consumer...')
625-
if self.task_consumer:
632+
if join and self.task_consumer:
626633
self.maybe_conn_error(self.task_consumer.cancel)
627634

628635
if self.event_dispatcher:
@@ -631,7 +638,7 @@ def stop_consumers(self, close_connection=True):
631638
self.maybe_conn_error(self.event_dispatcher.close)
632639

633640
debug('Cancelling broadcast consumer...')
634-
if self.broadcast_consumer:
641+
if join and self.broadcast_consumer:
635642
self.maybe_conn_error(self.broadcast_consumer.cancel)
636643

637644
if close_connection:
@@ -706,7 +713,7 @@ def reset_connection(self):
706713
"""Re-establish the broker connection and set up consumers,
707714
heartbeat and the event dispatcher."""
708715
debug('Re-establishing connection to the broker...')
709-
self.stop_consumers()
716+
self.stop_consumers(join=False)
710717

711718
# Clear internal queues to get rid of old messages.
712719
# They can't be acked anyway, as a delivery tag is specific
@@ -793,7 +800,7 @@ def stop(self):
793800
# anymore.
794801
self.close()
795802
debug('Stopping consumers...')
796-
self.stop_consumers(close_connection=False)
803+
self.stop_consumers(close_connection=False, join=True)
797804

798805
def close(self):
799806
self._state = CLOSE

0 commit comments

Comments
 (0)