===============
 Workers Guide
===============

Starting the worker
===================

Starting celeryd in the foreground::

    $ celeryd --loglevel=INFO

You probably want to use a daemonization tool to start and stop
``celeryd`` in the background; see :doc:`../cookbook/daemonizing` for help
using some of the most popular solutions.

For a full list of available command line options see :mod:`~celery.bin.celeryd`.

You can also start multiple workers on the same machine, but if you do so
be sure to give a unique name to each individual worker by specifying the
``--hostname`` argument::

    $ celeryd --loglevel=INFO --concurrency=10 -n worker1.example.com
    $ celeryd --loglevel=INFO --concurrency=10 -n worker2.example.com
    $ celeryd --loglevel=INFO --concurrency=10 -n worker3.example.com

Stopping the worker
===================

Shutdown should be accomplished using the ``TERM`` signal (although ``INT``
also works).

When shutdown is initiated the worker will finish any tasks it's currently
executing before it terminates, so if these tasks are important you should
wait for it to finish before doing anything drastic (like sending the ``KILL``
signal).

If the worker won't shut down after a considerable amount of time, you
probably have hanging tasks. In this case it's safe to use the ``KILL``
signal, but be aware that currently executing tasks will be lost (unless
the tasks have the :attr:`~celery.task.base.Task.acks_late` option set).
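
As an illustration, here is a minimal sketch of a task declared with
``acks_late`` enabled, so the message is only acknowledged after the task
finishes (the task and helper names are made up for this example):

.. code-block:: python

    from celery.decorators import task

    @task(acks_late=True)  # acknowledge the message only after the task returns
    def process_order(order_id):
        # If the worker is killed mid-execution, the unacknowledged message
        # will be redelivered and the task executed again.
        handle_order(order_id)  # hypothetical business logic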

Also, since the ``KILL`` signal can't be caught by processes, the worker will
not be able to reap its children, so make sure you do it manually. This
command usually does the trick::

    $ ps auxww | grep celeryd | awk '{print $2}' | xargs kill -KILL

Restarting the worker
=====================

Other than stopping, then starting the worker to restart, you can also
restart the worker using the ``HUP`` signal::

    $ kill -HUP $pid

The worker will then replace itself using the same arguments as it was
started with.

Concurrency
===========

Multiprocessing is used to perform concurrent execution of tasks. The number
of worker processes can be changed using the ``--concurrency`` argument, and
defaults to the number of CPUs in the system.

More worker processes are usually better, but there's a cut-off point where
adding more processes affects performance in negative ways.
There is even some evidence to suggest that having multiple celeryd's running
may perform better than having a single worker, for example 3 celeryd's with
10 worker processes each. You need to experiment to find the values that work
best for you, as this varies based on application, work load, task runtimes
and other factors.

Time limits
===========

A single task can potentially run forever; if you have lots of tasks
waiting for some event that will never happen, you will block the worker
from processing new tasks indefinitely. The best way to defend against
this scenario is to enable time limits.

The time limit (``--time-limit``) is the maximum number of seconds a task
may run before the process executing it is terminated and replaced by a
new process. You can also enable a soft time limit (``--soft-time-limit``),
which raises an exception the task can catch to clean up before the hard
time limit kills it:

.. code-block:: python

    from celery.decorators import task
    from celery.exceptions import SoftTimeLimitExceeded

    @task()
    def mytask():
        try:
            do_work()
        except SoftTimeLimitExceeded:
            clean_up_in_a_hurry()

Time limits can also be set using the ``CELERYD_TASK_TIME_LIMIT`` /
``CELERYD_SOFT_TASK_TIME_LIMIT`` settings.
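
For example, a sketch of what these settings could look like in your
``celeryconfig.py`` module (the values below are arbitrary, chosen for
illustration):

.. code-block:: python

    # Hard limit: the process running the task is terminated and replaced
    # after 120 seconds.
    CELERYD_TASK_TIME_LIMIT = 120

    # Soft limit: SoftTimeLimitExceeded is raised inside the task after
    # 60 seconds, giving it a chance to clean up.
    CELERYD_SOFT_TASK_TIME_LIMIT = 60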

**NOTE** Time limits do not currently work on Windows.

Max tasks per child setting
===========================

With this option you can configure the maximum number of tasks
a pool worker process can execute before it's replaced by a new process.

This is useful if you have memory leaks you have no control over,
for example closed source C extensions.

The option can be set using the ``--maxtasksperchild`` argument
to ``celeryd`` or using the ``CELERYD_MAX_TASKS_PER_CHILD`` setting.
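
As a minimal sketch, the setting could be placed in ``celeryconfig.py``
like this (the value is arbitrary, chosen for illustration):

.. code-block:: python

    # Replace each pool worker process after it has executed 100 tasks,
    # releasing any memory leaked by the tasks it ran.
    CELERYD_MAX_TASKS_PER_CHILD = 100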

Remote control
==============

Workers have the ability to be remote controlled using a broadcast message
queue. The commands can be directed to all, or a specific list of workers.

Commands can also have replies, which the client can then wait for and
collect. But since there's no central authority that knows how many
workers are available in the cluster, there is also no way to estimate
how many workers may send a reply. Therefore the client has a configurable
timeout, the deadline in seconds for replies to arrive in. This timeout
defaults to one second. If a reply doesn't arrive within the deadline, it
doesn't necessarily mean the worker is unresponsive, or worse, dead; it may
simply be caused by network latency or the worker being slow at processing
commands, so adjust the timeout accordingly.

In addition to timeouts, the client can specify the maximum number
of replies to wait for. If a destination is specified, this limit is set
to the number of destination hosts.
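
As a sketch of how the timeout and reply limit might be passed to the
``broadcast`` function described below (assuming it accepts ``timeout``
and ``limit`` keyword arguments for these values)::

    >>> from celery.task.control import broadcast
    >>> # Wait at most 5 seconds, and stop waiting after 3 replies.
    >>> broadcast("ping", reply=True, timeout=5, limit=3)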

The :func:`~celery.task.control.broadcast` function
------------------------------------------------------

This is the client function used to send commands to the workers.
Some remote control commands also have higher-level interfaces using
:func:`~celery.task.control.broadcast` in the background, like
:func:`~celery.task.control.rate_limit` and :func:`~celery.task.control.ping`.

Sending the ``rate_limit`` command and keyword arguments::

    >>> from celery.task.control import broadcast
    >>> broadcast("rate_limit", arguments={"task_name": "myapp.mytask",
    ...                                    "rate_limit": "200/m"})

This will send the command asynchronously, without waiting for a reply.
To request a reply you have to use the ``reply`` argument::

    >>> broadcast("rate_limit", {"task_name": "myapp.mytask",
    ...                          "rate_limit": "200/m"}, reply=True)
    [{'worker1.example.com': 'New rate limit set successfully'},
     {'worker2.example.com': 'New rate limit set successfully'},
     {'worker3.example.com': 'New rate limit set successfully'}]

Using the ``destination`` argument you can specify a list of workers
to receive the command::

    >>> broadcast("rate_limit", {"task_name": "myapp.mytask",
    ...                          "rate_limit": "200/m"}, reply=True,
    ...           destination=["worker1.example.com"])
    [{'worker1.example.com': 'New rate limit set successfully'}]

Of course, using the higher-level interface to set rate limits is much
more convenient, but there are commands that can only be requested
using :func:`~celery.task.control.broadcast`.

Rate limits
-----------

Example changing the rate limit for the ``myapp.mytask`` task to accept
200 tasks a minute on all servers::

    >>> from celery.task.control import rate_limit
    >>> rate_limit("myapp.mytask", "200/m")

Example changing the rate limit on a single host by specifying the
destination hostname::

    >>> rate_limit("myapp.mytask", "200/m",
    ...            destination=["worker1.example.com"])

**NOTE** This won't affect workers with the ``CELERY_DISABLE_RATE_LIMITS``
setting on. To re-enable rate limits you have to restart the worker.

Remote shutdown
---------------

This command will gracefully shut down the worker remotely::

    >>> broadcast("shutdown") # shutdown all workers
    >>> broadcast("shutdown", destination=["worker1.example.com"])

Ping
----

This command requests a ping from alive workers.
The workers reply with the string 'pong', and that's just about it.
It will use the default one second limit for replies unless you specify
a custom ``timeout``::

    >>> from celery.task.control import ping
    >>> ping()
    [{'worker1.example.com': 'pong'},
     {'worker2.example.com': 'pong'},
     {'worker3.example.com': 'pong'}]

:func:`~celery.task.control.ping` also supports the ``destination`` argument,
so you can specify which workers to ping::

    >>> ping(['worker2.example.com', 'worker3.example.com'])
    [{'worker2.example.com': 'pong'},
     {'worker3.example.com': 'pong'}]

Enable/disable events
---------------------

You can enable/disable events by using the ``enable_events`` and
``disable_events`` commands. This is useful to temporarily monitor
a worker using celeryev/celerymon::

    >>> broadcast("enable_events")
    >>> broadcast("disable_events")

Writing your own remote control commands
----------------------------------------

Remote control commands are registered in the control panel and
they take a single argument: the current
:class:`~celery.worker.control.ControlDispatch` instance.
From there you have access to the active
:class:`celery.worker.listener.CarrotListener` if needed.

Here's an example control command that restarts the broker connection:

.. code-block:: python

    from celery.worker.control import Panel

    @Panel.register
    def reset_connection(panel):
        panel.logger.critical("Connection reset by remote control.")
        panel.listener.reset_connection()
        return {"ok": "connection reset"}

These can be added to task modules, or you can keep them in their own module
then import them using the ``CELERY_IMPORTS`` setting::

    CELERY_IMPORTS = ("myapp.worker.control", )
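
Once the module is imported by the workers, the command can be invoked from
a client with :func:`~celery.task.control.broadcast`, just like the built-in
commands. A sketch, assuming a worker has registered the ``reset_connection``
command above (the reply shown is what the return value would look like)::

    >>> broadcast("reset_connection", reply=True)
    [{'worker1.example.com': {'ok': 'connection reset'}}]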

Debugging
=========

Dump of registered tasks
------------------------

You can get a list of tasks registered in the worker using the
``dump_tasks`` remote control command::

    >>> broadcast("dump_tasks", reply=True)
    [{'worker1.example.com': ['celery.delete_expired_task_meta',
                              'celery.execute_remote',
                              'celery.map_async',
                              'celery.ping',
                              'celery.task.http.HttpDispatchTask',
                              'tasks.add',
                              'tasks.sleeptask']}]

Dump of scheduled (ETA) tasks
-----------------------------

You can get a list of tasks waiting to be scheduled by using
the ``dump_schedule`` remote control command::

    >>> broadcast("dump_schedule", reply=True)
    [{'worker1.example.com':
        ['0. 2010-06-07 09:07:52 pri0 <TaskRequest: {
            name:"tasks.sleeptask",
            id:"1a7980ea-8b19-413e-91d2-0b74f3844c4d",
            args:"[1]", kwargs:"{}"}>',
         '1. 2010-06-07 09:07:53 pri0 <TaskRequest: {
            name:"tasks.sleeptask",
            id:"49661b9a-aa22-4120-94b7-9ee8031d219d",
            args:"[2]",
            kwargs:"{}"}>']}]

The fields in the output are (in order): position, eta, priority and request.

Note that these are tasks with an eta/countdown argument, not periodic tasks.

Dump of reserved tasks
----------------------

Reserved tasks are tasks that have been received, but are still waiting to
be executed.

You can get a list of these using the ``dump_reserved`` remote control
command::

    >>> broadcast("dump_reserved", reply=True)
    [{'worker1.example.com':
        ['<TaskRequest: {name:"tasks.sleeptask",
                         id:"32666e9b-809c-41fa-8e93-5ae0c80afbbf",
                         args:"(8,)", kwargs:"{}"}>']}]