Generating production-level streaming microservices using AI
Davor Runje
Posted on October 5, 2023
faststream-gen is a Python library that uses generative AI to automatically generate FastStream applications. Simply describe your microservice in plain English, and faststream-gen will generate a production-level FastStream application ready to deploy in a few minutes and under $1 cost.
Documentation: https://faststream-gen.airt.ai
Source Code: https://github.com/airtai/faststream-gen
The code generator for FastStream is a Python library that automates the process of creating streaming microservices for Apache Kafka, RabbitMQ and other supported protocols. It works by taking your application descriptions and swiftly turning them into a ready-to-deploy FastStream applications.
The key features are:
Automatic code generation: faststream-gen enables you to easily generate complete FastStream application with minimal effort. This library allows you to outline your application requirements, and it will quickly transform them into a fully-fledged FastStream project.
Automatic test generation: faststream-gen provides dependable code through rigorous testing, including pre-implemented integration tests, ensuring stability and functionality, saving development time, and preventing common bugs.
Script Templates: Streamline the deployment of your FastStream application using faststream-gen’s built-in scripts, tailored for initiating, subscribing to Kafka topic and shutting down the local Kafka broker.
Automatic CI/CD integration: faststream-gen integrates seamlessly with your version control and continuous integration pipeline through its GitHub workflow files. These predefined configuration files are optimized for FastStream projects, enabling smooth integration with GitHub Actions. You can automate tasks such as code validation, testing, and deployment, ensuring that your FastStream application remains in top shape throughout its development lifecycle.
In this tutorial, we will walk through the process of using the faststream-gen Python library to generate two FastStream applications.
The first application will demonstrate how to retrieve current cryptocurrency prices from various web sources. Subsequently, we will utilize the functionalities of FastStream to publish retrieved data as messages to a Kafka topic.
The second application will showcase how to consume messages from the Kafka topic using FastStream consumer capabilities. It will process these messages to extract information about cryptocurrencies and calculate the price mean for each cryptocurrency within a certain time window and publish the price mean to another topic.
A short version of the tutorial is available as a video at the following link (roughly half of it):
Let’s get started!
Cryptocurrency analysis with FastStream
In this tutorial, we will walk you through the process of using the faststream-gen
Python library to retrieve cryptocurrency prices in real time and calculate their moving average. To accomplish that, we will generate the following two FastStream applications:
A microservice retrieving current cryptocurrency prices from an external web service
A microservice consuming such messages, calculating the moving average price for each cryptocurrency and publishing it to another Kafka topic.
Let’s get started!
Installation
To complete this tutorial, you will need the following software and
Python library:
Python (version 3.8 and upward)
a valid OPENAI API key
[optional] github account and installed git command
It is recommended to use a virtual environment for your Python projects. In this tutorial, we will be using Python’s venv
module to create a virtual environment.
First, create a root directory for this tutorial. Navigate to the
desired location and create a new directory called
faststream_gen_tutorial
and enter it.
mkdir faststream_gen_tutorial
cd faststream_gen_tutorial
Creating and activating a new Python virtual environment
Create a new virtual environment using venv:
python3 -m venv venv
Next, activate your new virtual environment:
source venv/bin/activate
Installing the packages
Upgrade pip if needed and install faststream-gen
package:
pip install --upgrade pip && pip install faststream-gen
Check that the installation was successful by running the following
command:
faststream_gen --help
You should see the full list of options of the command in the output.
Now you have successfully set up the environment and installed the
faststream-gen
package.
Setting up OpenAI API key
faststream-gen
uses OpenAI API and you need to export your API key in
environment variable OPENAI_API_KEY
. If you use bash or compatible
shell, you can do that with the following command:
export OPENAI_API_KEY="sk-your-openai-api-key"
If you don’t already have OPENAI_API_KEY
, you can create one here.
Generating FastStream applications
Retrieving and publishing crypto prices
Now, we will create an application which retrieves information about
current cryptocurrency prices from an external web
service and publishes messages to a Kafka topic. In order to achieve
this, we need to provide a high-level description of the application in
plain English containing only the necessary information needed
by a knowledgeable Python developer familiar with FastStream
framework to implement it. This should include details such as the
message schema, instructions on external API-s and web service, and
guidance on selecting the appropriate topic and partition keys.
Below is an example of such a description used in this particular case.
Notice that we did not specify steps needed to actually implement, we
specified what the service should do, but not how to do it.
Create a FastStream application which will retrieve the current cryptocurrency
price and publish it to the new_crypto_price topic.
The application should retrieve the data every 2 seconds.
A message which will be produced is JSON with the two attributes:
- price: non-negative float (current price of cryptocurrency in USD)
- crypto_currency: string (the cryptocurrency e.g. BTC, ETH...)
The current price of Bitcoin can be retrieved by a simple GET request to:
- https://api.coinbase.com/v2/prices/BTC-USD/spot
The current price of Ethereum can be retrieved by a simple GET request to:
- https://api.coinbase.com/v2/prices/ETH-USD/spot
The response of this GET request is a JSON and you can get information about
the crypto_currency in:
response['data']['base']
and the information about the price in:
response['data']['amount']
Use utf-8 encoded crypto_currency attribute as a partition key when publishing
the message to new_crypto_price topic.
Let’s generate a new FastStream
project inside the
retrieve-publish-crypto
directory. First, copy the previous
description and paste it into a file called
description_retrieve_publish.txt
.
Next, run the following command (parameter -i
specifies the file path
for the app description file, while the parameter -o
specifies the
directory where the generated project files will be saved):
faststream_gen -i description_retrieve_publish.txt -o retrieve-publish-crypto
✨ Generating a new FastStream application!
✔ Application description validated.
✔ FastStream app skeleton code generated.
✔ The app and the tests are generated.
✔ New FastStream project created.
✔ Integration tests were successfully completed.
Tokens used: 36938
Total Cost (USD): $0.11436
✨ All files were successfully generated!
Failed generation
The generation process is not bullet-proof and it could fail with a message like this:
✨ Generating a new FastStream application!
✔ Application description validated.
✔ New FastStream project created.
✔ FastStream app skeleton code generated.
✘ Error: Failed to generate a valid application and test code.
✘ Error: Integration tests failed.
Tokens used: 79384
Total Cost (USD): $0.24567
Apologies, we couldn't generate a working application and test code from your application description.
Please run the following command to start manual debugging:
cd retrieve_without_logs && pytest
For in-depth debugging, check the retrieve-publish-crypto/_faststream_gen_logs directory for complete logs, including individual step information.
There can be a number of reasons for that, the most common being:
The specification you provided was not sufficiently detailed to generate the application. In such cases, you could try to add more detailed instructions and try again.
The task is too difficult for the default GPT-3.5 model to handle and you could try to use GPT-4 instead:
faststream_gen --model gpt-4 -i description_retrieve_publish.txt -o retrieve-publish-crypto
- You were unlucky and you just need to execute the command again. Large language models are stochastic in their nature and they always give different answers to the same questions. There are retry mechanisms in the engine, but sometimes they are not enough and just rerunning the command can mediate the problem.
If none of the above strategies work, check the already generated files and see what the potential problem could be. You can also finish the implementation or tests yourself.
Successful generation
If successful, the command will generate a FastStream
project with the following structure:
retrieve-publish-crypto
├── .github
│ └── workflows
│ ├── build_docker.yml
│ ├── deploy_docs.yml
│ └── test.yml
├── .gitignore
├── Dockerfile
├── LICENSE
├── README.md
├── app
│ └── application.py
├── pyproject.toml
├── scripts
│ ├── build_docker.sh
│ ├── lint.sh
│ ├── services.yml
│ ├── start_kafka_broker_locally.sh
│ ├── static-analysis.sh
│ ├── stop_kafka_broker_locally.sh
│ └── subscribe_to_kafka_broker_locally.sh
└── tests
├── __init__.py
└── test_application.py
The generated application is located in the app/
directory, while the tests are located in the tests/
directory. It is important to keep in mind that these files are generated by LLM and may vary with each generation.
app/application.py
:
import asyncio
import json
from datetime import datetime
import aiohttp
from pydantic import BaseModel, Field, NonNegativeFloat
from faststream import ContextRepo, FastStream, Logger
from faststream.kafka import KafkaBroker
broker = KafkaBroker("localhost:9092")
app = FastStream(broker)
class CryptoPrice(BaseModel):
price: NonNegativeFloat = Field(
..., examples=[50000.0], description="Current price of cryptocurrency in USD"
)
crypto_currency: str = Field(
..., examples=["BTC"], description="The cryptocurrency"
)
publisher = broker.publisher("new_crypto_price")
async def fetch_crypto_price(
url: str, crypto_currency: str, logger: Logger, context: ContextRepo, time_interval: int = 2
) -> None:
# Always use context: ContextRepo for storing app_is_running variable
while context.get("app_is_running"):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
data = await response.json()
price = data["data"]["amount"]
new_crypto_price = CryptoPrice(
price=price, crypto_currency=crypto_currency
)
await publisher.publish(
new_crypto_price,
key=crypto_currency.encode("utf-8"),
)
else:
logger.warning(
f"Failed API request {url} at time {datetime.now()}"
)
await asyncio.sleep(time_interval)
@app.on_startup
async def app_setup(context: ContextRepo):
context.set_global("app_is_running", True)
@app.on_shutdown
async def shutdown(context: ContextRepo):
context.set_global("app_is_running", False)
# Get all the running tasks and wait them to finish
fetch_tasks = context.get("fetch_tasks")
await asyncio.gather(*fetch_tasks)
@app.after_startup
async def publish_crypto_price(logger: Logger, context: ContextRepo):
logger.info("Starting publishing:")
cryptocurrencies = [("Bitcoin", "BTC"), ("Ethereum", "ETH")]
fetch_tasks = [
asyncio.create_task(
fetch_crypto_price(
f"https://api.coinbase.com/v2/prices/{crypto_currency}-USD/spot",
crypto_currency,
logger,
context,
)
)
for _, crypto_currency in cryptocurrencies
]
# you need to save asyncio tasks so you can wait them to finish at app shutdown (the function with @app.on_shutdown function)
context.set_global("fetch_tasks", fetch_tasks)
tests/test_application.py
:
import pytest
from faststream import Context, TestApp
from faststream.kafka import TestKafkaBroker
from app.application import CryptoPrice, app, broker
@broker.subscriber("new_crypto_price")
async def on_new_crypto_price(
msg: CryptoPrice, key: bytes = Context("message.raw_message.key")
):
pass
@pytest.mark.asyncio
async def test_fetch_crypto_price():
async with TestKafkaBroker(broker):
async with TestApp(app):
await on_new_crypto_price.wait_call(2)
on_new_crypto_price.mock.assert_called()
Creating a new Python virtual environment
All the required dependencies to run the newly generated FastStream
project are located within the pyproject.toml
file.
Create and activate a new virtual environment inside the
retrieve-publish-crypto
directory by using venv:
cd retrieve-publish-crypto
python3 -m venv venv
source venv/bin/activate
Upgrade pip if needed and install the development dependencies:
pip install --upgrade pip && pip install -e .[dev]
Testing the application
In order to verify the functional correctness of the application, it is recommended to execute the generated unit and integration test by running the pytest
command.
pytest
============================= test session starts ==============================
platform linux -- Python 3.11.4, pytest-7.4.2, pluggy-1.3.0
rootdir: /work/fastkafka-gen/docs_src/tutorial/retrieve-publish-crypto
configfile: pyproject.toml
plugins: anyio-3.7.1, asyncio-0.21.1
asyncio: mode=Mode.STRICT
collected 1 item
tests/test_application.py . [100%]
============================== 1 passed in 2.65s ===============================
Previewing AsyncAPI Docs
To preview AsyncAPI Docs for the application, execute the following
command:
faststream docs serve app.application:app
INFO: Started server process [3575270]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://localhost:8000 (Press CTRL+C to quit)
You can now access the AsyncAPI Docs by opening localhost:8000 in your browser.
Starting localhost Kafka broker
To run the FastStream
application locally, ensure that you have a
running Kafka broker. You can start a Kafka Docker container by
executing the start_kafka_broker_locally.sh
shell script:
./scripts/start_kafka_broker_locally.sh
[+] Running 2/2
⠿ Network scripts_default Created 0.1s
⠿ Container bitnami_kafka Started
Starting the application
To start the application, execute the following command:
faststream run app.application:app
2023-09-15 13:41:21,948 INFO - FastStream app starting...
2023-09-15 13:41:22,144 INFO - | - Starting publishing:
2023-09-15 13:41:22,144 INFO - FastStream app started successfully! To exit press CTRL+C
Topic new_crypto_price not found in cluster metadata
Ensure that the app remains running, it is needed for the subsequent
steps.
Calculating the moving average
Let’s develop an application that calculates the average price of the
three most recent messages received from the new_crypto_price
topic
for each cryptocurrency. Subsequently, we will publish the computed
average price to the price_mean
topic.
Here is the full description of the desired application:
Create a FastStream application for consuming messages
from the new_crypto_price topic.
This topic needs to use a partition key.
new_crypto_price messages use JSON with two attributes
(create class CryptoPrice with these attributes):
- price: non-negative float (it represents the current price of the crypto)
- crypto_currency: string (it represents the cryptocurrency e.g. BTC, ETH...)
The application should save each message to a dictionary (global variable)
- partition key should be used as a dictionary key
and value should be a List of prices.
Keep only the last 100 messages in the dictionary.
If there are fewer than 3 messages for a given partition key,
do not publish any messages.
Otherwise, Calculate the price mean of the last 3 messages
for the given partition key.
Publish the price mean to the price_mean topic and use
the same partition key that the new_crypto_price topic is using.
Please open a new terminal and navigate to the root directory of this
tutorial, which is called faststream_gen_tutorial
. Once you are inside
the faststream_gen_tutorial
folder, please activate the virtual
environment.
cd path_to/faststream_gen_tutorial
source venv/bin/activate
To create a faststream
application inside the calculate-mean-app
directory, first copy the previous description and paste it into the
description_calculate_mean.txt
file.
Next, run the following command:
faststream_gen -i description_calculate_mean.txt -o calculate-mean-app
✨ Generating a new FastStream application!
✔ Application description validated.
✔ FastStream app skeleton code generated.
✔ The app and the tests are generated.
✔ New FastStream project created.
✔ Integration tests were successfully completed.
Tokens used: 13367
Total Cost (USD): $0.04147
✨ All files were successfully generated!
If successful, the command will generate calculate-mean-app
directory
with app/application.py
and tests/test_application.py
inside. If
not, just rerun the command again until it succeds.
app/application.py
:
from typing import Dict, List
from pydantic import BaseModel, Field, NonNegativeFloat
from faststream import Context, ContextRepo, FastStream, Logger
from faststream.kafka import KafkaBroker
class CryptoPrice(BaseModel):
price: NonNegativeFloat = Field(
..., examples=[50000], description="Current price of the cryptocurrency"
)
crypto_currency: str = Field(
..., examples=["BTC"], description="Cryptocurrency symbol"
)
broker = KafkaBroker("localhost:9092")
app = FastStream(broker)
publisher = broker.publisher("price_mean")
@app.on_startup
async def app_setup(context: ContextRepo):
message_history: Dict[str, List[float]] = {}
context.set_global("message_history", message_history)
@broker.subscriber("new_crypto_price")
async def on_new_crypto_price(
msg: CryptoPrice,
logger: Logger,
message_history: Dict[str, List[float]] = Context(),
key: bytes = Context("message.raw_message.key"),
) -> None:
logger.info(f"New crypto price {msg=}")
crypto_key = key.decode("utf-8")
if crypto_key not in message_history:
message_history[crypto_key] = []
message_history[crypto_key].append(msg.price)
if len(message_history[crypto_key]) > 100:
message_history[crypto_key] = message_history[crypto_key][-100:]
if len(message_history[crypto_key]) >= 3:
price_mean = sum(message_history[crypto_key][-3:]) / 3
await publisher.publish(price_mean, key=key)
tests/test_application.py
:
import pytest
from faststream import Context, TestApp
from faststream.kafka import TestKafkaBroker
from app.application import CryptoPrice, app, broker, on_new_crypto_price
@broker.subscriber("price_mean")
async def on_price_mean(msg: float, key: bytes = Context("message.raw_message.key")):
pass
@pytest.mark.asyncio
async def test_app():
async with TestKafkaBroker(broker):
async with TestApp(app):
await broker.publish(
CryptoPrice(price=50000, crypto_currency="BTC"),
"new_crypto_price",
key=b"BTC",
)
on_new_crypto_price.mock.assert_called_with(
dict(CryptoPrice(price=50000, crypto_currency="BTC"))
)
on_price_mean.mock.assert_not_called()
await broker.publish(
CryptoPrice(price=60000, crypto_currency="BTC"),
"new_crypto_price",
key=b"BTC",
)
on_new_crypto_price.mock.assert_called_with(
dict(CryptoPrice(price=60000, crypto_currency="BTC"))
)
on_price_mean.mock.assert_not_called()
await broker.publish(
CryptoPrice(price=70000, crypto_currency="BTC"),
"new_crypto_price",
key=b"BTC",
)
on_new_crypto_price.mock.assert_called_with(
dict(CryptoPrice(price=70000, crypto_currency="BTC"))
)
on_price_mean.mock.assert_called_with(60000.0)
Creating a new Python virtual environment
All the required dependencies to run the newly generated FastStream
project are located within the pyproject.toml
file. Create a new
virtual environment and install the development dependencies for the
project.
Create and activate a new virtual environment inside the
calculate-mean-app
directory by using env:
cd calculate-mean-app
python3 -m venv venv
source venv/bin/activate
Upgrade pip if needed and install the development dependencies:
pip install --upgrade pip && pip install -e .[dev]
Testing the application
In order to verify functional correctness of the application, it is
recommended to execute the generated unit and integration test by
running the pytest
command.
pytest
============================= test session starts ==============================
platform linux -- Python 3.11.4, pytest-7.4.2, pluggy-1.3.0
rootdir: /work/fastkafka-gen/docs_src/tutorial/calculate-mean-app
configfile: pyproject.toml
plugins: anyio-3.7.1, asyncio-0.21.1
asyncio: mode=Mode.STRICT
collected 1 item
tests/test_application.py . [100%]
============================== 1 passed in 0.44s ===============================
Previewing AsyncAPI Docs
To preview AsyncAPI Docs for the application, execute the following
command:
faststream docs serve app.application:app
INFO: Started server process [3596205]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://localhost:8000 (Press CTRL+C to quit)
You can now access the AsyncAPI Docs by opening http://localhost:8000 in your browser.
Starting the application
To start the application, execute the following command:
faststream run app.application:app
2023-10-04 09:31:18,926 INFO - FastStream app starting...
2023-10-04 09:31:18,948 INFO - new_crypto_price | - `OnNewCryptoPrice` waiting for messages
Topic new_crypto_price not found in cluster metadata
2023-10-04 09:31:19,069 INFO - FastStream app started successfully! To exit, press CTRL+C
2023-10-04 09:31:40,876 INFO - new_crypto_price | 0-16964047 - Received
2023-10-04 09:31:40,878 INFO - new_crypto_price | 0-16964047 - New crypto price msg=CryptoPrice(price=27414.085, crypto_currency='BTC')
2023-10-04 09:31:40,878 INFO - new_crypto_price | 0-16964047 - Processed
2023-10-04 09:31:40,878 INFO - new_crypto_price | 1-16964047 - Received
2023-10-04 09:31:40,879 INFO - new_crypto_price | 1-16964047 - New crypto price msg=CryptoPrice(price=1642.425, crypto_currency='ETH')
2023-10-04 09:31:40,879 INFO - new_crypto_price | 1-16964047 - Processed
2023-10-04 09:31:43,053 INFO - new_crypto_price | 2-16964047 - Received
2023-10-04 09:31:43,054 INFO - new_crypto_price | 2-16964047 - New crypto price msg=CryptoPrice(price=27414.085, crypto_currency='BTC')
2023-10-04 09:31:43,054 INFO - new_crypto_price | 2-16964047 - Processed
2023-10-04 09:31:43,054 INFO - new_crypto_price | 3-16964047 - Received
2023-10-04 09:31:43,055 INFO - new_crypto_price | 3-16964047 - New crypto price msg=CryptoPrice(price=1642.425, crypto_currency='ETH')
...
You can see in the terminal that the application is reading the messages
from the new_crypto_price
topic. Ensure that the app remains running
as it is needed for the next step.
Subscribing directly to local Kafka broker topic
Open the new terminal, navigate to the calculate-mean-app
directory.
To check if the calculate-mean-app
is publishing messages to the
price_mean
topic, run the following command:
./scripts/subscribe_to_kafka_broker_locally.sh price_mean
BTC 26405.745
ETH 1621.3733333333332
BTC 26404.865
ETH 1621.375
BTC 26404.865
...
Stopping Kafka broker
To stop the Kafka broker after analyzing the mean price of
cryptocurrencies, you can execute the following command:
./scripts/stop_kafka_broker_locally.sh
[+] Running 2/2
⠿ Container bitnami_kafka Removed 1.2s
⠿ Network scripts_default Removed
[Optional] GitHub integration
To successfully complete this optional tutorial chapter, make sure you
have a GitHub account and the Git command installed. Additionally, ensure that your authentication for GitHub is properly set.
We need to create two GitHub repositories, one for the FastStream
project in the retrieve-publish-crypto
directory and another for the FastStream
project in the calculate-mean-app
directory. In this chapter, we will upload the FastStream
project to GitHub, which includes the retrieve-publish-crypto
project. We will also provide an
explanation of the generated CI workflows.
Adding locally hosted code to GitHub
To create a GitHub repository, click on the following link. For the
Repository name
, use retrieve-publish-crypto
and click on
Create repository
.
Please open your development environment and go to the
retrieve-publish-crypto
directory that was generated in the previous
steps.
cd path_to/faststream_gen_tutorial/retrieve-publish-crypto
Next, execute the following commands:
git init
git add .
git commit -m "Retrieve and publish crypto prices application implemented"
git branch -M main
If you are using HTTPS authentication execute (replace the
git_username
in the URL with your own GitHub username):
git remote add origin https://github.com/git_username/retrieve-publish-crypto.git
If you are using SSH authentication execute (replace the
git_username
in the URL with your own GitHub username):
git remote add origin git@github.com:git_username/retrieve-publish-crypto.git
To update the remote branch with local commits, execute:
git push -u origin main
Continuous integration with GitHub Actions
Once the changes are pushed, CI pipeline will run the tests again,
create and publish the documentation and build Docker container with the
application.
To verify the passing status of the CI, open your web browser, go to the
newly created GitHub repository, and click on the Actions tab.
The tests executed with pytest
are passing successfully.
The Docker image has been successfully built and pushed to the GitHub
Container registry under the repository
ghcr.io/your_username/retrieve-publish-crypto
The AsyncAPI docs have been successfully generated and deployed to
GitHub Pages.
After the successful execution of the Deploy FastStream AsyncAPI Docs
workflow, a new branch named gh-pages
will be created. To access the
GitHub Pages settings, navigate to the Settings -> Pages
, select the
gh-pages
branch as the designated branch for hosting the GitHub Pages
and click on Save
.
By setting up a branch on GitHub Pages, a new workflow will
automatically be triggered within the GitHub Actions. To access this
workflow, simply click on the Actions
tab and open the
pages build and deployment
workflow.
Once all the jobs within the workflow are completed, you can click on
the provided URL to conveniently view and explore the AsyncAPI Docs that
have been generated for your application.
Repeat
Repeat the same process with calculate-mean-app
project.
Next steps
Congratulations! You have successfully completed this tutorial and
gained a new set of skills. Now that you have learned how to use
faststream-gen
, try it out with your own example!
Posted on October 5, 2023
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.