Riottecboi
Posted on June 19, 2024
Introduction
In the bustling realm of modern travel, personalized recommendations play a pivotal role in enhancing user experiences and fostering memorable journeys. Our project, the Smart Travel Recommender System, aims to revolutionize the way travelers explore new destinations by providing tailored recommendations for activities based on country and season. By integrating cutting-edge technologies such as FastAPI, Kafka, MongoDB, and OpenAI, we aspire to deliver a seamless and scalable solution that empowers users to discover the essence of every destination.
Structure of Project
Travel Recommender/
├── app/
│   ├── background_worker.py
│   ├── main.py
│   ├── api/
│   │   ├── openai_api.py
│   │   └── routes/
│   │       ├── recommendations_route.py
│   │       └── status_route.py
│   ├── models/
│   │   ├── __init__.py
│   │   └── models.py
│   ├── schemas/
│   │   └── schema.py
│   ├── core/
│   │   └── config.py
│   ├── db/
│   │   └── mongodb.py
│   └── utils/
│       └── kafka.py
├── test/
│   └── sample_test.py
├── requirements.txt
├── README.md
├── Dockerfile
└── docker-compose.yml
More information from my Github
Installation & Usage
sudo add-apt-repository ppa:deadsnakes/ppa
sudo apt update
sudo apt install python3.11 -y
Use the package manager pip to install the required libraries. Install pip itself first:
sudo apt-get install python3-pip -y
Initialize your virtual environment in your project and activate the virtual environment.
python3.11 -m venv <virtual-environment-name>
source <virtual-environment-name>/bin/activate
Install all required libraries for this project from the file requirements.txt
python3.11 -m pip install -r requirements.txt
Set Up All Services
I have created a docker-compose file that makes it easy to install all of the services we need for this project.
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"
  kafka:
    image: bitnami/kafka:latest
    ports:
      - 9092:9092
      - 9093:9093
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper
  kafka-ui:
    image: provectuslabs/kafka-ui
    ports:
      - 8080:8080
    depends_on:
      - kafka
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
  mongodb:
    image: mongo:5.0
    ports:
      - 27017:27017
    environment:
      MONGO_INITDB_ROOT_USERNAME: root
      MONGO_INITDB_ROOT_PASSWORD: root
    healthcheck:
      test: echo 'db.runCommand("ping").ok' | mongo localhost:27017/test --quiet
      interval: 10s
      timeout: 5s
      retries: 5
  mongo-express:
    image: mongo-express:latest
    depends_on:
      mongodb:
        condition: service_healthy
    ports:
      - 8888:8081
    environment:
      ME_CONFIG_MONGODB_SERVER: mongodb
  fastapi-app:
    build:
      context: .
      args:
        NO_CACHE: "true"
    depends_on:
      - kafka
      - mongodb
    ports:
      - "8000:8000"
    environment:
      OPENAI_KEY: "xxxx"
      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
      KAFKA_TOPIC: recommendations_topic
      MONGODB_URI: mongodb://root:root@mongodb:27017
      MONGODB_DATABASE: recommendations
The Dockerfile looks like this:
FROM tiangolo/uvicorn-gunicorn-fastapi:python3.11
RUN apt-get update && apt-get install -y \
build-essential \
libpq-dev \
python3-pip \
&& rm -rf /var/lib/apt/lists/*
COPY requirements.txt /app/requirements.txt
COPY app/api /app/api
COPY app/core /app/core
COPY app/db /app/db
COPY app/schemas /app/schemas
COPY app/utils /app/utils
COPY app/background_worker.py /app/background.py
COPY app/main.py /app/main.py
RUN pip3 install --no-cache-dir -r requirements.txt
EXPOSE 8000
# Run the FastAPI service and background process
CMD ["sh", "-c", "uvicorn main:app --host 0.0.0.0 --port 8000 & python background.py"]
Docker pulls the images and runs each service in its own container; the containers talk to each other over the project's default bridge network.
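The environment variables set on fastapi-app in the compose file are what the application reads at startup. This post does not show app/core/config.py, so the following is only a sketch of how those variables could be loaded with the standard library. The Settings dataclass, the load_settings function, and the local default values are assumptions for illustration, not the project's actual code.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    """Hypothetical settings container; field names mirror the compose file."""
    OPENAI_KEY: str
    KAFKA_BOOTSTRAP_SERVERS: str
    KAFKA_TOPIC: str
    MONGODB_URI: str
    MONGODB_DATABASE: str


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Build Settings from environment variables, failing fast without an API key."""
    if not env.get("OPENAI_KEY"):
        raise RuntimeError("OPENAI_KEY must be set")
    return Settings(
        OPENAI_KEY=env["OPENAI_KEY"],
        # The fallbacks below are assumed local-development values.
        KAFKA_BOOTSTRAP_SERVERS=env.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
        KAFKA_TOPIC=env.get("KAFKA_TOPIC", "recommendations_topic"),
        MONGODB_URI=env.get("MONGODB_URI", "mongodb://localhost:27017"),
        MONGODB_DATABASE=env.get("MONGODB_DATABASE", "recommendations"),
    )
```

Passing a plain dict instead of os.environ makes the loader easy to exercise in tests.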
Explanation
The API exposes two routes: recommendations_route and status_route.
- recommendations_route accepts two query parameters:
  - country: The country for which the recommendations are to be fetched.
  - season: The season in which the recommendations are desired (e.g., "summer", "winter").
- Both parameters are required. The country must be one that pycountry can resolve, and the season must be one of the four seasons: "spring", "summer", "autumn", or "winter".
import pycountry
from pydantic import BaseModel, Field, model_validator


class RecommendationsRequest(BaseModel):
    country: str = Field(..., description="The country for which recommendations are to be fetched.")
    season: str = Field(..., description="The season in which the recommendations are desired.")

    @model_validator(mode='after')
    def validate_country_and_season(self):
        # search_fuzzy raises LookupError when no country matches the input
        try:
            pycountry.countries.search_fuzzy(self.country)
        except LookupError:
            raise ValueError("Invalid country.")
        valid_seasons = ["spring", "summer", "autumn", "winter"]
        if self.season not in valid_seasons:
            raise ValueError(f"Invalid season. Must be one of {', '.join(valid_seasons)}")
        return self
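Note that the validator compares the season string exactly, so inputs such as "Summer" or " winter " are rejected. If more lenient input is wanted, the value could be normalized before the check. The helper below is a minimal sketch of that idea; normalize_season is a hypothetical name and is not part of the project.

```python
VALID_SEASONS = ("spring", "summer", "autumn", "winter")


def normalize_season(raw: str) -> str:
    """Trim and lower-case the input, then check it against the four seasons."""
    season = raw.strip().lower()
    if season not in VALID_SEASONS:
        raise ValueError(
            f"Invalid season. Must be one of {', '.join(VALID_SEASONS)}"
        )
    return season
```

With this helper, normalize_season(" Summer ") yields "summer", while an unknown value such as "monsoon" still raises ValueError.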
When a request reaches the endpoint, we generate a unique identifier (UID) for the request and return that UID to the user immediately.
try:
    # Raises ValidationError if the country or season is invalid
    RecommendationsRequest(country=country, season=season)
    uid = str(uuid.uuid4())
    request_data = {'country': country, 'season': season}
    await kafka_producer(request_data, uid)
    return RecommendationSubmitResponse(uid=uid)
except ValidationError as e:
    raise HTTPException(status_code=422, detail=ErrorResponse(error="Invalid country/season", message="The input of country or season is invalid. Please try again.").model_dump())
Offload the processing of the request to a background component. At this stage, we will use Kafka to send the request as a message to a Kafka topic and consume it with a separate worker.
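from aiokafka import AIOKafkaProducer


async def kafka_producer(request_data, uid):
    producer = AIOKafkaProducer(
        bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS
    )
    await producer.start()
    try:
        await producer.send(
            settings.KAFKA_TOPIC,
            f"{uid}:{request_data}".encode("utf-8"), partition=0
        )
    finally:
        # stop() flushes pending messages and closes the connection,
        # even if send() raised
        await producer.stop()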
This Python code block defines an asynchronous function named kafka_producer that is responsible for sending data to a Kafka topic.
producer = AIOKafkaProducer(
    bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS
)
This initializes a Kafka producer using the AIOKafkaProducer class from the aiokafka library. The producer is configured to connect to the Kafka cluster specified in settings.KAFKA_BOOTSTRAP_SERVERS.
await producer.start()
await producer.send(
    settings.KAFKA_TOPIC,
    f"{uid}:{request_data}".encode("utf-8"), partition=0
)
producer.start() connects the producer to the Kafka cluster, and producer.send() then publishes a message to the configured topic (settings.KAFKA_TOPIC), always on partition 0. The message is a string made of the uid, a colon (:), and the string form of request_data, encoded as UTF-8 before sending.
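The message format above relies on Python's string form of a dict, which only a Python reader can parse back. As a hedged alternative, the payload could be serialized as JSON while keeping the same "uid:payload" layout. The sketch below uses only the standard library; encode_message and decode_message are hypothetical helper names, not functions from the project.

```python
import json
from typing import Tuple


def encode_message(uid: str, request_data: dict) -> bytes:
    """Serialize the request as '<uid>:<json payload>' in UTF-8."""
    return f"{uid}:{json.dumps(request_data)}".encode("utf-8")


def decode_message(raw: bytes) -> Tuple[str, dict]:
    """Split on the first colon only, because the JSON payload contains colons."""
    uid, payload = raw.decode("utf-8").split(":", 1)
    return uid, json.loads(payload)


if __name__ == "__main__":
    raw = encode_message("1234-abcd", {"country": "Japan", "season": "spring"})
    print(decode_message(raw))
```

This works because a UUID string contains hyphens but no colons, so the first colon always marks the end of the UID.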
Once the message has been published to the Kafka topic, background_worker.py picks it up.
async def handle_request(uid, request_data):
    try:
        recommendations = await get_openai_recommendation(request_data)
    except Exception as e:
        # If the OpenAI call fails, store an empty list instead of crashing the worker
        recommendations = []
    result = await save_recommendations(uid, request_data, recommendations)
    print(f"Recommendations saved with ID: {result}")


async def main():
    # Consume one message at a time, forever
    while True:
        uid, request_data = await kafka_consumer()
        await handle_request(uid, request_data)
At this stage, the worker consumes a message with the code below. It defines an asynchronous function that acts as a Kafka consumer: it receives a message from the configured Kafka topic, parses it, and commits the offset to track the progress of message consumption.
import ast

from aiokafka import AIOKafkaConsumer


async def kafka_consumer():
    consumer = AIOKafkaConsumer(
        settings.KAFKA_TOPIC,
        bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
        group_id=settings.KAFKA_TOPIC,
        auto_offset_reset='earliest'
    )
    await consumer.start()
    try:
        async for msg in consumer:
            # Split on the first colon only: the payload contains colons too
            uid, request_data = msg.value.decode("utf-8").split(":", 1)
            print(f"Processed recommendation request: {request_data}")
            await consumer.commit()
            # literal_eval parses the dict literal without executing code, unlike eval
            return uid, ast.literal_eval(request_data)
    except Exception as e:
        print(f"Consumer error: {e}")
    finally:
        await consumer.stop()
The auto_offset_reset='earliest' setting controls what happens when there is no initial offset for the consumer group, or when the current offset no longer exists on the server (e.g., because the data has been deleted). With 'earliest', the consumer starts reading from the earliest available message.
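To make the rule concrete, here is a toy model of how a starting offset is chosen for a single partition. It is a simplified illustration of the behavior described above, not aiokafka's implementation; resolve_start_offset and its parameters are hypothetical names.

```python
from typing import Optional


def resolve_start_offset(
    committed: Optional[int],
    log_start: int,
    log_end: int,
    auto_offset_reset: str = "earliest",
) -> int:
    """Pick the offset a consumer would start reading from.

    log_start is the oldest retained offset; log_end is the offset the
    next produced message will receive.
    """
    # A committed offset that still exists on the server always wins.
    if committed is not None and log_start <= committed <= log_end:
        return committed
    # Otherwise the reset policy decides.
    if auto_offset_reset == "earliest":
        return log_start
    if auto_offset_reset == "latest":
        return log_end
    raise LookupError("no valid offset and no reset policy")
```

In this model a brand-new consumer group (committed is None) with 'earliest' starts at log_start, so it also sees requests that were published before the worker came up.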
After committing the offset of the consumed message, background_worker.py makes the OpenAI API call.
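from openai import OpenAI


async def get_openai_recommendation(request_data):
    try:
        client = OpenAI(api_key=settings.OPENAI_KEY)
        # Note: this is a synchronous call, so it blocks until the API responds
        chat_completion = client.chat.completions.create(
            messages=[
                {
                    "role": "user",
                    "content": f"Provide three recommendations for things to do in {request_data['country']} during {request_data['season']}.",
                }
            ],
            model="gpt-3.5-turbo",
        )
        # The whole reply is returned as a single-element list
        return [chat_completion.choices[0].message.content]
    except Exception as e:
        raise Exception(str(e)) from e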
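The call above returns the whole reply as one string inside a list. If separate items are preferred, the reply could be split into individual recommendations. The sketch below assumes the model answers with a numbered or bulleted list, which is not guaranteed; split_recommendations is a hypothetical helper, and it falls back to the raw reply when no list markers are found.

```python
import re
from typing import List

# Matches list markers such as "1.", "2)", "-", "*" or "•" at the start of a line.
_ITEM_PREFIX = re.compile(r"^\s*(?:\d+[.)]|[-*•])\s+")


def split_recommendations(reply: str) -> List[str]:
    """Return one string per list item found in the reply."""
    items = []
    for line in reply.splitlines():
        if _ITEM_PREFIX.match(line):
            # Drop the marker and keep the text of the item.
            items.append(_ITEM_PREFIX.sub("", line).strip())
    # Fall back to the untouched reply when nothing looked like a list item.
    return items or [reply.strip()]
```

Lines without a list marker, such as an introductory sentence, are ignored as long as at least one item is found.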
The worker then saves the recommendations in MongoDB with the UID as the key. Each document holds the uid, the request_data (country and season), and the recommendations.
import asyncio


async def save_recommendations(uid, request_data, recommendations):
    recommendation_doc = {
        "uid": uid,
        "request_data": request_data,
        "recommendations": recommendations
    }
    # insert_one blocks, so run it in the default thread pool executor
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, recommendations_collection.insert_one, recommendation_doc)
    return result.inserted_id
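The reason for run_in_executor is that a blocking insert would otherwise stall the event loop. The following self-contained sketch shows the effect with a stand-in collection: FakeCollection, save, and demo are illustrative names, and the sleep only simulates network latency.

```python
import asyncio
import time


class FakeCollection:
    """Stand-in for a blocking MongoDB collection (illustrative only)."""

    def __init__(self):
        self.docs = []

    def insert_one(self, doc):
        time.sleep(0.05)  # simulate blocking network I/O
        self.docs.append(doc)
        return doc["uid"]  # a real driver would return a result object


async def save(collection, doc):
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default ThreadPoolExecutor.
    return await loop.run_in_executor(None, collection.insert_one, doc)


async def demo():
    collection = FakeCollection()
    # Both inserts run in worker threads, so they overlap instead of queueing.
    ids = await asyncio.gather(
        save(collection, {"uid": "a"}),
        save(collection, {"uid": "b"}),
    )
    return sorted(ids), len(collection.docs)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

While the inserts wait in their threads, the event loop stays free to run other coroutines.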
- status_route accepts the UID as a query parameter and checks for the result in MongoDB.
  - If the UID is not found, the route responds with a 404 error.
  - If a result is available, the status is "completed" and the recommendations are returned.
  - If processing has not finished, the status is "pending" to signal that the data is not yet available.
if recommendations is None:
    raise HTTPException(status_code=404, detail=ErrorResponse(error="UID not found", message="The provided UID does not exist. Please check the UID and try again.").model_dump())
if recommendations:
    return RecommendationResponse(uid=uid, country=country, season=season, message="The recommendations are ready", recommendations=recommendations, status="completed")
return RecommendationCheckResponse(uid=uid, status="pending", message="The recommendations are not yet available. Please try again later.")
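The three branches can also be written as a small pure function, which makes the decision easy to test without FastAPI or MongoDB. This is a sketch under the assumption that the stored document has the shape written by save_recommendations (uid, request_data, recommendations); resolve_status is a hypothetical name and returns plain dicts rather than the response models.

```python
from typing import Optional, Tuple


def resolve_status(uid: str, doc: Optional[dict]) -> Tuple[int, dict]:
    """Map a stored document (or None) to an HTTP status code and body."""
    if doc is None:
        # No document for this UID
        return 404, {"error": "UID not found"}
    recommendations = doc.get("recommendations") or []
    if recommendations:
        request_data = doc.get("request_data", {})
        return 200, {
            "uid": uid,
            "status": "completed",
            "country": request_data.get("country"),
            "season": request_data.get("season"),
            "recommendations": recommendations,
        }
    # A document with an empty list is reported as still pending
    return 200, {"uid": uid, "status": "pending"}
```

One consequence worth knowing: because the worker stores an empty list when the OpenAI call fails, such a request keeps reporting "pending".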
from typing import List, Literal, Optional

from pydantic import BaseModel


class RecommendationSubmitResponse(BaseModel):
    uid: str


class RecommendationCheckResponse(RecommendationSubmitResponse):
    message: Optional[str] = None
    status: str


class RecommendationResponse(RecommendationCheckResponse):
    country: Optional[str] = None
    season: Optional[Literal["spring", "summer", "autumn", "winter"]] = None
    recommendations: Optional[List[str]] = None


class ErrorResponse(BaseModel):
    error: str
    message: str
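From the caller's side, the flow is "submit, then poll the status route until it reports completed". A minimal client-side sketch of that loop is shown below. It is transport-agnostic: fetch_status is a hypothetical callable that would wrap the HTTP request to the status route and return the parsed JSON body, and poll_until_ready, attempts, and delay are illustrative names and defaults.

```python
import time
from typing import Callable


def poll_until_ready(
    fetch_status: Callable[[str], dict],
    uid: str,
    attempts: int = 5,
    delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Call fetch_status(uid) until the status is 'completed' or attempts run out."""
    for attempt in range(attempts):
        body = fetch_status(uid)
        if body.get("status") == "completed":
            return body
        # Wait between attempts, but not after the last one
        if attempt < attempts - 1:
            sleep(delay)
    raise TimeoutError(f"Recommendations for {uid} not ready after {attempts} attempts")
```

The sleep parameter exists only so that tests can skip the real waiting time.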
Final Demo
Conclusion
By leveraging FastAPI, Kafka, MongoDB, and OpenAI, we endeavor to deliver a sophisticated yet user-friendly platform that empowers travelers to embark on unforgettable journeys tailored to their preferences and interests. With scalability, efficiency, and personalization at its core, our system strives to redefine the way travelers explore the world, one recommendation at a time.