Databricks Delta is in Private Preview. Contact your account manager or go to https://databricks.com/product/databricks-delta to request access.
Databricks Delta is a transactional storage layer designed specifically to harness the power of Apache Spark and Databricks DBFS. The core abstraction of Databricks Delta is a Databricks Delta table, which is an optimized Spark table that
- Stores your data as Parquet files in DBFS
- Maintains a transaction log that efficiently tracks changes to the table
You can read and write data stored in Databricks Delta using the same familiar Apache Spark SQL batch and streaming APIs that you use to work with Hive tables and DBFS directories. However, through the addition of the transaction log and other enhancements, Databricks Delta provides the following functionality:
- ACID transactions - Multiple writers can simultaneously modify a dataset and see consistent views.
- DELETES/UPDATES/UPSERTS - Writers can modify a dataset without interfering with jobs reading the dataset.
- Automatic file management - Speeds up data access by organizing data into large files that can be read efficiently.
- Statistics and data skipping - Speeds up reads by 10-100x by tracking statistics about the data in each file and avoiding reading irrelevant information.
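For example, a selective query benefits from data skipping without any extra work. As a sketch (the table and column names here are hypothetical):

```sql
-- Files whose min/max statistics for `date` fall outside this range
-- are skipped entirely, so only relevant files are read.
SELECT count(*)
FROM events
WHERE date >= '2017-01-01' AND date < '2017-02-01'
```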
Databricks Delta requires Databricks Runtime 4.1 and above. Tables created using Databricks Runtime versions lower than 4.1 must be upgraded. To upgrade an existing table, first upgrade all jobs that are writing to the table. Then run:
%scala
com.databricks.delta.Delta.upgradeTableProtocol("<path/to/table>" or "<tableName>")
A common use case is to store both the intermediate and final results of a multi-stage pipeline in Databricks Delta. Typically these pipelines begin with relatively raw, unprocessed records and become more selective and refined as processing occurs.
The phases of processing are grouped into categories by their level of refinement:
- Bronze - Raw events with very little transformation. Often contains a “firehose” of records from many different parts of the organization.
- Silver - Events are cleaned and normalized, sometimes joined with dimension information.
- Gold - Filtered down and sometimes aggregated, usually related to one particular business objective.
- Platinum - High-level summaries of key business metrics.
Several features of Databricks Delta make building and running this type of pipeline significantly easier, cheaper, and more effective than using traditional tools:
- Retain large histories of data - Databricks Delta can efficiently store years worth of data at any stage of the pipeline. Having a long history allows you to fix mistakes by reprocessing old data and also allows you to ask new questions about historical events.
- Automatically update with streaming - Streaming can efficiently read from one table and write the results to another, reducing the complexity and management overhead for each hop.
- Query intermediate results - Each step of the process is materialized as an actual table, and can optionally be optimized for fast read performance. If an ad-hoc query results in new insights, it can easily be turned into a materialized table by switching execution to streaming.
- Share intermediate results - When there are multiple downstream jobs that depend on a given computation, a Databricks Delta table is a natural forking point. Each downstream transformation can run against the efficiently columnar encoded table.
- Backfill and correct with batch - Databricks Delta supports a wide range of batch operations that you can use to backfill and correct mistakes transactionally without interrupting any new streaming data that is arriving.
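As a sketch of what such a pipeline can look like in SQL (the table names, columns, paths, and join below are hypothetical), each stage simply materializes into its own Databricks Delta table:

```sql
-- Bronze: raw events with very little transformation
CREATE TABLE events_bronze USING delta AS
SELECT * FROM json.`/data/raw-events`;

-- Silver: cleaned and joined with dimension information
CREATE TABLE events_silver USING delta AS
SELECT e.*, u.region
FROM events_bronze e JOIN users u ON e.userId = u.userId;

-- Gold: aggregated for one business objective
CREATE TABLE events_gold USING delta AS
SELECT region, eventType, count(*) AS eventCount
FROM events_silver
GROUP BY region, eventType;
```

In practice, each hop can also be driven incrementally by Structured Streaming rather than rebuilt as a batch.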
This quick start example demonstrates the basics of working with Databricks Delta. This section shows how to build a pipeline that reads JSON data into a Databricks Delta table and optimizes the table for fast reads.
Create a table from a dataset. You can use existing Spark SQL code and simply change the format from parquet, csv, json, and so on, to delta:
events = spark.read.json("/data/events")
events.write.format("delta").save("/data/events")
CREATE TABLE events USING delta AS SELECT * FROM json.`/data/events/`
These operations create a new table using the schema that was inferred from the JSON data. For the full set of options available when creating a new Databricks Delta table, see Create a table and Write to a table.
Once data is stored in Databricks Delta, you access it either by specifying the path on DBFS (for example, dbfs:/data/events) or the table name:
events = spark.read.format("delta").load("/data/events")
events = spark.table("events")
SELECT * FROM delta.`/data/events`
SELECT * FROM events
As new events arrive, you can atomically append them to the table:
newEvents.write.format("delta").mode("append").save("/data/events")
INSERT INTO events VALUES(...)
INSERT INTO events SELECT * FROM newEvents
You can also use Structured Streaming to stream new data automatically into the table as it arrives:
events = spark.readStream.json("/data/events")
events.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", "/delta/events/_checkpoint/etl-from-json") \
  .start("/delta/events")
For more information about Databricks Delta integration with Structured Streaming, see Table Streaming Reads and Writes.
Once you have been streaming for a while, you will likely have accumulated many small files in the table. To improve the speed of read queries, you can use the OPTIMIZE command to collapse small files into larger ones:
OPTIMIZE events
You can also specify columns that frequently appear in query predicates for your workload, and Databricks Delta uses this information to cluster related records together:
OPTIMIZE events ZORDER BY eventType, city
For the full set of options available when running OPTIMIZE, see Optimizing Performance and Cost.
Databricks Delta provides snapshot isolation for reads, which means that it is safe to run OPTIMIZE even while other users or jobs are querying the table. Eventually, however, you should clean up old snapshots. You can do this by running the VACUUM command. You control the age of the latest retained snapshot by using the RETAIN <N> HOURS option:
VACUUM events RETAIN 24 HOURS
For details on how to use VACUUM effectively, see Garbage collection.
When you port existing workloads to Databricks Delta, you should be aware of the following simplifications and differences compared with the data sources provided by Apache Spark and Apache Hive.
Databricks Delta handles the following operations automatically, which you should never perform manually:
- REFRESH TABLE - Databricks Delta tables always return the most up-to-date information, so there is no need to manually call REFRESH TABLE after changes.
- Add or remove partitions - Databricks Delta automatically tracks the set of partitions present in a table and updates the list as data is added or removed. As a result, there is no need to manually run ALTER TABLE [ADD|DROP] PARTITION or MSCK.
- Load a single partition quickly - As an optimization, users sometimes directly load the partition of data they are interested in, for example, spark.read.parquet("/data/date=2017-01-01"). With Databricks Delta, this is unnecessary, since it can quickly scan the list of files to find the relevant ones. If you are interested in a single partition, specify it using a WHERE clause, for example, spark.read.format("delta").load("/data").where("date = '2017-01-01'").
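The same WHERE-based pattern applies in SQL. As a sketch with a hypothetical path:

```sql
-- Databricks Delta prunes files using the predicate,
-- so no partition-specific path is needed.
SELECT * FROM delta.`/data` WHERE date = '2017-01-01'
```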
When you port an existing application to Databricks Delta, you should avoid the following operations, which bypass the transaction log:
- Manually modify data - Databricks Delta uses the transaction log to atomically commit changes to the table. Because the log is the source of truth, files that are written out but not added to the transaction log are not read by Spark. Similarly, even if you manually delete a file, a pointer to the file is still present in the transaction log. Instead of manually modifying files stored in a Databricks Delta table, always use the DML commands that are described in this guide.
- External readers - The data stored in Databricks Delta is encoded as Parquet files. However, when you read from a Databricks Delta table using an external tool, you must first clean up stale copies of data. Otherwise, the other query engine might read duplicate values.
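One way to clean up stale copies before an external read is to run VACUUM with a short retention window. This is only a sketch: a zero-hour retention discards all history, and it assumes no running jobs or readers still need older snapshots.

```sql
-- Remove stale copies so external Parquet readers see only current data.
VACUUM events RETAIN 0 HOURS
```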
There are a couple of ways you can migrate existing tables to Databricks Delta tables.
Suppose you have Parquet data stored in the directory data-pipeline. To convert this data to Databricks Delta, first read the data into a DataFrame and save it in the delta format to a new directory:
data = spark.read.parquet("/data-pipeline")
data.write.format("delta").save("/delta/data-pipeline/")
Then, create a Databricks Delta table that refers to the Databricks Delta directory:
DROP TABLE IF EXISTS pipeline_delta;
CREATE TABLE pipeline_delta USING delta LOCATION "/delta/data-pipeline/"
Alternatively, you can create a Databricks Delta table from the existing files:
%sql
CREATE TABLE pipeline_delta USING delta AS SELECT * FROM parquet.`/data-pipeline`
In this case, the data files and the Databricks Delta transaction log are written out to /user/hive/warehouse/<table-name>; that is, /user/hive/warehouse/pipeline_delta, which you can see by running DESCRIBE DETAIL pipeline_delta.
Writes to a single table must originate from a single cluster. There is experimental support for writes from multiple clusters in the same workspace. Contact Databricks support if you are interested in trying this feature.
Unsupported DML (compared with Parquet tables):
- INSERT INTO [OVERWRITE] with static partitions.
- Subqueries in the WHERE conditions.
- Specifying a schema when reading from a table. A command such as spark.read.schema(schema).format("delta").load("/data/events") is not supported; the schema is read from the transaction log instead.
Unsupported DDLs (compared with Parquet tables):
- ANALYZE TABLE PARTITION
- ALTER TABLE [ADD|DROP] PARTITION
- ALTER TABLE SET LOCATION
- ALTER TABLE RECOVER PARTITIONS
- ALTER TABLE SET SERDEPROPERTIES
- CREATE TABLE LIKE
- INSERT OVERWRITE DIRECTORY
- How does Databricks Delta compare to Hive SerDe tables?
- There are several Hive SerDe configuration parameters that Databricks Delta always manages on your behalf; you should never specify them manually.
- Does Databricks Delta support multi-table transactions?
- Databricks Delta does not support multi-table transactions and foreign keys. Databricks Delta supports transactions at the table level.
- Does Databricks Delta support writes or reads using the Spark Streaming DStream API?
- Databricks Delta does not support the DStream API. We recommend Structured Streaming.
- Why isn’t OPTIMIZE automatic?
- The OPTIMIZE operation starts up many Spark jobs in order to optimize the file sizing via compaction (and optionally perform ZOrdering). Since much of what OPTIMIZE does is compact small files, you must first accumulate many small files before this operation has an effect. Therefore, the OPTIMIZE operation is not run automatically.
Moreover, OPTIMIZE, especially with ZORDER, is an expensive operation in time and resources. If Databricks ran OPTIMIZE automatically or waited to write out data in batches, it would remove the ability to run low-latency Databricks Delta streams (where a Databricks Delta table is the source). Many customers have Databricks Delta tables that are never optimized because they only stream data from these tables, obviating the query benefits that OPTIMIZE would provide.
Lastly, Databricks Delta automatically collects statistics about the files that are written to the table (whether through an OPTIMIZE operation or not). This means that reads from Databricks Delta tables leverage this information whether or not the table or a partition has had the OPTIMIZE operation run on it.
- How often should I run OPTIMIZE?
- There is a trade-off between cost and performance when you run OPTIMIZE. You should run OPTIMIZE more often if you want better end-user query performance (necessarily at a higher cost because of the resources used). You should run it less often if you want to minimize cost.
We recommend that you start by running OPTIMIZE on a daily basis (preferably at night when spot prices are low) and adjust from there.
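For example, a nightly maintenance job might pair OPTIMIZE with VACUUM; the ZORDER columns and the retention window below are only illustrative:

```sql
-- Compact small files and cluster frequently filtered columns
OPTIMIZE events ZORDER BY eventType, city;
-- Then discard snapshots older than one week
VACUUM events RETAIN 168 HOURS
```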