Azure Event Hubs

Azure Event Hubs is a hyper-scale telemetry ingestion service that collects, transforms, and stores millions of events. As a distributed streaming platform, it offers low latency and configurable retention, which lets you ingest massive amounts of telemetry into the cloud and read the data from multiple applications using publish-subscribe semantics.

This topic explains how to use Structured Streaming with Azure Event Hubs and Databricks clusters.

Requirements

The Azure Event Hubs Spark Connector, developed by Microsoft, requires Databricks Runtime 3.5-LTS or above.

For current release support, see “Latest Releases” in the Azure Event Hubs Spark Connector project readme file.

  1. Create a library in your Databricks workspace using the Maven coordinate com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.6.

    Note

    This connector is updated regularly, and a more recent version may be available. We recommend that you use the latest connector version from the Maven repository.

  2. Attach the created library to your cluster.

Schema

The schema of the records is:

Column Type
body binary
partition string
offset string
sequenceNumber long
enqueuedTime timestamp
publisher string
partitionKey string
properties map[string,json]

The body is always provided as a byte array. Use cast("string") to explicitly deserialize the body column.
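As a rough illustration of that cast, the sketch below wraps it in a small function over any DataFrame with the schema above. The object name EventBodyDecoding and the choice of columns to keep are illustrative assumptions. Spark decodes binary as UTF-8 when casting to string, so this assumes producers send UTF-8 text.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical helper, not part of the Event Hubs connector.
object EventBodyDecoding {
  // Decodes the binary `body` column as a UTF-8 string and keeps the
  // partition and enqueue time for downstream processing.
  def decodeBody(events: DataFrame): DataFrame =
    events.select(
      col("body").cast("string").as("body"),
      col("partition"),
      col("enqueuedTime")
    )
}
```

Because the function uses only DataFrame operations, it behaves the same on a streaming DataFrame, such as one returned by spark.readStream.format("eventhubs").options(eventHubsConf.toMap).load(), and on a static DataFrame in a unit test.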

Quick Start

Let’s start with a quick example: WordCount. The following notebook is all that it takes to run WordCount using Structured Streaming with Azure Event Hubs.

Azure Event Hubs WordCount with Structured Streaming
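If the notebook isn't at hand, the following minimal Scala sketch shows the same idea. It assumes the connector library is attached and that you pass in an EventHubsConf built as described under Configuration. The object name EventHubsWordCount, the whitespace tokenization, and the console sink are illustrative choices and may differ from what the notebook does.

```scala
import org.apache.spark.eventhubs.EventHubsConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode, split}
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical WordCount sketch; names and tokenization are illustrative.
object EventHubsWordCount {
  // Decodes each event body as UTF-8, splits it on runs of whitespace,
  // drops empty tokens, and counts occurrences of each word.
  def wordCounts(events: DataFrame): DataFrame =
    events
      .select(explode(split(col("body").cast("string"), "\\s+")).as("word"))
      .filter(col("word") =!= "")
      .groupBy("word")
      .count()

  // Reads from Event Hubs and writes running counts to the console.
  // "complete" output mode re-emits the full table of counts after each trigger.
  def start(spark: SparkSession, eventHubsConf: EventHubsConf): StreamingQuery = {
    val events = spark.readStream
      .format("eventhubs")
      .options(eventHubsConf.toMap)
      .load()
    wordCounts(events).writeStream
      .outputMode("complete")
      .format("console")
      .start()
  }
}
```

In a notebook you would call EventHubsWordCount.start(spark, eventHubsConf) and later stop the returned query with query.stop(). Keeping the transformation in its own function lets you check it against a static DataFrame without a live event hub.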

Configuration

This section discusses the configuration settings you need to work with Event Hubs.

For detailed guidance on configuring Structured Streaming with Azure Event Hubs, see the Structured Streaming and Azure Event Hubs Integration Guide developed by Microsoft.

For detailed guidance on using Structured Streaming, see Structured Streaming.


Connection String

An Event Hubs connection string is required to connect to the Event Hubs service. You can get the connection string for your Event Hubs instance from the Azure portal, or build one with the ConnectionStringBuilder class in the connector library.

Azure Portal

When you get the connection string from the Azure portal, it may or may not have the EntityPath key. Consider:

// Without an entity path
val without = "Endpoint=sb://<namespace-name>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"

// With an entity path
val withEntityPath = "Endpoint=sb://<namespace-name>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"

To connect to your event hub, the connection string must include an EntityPath. If yours doesn't have one, add it with ConnectionStringBuilder:

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder(without)   // defined in the previous code block
  .setEventHubName("<eventhub-name>")
  .build
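If you don't know in advance whether a connection string from the portal carries an EntityPath, a small check can decide whether the builder step above is needed. The ConnectionStrings object below is a hypothetical helper, not part of the connector. It assumes the usual Key=Value pairs separated by semicolons, matches key names case-sensitively, and splits each pair on its first = only, because SAS key values are Base64 and can end in = padding.

```scala
// Hypothetical helper, not part of the Event Hubs connector.
object ConnectionStrings {
  // Parses "Key1=Value1;Key2=Value2;..." into a map. Each pair is split on
  // its first '=' so Base64 values with '=' padding stay intact.
  def parse(connectionString: String): Map[String, String] =
    connectionString
      .split(";")
      .iterator
      .map(_.trim)
      .filter(_.nonEmpty)
      .map { pair =>
        val i = pair.indexOf('=')
        if (i < 0) pair -> "" else pair.substring(0, i) -> pair.substring(i + 1)
      }
      .toMap

  // True when the string names a specific event hub via a non-empty EntityPath.
  def hasEntityPath(connectionString: String): Boolean =
    parse(connectionString).get("EntityPath").exists(_.nonEmpty)
}
```

For example, ConnectionStrings.hasEntityPath(without) is false for the first string in the earlier block, so you would add the name with ConnectionStringBuilder(without).setEventHubName("<eventhub-name>") before building your EventHubsConf.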

ConnectionStringBuilder

Alternatively, you can use the ConnectionStringBuilder to make your connection string.

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder()
  .setNamespaceName("<namespace-name>")
  .setEventHubName("<eventhub-name>")
  .setSasKeyName("<key-name>")
  .setSasKey("<key>")
  .build

EventHubsConf

All configuration relating to Event Hubs happens in your EventHubsConf. To create an EventHubsConf, you must pass a connection string:

import org.apache.spark.eventhubs.EventHubsConf

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)

See Connection String for more information about obtaining a valid connection string.

For a complete list of configurations, see EventHubsConf. Here is a subset of configurations to get you started:

Option | Value | Default | Query type | Description
consumerGroup | String | "$Default" | Streaming and batch | A consumer group is a view of an entire event hub. Consumer groups enable multiple consuming applications to each have a separate view of the event stream, and to read the stream independently at their own pace and with their own offsets. More information is available in the Microsoft documentation.
startingPosition | EventPosition | Start of stream | Streaming and batch | The starting position for your Structured Streaming job. See startingPositions for information about the order in which options are read.
maxEventsPerTrigger | long | partitionCount * 1000 | Streaming query | Rate limit on the maximum number of events processed per trigger interval. The specified total number of events is split proportionally across partitions of different volume.
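To make the default and the proportional split concrete, the sketch below models them in plain Scala. It is a simplified illustration under stated assumptions, not the connector's implementation: it assumes each partition's share is proportional to its backlog (events available but not yet read), floors each share with integer division, and reads everything when the total backlog is already under the limit. The object and method names are hypothetical.

```scala
// Simplified, hypothetical model of maxEventsPerTrigger; the connector's
// actual rate-limiting and rounding logic may differ.
object RateLimitSketch {
  // Default from the table above: partitionCount * 1000.
  def defaultMaxEventsPerTrigger(partitionCount: Int): Long =
    partitionCount.toLong * 1000L

  // Splits maxEvents across partitions in proportion to each partition's
  // backlog. BigInt avoids overflow in maxEvents * backlog.
  def split(maxEvents: Long, backlog: Map[Int, Long]): Map[Int, Long] = {
    val total = backlog.values.sum
    if (total <= maxEvents) backlog
    else backlog.map { case (partition, available) =>
      partition -> (BigInt(maxEvents) * available / total).toLong
    }
  }
}
```

For a hub with 4 partitions, the default is 4,000 events per trigger. With maxEventsPerTrigger set to 10000 and backlogs of 30,000 and 10,000 events on two partitions, this model reads 7,500 and 2,500 events respectively, so the busier partition drains faster.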

For each option, there exists a corresponding setting in EventHubsConf. For example:

import org.apache.spark.eventhubs._

val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
  .setConsumerGroup("sample-cg")
  .setMaxEventsPerTrigger(10000)

EventPosition

EventHubsConf allows users to specify starting (and ending) positions with the EventPosition class. EventPosition defines the position of an event in an Event Hub partition. The position can be an enqueued time, offset, sequence number, the start of the stream, or the end of the stream.

import java.time.Instant
import org.apache.spark.eventhubs._

EventPosition.fromOffset("246812")          // Specifies offset 246812
EventPosition.fromSequenceNumber(100L)      // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Specifies events enqueued after the current time
EventPosition.fromStartOfStream             // Specifies the start of the stream
EventPosition.fromEndOfStream               // Specifies the end of the stream

To start (or end) at a specific position, create the appropriate EventPosition and set it in your EventHubsConf:

import org.apache.spark.eventhubs.{EventHubsConf, EventPosition}

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)
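The option table refers to startingPositions for per-partition control. The following minimal sketch assumes the connector's NameAndPartition class and the EventHubsConf.setStartingPositions setter described in Microsoft's integration guide. It builds a per-partition map from known sequence numbers, for example to begin at a specific point on each partition when no checkpoint exists. The object and method names and the fallback to the start of the stream are illustrative. Partitions missing from the map are expected to fall back to startingPosition; check the integration guide for the exact precedence in your connector version.

```scala
import org.apache.spark.eventhubs.{EventHubsConf, EventPosition, NameAndPartition}

// Illustrative helper; the object and method names are hypothetical.
object ResumePositions {
  // Maps partition ID -> sequence number to start from into the
  // per-partition type accepted by EventHubsConf.setStartingPositions.
  def fromSequenceNumbers(
      eventHubName: String,
      startSequenceNumbers: Map[Int, Long]): Map[NameAndPartition, EventPosition] =
    startSequenceNumbers.map { case (partitionId, seqNo) =>
      new NameAndPartition(eventHubName, partitionId) -> EventPosition.fromSequenceNumber(seqNo)
    }

  // Per-partition positions for the listed partitions, with startingPosition
  // as the fallback for any partition not in the map.
  def conf(
      connectionString: String,
      eventHubName: String,
      startSequenceNumbers: Map[Int, Long]): EventHubsConf =
    EventHubsConf(connectionString)
      .setStartingPositions(fromSequenceNumbers(eventHubName, startSequenceNumbers))
      .setStartingPosition(EventPosition.fromStartOfStream)
}
```

Keeping the map construction separate from the EventHubsConf call makes it easy to load the sequence numbers from wherever you record them and to check the mapping without connecting to Event Hubs.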

Production Structured Streaming with Azure Event Hubs

When you run streaming queries in production, you probably want more robustness and uptime guarantees than you would have when you simply attach a notebook to a cluster and run your streaming queries interactively. Import and run the following notebook for a demonstration of how to configure and run Structured Streaming in production with Azure Event Hubs and Databricks.

For more information, see Structured Streaming in Production.

Notebook: Production Structured Streaming with Azure Event Hubs

End-to-end Event Hubs streaming tutorial

For an end-to-end example of streaming data into a cluster using Event Hubs, see Tutorial: Stream data into Azure Databricks using Event Hubs.