How to create a sequential workflow with error handling in Celery
This is currently a proof of concept and not in production code.
Scenario
The scenario is this: A user wants to have a time consuming undertaking performed. He inputs the start parameters (e.g. a web site url to be processed and an e-mail address to send the results to) into a form on a web page and clicks "submit". The web page immediately returns informing the user that the undertaking has been accepted and that he will get an e-mail later on its completion.
Architecture
The web server that received the undertaking is on a relatively expensive server which we pay for having in a good datacenter with great uptime. We will call this machine the control machine. We do not want it to churn through any tasks since its precious computing resources are needed for front-end work. Instead the undertaking should be done on inexpensive back-end servers.
The back end servers starts churning, having the undertaking divided into two tasks. If all goes well a report will be sent to the user. If something goes wrong, the undertaking is set aside and staff notified that something is either wrong in the code or in the data.
Implementation
Step 1 - Make a worker
For this example we will use two tasks that do jobs. For simplicity in this example, they will just add numbers and multiply numbers. We will also define an error handling task that will handle the buggy add task. This module, called "test1.py" should be available both on the worker machine and on the control machine. But it won't be actually running on the control machine. It will just be reachable with an import by other scripts.
For simplicity, the error handling will just be a print statement with an apology, although it is unlikely that the user is looking at the worker machine's terminal output.
# Change these two to your back end. Here it is set to use a Redis server
# at 192.168.1.21, running on the Redis standard port, using database number 1
BROKER_URL = 'redis://192.168.1.21:6379/1' CELERY_RESULT_BACKEND = 'redis://192.168.1.21:6379/1' from celery import Celery celery = Celery('test1', backend= CELERY_RESULT_BACKEND, broker=BROKER_URL) @celery.task def add(x, y): why = x/0 # An error in the code!! return x + y @celery.task def mul(x, y): return x * y @celery.task def onerror(uuid): print "We apologize and sweat profusely over the sub standard processing of job %s" % uuid
Start the worker with:
celery -A test1 worker
Step 2 - Make the control script
This only needs to be installed on the control machine
The control script has a client that puts stuff into the system. It also specifies what should happen if a worker throws an exception.
# Change these two to your back end. Here it is set to use a Redis server
# at 192.168.1.21, running on the Redis standard port, using database number 1
BROKER_URL = 'redis://192.168.1.21:6379/1' CELERY_RESULT_BACKEND = 'redis://192.168.1.21:6379/1'
from test1 import add, mul, onerror res = add.apply_async((2, 2), link=mul.s(16), link_error=onerror.s()) print res.get(propagate=False)
So, thats it. Control calls add first, and links it to mul. This means that Celery will execute add first, and whenever that is ready, will execute mul with the result value of add as part of the input to mul. However, if add throws an error, the task specified by link_error will be executed instead. You should see the apology being printed in the terminal window of the worker. Normally you wouldn't wait for the result with get, since that is a blocking operation. The get here has propagate set to false, which means it will not re-raise the error from the worker.