FastStream: Python's framework for Efficient Message Queue Handling

tvrtko

Tvrtko Sternak

Posted on October 16, 2023

FastStream: Python's framework for Efficient Message Queue Handling

Ever felt lost in the complexity of microservices and message queues like Kafka and RabbitMQ? FastStream is here to simplify it all. That's precisely why we created FastStream. Initially, it was our solution to the challenges we faced with messaging queues in our own projects. But as it simplified our lives, we realized it could do the same for others. So, we decided to share it with the world.

FastStream streamlines the entire process of working with message queues in microservices. Parsing messages, managing networking, and keeping documentation updated—all handled effortlessly.

In this blog post, we'll explore how FastStream simplifies microservices development. Let's dive in and discover how FastStream can revolutionize your workflow.

Hint: If you want to dive in the code right away, checkout the hands-on tutorial at FastStream documentation

Our motivation

Our journey with FastStream started when we needed to integrate our machine learning models into a customer's Apache Kafka environment. To streamline this process, we created FastKafka using AIOKafka, AsyncAPI, and asyncio. It was our first step in making message queue management easier.

Later, we discovered Propan, a library created by Nikita Pastukhov, which solved similar problems but for RabbitMQ. Recognizing the potential for collaboration, we joined forces with Nikita to build a unified library that could work seamlessly with both Kafka and RabbitMQ. And that's how FastStream came to be—a solution born out of the need for simplicity and efficiency in microservices development.

Key features that set FastStream apart 🚀

FastStream is more than just another library; it's a powerful toolkit designed to simplify and supercharge your microservices development. Let's dive into the key features that make FastStream stand out:

Multiple Broker Support: FastStream provides a unified API that works seamlessly across multiple message brokers. Whether you're dealing with Kafka, RabbitMQ, or others, FastStream has you covered, making it effortless to switch between them.



broker = KafkaBroker("localhost:9092")

@broker.publisher(prediction)
@broker.subscriber(input_data)
async def on_input_data(msg: InputData) -> Prediction:
  # your processing processing
  return prediction


Enter fullscreen mode Exit fullscreen mode


# Just change the broker class, 
#  rest of the code stays the same
broker = RabbitBroker("localhost:5672")

@broker.publisher(prediction)
@broker.subscriber(input_data)
async def on_input_data(msg: InputData) -> Prediction:
  # your processing processing
  return prediction


Enter fullscreen mode Exit fullscreen mode

Pydantic Validation: Leverage the robust validation capabilities of Pydantic to serialize and validate incoming messages. With Pydantic, you can ensure that your data is always in the right format.



broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

@broker.subscriber(input_data)
async def on_input_data(msg: InputData): # <- decodes consumed message using InputData(**json.loads(data))
  # your processing logic


Enter fullscreen mode Exit fullscreen mode


broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

@broker.publisher(prediction)
@broker.subscriber(input_data)
async def on_input_data(msg: InputData) -> Prediction: # <- encodes produced message using Prediction.json()
  # some processing
  return prediction


Enter fullscreen mode Exit fullscreen mode

Automatic Documentation: FastStream keeps you ahead of the game with automatic AsyncAPI documentation generation. Say goodbye to outdated documentation – FastStream ensures it's always up-to-date.

Basic FastStream documentation example

Intuitive Development: FastStream offers full-typed editor support, catching errors before they reach runtime. This means you can code with confidence, knowing that issues are caught early in the development process.

Powerful Dependency Injection System: Manage your service dependencies efficiently with FastStream's built-in Dependency Injection (DI) system. Say goodbye to spaghetti code and embrace clean, modular architecture.



broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

@broker.subscriber(input_data)
# Load a global logger instance from faststream.Context()
async def on_input_data(msg: InputData, logger=Context()):
  logger.info(msg)


Enter fullscreen mode Exit fullscreen mode

Testability: FastStream supports in-memory tests, making your Continuous Integration and Continuous Deployment (CI/CD) pipeline faster and more reliable. Test your microservices with ease, ensuring they perform as expected.



async def test_base_app():
  # Subscribe to prediction topic so that we can assert incoming msgs
  @broker.subscriber("prediction")
  async def on_prediction(msg: Prediction):
    pass

  async with TestKafkaBroker(broker):
    # Publish a test message to the input_data topic
    await broker.publish(InputData(data=0.2), "input_data")

    # Check that the handle function for "input_data" topic was called with the correct msg
    on_input_data.mock.assert_called_once_with(dict(InputData(data=0.2)))

    # Check that the service responded with the correct prediction in the "prediction" topic
    on_prediction.mock.assert_called_once_with(dict(Prediction(score=1.2)))


Enter fullscreen mode Exit fullscreen mode

Seamless Integrations: FastStream plays well with others. It's fully compatible with any HTTP framework you prefer, with a special emphasis on compatibility with FastAPI.



