Sid
Posted on August 2, 2022
Hello dev community! I would like to share what I learned about building a streaming service for sentiment analysis of customer interactions with an e-commerce application.
The technologies used are Apache Kafka, ZooKeeper, Apache Spark, Docker, and Cassandra.
Yeah, so let's start 😃
Prerequisites:
I have used the following versions
spark-2.1.1-bin-hadoop2.7
kafka_2.11-0.9.0.0
Cassandra 4.0.5
Docker version 20.10.7, build 20.10.7-0ubuntu5~20.04.2
openjdk version 11.0.14.1
Python3.8
Here is what each of these is, in short:
- Apache Kafka is an event streaming platform that can publish, subscribe to, store, and process streams of events.
- Apache ZooKeeper is a service used by a cluster (group of nodes) to coordinate between themselves and maintain shared data with robust synchronization.
- Cassandra is a high-performance distributed database designed to handle large amounts of data across many commodity servers, providing high availability with no single point of failure. It is a type of NoSQL database.
- Docker is a containerization platform that packages applications into containers, which simplifies building, shipping, and running them.
The ZooKeeper daemon (process), which runs under the name QuorumPeerMain, is primarily used to track the status of nodes in the Kafka cluster and to maintain metadata such as the list of Kafka topics. The Kafka daemon is the broker; kafka-producer and kafka-consumer are clients that connect to it. Kafka-producer will listen to the click events on the web application and create event-based data, which is published to one of the Kafka topics and then consumed by kafka-consumer.
This consumed data is sent to Spark using a Kafka-to-Spark connector.
Note that Kafka is a data pipeline tool for streaming data, and Spark is a big-data processing tool.
Spark consists of master and worker processes. The driver runs the main() program where the Spark context is created. It then interacts with the cluster manager (the master, in standalone mode) to schedule the job execution and perform the tasks. The workers run processes in parallel to perform the tasks scheduled by the driver program.
Cassandra is connected to Spark via a connector through which the data is provided and stored in the database.
Implementation
I'm using an ubuntu virtual machine.
Download the zip files and unzip them.
Navigate into the spark directory and start all the spark daemons.
sbin/start-all.sh
You can check the status with the help of the jps command, which lists the instrumented Java HotSpot VMs on the target system.
Master and Worker should appear in the list.
Now, navigate to the Kafka directory and start ZooKeeper, which acts as a metadata service for Kafka. To run this command in the background we use nohup.
nohup bin/zookeeper-server-start.sh config/zookeeper.properties >> nohup.out &
(press enter if required)
The QuorumPeerMain process is now started, which you can verify using jps.
Similarly, start the Kafka daemon:
nohup bin/kafka-server-start.sh config/server.properties >> nohup.out &
You should now see a Kafka process in the jps output.
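With several daemons running, it can help to check them all in one go. The following is a minimal sketch, assuming the four process names mentioned so far (Master, Worker, QuorumPeerMain, Kafka); check_daemons is a hypothetical helper name, not part of Spark or Kafka.

```shell
# check_daemons: read `jps` output on stdin and report which of the
# expected daemons are present. Succeeds only if none is missing.
# (Hypothetical helper; the process names are the ones seen in jps above.)
check_daemons() {
  _listing="$(cat)"
  _missing=0
  for _name in Master Worker QuorumPeerMain Kafka; do
    # jps prints "<pid> <main class>", so compare against column 2
    if printf '%s\n' "$_listing" |
        awk -v n="$_name" '$2 == n { found = 1 } END { exit !found }'; then
      echo "OK       $_name"
    else
      echo "MISSING  $_name"
      _missing=1
    fi
  done
  return "$_missing"
}

# Usage (needs a JDK on PATH):
#   jps | check_daemons
```

Reading from stdin rather than calling jps inside the function keeps the check easy to try out with saved output.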
I have used a Docker container for running Cassandra, but you can also do it without Docker using the command nohup bin/cassandra -f
and then check that you are able to access the cqlsh
shell. I was facing problems accessing the cqlsh shell as I had installed and uninstalled multiple versions of Cassandra.
Using Docker, pull the latest Cassandra image and create a container from it.
docker pull cassandra:latest
docker run --name cassandra -p 127.0.0.1:9042:9042 -p 127.0.0.1:9160:9160 -d cassandra
To get access to the cqlsh shell inside the container, open a bash session in it as follows and then run cqlsh.
docker exec -it cassandra bash
Note that cassandra here is just the name of the container and can be any other name.
You may face some issues, for example when you accidentally create multiple containers, which you can fix with a few Docker commands by referring to the documentation.
Now, create a topic in Kafka; here topic1 is the topic name.
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic1
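To confirm the topic was created, the same kafka-topics.sh script can list the existing topics. The snippet below is a small sketch; topic_exists is a hypothetical helper name, and it assumes the listing prints one topic name per line.

```shell
# topic_exists NAME: succeed if NAME appears as a complete line on stdin.
# (Hypothetical helper; -F = fixed string, -x = whole line, -q = quiet.)
topic_exists() {
  grep -Fxq -- "$1"
}

# Usage, from the Kafka directory:
#   bin/kafka-topics.sh --list --zookeeper localhost:2181 | topic_exists topic1 \
#     && echo "topic1 is ready"
```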
Before moving on to Spark, we will first set up the Cassandra database.
In the cqlsh shell, create a keyspace and a table with the following schema.
CREATE KEYSPACE sparkdata WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};
use sparkdata;
CREATE TABLE cust_data (fname text , lname text , url text,product text , cnt counter ,primary key (fname,lname,url,product));
select * from cust_data;
Cassandra will do an upsert if you try to add records with a primary key that already exists.
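Since cnt is a counter column, it is changed by adding to it with an UPDATE statement rather than by inserting a fixed value. As a rough sketch for trying this by hand, the helper below (counter_update_cql is a hypothetical name) prints such a statement for the cust_data table. It assumes the values contain no single quotes.

```shell
# counter_update_cql FNAME LNAME URL PRODUCT [N]: print a CQL statement that
# adds N (default 1) to cnt for the row with the given primary key.
# Assumption: the arguments contain no single quotes (no escaping is done).
counter_update_cql() {
  printf "UPDATE sparkdata.cust_data SET cnt = cnt + %d WHERE fname = '%s' AND lname = '%s' AND url = '%s' AND product = '%s';\n" \
    "${5:-1}" "$1" "$2" "$3" "$4"
}

# Usage, with the container named cassandra as above:
#   docker exec cassandra cqlsh -e \
#     "$(counter_update_cql rahul patil http://127.03.32 'Microsoft Xbox')"
```

Running the same statement twice against a row that does not exist yet should leave cnt at 2, which is the behaviour the pipeline relies on later.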
Now coming to Spark, run this command to get the required packages, which help in connecting Kafka with Spark and Spark with Cassandra.
bin/spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.0.2","org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0"
You may face some errors, as I did, but trying a few other ways and then executing the same command again worked for me, strange isn't it? The errors could be because Bintray is no longer active.
I tried this and this sources.
You may need to manually download the packages.
Open the Spark shell using bin/spark-shell
and import the packages which we downloaded. Before proceeding, stop the Spark session that the shell created on startup using spark.stop
, because that instance is already running and only one SparkContext can be active in the same JVM at a time.
spark.stop
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
import com.datastax.spark.connector.SomeColumns
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector.streaming._
Further, in the same shell, we create a Spark application and configure its settings. Here, the app name is KafkaSparkStreaming and 127.0.0.1
is the IP address of the running Cassandra instance.
val sparkConf = new SparkConf().setAppName("KafkaSparkStreaming").set("spark.cassandra.connection.host", "127.0.0.1")
A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster. We create a new StreamingContext with a batch interval of 20 seconds, and create a stream between Kafka and Spark.
val ssc = new StreamingContext(sparkConf, Seconds(20))
// Map each topic name to the number of consumer threads; topic1 is the topic created earlier
val topicMap = "topic1".split(",").map((_, 1)).toMap
val lines = KafkaUtils.createStream(ssc, "localhost:2181", "sparkgroup", topicMap).map(_._2)
lines.print();
Each message consumed from Kafka is passed to Spark, which processes it by mapping the message data to the corresponding columns of the table schema that we created before, and saves it to the database.
lines.map(line => { val arr = line.split(","); (arr(0),arr(1),arr(2),arr(3),arr(4)) }).saveToCassandra("sparkdata", "cust_data", SomeColumns("fname", "lname","url","product","cnt"))
ssc.start // starts the JobScheduler, which in turn starts the JobGenerator that creates jobs
Congrats, you made it through the implementation part. 🎉
Testing
Now to test this, we create new sessions of kafka-producer and kafka-consumer, and leave the old sessions of spark-shell and cqlsh undisturbed.
As I haven't created a web interface yet, which I plan to do in the near future, we can test it manually in the terminal.
Create kafka-producer and kafka-consumer in different terminals.
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic1 --from-beginning
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic1
We then manually enter these lines one by one in the kafka-producer terminal, as if it were producing this data from the web application.
rahul,patil,http://127.03.32,Microsoft Xbox,1
rohan,G,http://127.24.43,iPhone 13 Pro,1
rahul,patil,http://127.03.32,Microsoft Xbox,1
aditya,chouglae,http://127.55.33,Boatstone 1000,1
rahul,patil,http://127.03.32,Sony PlayStation,1
rahul,patil,http://127.03.32,Microsoft Xbox,1
rohan,G,http://127.24.43,iPhone 13 Pro,1
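Typing the lines by hand gets tedious after a few rounds. The sketch below sends them from a file instead, and first drops any line that does not have exactly five comma-separated fields ending in an integer, since the Spark mapping indexes arr(0) through arr(4) and would fail on shorter lines. The names valid_events, replay_events, events.csv and the KAFKA_HOME variable are assumptions made for this example.

```shell
# Assumption: KAFKA_HOME points at the unpacked Kafka directory
# (defaults to the current directory).
KAFKA_HOME="${KAFKA_HOME:-.}"

# valid_events: pass through only lines with exactly five comma-separated
# fields whose last field is an integer; report the rest on stderr.
valid_events() {
  awk -F',' '
    NF == 5 && $5 ~ /^-?[0-9]+$/ { print; next }
    { print "skipping malformed line: " $0 > "/dev/stderr" }
  '
}

# replay_events FILE: send every valid line of FILE to topic1 through
# the console producer, which reads messages from stdin.
replay_events() {
  valid_events < "$1" |
    "$KAFKA_HOME/bin/kafka-console-producer.sh" \
      --broker-list localhost:9092 --topic topic1
}

# Usage (events.csv is a hypothetical file holding the lines above):
#   replay_events events.csv
```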
Notice that each message shows up in the kafka-consumer terminal. Spark consumes the same topic through its own stream, processes the message, and stores it in Cassandra.
We can observe that the count gets incremented, because cnt is a counter column and is not part of the primary key. In this way we can know how many times a user has visited a particular page on an e-commerce website. Using this data we can analyse further and offer discounts to the user accordingly so that product sales increase, and also identify the factors behind weak product sales and what can be made better.
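To know what to expect in cust_data after entering the sample lines, the per-key totals can be worked out offline. This is only a sketch that sums the last field per (fname, lname, url, product) key, assuming the table was empty before the test; expected_counts is a hypothetical helper name.

```shell
# expected_counts: sum the last CSV field per (fname,lname,url,product) key,
# which is what the repeated counter increments should add up to.
expected_counts() {
  awk -F',' '
    NF == 5 { key = $1 FS $2 FS $3 FS $4; total[key] += $5 }
    END     { for (k in total) print k FS total[k] }
  ' | sort
}

expected_counts <<'EOF'
rahul,patil,http://127.03.32,Microsoft Xbox,1
rohan,G,http://127.24.43,iPhone 13 Pro,1
rahul,patil,http://127.03.32,Microsoft Xbox,1
aditya,chouglae,http://127.55.33,Boatstone 1000,1
rahul,patil,http://127.03.32,Sony PlayStation,1
rahul,patil,http://127.03.32,Microsoft Xbox,1
rohan,G,http://127.24.43,iPhone 13 Pro,1
EOF
# prints:
# aditya,chouglae,http://127.55.33,Boatstone 1000,1
# rahul,patil,http://127.03.32,Microsoft Xbox,3
# rahul,patil,http://127.03.32,Sony PlayStation,1
# rohan,G,http://127.24.43,iPhone 13 Pro,2
```

These totals can then be compared with the result of select * from cust_data; in cqlsh.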
Finally, we have completed the architecture as planned in the diagram successfully.
References:
This was the initial reference, along with many other sites like StackOverflow (of course haha), the Cassandra docs, DataStax, GitHub issues, and many more.