Introducing Deimos: Using Kafka as the Data Backbone for your Architecture
Daniel Orner
Posted on November 22, 2019
In the previous article, I detailed the microservice pattern of using Kafka as a data backbone for your architecture — Kafka acts as the source of truth and each microservice consumes data from it. This allows each service to be independent and to own only the data it cares about. We also said that there should be some way to marry traditional relational databases like MySQL and Postgres with the Kafka data backbone with a minimum of fuss.
We left off with a couple of problems:
Ensuring that downstream systems are not tied to your internal data schemas;
Fixing the transaction problem, where due to errors data can be written to Kafka or your database but not both.
Ruby on Rails — the Database Experts
Here at Flipp, we use Ruby on Rails heavily for systems that talk to databases. Rails’s ActiveRecord package has been in active development for over 13 years, covers a wide range of features and has been battle-tested in thousands of applications. Rails allows us to talk to our database in a consistent way and abstracts most of the underlying details away for us.
When we started widely using Kafka in our engineering org, we quickly realized we needed a way to:
Save data from Kafka to our database
Write data to Kafka — either updating state or sending events
Encode/decode data using Avro and the Confluent Schema Registry (a free centralized hub for all our Avro schemas)
Standardize logging and tracing for all our consumers
Our first attempt to send data across systems was to use Kafka Connect. We ran into a number of problems with it, some of which were detailed in the previous article. We decided it was more prudent for us to strike our own way to solve this problem.
We started off with a shared library we called FlippRubyKafka in late 2017. Over the last two years, this library has grown, added features and fixed bugs and has powered over a dozen of our core microservices. We are happy to announce that this project, now called Deimos, is now open source!
Deimos Features
Deimos is firmly designed to cut down as much as possible on boilerplate and plumbing code. It is focused entirely on the use case of Kafka as a data backbone with Avro composing the structure of the data.
Deimos is built on a previous, smaller library called Phobos (hence the name), which in turn is built on RubyKafka, a pure Ruby implementation of the Kafka client.
Note that Deimos assumes that you have a copy of the Confluent Schema Registry running. If you’ve already got Kafka and Zookeeper up then all you need to do is deploy the Docker container to the cloud provider of your choice and get a reference to its URL.
Configuring Producers and Consumers
You can use Deimos simply as a way to tie Avro and Kafka together, and it works really well even if you’re not using Rails! Both producers and consumers allow your business logic to deal exclusively with Ruby hashes — they will automatically be encoded to and decoded from Avro via the schema registry before interacting with Kafka.
Because Avro is a central tenet of Deimos, all producers and consumers must specify which Avro schema and namespace they will use while consuming or producing. In addition, they need to specify how they handle keys for these messages:
The default, and recommended, way to handle keys is to Avro-encode them. This allows for any downstream systems that use static types (e.g. those written in Java or Scala, which have a large ecosystem related to Kafka) to easily decode them.
You can also indicate that your topic has no keys (e.g. event topics should not have keys as you want to keep all events, not just the last updated state).
Finally, you can leave your keys as plaintext, although this is not recommended.
When using Avro-encoded keys, you can provide a separate key schema, or you can have Deimos auto-magically create one for you from your value schema, by specifying a field (usually “id”) to extract from it.
Sample producer:
class MyProducer < Deimos::Producer
namespace 'com.deimos.my-app-special'
topic 'MyApp.MyTopic'
schema 'MySchema' # will search in the configured path
key_config field: 'key_field'
def send_some_message(an_object)
payload = {
'some-key' => an_object.foo,
'some-key2' => an_object.bar,
'key_field' => an_object.my_field
}
self.publish(payload)
end
end
end
Sample consumer:
class MyConsumer < Deimos::Consumer
schema 'MySchema'
namespace 'com.my-namespace'
key_config field: :my_id
def consume(payload, metadata)
MyHandlerClass.do_something_with(payload[:my_field], metadata[:key])
end
end
All schemas are automatically registered in the schema registry using the topic name (so if you produce to MyTopic, your schemas are automatically registered to MyTopic-keyand MyTopic-value.
Producing to Kafka
Deimos producers always produce to a given topic. Producers have a number of convenience features:
You will get an automatically generated timestamp (current time) and message_id (uuid) in your payload, if they are present in the schema. This helps immensely when debugging messages across systems.
Types will be coerced before saving, so if your schema expects a string but you give it an int, you don’t need to worry about crashes.
Metrics are sent allowing you to track the messages you send per topic and partition, as well as tracing enabled to determine if encoding your message is slow.
Consuming from Kafka
As with Phobos, Deimos creates a separate thread per consumer, where each consumer listens to a single topic. Metrics and tracing are used to track the number of messages consumed as well as how long they take to process, and there is a special “lag reporter” that sends metrics on the lag for each topic/partition being listened to.
Consumers will “swallow” errors by default (relying on tracing to handle alerting in this case) and move on, to prevent individual bad messages from blocking your consumer forever. You can change this behavior or define specific errors or situations where you would not want the consumer to continue.
Working with Databases
Although just including Avro will unlock a lot of potential, Deimos really starts to shine once you hook it up to a database via ActiveRecord. As with Avro, there are ActiveRecord versions of both producers and consumers:
ActiveRecordConsumers essentially act as a “sink” — dumping a topic into a database table based on some business logic.
ActiveRecordProducers act as a “source” — taking in an ActiveRecord object and automatically turning it into a payload to be encoded and sent to Kafka.
Sample ActiveRecordCosumer:
class MyConsumer < Deimos::ActiveRecordConsumer
schema 'MySchema'
key_config field: 'my_field'
record_class Widget
# Optional override to change the attributes of the record before
# they are saved.
def record_attributes(payload)
super.merge(:some_field => 'some_value')
end
end
Sample ActiveRecordProducer:
class MyProducer < Deimos::ActiveRecordProducer
topic 'MyApp.MyTopic'
schema 'MySchema'
key_config field: 'my_field'
record_class Widget
# Optional override to change the default payload calculated from the record.
def generate_payload(attributes, record)
super.merge(:assoc_key => record.some_association.assoc_key)
end
end
MyProducer.send_events([Widget.new(foo: 1), Widget.new(foo: 2)])
In addition to the producer, Deimos also provides a mixin you can add to your ActiveRecord objects which will *automatically *send messages whenever your records are created, modified or destroyed (as long as you don’t use mass operations like update_all or import).
class Widget < ActiveRecord::Base
include Deimos::KafkaSource
def self.kafka_producers
[MyActiveRecordProducer]
end
end
Database Backend
So we’ve got Deimos sending your database records out to Kafka and receiving messages and dumping them in. So far so good. But let’s take this one step further. In the previous article we discussed some of the issues with having both databases and Kafka in the same application — the problem of transactions.
You can read that article for more, but in a nutshell, we want a guarantee that every record written to the database has a corresponding Kafka message, and every Kafka message is written to the database.
Deimos achieves this with the database backend.
This feature transparently replaces the inline sending of Kafka messages to instead save them to a table in the database. There is then a separate process that reads the messages off this table and sends them off in batches.
Enabling this feature is incredibly simple:
Generate the migration to add the table.
Set the publish_backend configuration setting to :db .
Run the DB Producer in some way (forked process, thread, rake task) via Deimos.run_db_backend!.
This is also known as the transactional outbox pattern and achieves full transactional integrity. All messages (whether they represent changes to objects or events) are part of the same transaction and will only be saved if the transaction commits. If it rolls back, all messages will also roll back and not be sent.
Calling code doesn’t even need to know that the DB backend is turned on. You call the publish method on your producer the same way — instead of sending the message to Kafka directly, it will encode it and save it as a message in the database.
This feature is currently powering our largest producer of data and our production metrics show being able to send 4,000–6,000 messages per second with a single thread.
Test Helpers
Deimos comes with a robust suite of RSpec test helpers to allow you to test your business logic without having to actually set up Kafka and the schema registry for local testing.
Just include the TestHelpers module in your specs and automatically stub out all loaded producers and consumers:
RSpec.configure do |config|
config.include Deimos::TestHelpers
config.before(:each) { stub_producers_and_consumers! }
end
Consumers can be tested by passing a hash and the consumer class into a test method:
expect(Widget.count).to eq(0)
test_consume_message(WidgetConsumer, {id: 5, name: "Darth"}, call_original: true)
expect(Widget.count).to eq(1)
Producers can be tested by just sending messages normally and using the have_sent matcher:
expect('widget-topic').not_to have_sent(anything)
Widget.update(:my_attr, 1)
expect('widget-topic').to have_sent(hash_including(:my_attr => 1))
What’s Next
We’re just getting started with Deimos. We’d love to hear your use cases and feature requests, and we’ve got a bunch of our own to start with and plenty of improving to do. We welcome your contributions and we’re excited to see how far this will go!
Posted on November 22, 2019
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.