Apache Spark Structured Streaming :: Cab Aggregator Use Case
SNEHASISH DUTTA
Posted on June 30, 2024
Building helps you retain more knowledge.
But teaching helps you retain even more. Teaching is another modality that locks in the experience you gain from building. -- Dan Koe
Objective
Imagine a very simple system that can automatically warn cab companies whenever a driver rejects a bunch of rides in a short time. This system would use Kafka to send ride information (accepted, rejected) and Spark Structured Streaming to analyze it in real-time. If a driver rejects too many rides, the system would trigger an alert so the cab company can investigate.
What is Spark Structured Streaming?
Spark Structured Streaming is a powerful tool for processing data streams in real-time. It's built on top of Apache Spark SQL, which means it leverages the familiar DataFrame and Dataset APIs you might already use for batch data processing in Spark. This offers several advantages:
Unified Programming Model: You can use the same set of operations for both streaming and batch data, making it easier to develop and maintain code.
Declarative API: Spark Structured Streaming lets you describe what you want to achieve with your data processing, rather than writing complex low-level code to handle the streaming aspects.
Fault Tolerance: Spark Structured Streaming ensures your processing jobs can recover from failures without losing data. It achieves this through techniques like checkpointing and write-ahead logs.
Here's a breakdown of how Spark Structured Streaming works:
Streaming Data Source: Your data comes from a streaming source such as Kafka, files, sockets, or a custom source that generates a continuous stream of data.
Micro-Batching: Spark Structured Streaming breaks down the continuous stream into small chunks of data called micro-batches.
Structured Processing: Each micro-batch is processed like a regular DataFrame or Dataset using Spark SQL operations. This allows you to perform transformations, aggregations, and other data manipulations on the streaming data.
Updated Results: As new micro-batches arrive, the processing continues, and the results are constantly updated, reflecting the latest data in the stream.
Sinks: The final output can be written to various destinations like databases, dashboards, or other streaming systems for further analysis or action.
Benefits of Spark Structured Streaming:
Real-time Insights: Analyze data as it arrives, enabling quicker decision-making and proactive responses to events.
Scalability: Handles large volumes of streaming data efficiently by leveraging Spark's distributed processing capabilities.
Ease of Use: The familiar DataFrame/Dataset API makes it easier to develop and maintain streaming applications.
In essence, Spark Structured Streaming bridges the gap between batch processing and real-time analytics, allowing you to analyze data as it's generated and gain valuable insights from continuous data streams.
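To make the source, micro-batch, and sink flow concrete, here is a minimal, self-contained sketch (a toy example, not part of the cab project) that reads from Spark's built-in rate source, counts rows per 10-second window, and prints each updated result to the console:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object RateToConsole extends App {
  // Toy pipeline: the built-in "rate" source emits (timestamp, value) rows continuously
  val spark = SparkSession.builder()
    .appName("StructuredStreamingSketch")
    .master("local[2]")
    .getOrCreate()

  val stream = spark.readStream
    .format("rate")
    .option("rowsPerSecond", "5")
    .load()

  // Same DataFrame API as batch: count rows per 10-second window
  val counts = stream
    .groupBy(window(col("timestamp"), "10 seconds"))
    .count()

  counts.writeStream
    .format("console")   // sink: print each micro-batch's updated results
    .outputMode("update")
    .start()
    .awaitTermination()
}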
Project Architecture
Extract From : Apache Kafka
Transform Using : Apache Spark
Load Into : Apache Kafka
Producer and Infrastructure
Repository : https://github.com/snepar/cab-producer-infra
It is a simple application that ingests data into Kafka.
It produces random ride events, each marked either accepted or rejected.
Sample Event
{
"id": 3949106,
"event_date": 1719749696532,
"tour_value": 29.75265579847153,
"id_driver": 3,
"id_passenger": 11,
"tour_status": rejected
}
Start the Infrastructure
docker compose up
Random Events Generator
import scala.util.Random

val statuses = List("accepted", "rejected")
val topic = "ride"
val r = Random
while (true) {
  // Pick a fresh status on every iteration so both accepted and rejected events get produced
  val status = statuses(r.nextInt(statuses.length))
  val id = r.nextInt(10000000)
  val tour_value = r.nextDouble() * 100
  val id_driver = r.nextInt(10)
  val id_passenger = r.nextInt(100)
  val event_date = System.currentTimeMillis
  val payload =
    s"""{"id":$id,"event_date":$event_date,"tour_value":$tour_value,"id_driver":$id_driver,"id_passenger":$id_passenger,"tour_status":"$status"}"""
  EventProducer.send(topic, payload)
  Thread.sleep(1000)
}
Send Random Events to Producer
def send(topic: String, payload: String): Unit = {
  // No explicit key is needed for this demo, so the record carries only the topic and payload
  val record = new ProducerRecord[String, String](topic, payload)
  producer.send(record)
}
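The send method above assumes a pre-configured producer. A minimal sketch of that setup (the actual configuration lives in the repository; the bootstrap server is an assumption matching the local Docker setup):

import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer

// Hypothetical setup for the `producer` used in send(); adjust bootstrap.servers as needed
val producerProps = new Properties()
producerProps.put("bootstrap.servers", "localhost:9092")
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](producerProps)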
See the produced events from the topic named ride in the Docker terminal:
kafka-console-consumer --topic ride --bootstrap-server broker:9092
Spark Structured Streaming Application
Repository : https://github.com/snepar/spark-streaming-cab
Create a Spark Session to execute the application locally:
val spark = SparkSession.builder()
.appName("Integrating Kafka")
.master("local[2]")
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
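Running this locally also needs the Spark SQL and Spark-Kafka integration artifacts on the classpath. An illustrative build.sbt fragment (the version is an assumption; align it with your Spark installation):

// build.sbt (illustrative; match the version to your Spark distribution)
val sparkVersion = "3.5.1"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"            % sparkVersion,
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
)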
Configure Reader and Writer - Kafka topics
val kafkahost = "localhost:9092"
val inputTopic = "ride"
val outputTopic = "rejectalert"
val props = new Properties()
props.put("host", kafkahost)
props.put("input_topic",inputTopic)
props.put("output_host", kafkahost)
props.put("output_topic",outputTopic)
props.put("checkpointLocation","/tmp")
Define Schema for the Events
val schema = StructType(Seq(
StructField("id", IntegerType, nullable = true),
StructField("event_date", LongType, nullable = false),
StructField("tour_value", DoubleType, nullable = true),
StructField("id_driver", StringType, nullable = false),
StructField("id_passenger", IntegerType, nullable = false),
StructField("tour_status", StringType, nullable = false)
))
Read from the Kafka Topic and Create the Streaming DataFrame
val df = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers","localhost:9092")
.option("failOnDataLoss","false")
.option("startingOffsets", "latest")
.option("subscribe", "ride").load()
Parse the DataFrame with the schema and keep only the events marked as rejected.
A rejected event signifies that a driver has declined a ride.
val parsedDF = df.selectExpr("cast(value as string) as value")
.select(from_json(col("value"), schema).as("data"))
.select("data.*").where("tour_status='rejected'")
Aggregate over a 1-minute window, grouped by driver ID, how many rides were rejected, and also calculate how much money has been lost due to these rejections.
val driverPerformance: DataFrame = parsedDF
  .groupBy(
    window(
      to_utc_timestamp(from_unixtime(col("event_date") / 1000, "yyyy-MM-dd HH:mm:ss"), "UTC+1")
        .alias("event_timestamp"),
      "1 minute"),
    col("id_driver"))
  .agg(
    count(col("id")).alias("total_rejected_tours"),
    sum("tour_value").alias("total_loss"))
  .select("id_driver", "total_rejected_tours", "total_loss")
Set a threshold of 3 rejections; if a driver crosses it within the window, generate an alert event.
val thresholdCrossedDF = driverPerformance.where(col("total_rejected_tours").gt(3))
Write this DataFrame to a Kafka Topic rejectalert
thresholdCrossedDF.selectExpr("CAST(id_driver AS STRING) AS key", "to_json(struct(*)) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", props.getProperty("output_host", "localhost:9092"))
  .option("topic", props.getProperty("output_topic", "rejectalert"))
  .option("checkpointLocation", props.getProperty("checkpointLocation", "/tmp"))
  .outputMode("update")
  .start()
  .awaitTermination()
Run the Complete Application: https://github.com/snepar/spark-streaming-cab/blob/master/src/main/scala/rideevent/AlertGenerator.scala
Use a consumer on the Kafka broker to subscribe to these alerts.
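For example, reusing the console consumer from the producer setup (assuming the same broker container):

kafka-console-consumer --topic rejectalert --bootstrap-server broker:9092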
References
https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html