Databricks Delta

Note

Databricks Delta is in Private Preview. Contact your account manager to request access to the Private Preview.

What is Databricks Delta?

Databricks Delta is a transactional storage layer designed specifically to harness the power of Apache Spark and Databricks DBFS. The core abstraction of Databricks Delta is a Databricks Delta table which is an optimized Spark table that stores your data as Parquet files in DBFS and maintains a transaction log that efficiently tracks changes to the table.

You can read and write data stored in Databricks Delta using the same familiar Apache Spark SQL batch and streaming APIs that you use to work with Hive tables or DBFS directories. However, through the addition of the transaction log and other enhancements, Databricks Delta provides the following functionality:

  • ACID Transactions - Multiple writers can simultaneously modify a dataset and see consistent views.
  • DELETES/UPDATES/UPSERTS - Writers can modify a dataset without interfering with jobs reading the dataset.
  • Data Validation - Ensures that data meets specified invariants (for example, NOT NULL) by rejecting invalid data.
  • Automatic File Management - Speeds up data access by organizing data into large files that can be read efficiently.
  • Statistics and Data Skipping - Speeds up reads by 10-100x by tracking statistics about the data in each file and avoiding reading irrelevant information.

Supported runtime versions

Databricks Delta requires Databricks Runtime 4.0 and above. Databricks Delta does not work with older Databricks Runtime versions.

Limitations

  • Concurrent writers from multiple clusters

    Warning

    Writes can occur only from one cluster to one Databricks Delta table or path at a given time. If you want to write data from multiple Spark jobs to a single path or table, you should do so only from a single cluster. The result of writing from multiple clusters is undefined.

  • Bucketing is not supported.

  • Specifying a schema when reading from Databricks Delta tables is not supported. A command such as spark.read.format("delta").schema(df.schema).load(path) will fail.

  • The standard SQL statements show partitions and describe formatted do not return expected partition information on partitioned Databricks Delta tables.

Case study: multi-hop pipelines

A common use case is to store both the intermediate and final results of a multi-stage pipeline in Databricks Delta. Typically these pipelines begin with relatively raw, unprocessed records and become more selective and refined as processing occurs.

The phases of processing are grouped into categories by their level of refinement:

  • Bronze - Raw events with very little transformation. Often contains a “firehose” of records from many different parts of the organization.
  • Silver - Events are cleaned and normalized, sometimes joined with dimension information.
  • Gold - Filtered down and sometimes aggregated, usually related to one particular business objective.
  • Platinum - High-level summaries of key business metrics.

Several features of Databricks Delta makes building and running this type of pipeline significantly easier, cheaper, and more effective than using traditional tools:

  • Retain large histories of data - Databricks Delta can efficiently store years worth of data at any stage of the pipeline. Having a long history allows you to fix mistakes by reprocessing old data and also allows you to ask new questions about historical events.
  • Automatically update with streaming - Streaming can efficiently read from one table and write the results to another, reducing the complexity and management overhead for each hop.
  • Query intermediate results - Each step of the process is materialized as an actual table, and can optionally be optimized for fast read performance. If an ad-hoc query results in new insights, it can easily be turned into a materialized table by switching execution to streaming.
  • Share intermediate results - When there are multiple downstream jobs that depend on a given computation, a Databricks Delta table is a natural forking point. Each downstream transformation can run against the efficiently columnar encoded table.
  • Backfill and correct with batch - Databricks Delta supports a wide range of batch operations that you can use to backfill and correct mistakes transactionally without interrupting any new streaming data that is arriving.

Quick start example: importing JSON data

This quick start example demonstrates the basics of working with Databricks Delta. This section shows how to build a pipeline that reads JSON data into a Databricks Delta table and optimizes the table for fast reads.

Create a table

Create the table from a dataset. You can use existing Spark SQL code and simply change the format from parquet, csv, json, etc., to delta.

  • Using DataFrames:

    events = spark.read.json("/data/events").
    events.write.format("delta").save("/delta/events")
    
  • Using SQL:

    CREATE TABLE events
    USING delta
    AS SELECT *
    FROM json.`/data/events/`
    

These operations create a new table using the schema that was inferred from the JSON data. For the full set of options available when creating a new Databricks Delta table, see Create a table and Write.

Read the table

Once data is stored in Databricks Delta you access it either by specifying the path on DBFS (for example dbfs:/data/events) or the table name in the metastore:

  • Using DataFrames:

    events = spark.read.format("delta").load("/data/events")
    // or
    spark.table("events")
    
  • Using SQL:

    SELECT * FROM delta.`/data/events`
    -- or
    SELECT * FROM events
    

For the full set of options available when reading Databricks Delta, see Read.

Append data to the table

As new events arrive, you can atomically append them to the table:

  • Using DataFrames:

    newEvents.write
      .format("delta")
      .mode("append")
      .save("/data/events")// or .saveAsTable("events")
    
  • Using SQL:

    INSERT INTO events VALUES(...)
    -- or
    INSERT INTO events SELECT * FROM newEvents
    

