Apache Pulsar for Modern Messaging

shades198

Posted on January 29, 2022

Introduction

Apache Pulsar is steadily emerging as a strong contender in enterprise messaging. Open-source application messaging is currently dominated by Kafka. However, Pulsar promises to fill the gaps left by Kafka and overcome some of its shortcomings, which pose severe challenges for application performance and stability at large scale.

Apache Pulsar is an open-source, distributed messaging and streaming platform, originally created at Yahoo. Its key differentiator is a segmented architecture in which message storage is offloaded entirely to Apache BookKeeper, a distributed storage service. This frees Pulsar brokers from storage concerns so they can concentrate on handling inbound and outbound traffic. This is what makes Pulsar unique and desirable in many ways, which we look into below.

Architecture

Brokers
The main difference between Pulsar and Kafka starts at the foundation. Kafka brokers store all the data themselves, whereas Pulsar relies on Apache BookKeeper for storage. BookKeeper is a scalable, fault-tolerant, low-latency storage service. Pulsar brokers are therefore free to do other jobs such as stream computation and topic and consumer management. This is why adding a broker to increase the capacity of a Pulsar cluster is much easier. In Kafka, one has to not only add a broker (which is itself not easy) but also manually reassign partitions to the newly added broker, which can take a lot of time.

Underlying Storage Mechanism
Pulsar implements a topic as an infinite log just like Kafka. Pulsar breaks topic partitions into segments and then distributes the segments across the storage nodes in Apache BookKeeper to get better performance, scalability, and availability.

BookKeeper stores a topic as a sequence of ledgers, and each ledger acts as one segment of the topic's log. These segments are distributed evenly among the bookie nodes. This is different from Kafka, where each partition and its replicas are stored in their entirety on brokers. That not only leads to larger storage requirements for a single broker but also creates the problem of rebalancing. When a Kafka broker is added or goes down, partition replicas have to be moved to another broker. This rebalance can take a long time and can generate excessive network traffic.

With BookKeeper's storage model this problem does not arise, since a topic is split into segments that are evenly distributed. A new bookie node added to the cluster can immediately start absorbing incoming load without any rebalance or delay, because the data associated with a given topic is not tied to any specific storage node. Segments of a topic can also be moved to a new bookie node to balance the entire cluster, without impacting incoming load.
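To make the "no rebalance" point concrete, here is a minimal toy model in plain Java. It is not BookKeeper code: the class SegmentPlacement and its "least-loaded bookie" rule are assumptions made for illustration, and a real ledger is written to several bookies rather than one. It only shows that a newly added node can take new segments while existing placements stay where they are.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model (hypothetical): each new segment goes to the bookie holding the fewest segments. */
final class SegmentPlacement {
    private final Map<String, Integer> segmentsPerBookie = new LinkedHashMap<>();
    private final List<String> placements = new ArrayList<>(); // index = segment number

    /** Registers a storage node; existing placements are left untouched. */
    void addBookie(String bookie) {
        segmentsPerBookie.putIfAbsent(bookie, 0);
    }

    /** Places the next segment on the least-loaded bookie and returns that bookie. */
    String placeNextSegment() {
        if (segmentsPerBookie.isEmpty()) {
            throw new IllegalStateException("no bookies available");
        }
        String target = null;
        for (Map.Entry<String, Integer> e : segmentsPerBookie.entrySet()) {
            if (target == null || e.getValue() < segmentsPerBookie.get(target)) {
                target = e.getKey();
            }
        }
        segmentsPerBookie.merge(target, 1, Integer::sum);
        placements.add(target);
        return target;
    }

    /** Returns a copy of the segment-to-bookie assignments made so far. */
    List<String> placements() {
        return new ArrayList<>(placements);
    }
}
```

In this sketch, adding a fourth bookie after six segments have been written changes none of the six existing placements; the next segments simply land on the new node.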

The diagram below shows how storage, unlike in Kafka, is not tied to brokers, and how it is segmented.

Apache Kafka and Apache Pulsar storage architecture

Multi-tenancy and Namespaces
Pulsar was created from the ground up as a multi-tenant system. In Pulsar, Tenants can be spread across clusters and can each have their own authentication and authorization scheme applied to them. They are also the administrative unit at which storage quotas, message TTL, and isolation policies can be managed.

What this means is that a single Pulsar cluster can be used by different groups of users each with their distinct needs. One might use Pulsar to send and process low importance, high volume messages where security is not critical. Another group of users can use Pulsar for mission critical and highly sensitive data. These user groups get their own totally separate logical space with no interference from others.

A namespace is an administrative unit within a tenant. The configuration policies set on a namespace apply to all the topics created in that namespace. A tenant may create multiple namespaces via self-administration using the REST API or the pulsar-admin CLI tool. For instance, a tenant with different applications can create a separate namespace for each application.

Names for topics in the same namespace look like this:

persistent://tenant/app1/topic-1
persistent://tenant/app1/topic-2
persistent://tenant/app1/topic-3

Multi-tenancy in Pulsar

Messaging

Topics
Topic names in Apache Pulsar are well defined. They have the following structure:
{persistent|non-persistent}://tenant/namespace/topic
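The structure above can be checked mechanically. The following is a small plain-Java sketch that splits a full topic name into its four parts. ParsedTopicName is a hypothetical helper written for this article, not a class from the Pulsar client, and it only handles the four-part form shown here.

```java
/** Hypothetical helper: splits {persistent|non-persistent}://tenant/namespace/topic into parts. */
final class ParsedTopicName {
    final String domain;
    final String tenant;
    final String namespace;
    final String topic;

    private ParsedTopicName(String domain, String tenant, String namespace, String topic) {
        this.domain = domain;
        this.tenant = tenant;
        this.namespace = namespace;
        this.topic = topic;
    }

    static ParsedTopicName parse(String fullName) {
        int sep = fullName.indexOf("://");
        if (sep < 0) {
            throw new IllegalArgumentException("missing '://' in " + fullName);
        }
        String domain = fullName.substring(0, sep);
        if (!domain.equals("persistent") && !domain.equals("non-persistent")) {
            throw new IllegalArgumentException("unknown domain: " + domain);
        }
        // Limit of 3 keeps any further '/' characters inside the topic part
        String[] parts = fullName.substring(sep + 3).split("/", 3);
        if (parts.length != 3 || parts[0].isEmpty() || parts[1].isEmpty() || parts[2].isEmpty()) {
            throw new IllegalArgumentException("expected tenant/namespace/topic in " + fullName);
        }
        return new ParsedTopicName(domain, parts[0], parts[1], parts[2]);
    }
}
```

For example, parsing persistent://tenant/app1/topic-1 yields the tenant "tenant", the namespace "app1" and the topic "topic-1".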

Apart from standard persistent topics, Pulsar also has non-persistent topics, identified by the keyword non-persistent. Their messages are ephemeral and are never persisted to disk. They can offer the lowest latency, which is especially helpful when you want to transport low-priority but extremely voluminous data such as debug logs.

Topic Creation
In Pulsar, a non-partitioned topic can be created with a command like this:
$ bin/pulsar-admin topics create persistent://public/default/test-topic

Retention and Expiry
By default, Pulsar message brokers:

  • immediately delete all messages that have been acknowledged by a consumer, and
  • persistently store all unacknowledged messages in a message backlog.

However, Pulsar has two features that enable you to override this default behaviour:

  • Message retention enables you to store messages that have been acknowledged by a consumer
  • Message expiry enables you to set a time to live (TTL) for messages that have not yet been acknowledged

Producing Messages

You produce messages to topics in Pulsar using the lightweight Producer API. Below are the main features the Producer API offers.

Topic access modes

  • Shared: Multiple producers can publish to a topic. This is the default setting.
  • Exclusive: Only one producer can publish to a topic. Any other producer that tries to connect gets an error immediately.
  • Wait-for-exclusive: Other producers wait until they can get exclusive access to the topic.

Routing Modes
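A routing mode must be selected when publishing messages to partitioned topics. Supported modes:

  • Round Robin Partition: messages without a key are spread across all partitions in round-robin fashion; messages with a key go to the partition chosen by hashing the key
  • Single Partition: messages without a key all go to a single partition; messages with a key go to the partition chosen by hashing the key
  • Custom Partition: create a custom routing mode by using the Java client and implementing the MessageRouter interface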
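As a rough illustration of how keyed and unkeyed messages can be routed, here is a toy partition chooser in plain Java. It does not use the Pulsar client: PartitionRouter is a hypothetical class, and String.hashCode() stands in for whatever hashing scheme the real producer is configured with.

```java
/** Toy router (hypothetical): key hashing for keyed messages, round robin for the rest. */
final class PartitionRouter {
    private final int numPartitions;
    private int next = 0; // next partition for unkeyed messages

    PartitionRouter(int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        this.numPartitions = numPartitions;
    }

    /** Returns the partition index for a message; pass null when the message has no key. */
    int choosePartition(String key) {
        if (key != null) {
            // Same key always maps to the same partition, which preserves per-key order
            return Math.floorMod(key.hashCode(), numPartitions);
        }
        int chosen = next;
        next = (next + 1) % numPartitions;
        return chosen;
    }
}
```

Because the same key always hashes to the same partition, all messages for that key stay in one partition and keep their relative order.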

Ordering guarantee
Ordering is applied on:

  • Per-key-partition basis: All the messages with the same key will be in order and be placed in same partition.
  • Per-producer basis: All messages sent from a producer will be in order

Sync/Async dispatch
A producer either waits for the broker to acknowledge receipt of a message or returns immediately after sending it.

Batching
Improve delivery throughput by accumulating messages and sending them as a batch in a single request.

Compression
Messages can be compressed using the supported compression algorithms below:

  • LZ4
  • Snappy
  • Zstd
  • Zlib

Message delivery schedule
Deliver messages at a specific time or after a delay using the deliverAt or deliverAfter methods of the message builder.

Chunked messages
Large messages can be split into chunks and delivered to consumers in correct order.

Message Consumption

Pulsar supports the same publish-subscribe messaging model as Kafka. The difference is that, by default, Pulsar brokers discard messages once consumers have acknowledged them. This behaviour can be changed so that acknowledged messages are retained for a configured period of time.

Subscription Types

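There are several subscription types for consumers:

  • Exclusive: Only one consumer can attach to the subscription
  • Failover: Multiple consumers can attach, but only one receives messages; another takes over if it fails
  • Shared: Messages are distributed across multiple consumers attached to the same subscription
  • Key Shared: Like Shared, but messages with the same key are delivered to the same consumer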

Receive Modes
Consumers in Pulsar can receive messages in two modes, sync and async. A sync receive blocks until a message is available from the broker. An async receive returns immediately with a CompletableFuture, which completes once a message is available on the broker.
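The difference between the two receive modes can be sketched with nothing but the JDK. ToyMailbox below is a hypothetical stand-in for a consumer, not the Pulsar API: receiveAsync() hands back a CompletableFuture right away, while receive() simply blocks on that same future.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

/** Toy stand-in for a consumer (hypothetical), showing sync vs async receive. */
final class ToyMailbox {
    private final Deque<String> messages = new ArrayDeque<>();
    private final Deque<CompletableFuture<String>> waiters = new ArrayDeque<>();

    /** Delivers a message to the oldest waiting receiver, or queues it if nobody is waiting. */
    synchronized void publish(String message) {
        CompletableFuture<String> waiter = waiters.pollFirst();
        if (waiter != null) {
            waiter.complete(message);
        } else {
            messages.addLast(message);
        }
    }

    /** Async mode: returns immediately; the future completes once a message is available. */
    synchronized CompletableFuture<String> receiveAsync() {
        String message = messages.pollFirst();
        if (message != null) {
            return CompletableFuture.completedFuture(message);
        }
        CompletableFuture<String> pending = new CompletableFuture<>();
        waiters.addLast(pending);
        return pending;
    }

    /** Sync mode: blocks the calling thread until a message is available. */
    String receive() {
        return receiveAsync().join();
    }
}
```

With the async call, the caller is free to do other work and attach a callback to the future; with the sync call, the calling thread waits.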

Negative Acknowledgments
Pulsar consumers acknowledge received messages to the brokers, comparable to how Kafka consumers commit offsets. By default, acknowledged messages are removed by the brokers, but a retention policy can be used to keep them.

There is also a facility for negative acknowledgements. If a consumer fails to process a message, it can send a negative ack, which makes the broker redeliver that message. This feature does not exist in Kafka and can greatly simplify message redelivery logic.

Please note that redelivered messages can arrive out of order relative to other messages.
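The ordering caveat is easy to see in a small simulation. The sketch below is plain Java with no Pulsar dependency; NackDemo is a hypothetical class, and putting a negatively acknowledged message at the back of the queue is a simplification of the broker's redelivery behaviour.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy simulation (hypothetical) of negative acks causing out-of-order processing. */
final class NackDemo {
    /**
     * Processes messages in order; every id in failOnce fails on its first delivery,
     * is negatively acknowledged, and is redelivered later.
     * Returns the order in which messages were successfully processed.
     */
    static List<Integer> process(List<Integer> messages, Set<Integer> failOnce) {
        Deque<Integer> pending = new ArrayDeque<>(messages);
        Set<Integer> alreadyFailed = new HashSet<>();
        List<Integer> processed = new ArrayList<>();
        while (!pending.isEmpty()) {
            int msg = pending.pollFirst();
            if (failOnce.contains(msg) && alreadyFailed.add(msg)) {
                pending.addLast(msg); // negative ack: message comes back later
            } else {
                processed.add(msg);   // normal ack
            }
        }
        return processed;
    }
}
```

If message 2 out of 1 to 5 fails once, the processing order in this model becomes 1, 3, 4, 5, 2. Consumers that depend on strict ordering need to account for this.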

Replaying Messages
One of the strong points of Kafka is the ability to replay or re-consume messages in a topic as many times as you like. Pulsar gives you the same ability albeit with a slightly different approach.

In Kafka, a consumer can perform a dual role. It can read a topic from either the latest or the earliest position, and it can also rewind to a specific offset.
In Pulsar, the standard Consumer API is built around subscriptions and normally delivers only messages that have not yet been acknowledged.
Pulsar also offers the Reader API, which lets you read a topic from its beginning, from the latest position, or from a specific message ID.

The code snippet below shows how to use the Reader API to rewind a topic:

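Reader<byte[]> reader = pulsarClient.newReader()
                    .topic("test-topic")
                    .subscriptionName("my-subscription")
                    .startMessageId(MessageId.earliest) // rewind to the beginning of the topic
                    .create();

// Read from the chosen position until the reader catches up with the topic
while (reader.hasMessageAvailable()) {
    Message<byte[]> msg = reader.readNext();
    System.out.println(new String(msg.getData()));
}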

DLQs and Retry Queues
One very attractive feature of Pulsar over Kafka is built-in support for dead letter topics (DLQ) and retry letter topics (retry queues).

When a consumer cannot process some messages successfully, those messages can be sent to a separate dead letter topic. We can then decide how to handle the messages in that DLQ. A default dead letter topic uses this format: <topicname>-<subscriptionname>-DLQ
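As a small sketch of the naming rule and the routing decision, consider the plain-Java helper below. DeadLetterRouting is a hypothetical class, and the maxRedeliveries threshold is an assumption made for the example; the real client takes this decision internally based on its dead letter policy.

```java
/** Hypothetical helper illustrating dead letter topic naming and routing. */
final class DeadLetterRouting {
    /** Builds the default dead letter topic name: <topicname>-<subscriptionname>-DLQ. */
    static String deadLetterTopic(String topic, String subscription) {
        return topic + "-" + subscription + "-DLQ";
    }

    /**
     * Decides where a message that has failed failedDeliveries times goes next.
     * maxRedeliveries is an assumed threshold for this sketch.
     */
    static String nextDestination(String topic, String subscription,
                                  int failedDeliveries, int maxRedeliveries) {
        return failedDeliveries > maxRedeliveries
                ? deadLetterTopic(topic, subscription)
                : topic;
    }
}
```

For a topic named orders and a subscription named billing, the default dead letter topic in this sketch is orders-billing-DLQ.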

There are use cases where message processing fails but can be retried later by the same processing logic. Here we can use retry letter topics to automatically re-consume messages after a certain interval. Whereas a DLQ requires a separate consumer instance to consume its messages, a retry letter topic is consumed by the same consumer.

No Consumer Rebalance
One of the biggest advantages of Pulsar is rebalance-free consumption. Rebalancing is a particular problem in Kafka: once you add or remove a consumer in a consumer group, consumption stops until partitions have been reassigned among the remaining consumers. Depending on the load on the cluster and the number of consumers and partitions, this rebalance can take a long time, leading to patchy or very slow message processing.

In Pulsar, with a Shared subscription, multiple consumers can attach to a topic even if it has only a single partition. When a consumer joins or leaves, no rebalance takes place. This helps when consumers come and go frequently.
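The idea can be modelled in a few lines of plain Java. SharedDispatcher is a hypothetical toy, not broker code: it hands each message to the next connected consumer in turn, so a join or leave only changes the list of recipients and nothing has to be reassigned.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy dispatcher (hypothetical): round-robin delivery to whoever is connected right now. */
final class SharedDispatcher {
    private final List<String> consumers = new ArrayList<>();
    private int cursor = 0;

    void join(String consumer) {
        consumers.add(consumer);
    }

    void leave(String consumer) {
        consumers.remove(consumer);
    }

    /** Returns the consumer that receives the next message. */
    String dispatch() {
        if (consumers.isEmpty()) {
            throw new IllegalStateException("no consumers connected");
        }
        String chosen = consumers.get(cursor % consumers.size());
        cursor = (cursor + 1) % consumers.size();
        return chosen;
    }
}
```

There is no partition ownership in this model, which is why membership changes do not pause delivery.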

Advanced Features

Schema Management
Type safety is very important in a message bus like Pulsar. Producers and consumers need a mechanism for coordinating types at the topic level to avoid serialization and deserialization failures. For this purpose Pulsar provides a built-in schema registry.

The Pulsar schema registry enables clients to upload data schemas on a per-topic basis. Those schemas dictate which data types are recognized as valid for that topic. A Pulsar schema is defined by a SchemaInfo object, which has a SchemaType field. A SchemaType can be primitive or complex. Complex struct schemas are currently supported through AvroBaseStructSchema and ProtobufNativeSchema.

Pulsar also has the helpful AUTO_PRODUCE and AUTO_CONSUME schema types. We can use these to produce to or consume from a topic whose schema we do not know in advance. With AUTO_PRODUCE, the Pulsar client checks whether the bytes being sent are compatible with the schema of that topic.

Pulsar Functions - Stream Processing
Pulsar Functions is a lightweight stream-processing framework. User-supplied processing logic is applied to messages streaming from one topic, and the result can be streamed out to another topic.

Lambda style functions can be written containing business logic and they can be run via Pulsar Functions. These functions are designed to use Pulsar as a message bus.

A simple word count function looks like this:

import java.util.Arrays;

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class WordCountFunction implements Function<String, Void> {
    // This function is invoked every time a message is published to the input topic
    @Override
    public Void process(String input, Context context) throws Exception {
        Arrays.asList(input.split(" ")).forEach(word -> {
            String counterKey = word.toLowerCase();
            // Increment the per-word counter kept in the function's state
            context.incrCounter(counterKey, 1);
        });
        return null;
    }
}

We can build this as a jar and run it via Pulsar Functions like so

$ bin/pulsar-admin functions create \
  --jar target/my-jar-with-dependencies.jar \
  --classname org.example.functions.WordCountFunction \
  --tenant public \
  --namespace default \
  --name word-count \
  --inputs persistent://public/default/sentences \
  --output persistent://public/default/count

Pulsar Functions provides the following three levels of processing guarantees:

  • ATMOST_ONCE: Each message is processed at most once, or not at all
  • ATLEAST_ONCE: Each message is processed at least once, and possibly more than once
  • EFFECTIVELY_ONCE: Each message has exactly one output associated with it

A guarantee can be applied to a Pulsar Function by using this option:
--processing-guarantees EFFECTIVELY_ONCE

Pulsar IO
Pulsar IO moves data between Pulsar and external systems. Source connectors feed data from external systems into Pulsar topics, and sink connectors deliver data from Pulsar topics to external systems.

Pulsar IO

Pulsar comes with a few built-in connectors for systems such as Apache Cassandra, PostgreSQL and Aerospike. They can be listed through the Pulsar Functions REST interface like this:
curl -s http://localhost:8080/admin/v2/functions/connectors

This shows the available connectors, for example:

[
  {"name":"aerospike","description":"Aerospike database sink","sinkClass":"org.apache.pulsar.io.aerospike.AerospikeStringSink"},
  {"name":"cassandra","description":"Writes data into Cassandra","sinkClass":"org.apache.pulsar.io.cassandra.CassandraStringSink"},
  {"name":"kafka","description":"Kafka source and sink connector","sourceClass":"org.apache.pulsar.io.kafka.KafkaStringSource","sinkClass":"org.apache.pulsar.io.kafka.KafkaBytesSink"},
  {"name":"kinesis","description":"Kinesis sink connector","sinkClass":"org.apache.pulsar.io.kinesis.KinesisSink"},
  {"name":"rabbitmq","description":"RabbitMQ source connector","sourceClass":"org.apache.pulsar.io.rabbitmq.RabbitMQSource"},
  {"name":"twitter","description":"Ingest data from Twitter firehose","sourceClass":"org.apache.pulsar.io.twitter.TwitterFireHose"}
]

You can use the connector admin CLI to set up and start sink and source connectors. Each connector requires a config file, which can be in JSON or YAML format. Below is an example for the Cassandra sink connector.

configs:
    roots: "localhost:9042"
    keyspace: "pulsar_test_keyspace"
    columnFamily: "pulsar_test_table"
    keyname: "key"
    columnName: "col"

Then you start the sink job, which reads messages from a Pulsar topic and writes the data to a Cassandra table, like this:

bin/pulsar-admin sinks create \
    --tenant public \
    --namespace default \
    --name cassandra-test-sink \
    --sink-type cassandra \
    --sink-config-file examples/cassandra-sink.yml \
    --inputs test_cassandra

Transaction Support
Pulsar's transaction support enables exactly-once semantics for streaming workloads. A transaction in Pulsar guarantees atomic execution of message production, processing and acknowledgement. With transaction support, a Pulsar client runs consume-process-produce loops that perform the operation B = f(A), where A is the message consumed and B is the message produced. B is considered safely produced only if A was acknowledged, and vice versa.

A transaction coordinator module on the Pulsar broker maintains the status of each transaction. The transaction coordinator writes transaction states to a transaction log, which is itself a persistent Pulsar topic. If the transaction coordinator crashes, it can recover its state from the transaction log.

How transactions work in Pulsar
In brief, the flow of operations in a transaction is as follows:

  • The Pulsar client first sends a request to begin a transaction to the Transaction Coordinator (TC).
  • The TC registers a new transaction in the OPEN state and generates a transaction ID. This ID is returned to the client, which uses it in the subsequent steps.
  • The client opens one or more subscriptions on the topics from which it wants to consume messages. These subscriptions are registered with the TC so that it knows which subscriptions take part in the commit phase of the transaction.
  • The client starts consuming messages and sends ACKs. These ACK requests carry the transaction ID.
  • The TC checks whether an incoming ACK request belongs to a subscription under the transaction. If it does, the ACK is registered with PENDING_ACK status in the transaction log.
  • After processing a consumed message, the client may want to publish the result to another topic. It asks the TC to register the partitions that will receive messages under the transaction, then publishes the response messages with the transaction ID. The messages are written to the partitions along with the transaction ID.
  • When the work is complete, the client sends a commit request to the TC, which does the following:
      • Marks all messages in PENDING_ACK status as ACK for the subscriptions under the transaction. On rollback, these messages are set to UNACK status and are redelivered to other consumers.
      • Instructs the broker to write a commit marker for the messages produced to each partition under the transaction, after which those messages are considered finally written. On rollback, these markers are not added and the produced messages are discarded.
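The commit and rollback behaviour in the steps above can be sketched as a tiny state machine in plain Java. ToyTransaction is a hypothetical model written for this article, not the Pulsar Transaction class: acks and produced messages are buffered while the transaction is open and only become visible together on commit.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (hypothetical) of atomic ack-and-produce within one transaction. */
final class ToyTransaction {
    enum State { OPEN, COMMITTED, ABORTED }

    private State state = State.OPEN;
    private final List<String> pendingAcks = new ArrayList<>();
    private final List<String> pendingMessages = new ArrayList<>();
    private final List<String> ackedMessageIds; // stands in for the subscription's acked state
    private final List<String> outputTopic;     // stands in for messages visible to consumers

    ToyTransaction(List<String> ackedMessageIds, List<String> outputTopic) {
        this.ackedMessageIds = ackedMessageIds;
        this.outputTopic = outputTopic;
    }

    /** Registers an ack in pending state; it takes effect only on commit. */
    void ack(String messageId) {
        requireOpen();
        pendingAcks.add(messageId);
    }

    /** Buffers a produced message; it becomes visible only on commit. */
    void produce(String message) {
        requireOpen();
        pendingMessages.add(message);
    }

    /** Applies all pending acks and publishes all buffered messages together. */
    void commit() {
        requireOpen();
        ackedMessageIds.addAll(pendingAcks);
        outputTopic.addAll(pendingMessages);
        state = State.COMMITTED;
    }

    /** Discards pending acks and buffered messages; nothing becomes visible. */
    void abort() {
        requireOpen();
        pendingAcks.clear();
        pendingMessages.clear();
        state = State.ABORTED;
    }

    State state() {
        return state;
    }

    private void requireOpen() {
        if (state != State.OPEN) {
            throw new IllegalStateException("transaction is " + state);
        }
    }
}
```

In this model, either the input message is acknowledged and the output message is visible, or neither happens, which is the B = f(A) guarantee in miniature.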

How to use Pulsar transactions
To use transactions in Pulsar, we need to make the following changes at the broker level:

  • Change the configuration in the broker.conf file and set: transactionCoordinatorEnabled=true
  • Initialize the transaction coordinator metadata:
bin/pulsar initialize-transaction-coordinator-metadata -cs 127.0.0.1:2181 -c standalone

Now we can write code as follows. Note that the client itself must be built with enableTransaction(true).
Initiate a transaction:

Transaction txn = pulsarClient.newTransaction().withTransactionTimeout(10, TimeUnit.SECONDS).build().get();

Create consumers/producers:

Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                    .topic("test")
                    .subscriptionName("txn-consumer")
                    .subscribe();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                    .topic("test-response")
                    .sendTimeout(0, TimeUnit.SECONDS) // transactional producers need the send timeout disabled
                    .create();

Consume, process, send and acknowledge

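Message<String> msg = consumer.receive();
// Publish the response as part of the transaction
producer.newMessage(txn).value("some response").send();
// Acknowledge the consumed message as part of the same transaction
consumer.acknowledgeAsync(msg.getMessageId(), txn);
// Commit the transaction; get() waits until the commit has completed
txn.commit().get();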

Geo Replication

Geo-replication in Pulsar enables messages to be replicated across multiple Pulsar clusters. In other words, messages produced in one Pulsar cluster can be replicated to other Pulsar clusters in different geographical locations. Geo-replication needs to be enabled at the tenant level first, which restricts replication access to specific tenants.

The diagram below shows how Producer-1 sends messages to Topic-1 in Cluster-1; those messages are synced to the other clusters, where consumers can read the same messages.

Pulsar Geo Replication

Of course, the prerequisite for this feature is to have more than one Pulsar cluster in a Pulsar instance. We can then enable geo-replication by taking the following steps.

Connect clusters that should be under replication
To establish a connection between cluster-1 and cluster-2, register cluster-2 on cluster-1.
For example, run the command below on cluster-1:

$ bin/pulsar-admin clusters create \
  --broker-url pulsar://<DNS-OF-CLUSTER-2>:<PORT> \
  --url http://<DNS-OF-CLUSTER-2>:<PORT> \
  cluster-2

Allow replication permissions to tenant
Create a tenant that has permission to use the clusters that should replicate.
For example:

$ bin/pulsar-admin tenants create my-tenant \
   --admin-roles my-admin-role \
   --allowed-clusters cluster-1,cluster-2,cluster-3

Create geo-replicated namespaces/topic
To bring an entire namespace under replication, create the namespace and then assign the replication clusters to it.
For example:

$ bin/pulsar-admin namespaces create my-tenant/my-namespace
$ bin/pulsar-admin namespaces set-clusters my-tenant/my-namespace \
  --clusters cluster-1,cluster-2,cluster-3


To enable replication for selected topics only, we can set the clusters to replicate to on an individual topic.
For example:

$ bin/pulsar-admin topics set-replication-clusters --clusters cluster-1,cluster-2,cluster-3 my-tenant/my-namespace/my-topic

Any topic we create under a geo-replication-enabled namespace is created on all clusters connected for replication. Once message production starts on such topics, all messages are replicated asynchronously to the other clusters.

Subscriptions can be replicated for consumers as well. This allows a consumer to resume consuming messages from another cluster if the one it was connected to is unreachable or has failed.
This can be enabled when creating a consumer client, like so:

Consumer<String> consumer = client.newConsumer(Schema.STRING)
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .replicateSubscriptionState(true)
            .subscribe();

Overall, geo-replication is well thought out and well designed; once enabled it is seamless and transparent. In Kafka, by contrast, we need to use the MirrorMaker tool, which is essentially an application containing a consumer-producer pair that pulls data from one cluster and sends it to another. It requires active monitoring and maintenance because it lives as a separate process. Also, since it uses a consumer internally, there will be rebalancing issues as we add topics or partitions.

Conclusion

In this article we saw how Apache Pulsar promises to fill in the gaps left behind by Apache Kafka. To summarise, below are the strong points where Pulsar really shines:

  • Segmented storage architecture that decouples message storage from brokers, making it possible to scale a cluster by quickly adding brokers
  • Rebalance-free broker addition, removal and message consumption
  • Multi-tenant setup and namespaces
  • Seamless and well-built geo-replication support
  • A highly versatile set of features available to both producer and consumer clients, which makes Pulsar more than just a pub-sub messaging system

Many of the above-mentioned features are completely absent in Kafka, yet they are essential in the modern landscape of cloud-native, microservice-style applications. In terms of support, Pulsar lags far behind Kafka since the latter is ubiquitous, but that is changing rapidly. Although Kafka still rules the roost, Pulsar can be a compelling choice where Kafka falls short with respect to versatility, scalability and performance.
