Scalable, Producer-Consumer-based, Remote Log Monitor in Python


Pawel Suchanecki

Posted on March 10, 2023


Intro

In this continuation of the article "How to Implement the Producer-Consumer Concurrency Design Pattern with asyncio Coroutines", we show how to monitor multiple log files on remote machines.

We will walk through the changes introduced in the code to achieve this, give more context for the underlying ideas, and discuss the APIs used.

The data

- host: testhost.almalinux.org
  user: someuser
  key_filename: /home/pawel/.ssh/id_ed25519
  log_file: /var/log/nginx/access.log
  delay: 5
  buffer_lines: 10

- host: refurby
  user: pablo
  key_filename: /home/pawel/.ssh/id_rsa
  log_file: /var/log/syslog
  delay: 60
  buffer_lines: 5
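
For reference, yaml.safe_load() turns this file into a plain list of dicts, one per host, which is exactly what the producers receive; a quick sanity check:

import yaml

with open('hosts.yml') as f:
    hosts = yaml.safe_load(f)

print(type(hosts))       # <class 'list'>
print(hosts[0]['host'])  # 'testhost.almalinux.org'
print(hosts[0]['delay']) # 5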

The code

#!/usr/bin/env python3

import asyncio
import datetime
import yaml
from asyncio.queues import Queue
from fabric import Connection


async def producer(queue: Queue, host: dict):
    # One producer per host: open an SSH connection and poll the log file.
    conn = Connection(host['host'], user=host['user'], connect_kwargs={'key_filename': host['key_filename']})
    with conn.cd('/tmp'):
        tail_cmd = f"sudo tail -n {host['buffer_lines']} {host['log_file']}"
        old_data = []
        while True:
            # Fabric's run() is blocking; execute it in a worker thread so the
            # event loop (and the other producers) can keep running meanwhile.
            channel = await asyncio.to_thread(conn.run, tail_cmd, hide='both')
            data = channel.stdout.splitlines()
            if old_data != data:
                # Enqueue only the lines not seen in the previous read.
                old_d = set(old_data)
                delta_data = [x for x in data if x not in old_d]
                for d in delta_data:
                    await queue.put((host['host'], host['log_file'], str(d)))
            old_data = data
            # Pause before polling this host again (per-host delay).
            await asyncio.sleep(host['delay'])
    conn.close()


async def consumer(queue: Queue):
    # Single consumer: drain the shared queue and print each log line
    # tagged with the host and file it came from.
    while True:
        host, log_file, data = await queue.get()
        dt = datetime.datetime.now()
        print(f"@{dt} {host}:{log_file}:\n{data}\n-----", flush=True)
        queue.task_done()


async def main():
    with open('hosts.yml') as f:
        hosts = yaml.safe_load(f)
    queue = asyncio.Queue()
    # One producer task per configured host, all feeding the same queue.
    producers = [asyncio.create_task(producer(queue, host)) for host in hosts]
    consumer_task = asyncio.create_task(consumer(queue))
    # The producers loop forever, so gather() only returns if they are
    # cancelled or fail; the shutdown below is the graceful path.
    await asyncio.gather(*producers)
    await queue.join()
    consumer_task.cancel()


if __name__ == '__main__':
    asyncio.run(main())

Explanation: Supporting Multiple Hosts (Producers)

The main idea behind the changes is to monitor multiple log files on remote machines using the same producer-consumer concurrency design pattern. To achieve this, we introduce a YAML configuration file, hosts.yml, that specifies the hosts to be monitored along with their properties, such as the log file path, the delay between checks, and the number of lines to read.

In the main() function, we load the configuration from the YAML file using the yaml module and create an asyncio.Queue() object for inter-coroutine communication. We then create a separate producer task for each host using a list comprehension and collect them in the producers list. The consumer runs as its own task, while asyncio.gather() awaits all producer tasks so they execute concurrently.

Explanation: Consuming the output from multiple Producers

The consumer() coroutine remains largely the same as in the previous version, except that it now handles data from multiple hosts: each queue item is a tuple containing the host name, the log file path, and the log line text.

The producer() coroutine now receives the host dictionary as an argument instead of relying on global variables. The dictionary contains the properties of the host to be monitored: the host name, SSH key filename, log file path, delay between checks, and number of lines to read. As in the original version, we use these properties to establish an SSH connection to the remote machine and execute the tail command on the log file. Each new line is put into the queue as a tuple of host name, log file path, and log line text.
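
The duplicate filtering is plain Python; here is the same set-difference logic applied to some made-up log lines:

old_data = ["10.0.0.1 - GET /", "10.0.0.2 - GET /about"]
data = ["10.0.0.2 - GET /about", "10.0.0.3 - GET /contact"]  # next tail output

# Keep only the lines not seen in the previous read, preserving order.
old_d = set(old_data)
delta_data = [x for x in data if x not in old_d]
print(delta_data)  # ['10.0.0.3 - GET /contact']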

Explanation: How sleeping allows producers to reduce load

The await asyncio.sleep(host['delay']) line introduces a delay between log file checks in the producer() coroutine. The delay property is specified per host in the configuration loaded from the YAML file, and asyncio.sleep() pauses the coroutine for that amount of time while yielding control to the event loop, so the other producers and the consumer can run in the meantime. This lets each producer wait for new log entries to appear before checking again, reducing unnecessary network and system load.
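
To see how per-coroutine sleeps interleave on a single event loop, here is a tiny standalone demo (the host names are hypothetical and it is not part of the monitor):

import asyncio
import time

async def poll(name: str, delay: int):
    # Simulate a producer that "checks" its log, then sleeps.
    start = time.monotonic()
    for _ in range(3):
        print(f"{time.monotonic() - start:4.1f}s  {name} checks its log")
        await asyncio.sleep(delay)

async def main():
    # While one coroutine sleeps, the other runs: the checks interleave.
    await asyncio.gather(poll("fast-host", 1), poll("slow-host", 3))

asyncio.run(main())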

Summary

Overall, the changes made to the previous version of the code enable the script to monitor multiple log files on remote machines in a scalable and maintainable way. Thanks to the YAML configuration file, adding new hosts or modifying the properties of existing ones is easy and requires no changes to the code, as the example below shows.
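
For instance, adding a third host is just another block in hosts.yml (the values here are hypothetical):

- host: db01.example.com
  user: monitor
  key_filename: /home/pawel/.ssh/id_ed25519
  log_file: /var/log/postgresql/postgresql.log
  delay: 30
  buffer_lines: 20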

About API

As for the APIs used in the code, we use the yaml module to load the configuration from the YAML file and the [asyncio.queues.Queue()](https://docs.python.org/3/library/asyncio-queue.html) class to create the queue for inter-coroutine communication. The asyncio.create_task() function creates the producer and consumer tasks, and asyncio.gather() awaits the producers concurrently. Finally, queue.join() waits until every queued item has been processed and consumer_task.cancel() stops the consumer; this is the graceful-shutdown path, although in this version the producers loop forever, so it is only reached if they are cancelled or fail.
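
To make that lifecycle concrete, here is a minimal, self-contained sketch of the same pattern with finite producers, so the shutdown path actually runs:

#!/usr/bin/env python3

import asyncio

async def produce(queue: asyncio.Queue, name: str):
    # A finite producer: enqueue three items, then finish.
    for i in range(3):
        await queue.put(f"{name}-{i}")

async def consume(queue: asyncio.Queue):
    # Runs forever; stopped via cancel() once the queue is drained.
    while True:
        item = await queue.get()
        print(f"got {item}")
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    producers = [asyncio.create_task(produce(queue, n)) for n in ("a", "b")]
    consumer_task = asyncio.create_task(consume(queue))
    await asyncio.gather(*producers)  # wait for all producers to finish
    await queue.join()                # wait until every item is processed
    consumer_task.cancel()            # then stop the now-idle consumer

asyncio.run(main())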

Outro

We hope this continuation has helped you understand how Producer-Consumer-based code can be extended to support more producers in a scalable way. As always, feel free to experiment with the code and ask any questions you may have.

p.s. Stay tuned for more articles in the "Producer-Consumer Concurrency Design Pattern" series.
