Read and Write Streaming Avro Data with DataFrames

Apache Avro is a commonly used data serialization system in the streaming world. A typical solution is to write data to Apache Kafka in Avro format, store the corresponding schemas in Confluent Schema Registry, and then run queries with a streaming framework that can connect to both Kafka and Schema Registry.

In Databricks, the from_avro and to_avro functions, provided by the Avro data source, can be used in Spark Structured Streaming to build streaming pipelines with Avro data in Kafka and metadata in Schema Registry.

Note

The from_avro and to_avro functions are:

  • In preview and may change in future releases.
  • Available only in Scala and Java.
  • Normal SQL functions that can be used in both batch and streaming queries (see the short batch sketch after this note).

For other aspects of Avro, see Avro Files.
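
Because they are normal SQL functions, the same calls also work in a batch query. Below is a minimal batch sketch, assuming the from_avro and to_avro signatures used in the examples that follow; the column and variable names are illustrative.

import com.databricks.spark.avro._
import org.apache.avro.SchemaBuilder
import spark.implicits._   // for the $"col" column syntax

// Round-trip an int column through Avro binary in a batch query.
val ints = spark.range(10).select($"id".cast("int").as("value"))

// value: int -> value: binary (Avro-encoded)
val encoded = ints.select(to_avro($"value").as("value"))

// value: binary -> value: int, decoded with an explicitly built Avro schema
val decoded = encoded.select(
  from_avro($"value", SchemaBuilder.builder().intType()).as("value"))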

Examples

Similar to from_json and to_json, you can use from_avro and to_avro with any binary column, but you must specify the Avro schema manually.

import com.databricks.spark.avro._
import org.apache.avro.SchemaBuilder
import spark.implicits._   // for the $"col" column syntax

// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
    from_avro($"value", SchemaBuilder.builder().intType()).as("value"))

// Convert the structured data back to binary: Avro-encode the string key
// column and the int value column, and write them to a Kafka topic.
// (dataDF is a DataFrame with such key and value columns.)
dataDF
  .select(
    to_avro($"key").as("key"),
    to_avro($"value").as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .start()
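
If the value in Kafka is a full Avro record rather than a primitive, you can build the record schema with SchemaBuilder as well. The sketch below is illustrative: the record and field names are made up, and decoding yields a struct column.

import com.databricks.spark.avro._
import org.apache.avro.SchemaBuilder
import spark.implicits._

// Illustrative Avro record schema; the record and field names are examples only.
val valueSchema = SchemaBuilder
  .record("Purchase").namespace("example")
  .fields()
  .requiredString("item")
  .requiredInt("quantity")
  .endRecord()

// Decode the Kafka value column into a struct<item: string, quantity: int> column.
val purchases = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(from_avro($"value", valueSchema).as("value"))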

If your cluster has a Schema Registry service, from_avro and to_avro can also work with it so that you don’t need to specify the Avro schema manually.

import com.databricks.spark.avro._
import org.apache.spark.sql.functions.lit
import spark.implicits._   // for the $"col" column syntax

// Read a Kafka topic "t", assuming the key and value schemas are already
// registered in Schema Registry as subjects "t-key" and "t-value" of type
// string and int. The binary key and value columns are decoded using the
// schemas fetched from Schema Registry, so the schema of the resulting
// DataFrame is: <key: string, value: int>.
val schemaRegistryAddr = "http://myhost:8081"
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr).as("value"))

// Given that the key and value columns are registered in Schema Registry, convert
// the structured key and value columns to Avro binary data by reading the schema
// info from Schema Registry. The converted data is written to the Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .start()
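
When running these writes as continuous queries, the Kafka sink also needs a checkpoint location (set per query or via the spark.sql.streaming.checkpointLocation configuration), and you typically block on the returned query handle. A minimal sketch, where the checkpoint path is illustrative and avroOutputDF stands for the DataFrame produced by the select(to_avro(...)) step above:

// avroOutputDF: the DataFrame with Avro-encoded key and value columns built above.
// The checkpoint path is illustrative; use a durable location in your environment.
val query = avroOutputDF
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .option("checkpointLocation", "/tmp/avro-kafka-checkpoint")
  .start()

// Block the driver until the streaming query terminates.
query.awaitTermination()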