Getting Eventful With Azure Event Hubs: Part One

danondso

Dublin Anondson

Posted on September 25, 2019

Azure Event Hubs is a big data ingestion offering from Microsoft. Clients can talk to it over AMQP, HTTPS, or the Apache Kafka protocol. Event Hubs offers benefits like partitioning and check-pointing on the data stream, plus all the scalability your wallet can handle.

In this article I'll be going over how to set up a connection and send data to Event Hubs using a Spring Boot web service. You can view the completed example here.

Setting up an Azure Account

If you haven't yet, you'll need to sign up for an Azure dev account. They hook you up with enough credit for what we need to do in this guide.

Creating the Event Hub Namespace

Log in to the Azure portal and search for 'event hub' in the search box at the top. You'll want to select the 'Event Hubs' option.

Event Hub Search Result

From there click on 'Event Hubs', and click on the Add (+) icon.

Adding Event Hub

Fill out the create form. For this example I'm using the barest possible settings. You'll probably need to create a resource group for this event hub as well.

Event Hub Namespace Create Form

Boom! We've got an event hub namespace set up. The namespace acts as an organizational directory for your created event hubs.

Creating the Event Hub

Navigate to your event hub namespace and click on the Event Hub Add (+) icon.

Add Event Hub

Give your event hub a snazzy name and scale the partition count as you need; for this guide I'm keeping mine at 2. Click Create at the bottom of the form and we're good to go.

Event Hub Create Form

You should see a message saying that your event hub is being created. Once it's done, navigate to it from your namespace menu. You'll land on a dashboard that shows all kinds of sweet metrics, like throughput and message counts.

Event Hub Stats

In the side menu, navigate over to 'Shared access policies' and click Add (+). Give it a name and give it Manage access (which includes both Send and Listen).

Shared Access Policy Create

Creating a Shared Access Policy gives us the keys to the event hub castle. Click on the policy you just created and make a note of the connection string.
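
Before wiring the connection string into an application, it can help to check that you copied the whole thing. The sketch below is a small standalone parser, not part of the Event Hubs SDK. It assumes the usual semicolon-separated layout (Endpoint, SharedAccessKeyName, SharedAccessKey, and typically EntityPath for a hub-level policy); the class name ConnectionStringParts and the sample values are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Splits an Event Hubs style connection string into its key/value parts. */
final class ConnectionStringParts {

    static Map<String, String> parse(String connectionString) {
        Map<String, String> parts = new LinkedHashMap<>();
        for (String segment : connectionString.split(";")) {
            if (segment.trim().isEmpty()) {
                continue; // tolerate a trailing semicolon
            }
            // Split on the first '=' only: base64 keys often end in '='.
            int eq = segment.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Malformed segment: " + segment);
            }
            parts.put(segment.substring(0, eq).trim(), segment.substring(eq + 1).trim());
        }
        return parts;
    }

    public static void main(String[] args) {
        // Placeholder values, not real credentials.
        String example = "Endpoint=sb://my-namespace.servicebus.windows.net/;"
                + "SharedAccessKeyName=my-policy;"
                + "SharedAccessKey=bm90LWEtcmVhbC1rZXk=;"
                + "EntityPath=my-event-hub";
        // Print every part, but never echo the key itself.
        parse(example).forEach((key, value) ->
                System.out.println(key + " -> " + ("SharedAccessKey".equals(key) ? "<hidden>" : value)));
    }
}
```

If EntityPath is missing, you most likely copied the connection string from a namespace-level policy rather than from the event hub's own policy.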

Sending Data to Event Hub

Adding in the event hub dependencies.

First things first, get those dependencies added to your pom file. As of this writing the latest versions of the dependencies are 3.0.0.

        <dependency>
            <groupId>com.microsoft.azure</groupId>
            <artifactId>azure-eventhubs</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.microsoft.azure</groupId>
            <artifactId>azure-eventhubs-eph</artifactId>
            <version>3.0.0</version>
        </dependency>

Next, let's create a config class and create an EventHubClient.

Creating a client and hooking it up.

import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;
import java.util.concurrent.Executors;

@Configuration
public class EventHubConfig {

    @Value("${eventHub.connectionString}")
    private String connectionString;

    @Bean
    public EventHubClient setupEventHubConnection() throws IOException, EventHubException {
        return EventHubClient.createFromConnectionStringSync(connectionString,
                              Executors.newSingleThreadScheduledExecutor());
    }
}

In this class I'm pulling the connectionString in from a configuration file (in my case the application's yml file), injecting it into the config class, then passing it as-is to the createFromConnectionStringSync method, which takes a connection string and a ScheduledExecutorService. Since we're not doing anything fancy here, I'm using a single-threaded executor.
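
One thing to keep in mind about that executor: its worker thread is a regular (non-daemon) thread, so it stays alive until someone shuts the executor down. The following is a minimal standalone sketch of that lifecycle using only java.util.concurrent. The class ExecutorLifecycleDemo and its methods are invented for this example and don't touch Event Hubs at all.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class ExecutorLifecycleDemo {

    /** Runs one task on the executor and returns the name of the worker thread. */
    static String runOnce(ScheduledExecutorService executor) throws Exception {
        Callable<String> whoAmI = () -> Thread.currentThread().getName();
        ScheduledFuture<String> task = executor.schedule(whoAmI, 10, TimeUnit.MILLISECONDS);
        return task.get(5, TimeUnit.SECONDS);
    }

    /** Stops accepting new work and waits for the worker thread to finish. */
    static boolean shutDown(ScheduledExecutorService executor) throws InterruptedException {
        executor.shutdown();
        return executor.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        System.out.println("Task ran on: " + runOnce(executor));
        System.out.println("Terminated cleanly: " + shutDown(executor));
    }
}
```

In the config class above the executor is created inline, so nothing holds a reference to it. For anything beyond a demo, it's probably worth keeping that reference around so the executor can be shut down when the application stops.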

Here's how I've defined my yml file.

eventHub:
  connectionString: 'connection string here'

Now that we have an Event Hub Client bean, let's go ahead and create a service component that uses it.

import com.dublin.eventhub.demo.controller.Controller;
import com.dublin.eventhub.demo.model.EventPayload;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.SerializationUtils;

import java.util.Objects;

@Service
public class EventHubService {
    private final EventHubClient eventHubClient;
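    // Note: this logger is named after Controller, so the service's messages show up under
    // Controller in the logs. EventHubService.class would be the more conventional choice.
    private Logger log = LoggerFactory.getLogger(Controller.class);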

    @Autowired
    public EventHubService(EventHubClient eventHubClient) {
        this.eventHubClient = eventHubClient;
    }

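    public void sendEvent(EventPayload test) {

        // Java-serialize the payload; EventData carries the message body as raw bytes.
        byte[] bytes = SerializationUtils.serialize(test);

        log.info("Sending message to the event hub {}", eventHubClient.getEventHubName());
        // send() is asynchronous and returns a CompletableFuture, so a failed send shows up
        // on the future instead of being thrown from this method. The second argument is
        // the partition key.
        eventHubClient.send(EventData.create(Objects.requireNonNull(bytes)), test.toString())
                .whenComplete((ignored, error) -> {
                    if (error != null) {
                        log.error("Failed to send message to the event hub", error);
                    }
                });
    }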
}


We'll use constructor injection to inject the client into our service. From there I define the sendEvent method, which will use the client to send the data.

To send data to event hub, we need to serialize the message into a byte array, wrap it in an EventData object, then pass it to the client's send method along with a partition key (here, the payload's toString()).
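
The second argument passed to send in the service above is a partition key. Events that share a key end up on the same partition, which is what keeps related events in order. The real hashing is an internal detail of Event Hubs, so treat the sketch below purely as an illustration of the idea: it uses String.hashCode() as a stand-in, and PartitionKeyDemo and partitionFor are made-up names.

```java
final class PartitionKeyDemo {

    /** Maps a key to a partition index in [0, partitionCount). Illustrative only. */
    static int partitionFor(String partitionKey, int partitionCount) {
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(partitionKey.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        int partitionCount = 2; // the partition count chosen earlier in this guide
        for (String key : new String[] {"Carson", "Letterman", "Carson"}) {
            System.out.println(key + " -> partition " + partitionFor(key, partitionCount));
        }
    }
}
```

Whatever the exact hash, the takeaway is the same: pick a key that groups the events you want kept together, such as a user or device id, rather than something unique per message.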

Creating a data class

For this example I've defined a simple data class.

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class EventPayload implements Serializable {
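    private String firstName;
    private String lastName;
    // Present in the logged payload further down, between lastName and favoriteFood.
    private String email;
    private String favoriteFood;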
}

It's important to note that the data you send this way needs to be Serializable, so make sure your data class implements Serializable. Otherwise you'll get a java.lang.IllegalArgumentException: Failed to deserialize object message.
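
Spring's SerializationUtils is a thin wrapper around standard Java serialization, so the requirement comes from the JDK itself. Here's a small standalone sketch of the same round trip using plain ObjectOutputStream and ObjectInputStream. Person and NotMarked are hypothetical stand-ins for the payload class, kept Lombok-free so the example compiles on its own.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class SerializationDemo {

    /** Stand-in payload that implements Serializable. */
    static final class Person implements Serializable {
        private static final long serialVersionUID = 1L;
        final String firstName;
        Person(String firstName) { this.firstName = firstName; }
    }

    /** Same shape, but without the Serializable marker. */
    static final class NotMarked {
        final String firstName = "Johnny";
    }

    static byte[] toBytes(Object value) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        }
        return buffer.toByteArray();
    }

    static Object fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Person copy = (Person) fromBytes(toBytes(new Person("Johnny")));
        System.out.println("Round trip: " + copy.firstName);
        try {
            toBytes(new NotMarked());
        } catch (NotSerializableException e) {
            // The JDK refuses to write objects that lack the marker interface.
            System.out.println("Not serializable: " + e.getMessage());
        }
    }
}
```

Keep in mind that whoever consumes these events needs the same class on its classpath to read them back, which is one reason many teams send JSON bytes instead.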

Next, we'll define an endpoint to post our data to.

Building an Endpoint

import com.dublin.eventhub.demo.model.EventPayload;
import com.dublin.eventhub.demo.service.EventHubService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

@RestController
public class Controller {

    private final EventHubService eventHubService;
    private Logger log = LoggerFactory.getLogger(Controller.class);

    @Autowired
    public Controller(EventHubService eventHubService) {
        this.eventHubService = eventHubService;
    }

    @PostMapping(path = "/eventhub/send")
    public ResponseEntity<Void> sendEvent(@RequestBody EventPayload payload) {
        try {
            log.info("Eventhub send endpoint called, sending {} to event hub..", payload);
            eventHubService.sendEvent(payload);
        } catch (Exception e) {
            // Pass the exception as the last argument so SLF4J logs the full stack trace.
            log.error("An error arose sending a message to event hub", e);
            return new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR);
        }
        return new ResponseEntity<>(HttpStatus.OK);
    }
}


I've created a POST endpoint at "/eventhub/send" and defined the request body to be our data class. From there it just hands the payload to the service.
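
One caveat about the try/catch in the controller: the client's send method returns a CompletableFuture, so a failed send completes that future exceptionally rather than throwing from sendEvent. The sketch below shows the difference with a stand-in future. fakeSend is a hypothetical helper that simulates a send and is not part of the Event Hubs SDK.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class AsyncSendDemo {

    /** Hypothetical stand-in for an asynchronous send; fails when asked to. */
    static CompletableFuture<Void> fakeSend(boolean shouldFail) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        if (shouldFail) {
            result.completeExceptionally(new IllegalStateException("simulated send failure"));
        } else {
            result.complete(null);
        }
        return result;
    }

    /** Blocks until the send finishes and reports whether it succeeded. */
    static boolean sendAndWait(boolean shouldFail) {
        try {
            fakeSend(shouldFail).join(); // join() rethrows failures as CompletionException
            return true;
        } catch (CompletionException e) {
            System.out.println("Send failed: " + e.getCause().getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        // Calling fakeSend(true) on its own throws nothing; the failure lives on the future.
        fakeSend(true);
        System.out.println("Good send succeeded: " + sendAndWait(false));
        System.out.println("Bad send succeeded: " + sendAndWait(true));
    }
}
```

If you want the endpoint to return a 500 when a send fails, the service would need to wait on the future like sendAndWait does, or use the client's blocking sendSync variant instead.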

Once we have all of that hooked up we should be able to run our application and see it successfully connect to event hub.

2019-09-24 13:42:21.670  INFO 16113 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.24]
2019-09-24 13:42:21.734  INFO 16113 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2019-09-24 13:42:21.734  INFO 16113 --- [           main] o.s.web.context.ContextLoader            : Root WebApplicationContext: initialization completed in 1063 ms
2019-09-24 13:42:21.983  INFO 16113 --- [pool-1-thread-1] c.m.a.eventhubs.impl.MessagingFactory    : messagingFactory[MF_41d4fd_1569350541964], hostName[dublin-rest-demo.servicebus.windows.net], info[starting reactor instance.]
2019-09-24 13:42:21.999  INFO 16113 --- [pool-1-thread-1] c.m.azure.eventhubs.impl.ReactorHandler  : name[MF_41d4fd_1569350541964] reactor.onReactorInit
2019-09-24 13:42:22.002  INFO 16113 --- [pool-1-thread-1] c.m.a.eventhubs.impl.ConnectionHandler   : onConnectionInit hostname[dublin-rest-demo.servicebus.windows.net], connectionId[MF_41d4fd_1569350541964]
2019-09-24 13:42:22.003  INFO 16113 --- [pool-1-thread-1] c.m.a.eventhubs.impl.ConnectionHandler   : onConnectionLocalOpen hostname[dublin-rest-demo.servicebus.windows.net:5671], connectionId[MF_41d4fd_1569350541964], errorCondition[null], errorDescription[null]
2019-09-24 13:42:22.101  INFO 16113 --- [pool-1-thread-1] c.m.a.eventhubs.impl.ConnectionHandler   : onConnectionBound hostname[dublin-rest-demo.servicebus.windows.net], connectionId[MF_41d4fd_1569350541964]
2019-09-24 13:42:23.157  INFO 16113 --- [pool-1-thread-4] c.m.a.eventhubs.impl.ConnectionHandler   : onConnectionRemoteOpen hostname[dublin-rest-demo.servicebus.windows.net:5671], connectionId[MF_41d4fd_1569350541964], remoteContainer[100db877ccad41b1a689c5a458bf1fbc_G6]
2019-09-24 13:42:23.268  INFO 16113 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2019-09-24 13:42:23.449  INFO 16113 --- [           main] o.s.b.a.e.web.EndpointLinksResolver      : Exposing 2 endpoint(s) beneath base path '/actuator'
2019-09-24 13:42:23.491  INFO 16113 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2019-09-24 13:42:23.495  INFO 16113 --- [           main] com.dublin.eventhub.demo.Application     : Started Application in 3.126 seconds (JVM running for 3.493)

Now that we're all set up, let's send some data to it. Pull up your favorite HTTP client and post some data to it. For this example I'm using Insomnia.

Insomnia Client POST
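
If you'd rather stay in Java than reach for a GUI client, the same request can be sketched with the JDK's built-in HttpClient (Java 11 and up). This assumes the app is running locally on port 8080, as in the startup logs above, and that the JSON field names match the data class. SendEventRequest is just a name I picked for the example.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

final class SendEventRequest {

    /** Builds a JSON POST against the /eventhub/send endpoint. */
    static HttpRequest build(String baseUrl, String json) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/eventhub/send"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) throws Exception {
        String json = "{\"firstName\":\"Johnny\",\"lastName\":\"Carson\","
                + "\"favoriteFood\":\"Potatoes and Molasses\"}";
        // Requires the Spring Boot app to be up; otherwise send() throws a connect error.
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(build("http://localhost:8080", json), HttpResponse.BodyHandlers.ofString());
        System.out.println("Status: " + response.statusCode());
    }
}
```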

And in the logs I should see that the message got sent to event hub successfully!

2019-09-24 13:43:25.806  INFO 16113 --- [nio-8080-exec-1] c.d.eventhub.demo.controller.Controller  : Eventhub send endpoint called, sending EventPayload(firstName=Johnny, lastName=Carson, email=null, favoriteFood=Potatoes and Molasses) to event hub..
2019-09-24 13:43:25.808  INFO 16113 --- [nio-8080-exec-1] c.d.eventhub.demo.controller.Controller  : Sending message to the event hub event-hub-test

We did it! We're serializing and sending data to event hub. 😁

Recap

In this guide I walked through creating an event hub in Azure, setting up an event hub client and service, along with an endpoint to post data to.
In part 2 I'll walk through consuming events.

I hope this guide helps. Let me know what you think in the comments.