# Create a FastStream router
router = KafkaRouter("localhost:9092")

...

# Connect a FastStream router to a FastAPI application lifespan
app = FastAPI(lifespan=router.lifespan_context)


Enter fullscreen mode Exit fullscreen mode

Built for Automatic Code Generation: FastStream is optimized for automatic code generation using advanced models like GPT. This means you can leverage the power of code generation to boost your productivity. Checkout the amazing tool we built for the microservice code generation: faststream-gen.

FsLovesGPT

Image description

FastStream, in a nutshell, offers ease, efficiency, and power in your microservices development journey. Whether you're just starting or looking to scale your microservices, FastStream is your trusted companion. With these core features at your disposal, you'll be well-equipped to tackle the challenges of modern, data-centric microservices.

Let's build something!

Now, let's get our hands a bit dirty 👷.
Let's implement an example python app using FastStream that consumes names from "persons" topic and outputs greetings to the "greetings" topic.

Cookiecutter project

To start our project, we will use the prepared cookiecutter FastStream project. To find out more about it, check our detailed guide.

Install the cookiecutter package using the following command:



pip install cookiecutter


Enter fullscreen mode Exit fullscreen mode

Now, run the provided cookiecutter command and fill out the relevant details to generate a new FastStream project, we will name this project "greetings_app":



cookiecutter https://github.com/airtai/cookiecutter-faststream.git


Enter fullscreen mode Exit fullscreen mode

The creation process should look like this:



You`ve downloaded /Users/tvrtko/.cookiecutters/cookiecutter-faststream before. Is it okay to delete and re-download it? [y/n] (y): y
  [1/4] username (github-username): sternakt
  [2/4] project_name (My FastStream App): Greetings App
  [3/4] project_slug (greetings_app): greetings_app
  [4/4] Select streaming_service
    1 - kafka
    2 - nats
    3 - rabbit
    Choose from [1/2/3] (1): 1
```

Change the working directory to the newly created directory and install all development requirements using pip:
```sh
cd greetings_app
pip install -e ".[dev]"
```

Now we are ready to edit our greetings_app/application.py and tests/test_application.py files to implement our application logic. 


### Writing app code

**FastStream** brokers provide convenient function decorators `@broker.subscriber` and `@broker.publisher` to allow you to delegate the actual process of:

- consuming and producing data to Event queues, and

- decoding and encoding JSON encoded messages

These decorators make it easy to specify the processing logic for your consumers and producers, allowing you to focus on the core business logic of your application without worrying about the underlying integration.

