diff --git a/src/blog/delta-lake-sql/index.mdx b/src/blog/delta-lake-sql/index.mdx
new file mode 100644
index 00000000..bb9c01a2
--- /dev/null
+++ b/src/blog/delta-lake-sql/index.mdx
@@ -0,0 +1,323 @@
---
title: Delta Lake SQL
description: Learn how to use Delta Lake with SQL to query and transform your data.
thumbnail: ./thumbnail.png
author: Avril Aysha
date: 2025-05-29
---

Many data analysts and engineers use SQL as their primary querying language. It has been around for decades, and it works with everything from spreadsheets to big data platforms. SQL is easy to learn, yet powerful enough to join, filter, and transform large datasets. It's also readable, making your queries easy to understand for others who need to read your code.

Delta Lake is a storage format that gives your data lake ACID transactions, advanced data skipping, version control, and schema enforcement. You get the flexibility of a lake with the safety of a warehouse. Delta Lake offers full SQL support, which means you can read, write, update, and even time travel without learning a new tool.

This article will show you how to use Delta Lake and SQL together. We will start with examples in PySpark. After that, we will show how you can use Delta Lake with SQL from other engines like Apache Flink, Apache Hive, Presto, Trino and DuckDB.

Let's jump in! 🤓

## Delta Lake SQL with Spark

Spark is one of the best engines to use when you want to query and manipulate your Delta Lake data with SQL. If this is your first time using Delta Lake with PySpark, follow the instructions in the [Delta Lake with PySpark blog](#link-to-pyspark-blog) to get started.

Let's take a look at how we can use Delta Lake with SQL in PySpark to create new tables, query them, and perform common data manipulation operations like inserts, updates, deletes, and merges. We will also explain how to use Delta Lake's powerful features like time travel and schema evolution using SQL.

## Delta Lake SQL: Create table from existing data

You can create SQL tables from existing Delta Lake tables. This is helpful when you receive data from another team or client.

If you want to use SQL to query an existing Delta Lake table, you first need to register your Delta table as a Spark SQL table using a `CREATE TABLE IF NOT EXISTS` statement and the path to the Delta table:

```python
delta_path = "/tmp/delta_table"  # replace with the path to your existing Delta table
spark.sql(f"CREATE TABLE IF NOT EXISTS data USING DELTA LOCATION '{delta_path}'")
```

Once you've registered your table, you can run regular SQL queries on your data:

```python
result = spark.sql("SELECT * FROM data")
```

Delta Lake makes it easy for teams to work together even if they don't use the same programming language. A Delta Lake table can be created using Java, Python or any of the other supported languages, and can then be read and further manipulated downstream using SQL.

## Delta Lake SQL: Create new SQL table

Alternatively, you can create a new SQL table from scratch using a `CREATE TABLE` statement.

For example, let's create an empty table `people` with two columns, `name` and `age`:

```python
spark.sql("""
    CREATE TABLE people (
        name STRING,
        age INT
    )
    USING DELTA
    LOCATION '/tmp/people_table'
""")
```
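Before inserting any data, you can sanity-check the table you just created. The snippet below is a minimal sketch (the exact output columns vary by Spark and Delta Lake version): `DESCRIBE TABLE` is standard Spark SQL, and Delta Lake additionally supports `DESCRIBE DETAIL`, which reports the table's format, location, and file count.

```python
# Check the schema of the new table (standard Spark SQL).
spark.sql("DESCRIBE TABLE people").show()

# Delta-specific metadata: format, storage location, number of files, size, etc.
spark.sql("DESCRIBE DETAIL people").show(truncate=False)
```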
## Delta Lake SQL: Insert, Update and Delete

You can use SQL to perform standard DML (Data Manipulation Language) operations on your data, such as inserts, updates and deletes.

Let's insert 3 new rows of data into our empty `people` table using the `INSERT INTO` statement:

```python
spark.sql("INSERT INTO people VALUES ('yasmin', 33), ('janco', 37), ('paulo', 22)")
```

You can now query this table to confirm the rows have been added. For example, let's get all names with ages above 30:

```python
> result = spark.sql("SELECT * FROM people WHERE age > 30")
> result.show()

+------+---+
|  name|age|
+------+---+
|yasmin| 33|
| janco| 37|
+------+---+
```

You can use the `UPDATE` statement to update an existing row:

```python
spark.sql("UPDATE people SET age = 29 WHERE name = 'yasmin'")
```

The same filter on `age > 30` will now return:

```python
> result = spark.sql("SELECT * FROM people WHERE age > 30")
> result.show()

+-----+---+
| name|age|
+-----+---+
|janco| 37|
+-----+---+
```

Use the `DELETE FROM` statement to delete an existing row:

```python
spark.sql("DELETE FROM people WHERE age = 37")
```

Confirm by querying all rows in your table:

```python
> result = spark.sql("SELECT * FROM people")
> result.show()

+------+---+
|  name|age|
+------+---+
|yasmin| 29|
| paulo| 22|
+------+---+
```

## Delta Lake SQL: Merge

Delta Lake has great support for complex merge operations, such as upserts. A merge lets you insert, update, or delete records in a single operation, without having to rewrite the entire dataset.

Let's say we have a larger table that contains our primary database of names and ages. This table is stored at `main_delta_path` and has been registered as a SQL table with the name `data`:

```python
data = [("alice", 30), ("bob", 42), ("claire", 25), ("janco", 35), ("paulo", 54), ("sylvia", 21)]
df = spark.createDataFrame(data, ["name", "age"])
main_delta_path = "/tmp/main_delta_table"
df.write.format("delta").save(main_delta_path)
spark.sql(f"CREATE TABLE IF NOT EXISTS data USING DELTA LOCATION '{main_delta_path}'")
```

Here are the contents of the main table:

```python
> result = spark.sql("SELECT * FROM data")
> result.show()

+------+---+
|  name|age|
+------+---+
|claire| 25|
|sylvia| 21|
| paulo| 54|
| janco| 35|
| alice| 30|
|   bob| 42|
+------+---+
```

Now let's say that our `people` table contains rows that we want to merge into our main database.

```python
> result = spark.sql("SELECT * FROM people")
> result.show()

+------+---+
|  name|age|
+------+---+
|yasmin| 29|
| paulo| 22|
+------+---+
```

We want to merge these rows according to two conditions:

1. If the `name` field matches, we want to update the `age` field to the value in the `people` table.
2. If there is no match for the `name` field, we want to insert the row from the `people` table as a new row in the `data` table.

You can perform this kind of upsert operation with Delta Lake using SQL:

```python
spark.sql("""
    MERGE INTO data AS target
    USING people AS source
    ON target.name = source.name
    WHEN MATCHED THEN UPDATE SET age = source.age
    WHEN NOT MATCHED THEN INSERT *
""")
```

Your main database table `data` now contains:

```python
> result = spark.sql("SELECT * FROM data")
> result.show()

+------+---+
|  name|age|
+------+---+
|claire| 25|
|sylvia| 21|
| paulo| 22|
|yasmin| 29|
| janco| 35|
| alice| 30|
|   bob| 42|
+------+---+
```

Paulo's age has been correctly updated, and the row for Yasmin has been added.
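Every operation above is recorded as a separate commit in the Delta Lake transaction log. As a quick, hedged sketch, you can list those commits with Delta's `DESCRIBE HISTORY` command and note the version numbers, which come in handy for the time travel queries below:

```python
# One row per commit: the version number, its timestamp,
# and the operation that produced it (CREATE TABLE, MERGE, UPDATE, ...).
spark.sql("DESCRIBE HISTORY data").select("version", "timestamp", "operation").show(truncate=False)
```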
Read more about performing complex upsert operations in the [Delta Lake upsert](https://delta.io/blog/delta-lake-upsert/) article.

## Delta Lake SQL: Time Travel

Time travel is one of Delta Lake's most powerful features. It lets you travel back to earlier versions of your data to correct mistakes, recover lost data, or audit changes. Time travel is possible because Delta Lake records all changes to your data in [the transaction log](#link-to-architecture-post).

Let's say the upsert operation we just performed was actually a mistake. You can travel back to the version of your table before the upsert using the `VERSION AS OF` clause:

```python
> spark.sql("SELECT * FROM data VERSION AS OF 0").show()

+------+---+
|  name|age|
+------+---+
|claire| 25|
|sylvia| 21|
| janco| 35|
| paulo| 54|
| alice| 30|
|   bob| 42|
+------+---+
```

You can also choose a version of your data based on the commit timestamp using the `TIMESTAMP AS OF` clause:

```python
spark.sql("SELECT * FROM data TIMESTAMP AS OF ''").show()
```

Read more about version control and time travel in the [Delta Lake time travel](https://delta.io/blog/2023-02-01-delta-lake-time-travel/) article.

## Delta Lake SQL: Schema Evolution

Delta Lake has [schema enforcement](https://delta.io/blog/2022-11-16-delta-lake-schema-enforcement/) guarantees in place to prevent accidental data corruption. You cannot simply `INSERT` a new row that does not follow the existing schema of your Delta table:

```python
spark.sql("INSERT INTO people VALUES ('janco', 37, 'student')")
```

This will throw an error:

```
[UNRESOLVED_COLUMN.WITHOUT_SUGGESTION] A column or function parameter with name `student` cannot be resolved.
```

But sometimes the schema of your table evolves over time, and you need some flexibility to add, remove or rename columns.

In these situations, use the `ALTER TABLE` statement to update your schema:

```python
spark.sql("ALTER TABLE people ADD COLUMNS (occupation STRING)")
```

Now you can add the new row with the new `occupation` column:

```python
spark.sql("INSERT INTO people VALUES ('janco', 37, 'student')")
```

Your `people` table now has 3 rows and 3 columns. The new `occupation` column is filled with `NULL` values for the existing rows:

```python
> spark.sql("SELECT * FROM people").show()

+------+---+----------+
|  name|age|occupation|
+------+---+----------+
| janco| 37|   student|
|yasmin| 29|      NULL|
| paulo| 22|      NULL|
+------+---+----------+
```

Read more in the [Delta Lake schema evolution](https://delta.io/blog/2023-02-08-delta-lake-schema-evolution/) article.

## Delta Lake SQL with other engines

You can also use Delta Lake and SQL with many other query engines.
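For instance, Trino ships a Delta Lake connector that can attach to tables written by Spark. The snippet below is a minimal sketch, not a full setup guide: the `delta` catalog name, schema, and S3 path are illustrative, and depending on your Trino version the `register_table` procedure may need to be enabled in the catalog configuration.

```SQL
-- Illustrative names and paths: a `delta` catalog, `default` schema,
-- and an S3 location holding an existing Delta table.
CALL delta.system.register_table(
    schema_name => 'default',
    table_name => 'people',
    table_location => 's3://my-bucket/tmp/people_table'
);

SELECT name, age FROM delta.default.people WHERE age > 30;
```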
Here's how to register and query an existing Delta Lake table on S3 with Presto:

```SQL
CREATE TABLE sales.apac.sales_data_new (sampleColumn INT)
WITH (external_location = 's3://db-sa-datasets/presto/sales_data_new');

SELECT * FROM sales.apac.sales_data_new LIMIT 200;
```

And here's how to query an existing Delta Lake table with DuckDB:

```python
import duckdb
from deltalake import DeltaTable

# Load an existing Delta table and expose it to DuckDB as an Arrow dataset.
dt = DeltaTable("tmp/pandas-table/")
arrow_data = dt.to_pyarrow_dataset()
duck_data = duckdb.arrow(arrow_data)

query = """
    select *
    from duck_data
"""

duckdb.query(query)
```

SQL engines with Delta Lake support include:

- Apache Flink
- Apache Hive
- Amazon Athena
- PrestoDB
- Trino
- Snowflake
- Databricks
- Microsoft Fabric

Read the [Delta Lake without Spark article](https://delta.io/blog/delta-lake-without-spark/) for detailed code examples.

## Delta Lake and SQL

Delta Lake and SQL are a powerful match, especially for data engineers and analysts who are already comfortable in SQL. You get the safety of ACID transactions and the speed of advanced data skipping, together with the simplicity of a query language you already know. Whether you're inserting rows, fixing mistakes, or auditing history, using SQL with Delta Lake gives you full control over your data.

If you want to learn more about Delta Lake's powerful features, check out the [Delta Lake vs Data Lake](https://delta.io/blog/delta-lake-vs-data-lake/) post.

diff --git a/src/blog/delta-lake-sql/thumbnail.png b/src/blog/delta-lake-sql/thumbnail.png
new file mode 100644
index 00000000..7a66c4fb
Binary files /dev/null and b/src/blog/delta-lake-sql/thumbnail.png differ