How to stream data from Mastodon public timelines to Apache Kafka with NodeJS and TypeScript
Olena Kutsenko
Posted on March 21, 2023
Mastodon has been rising in popularity over recent months. If you're not yet familiar with this exotic online creature, Mastodon is open-source social networking software for microblogging. Instead of being a single network like Twitter, Mastodon is a federated platform that connects independent, interconnected servers. This makes it a fully decentralised system. It relies on ActivityPub and uses the ActivityStreams 2.0 data format and JSON-LD. As for the functionality, it closely resembles Twitter: you can read the timeline, post messages and interact with other users.
If you've just recently joined Mastodon and are still exploring it, you might find that scrolling the timeline only gets you so far in understanding everything that is happening there. Applying some engineering skills will give you a better overview of the topics and discussions happening on the platform.
Since Mastodon's timeline is nothing more than a stream of continuously arriving events, the data feed is well suited for Apache Kafka®. Adding Kafka connectors on top of that opens up multiple possibilities to use the data for aggregations and visualisations.
Continue reading to learn how to bring data from Mastodon to Kafka using TypeScript and a couple of helpful libraries.
Prepare the Apache Kafka cluster
To bring the data from the Mastodon timeline to a topic in Apache Kafka, you'll need an Apache Kafka cluster and some code to stream the data there. For the former, you can use either your own cluster or a managed version that runs in the cloud, such as Aiven for Apache Kafka. If you don't have an Aiven account yet, sign up for a free trial and create your cluster; the setup only takes a few minutes.
Once your cluster is running, add a topic with the name `mastodon`. Alternatively, you can use any other name; just remember it, you'll need it a bit later.
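If you prefer working from the terminal, the topic can also be created with the Aiven CLI. A minimal sketch, assuming a service named `demo-kafka` (a placeholder, use your own service name):

```bash
# create the "mastodon" topic on the service demo-kafka (placeholder name)
avn service topic-create demo-kafka mastodon --partitions 3 --replication 2
```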
To connect securely to the cluster we'll use SSL. Aiven already takes care of the server-side configuration, but you'll need to download three files for the client to establish the connection. Download these files from Aiven's console:
- Access Key (`service.key`)
- Access Certificate (`service.cert`)
- CA Certificate (`ca.pem`)

You will need these files later, so put them somewhere safe for now.
Working with the Mastodon API
The Mastodon API has excellent documentation that makes it straightforward to access the public data feeds. You don't need to be registered to retrieve a public feed, which makes it very convenient. Actually, just give it a try right now. Run the line below in your terminal to start retrieving a stream of data from mastodon.social:

```bash
curl https://mastodon.social/api/v1/streaming/public
```
In response you should see an endless flow of incoming events:
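The streaming endpoint speaks server-sent events, so each status arrives as an `event`/`data` pair. Trimmed for readability (real payloads carry the full Mastodon status JSON), the output looks roughly like this:

```
event: update
data: {"id":"…","created_at":"…","content":"<p>…</p>", …}
```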
Parsing the response from the server manually is a monotonous and tedious operation. Rather than reinvent the wheel, you can use one of the available libraries for Mastodon. For this example we'll be using masto.js.
Jump into the code
To give you an instant start bringing Mastodon data into an Apache Kafka cluster, clone this repository:

```bash
git clone https://github.com/aiven/mastodon-to-kafka
```
Once you have the contents of the repo locally, follow these steps:
- Create a folder `certificates/` and add the SSL certificates you downloaded earlier into this folder. We will need these to connect securely to Apache Kafka.
- Copy the file `.env.example` and rename it to `.env`; this file will hold the environment variables.
- Set `kafka.uri` in `.env` to the address of your cluster. You can take it from the connection information of your Aiven for Apache Kafka service (see the example `.env` after this list).
- Run `npm install` to install all dependencies (if you don't have npm or NodeJS yet, follow the installation instructions).
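For reference, here is a sketch of what the finished `.env` could look like. The host and port are placeholders (take yours from the Aiven console), and the certificate file names assume the default names from the console download:

```
kafka.uri=my-kafka-service.aivencloud.com:12345
ssl.key.location=certificates/service.key
ssl.certificate.location=certificates/service.cert
ssl.ca.location=certificates/ca.pem
```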
Finally, start the project by running `npm run start`, and you should see a flow of delivery reports for every new message coming from the Mastodon public feed that is defined in the code (in the next section you'll see how to change it to whichever Mastodon feed you like!).
If things don't work the first time, check for error messages printed in the terminal. They will help you narrow down the problem.
You can verify that the data is flowing and see what messages you get by enabling the Apache Kafka REST API. In the contextual menu for the topic, select Apache Kafka REST to browse the incoming messages.
Now we can step back and look at exactly how the code sends the data from Mastodon to Apache Kafka. The process can be divided into two logical steps:
- Streaming messages from a public Mastodon timeline.
- Sending these messages to Apache Kafka.
In the sections below you can see these steps in detail.
Streaming messages from a public Mastodon timeline
Open the file `mastostream.ts`. It contains a small module to stream the Mastodon data.
To initialise the Mastodon client you need to call `login()` from the `masto.js` client library and provide the required configuration. This is also the place to provide authentication information; however, since we are only interested in public feeds, the `url` property is enough. As the URL, use your favourite Mastodon server.
```typescript
const masto = await login({
  url: 'https://mastodon.social/', // choose your favourite mastodon server
});
```
With the initialised Mastodon client you connect to the public stream API by calling the asynchronous function `masto.stream.streamPublicTimeline()`.

```typescript
const stream = await masto.stream.streamPublicTimeline();
```
Finally, you're ready to subscribe to the updates from the public stream provided by the API.
```typescript
stream.on('update', (status) => {
  console.log(status);
  // next - send status data to an Apache Kafka topic
});
```
Now it's time to put these building blocks together.
For the sake of encapsulation, you wouldn't want the `mastostream` module to know directly about the Apache Kafka producer. That's why, when putting all the above ingredients together, we provide the `mastostream` module with a more generic callback argument.
This callback function will receive the Mastodon status message converted to a string, so the party that triggered `mastostream` gets the data and can act on it:
```typescript
import { login } from 'masto';

export default async (callback: (status: string) => void) => {
  try {
    const masto = await login({
      url: 'https://fosstodon.org/',
    });
    // Connect to the streaming API
    const stream = await masto.stream.streamPublicTimeline();
    // Subscribe to updates and hand each status to the caller as a JSON string
    stream.on('update', (status) => {
      try {
        callback(JSON.stringify(status));
      } catch (err) {
        console.log('Callback failed', err);
      }
    });
  } catch (err) {
    console.log(err);
  }
};
```
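For a quick sanity check, the module can be exercised on its own before any Kafka wiring. A minimal sketch:

```typescript
import mastostream from './mastostream';

// print each incoming status JSON to the terminal
mastostream((status) => console.log(status)).catch(console.error);
```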
This is all you need to stream the data from Mastodon! Time to send these messages to an Apache Kafka topic.
Sending messages to Apache Kafka using node-rdkafka
Open `producer.ts` to see the code you need to send the data to an Apache Kafka topic. To work with Apache Kafka you can use one of the existing client libraries; there are several options available. This project uses `node-rdkafka`, which is a NodeJS wrapper for the Kafka C/C++ library. Check the README in its GitHub repository for installation steps.
With `node-rdkafka` you can create a producer to send data to the cluster. This is where you'll use the Apache Kafka configuration settings defined in `.env` earlier, together with the certificates that you downloaded, to establish a secure connection.
```typescript
import Kafka from 'node-rdkafka';
import mastostream from './mastostream';

// create a producer configured for a secure SSL connection
const producer = new Kafka.Producer({
  'metadata.broker.list': process.env["kafka.uri"],
  'security.protocol': 'ssl',
  'ssl.key.location': process.env["ssl.key.location"],
  'ssl.certificate.location': process.env["ssl.certificate.location"],
  'ssl.ca.location': process.env["ssl.ca.location"],
  'dr_cb': true // request delivery reports
});
```
The producer emits events as things happen, so to understand what is going on and to catch any errors, we subscribe to several of them, including delivery reports.
```typescript
// log debug information
producer.on('event.log', function (log) {
  console.log(log);
});
// log all errors
producer.on('event.error', function (err) {
  console.error(err);
});
producer.on('connection.failure', function (err) {
  console.error(err);
});
// confirm that a message reached the topic
producer.on('delivery-report', function (err, report) {
  console.log('Message was delivered: ' + JSON.stringify(report));
});
producer.on('disconnected', function (arg) {
  console.log('Producer disconnected: ' + JSON.stringify(arg));
});
```
One last event, which is especially important, is `ready`. It fires at the moment the producer is ready to dispatch messages to the topic. This handler relies on the callback provided by the `mastostream` module that we implemented in the previous section:
```typescript
producer.on('ready', async () => {
  mastostream((status) => {
    producer.produce(
      'mastodon',          // topic to send the message to
      null,                // partition, null for librdkafka default partitioner
      Buffer.from(status), // value
      null,                // optional key
      Date.now()           // optional timestamp
    );
    producer.flush(2000);
  }).catch((error) => {
    throw error;
  });
});
```
Yet, none of the above will work until you call the `connect()` method. Add the snippet below, run your code, and watch the data start to flow!
```typescript
producer.connect({}, (err) => {
  if (err) {
    console.error(err);
  }
});
```
The `connect()` method takes an optional second parameter: a callback that you can use to be informed about any errors during the connection.
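As an optional extra that isn't part of the repository code, you may want to disconnect the producer cleanly when the process is interrupted, so buffered messages are flushed before the connection closes. A minimal sketch:

```typescript
// Hypothetical addition: flush pending messages and disconnect on Ctrl+C
process.on('SIGINT', () => {
  producer.flush(2000, () => {
    producer.disconnect();
  });
});
```

The `disconnected` event handler registered earlier will log the shutdown.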
We've now seen all the code and examined how it works together. By separating the concerns of collecting data from Mastodon and passing it to Apache Kafka, we have a system that can also be adapted to handle different data sources as needed.
What's next
With the data being continuously collected in the topic, you can now use it as input for other tools and databases, such as OpenSearch®, ClickHouse®, or PostgreSQL®. Apache Kafka® Connect connectors will help you bring the data into other systems with no code required. Learn more about Apache Kafka and Kafka Connect, and check the full list of sink connectors available on the Aiven platform to see where you can bring the data for further storage and analysis.
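To give you an idea of what that looks like, a sink connector is defined by a small JSON configuration. The snippet below is a hypothetical OpenSearch sink; the connector class and property names depend on the connector and version you deploy, so treat it as an illustration rather than a ready-to-use config:

```json
{
  "name": "mastodon-opensearch-sink",
  "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
  "topics": "mastodon",
  "connection.url": "https://my-opensearch.example.com:9200",
  "key.ignore": "true",
  "schema.ignore": "true"
}
```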