How to profile your multi-threaded python production code programmatically

oryaacov

Or Yaacov

Posted on December 24, 2023

How to profile your multi-threaded python production code programmatically

Profiling multi-threaded Python applications can be challenging, especially in production environments. Unfortunately, 90% of the guides don’t explain how to really do it, especially with remote production multi-threaded code and no cli, and they will lead you to use something like python -m cProfile -s time my_app.py <args>
Don't worry, this is not another one of those.

So why is it “hard” to do? You can’t always run your program in production using the command-line interface (CLI), and cProfile only offers limited support for multi-threaded programming.

So, How to do Multi-Threading profiling?

To perform multi-threading profiling, we can use one of the following approaches:

  1. Sum different threads’ stats together to create one set of statistics using stats.add().
  2. Create a separate profiler for each thread.

Since our program is built from 3 different engine threads, I will cover only the second approach in this guide.

So, I created the following utility that allows me to create my threads:

import cProfile
import threading
import time

class ProfilerManager:
    def __init__(self, is_enabled, dump_interval_seconds, profiler_dir_path):
        self._is_enabled = is_enabled
        self._logger = "YOUR LOGGER"
        self._profiler_dir_path = profiler_dir_path
        self._profiler_dump_interval_seconds = dump_interval_seconds

        if self._profiler_dump_interval_seconds <= 0:
            self._logger.warning('invalid _profiler_dump_interval_seconds value {}, setting default'.format(self._profiler_dump_interval_seconds))
            self._profiler_dump_interval_seconds = 60   

        self._profilers = {}

    def thread_with_profiler(self, target, tag):
       if not self._is_enabled:
              return threading.Thread(target=target)

        self._disable_profiler_if_exists(tag)

        profiler = cProfile.Profile()

        self._profilers[tag] = profiler

        return threading.Thread(target=self._target_with_profiler(target, profiler, tag))

    def run(self, target, tag):
        if not self._is_enabled:
            return target

        profiler = cProfile.Profile()

        self._profilers[tag] = profiler

        return self._target_with_profiler(target, profiler, tag)

    def _disable_profiler_if_exists(self, tag):
        if tag in self._profilers:
            self._logger.warning('tag {} already exists, disabling, and overriding with new target'.format(tag))
            existing_profiler = self._profilers[tag]
            self._disable_profiler(existing_profiler, tag)

    def _target_with_profiler(self, target, profiler, tag):
        def target_with_profiler():
            self._enable_profiler(profiler, tag)
            target()
            self._disable_profiler(profiler, tag)

        return target_with_profiler

    def _enable_profiler(self, profiler, tag):
        self._logger.debug('enabling profiler {}'.format(tag))
        profiler.enable()

    def _disable_profiler(self, profiler, tag):
        self._logger.debug('disabling profiler {}'.format(tag))
        profiler.disable()

    def _dump_periodic_profilers(self):
        while True:
            time.sleep(self._profiler_dump_interval_seconds)

            self._logger.trace("dumping profiles")

            for tag, profiler in self._profilers.items():
                dump_file_path = '{}/{}.dump'.format(self._profiler_dir_path, tag)
                self._logger.trace("dumping profile, {}".format(dump_file_path))

                profiler.dump_stats(dump_file_path)
Enter fullscreen mode Exit fullscreen mode

Let’s take a look at the thread_with_profiler use which is similar to threading.Thread the following:

ip_manager_thread = profiler_manager.thread_with_profiler(target=self._run_ip_monitor, tag='ip_manager')
ip_manager_thread.start()
Enter fullscreen mode Exit fullscreen mode

let’s do a quick dive into thread_with_profiler:

def thread_with_profiler(self, target, tag):
        if not self._is_enabled:
            return threading.Thread(target=target)

        self._disable_profiler_if_exists(tag)

        profiler = cProfile.Profile()

        self._profilers[tag] = profiler

        return threading.Thread(target=self._target_with_profiler(target, profiler, tag))
Enter fullscreen mode Exit fullscreen mode

All that this function does is disable the profiler with the same tag, in case it already exists. It creates a new profiler and a new thread, then runs _target_with_profiler, which is the real magic behind the scenes

def _target_with_profiler(self, target, profiler, tag):
      def target_with_profiler():
          self._enable_profiler(profiler, tag)
          target()
          self._disable_profiler(profiler, tag)

      return target_with_profiler
Enter fullscreen mode Exit fullscreen mode

As we can see, we create and return a new function wrapper that our thread will execute. All that the wrapper does is enable the profiler (start it), call your actual function, and disable it when/if it ends.

Then, we have the following function, which actually dumps the current stats to files by tags every X seconds

    def _dump_periodic_profilers(self):
        while True:
            time.sleep(self._profiler_dump_interval_seconds)

            self._logger.trace("dumping profiles")

            for tag, profiler in self._profilers.items():
                dump_file_path = '{}/{}.dump'.format(self._profiler_dir_path, tag)
                self._logger.trace("dumping profile, {}".format(dump_file_path))

                profiler.dump_stats(dump_file_path)
Enter fullscreen mode Exit fullscreen mode

so, now that we got how this all thing works, let’s see a full use example!

def async_worker1():
  #work

def async_worker2():
  #work

def main_worker():
  #work

def main():
    profiler_manager = ProfilerManager(is_enabled=True, dump_interval_seconds=10, profiler_dir_path="/var/logs")
    profiler_manager.thread_with_profiler(target=self._run_ip_monitor, tag='async_worker1')
    profiler_manager.thread_with_profiler(target=self._run_ip_monitor, tag='async_worker2')

   profiler_manager.run(target=self._run_ip_monitor, tag='main_worker')profiler_manager.thread_with_profiler(target=self._run_ip_monitor, tag='ip_manager')

if __name__ == '__main__':
    main()
Enter fullscreen mode Exit fullscreen mode

happy profiling! :)

💖 💪 🙅 🚩
oryaacov
Or Yaacov

Posted on December 24, 2023

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related