Stream data into the table

You can also use Structured Streaming to stream new data automatically into the table as it arrives:

events = spark.readStream.json("/data/events")
events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/delta/events/_checkpoint/etl-from-json")
  .start("/delta/events")

For more information about Databricks Delta integration with Structured Streaming, see Table Streaming Reads and Writes.

Optimize the table

Once you have been streaming for awhile, you will likely have a lot of small files in the table. If you want to improve the speed of read queries, you can use OPTIMIZE to perform automatic operations like collapsing small files into larger ones:

OPTIMIZE events or optimize ‘/data/events’

You can also specify interesting columns that are often present in query predicates for your workload, and Databricks Delta uses this information to cluster related records together:

OPTIMIZE events ZORDER BY eventType, city

For the full set of options available when running OPTIMIZE, see Optimizing Performance and Cost.

Clean up snapshots

Finally, it is important to understand that Databricks Delta provides snapshot isolation for reads, which means that it is safe to run OPTIMIZE even while other users or jobs are querying the table. Eventually you should clean up old snapshots. You can do this by running the VACUUM command:

VACUUM events

You can control the age of the latest retained snapshot by using the RETAIN <N> HOURS option:

VACUUM events RETAIN 24 HOURS

For details on how to use VACUUM effectively, see Garbage collection.

Porting existing workloads to Databricks Delta

If you are porting existing pipelines to Databricks Delta, you should be aware of the following simplifications and differences compared with the data sources provided by Apache Spark or Apache Hive.

Databricks Delta handles the following operations automatically, which you should never perform manually:

  • REFRESH TABLE - Databricks Delta tables always return the most up-to-date information, so there is no need to manually call REFRESH TABLE after changes.
  • Add/Remove Partitions - Databricks Delta automatically tracks the set of partitions that are present in a table and updates the list as data is added or removed. As a result, there is no need to manually run ALTER TABLE, or ADD/REMOVE PARTITION.
  • Loading a Single Partition Quickly - As an optimization, users sometimes directly load the partition of data they are interested in (for example, spark.read.parquet("/data/date=2017-01-01")). With Databricks Delta, this is unnecessary, since we can quickly scan the list of files to find the list of relevant ones, using the distributed processing power of Spark. If you are interested in a single partition, specify it using a WHERE clause (for example, spark.read.parquet("/data").where("date = '2017-01-01'")).

When you port an existing application to Databricks Delta, you should avoid the following operations, which bypass the transaction log:

  • Manually Modifying Data - Databricks Delta uses the transaction log to atomically commit changes to the table. Because the log is the source of truth, files that are written out but not added to the transaction log are not read by Spark. Similarly, if you manually delete a file, a pointer to the file is still present in the transaction log. Instead of manually modifying files stored in a Databricks Delta table, always use the DML commands that are described in this user guide.
  • External Readers without VACUUM - The data stored in Databricks Delta is encoded as Parquet files. However, when you read from a Databricks Delta table using an external tool, you must first run VACUUM to remove any stale copies of data. Otherwise, the other query engine might read duplicate values.

Frequently asked questions

How does Databricks Delta compare to Hive SerDe tables?

There are several Hive SerDe configuration parameters that Databricks Delta always manages on your behalf; you should never specify them manually:

  • ROWFORMAT
  • SERDE
  • OUTPUTFORMAT and INPUTFORMAT
  • COMPRESSION
  • STORED AS
Does Databricks Delta support multi-table transactions?
Databricks Delta does not support multi-table transactions and foreign keys. Databricks Delta supports transactions at the table level.
Does Databricks Delta support writes or reads using the Spark Streaming DStream API?
Databricks Delta does not support the DStream API. We recommend Structured Streaming.
Why isn’t OPTIMIZE automatic?

The OPTIMIZE operation starts up many Spark jobs in order to optimize the file sizing via compaction (and optionally perform ZOrdering). Since much of what OPTIMIZE does is compact small files, you must first accumulate many small files before this operation would have an effect. Therefore, the OPTIMIZE operation is not run automatically.

Moreover, running OPTIMIZE, especially with ZORDER, is an expensive operation in time and resources. If Databricks ran OPTIMIZE automatically or waited to write out data in batches, it would remove the ability to run low-latency Databricks Delta streams (where a Databricks Delta table is the source). Many customers have Databricks Delta tables that are never optimized because they only stream data from these tables, obviating the query benefits that OPTIMIZE would provide.

Lastly, Databricks Delta automatically collects statistics about the files that are written to the table (whether through an OPTIMIZE operation or not). This means that reads from Databricks Delta tables leverage this information whether or not the table or a partition has had the OPTIMIZE operation run on it.

How often should I run OPTIMIZE?

There is a trade-off between cost and performance when you run OPTIMIZE. You should run OPTIMIZE more often if you want better end-user query performance (necessarily at a higher cost because of used resources). You should run it less often if you want to optimize more cost.

We recommend you start by running optimize on a daily basis (preferably at night when spot prices are low). Then modify your job from there.