3 changes: 3 additions & 0 deletions NEWS.md
@@ -26,6 +26,9 @@
 - New `append_parquet()` function to append a data frame to an
   existing Parquet file.

+- New `col_select` argument for `read_parquet()` to read a subset of
+  columns from a Parquet file.
+
 - `write_parquet()` can now write multiple row groups. By default it puts
   at most 10 million rows into a single row group. You can choose the
   row groups manually with the `row_groups` argument.
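
For context, a minimal usage sketch of the new argument (hypothetical temp file and data; per the entry above, `col_select` takes a numeric vector of 1-based column indices):

library(nanoparquet)

# Write a small hypothetical file, then read back only columns 1 and 3.
tmp <- tempfile(fileext = ".parquet")
write_parquet(data.frame(a = 1:3, b = letters[1:3], c = c(1.5, 2.5, 3.5)), tmp)
df <- read_parquet(tmp, col_select = c(1, 3))
str(df)  # expect only columns `a` and `c`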
16 changes: 13 additions & 3 deletions R/arrow-schema.R
@@ -9,13 +9,19 @@ parquet_arrow_metadata <- function(file) {
   parse_arrow_schema(amd)
 }

-apply_arrow_schema <- function(tab, file, dicts, types) {
+apply_arrow_schema <- function(tab, file, dicts, types, col_select) {
   # we don't need to recursively deal with this here, we only need the
   # key-value metadata, so we could read just that (TODO)
   opt <- options(nanoparquet.use_arrow_metadata = FALSE)
   on.exit(options(opt), add = TRUE)
   mtd <- read_parquet_metadata(file)

   kv <- mtd$file_meta_data$key_value_metadata[[1]]
   if ("ARROW:schema" %in% kv$key) {
     spec <- arrow_find_special(
       kv$value[match("ARROW:schema", kv$key)],
-      file
+      file,
+      col_select
     )
     for (idx in spec$factor) {
       clevels <- Reduce(union, dicts[[idx]])
@@ -38,7 +44,7 @@ apply_arrow_schema <- function(tab, file, dicts, types) {
   tab
 }

-arrow_find_special <- function(asch, file) {
+arrow_find_special <- function(asch, file, col_select = NULL) {
   amd <- tryCatch(
     parse_arrow_schema(asch)$columns,
     error = function(e) {
@@ -52,6 +58,10 @@
   if (is.null(amd)) {
     return(list())
   }
+  # Subset of columns?
+  if (!is.null(col_select)) {
+    amd <- amd[col_select, , drop = FALSE]
+  }
   # If the type is Utf8 and it is a dictionary, then it is a factor
   fct <- which(
     amd$type_type == "Utf8" & !vapply(amd$dictionary, is.null, logical(1))
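
As an aside, the `spec$factor` loop in `apply_arrow_schema()` above merges the per-row-group dictionary pages into a single set of factor levels with `Reduce(union, ...)`. A toy sketch of that merging step (made-up dictionary values; in the package they come from the reader):

# Two row groups, each with its own dictionary page for the same column.
dicts <- list(c("low", "mid"), c("mid", "high"))

# Union them into a single, de-duplicated set of factor levels.
clevels <- Reduce(union, dicts)
clevels
#> [1] "low"  "mid"  "high"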
2 changes: 1 addition & 1 deletion R/parquet-column-types.R
@@ -51,7 +51,7 @@ parquet_column_types_file <- function(file, options) {
   add_r_type_to_schema(mtd, sch, options)
 }

-add_r_type_to_schema <- function(mtd, sch, options) {
+add_r_type_to_schema <- function(mtd, sch, options, col_select = NULL) {
   kv <- mtd$file_meta_data$key_value_metadata[[1]]

   type_map <- c(
24 changes: 18 additions & 6 deletions R/read-parquet.R
@@ -8,6 +8,8 @@
 #'   deletes it. The connection might be open, in which case it must be
 #'   a binary connection. If it is not open, then `read_parquet()` will
 #'   open it and also close it in the end.
+#' @param col_select Columns to read. It can be a numeric vector of column
+#'   indices.
 #' @param options Nanoparquet options, see [parquet_options()].
 #' @return A `data.frame` with the file's contents.
 #' @export
@@ -21,24 +23,34 @@
 #' parquet_df <- nanoparquet::read_parquet(file_name)
 #' print(str(parquet_df))

-read_parquet <- function(file, options = parquet_options()) {
+read_parquet <- function(file, col_select = NULL,
+                         options = parquet_options()) {
   if (inherits(file, "connection")) {
     tmp <- tempfile(fileext = ".parquet")
     on.exit(unlink(tmp), add = TRUE)
     dump_connection(file, tmp)
     file <- tmp
   }
   file <- path.expand(file)
-  res <- .Call(nanoparquet_read2, file, options, sys.call())
-  post_process_read_result(res, file, options)
+
+  if (!is.null(col_select)) {
+    stopifnot(is.numeric(col_select))
+    col_select <- as.integer(col_select)
+    col_select <- col_select[!is.na(col_select)]
+    col_select <- unique(col_select)
+    stopifnot(all(col_select >= 1L))
+  }
+
+  res <- .Call(nanoparquet_read2, file, options, col_select, sys.call())
+  post_process_read_result(res, file, options, col_select)
 }

-post_process_read_result <- function(res, file, options) {
+post_process_read_result <- function(res, file, options, col_select) {
   dicts <- res[[2]]
   types <- res[[3]]
   res <- res[[1]]
   if (options[["use_arrow_metadata"]]) {
-    res <- apply_arrow_schema(res, file, dicts, types)
+    res <- apply_arrow_schema(res, file, dicts, types, col_select)
   }

   # convert hms from milliseconds to seconds, also integer -> double
@@ -92,7 +104,7 @@ read_parquet_row_group <- function(file, row_group,
   file <- path.expand(file)
   res <- .Call(nanoparquet_read_row_group, file, row_group,
     options, sys.call())
-  post_process_read_result(res, file, options)
+  post_process_read_result(res, file, options, col_select = NULL)
 }

 # TODO: this does not work currently
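
To make the `col_select` sanitization in `read_parquet()` concrete, here are the same steps as a standalone sketch (`sanitize_col_select()` is a hypothetical helper for illustration, not part of the package):

# Mirror of the validation block above: coerce to integer, drop NAs,
# de-duplicate, and require 1-based indices.
sanitize_col_select <- function(col_select) {
  stopifnot(is.numeric(col_select))
  col_select <- as.integer(col_select)
  col_select <- col_select[!is.na(col_select)]
  col_select <- unique(col_select)
  stopifnot(all(col_select >= 1L))
  col_select
}

sanitize_col_select(c(3, 1, 3, NA, 2.0))
#> [1] 3 1 2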
5 changes: 4 additions & 1 deletion man/read_parquet.Rd

(Generated file; diff not rendered on the PR page.)