How to cleanly stop Celery tasks on exceptions

onticdani

Daniel

Posted on March 18, 2024

If you just want to see my solution without the background, scroll down to "My solution"!

Context

I’ve been struggling quite a bit with exiting Celery tasks cleanly while developing getemil.io.

We use Django as the backend and Celery to run tasks on workers.

I had cases like this (code is made up and might not make much sense here, but the structure is the same):

# tasks.py

import logging

from celery import shared_task

@shared_task
def task():
  logging.info("Running task")
  function1()

There are cases where function2 (called from inside function1) is not able to fetch the result for a known reason (e.g. the user has revoked access). In those cases, I want to stop my Celery task cleanly and just log an error line. I do not need a full stack trace cluttering the logs.

I had 3 handling options here:

1. Raise an exception so the task is interrupted, at the cost of an unneeded stack trace in the logs:

# util.py

def function2(input):
  try:
    logging.info("Fetching result...")
    result = fetch_something(input)
    logging.info(f"Result: {result}")
    return result
  except KnownException:
    raise Exception("Could not fetch the result because...")

2. Return None to avoid a stack trace, but then have to add an if check everywhere the result is used:

# util.py

def function2(input):
  try:
    logging.info("Fetching result...")
    result = fetch_something(input)
    logging.info(f"Result: {result}")
    return result
  except KnownException:
    logging.error("Could not fetch the result because...")
    return None

def function1():
  # ...some stuff...

  result = function2(input)
  if result is None:
    # function2 already logged the error, so just stop here
    return

  # ...do some other stuff...

Now imagine you have multiple complex functions, or multiple classes in object-oriented code, each catching many exceptions. This gets nasty quickly: a large share of your code turns into if statements, and returning None is not very descriptive in some cases.

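3. Use revoke with terminate=True in Celery tasks

This is NOT recommended by the Celery docs: terminate kills the worker process that is executing the task rather than the task itself, and by the time the signal arrives that process may already be running a different task. It would also complicate the code quite a bit.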

My solution

I’m now using this template on every task:

# tasks.py

@shared_task
def task():
    try:
        ...  # do task stuff
    except SystemExit:
        logging.info("SystemExit requested, stopping task...")
    except:
        logging.error("An uncaught error occurred while running the task")
        raise
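Since the same try/except wraps every task, one way to avoid repeating it is a small decorator. The sketch below is only an illustration: `clean_exit` is a hypothetical name (it is not part of Celery), and the two example functions are plain Python so the block runs without a worker. In a real project the decorator would presumably sit directly under `@shared_task`, as shown in the comment.

```python
import functools
import logging


def clean_exit(func):
    """Apply the task template: swallow SystemExit, log and re-raise other errors."""

    @functools.wraps(func)  # keep the original function name for task naming
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except SystemExit:
            logging.info("SystemExit requested, stopping task...")
            return None
        except Exception:
            # Exception instead of a bare except, so KeyboardInterrupt
            # passes through without being logged here (a design choice).
            logging.error("An uncaught error occurred while running the task")
            raise

    return wrapper


# Assumed usage with Celery (not executed in this sketch):
#
# @shared_task
# @clean_exit
# def task():
#     function1()


@clean_exit
def stops_cleanly():
    # Simulates a helper deep in the call stack asking the task to stop
    raise SystemExit(1)


@clean_exit
def fails_loudly():
    # Simulates an unexpected error that should keep its traceback
    raise ValueError("unexpected")
```

Calling `stops_cleanly()` returns `None` after logging the info line, while `fails_loudly()` still raises `ValueError` for the caller to report.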

Whenever I need to cleanly exit the Celery task, without a stack trace, I just raise a SystemExit exception in whatever function I’m using:

# util.py

def function2(input):
  try:
    logging.info("Fetching result...")
    result = fetch_something(input)
    logging.info(f"Result: {result}")
    return result
  except KnownException:
    logging.error("Could not fetch the result")
    raise SystemExit(1)

def function1():
  # ...some stuff...

  result = function2(input)

  # ...do some other stuff...

This way:

  1. If I raise SystemExit, the task exits cleanly. The only log lines are logging.error("Could not fetch the result"), which is the one I really need, and the short info line from the task template.
  2. If any other uncaught exception happens, I still get the full stack trace logged without issues!
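One reason this works is that `SystemExit` derives from `BaseException` rather than `Exception`, so any `except Exception` handlers sitting between the raise and the task do not intercept it. The following sketch shows this without Celery. `fetch`, `middle_layer` and `run_task` are hypothetical names for illustration, and `run_task` returns the string "stopped" only so the outcome is visible (the template itself returns nothing).

```python
import logging


def fetch(fail_with=None):
    # Stand-in for a helper that hits either a known or an unexpected failure
    if fail_with == "known":
        logging.error("Could not fetch the result")
        raise SystemExit(1)
    if fail_with == "unknown":
        raise RuntimeError("unexpected failure")
    return "result"


def middle_layer(fail_with=None):
    try:
        return fetch(fail_with)
    except Exception:
        # Reached for RuntimeError, but never for SystemExit
        logging.warning("middle_layer saw an ordinary exception")
        raise


def run_task(fail_with=None):
    # Same shape as the task template, minus the Celery decorator
    try:
        return middle_layer(fail_with)
    except SystemExit:
        logging.info("SystemExit requested, stopping task...")
        return "stopped"
    except Exception:
        logging.error("An uncaught error occurred while running the task")
        raise
```

The flip side is that a bare `except:` or an `except BaseException:` anywhere in between would catch the `SystemExit` before it reaches the task, so those handlers need to re-raise it.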

Rules to consider

I have a set of 2 rules you can also use with this template:

1. If I have code immediately inside a task, I use the same logic as if it were inside another function:

# tasks.py

@shared_task
def task():
    try:
        # ... some code ...

        try:
            result = fetch_something()
        except Exception:
            # Exception (not a bare except) so KeyboardInterrupt is left alone
            logging.error("Could not fetch the result")
            raise SystemExit(1)

        # ... more code ...

    except SystemExit:
        logging.info("SystemExit requested, stopping task...")
    except:
        logging.error("An uncaught error occurred while running the task")
        raise

2. If I have a loop inside a task, for instance to run a check on every user, I make sure I catch the exceptions inside the loop to avoid interrupting it:

# tasks.py

@shared_task
def task():
  try:
    # ... some code ...

    for user in users:
      try:
        result = fetch_something(user)
      except Exception:
        # Exception (not a bare except) so a SystemExit raised deeper down still stops the task
        logging.error(f"Could not fetch the result for user {user}")
        # Notice I do not raise anything to allow the for loop to continue
        continue

      # ... do something with the user result ...

    # ... more code ...

  except SystemExit:
    logging.info("SystemExit requested, stopping task...")
  except:
    logging.error("An uncaught error occurred while running the task")
    raise
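To make the loop rule concrete, here is a small self-contained sketch that runs without Celery. The `fetch_something` below is a fake that fails for one user, and `process_users` is a hypothetical stand-in for the body of the task.

```python
import logging


def fetch_something(user):
    # Fake fetch: fails for one user to simulate revoked access
    if user == "bob":
        raise PermissionError("access revoked")
    return f"data for {user}"


def process_users(users):
    results = {}
    for user in users:
        try:
            result = fetch_something(user)
        except Exception:
            logging.error(f"Could not fetch the result for user {user}")
            continue  # skip this user, keep processing the rest
        results[user] = result
    return results
```

Calling `process_users(["alice", "bob", "carol"])` logs one error line for bob and still returns results for alice and carol.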