Building an Event-Driven System for GeoJSON File Processing and Serving
AissaGeek
Posted on January 25, 2024
Introduction
Event-driven architectures are powerful for handling asynchronous tasks, providing scalability, and decoupling the components of a system. In this article we'll build an event-driven system that processes GeoJSON files. If you are interested, here is how it is done:
- File Watcher: Detects new files in a directory.
- Message Broker (RabbitMQ): Receives message about the new files.
- Consumer: Processes files and interacts with a database and object storage.
- FastAPI Server: Serves the files through streaming.
System Overview
Our system consists of four main components.
- A Python script that watches a directory for new GeoJSON files and publishes information about each new file (its name and absolute path) to the RabbitMQ broker.
- A message broker that queues this information.
- A Python consumer script that processes each file, stores it in a PostGIS database, and uploads it to MinIO.
- A FastAPI API that provides an endpoint to stream the file from MinIO.
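The notification the watcher publishes can be sketched as a small JSON payload carrying the file name and absolute path. This is a sketch of mine, not code from the post — `make_message` is a hypothetical helper, and the exact schema is up to you:

```python
import json
import os

def make_message(file_path: str) -> str:
    """Serialize the watcher's notification: file name plus absolute path."""
    return json.dumps({
        "file_name": os.path.basename(file_path),
        "file_path": os.path.abspath(file_path),
    })

# Example payload for a newly detected file
print(make_message("/data/incoming/parcels.geojson"))
```

Keeping the payload to plain JSON means any consumer, in any language, can pick it off the queue later.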
Technologies used
- RabbitMQ: for message queuing.
- PostGIS: A spatial database extender for PostgreSQL.
- MinIO: An object storage solution.
- FastAPI: A modern, fast framework for building APIs.
Now for the practical implementation of the system.
System Setup
To build our system quickly, we will use docker-compose to orchestrate the services. See the docker-compose.yml file below:
version: '3.8'
services:
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"
      - "15672:15672"
  postgis:
    image: postgis/postgis
    environment:
      POSTGRES_DB: postgis
      POSTGRES_USER: user
      POSTGRES_PASSWORD: password
  minio:
    image: minio/minio
    volumes:
      - minio_data:/data
    ports:
      - "9000:9000"
    environment:
      MINIO_ROOT_USER: minio
      MINIO_ROOT_PASSWORD: minio123
    command: server /data
volumes:
  minio_data:
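One practical note: the containers need a few seconds to come up, so the watcher and consumer can fail on their very first connection attempt. A small stdlib helper (my addition, not part of the original post) can poll a service port before the scripts connect:

```python
import socket
import time

def wait_for_port(host: str, port: int, timeout: float = 30.0) -> bool:
    """Poll host:port until it accepts TCP connections or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # A successful connect means the service is listening
            with socket.create_connection((host, port), timeout=1.0):
                return True
        except OSError:
            time.sleep(0.5)
    return False

# e.g. wait_for_port("localhost", 5672) before opening the pika connection
```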
What is next? Let's put the Python watcher and consumer in place, and our system will be mostly done.
File Watcher Implementation
The file watcher uses watchdog to monitor a directory:
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import pika
import json
import time

class Handler(FileSystemEventHandler):
    def on_created(self, event):
        # React only to files, not to new sub-directories
        if not event.is_directory:
            self.send_message(event.src_path)

    def send_message(self, file_path):
        # Publish the file path to the 'file_queue' queue on RabbitMQ
        connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        channel = connection.channel()
        channel.queue_declare(queue='file_queue')
        message = json.dumps({'file_path': file_path})
        channel.basic_publish(exchange='', routing_key='file_queue', body=message)
        connection.close()

observer = Observer()
observer.schedule(Handler(), path='/path/to/watch', recursive=False)
observer.start()
try:
    # Keep the script alive; the observer runs in a background thread
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    observer.stop()
observer.join()
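Note that on_created fires for every new file in the directory, while our pipeline only cares about GeoJSON. It may be worth filtering by extension before publishing — a small sketch of mine, where `is_geojson` is a hypothetical helper (GeoJSON files sometimes carry a plain .json extension, so I accept both):

```python
from pathlib import Path

def is_geojson(file_path: str) -> bool:
    """True for files the pipeline should process, judged by extension."""
    return Path(file_path).suffix.lower() in {".geojson", ".json"}

# Inside Handler.on_created:
#     if not event.is_directory and is_geojson(event.src_path):
#         self.send_message(event.src_path)
```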
Python Consumer for Processing Files
The script below consumes messages, processes files, and interacts with PostGIS and MinIO:
import pika
import json
import os
import geopandas as gpd
from minio import Minio

minio_client = Minio('minio:9000', access_key='minio', secret_key='minio123', secure=False)

def process_file(file_path):
    # Read the GeoJSON file
    gdf = gpd.read_file(file_path)
    # TODO Process and save to PostGIS
    # ...... Here it is your turn to complete this project ....
    # Upload to MinIO under the original file name
    minio_client.fput_object("your-bucket", os.path.basename(file_path), file_path)

connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq'))
channel = connection.channel()
channel.queue_declare(queue='file_queue')

def callback(ch, method, properties, body):
    file_info = json.loads(body)
    process_file(file_info['file_path'])

channel.basic_consume(queue='file_queue', on_message_callback=callback, auto_ack=True)
channel.start_consuming()
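The TODO in process_file is deliberately left as an exercise; one possible way to fill it is GeoDataFrame.to_postgis. This is a sketch under my own assumptions — the compose credentials above, geopandas with SQLAlchemy installed, and a `table_name_for` helper I made up to derive a safe table name:

```python
import os
import re

def table_name_for(file_path: str) -> str:
    """Derive a safe PostGIS table name from the file name (hypothetical helper)."""
    stem = os.path.splitext(os.path.basename(file_path))[0]
    return re.sub(r"\W+", "_", stem).strip("_").lower() or "geojson_data"

def save_to_postgis(gdf, file_path: str) -> None:
    """One way to complete the TODO: write the GeoDataFrame to PostGIS."""
    # Lazy import so the helper above stays usable without SQLAlchemy installed
    from sqlalchemy import create_engine
    # Connection string assumes the docker-compose credentials from this article
    engine = create_engine("postgresql://user:password@localhost:5432/postgis")
    gdf.to_postgis(table_name_for(file_path), engine, if_exists="replace")
```

You would call save_to_postgis(gdf, file_path) right where the TODO sits; the ORM model and table creation the article mentions later are still yours to design.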
FastAPI Endpoint for Streaming Files
FastAPI provides an endpoint to stream files from MinIO:
from fastapi import FastAPI
from minio import Minio
from starlette.responses import StreamingResponse

app = FastAPI()
minio_client = Minio('minio:9000', access_key='minio', secret_key='minio123', secure=False)

@app.get("/files/{file_name}")
async def get_file(file_name: str):
    # Stream the object from MinIO in 32 KiB chunks instead of buffering it
    obj = minio_client.get_object("your-bucket", file_name)
    return StreamingResponse(obj.stream(32*1024), media_type="application/octet-stream")
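The point of StreamingResponse here is that the file is relayed in 32 KiB chunks rather than loaded into memory at once. The chunking itself can be sketched with a plain generator, no MinIO required (`iter_chunks` is my illustration, not part of either library):

```python
from typing import Iterator

def iter_chunks(data: bytes, chunk_size: int = 32 * 1024) -> Iterator[bytes]:
    """Yield successive chunk_size slices, mirroring what obj.stream(32*1024) does."""
    for start in range(0, len(data), chunk_size):
        yield data[start:start + chunk_size]

# A 70 KiB payload streams as three chunks: 32 KiB, 32 KiB, and the 6 KiB remainder
sizes = [len(c) for c in iter_chunks(b"x" * (70 * 1024))]
print(sizes)  # [32768, 32768, 6144]
```

For large GeoJSON files this keeps the API's memory footprint flat regardless of object size.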
What is next?
If you think we are all done, you are wrong: try putting together everything we have seen, and check whether it works.
Something missing, right? Of course, there are some missing pieces, such as the database ORM model to map the database tables. And that is not all: the database itself still needs to be created with the necessary tables.
How about you complete this project and let me know about your precious contribution ;)
Conclusion
We have built an event-driven system for processing and serving GeoJSON files. This setup demonstrates the power of combining several technologies and Python scripts to create a scalable and efficient system.
By leveraging Docker, we ensure that our components are easily deployable and maintainable. This architecture is not limited to GeoJSON files; it can be adapted to various scenarios requiring automated file processing and serving.