Best Practices for Developing Streaming Applications

This topic provides pointers for developing production-quality Spark Streaming applications in Databricks notebooks. It focuses on issues you typically come across while developing these applications and provides an example that demonstrates best practices.

Serialization Issues

Notebooks provide a great development environment: you can iterate on your code as quickly as it takes to run a single cell, and once the development cycle is over, the notebook can easily be turned into a production workload. However, this process can become tedious, especially for Spark Streaming applications. The following section provides tips on how to overcome one of the most common issues you hit when developing Spark Streaming applications: NotSerializableException.

When developing Spark applications, it is common to hit a stack trace like the following:

org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2058)
  ...
Caused by: java.io.NotSerializableException

It’s hard to reason about what gets caught in which closure, and which closure requires which class to be serializable (or even to understand the previous sentence!). Here are some guidelines on the best ways to avoid NotSerializableException:

  • Declare functions inside an object whenever possible
  • If you need to use SparkContext or SQLContext inside closures (for example, inside foreachRDD), use SparkContext.getOrCreate() and SQLContext.getOrCreate() instead of referencing them directly
  • Redefine variables provided to class constructors as local variables inside functions (see the sketch after this list)
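
For the third guideline, the idea is to copy a constructor parameter into a local variable before referencing it inside a DStream operation, so that the closure captures only that value rather than the enclosing (often non-serializable) class. A minimal sketch, using a hypothetical MyStreamBuilder class:

import org.apache.spark.streaming.dstream.DStream

class MyStreamBuilder(prefix: String) {
  def build(stream: DStream[String]): DStream[String] = {
    // Copy the constructor parameter into a local val; referencing `prefix` directly
    // would capture `this` (the whole MyStreamBuilder instance) in the closure.
    val localPrefix = prefix
    stream.map(value => s"$localPrefix-$value")
  }
}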

Refer to the Example Spark Streaming application at the end of this topic for specific implementation examples.

Spark uses SerializationDebugger by default to detect serialization issues, but the debugger itself can sometimes fail with a java.lang.StackOverflowError. You can turn it off and instead enable the JVM’s sun.io.serialization.extendedDebugInfo flag by setting the following properties in the Spark configuration when you create the cluster:

spark.driver.extraJavaOptions -Dsun.io.serialization.extendedDebugInfo=true
spark.executor.extraJavaOptions -Dsun.io.serialization.extendedDebugInfo=true

Checkpointing Recovery Issues

When developing production-quality Spark Streaming applications, one requirement stands out above the rest: fault tolerance. Because a streaming application is long running, it is imperative that it can pick up from where it left off if a failure occurs.

Checkpointing is one of the mechanisms that make Spark Streaming fault tolerant. When checkpointing is enabled, two things happen:

  • The Directed Acyclic Graph (DAG) of all DStream transformations is serialized and stored in reliable storage.
  • If a stateful operation is being run (for example, mapWithState, updateStateByKey), the state is serialized and stored in reliable storage after each batch is processed.

You enable checkpointing in your streaming application by supplying a checkpoint directory:

val checkpointDir = ...

def creatingFunc(): StreamingContext = {
   val newSsc = ...                      // create and setup a new StreamingContext

   newSsc.checkpoint(checkpointDir)      // enable checkpointing

   ...
}

// Recreate context from checkpoints info in checkpointDir, or create a new one by calling the function.
val ssc = StreamingContext.getOrCreate(checkpointDir, creatingFunc _)
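
As an illustration of the second kind of checkpointing, here is a minimal sketch (not part of the original code) of a stateful mapWithState operation whose per-key state is saved to the checkpoint directory after every batch. It assumes a DStream[(String, Int)] named pairs created inside creatingFunc:

import org.apache.spark.streaming.{State, StateSpec}

// Following the guidelines above, the mapping function lives in an object.
object CountFunctions {
  def updateCount(key: String, value: Option[Int], state: State[Long]): (String, Long) = {
    val newCount = state.getOption.getOrElse(0L) + value.getOrElse(0)
    state.update(newCount)          // this per-key state is saved to the checkpoint directory
    (key, newCount)
  }
}

val runningCounts = pairs.mapWithState(StateSpec.function(CountFunctions.updateCount _))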

Anything that is used within DStream operations (for example, transform, foreachRDD, and so on) needs to be serializable so that Spark Streaming can store it for driver fault tolerance. Once your DAG is serialized and stored, your application can be restarted on a separate cluster and still work, provided there were no code changes.

This is great news for jobs scheduled with JARs. You can schedule retries for your Spark Streaming job, and if a failure occurs, the job is restarted and picks up from where it left off.

There is a caveat though when working with notebooks. If your notebook job is restarted, you may come across a nasty stacktrace that looks like this:

org.apache.spark.SparkException: Failed to read checkpoint from directory ...
  at org.apache.spark.streaming.CheckpointReader$.read(Checkpoint.scala:367)
    at org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:862)
    at org.apache.spark.streaming.StreamingContext$$anonfun$getActiveOrCreate$1.apply(StreamingContext.scala:838)
    at org.apache.spark.streaming.StreamingContext$$anonfun$getActiveOrCreate$1.apply(StreamingContext.scala:838)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.streaming.StreamingContext$.getActiveOrCreate(StreamingContext.scala:838)
Caused by: java.io.IOException: java.lang.ClassNotFoundException: line8c9ff88e00d34452b053d892b6d2a6d720.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$4

What is that ugly class that Spark couldn’t find?

Each notebook uses a REPL behind the scenes. As a result, any classes and functions you define are wrapped inside closures, which produces the ugly, obscure class name. For example, consider the following code.

dstream.map { x => (x, 1) }

The inline function x => (x, 1) is compiled by the REPL into an anonymous class and function with the name $read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$.

These anonymous classes are serialized and saved into the checkpoint files. Once the cluster is restarted, the new REPL will have a different ID, causing a different generated class name. Hence, the classes in the checkpoint files cannot be deserialized for recovering the StreamingContext and this causes the ClassNotFoundException.

Solution

The way to work around this issue is to use a package cell:

package x.y.z

Anything inside a package cell is compiled under the given package namespace instead of the anonymous REPL-generated one. To fix the example above, move the function into a package cell in the same notebook with the following contents.

package example  // whatever package name you want

object MyFunctions {
  def mapFunc(x: String) = (x, 1)
}

This generates the class with the fully qualified name example.MyFunctions. Then the earlier code can be changed to the following.

dstream.map(example.MyFunctions.mapFunc)

This allows recovery from the checkpoint files. Therefore, in order to correctly recover from driver failures you should:

  • Move all classes used in your application inside package cells
  • Define all functions in objects inside package cells (see the combined sketch after this list)
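
Putting both rules together, a package cell for a streaming application might look like the following minimal sketch (the package name, case class, and functions are hypothetical):

package example.recovery  // any package name you want

import org.apache.spark.streaming.dstream.DStream

// Classes used inside DStream operations are defined in the package cell...
case class Event(value: Int, time: Long)

// ...and so are the functions, grouped inside an object.
object StreamFunctions {
  def toEvent(record: (Int, Long)): Event = Event(record._1, record._2)

  def process(stream: DStream[(Int, Long)]): DStream[Event] = stream.map(toEvent)
}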

Example Spark Streaming application

Here is a simple example of adding 1 to a stream of integers in a reliable, fault-tolerant manner, and then visualizing the results. We will use all the tips and tricks in this topic to develop and debug the application.

You can use the following receiver to generate data:

package com.databricks.example

import scala.util.Random

import org.apache.spark._
import org.apache.spark.storage._
import org.apache.spark.streaming._
import org.apache.spark.streaming.receiver._

/** This is a dummy receiver that generates data. */
class DummySource extends Receiver[(Int, Long)](StorageLevel.MEMORY_AND_DISK_2) {

  /** Start the thread that generates the data */
  def onStart() {
    new Thread("Dummy Source") { override def run() { receive() } }.start()
  }

  def onStop() {  }

  /** Periodically generate a random number from 0 to 9, and the timestamp */
  private def receive() {
    while(!isStopped()) {
      store(Iterator((Random.nextInt(10), System.currentTimeMillis)))
      Thread.sleep(1000)
    }
  }
}

Bad stream example

Here’s how you should not set up your stream.

package com.databricks.example

import org.apache.spark._
import org.apache.spark.storage._
import org.apache.spark.streaming._
import org.apache.spark.streaming.receiver._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import com.databricks.example._

class AddOneStream(sc: SparkContext, sqlContext: SQLContext, cpDir: String) {

  // This will cause a NotSerializableException while checkpointing, as it unintentionally
  // brings the sqlContext object and its associated non-serializable REPL objects into scope
  import sqlContext.implicits._

  def creatingFunc(): StreamingContext = {

    val batchInterval = Seconds(1)
    val ssc = new StreamingContext(sc, batchInterval)
    ssc.checkpoint(cpDir)

    val stream = ssc.receiverStream(new DummySource())

    // This function will cause a NotSerializableException on the AddOneStream class while running the job,
    // as it requires `AddOneStream` to be serialized in order to be used inside DStream functions.
    def addOne(value: (Int, Long)): (Int, Long) = {
      (value._1 + 1, value._2)
    }

    stream.map(addOne).window(Minutes(1)).foreachRDD { rdd =>

      // This will cause a NotSerializableException while checkpointing, as it unintentionally
      // brings the sqlContext object and its associated non-serializable REPL objects into scope
      sqlContext.createDataFrame(rdd).toDF("value", "time")
        .withColumn("date", from_unixtime($"time" / 1000))
        .registerTempTable("demo_numbers")
    }

    ssc
  }
}

Then, in a separate (non-package) cell, create and start the StreamingContext:

import com.databricks.example._
import org.apache.spark.streaming._
val cpDir = "dbfs:/home/examples/serialization"

val addOneStream = new AddOneStream(sc, sqlContext, cpDir)
val ssc = StreamingContext.getActiveOrCreate(cpDir, addOneStream.creatingFunc _)
ssc.start()

The previous definition won’t work for the following reasons:

  • The addOne function requires AddOneStream to be serialized, but it’s not Serializable.
  • We tried to serialize sqlContext within the foreachRDD.
  • import sqlContext.implicits._ was defined within the class, but it was accessed within the foreachRDD, also requiring AddOneStream to be serialized.

Good stream example

All of these problems can be worked around as follows:

package com.databricks.example2

import org.apache.spark._
import org.apache.spark.storage._
import org.apache.spark.streaming._
import org.apache.spark.streaming.receiver._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import com.databricks.example._

class AddOneStream(sc: SparkContext, sqlContext: SQLContext, cpDir: String) {

  // import the functions defined in the object.
  import AddOneStream._

  def creatingFunc(): StreamingContext = {

    val batchInterval = Seconds(1)
    val ssc = new StreamingContext(sc, batchInterval)

    // Set the active SQLContext so that we can access it statically within the foreachRDD
    SQLContext.setActive(sqlContext)

    ssc.checkpoint(cpDir)

    val stream = ssc.receiverStream(new DummySource())

    stream.map(addOne).window(Minutes(1)).foreachRDD { rdd =>

      // Access the SQLContext using getOrCreate
      val _sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
      _sqlContext.createDataFrame(rdd).toDF("value", "time")
        .withColumn("date", from_unixtime(col("time") / 1000))
        // we could have imported _sqlContext.implicits._ and used $"time"
        .registerTempTable("demo_numbers")
    }

    ssc
  }
}

object AddOneStream {
  def addOne(value: (Int, Long)): (Int, Long) = {
    (value._1 + 1, value._2)
  }
}

Then, in a separate cell, create and start the StreamingContext:

import com.databricks.example2._
import org.apache.spark.streaming._

val addOneStream = new AddOneStream(sc, sqlContext, cpDir)
val ssc = StreamingContext.getActiveOrCreate(cpDir, addOneStream.creatingFunc _)
ssc.start()

Run the following cell with the confidence that you have a production-ready streaming application!

%sql
select * from demo_numbers
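
When you are done experimenting, you may also want to stop the stream without shutting down the shared SparkContext. A minimal sketch, not part of the original example:

import org.apache.spark.streaming._

// Stop the active StreamingContext, if there is one, but keep the SparkContext running.
StreamingContext.getActive.foreach { activeSsc =>
  activeSsc.stop(stopSparkContext = false)
}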