Skip to content

Commit f21b6f4

Browse files
committed
Initial version of read_delta
1 parent e1b78c3 commit f21b6f4

12 files changed

+127
-5
lines changed

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ thiserror = "1"
2727
smartstring = { version = "1" }
2828
serde_json = { version = "1" }
2929
either = "1.9"
30+
deltalake = "0.17.1"
3031

3132
[dependencies.polars]
3233
features = [
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
2+
{"metaData":{"id":"b27252b0-26cc-49c5-b79d-12dd6647ffba","name":null,"description":null,"format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"foo\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"fruits\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"B\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"cars\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"createdTime":1711300448856,"configuration":{}}}
3+
{"add":{"path":"0-07808ab0-6b13-421c-bb67-ca09945eb281-0.parquet","partitionValues":{},"size":1550,"modificationTime":1711300448855,"dataChange":true,"stats":"{\"numRecords\": 5, \"minValues\": {\"foo\": \"5\", \"fruits\": \"apple\", \"B\": 1, \"cars\": \"audi\"}, \"maxValues\": {\"foo\": \"you have a,b,c\", \"fruits\": \"banana\", \"B\": 5, \"cars\": \"beetle\"}, \"nullCount\": {\"foo\": 0, \"fruits\": 0, \"B\": 0, \"cars\": 0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}}
4+
{"commitInfo":{"timestamp":1711300448856,"operation":"CREATE TABLE","operationParameters":{"location":"file:///examples/delta/sample.table","protocol":"{\"minReaderVersion\":1,\"minWriterVersion\":2}","metadata":"{\"configuration\":{},\"createdTime\":1711300448856,\"description\":null,\"format\":{\"options\":{},\"provider\":\"parquet\"},\"id\":\"b27252b0-26cc-49c5-b79d-12dd6647ffba\",\"name\":null,\"partitionColumns\":[],\"schemaString\":\"{\\\"type\\\":\\\"struct\\\",\\\"fields\\\":[{\\\"name\\\":\\\"foo\\\",\\\"type\\\":\\\"string\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}},{\\\"name\\\":\\\"fruits\\\",\\\"type\\\":\\\"string\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}},{\\\"name\\\":\\\"B\\\",\\\"type\\\":\\\"long\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}},{\\\"name\\\":\\\"cars\\\",\\\"type\\\":\\\"string\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}}]}\"}","mode":"ErrorIfExists"},"clientVersion":"delta-rs.0.17.1"}}

__tests__/io.test.ts

+9
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ const ipcpath = path.resolve(__dirname, "./examples/foods.ipc");
1818
const jsonpath = path.resolve(__dirname, "./examples/foods.json");
1919
// eslint-disable-next-line no-undef
2020
const singlejsonpath = path.resolve(__dirname, "./examples/single_foods.json");
21+
// eslint-disable-next-line no-undef
22+
const deltapath = path.resolve(__dirname, "./examples/delta/sample.table");
2123
describe("read:csv", () => {
2224
it("can read from a csv file", () => {
2325
const df = pl.readCSV(csvpath);
@@ -235,6 +237,13 @@ describe("scan", () => {
235237
});
236238
});
237239

240+
describe("delta", () => {
241+
test("delta:scan", async () => {
242+
const df = await pl.readDelta(deltapath).collect();
243+
expect(df.shape).toEqual({ height: 5, width: 4 });
244+
});
245+
});
246+
238247
describe("parquet", () => {
239248
beforeEach(() => {
240249
pl.readCSV(csvpath).writeParquet(parquetpath);

biome.json

+5-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
2-
"$schema": "https://biomejs.dev/schemas/1.0.0/schema.json",
2+
"$schema": "https://biomejs.dev/schemas/1.6.2/schema.json",
33
"organizeImports": {
44
"enabled": false
55
},
@@ -24,7 +24,8 @@
2424
"ignore": [
2525
"polars/native-polars.js",
2626
"./docs/*",
27-
"./bin/*"
27+
"./bin/*",
28+
"**/delta/sample.table/*"
2829
]
2930
},
3031
"formatter": {
@@ -33,7 +34,8 @@
3334
"ignore": [
3435
"polars/native-polars.js",
3536
"./docs/*",
36-
"./bin/*"
37+
"./bin/*",
38+
"**/delta/sample.table/*"
3739
]
3840
},
3941
"javascript": {

bun.lockb

396 Bytes
Binary file not shown.

package.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
"precommit": "yarn lint && yarn test"
5555
},
5656
"devDependencies": {
57-
"@biomejs/biome": "^1.6.1",
57+
"@biomejs/biome": "^1.6.2",
5858
"@napi-rs/cli": "^2.18.0",
5959
"@types/chance": "^1.1.6",
6060
"@types/jest": "^29.5.12",
@@ -65,7 +65,7 @@
6565
"ts-jest": "^29.1.2",
6666
"ts-node": "^10.9.2",
6767
"typedoc": "^0.25.12",
68-
"typescript": "5.4.2"
68+
"typescript": "5.4.3"
6969
},
7070
"packageManager": "[email protected]",
7171
"workspaces": [

polars/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ export namespace pl {
4747
export import readJSON = io.readJSON;
4848
export import readParquet = io.readParquet;
4949
export import readAvro = io.readAvro;
50+
export import readDelta = io.readDelta;
5051

5152
export import readCSVStream = io.readCSVStream;
5253
export import readJSONStream = io.readJSONStream;

polars/io.ts

+26
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,32 @@ export function readParquet(
436436
throw new Error("must supply either a path or body");
437437
}
438438

439+
interface ReadDeltaOptions {
440+
version?: number;
441+
columns?: string[];
442+
parallel?: "auto" | "columns" | "row_groups" | "none";
443+
}
444+
445+
/**
446+
* Reads into a DataFrame from a Delta lake table.
447+
* @param path - Path or URI to the root of the Delta lake table.
448+
* @param option.version - Version of the Delta lake table. Note: If `version` is not provided, the latest version of delta lake
449+
* table is read.
450+
* @param option.columns - Columns to select. Accepts a list of column names.
451+
* @param options.parallel - Any of 'auto' | 'columns' | 'row_groups' | 'none'
452+
* This determines the direction of parallelism. 'auto' will try to determine the optimal direction.
453+
* Defaults to 'auto'
454+
*/
455+
456+
export function readDelta(
457+
path: string,
458+
options: ReadDeltaOptions = {},
459+
): LazyDataFrame {
460+
const parallel = options?.parallel ?? "auto";
461+
options = { parallel, ...options };
462+
return _LazyDataFrame(pli.readDelta(path, options));
463+
}
464+
439465
export interface ReadAvroOptions {
440466
columns: string[] | Array<string> | number[];
441467
projection: number;

src/dataframe.rs

+26
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1+
use crate::delta::read_delta_table;
2+
use crate::export::JsLazyFrame;
13
use crate::file::*;
24
use crate::prelude::*;
35
use crate::series::JsSeries;
46
use napi::JsUnknown;
57
use polars::frame::row::{infer_schema, Row};
68
use polars::frame::NullStrategy;
9+
use polars_io::pl_async::get_runtime;
710
use polars_io::RowIndex;
811

912
use std::borrow::Borrow;
@@ -271,6 +274,29 @@ pub fn read_json(
271274
Ok(df.into())
272275
}
273276

277+
#[napi(object)]
278+
pub struct ReadDeltaOptions {
279+
pub version: Option<i64>,
280+
pub columns: Option<Vec<String>>,
281+
pub parallel: Wrap<ParallelStrategy>,
282+
}
283+
284+
#[napi(catch_unwind)]
285+
pub fn read_delta(path: String,
286+
options: ReadDeltaOptions
287+
) -> napi::Result<JsLazyFrame> {
288+
let table: std::prelude::v1::Result<LazyFrame, deltalake::DeltaTableError> = get_runtime().block_on(async {
289+
read_delta_table(&path, options).await
290+
});
291+
292+
let ldf:LazyFrame = match table {
293+
Ok(table) => table,
294+
Err(err) => return Err(napi::Error::from_reason(err.to_string()))
295+
};
296+
297+
Ok(LazyFrame::from(ldf).into())
298+
}
299+
274300
#[napi(object)]
275301
pub struct ReadParquetOptions {
276302
pub columns: Option<Vec<String>>,

src/delta.rs

+52
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
use polars_core::prelude::DataFrame;
2+
use polars_io::{parquet::ParquetReader, SerReader};
3+
use polars_lazy::{dsl::UnionArgs, frame::{IntoLazy, LazyFrame}};
4+
use polars::prelude::concat;
5+
use deltalake::{DeltaTableBuilder, DeltaTableError};
6+
use crate::dataframe::ReadDeltaOptions;
7+
8+
pub async fn read_delta_table(path: &str,
9+
options: ReadDeltaOptions,
10+
) -> Result<LazyFrame, DeltaTableError> {
11+
let mut db = DeltaTableBuilder::from_uri(path)
12+
.with_allow_http(false);
13+
14+
// if version specified, add it
15+
if options.version.is_some() {
16+
db = db.with_version(options.version.unwrap());
17+
}
18+
19+
let dt = db.load().await?;
20+
21+
// show all active files in the table
22+
let files: Vec<_> = dt.get_file_uris()?.collect();
23+
24+
let mut df_collection: Vec<DataFrame> = vec![];
25+
26+
for file in files.into_iter() {
27+
let base = std::path::Path::new(path);
28+
let file_path = std::path::Path::new(&file);
29+
let full_path = base.join(file_path);
30+
let mut file = std::fs::File::open(full_path).unwrap();
31+
32+
let columns = options.columns.clone();
33+
let parallel = options.parallel.0;
34+
35+
let df = ParquetReader::new(&mut file)
36+
.with_columns(columns)
37+
.read_parallel(parallel)
38+
.finish().unwrap();
39+
40+
df_collection.push(df);
41+
}
42+
43+
let empty_head = df_collection[0].clone().lazy().limit(0);
44+
45+
Ok(df_collection.into_iter().fold(empty_head, |acc, df| concat([acc, df.lazy()],
46+
UnionArgs {
47+
rechunk: false,
48+
parallel: false,
49+
..Default::default()
50+
}).unwrap()))
51+
52+
}

src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ pub mod series;
2828
pub mod set;
2929
pub mod utils;
3030
pub mod sql;
31+
pub mod delta;
3132

3233
pub use polars_core;
3334

0 commit comments

Comments
 (0)