Databricks Delta is a next-generation engine built on top of Apache Spark. Databricks Delta provides ACID transactions, optimized layouts and indexes, and execution engine improvements for building data pipelines to support big data use cases: batch and streaming ingests, fast interactive queries, and machine learning. Specifically, Databricks Delta offers:
- ACID transactions: Serializable isolation levels ensure that readers never see inconsistent data.
- Efficient upserts: Fine-grained updates easily handle late-arriving data and changing records.
- High-throughput streaming ingestion: Ingest high-volume data directly into query tables.
- Optimized data layout: Choose a data layout that suits your query patterns; Databricks Delta automatically manages the layout to reduce query latency.
- Schema enforcement and evolution: Automatically handles schema variations to clean bad records during ingestion.
- Data versioning and time travel: Automatically versions your data for easy rollback and lets you time travel to query earlier versions.
- Execution engine optimizations: Optimizes operations on nested data types and higher-order functions, as well as range joins and data skew joins.
Databricks Delta requires Databricks Runtime 4.1 or above. If you created a Databricks Delta table using a Databricks Runtime lower than 4.1, you must upgrade the table version. For details, see Table Versioning.
This quickstart provides an overview of the basics of working with Databricks Delta. It shows how to build a pipeline that reads JSON data into a Databricks Delta table, and then how to modify the table, read it, display its history, and optimize it.
To try out Databricks Delta, see Try Databricks.
For runnable notebooks that demonstrate these features, see Introductory Notebooks.
To create a Databricks Delta table, you can use existing Spark SQL code and change the format from json, and so on, to delta. For all file types, you read the files into a DataFrame and write out in delta format:
events = spark.read.json("/databricks-datasets/structured-streaming/events/")
events.write.format("delta").save("/delta/events")
spark.sql("CREATE TABLE events USING DELTA LOCATION '/delta/events/'")
CREATE TABLE events USING delta AS SELECT * FROM json.`/data/events/`
The SQL statement creates a new managed table using the schema that was inferred from the JSON data. The Python example writes the data to /delta/events and registers a table at that explicit location. For the full set of options available when you create a new Databricks Delta table, see Create a table and Write to a table.
If your source files are in Parquet format, you can use the SQL CONVERT TO DELTA statement to convert files in place to create an unmanaged table:
CONVERT TO DELTA parquet.`/delta/events`
To speed up queries that have predicates involving the partition columns, you can partition data. To partition data when you create a Databricks Delta table, specify the columns in a PARTITIONED BY clause.
CREATE TABLE events (
  date DATE,
  eventId STRING,
  eventType STRING,
  data STRING)
USING delta
PARTITIONED BY (date)
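To illustrate why partitioning helps queries that filter on the partition column, here is a minimal pure-Python sketch. It models partitions as Hive-style directory names such as date=2019-01-29 and shows that an equality filter only needs to touch one of them. The helper names partition_rows and prune and the sample rows are hypothetical and exist only for this illustration; this is not how Databricks Delta implements pruning internally.

```python
from collections import defaultdict

def partition_rows(rows, column):
    """Group rows under Hive-style directory names such as 'date=2019-01-29'."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[f"{column}={row[column]}"].append(row)
    return dict(partitions)

def prune(partitions, column, value):
    """Return only the partition directories that a `column = value` filter must read."""
    wanted = f"{column}={value}"
    return [name for name in partitions if name == wanted]

# Hypothetical sample rows, loosely following the events table schema.
rows = [
    {"date": "2019-01-28", "eventId": "e1", "eventType": "open", "data": "a"},
    {"date": "2019-01-29", "eventId": "e2", "eventType": "close", "data": "b"},
    {"date": "2019-01-29", "eventId": "e3", "eventType": "open", "data": "c"},
]

partitions = partition_rows(rows, "date")
to_read = prune(partitions, "date", "2019-01-29")
```

In this model, a filter on date reads a single directory, while a filter on a non-partition column such as eventType would still have to consider every partition.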
Databricks Delta supports a rich set of operations to modify tables.
You can write data into a Databricks Delta table using Structured Streaming. The Databricks Delta transaction log guarantees exactly-once processing, even when there are other streams or batch queries running concurrently against the table. By default, streams run in append mode, which adds new records to the table.
from pyspark.sql.types import StructType, StructField, TimestampType, StringType

inputPath = "/databricks-datasets/structured-streaming/events/"

# Schema of the JSON event files
jsonSchema = StructType([
    StructField("time", TimestampType(), True),
    StructField("action", StringType(), True)
])

eventsDF = (
    spark
    .readStream
    .schema(jsonSchema)               # Set the schema of the JSON data
    .option("maxFilesPerTrigger", 1)  # Treat a sequence of files as a stream by picking one file at a time
    .json(inputPath)
)

# Append the stream to the events table
(eventsDF.writeStream
    .outputMode("append")
    .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json")
    .table("events")
)
For more information about Databricks Delta integration with Structured Streaming, see Table Streaming Reads and Writes.
To merge a set of updates and insertions into an existing table, you use the MERGE INTO statement. For example, the following statement takes a stream of updates and merges it into the events table. When there is already an event present with the same eventId, Databricks Delta updates the data column using the given expression. When there is no matching event, Databricks Delta adds a new row.
MERGE INTO events
USING updates
ON events.eventId = updates.eventId
WHEN MATCHED THEN
  UPDATE SET events.data = updates.data
WHEN NOT MATCHED THEN
  INSERT (date, eventId, data) VALUES (date, eventId, data)
You must specify a value for every column in your table when you perform an INSERT (for example, when there is no matching row in the existing dataset). However, you do not need to update all values.
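The matched and not-matched branches of the MERGE statement can be modeled in a few lines of plain Python. The sketch below is only an illustration of the semantics described above, not the Databricks Delta implementation: the function merge_events and the sample rows are hypothetical, and it assumes each eventId appears at most once in the updates.

```python
def merge_events(events, updates):
    """Model MERGE INTO ... ON eventId: update `data` on a match, insert otherwise."""
    merged = {row["eventId"]: dict(row) for row in events}
    for upd in updates:
        key = upd["eventId"]
        if key in merged:
            # WHEN MATCHED THEN UPDATE SET events.data = updates.data
            merged[key]["data"] = upd["data"]
        else:
            # WHEN NOT MATCHED THEN INSERT (date, eventId, data)
            merged[key] = {"date": upd["date"], "eventId": key, "data": upd["data"]}
    return list(merged.values())

# Hypothetical table contents and incoming updates.
events = [
    {"date": "2019-01-28", "eventId": "e1", "data": "old"},
    {"date": "2019-01-28", "eventId": "e2", "data": "unchanged"},
]
updates = [
    {"date": "2019-01-29", "eventId": "e1", "data": "new"},
    {"date": "2019-01-29", "eventId": "e3", "data": "inserted"},
]

result = merge_events(events, updates)
```

Note that the matched row e1 keeps its original date, because the UPDATE clause sets only the data column, while the new row e3 receives a value for every column listed in the INSERT clause.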
You access data in Databricks Delta tables either by specifying the path on DBFS ("/delta/events") or the table name ("events"):
val events = spark.read.format("delta").load("/delta/events")
val events = spark.table("events")
SELECT * FROM delta.`/delta/events`
SELECT * FROM events
To view the history of a table, use the DESCRIBE HISTORY statement, which provides provenance information, including the table version, operation, user, and so on, for each write to a table:
DESCRIBE HISTORY events
To query an older version of a table, specify a version or timestamp in a SELECT statement. For example, to query version 0 from the table history, use:
SELECT * FROM events VERSION AS OF 0
SELECT * FROM events TIMESTAMP AS OF '2019-01-29 00:37:58'
Because version 1 is at timestamp '2019-01-29 00:38:10', to query version 0 you can use any timestamp in the range '2019-01-29 00:37:58' to '2019-01-29 00:38:09' inclusive.
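The timestamp range above follows from a simple rule: a timestamp resolves to the latest version written at or before it. The following pure-Python sketch models that rule using the two commit timestamps from the example. The function version_as_of and the commit_times list are hypothetical names for this illustration, and the sketch assumes second-level timestamp granularity; it is not the actual lookup that Databricks Delta performs.

```python
from bisect import bisect_right
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"

# Commit timestamps indexed by version; values taken from the example in the text.
commit_times = [
    datetime.strptime("2019-01-29 00:37:58", FMT),  # version 0
    datetime.strptime("2019-01-29 00:38:10", FMT),  # version 1
]

def version_as_of(timestamp):
    """Return the latest version committed at or before `timestamp`."""
    ts = datetime.strptime(timestamp, FMT)
    idx = bisect_right(commit_times, ts) - 1
    if idx < 0:
        raise ValueError("timestamp is earlier than the first commit")
    return idx

first = version_as_of("2019-01-29 00:37:58")
last_for_v0 = version_as_of("2019-01-29 00:38:09")
next_version = version_as_of("2019-01-29 00:38:10")
```

Under this model, both ends of the inclusive range resolve to version 0, and one second later the query resolves to version 1.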
For details, see Query an older snapshot of a table (time travel).
Once you have performed multiple changes to a table, you might have a lot of small files. To improve the speed of read queries, you can use OPTIMIZE to collapse small files into larger ones:
OPTIMIZE events
To improve read performance further, you can co-locate related information in the same set of files by Z-Ordering. Databricks Delta data-skipping algorithms automatically use this co-locality to dramatically reduce the amount of data that needs to be read. To Z-Order data, you specify the columns to order on in the ZORDER BY clause. For example, to co-locate by eventType, run:
OPTIMIZE events ZORDER BY (eventType)
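To give an intuition for why co-locating related values lets data skipping read less data, here is a toy model in plain Python. It assumes that each file carries minimum and maximum statistics for the eventType column and that a file can be skipped when the requested value falls outside that range. The file names, the statistics, and the function files_to_read are all hypothetical; the real layout and skipping logic in Databricks Delta are more involved.

```python
def files_to_read(file_stats, value):
    """Keep only files whose [min, max] range could contain `value`."""
    return [name for name, (lo, hi) in file_stats.items() if lo <= value <= hi]

# Before clustering: every file spans a wide range of eventType values.
scattered = {
    "part-0": ("click", "view"),
    "part-1": ("close", "view"),
    "part-2": ("click", "open"),
}

# After clustering: each file holds a narrow, mostly disjoint range.
clustered = {
    "part-0": ("click", "click"),
    "part-1": ("close", "open"),
    "part-2": ("view", "view"),
}

before = files_to_read(scattered, "open")
after = files_to_read(clustered, "open")
```

In this model, a query for eventType = 'open' must read all three scattered files, but only one file once the values are co-located.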
For the full set of options available when running OPTIMIZE, see Optimizing Performance with File Management.
Databricks Delta provides snapshot isolation for reads, which means that it is safe to run OPTIMIZE even while other users or jobs are querying the table. Eventually, however, you should clean up old snapshots. You can do this by running the VACUUM command:
VACUUM events
You control the age of the latest retained snapshot by using the RETAIN <N> HOURS option:
VACUUM events RETAIN 24 HOURS
For details on using VACUUM effectively, see Garbage collection.
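The effect of the RETAIN <N> HOURS option can be pictured as a cutoff time: files that are no longer part of the current table and that left it before the cutoff become eligible for deletion. The pure-Python sketch below models only that cutoff rule. The function files_to_vacuum, the file names, and the timestamps are hypothetical, and the real VACUUM command applies additional rules that this sketch does not cover.

```python
from datetime import datetime, timedelta

def files_to_vacuum(removed_files, now, retain_hours):
    """Return files that left the table more than `retain_hours` before `now`."""
    cutoff = now - timedelta(hours=retain_hours)
    return sorted(name for name, removed_at in removed_files.items() if removed_at < cutoff)

# Hypothetical files no longer referenced by the current table version,
# mapped to the time at which they were removed from the table.
now = datetime(2019, 1, 30, 12, 0, 0)
removed_files = {
    "part-old.parquet": datetime(2019, 1, 29, 0, 38, 10),   # removed about 35 hours before `now`
    "part-recent.parquet": datetime(2019, 1, 30, 6, 0, 0),  # removed 6 hours before `now`
}

expired = files_to_vacuum(removed_files, now, retain_hours=24)
```

With a 24-hour retention window, only the file removed more than a day ago is selected; the recently removed file is kept, so snapshots from the last 24 hours remain queryable in this model.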
- For answers to frequently asked questions, see Frequently Asked Questions (FAQ).
- For reference information on Databricks Delta SQL commands, see SQL Guide.
- For detailed information on Databricks Delta features and applications, see these blog posts:
  - New Databricks Delta Features Simplify Data Pipelines
  - Introducing Delta Time Travel for Large Scale Data Lakes
  - Simplifying Change Data Capture with Databricks Delta
  - Building a Real-Time Attribution Pipeline with Databricks Delta
  - Processing Petabytes of Data in Seconds with Databricks Delta
  - Simplifying Streaming Stock Data Analysis Using Databricks Delta
  - Build a Mobile Gaming Events Data Pipeline with Databricks Delta