Structured Streaming in Production

It is convenient to attach a notebook to a cluster and run your streaming queries interactively. However, when you run them in production, you are likely to want more robustness and uptime guarantees. This topic discusses how to make your streaming application more fault tolerant using Databricks jobs.

Recover from query failures using jobs

A production-grade streaming application must have robust failure handling. In Structured Streaming, if you enable checkpointing for a streaming query, then you can restart the query after a failure and the restarted query will continue where the failed one left off, while ensuring fault tolerance and data consistency guarantees. Hence, to make your queries fault tolerant, you must enable query checkpointing and configure Databricks jobs to restart your queries automatically after a failure.
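The recovery behavior described above can be illustrated with a toy, in-memory model. This is a simplified sketch, not Spark's implementation: `CheckpointResumeModel`, `Checkpoint`, and `run` are hypothetical names, the "checkpoint" holds only the last committed source offset, and writing a batch to the sink and committing its offset are assumed to happen as one atomic step.

```scala
import scala.collection.mutable

// Toy model of checkpoint-based recovery (NOT Spark's implementation).
object CheckpointResumeModel {

  // Hypothetical stand-in for durable checkpoint storage: only the last
  // committed source offset is recorded.
  final class Checkpoint { var committedOffset: Int = -1 }

  /** Processes `source` in batches, starting just after the committed offset.
    * `failAt` injects a crash before the batch containing that offset is written.
    * Writing a batch and committing its offset are treated as one atomic step. */
  def run(source: Vector[String], checkpoint: Checkpoint, sink: mutable.Buffer[String],
          batchSize: Int, failAt: Option[Int]): Unit = {
    require(batchSize > 0, "batchSize must be positive")
    var start = checkpoint.committedOffset + 1 // resume where the last attempt left off
    while (start < source.length) {
      val end = math.min(start + batchSize, source.length) // exclusive bound
      if (failAt.exists(o => o >= start && o < end))
        throw new RuntimeException(s"simulated failure in batch [$start, $end)")
      sink ++= source.slice(start, end)    // write the batch's output
      checkpoint.committedOffset = end - 1 // then record progress
      start = end
    }
  }
}
```

For example, with five records, a batch size of 2, and an injected failure at offset 3, the first attempt writes `a` and `b` and commits offset 1 before failing. Calling `run` again with the same `Checkpoint` writes `c`, `d`, and `e`, so the sink ends up with each record exactly once.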

Enable checkpointing

To enable checkpointing, set the option checkpointLocation to a DBFS or cloud storage path before you start the query. For example:

streamingDataFrame.writeStream
    .format("parquet")
    .option("path", "dbfs:/outputPath/")
    .option("checkpointLocation", "dbfs:/checkpointPath")
    .start()

This checkpoint location preserves all of the essential information that uniquely identifies a query. Hence, each query must have its own checkpoint location; never share one location between multiple queries. See the Structured Streaming Programming Guide for more details.
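A cheap safeguard is to check the mapping from queries to checkpoint locations before starting anything. The sketch below is a hypothetical helper, not part of Spark or Databricks: it takes a map from query name to checkpoint location and reports every location claimed by more than one query. Ignoring a single trailing slash when comparing paths is an assumption of this sketch.

```scala
// Hypothetical pre-flight check (not a Spark or Databricks API).
object CheckpointLocations {

  /** Returns each checkpoint location used by two or more queries, mapped to
    * the sorted names of those queries. One trailing slash is ignored, so
    * "dbfs:/checkpoints/q1" and "dbfs:/checkpoints/q1/" count as the same path. */
  def shared(checkpoints: Map[String, String]): Map[String, List[String]] = {
    val byLocation = checkpoints.toList.groupBy { case (_, location) => location.stripSuffix("/") }
    byLocation.toList
      .filter { case (_, entries) => entries.size > 1 }
      .map { case (location, entries) => location -> entries.map(_._1).sorted }
      .toMap
  }
}
```

If `query1` uses `dbfs:/checkpoints/q1` and `query2` uses `dbfs:/checkpoints/q1/`, `shared` reports both under `dbfs:/checkpoints/q1`, and the application can refuse to start them rather than let them overwrite each other's progress.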

Configure jobs to restart streaming queries on failure

You can create a Databricks job with the notebook or JAR that contains your streaming queries and configure it to:

  • Always use a new cluster.
  • Always retry on failure.

Jobs have tight integration with Structured Streaming APIs and can monitor all streaming queries active in a run. This configuration ensures that if any part of a query fails, jobs automatically terminate the run (along with all the other queries) and start a new run in a new cluster. The new run re-executes the notebook or JAR code and restarts all of the queries. This is the safest way to ensure that you get back into a good state.
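The run-level behavior can be modeled in the same toy style. In this sketch (not the Databricks implementation; `JobRetryModel`, `runJob`, and the failure-injection format are hypothetical), every query advances one batch per round, a failure in any query ends the entire run, and each new run restarts all queries from their own checkpoints. `maxRetries = None` stands in for the Unlimited retry setting.

```scala
import scala.collection.mutable

// Toy model of the job behavior described above (NOT the Databricks
// implementation). A run starts every query; a failure in any query ends
// the whole run; the job then starts a new run that restarts all queries,
// each resuming from its own checkpoint.
object JobRetryModel {

  /** @param targets    query name -> number of batches the query must process
    * @param failures   hypothetical injected failures as (run number, query, batch index)
    * @param maxRetries None models the "Unlimited" retry setting
    * @return (number of runs started, committed batches per query)
    */
  def runJob(targets: Map[String, Int],
             failures: Set[(Int, String, Int)],
             maxRetries: Option[Int]): (Int, Map[String, Int]) = {
    // Per-query checkpoints survive across runs, like files in cloud storage.
    val checkpoint = mutable.Map[String, Int]()
    targets.keys.foreach(q => checkpoint.update(q, 0))
    val queries = targets.keys.toList.sorted
    var run = 0
    var done = false
    while (!done) {
      run += 1 // a new run re-executes the code and restarts every query
      var failed = false
      while (!failed && queries.exists(q => checkpoint(q) < targets(q))) {
        // Each unfinished query processes one batch per round.
        for (q <- queries if !failed && checkpoint(q) < targets(q)) {
          if (failures.contains((run, q, checkpoint(q)))) failed = true // stops all queries
          else checkpoint(q) += 1 // batch committed to this query's checkpoint
        }
      }
      if (!failed) done = true
      else if (maxRetries.exists(max => run - 1 >= max)) // retries used so far = run - 1
        throw new RuntimeException(s"job gave up after $run runs")
    }
    (run, checkpoint.toMap)
  }
}
```

With targets of 3 batches for `query1` and `query2` and a failure injected into `query2` during run 1, the first run stops with `query1` at 2 committed batches and `query2` at 1. Run 2 resumes both from those points, so `runJob` reports 2 runs and 3 committed batches per query; neither query repeats or skips work.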

Warning

Notebook workflows are currently not supported with long-running jobs. Therefore, we don’t recommend using notebook workflows in your streaming jobs.

Note

  • Failure in any of the active streaming queries causes the active run to fail and terminate all the other streaming queries.
  • You do not need to use streamingQuery.awaitTermination() or spark.streams.awaitAnyTermination() at the end of your notebook. Jobs automatically prevent a run from completing when a streaming query is active.

Here are the details of the recommended job configuration.

  • Cluster: Always use a new cluster, and use the latest Spark version (or at least Spark 2.1). Queries started in Spark 2.1 and above can be recovered after query and Spark version upgrades.
  • Alerts: Set this if you want email notification on failures.
  • Schedule: Do not set a schedule.
  • Timeout: Do not set a timeout. Streaming queries run for an indefinitely long time.
  • Maximum concurrent runs: Set to 1. There must be only one instance of each query concurrently active.
  • Retries: Set to Unlimited.
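The recommendations above can also be encoded as a checklist so that a job definition is reviewed in code. This is a hypothetical sketch, not a Databricks API: the `JobSettings` fields are invented for illustration, and the encodings (`0` for no timeout, `-1` for unlimited retries) are conventions assumed for this sketch. Alerts are left out because email notification is optional.

```scala
// Hypothetical checklist (not a Databricks API) for the recommended settings.
object StreamingJobChecklist {

  // Field names and encodings are assumptions of this sketch:
  // timeoutSeconds = 0 means "no timeout"; maxRetries = -1 means "unlimited".
  final case class JobSettings(
      usesNewCluster: Boolean,
      sparkVersion: (Int, Int), // (major, minor)
      hasSchedule: Boolean,
      timeoutSeconds: Int,
      maxConcurrentRuns: Int,
      maxRetries: Int)

  /** Lists every recommendation the settings violate; empty means compliant. */
  def violations(s: JobSettings): List[String] =
    List(
      !s.usesNewCluster -> "always use a new cluster",
      Ordering[(Int, Int)].lt(s.sparkVersion, (2, 1)) -> "use Spark 2.1 or above",
      s.hasSchedule -> "do not set a schedule",
      (s.timeoutSeconds != 0) -> "do not set a timeout",
      (s.maxConcurrentRuns != 1) -> "set maximum concurrent runs to 1",
      (s.maxRetries != -1) -> "set retries to unlimited"
    ).collect { case (true, message) => message }
}
```

A `JobSettings` with a new cluster, Spark 3.0, no schedule, no timeout, one concurrent run, and unlimited retries yields an empty list; each deviation adds one message.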

See Jobs to understand these configurations. Here is a screenshot of a good job configuration.

[Screenshot: recommended job configuration]

Configure Spark scheduler pools for efficiency

By default, all queries started in a notebook run in the same fair scheduling pool. Therefore, jobs generated by triggers from all of the streaming queries in a notebook run one after another in first-in, first-out (FIFO) order. This can cause unnecessary delays in the queries, because they do not share the cluster resources efficiently.

To enable all streaming queries to execute jobs concurrently and to share the cluster efficiently, you can set the queries to execute in separate scheduler pools. For example:

// Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").format("parquet").start(path1)

// Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").format("orc").start(path2)
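To see why separate pools help, consider a deliberately simplified model: one task slot, unit-length tasks, and all jobs submitted at time 0. It is a hypothetical sketch, not Spark's scheduler, which also accounts for pool weights, minimum shares, and many parallel task slots. `fifoFinishTimes` models every query sharing one FIFO-ordered pool; `fairFinishTimes` models one pool per query, with the pools taking turns.

```scala
// Toy single-slot scheduler model (NOT Spark's scheduler). Each job is a
// number of unit-length tasks; all jobs are submitted at time 0, in list order.
object PoolModel {

  /** All queries share one pool scheduled FIFO: a job starts only after
    * every earlier job has finished. Returns query -> finish time. */
  def fifoFinishTimes(jobs: List[(String, Int)]): Map[String, Int] =
    jobs.scanLeft(("", 0)) { case ((_, t), (q, tasks)) => (q, t + tasks) }.tail.toMap

  /** Each query has its own pool; pools with pending work take turns,
    * running one task each. Returns query -> finish time. */
  def fairFinishTimes(jobs: List[(String, Int)]): Map[String, Int] = {
    require(jobs.forall(_._2 > 0), "every job needs at least one task")
    var remaining = jobs.toMap
    var finished = Map.empty[String, Int]
    var t = 0
    while (remaining.nonEmpty) {
      for ((q, _) <- jobs if remaining.contains(q)) {
        t += 1 // one task of query q occupies the slot for one time unit
        val left = remaining(q) - 1
        if (left == 0) { remaining -= q; finished += q -> t }
        else remaining += q -> left
      }
    }
    finished
  }
}
```

With a 10-task job from `query1` queued ahead of a 1-task job from `query2`, the shared FIFO pool finishes `query2` at time 11, while separate pools finish it at time 2. `query1` finishes at time 11 instead of 10, so the short query stops waiting behind the long one at a small cost to the long one.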

See the Apache Spark fair scheduler documentation for more details.

Work with a large volume of state in stateful streaming queries

If you have stateful operations in your streaming query (for example, streaming aggregation, streaming dropDuplicates, stream-stream joins, mapGroupsWithState, and flatMapGroupsWithState) and you are trying to optimize performance with a large volume of state (millions of keys, for example), contact your Databricks Solution Architect for more information about our experimental Large Scale State Stores.