
Commit ea4ddc5

Adding ldf sinkParquet (#171)

1 parent 8ecf9bd

File tree: 5 files changed, +189 −13

- __tests__/lazyframe.test.ts
- polars/lazy/dataframe.ts
- polars/types.ts
- src/conversion.rs
- src/lazy/dataframe.rs

__tests__/lazyframe.test.ts (+30 −4)

@@ -1242,7 +1242,7 @@ describe("lazyframe", () => {
     ldf.sinkCSV("./test.csv");
     const newDF: pl.DataFrame = pl.readCSV("./test.csv");
     const actualDf: pl.DataFrame = await ldf.collect();
-    expect(newDF.sort("foo").toString()).toEqual(actualDf.toString());
+    expect(newDF.sort("foo")).toFrameEqual(actualDf);
     fs.rmSync("./test.csv");
   });
   test("sinkCSV:noHeader", async () => {
@@ -1255,7 +1255,7 @@ describe("lazyframe", () => {
     ldf.sinkCSV("./test.csv", { includeHeader: false });
     const newDF: pl.DataFrame = pl.readCSV("./test.csv", { hasHeader: false });
     const actualDf: pl.DataFrame = await ldf.collect();
-    expect(newDF.sort("column_1").toString()).toEqual(actualDf.toString());
+    expect(newDF.sort("column_1")).toFrameEqual(actualDf);
     fs.rmSync("./test.csv");
   });
   test("sinkCSV:separator", async () => {
@@ -1268,7 +1268,7 @@ describe("lazyframe", () => {
     ldf.sinkCSV("./test.csv", { separator: "|" });
     const newDF: pl.DataFrame = pl.readCSV("./test.csv", { sep: "|" });
     const actualDf: pl.DataFrame = await ldf.collect();
-    expect(newDF.sort("foo").toString()).toEqual(actualDf.toString());
+    expect(newDF.sort("foo")).toFrameEqual(actualDf);
     fs.rmSync("./test.csv");
   });
   test("sinkCSV:nullValue", async () => {
@@ -1283,7 +1283,33 @@ describe("lazyframe", () => {
     const actualDf: pl.DataFrame = await (await ldf.collect()).withColumn(
       pl.col("bar").fillNull("BOOM"),
     );
-    expect(newDF.sort("foo").toString()).toEqual(actualDf.toString());
+    expect(newDF.sort("foo")).toFrameEqual(actualDf);
     fs.rmSync("./test.csv");
   });
+  test("sinkParquet:path", async () => {
+    const ldf = pl
+      .DataFrame([
+        pl.Series("foo", [1, 2, 3], pl.Int64),
+        pl.Series("bar", ["a", "b", "c"]),
+      ])
+      .lazy();
+    ldf.sinkParquet("./test.parquet");
+    const newDF: pl.DataFrame = pl.readParquet("./test.parquet");
+    const actualDf: pl.DataFrame = await ldf.collect();
+    expect(newDF.sort("foo")).toFrameEqual(actualDf);
+    fs.rmSync("./test.parquet");
+  });
+  test("sinkParquet:compression:gzip", async () => {
+    const ldf = pl
+      .DataFrame([
+        pl.Series("foo", [1, 2, 3], pl.Int64),
+        pl.Series("bar", ["a", "b", "c"]),
+      ])
+      .lazy();
+    ldf.sinkParquet("./test.parquet", { compression: "gzip" });
+    const newDF: pl.DataFrame = pl.readParquet("./test.parquet");
+    const actualDf: pl.DataFrame = await ldf.collect();
+    expect(newDF.sort("foo")).toFrameEqual(actualDf);
+    fs.rmSync("./test.parquet");
+  });
 });
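The tests above verify the in-memory round trip; the streaming path the docstring below describes can also be sketched as a plain script. A minimal sketch, assuming a local ./data.csv with a numeric foo column (both the file and the column are hypothetical):

import pl from "nodejs-polars";

// Build a lazy query over a CSV scan; nothing is read yet.
const ldf = pl.scanCSV("./data.csv").filter(pl.col("foo").gt(1));

// Stream the result straight to disk instead of collecting it;
// this is what allows larger-than-RAM results to be written.
ldf.sinkParquet("./out.parquet"); // compression defaults to "zstd"

// Read back eagerly to inspect a few rows.
console.log(pl.readParquet("./out.parquet").head(5).toString());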

polars/lazy/dataframe.ts (+58 −8)

@@ -11,7 +11,12 @@ import {
 } from "../utils";
 import { _LazyGroupBy, LazyGroupBy } from "./groupby";
 import { Deserialize, GroupByOps, Serialize } from "../shared_traits";
-import { LazyOptions, LazyJoinOptions, SinkCsvOptions } from "../types";
+import {
+  LazyOptions,
+  LazyJoinOptions,
+  SinkCsvOptions,
+  SinkParquetOptions,
+} from "../types";
 import { Series } from "../series";

 const inspect = Symbol.for("nodejs.util.inspect.custom");
@@ -513,7 +518,52 @@ export interface LazyDataFrame extends Serialize, GroupByOps<LazyGroupBy> {
    >>> lf.sinkCsv("out.csv")
   */
-  sinkCSV(dest: string, options?: SinkCsvOptions): void;
+  sinkCSV(path: string, options?: SinkCsvOptions): void;
+
+  /**
+   * Evaluate the query in streaming mode and write to a Parquet file.
+   *
+   * .. warning::
+   *     Streaming mode is considered **unstable**. It may be changed
+   *     at any point without it being considered a breaking change.
+   *
+   * This allows streaming results that are larger than RAM to be written to disk.
+   *
+   * Parameters
+   * ----------
+   * @param path - File path to which the file should be written.
+   * @param compression : {'lz4', 'uncompressed', 'snappy', 'gzip', 'lzo', 'brotli', 'zstd'}
+   *     Choose "zstd" for good compression performance (default).
+   *     Choose "lz4" for fast compression/decompression.
+   *     Choose "snappy" for more backwards-compatibility guarantees
+   *     when you deal with older parquet readers.
+   * @param compressionLevel - The level of compression to use. Higher compression means smaller files on disk.
+   *     - "gzip" : min-level: 0, max-level: 10.
+   *     - "brotli" : min-level: 0, max-level: 11.
+   *     - "zstd" : min-level: 1, max-level: 22.
+   * @param statistics - Write statistics to the parquet headers. This requires extra compute. Default -> false
+   * @param rowGroupSize - Size of the row groups in number of rows.
+   *     If None (default), the chunks of the `DataFrame` are used.
+   *     Writing in smaller chunks may reduce memory pressure and improve writing speeds.
+   * @param dataPagesizeLimit - Size limit of individual data pages.
+   *     If not set, defaults to 1024 * 1024 bytes.
+   * @param maintainOrder - Maintain the order in which data is processed. Default -> true
+   *     Setting this to `false` will be slightly faster.
+   * @param typeCoercion - Do type coercion optimization. Default -> true
+   * @param predicatePushdown - Do predicate pushdown optimization. Default -> true
+   * @param projectionPushdown - Do projection pushdown optimization. Default -> true
+   * @param simplifyExpression - Run simplify expressions optimization. Default -> true
+   * @param slicePushdown - Slice pushdown optimization. Default -> true
+   * @param noOptimization - Turn off (certain) optimizations. Default -> false
+   *
+   * Examples
+   * --------
+   * >>> const lf = pl.scanCSV("/path/to/my_larger_than_ram_file.csv")
+   * >>> lf.sinkParquet("out.parquet")
+   */
+  sinkParquet(path: string, options?: SinkParquetOptions): void;
 }

 const prepareGroupbyInputs = (by) => {
@@ -956,13 +1006,13 @@ export const _LazyDataFrame = (_ldf: any): LazyDataFrame => {
     withRowCount(name = "row_nr") {
       return _LazyDataFrame(_ldf.withRowCount(name));
     },
-    sinkCSV(dest?, options = {}) {
+    sinkCSV(path, options: SinkCsvOptions = {}) {
       options.maintainOrder = options.maintainOrder ?? false;
-      if (typeof dest === "string") {
-        _ldf.sinkCsv(dest, options);
-      } else {
-        throw new TypeError("Expected a string destination");
-      }
+      _ldf.sinkCsv(path, options);
+    },
+    sinkParquet(path: string, options: SinkParquetOptions = {}) {
+      options.compression = options.compression ?? "zstd";
+      _ldf.sinkParquet(path, options);
     },
   };
 };
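A sketch of calling the new method with writer options; the frame mirrors the test fixtures above, and every option name comes from the SinkParquetOptions interface, but the paths and values are illustrative only:

import pl from "nodejs-polars";

const ldf = pl
  .DataFrame([
    pl.Series("foo", [1, 2, 3], pl.Int64),
    pl.Series("bar", ["a", "b", "c"]),
  ])
  .lazy();

// Tune the writer: brotli at level 5 (valid range 0-11), column
// statistics enabled, and explicit 1000-row row groups.
ldf.sinkParquet("./out.parquet", {
  compression: "brotli",
  compressionLevel: 5,
  statistics: true,
  rowGroupSize: 1000,
});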

polars/types.ts (+19 −1)

@@ -47,7 +47,7 @@ export interface WriteCsvOptions {
   sep?: string;
 }
 /**
- * Options for {@link LazyDataFrame.sinkCSV}
+ * Options for @see {@link LazyDataFrame.sinkCSV}
  * @category Options
  */
 export interface SinkCsvOptions {
@@ -65,6 +65,24 @@ export interface SinkCsvOptions {
   nullValue?: string;
   maintainOrder?: boolean;
 }
+/**
+ * Options for @see {@link LazyDataFrame.sinkParquet}
+ * @category Options
+ */
+export interface SinkParquetOptions {
+  compression?: string;
+  compressionLevel?: number;
+  statistics?: boolean;
+  rowGroupSize?: number;
+  dataPagesizeLimit?: number;
+  maintainOrder?: boolean;
+  typeCoercion?: boolean;
+  predicatePushdown?: boolean;
+  projectionPushdown?: boolean;
+  simplifyExpression?: boolean;
+  slicePushdown?: boolean;
+  noOptimization?: boolean;
+}
 /**
  * Options for {@link DataFrame.writeJSON}
  * @category Options
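Since every field is optional, a typed options object can be assembled once and reused. A sketch; whether SinkParquetOptions is re-exported from the package root is an assumption here (the interface itself lives in polars/types.ts):

import pl from "nodejs-polars";
// Assumption: the interface is reachable from the package root,
// like the other option interfaces in polars/types.ts.
import type { SinkParquetOptions } from "nodejs-polars";

// Unset fields fall back to the defaults resolved on the Rust side
// (see src/lazy/dataframe.rs below).
const opts: SinkParquetOptions = {
  compression: "zstd",
  compressionLevel: 10, // zstd accepts levels 1-22
  maintainOrder: true,
};

pl.scanCSV("./data.csv").sinkParquet("./out.parquet", opts);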

src/conversion.rs (+59)

@@ -586,6 +586,22 @@ pub struct SinkCsvOptions {
     pub maintain_order: bool,
 }

+#[napi(object)]
+pub struct SinkParquetOptions {
+    pub compression: Option<String>,
+    pub compression_level: Option<i32>,
+    pub statistics: Option<bool>,
+    pub row_group_size: Option<i16>,
+    pub data_pagesize_limit: Option<i64>,
+    pub maintain_order: Option<bool>,
+    pub type_coercion: Option<bool>,
+    pub predicate_pushdown: Option<bool>,
+    pub projection_pushdown: Option<bool>,
+    pub simplify_expression: Option<bool>,
+    pub slice_pushdown: Option<bool>,
+    pub no_optimization: Option<bool>,
+}
+
 #[napi(object)]
 pub struct Shape {
     pub height: i64,
@@ -1197,3 +1213,46 @@ where
 {
     container.into_iter().map(|s| s.as_ref().into()).collect()
 }
+
+pub(crate) fn parse_parquet_compression(
+    compression: String,
+    compression_level: Option<i32>,
+) -> JsResult<ParquetCompression> {
+    let parsed = match compression.as_ref() {
+        "uncompressed" => ParquetCompression::Uncompressed,
+        "snappy" => ParquetCompression::Snappy,
+        "gzip" => ParquetCompression::Gzip(
+            compression_level
+                .map(|lvl| {
+                    GzipLevel::try_new(lvl as u8)
+                        .map_err(|e| napi::Error::from_reason(format!("{e:?}")))
+                })
+                .transpose()?,
+        ),
+        "lzo" => ParquetCompression::Lzo,
+        "brotli" => ParquetCompression::Brotli(
+            compression_level
+                .map(|lvl| {
+                    BrotliLevel::try_new(lvl as u32)
+                        .map_err(|e| napi::Error::from_reason(format!("{e:?}")))
+                })
+                .transpose()?,
+        ),
+        "lz4" => ParquetCompression::Lz4Raw,
+        "zstd" => ParquetCompression::Zstd(
+            compression_level
+                .map(|lvl| {
+                    ZstdLevel::try_new(lvl)
+                        .map_err(|e| napi::Error::from_reason(format!("{e:?}")))
+                })
+                .transpose()?,
+        ),
+        e => {
+            return Err(napi::Error::from_reason(format!(
+                "parquet `compression` must be one of {{'uncompressed', 'snappy', 'gzip', 'lzo', 'brotli', 'lz4', 'zstd'}}, got {e}",
+            )))
+        }
+    };
+    Ok(parsed)
+}
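From the JavaScript side, parse_parquet_compression is what ultimately validates the compression string and level at sink time. A sketch of what it accepts and rejects (paths hypothetical; level ranges per the docstring in polars/lazy/dataframe.ts above):

import pl from "nodejs-polars";

const ldf = pl.scanCSV("./data.csv");

ldf.sinkParquet("./a.parquet", { compression: "gzip", compressionLevel: 9 }); // gzip: 0-10
ldf.sinkParquet("./b.parquet", { compression: "zstd", compressionLevel: 22 }); // zstd: 1-22
ldf.sinkParquet("./c.parquet", { compression: "lz4" }); // maps to Lz4Raw; no level needed

// An unknown codec surfaces the Rust error:
// "parquet `compression` must be one of {'uncompressed', 'snappy', ...}, got bzip2"
// ldf.sinkParquet("./d.parquet", { compression: "bzip2" });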

src/lazy/dataframe.rs (+23)

@@ -590,6 +590,29 @@ impl JsLazyFrame {
         let _ = ldf.sink_csv(path_buf, options).map_err(JsPolarsErr::from);
         Ok(())
     }
+
+    #[napi(catch_unwind)]
+    pub fn sink_parquet(&self, path: String, options: SinkParquetOptions) -> napi::Result<()> {
+        let compression_str = options.compression.unwrap_or("zstd".to_string());
+        let compression = parse_parquet_compression(compression_str, options.compression_level)?;
+        let statistics = options.statistics.unwrap_or(false);
+        let row_group_size = options.row_group_size.map(|i| i as usize);
+        let data_pagesize_limit = options.data_pagesize_limit.map(|i| i as usize);
+        let maintain_order = options.maintain_order.unwrap_or(true);
+
+        let options = ParquetWriteOptions {
+            compression,
+            statistics,
+            row_group_size,
+            data_pagesize_limit,
+            maintain_order,
+        };
+
+        let path_buf: PathBuf = PathBuf::from(path);
+        let ldf = self.ldf.clone().with_comm_subplan_elim(false);
+        let _ = ldf.sink_parquet(path_buf, options).map_err(JsPolarsErr::from);
+        Ok(())
+    }
 }

 #[napi(object)]
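Given the defaults resolved above (compression "zstd", statistics false, maintain_order true), the two calls in this sketch should produce equivalent output; the input file is hypothetical:

import pl from "nodejs-polars";

const ldf = pl.scanCSV("./data.csv");

// Implicit defaults...
ldf.sinkParquet("./out.parquet");

// ...and the same defaults spelled out explicitly.
ldf.sinkParquet("./out.parquet", {
  compression: "zstd",
  statistics: false,
  maintainOrder: true,
});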
