
Commit 83e5414

Adding sinkCSV to ldf (#168)
1 parent 98b5987 commit 83e5414

5 files changed: +203 -2 lines changed

__tests__/lazyframe.test.ts

+55
@@ -1,4 +1,5 @@
 import pl from "@polars";
+import fs from "fs";

 describe("lazyframe", () => {
   test("columns", () => {
@@ -1231,4 +1232,58 @@ describe("lazyframe", () => {
       .collectSync();
     expect(actual).toFrameEqual(expected);
   });
+  test("sinkCSV:path", async () => {
+    const ldf = pl
+      .DataFrame([
+        pl.Series("foo", [1, 2, 3], pl.Int64),
+        pl.Series("bar", ["a", "b", "c"]),
+      ])
+      .lazy();
+    ldf.sinkCSV("./test.csv");
+    const newDF: pl.DataFrame = pl.readCSV("./test.csv");
+    const actualDf: pl.DataFrame = await ldf.collect();
+    expect(newDF.sort("foo").toString()).toEqual(actualDf.toString());
+    fs.rmSync("./test.csv");
+  });
+  test("sinkCSV:noHeader", async () => {
+    const ldf = pl
+      .DataFrame([
+        pl.Series("column_1", [1, 2, 3], pl.Int64),
+        pl.Series("column_2", ["a", "b", "c"]),
+      ])
+      .lazy();
+    ldf.sinkCSV("./test.csv", { includeHeader: false });
+    const newDF: pl.DataFrame = pl.readCSV("./test.csv", { hasHeader: false });
+    const actualDf: pl.DataFrame = await ldf.collect();
+    expect(newDF.sort("column_1").toString()).toEqual(actualDf.toString());
+    fs.rmSync("./test.csv");
+  });
+  test("sinkCSV:separator", async () => {
+    const ldf = pl
+      .DataFrame([
+        pl.Series("foo", [1, 2, 3], pl.Int64),
+        pl.Series("bar", ["a", "b", "c"]),
+      ])
+      .lazy();
+    ldf.sinkCSV("./test.csv", { separator: "|" });
+    const newDF: pl.DataFrame = pl.readCSV("./test.csv", { sep: "|" });
+    const actualDf: pl.DataFrame = await ldf.collect();
+    expect(newDF.sort("foo").toString()).toEqual(actualDf.toString());
+    fs.rmSync("./test.csv");
+  });
+  test("sinkCSV:nullValue", async () => {
+    const ldf = pl
+      .DataFrame([
+        pl.Series("foo", [1, 2, 3], pl.Int64),
+        pl.Series("bar", ["a", "b", null]),
+      ])
+      .lazy();
+    ldf.sinkCSV("./test.csv", { nullValue: "BOOM" });
+    const newDF: pl.DataFrame = pl.readCSV("./test.csv", { sep: "," });
+    const actualDf: pl.DataFrame = await (await ldf.collect()).withColumn(
+      pl.col("bar").fillNull("BOOM"),
+    );
+    expect(newDF.sort("foo").toString()).toEqual(actualDf.toString());
+    fs.rmSync("./test.csv");
+  });
 });
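The tests above round-trip small in-memory frames through sinkCSV and readCSV. A minimal sketch of the streaming use case the feature targets might look like the following; the input path, column name, and predicate are placeholders, and it assumes the user-facing "nodejs-polars" package import rather than the "@polars" test alias:

    import pl from "nodejs-polars";

    // Lazily scan a CSV that may not fit in memory; nothing is read yet.
    const lf = pl
      .scanCSV("./my_larger_than_ram_file.csv")
      .filter(pl.col("foo").gt(1)); // hypothetical predicate, evaluated while streaming

    // Evaluate in streaming mode and write the result straight to disk
    // instead of collecting it into a DataFrame first.
    lf.sinkCSV("./filtered.csv");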

polars/lazy/dataframe.ts

+66 -1
@@ -11,7 +11,7 @@ import {
 } from "../utils";
 import { _LazyGroupBy, LazyGroupBy } from "./groupby";
 import { Deserialize, GroupByOps, Serialize } from "../shared_traits";
-import { LazyOptions, LazyJoinOptions } from "../types";
+import { LazyOptions, LazyJoinOptions, SinkCsvOptions } from "../types";
 import { Series } from "../series";

 const inspect = Symbol.for("nodejs.util.inspect.custom");
@@ -457,6 +457,63 @@ export interface LazyDataFrame extends Serialize, GroupByOps<LazyGroupBy> {
   * @see {@link DataFrame.withRowCount}
   */
  withRowCount();
+  /***
+   *
+   * Evaluate the query in streaming mode and write to a CSV file.
+
+   .. warning::
+       Streaming mode is considered **unstable**. It may be changed
+       at any point without it being considered a breaking change.
+
+   This allows streaming results that are larger than RAM to be written to disk.
+
+   Parameters
+   ----------
+   @param path - File path to which the file should be written.
+   @param includeBom - Whether to include a UTF-8 BOM in the CSV output.
+   @param includeHeader - Whether to include a header in the CSV output.
+   @param separator - Separate CSV fields with this symbol.
+   @param lineTerminator - String used to end each row.
+   @param quoteChar - Byte to use as the quoting character.
+   @param batchSize - Number of rows that will be processed per thread. Default: 1024.
+   @param datetimeFormat - A format string, with the specifiers defined by the
+       `chrono <https://docs.rs/chrono/latest/chrono/format/strftime/index.html>`_
+       Rust crate. If no format is specified, the default fractional-second
+       precision is inferred from the maximum timeunit found in the frame's
+       Datetime cols (if any).
+   @param dateFormat - A format string, with the specifiers defined by the
+       `chrono <https://docs.rs/chrono/latest/chrono/format/strftime/index.html>`_
+       Rust crate.
+   @param timeFormat - A format string, with the specifiers defined by the
+       `chrono <https://docs.rs/chrono/latest/chrono/format/strftime/index.html>`_
+       Rust crate.
+   @param floatPrecision - Number of decimal places to write, applied to both `Float32` and `Float64` datatypes.
+   @param nullValue - A string representing null values (defaulting to the empty string).
+   @param quoteStyle - Determines the quoting strategy used: {'necessary', 'always', 'non_numeric', 'never'}
+       - necessary (default): Puts quotes around fields only when necessary.
+         They are necessary when fields contain a quote, delimiter or
+         record terminator. Quotes are also necessary when writing an
+         empty record (which is indistinguishable from a record with one
+         empty field).
+       - always: Puts quotes around every field. Always.
+       - never: Never puts quotes around fields, even if that results in
+         invalid CSV data (e.g. by not quoting strings containing the
+         separator).
+       - non_numeric: Puts quotes around all fields that are non-numeric.
+         Namely, when writing a field that does not parse as a valid float
+         or integer, quotes will be used even if they aren't strictly
+         necessary.
+   @param maintainOrder - Maintain the order in which data is processed.
+       Setting this to `false` will be slightly faster.
+
+   Examples
+   --------
+   >>> const lf = pl.scanCSV("/path/to/my_larger_than_ram_file.csv")
+   >>> lf.sinkCSV("out.csv")
+   */
+
+  sinkCSV(dest: string, options?: SinkCsvOptions): void;
 }

 const prepareGroupbyInputs = (by) => {
@@ -899,6 +956,14 @@ export const _LazyDataFrame = (_ldf: any): LazyDataFrame => {
     withRowCount(name = "row_nr") {
       return _LazyDataFrame(_ldf.withRowCount(name));
     },
+    sinkCSV(dest?, options = {}) {
+      options.maintainOrder = options.maintainOrder ?? false;
+      if (typeof dest === "string") {
+        _ldf.sinkCsv(dest, options);
+      } else {
+        throw new TypeError("Expected a string destination");
+      }
+    },
   };
 };
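The wrapper only defaults maintainOrder on the JavaScript side; the remaining defaults (separator ",", quoteChar '"', batchSize 1024, includeHeader true) are resolved later in src/lazy/dataframe.rs. A small sketch of passing options through, with made-up paths and data:

    import pl from "nodejs-polars";

    const ldf = pl
      .DataFrame({ foo: [1, 2, 3], bar: ["a", null, "c"] })
      .lazy();

    // Pipe-separated output, no header row, nulls written as "NULL".
    // Options not supplied here fall back to the Rust-side defaults.
    ldf.sinkCSV("./out.csv", {
      separator: "|",
      includeHeader: false,
      nullValue: "NULL",
    });

A non-string destination is rejected with TypeError("Expected a string destination") before reaching the native layer.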

polars/types.ts

+19 -1
@@ -46,7 +46,25 @@ export interface WriteCsvOptions {
   includeHeader?: boolean;
   sep?: string;
 }
-
+/**
+ * Options for {@link LazyDataFrame.sinkCSV}
+ * @category Options
+ */
+export interface SinkCsvOptions {
+  includeHeader?: boolean;
+  quote?: string;
+  includeBom?: boolean;
+  separator?: string;
+  lineTerminator?: string;
+  quoteChar?: string;
+  batchSize?: number;
+  datetimeFormat?: string;
+  dateFormat?: string;
+  timeFormat?: string;
+  floatPrecision?: number;
+  nullValue?: string;
+  maintainOrder?: boolean;
+}
 /**
  * Options for {@link DataFrame.writeJSON}
  * @category Options

src/conversion.rs

+17
@@ -569,6 +569,23 @@ pub struct WriteCsvOptions {
     pub quote: Option<String>,
 }

+#[napi(object)]
+pub struct SinkCsvOptions {
+    pub include_header: Option<bool>,
+    pub quote: Option<String>,
+    pub include_bom: Option<bool>,
+    pub separator: Option<String>,
+    pub line_terminator: Option<String>,
+    pub quote_char: Option<String>,
+    pub batch_size: Option<i64>,
+    pub datetime_format: Option<String>,
+    pub date_format: Option<String>,
+    pub time_format: Option<String>,
+    pub float_precision: Option<i64>,
+    pub null_value: Option<String>,
+    pub maintain_order: bool,
+}
+
 #[napi(object)]
 pub struct Shape {
     pub height: i64,

src/lazy/dataframe.rs

+46
@@ -8,6 +8,7 @@ use polars::prelude::{col, lit, ClosedWindow, CsvEncoding, DataFrame, Field, Joi
 use polars_io::cloud::CloudOptions;
 use polars_io::parquet::ParallelStrategy;
 use std::collections::HashMap;
+use std::path::PathBuf;

 #[napi]
 #[repr(transparent)]
@@ -544,6 +545,51 @@ impl JsLazyFrame {
     pub fn unnest(&self, colss: Vec<String>) -> JsLazyFrame {
         self.ldf.clone().unnest(colss).into()
     }
+
+    #[napi(catch_unwind)]
+    pub fn sink_csv(&self, path: String, options: SinkCsvOptions) -> napi::Result<()> {
+        let quote_style = QuoteStyle::default();
+        let null_value = options
+            .null_value
+            .unwrap_or(SerializeOptions::default().null);
+        let float_precision: Option<usize> = options.float_precision.map(|fp| fp as usize);
+        let separator = options.separator.unwrap_or(",".to_owned()).as_bytes()[0];
+        let line_terminator = options.line_terminator.unwrap_or("\n".to_string());
+        let quote_char = options.quote_char.unwrap_or("\"".to_owned()).as_bytes()[0];
+        let date_format = options.date_format;
+        let time_format = options.time_format;
+        let datetime_format = options.datetime_format;
+
+        let serialize_options = SerializeOptions {
+            date_format,
+            time_format,
+            datetime_format,
+            float_precision,
+            separator,
+            quote_char,
+            null: null_value,
+            line_terminator,
+            quote_style,
+        };
+
+        let batch_size = options.batch_size.map(|bs| bs).unwrap_or(1024) as usize;
+        let include_bom = options.include_bom.unwrap_or(false);
+        let include_header = options.include_header.unwrap_or(true);
+        let maintain_order = options.maintain_order;
+
+        let options = CsvWriterOptions {
+            include_bom,
+            include_header,
+            maintain_order,
+            batch_size,
+            serialize_options,
+        };
+
+        let path_buf: PathBuf = PathBuf::from(path);
+        let ldf = self.ldf.clone().with_comm_subplan_elim(false);
+        let _ = ldf.sink_csv(path_buf, options).map_err(JsPolarsErr::from);
+        Ok(())
+    }
 }

 #[napi(object)]
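On the Rust side, the serialization-related options are folded into SerializeOptions, and only the first byte of separator and quoteChar is used, so multi-character values are effectively truncated. A sketch exercising those options from TypeScript, with illustrative paths and values:

    import pl from "nodejs-polars";

    const ldf = pl
      .DataFrame({ name: ["a,b", "c"], price: [1.23456, 2.5] })
      .lazy();

    // floatPrecision maps to SerializeOptions.float_precision;
    // quoteChar and separator are reduced to their first byte on the Rust side.
    ldf.sinkCSV("./prices.csv", {
      floatPrecision: 2, // write floats with 2 decimal places
      quoteChar: "'",    // quote fields with a single quote when quoting is needed
      batchSize: 4096,   // rows processed per thread, instead of the default 1024
    });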
