Daniel
Posted on March 18, 2024
If you just want to see my solution without previous context scroll down!
Context
I’ve been struggling quite a bit with exiting Celery tasks cleanly while developing getemil.io
We use Django as the backend and Celery to run tasks on workers.
I had cases like this (code is made up and might not make much sense here, but the structure is the same):
# tasks.py
@shared_task
task():
logging.info("Running task")
function1()
There are cases where function2 is not be able to fetch the result, because of a known reason (i.e. The user has revoked access) . In those cases, I want to stop my celery task cleanly and just log an error line. I do not need the full stack trace or traceback contaminating unnecessarily the logs.
I had 3 handling options here:
- Raise an exception so the code (task) is interrupted but have an unneeded stack trace in the logs.
def function2(input):
try:
logging.info("Fetching result...")
result = fetch_something(input)
logging.info(f"Result: {result}")
return result
except KnownException:
raise Exception("Cound not fetch the result because...")
2. return None
to avoid a stack trace but have to implement if
everywhere:
# util.py
def function2(input):
try:
logging.info("Fetching result...")
result = fetch_something(input)
logging.info(f"Result: {result}")
return result
except KnownException:
logging.error("Could not fetch the result because...")
return None
def function1():
...some stuff...
result = function2(input)
if not result:
return
...do some other stuff...
Now imagine if you have multiple complex functions or multiple classes and definitions in OOP (Object Oriented Programming) where you are catching many exceptions. This can get nasty, 90% of your code will become if statements, plus returning None
is not really descriptive in some cases.
3. Use revoke with terminate=True
in celery tasks
This is NOT recommended by the Celery docs as it might not be able to stop the task as one intends. It would also complicate the code quite a bit.
My solution
I’m now using this template on every task:
# tasks.py
@shared_task
task():
try:
...do task stuff...
except SystemExit:
logging.info("SystemExit requested, stopping task...")
except:
logging.error("An uncaught error occurred while running the task")
raise
Whenever I need to cleanly exit the Celery task, without a stack trace, I just raise a SystemExit
exception in whatever function I’m using:
# util.py
def function2(input):
try:
logging.info("Fetching result...")
result = fetch_something(input)
logging.info(f"Result: {result}")
return result
except KnownException:
logging.error("Could not fetch the result")
raise SystemExit(1)
def function1():
...some stuff...
result = function2(input)
...do some other stuff...
This way:
- If I use a
SystemExit
exception, the code will exit cleanly, only logging the linelogging.error(“Could not fetch the result”)
which is the only thing I really need. - If any other uncaught exception happens, I will have the full stack trace or traceback logged without issues!
Rules to consider
I have a set of 2rules you can also use with this template:
- If I have code immediately inside a task, I use the same logic as it if were inside another function:
# tasks.py
@shared_task
task():
try:
... some code ...
try:
result = fetch_something()
except:
logging.error("Could not fetch the result")
raise SystemExit(1)
... more code ...
except SystemExit:
logging.info("SystemExit requested, stopping task...")
except:
logging.error("An uncaught error occurred while running the task")
raise
2. If I have a loop inside a task, for instance, run a check on every user, I make sure I catch the exceptions inside the loop to avoid interrupting it:
# tasks.py
@shared_task
task():
try:
... some code ...
for user in users:
try:
result = fetch_something(user)
except:
logging.error(f"Could not fetch the result for user {user}")
continue
# Notice I do not raise anything to allow the for loop to continue
... do something with the user result ...
... more code ...
except SystemExit:
logging.info("SystemExit requested, stopping task...")
except:
logging.error("An uncaught error occurred while running the task")
raise
Posted on March 18, 2024
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.