Yitaek Hwang
Posted on February 18, 2021
Analyze cryptocurrency price trends in real time via Kafka and store them in a timeseries database for further investigation.
“Bitcoin soars past $50,000 for the first time” — CNN
“Tesla invests $1.5 billion in bitcoin, will start accepting it as payment” — Washington Post
Not a day goes by without some crypto news stealing the headlines. From institutional support of Bitcoin to central banks around the world exploring some form of digital currency, interest in cryptocurrency has never been higher. This is also reflected in the daily exchange volume.
As someone interested in the future of DeFi (decentralized finance), I wanted to better track the price of different cryptocurrencies and store them into a timeseries database for further analysis. I found an interesting talk by Ludvig Sandman and Bruce Zulu at Kafka Summit London 2019, “Using Kafka Streams to Analyze Live Trading Activity for Crypto Exchanges”, so I decided to leverage Kafka and adapt their approach for my own use. In this tutorial, we will use Python to send real-time cryptocurrency metrics into Kafka topics, store these records in QuestDB, and perform moving average calculations on this time series data with numpy.
Project Setup
At a high level, this project polls the public Coinbase API for the price of Bitcoin, Ethereum, and Chainlink. This information is then published to individual Kafka topics (e.g. topic_BTC). The raw price information is sent to QuestDB via Kafka Connect to populate the timeseries database. At the same time, a separate consumer pulls that data and calculates a moving average for a quick trend analysis.
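For context, the public Coinbase API exposes a simple spot-price endpoint that requires no API key. A quick way to inspect the shape of the data (the exact endpoint used in getData.py may differ) is:

import requests

# Public Coinbase v2 spot price endpoint; no authentication required.
resp = requests.get("https://api.coinbase.com/v2/prices/BTC-USD/spot")
print(resp.json())
# e.g. {'data': {'base': 'BTC', 'currency': 'USD', 'amount': '50884.75'}}

Note that the amount comes back as a string, so it needs to be cast to a float before any numeric work.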
The codebase is organized into three parts:
docker-compose: holds the docker-compose file to start Kafka (Zookeeper, broker, Kafka Connect), QuestDB, and a JSON file to initialize Kafka Connect
docker: the Dockerfile to build the Kafka Connect image (a pre-built image is available via docker-compose)
Python files: grab the latest pricing information from Coinbase, publish it to Kafka, and calculate a moving average
If you would like to analyze different cryptocurrencies or extend the simple moving average example with a more complicated algorithm like relative strength index analysis, feel free to fork the repo on GitHub.
Prerequisites
Docker (with at least 4GB memory): if using Docker Desktop, go to Settings -> Resources -> Memory and increase the default limit from 2GB to 4GB
Python 3.7+
Setting up Kafka & QuestDB
Before pulling data from Coinbase, we need a running instance of a Kafka cluster and QuestDB. The repo contains a working docker-compose file with the Confluent Kafka components (i.e. Zookeeper, broker, Kafka Connect) and QuestDB. If you would like to run Kafka on the cloud or set it up locally yourself, follow the instructions on the Confluent website. Otherwise, simply bring everything up with the docker-compose file:
cd docker-compose
docker-compose up -d
The docker-compose file runs the following services:
Zookeeper
Kafka Broker
Kafka Connect with JDBC driver
QuestDB
The Kafka Connect image is based on confluentinc/cp-kafka-connect-base:6.1.0. If you wish to modify this image (e.g. add a new connector to MongoDB or modify the bootup process), you can override the Dockerfile and build it locally.
Wait for the Kafka cluster to come up and watch the logs in the Kafka Connect container (e.g. with docker logs -f connect, if that is the container's name) until you see the following messages:
[2021-02-17 01:55:54,456] INFO [Worker clientId=connect-1, groupId=compose-connect-group] Starting connectors and tasks using config offset -1 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2021-02-17 01:55:54,456] INFO [Worker clientId=connect-1, groupId=compose-connect-group] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2021-02-17 01:55:54,572] INFO [Worker clientId=connect-1, groupId=compose-connect-group] Session key updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
Configuring Postgres Sink
At this point, we have a healthy Kafka cluster and a running instance of QuestDB, but they are not connected. Since QuestDB supports the Kafka Connect JDBC driver, we can leverage the PostgreSQL sink to populate our database automatically. Post this connector definition to our Kafka Connect container:
# Make sure you're inside the docker-compose directory
$ curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" --data @postgres-sink-btc.json http://localhost:8083/connectors
postgres-sink-btc.json holds the following configuration details:
{
"name": "postgres-sink-btc",
"config": {
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max":"1",
"topics": "topic_BTC",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"connection.url": "jdbc:postgresql://questdb:8812/qdb?useSSL=false",
"connection.user": "admin",
"connection.password": "quest",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "true",
"auto.create": "true",
"insert.mode": "insert",
"pk.mode": "none"
}
}
Some important fields to note:
topics: Kafka topic to consume and convert into Postgres format
connection: Using default credentials for QuestDB (admin/quest) on port 8812
value.converter: This example uses JSON with schema, but you can also use Avro or raw JSON. If you would like to override the default configuration, you can refer to the Kafka Sink Connector Guide from MongoDB.
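Since value.converter.schemas.enable is set to true, every message on the topic must carry a small schema envelope alongside its payload so that the JDBC sink can create and populate the table. Mirroring the records that getData.py emits (see the debug output below), a record looks roughly like this in Python:

# Envelope required by JsonConverter when schemas are enabled,
# mirroring the new_data block in getData.py.
record = {
    "schema": {
        "type": "struct",
        "name": "coinbase",
        "optional": False,
        "fields": [
            {"type": "string", "optional": False, "field": "currency"},
            {"type": "float", "optional": False, "field": "amount"},
            {"type": "string", "optional": False, "field": "timestamp"},
        ],
    },
    "payload": {"currency": "BTC", "amount": 50884.75, "timestamp": "2021-02-17T14:38:19"},
}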
Poll Coinbase for Latest Crypto Prices
Now that our Kafka-QuestDB connection is made, we can start pulling data from Coinbase. The Python code requires numpy, kafka-python, and pandas to run. Using pip, install those packages and run the getData.py script:
$ pip install -r requirements.txt
$ python getData.py
It will now print out debug messages with pricing information, as well as the schema we’re using to populate QuestDB:
Initializing Kafka producer at 2021-02-17 14:38:18.655069
Initialized Kafka producer at 2021-02-17 14:38:18.812354
API request at time 2021-02-17 14:38:19.170623
Record: {'schema': {'type': 'struct', 'fields': [{'type': 'string', 'optional': False, 'field': 'currency'}, {'type': 'float', 'optional': False, 'field': 'amount'}, {'type': 'string', 'optional': False, 'field': 'timestamp'}], 'optional': False, 'name': 'coinbase'}, 'payload': {'timestamp': datetime.datetime(2021, 2, 17, 14, 38, 19, 170617), 'currency': 'BTC', 'amount': 50884.75}}
API request at time 2021-02-17 14:38:19.313046
Record: {'schema': {'type': 'struct', 'fields': [{'type': 'string', 'optional': False, 'field': 'currency'}, {'type': 'float', 'optional': False, 'field': 'amount'}, {'type': 'string', 'optional': False, 'field': 'timestamp'}], 'optional': False, 'name': 'coinbase'}, 'payload': {'timestamp': datetime.datetime(2021, 2, 17, 14, 38, 19, 313041), 'currency': 'ETH', 'amount': 1809.76}}
API request at time 2021-02-17 14:38:19.471573
Record: {'schema': {'type': 'struct', 'fields': [{'type': 'string', 'optional': False, 'field': 'currency'}, {'type': 'float', 'optional': False, 'field': 'amount'}, {'type': 'string', 'optional': False, 'field': 'timestamp'}], 'optional': False, 'name': 'coinbase'}, 'payload': {'timestamp': datetime.datetime(2021, 2, 17, 14, 38, 19, 471566), 'currency': 'LINK', 'amount': 31.68216}}
API request at time 2021-02-17 14:38:23.978928
Record: {'schema': {'type': 'struct', 'fields': [{'type': 'string', 'optional': False, 'field': 'currency'}, {'type': 'float', 'optional': False, 'field': 'amount'}, {'type': 'string', 'optional': False, 'field': 'timestamp'}], 'optional': False, 'name': 'coinbase'}, 'payload': {'timestamp': datetime.datetime(2021, 2, 17, 14, 38, 23, 978918), 'currency': 'BTC', 'amount': 50884.75}}
...
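Under the hood, getData.py is essentially a poll-and-publish loop. A simplified sketch follows; the symbol list and polling interval here are illustrative rather than the exact repo code, and the schema envelope shown earlier is omitted for brevity:

import json
import time
from datetime import datetime

import requests
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v, default=str).encode("utf-8"),
)

while True:
    for symbol in ("BTC", "ETH", "LINK"):
        # Fetch the latest spot price from the public Coinbase API.
        resp = requests.get(f"https://api.coinbase.com/v2/prices/{symbol}-USD/spot")
        amount = float(resp.json()["data"]["amount"])
        payload = {"currency": symbol, "amount": amount, "timestamp": datetime.utcnow()}
        # Each currency is published to its own topic, e.g. topic_BTC.
        producer.send(f"topic_{symbol}", value={"payload": payload})
    time.sleep(5)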
Query Data on QuestDB
QuestDB is a fast, open-source timeseries database with SQL support. This makes it a great candidate to store financial market data for further historical trend analysis and for generating trade signals. By default, QuestDB ships with a console UI exposed on port 9000. Navigate to localhost:9000 and query the table for the Bitcoin tracking topic, topic_BTC, to see price data streaming in.
You can repeat this process for the other topics as well. If you prefer to run without a UI, you can also use the REST API to check:
$ curl -G \
--data-urlencode "query=select * from topic_BTC" \
http://localhost:9000/exp
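And since QuestDB speaks the Postgres wire protocol on port 8812 (the same connection the sink uses), you can also query it from Python. A minimal sketch with psycopg2, which is not part of the repo:

import psycopg2

# Default QuestDB credentials (admin/quest) over the Postgres wire protocol.
conn = psycopg2.connect(
    host="localhost", port=8812, user="admin", password="quest", dbname="qdb"
)
with conn.cursor() as cur:
    cur.execute("SELECT * FROM topic_BTC LIMIT 10;")
    for row in cur.fetchall():
        print(row)
conn.close()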
The QuestDB console UI also provides the ability to generate basic graphs. Click on the Chart tab underneath the Tables panel, select line as the chart type and timestamp as the label, and click Draw.
Unfortunately, the QuestDB native charting capabilities are currently limited. For more advanced visualization, check out my previous guide on streaming heart rate data to QuestDB under the “Visualizing Data with Grafana” section.
Calculate Moving Average
While we store the raw data in QuestDB for more sophisticated analysis, we can also consume from the same topics to calculate a quick moving average. This can be useful if you also want to post these records to another Kafka topic that drives a dashboard or sets alerts on pricing trends.
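The calculation itself is straightforward with numpy. A simplified sketch of the consume-compute-produce cycle, where the window size and topic names match the log output below but the rest is an illustration rather than the exact movingAverage.py code:

import json
from collections import deque

import numpy as np
from kafka import KafkaConsumer, KafkaProducer

consumer = KafkaConsumer(
    "topic_BTC",
    bootstrap_servers="localhost:9092",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

window = deque(maxlen=25)  # rolling buffer for the 25-point simple moving average
for message in consumer:
    window.append(float(message.value["payload"]["amount"]))
    if len(window) == window.maxlen:
        moving_average = float(np.mean(window))
        producer.send("topic_BTC_ma_25", value={"currency": "BTC", "ma_25": moving_average})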
On a separate terminal, run the moving average script:
$ python movingAverage.py
It will print out the moving average of 25 data points and post it to a per-currency topic such as topic_BTC_ma_25:
Starting Apache Kafka consumers and producer
Initializing Kafka producer at 2021-02-17 16:28:33.584649
Initialized Kafka producer at 2021-02-17 16:28:33.699208
Consume record from topic 'topic_BTC' at time 2021-02-17 16:28:34.933318
Produce record to topic 'topic_BTC_ma_25' at time 2021-02-17 16:28:35.072581
Produce record to topic 'topic_BTC_ma_25' at time 2021-02-17 16:28:35.075352
Produce record to topic 'topic_BTC_ma_25' at time 2021-02-17 16:28:35.077106
Produce record to topic 'topic_BTC_ma_25' at time 2021-02-17 16:28:35.088821
Produce record to topic 'topic_BTC_ma_25' at time 2021-02-17 16:28:35.091865
Produce record to topic 'topic_BTC_ma_25' at time 2021-02-17 16:28:35.094458
Produce record to topic 'topic_BTC_ma_25' at time 2021-02-17 16:28:35.096814
Produce record to topic 'topic_BTC_ma_25' at time 2021-02-17 16:28:35.098512
Produce record to topic 'topic_BTC_ma_25' at time 2021-02-17 16:28:35.100150
Produce record to topic 'topic_BTC_ma_25' at time 2021-02-17 16:28:35.103512
If you wish to also populate these data points into QuestDB, supplement the JSON data with schema information in movingAverage.py, similar to the way it is defined in the new_data JSON block in getData.py. Then create another Postgres sink via curl, with the topics field set to the moving-average topic (e.g. topic_BTC_ma_25).
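Alternatively, the second sink can be registered programmatically. A sketch using requests that reuses the configuration from earlier, changing only the name and topic (the connector name here is hypothetical):

import requests

# Same settings as postgres-sink-btc.json, pointed at the moving-average topic.
config = {
    "name": "postgres-sink-btc-ma",  # hypothetical connector name
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "topic_BTC_ma_25",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "connection.url": "jdbc:postgresql://questdb:8812/qdb?useSSL=false",
        "connection.user": "admin",
        "connection.password": "quest",
        "key.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "true",
        "auto.create": "true",
        "insert.mode": "insert",
        "pk.mode": "none",
    },
}

resp = requests.post("http://localhost:8083/connectors", json=config)
print(resp.status_code, resp.json())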
Wrapping Up
To stop streaming data, simply stop the Python scripts. To destroy the Kafka cluster and QuestDB, run:
$ docker-compose down
While this is a simple example, you can extend it to optimize the data format with Avro, connect it to your Coinbase account to execute trades based on trading signals, or test out different statistical methods on the raw data. Feel free to submit a PR to make this repo more useful.