Also, **FastStream** uses [**Pydantic**](https://docs.pydantic.dev/) to parse input JSON-encoded data into Python objects, making it easy to work with structured data in your applications, so you can serialize your input messages just using type annotations.

Here is an example python app we talked about:

```python
from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker
from pydantic import BaseModel, Field

version = "0.1.0"
title = "My FastStream service"
description = "Description of my FastStream service"


class Name(BaseModel):
    name: str = Field(..., description="Name of the person")


class Greeting(BaseModel):
    greeting: str = Field(..., description="Greeting message")


broker = KafkaBroker()
app = FastStream(broker, title=title, version=version, description=description)

to_greetings = broker.publisher(
    "greetings",
    description="Produces a message on greetings after receiving a meesage on names",
)


@broker.subscriber("names", description="Consumes messages from names topic and produces messages to greetings topic")
async def on_names(msg: Name, logger: Logger) -> None:
    result = f"hello {msg.name}"
    logger.info(result)
    greeting = Greeting(greeting=result)
    await to_greetings.publish(greeting)
```

The example application will subscribe to **persons** Kafka topic and consume Name JSON messages from it. When the application consumes a message it will publish a Greetings JSON message **greetings** topic.

We can save the application into the `application.py` file and let's take a closer look at the code.

**Creating a broker**
To create an application, first we need to create a broker. This is the main piece of FastStream and takes care of the defining subscribers and producers.

```python
version = "0.1.0"
title = "My FastStream service"
description = "Description of my FastStream service"

...

broker = KafkaBroker()
app = FastStream(broker, title=title, version=version, description=description)
```

**Defining data structures**
Next, we need to define the structure of incoming and outgoing data. FastStream is integrated with Pydantic and offers automatic encoding and decoding of JSON formatted messages into Pydantic classes.

```python
class Name(BaseModel):
    name: str = Field(..., description="Name of the person")


class Greeting(BaseModel):
    greeting: str = Field(..., description="Greeting message")
```

**Defining a publisher**
Now, we define the publishing logic of our application.

```python
to_greetings = broker.publisher(
    "greetings",
    description="Produces a message on greetings after receiving a message on names",
)
```

**Defining a subscriber**
Finally, we can define the subscribing logic of our application. The app will consume data from the "names" topic and use the defined publisher to produce to the "greetings" topic whenever a message is consumed.

```python
@broker.subscriber("names", description="Consumes messages from names topic and produces messages to greetings topic")
async def on_names(msg: Name, logger: Logger) -> None:
    result = f"hello {msg.name}"
    logger.info(result)
    greeting = Greeting(greeting=result)
    await to_greetings.publish(greeting)
```

### Testing the service

The service can be tested using the `TestBroker` context managers, which, by default, puts the Broker into "testing mode".

The Tester will redirect your `subscriber` and `publisher` decorated functions to the InMemory brokers, allowing you to quickly test your app without the need for a running broker and all its dependencies.

Using pytest, the test for our service would look like this:

```python
import pytest
from faststream.kafka import TestKafkaBroker

from greetings_app.application import Greeting, Name, broker, on_names


# Subscribe to the "greetings" topic so we can monitor 
# messages our application is producing
@broker.subscriber("greetings")
async def on_greetings(msg: Greeting) -> None:
    pass


@pytest.mark.asyncio
async def test_on_names():
    async with TestKafkaBroker(broker):
        # Send John to "names" topic
        await broker.publish(Name(name="John"), "names")

        # Assert that our application has consumed "John"
        on_names.mock.assert_called_with(dict(Name(name="John")))

        # Assert that our application has greeted John in the "greetings" topic
        on_greetings.mock.assert_called_with(dict(Greeting(greeting="hello John")))
```

In the test, we send a test User JSON to the **in** topic, and then we assert that the broker has responded to the **out** topic with the appropriate message.

We can save the test to the test_application.py file and run the test by executing the following command in your application root file.

```shell
pytest
```

Here is how the tests execution should look like in your terminal:


```shell
===================================== test session starts =====================================
platform darwin -- Python 3.11.5, pytest-7.4.2, pluggy-1.3.0
rootdir: /Users/tvrtko/Documents/Airt Projects/FastStream/faststream-cookiecutter/greetings_app
configfile: pyproject.toml
plugins: asyncio-0.21.1, anyio-3.7.1
asyncio: mode=Mode.STRICT
collected 1 item                                                                              

tests/test_application.py .                                                             [100%]

====================================== 1 passed in 0.34s ======================================
```

### Running the application

The application can be started using built-in **FastStream** CLI command.

To run the service, use the **FastStream CLI** command and pass the module (in this case, the file where the app implementation is located) and the app symbol to the command.

``` shell
faststream run greetings_app.application:app
```

After running the command, you should see the following output:

``` shell
2023-10-13 08:36:32,162 INFO     - FastStream app starting...
2023-10-13 08:36:32,170 INFO     - names |            - `OnNames` waiting for messages
2023-10-13 08:36:32,177 INFO     - FastStream app started successfully! To exit, press CTRL+C
```

Also, **FastStream** provides you a great hot reload feature to improve your Development Experience

``` shell
faststream run greetings_app.application:app --reload
```

And multiprocessing horizontal scaling feature as well:

``` shell
faststream run greetings_app.application:app --workers 3
```

### Documentation

FastStream provides a command to serve the AsyncAPI documentation, let's use it to document our application.
To generate and serve the documentation, run the following command:

```shell
faststream docs serve greetings_app.application:app
```

Now, you should see the following output:

```shell
INFO:     Started server process [47151]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://localhost:8000 (Press CTRL+C to quit)
```

Now open your browser at `http://localhost:8000` and enjoy in your automatically generated documentation! :tada:

![Generated docs](https://dev-to-uploads.s3.amazonaws.com/uploads/articles/aa3d6fjauuqmbdvck6sx.png)

Aaaand, that's it! :tada: :tada: Feel free to experiment further with your application and checkout [the documentation](https://faststream.airt.ai/latest/) for more complex examples.

## Support us on GitHub and join our community :star:

Ready to join the FastStream revolution? Head over to our [GitHub repository](https://github.com/airtai/faststream) and show your support by starring it. By doing so, you'll stay in the loop with the latest developments, updates, and enhancements as we continue to refine and expand FastStream.

Don't forget, we also have an active Discord channel where you can connect with fellow FastStream enthusiasts, ask questions, and share your experiences. Your active participation in our growing community, whether on GitHub or Discord, is invaluable, and we're grateful for your interest and potential contributions. Together, we can make microservices development simpler and more efficient with FastStream.

## Conclusion

FastStream is your go-to tool for efficient microservices development. It simplifies message queues, supports various brokers, and offers Pydantic validation and auto-doc generation.

We're immensely grateful for your interest, and we look forward to your potential contributions. With FastStream in your toolkit, you're prepared to conquer the challenges of data-centric microservices like never before. Happy coding!
Enter fullscreen mode Exit fullscreen mode
💖 💪 🙅 🚩
tvrtko
Tvrtko Sternak

Posted on October 16, 2023

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related