Delta Lake

Last updated by Simon Späti

Delta Lake is an open source project that enables building a Lakehouse architecture on top of data lakes. Delta Lake provides ACID transactions and scalable metadata handling, and it unifies streaming and batch data processing on top of existing data lakes, such as S3, ADLS, GCS, and HDFS.

Some references:

# Architecture and Storage

Databricks stores files in distributed object storage such as AWS S3, Google Cloud Storage, or Azure Blob Storage in an open-source file format: Apache Parquet. Parquet is the de facto standard for storing column-oriented, compressed data. The schema embedded in each file and the availability of libraries for many languages make it a great choice for storing data in big data use cases.

The difference between the Delta Lake table format and a plain Parquet file is the addition of the Transaction Log (Delta Lake). Furthermore, Delta Lake groups the Parquet files that share the same schema into a single folder, which is essentially the path to a Delta table. That folder includes an automatically generated subfolder for the transaction log, named _delta_log, which records the ACID transactions happening on the table and enables neat features such as rolling back the data to a previous point in time.
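The folder layout described above can be sketched with the Python standard library alone. This is a minimal illustration, not Delta Lake's own detection logic: the helper names `is_delta_table` and `list_data_files` are hypothetical, the data file name is made up, and the sketch assumes an unpartitioned table whose Parquet files sit directly in the table folder.

```python
import tempfile
from pathlib import Path


def is_delta_table(table_path: Path) -> bool:
    """Return True if the folder contains a _delta_log subfolder."""
    return (table_path / "_delta_log").is_dir()


def list_data_files(table_path: Path) -> list[str]:
    """List Parquet data files stored directly in the table folder."""
    return sorted(p.name for p in table_path.glob("*.parquet"))


# Build a tiny stand-in for a table folder (file names are hypothetical).
with tempfile.TemporaryDirectory() as tmp:
    table = Path(tmp) / "tutorial_cities"
    (table / "_delta_log").mkdir(parents=True)
    (table / "_delta_log" / "00000000000000000000.json").touch()
    (table / "part-00000.snappy.parquet").touch()

    delta_detected = is_delta_table(table)
    data_files = list_data_files(table)

print(delta_detected, data_files)
```

A folder holding only Parquet files, without `_delta_log`, would make `is_delta_table` return `False`, which mirrors the distinction the paragraph draws between plain Parquet and a Delta table.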

# File Format Implementation

See Data Lake File Format.

# Understanding the Sync Process

Let’s look at which files Airbyte created behind the scenes on the S3 data storage (this walkthrough is originally from Load Data into Delta Lake on Databricks Lakehouse).

I used CyberDuck to connect to S3, but you can also connect via the command line or the web console. Below is an image that illustrates how that looks:

An illustration of a Databricks Lakehouse destination on AWS S3

Airbyte created several files. Most notable are the *.snappy.parquet data files and the _delta_log folder, which contains the Delta Lake transaction log mentioned above. The content of your source data resides in the Parquet files, consolidated into one Delta table per stream, each represented as a single folder: tutorial_cities and tutorial_users. Each table folder contains a _delta_log with detailed information about each transaction.

There are other internal folders that Databricks created, such as copy_into_log and SSTs, produced by the COPY INTO command.

At its core, the Airbyte Databricks Lakehouse connector performs the following two steps:

  1. CREATE OR REPLACE TABLE AS SELECT * FROM LIMIT 0
  2. COPY INTO 

These two transactions are also represented in the transaction log as 00000000000000000000.json/crc and 00000000000000000001.json/crc. If you open the JSON and CRC files, you should see a record of each transaction that happened, along with some metadata.
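The file names follow a visible pattern: the table version, zero-padded to 20 digits, plus an extension. A minimal sketch of that naming, with hypothetical helper names `commit_file_name` and `commit_version` that are not part of any Delta Lake API:

```python
import re


def commit_file_name(version: int, extension: str = "json") -> str:
    """Format a table version as a 20-digit, zero-padded log file name."""
    return f"{version:020d}.{extension}"


def commit_version(file_name: str) -> int:
    """Parse the table version back out of a .json or .crc log file name."""
    match = re.fullmatch(r"(\d{20})\.(json|crc)", file_name)
    if match is None:
        raise ValueError(f"not a transaction log file name: {file_name!r}")
    return int(match.group(1))


print(commit_file_name(0))                          # 00000000000000000000.json
print(commit_file_name(1, "crc"))                   # 00000000000000000001.crc
print(commit_version("00000000000000000001.json"))  # 1
```

Because the names are zero-padded, sorting them as plain strings also sorts them by version, which makes it easy to read the log in commit order.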

00000000000000000000.json:

```json
{
    "commitInfo": {
        "timestamp": 1654879959245,
        "operation": "CREATE TABLE AS SELECT",
        "operationParameters": {
            "isManaged": "false",
            "description": "Created from stream tutorial_cities",
            "partitionBy": "[]",
            "properties": "{\"delta.autoOptimize.autoCompact\":\"true\",\"delta.autoOptimize.optimizeWrite\":\"true\",\"airbyte.destinationSyncMode\":\"append\"}"
        },
        "isolationLevel": "WriteSerializable",
        "isBlindAppend": true,
        "operationMetrics": {
            "numFiles": "1",
            "numOutputBytes": "927",
            "numOutputRows": "0"
        }
    }
}
```

00000000000000000000.crc:

```json
{"tableSizeBytes":927,"numFiles":1,"numMetadata":1,"numProtocol":1,"numTransactions":0}
```

And the second transaction 00000000000000000001.json with COPY INTO:

```json
{
    "commitInfo": {
        "timestamp": 1654879970063,
        "operation": "COPY INTO",
        "operationParameters": {},
        "readVersion": 0,
        "isolationLevel": "WriteSerializable",
        "isBlindAppend": true,
        "operationMetrics": {
            "numFiles": "1",
            "numOutputBytes": "2085",
            "numOutputRows": "3"
        }
    }
}
```

00000000000000000001.crc:

```json
{"tableSizeBytes":3012,"numFiles":2,"numMetadata":1,"numProtocol":1,"numTransactions":1}
```
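The numbers in the two .crc files line up with the commit metrics: 927 bytes from the first commit plus 2085 bytes from the second gives the 3012 bytes reported after version 1. The sketch below replays that arithmetic using values copied from the log entries above. It assumes both commits only add files, which holds for this sync; the function name `replay_totals` is hypothetical, and a real reader would follow the add and remove actions in the log rather than `operationMetrics`.

```python
import json

# Metrics copied from the two commitInfo entries (the log stores them as strings).
commits = [
    {"operation": "CREATE TABLE AS SELECT",
     "operationMetrics": {"numFiles": "1", "numOutputBytes": "927"}},
    {"operation": "COPY INTO",
     "operationMetrics": {"numFiles": "1", "numOutputBytes": "2085"}},
]

# Content of 00000000000000000001.crc, as shown above.
crc_v1 = json.loads(
    '{"tableSizeBytes":3012,"numFiles":2,"numMetadata":1,'
    '"numProtocol":1,"numTransactions":1}'
)


def replay_totals(commit_infos):
    """Sum bytes and file counts over append-only commits."""
    total_bytes = 0
    total_files = 0
    for info in commit_infos:
        metrics = info["operationMetrics"]
        total_bytes += int(metrics["numOutputBytes"])
        total_files += int(metrics["numFiles"])
    return total_bytes, total_files


total_bytes, total_files = replay_totals(commits)
print(total_bytes, total_files)  # 3012 2
print(total_bytes == crc_v1["tableSizeBytes"], total_files == crc_v1["numFiles"])
```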

# Key Features

General features are covered in Data Lake Table Format.

# Schema Validation

Delta Lake automatically validates DataFrame schema compatibility with table schema during writes, following these rules:

  • All DataFrame columns must exist in the target table
  • DataFrame column data types must match target table types
  • Column names cannot differ only by case

Delta Lake on Databricks supports DDL to add new columns explicitly and can also update the schema automatically. When you write in append mode and specify partitionBy, Delta Lake checks that it matches the partitioning of the existing data and throws an error on any mismatch. If partitionBy is omitted, appends automatically follow the partitioning of the existing data.

Source: What is Delta Lake? | Databricks on AWS

# Schema Enforcement Implementation

Delta Lake employs schema validation on write, ensuring all new writes are schema-compatible before committing. Incompatible schemas trigger transaction cancellation and raise exceptions. The compatibility rules require that the DataFrame:

  • Cannot contain additional columns absent from the target table
  • Must have matching column data types
  • Cannot contain case-different column names

Source: Schema Evolution & Enforcement on Delta Lake - Databricks
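The three compatibility rules can be sketched as a plain Python check. This is a simplified model under stated assumptions, not Delta Lake's implementation: schemas are represented as dictionaries mapping column name to a type string, column names are compared exactly, and `SchemaMismatchError` and `validate_write_schema` are hypothetical names.

```python
class SchemaMismatchError(Exception):
    """Raised when a write is not schema-compatible with the target table."""


def validate_write_schema(table_schema: dict[str, str],
                          frame_schema: dict[str, str]) -> None:
    """Raise SchemaMismatchError if any of the three rules is violated."""
    # Rule: column names must not differ only by case.
    seen: dict[str, str] = {}
    for name in frame_schema:
        key = name.lower()
        if key in seen:
            raise SchemaMismatchError(
                f"columns {seen[key]!r} and {name!r} differ only by case")
        seen[key] = name

    for name, dtype in frame_schema.items():
        # Rule: no extra columns that the target table lacks.
        if name not in table_schema:
            raise SchemaMismatchError(f"column {name!r} is not in the target table")
        # Rule: data types must match.
        if table_schema[name] != dtype:
            raise SchemaMismatchError(
                f"column {name!r} is {dtype}, table expects {table_schema[name]}")


table = {"id": "bigint", "city": "string"}
validate_write_schema(table, {"id": "bigint", "city": "string"})  # accepted

try:
    validate_write_schema(table, {"id": "bigint", "population": "int"})
except SchemaMismatchError as err:
    rejected_reason = str(err)
    print("write rejected:", rejected_reason)
```

Raising before anything is written mirrors the behavior described above, where an incompatible write cancels the transaction instead of committing partial data.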

# Change Data Feed (CDF)

The Change Data Feed (CDF) feature allows Delta tables to track row-level changes between versions of a Delta table. When enabled on a Delta table, the runtime records “change events” for all the data written into the table. This includes the row data along with metadata indicating whether the specified row was inserted, deleted, or updated.

You can read the change events in batch queries using the DataFrame reader (that is, spark.read) and in streaming queries using the streaming reader (that is, spark.readStream).
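As a rough sketch, the two read paths look like this in PySpark. The wrapper functions `read_changes_batch` and `read_changes_stream` are hypothetical, `spark` is assumed to be an existing SparkSession with Delta Lake available, and the table is assumed to have the change data feed enabled (the `delta.enableChangeDataFeed` table property); the `readChangeFeed` and `startingVersion` options are the reader options documented for the feature.

```python
def read_changes_batch(spark, table_name: str, starting_version: int = 0):
    """Batch read of the change events recorded since starting_version."""
    return (
        spark.read.format("delta")
        .option("readChangeFeed", "true")
        .option("startingVersion", starting_version)
        .table(table_name)
    )


def read_changes_stream(spark, table_name: str, starting_version: int = 0):
    """Streaming read of change events, starting at starting_version."""
    return (
        spark.readStream.format("delta")
        .option("readChangeFeed", "true")
        .option("startingVersion", starting_version)
        .table(table_name)
    )
```

Both functions return a DataFrame in which each row carries the original columns plus change metadata such as `_change_type` and `_commit_version`.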

# Manifest Files

For integration with Trino and similar tools, Delta Lake typically works with a manifest file. See Presto, Trino, and Athena to Delta Lake integration using manifests.

# Change Data Feed Use Cases

Change Data Feed is not enabled by default. The following use cases should drive when you enable the change data feed.

  • Silver and Gold tables: Improve Delta performance by processing only row-level changes following initial MERGE, UPDATE, or DELETE operations to accelerate and simplify ETL and ELT operations.
  • Transmit changes: Send a change data feed to downstream systems such as Kafka or RDBMS that can use it to incrementally process in later stages of data pipelines.
  • Audit trail table: Capturing the change data feed as a Delta table provides perpetual storage and efficient query capability to see all changes over time, including when deletes occur and what updates were made.
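To illustrate the "process only row-level changes" idea behind these use cases, here is a small pure-Python sketch that applies change events to a downstream copy instead of reloading the whole table. The `_change_type` values (`insert`, `update_preimage`, `update_postimage`, `delete`) and the `_commit_version` column follow the change data feed schema; the function `apply_change_events`, the `id` key column, and the sample rows are assumptions made for illustration.

```python
def apply_change_events(target: dict, events: list[dict], key: str = "id") -> dict:
    """Apply change events to a keyed downstream copy, in commit order."""
    # sorted() is stable, so events within one commit keep their given order.
    for event in sorted(events, key=lambda e: e["_commit_version"]):
        change_type = event["_change_type"]
        # Strip the metadata columns, keeping only the row data.
        row = {k: v for k, v in event.items() if not k.startswith("_")}
        if change_type in ("insert", "update_postimage"):
            target[row[key]] = row
        elif change_type == "delete":
            target.pop(row[key], None)
        # update_preimage rows hold the old values and are skipped here.
    return target


events = [
    {"id": 1, "city": "Berlin", "_change_type": "insert", "_commit_version": 1},
    {"id": 2, "city": "Paris", "_change_type": "insert", "_commit_version": 1},
    {"id": 1, "city": "Berlin", "_change_type": "update_preimage", "_commit_version": 2},
    {"id": 1, "city": "Munich", "_change_type": "update_postimage", "_commit_version": 2},
    {"id": 2, "city": "Paris", "_change_type": "delete", "_commit_version": 3},
]

replica = apply_change_events({}, events)
print(replica)  # {1: {'id': 1, 'city': 'Munich'}}
```

For an audit trail, the same events would be appended to a history table as-is rather than collapsed into the latest state.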

# History

# Databricks Acquires Tabular

On 2024-06-04, Ali Ghodsi announced that Databricks would acquire Tabular, the company founded by the original creators of Apache Iceberg.

origin

# Version Updates

# FAQ

# Vendor Independence

Delta Lake is not tied to Databricks, as a commenter on r/dataengineering points out:

Delta is not only used by Databricks, and there is no vendor lock-in, as the Delta Lake project is managed by the Linux Foundation. Delta Lake is supported on AWS Lake Formation, Athena, Redshift (even long before support for Iceberg was added there), and EMR. Snowflake also has some support for Delta Lake, as does GCP BigQuery. Azure ADF and Synapse also support Delta Lake. Microsoft Fabric is built on top of Delta Lake. Many other tools and services support Delta. So I don’t think you need Databricks or are vendor-locked there, as you’re suggesting, if you go the Delta way.

Source: recap on Iceberg Summit 2024 conference: r/dataengineering


Origin: Data Lake / Lakehouse Guide: Powered by Data Lake Table Formats (Delta Lake, Iceberg, Hudi) | ssp.sh
References: Medallion Architecture
Created: 2019 (Spark & AI Summit Amsterdam 2019)