Revolutionizing Real-Time Alerts with AI, NATS and Streamlit
Bobur Umurzokov
Posted on February 18, 2024
Imagine you have an AI-powered personal alerting chat assistant that works with up-to-date data. Whether it's a big move in the stock market that affects your investments, a significant change to your shared SharePoint documents, or the Amazon discount you were waiting for, the application keeps you informed and alerts you about significant changes based on criteria you set in advance in natural language. In this post, we will learn how to build a full-stack, event-driven weather alert chat application in Python using some pretty cool tools: Streamlit, NATS, and OpenAI. The app collects real-time weather information, uses AI to understand your alert criteria, and delivers the alerts to the user interface.
The content and code samples in this post can help developers and technology enthusiasts understand how modern real-time alerting systems work with Large Language Models (LLMs) and how to implement one.
You can also jump straight to the source code hosted on our GitHub and try it yourself.
The power behind the scenes
Let’s take a closer look at how the AI weather alert chat application works and turns raw data into actionable alerts, keeping you one step ahead of the weather. At the core of the application lies a responsive backend implemented in Python and powered by NATS, which handles real-time data processing and message management. Integrating OpenAI's GPT model adds a conversational AI capable of understanding the nature of each alert and responding to user queries. Users specify their alert criteria in natural language, and the GPT model interprets them.
Real-Time Data Collection
The journey begins with the continuous, asynchronous collection of weather data in the backend. The application currently uses the api.weatherapi.com service, fetching real-time weather information every 10 seconds. This data includes temperature, humidity, precipitation, and more, covering locations worldwide. The snippet below asynchronously fetches the current weather data for Estonia, but the app could be improved to set the location dynamically from user input:
import logging

import aiohttp

# weather_api_key is assumed to be defined elsewhere in backend.py.
async def fetch_weather_data():
    """Fetch the current weather for Estonia; return None on any failure."""
    api_url = f"http://api.weatherapi.com/v1/current.json?key={weather_api_key}&q=estonia"
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(api_url) as response:
                if response.status == 200:
                    return await response.json()
                else:
                    logging.error(f"Error fetching weather data: HTTP {response.status}")
                    return None
    except Exception as e:
        logging.error(f"Error fetching weather data: {e}")
        return None
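As a minimal sketch of the dynamic-location improvement mentioned above, the request URL could be built from a user-supplied location instead of the hard-coded "estonia". The helper name build_weather_url and the constant WEATHER_API_BASE are hypothetical and not part of the original backend.py; only the endpoint and the key and q parameters come from the snippet above.

```python
from urllib.parse import urlencode

# Same endpoint as in fetch_weather_data(); the constant name is hypothetical.
WEATHER_API_BASE = "http://api.weatherapi.com/v1/current.json"


def build_weather_url(api_key: str, location: str) -> str:
    """Return the current-weather URL for a user-supplied location."""
    location = location.strip()
    if not location:
        raise ValueError("location must not be empty")
    # urlencode escapes spaces and special characters in the location.
    query = urlencode({"key": api_key, "q": location})
    return f"{WEATHER_API_BASE}?{query}"
```

fetch_weather_data() could then accept a location argument and call this helper, which keeps user input from being interpolated into the URL unescaped.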
The role of NATS in data streaming
The code segment in the main() function in the backend.py file demonstrates the integration of NATS for event-driven messaging, continuous weather monitoring, and alerting. We use the nats.py library to integrate NATS within Python code. First, we establish a connection to the NATS server running in Docker at nats://localhost:4222.
nats_client = await nats.connect("nats://localhost:4222")
Then we define an asynchronous message_handler function that processes messages received on the chat subject from the NATS server, and we subscribe it to that subject. If a message starts with "Set Alert:" (we prepend it on the frontend side), the handler extracts and updates the user's alert criteria.
async def message_handler(msg):
    nonlocal user_alert_criteria
    data = msg.data.decode()
    if data.startswith("Set Alert:"):
        user_alert_criteria = data[len("Set Alert:"):].strip()
        logging.info(f"User alert criteria updated: {user_alert_criteria}")

await nats_client.subscribe("chat", cb=message_handler)
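The "Set Alert:" convention is shared by the frontend (which adds the prefix) and the backend (which strips it), so one option is to keep both directions in a pair of small helpers. This is only a sketch: the names ALERT_PREFIX, format_alert_command, and parse_alert_command are hypothetical and do not appear in the original code.

```python
from typing import Optional

# Prefix used by the frontend when publishing to the "chat" subject.
ALERT_PREFIX = "Set Alert:"


def format_alert_command(criteria: str) -> bytes:
    """Build the payload the frontend publishes for new alert criteria."""
    return f"{ALERT_PREFIX} {criteria.strip()}".encode()


def parse_alert_command(payload: bytes) -> Optional[str]:
    """Return the alert criteria from a payload, or None if it is not an alert command."""
    text = payload.decode("utf-8", errors="replace")
    if not text.startswith(ALERT_PREFIX):
        return None
    criteria = text[len(ALERT_PREFIX):].strip()
    # Treat an empty criteria string as "no command".
    return criteria or None
```

Inside message_handler, the backend could then call parse_alert_command(msg.data) and update user_alert_criteria only when the result is not None.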
The backend service integrates with two external services: the Weather API and the OpenAI Chat Completions API. If both weather data and user alert criteria are present, the app constructs a prompt for OpenAI's GPT model to determine whether the weather meets the user's criteria. The prompt asks the AI to analyze the current weather against the user's criteria and respond with "YES" or "NO" and a brief weather summary. Once the AI determines that the incoming weather data matches the user's alert criteria, the backend wraps the response in an alert message and publishes it to the chat_response subject on the NATS server to update the frontend app with the latest changes. This message contains a user-friendly notification designed to inform and advise the user. For example, it might say, "Heads up! Rain is expected in Estonia tomorrow. Don't forget to bring an umbrella!"
while True:
    current_weather = await fetch_weather_data()
    if current_weather and user_alert_criteria:
        logging.info(f"Current weather data: {current_weather}")
        prompt = f"Use the current weather: {current_weather} information and user alert criteria: {user_alert_criteria}. Identify if the weather meets these criteria and return only YES or NO with a short weather temperature info without explaining why."
        response_text = await get_openai_response(prompt)
        if response_text and "YES" in response_text:
            logging.info("Weather conditions met user criteria.")
            ai_response = f"Weather alert! Your specified conditions have been met. {response_text}"
            await nats_client.publish("chat_response", payload=ai_response.encode())
        else:
            logging.info("Weather conditions did not meet user criteria.")
    else:
        logging.info("No current weather data or user alert criteria set.")
    await asyncio.sleep(10)
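The check "YES" in response_text matches anywhere in the reply, so an answer such as "NO, but YES tomorrow" would still trigger an alert. A stricter parser that only looks at the leading word might look like the sketch below. The function name parse_decision is hypothetical, and the sketch assumes the model follows the prompt and starts its reply with YES or NO.

```python
import re
from typing import Optional, Tuple

# Leading YES/NO as a whole word, followed by optional punctuation and the summary.
_DECISION = re.compile(r"^\W*(YES|NO)\b\W*(.*)$", re.IGNORECASE | re.DOTALL)


def parse_decision(response_text: Optional[str]) -> Tuple[bool, str]:
    """Split a model reply into (criteria_met, summary)."""
    if not response_text:
        return False, ""
    text = response_text.strip()
    match = _DECISION.match(text)
    if match is None:
        # Reply does not start with YES or NO: do not alert.
        return False, text
    return match.group(1).upper() == "YES", match.group(2).strip()
```

With this helper, the loop would publish an alert only when the first element of the returned tuple is True, and could use the second element as the short weather summary.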
Delivering and receiving alerts in real-time
Let’s understand the overall communication flow between the backend and frontend.
- Through a simple chat interface built using Streamlit (see the frontend.py file), the user enters their weather alert criteria in natural language and submits them.
alert_criteria = st.text_input("Set your weather alert criteria", key="alert_criteria", disabled=st.session_state['alert_set'])
- The Streamlit frontend code below interacts with the backend service via NATS messaging. It publishes these criteria to the NATS server on the chat subject.
def send_message_to_nats_handler(message):
    with NATSClient() as client:
        client.connect()
        client.publish("chat", payload=message.encode())
        client.subscribe("chat_response", callback=read_message_from_nats_handler)
        client.wait()

if set_alert_btn:
    st.session_state['alert_set'] = True
    st.success('Alert criteria set')
    send_message_to_nats_handler(f"Set Alert: {alert_criteria}")
- As we have seen in the previous section, the backend listens to the chat subject, receives the criteria, fetches current weather data, and uses AI to determine whether an alert should be triggered. If the conditions are met, the backend sends an alert message to the chat_response subject. The frontend receives this message and updates the UI to notify the user.
def read_message_from_nats_handler(msg):
    message = msg.payload.decode()
    st.session_state['conversation'].append(("AI", message))
    st.markdown(f"<span style='color: red;'>🔔</span> AI: {message}", unsafe_allow_html=True)
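Because the handler renders the message with unsafe_allow_html=True, any HTML in the model's reply would be passed through to the page. One cautious option, sketched here with a hypothetical helper named format_alert_html, is to escape the message text before embedding it in the markup:

```python
import html


def format_alert_html(message: str) -> str:
    """Return the alert markup with the message text HTML-escaped."""
    # html.escape converts &, <, >, and quotes into HTML entities.
    safe_message = html.escape(message)
    return f"<span style='color: red;'>🔔</span> AI: {safe_message}"
```

The handler could then call st.markdown(format_alert_html(message), unsafe_allow_html=True), so only the bell markup is treated as HTML.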
Try It Out
To explore the real-time weather alert chat application in detail and try it out for yourself, please visit our GitHub repository. The repository contains all the necessary code, detailed setup instructions, and additional documentation to help you get started. Once the setup is complete, you can start the Streamlit frontend and the Python backend. Set your weather alert criteria, and see how the system processes real-time weather data to keep you informed.
Building Stream Processing Pipelines
The real-time weather alert chat application demonstrates a powerful use case of NATS for real-time messaging in a distributed system, allowing efficient communication between a user-facing frontend and a data-processing backend. However, there are several further steps to consider to ensure that the information presented to the user is relevant, accurate, and actionable. In the app, we simply fetch live raw weather data and send it straight to OpenAI or the frontend. Sometimes you need to filter, enrich, aggregate, or normalize this data in real time before it reaches the external services. At that point, you start to think about creating a stream processing pipeline with several stages.
For example, not all the data fetched from the API will be relevant to every user, so you can filter out unnecessary information at an initial stage. Data can also arrive in various formats, especially if you're sourcing information from multiple APIs for comprehensive alerting, so you need to normalize it. At the next stage, you enrich the raw data with extra context to make it more useful. This could include comparing current weather conditions against historical data to identify unusual patterns, or adding location-based insights from another external API, such as specific advice for weather conditions in a particular area. At later stages, you might aggregate hourly temperature data to give an average daytime temperature or to highlight the peak temperature reached during the day.
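The stages described above can be sketched as plain Python functions chained together. This is an illustration rather than the app's actual code: the function names are hypothetical, the historical average is passed in as a plain number, and the field names (location.name, current.temp_c, current.humidity) are assumed to match the weather API's current-weather response.

```python
from statistics import mean
from typing import Iterable, List, Optional


def normalize(raw: dict) -> Optional[dict]:
    """Filter and normalize: flatten one raw reading, or return None if it is incomplete."""
    current = raw.get("current") or {}
    if "temp_c" not in current:
        return None
    return {
        "place": (raw.get("location") or {}).get("name", "unknown"),
        "temp_c": float(current["temp_c"]),
        "humidity": current.get("humidity"),
    }


def enrich(reading: dict, historical_avg_c: float) -> dict:
    """Enrich: add the deviation from a historical average temperature."""
    return {**reading, "delta_c": round(reading["temp_c"] - historical_avg_c, 1)}


def aggregate(readings: List[dict]) -> dict:
    """Aggregate: compute the average and peak temperature of the readings."""
    temps = [r["temp_c"] for r in readings]
    return {"avg_c": round(mean(temps), 1), "peak_c": max(temps)}


def run_pipeline(raw_readings: Iterable[dict], historical_avg_c: float) -> dict:
    """Run filter/normalize, enrich, and aggregate over a batch of raw readings."""
    cleaned = [r for r in map(normalize, raw_readings) if r is not None]
    enriched = [enrich(r, historical_avg_c) for r in cleaned]
    summary = aggregate(enriched) if enriched else None
    return {"readings": enriched, "summary": summary}
```

In a real pipeline each stage would typically run on a stream rather than a list, but keeping stages as small pure functions makes them easy to test and reorder.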
Next steps
When it comes to transforming data and deploying, running, and scaling the app in a production environment, you might want to use a dedicated Python framework like GlassFlow to build sophisticated stream-processing pipelines. GlassFlow offers a fully managed serverless infrastructure for stream processing, so you don’t have to think about setup or maintenance, and the app can handle large volumes of data and user requests with ease. It provides advanced state management capabilities, making it easier to track user alert criteria and other application state. Your application can scale with its user base without compromising performance.
About the author
Visit my blog: www.iambobur.com