
Commit 6793fe5

Adding cloud options to scan_parquet (#173)
* Adding cloud options to scan_parquet
* Removing extra function
* Removing extra ,
* Removing duplicated option
1 parent 65581cd commit 6793fe5
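
The headline change lets scanParquet read straight from object storage. A minimal sketch of the new call shape, assuming a hypothetical S3 path and placeholder credentials; the key names are assumed to follow object_store's AmazonS3ConfigKey spelling (see the links added to the docstring below), and cloudOptions is typed as a Map<string, string> in the updated interface:

import pl from "nodejs-polars";

// Illustrative bucket and placeholder credentials; in practice these would be real
// values or omitted entirely so Polars can pick them up from the environment.
const df = pl
  .scanParquet("s3://my-bucket/data/*.parquet", {
    cloudOptions: new Map([
      ["aws_region", "us-east-1"],
      ["aws_access_key_id", "<ACCESS_KEY_ID>"],
      ["aws_secret_access_key", "<SECRET_ACCESS_KEY>"],
    ]),
    retries: 2,
  })
  .collectSync();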

File tree

* Cargo.toml
* __tests__/io.test.ts
* bun.lockb
* polars/io.ts
* src/lazy/dataframe.rs

5 files changed: +129 -23 lines


Cargo.toml (+67)
@@ -93,6 +93,73 @@ features = [
   "cov",
   "group_by_list",
   "sql",
+  "binary_encoding",
+  "rolling_window",
+  "json",
+  "dynamic_group_by",
+  "zip_with",
+  "simd",
+  "lazy",
+  "strings",
+  "temporal",
+  "random",
+  "object",
+  "fmt",
+  "performant",
+  "dtype-full",
+  "rows",
+  "round_series",
+  "is_unique",
+  "is_in",
+  "is_first_distinct",
+  "asof_join",
+  "cross_join",
+  "dot_product",
+  "concat_str",
+  "row_hash",
+  "reinterpret",
+  "mode",
+  "extract_jsonpath",
+  "cum_agg",
+  "rolling_window",
+  "repeat_by",
+  "interpolate",
+  "ewma",
+  "rank",
+  "propagate_nans",
+  "diff",
+  "pct_change",
+  "moment",
+  "diagonal_concat",
+  "abs",
+  "dot_diagram",
+  "dataframe_arithmetic",
+  "json",
+  "string_encoding",
+  "product",
+  "ndarray",
+  "unique_counts",
+  "log",
+  "serde-lazy",
+  "partition_by",
+  "pivot",
+  "semi_anti_join",
+  "parquet",
+  "to_dummies",
+  "ipc",
+  "avro",
+  "list_eval",
+  "arg_where",
+  "timezones",
+  "peaks",
+  "string_pad",
+  "cov",
+  "group_by_list",
+  "http",
+  "cloud",
+  "aws",
+  "gcp",
+  "azure"
 ]
 git = "https://github.com/pola-rs/polars.git"
 rev = "3cf4897e679b056d17a235d48867035265d43cdc"

__tests__/io.test.ts (+1 -1)
@@ -271,7 +271,7 @@ describe("parquet", () => {
   });
 
   test("scan:options", () => {
-    const df = pl.scanParquet(parquetpath, { numRows: 4 }).collectSync();
+    const df = pl.scanParquet(parquetpath, { nRows: 4 }).collectSync();
     expect(df.shape).toEqual({ height: 4, width: 4 });
   });
 });

bun.lockb (129 KB, binary file not shown)

polars/io.ts (+40 -19)
@@ -476,33 +476,54 @@ interface RowCount {
 }
 
 interface ScanParquetOptions {
-  columns?: string[] | number[];
-  numRows?: number;
+  nRows?: number;
+  cache?: boolean;
   parallel?: "auto" | "columns" | "row_groups" | "none";
   rowCount?: RowCount;
-  cache?: boolean;
   rechunk?: boolean;
-  hive_partitioning?: boolean;
+  lowMemory?: boolean;
+  useStatistics?: boolean;
+  hivePartitioning?: boolean;
+  cloudOptions?: Map<string, string>;
+  retries?: number;
 }
 
 /**
-   * __Lazily read from a parquet file or multiple files via glob patterns.__
-   * ___
+   * Lazily read from a local or cloud-hosted parquet file (or files).
+
+   This function allows the query optimizer to push down predicates and projections to
+   the scan level, typically increasing performance and reducing memory overhead.
+
    * This allows the query optimizer to push down predicates and projections to the scan level,
-   * thereby potentially reducing memory overhead.
-   * @param path Path to a file or or glob pattern
-   * @param options.numRows Stop reading from parquet file after reading ``numRows``.
-   * @param options.cache Cache the result after reading.
-   * @param options.parallel Read the parquet file in parallel. The single threaded reader consumes less memory.
-   * @param options.rechunk In case of reading multiple files via a glob pattern rechunk the final DataFrame into contiguous memory chunks.
+   * thereby potentially reducing memory overhead.
+   * @param source - Path(s) to a file. If a single path is given, it can be a globbing pattern.
+   @param options.nRows - Stop reading from parquet file after reading `n_rows`.
+   @param options.rowIndexName - If not None, this will insert a row index column with the given name into the DataFrame
+   @param options.rowIndexOffset - Offset to start the row index column (only used if the name is set)
+   @param options.parallel : {'auto', 'columns', 'row_groups', 'none'}
+       This determines the direction of parallelism. 'auto' will try to determine the optimal direction.
+   @param options.useStatistics - Use statistics in the parquet to determine if pages can be skipped from reading.
+   @param options.hivePartitioning - Infer statistics and schema from hive partitioned URL and use them to prune reads.
+   @param options.rechunk - In case of reading multiple files via a glob pattern rechunk the final DataFrame into contiguous memory chunks.
+   @param options.lowMemory - Reduce memory pressure at the expense of performance.
+   @param options.cache - Cache the result after reading.
+   @param options.storageOptions - Options that indicate how to connect to a cloud provider.
+       If the cloud provider is not supported by Polars, the storage options are passed to `fsspec.open()`.
+       The cloud providers currently supported are AWS, GCP, and Azure.
+       See supported keys here:
+
+       * `aws <https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html>`_
+       * `gcp <https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html>`_
+       * `azure <https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html>`_
+
+       If `storage_options` is not provided, Polars will try to infer the information from environment variables.
+   @param retries - Number of retries if accessing a cloud instance fails.
+
    */
-export function scanParquet(path: string, options: ScanParquetOptions = {}) {
-  const pliOptions: any = {};
-
-  pliOptions.nRows = options?.numRows;
-  pliOptions.rowCount = options?.rowCount;
-  pliOptions.parallel = options?.parallel ?? "auto";
-  return _LazyDataFrame(pli.scanParquet(path, pliOptions));
+export function scanParquet(source: string, options: ScanParquetOptions = {}) {
+  const defaultOptions = { parallel: "auto" };
+  const pliOptions = { ...defaultOptions, ...options };
+  return _LazyDataFrame(pli.scanParquet(source, pliOptions));
 }
 
 export interface ReadIPCOptions {
src/lazy/dataframe.rs (+21 -3)
@@ -709,11 +709,11 @@ pub struct ScanParquetOptions {
     pub parallel: Wrap<ParallelStrategy>,
     pub row_count: Option<JsRowCount>,
     pub rechunk: Option<bool>,
-    pub row_count_name: Option<String>,
-    pub row_count_offset: Option<u32>,
     pub low_memory: Option<bool>,
     pub use_statistics: Option<bool>,
     pub hive_partitioning: Option<bool>,
+    pub cloud_options: Option<HashMap::<String, String>>,
+    pub retries: Option<i64>,
 }
 
 #[napi(catch_unwind)]
@@ -725,7 +725,25 @@ pub fn scan_parquet(path: String, options: ScanParquetOptions) -> napi::Result<J
     let rechunk = options.rechunk.unwrap_or(false);
     let low_memory = options.low_memory.unwrap_or(false);
     let use_statistics = options.use_statistics.unwrap_or(false);
-    let cloud_options = Some(CloudOptions::default());
+
+    let mut cloud_options: Option<CloudOptions> = if let Some(o) = options.cloud_options {
+        let co: Vec<(String, String)> = o.into_iter().map(|kv: (String, String)| kv).collect();
+        Some(CloudOptions::from_untyped_config(&path, co).map_err(JsPolarsErr::from)?)
+    } else {
+        None
+    };
+
+    let retries = options.retries.unwrap_or_else(|| 2) as usize;
+    if retries > 0 {
+        cloud_options =
+            cloud_options
+                .or_else(|| Some(CloudOptions::default()))
+                .map(|mut options| {
+                    options.max_retries = retries;
+                    options
+                });
+    }
+
     let hive_partitioning: bool = options.hive_partitioning.unwrap_or(false);
     let args = ScanArgsParquet {
         n_rows,