117 changes: 0 additions & 117 deletions R/aux_data_utils.R
@@ -673,59 +673,6 @@ gen_ili_data <- function(default_day_of_week = 1) {
as_epi_archive(compactify = TRUE)
}

#' Remove duplicate files from S3
#'
#' Removes duplicate files from S3 by keeping only the file with the earliest
#' LastModified timestamp for each ETag. Modify the logic of keep_df if this
#' doesn't suit your needs.
#'
#' @param bucket The name of the S3 bucket.
#' @param prefix The prefix of the files to remove duplicates from.
#' @param dry_run If TRUE (the default), only report which files would be deleted; if FALSE, actually delete them.
#' @param .progress Whether to show a progress bar.
delete_duplicates_from_s3_by_etag <- function(bucket, prefix, dry_run = TRUE, .progress = TRUE) {
# Get a list of all new dataset snapshots from S3
files_df <- aws.s3::get_bucket_df(bucket = bucket, prefix = prefix) %>% as_tibble()

# Create a list of all the files to keep by keeping the earliest timestamp file for each ETag
keep_df <- files_df %>%
group_by(ETag) %>%
slice_min(LastModified) %>%
ungroup()

# Create a list of all the files to delete by taking the complement of keep_df
delete_df <- files_df %>%
anti_join(keep_df, by = "Key")

if (nrow(delete_df) == 0) {
return(invisible(delete_df))
}

if (dry_run) {
cli::cli_alert_info("Would delete {nrow(delete_df)} files from {bucket} with prefix {prefix}")
print(delete_df)
return(invisible(delete_df))
}

# Delete
delete_files_from_s3(bucket = bucket, keys = delete_df$Key, .progress = .progress)

return(invisible(delete_df))
}

#' Delete files from S3
#'
#' Faster than calling aws.s3::delete_object once per file when there are many
#' files to delete (thousands), since keys are deleted in batches.
#'
#' @param keys The keys of the files to delete, as a character vector.
#' @param bucket The name of the S3 bucket.
#' @param batch_size The number of files to delete in each batch.
#' @param .progress Whether to show a progress bar.
delete_files_from_s3 <- function(keys, bucket, batch_size = 500, .progress = TRUE) {
split(keys, ceiling(seq_along(keys) / batch_size)) %>%
purrr::walk(~ aws.s3::delete_object(bucket = bucket, object = .x), .progress = .progress)
}

#' Get the NHSN data archive from S3
#'
#' If you want to avoid downloading the archive from S3 every time, you can
@@ -763,67 +710,3 @@ up_to_date_nssp_state_archive <- function(disease = c("covid", "influenza")) {
mutate(time_value = time_value + 3) %>%
as_epi_archive(compactify = TRUE)
}

MIN_TIMESTAMP <- as.POSIXct("2000-01-01 00:00:00", tz = "UTC")
MAX_TIMESTAMP <- as.POSIXct("2040-01-01 00:00:00", tz = "UTC")

#' Get the last time a covidcast signal was updated.
#'
#' @param source The source of the signal.
#' @param signal The name of the signal.
#' @param geo_type The geo type of the signal.
#' @param missing_value The value to return if the signal is not found.
#'
#' @return The last time the signal was updated in POSIXct format.
get_covidcast_signal_last_update <- function(source, signal, geo_type, missing_value = MAX_TIMESTAMP) {
tryCatch(
{
pub_covidcast_meta() %>%
filter(source == !!source, signal == !!signal, geo_type == !!geo_type) %>%
pull(last_update) %>%
as.POSIXct(origin = "1970-01-01", tz = "UTC")
},
error = function(cond) {
return(missing_value)
}
)
}

#' Get the last modified date of an S3 object
#'
#' @param key The key of the S3 object.
#' @param bucket The name of the S3 bucket.
#' @param missing_value The value to return if the object is not found.
#'
#' @return The last modified date of the S3 object in POSIXct format.
get_s3_object_last_modified <- function(key, bucket, missing_value = MIN_TIMESTAMP) {
metadata <- suppressMessages(aws.s3::head_object(key, bucket = bucket))
if (!metadata) {
return(missing_value)
}
# Format looks like "Fri, 31 Jan 2025 22:01:16 GMT"
attr(metadata, "last-modified") %>%
str_replace_all(" GMT", "") %>%
as.POSIXct(format = "%a, %d %b %Y %H:%M:%S", tz = "UTC")
}

#' Get the last updated date of a Socrata dataset
#'
#' Note: this hits a cache layer, which is only updated roughly every 4 hours.
#'
#' @param dataset_url The URL of the Socrata dataset.
#' @param missing_value The value to return if the request fails.
#'
#' @return The last updated date of the Socrata dataset in POSIXct format.
get_socrata_updated_at <- function(dataset_url, missing_value = MAX_TIMESTAMP) {
tryCatch(
{
httr::with_config(httr::config(timeout = 5), httr::RETRY("GET", dataset_url, times = 5, pause_min = 5, pause_cap = 5)) %>%
httr::content() %>%
# This field comes in as integer seconds since epoch, so we need to convert it.
pluck("rowsUpdatedAt") %>%
as.POSIXct(origin = "1970-01-01", tz = "UTC")
},
error = function(cond) {
return(missing_value)
}
)
}
118 changes: 118 additions & 0 deletions R/utils.R
@@ -607,3 +607,121 @@ validate_epi_data <- function(epi_data) {
get_bucket_df_delphi <- function(prefix = "", bucket = "forecasting-team-data", ...) {
aws.s3::get_bucket_df(prefix = prefix, bucket = bucket, ...) %>% tibble()
}

#' Remove duplicate files from S3
#'
#' Removes duplicate files from S3 by keeping only the file with the earliest
#' LastModified timestamp for each ETag. Modify the logic of keep_df if this
#' doesn't suit your needs.
#'
#' @param bucket The name of the S3 bucket.
#' @param prefix The prefix of the files to remove duplicates from.
#' @param dry_run If TRUE (the default), only report which files would be deleted; if FALSE, actually delete them.
#' @param .progress Whether to show a progress bar.
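#'
#' @examples
#' \dontrun{
#' # A minimal sketch (the prefix is illustrative): preview the duplicates
#' # with a dry run first, then delete for real.
#' delete_duplicates_from_s3_by_etag("forecasting-team-data", "archive/nhsn/", dry_run = TRUE)
#' delete_duplicates_from_s3_by_etag("forecasting-team-data", "archive/nhsn/", dry_run = FALSE)
#' }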
delete_duplicates_from_s3_by_etag <- function(bucket, prefix, dry_run = TRUE, .progress = TRUE) {
# Get a list of all new dataset snapshots from S3
files_df <- aws.s3::get_bucket_df(bucket = bucket, prefix = prefix) %>% as_tibble()

# Create a list of all the files to keep by keeping the earliest timestamp file for each ETag
keep_df <- files_df %>%
group_by(ETag) %>%
slice_min(LastModified) %>%
ungroup()

# Create a list of all the files to delete by taking the complement of keep_df
delete_df <- files_df %>%
anti_join(keep_df, by = "Key")

if (nrow(delete_df) == 0) {
return(invisible(delete_df))
}

if (dry_run) {
cli::cli_alert_info("Would delete {nrow(delete_df)} files from {bucket} with prefix {prefix}")
print(delete_df)
return(invisible(delete_df))
}

# Delete
delete_files_from_s3(bucket = bucket, keys = delete_df$Key, .progress = .progress)

return(invisible(delete_df))
}

#' Delete files from S3
#'
#' Faster than calling aws.s3::delete_object once per file when there are many
#' files to delete (thousands), since keys are deleted in batches.
#'
#' @param keys The keys of the files to delete, as a character vector.
#' @param bucket The name of the S3 bucket.
#' @param batch_size The number of files to delete in each batch.
#' @param .progress Whether to show a progress bar.
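#'
#' @examples
#' \dontrun{
#' # Keys are illustrative; each batch of up to batch_size keys is sent as a
#' # single multi-object delete request.
#' delete_files_from_s3(
#'   keys = c("archive/old_snapshot_1.parquet", "archive/old_snapshot_2.parquet"),
#'   bucket = "forecasting-team-data"
#' )
#' }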
delete_files_from_s3 <- function(keys, bucket, batch_size = 500, .progress = TRUE) {
split(keys, ceiling(seq_along(keys) / batch_size)) %>%
purrr::walk(~ aws.s3::delete_object(bucket = bucket, object = .x), .progress = .progress)
}


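# Sentinel timestamps used as fail-safe defaults by the freshness helpers
# below: MAX_TIMESTAMP stands in for "assume the upstream source has new
# data"; MIN_TIMESTAMP stands in for "assume we have no cached copy".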
MIN_TIMESTAMP <- as.POSIXct("2000-01-01 00:00:00", tz = "UTC")
MAX_TIMESTAMP <- as.POSIXct("2040-01-01 00:00:00", tz = "UTC")

#' Get the last time a covidcast signal was updated.
#'
#' @param source The source of the signal.
#' @param signal The name of the signal.
#' @param geo_type The geo type of the signal.
#' @param missing_value The value to return if the signal is not found.
#'
#' @return The last time the signal was updated in POSIXct format.
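#'
#' @examples
#' \dontrun{
#' # Source/signal/geo names are illustrative.
#' get_covidcast_signal_last_update("nssp", "pct_ed_visits_covid", "state")
#' }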
get_covidcast_signal_last_update <- function(source, signal, geo_type, missing_value = MAX_TIMESTAMP) {
tryCatch(
{
pub_covidcast_meta() %>%
filter(source == !!source, signal == !!signal, geo_type == !!geo_type) %>%
pull(last_update) %>%
as.POSIXct(origin = "1970-01-01", tz = "UTC")
},
error = function(cond) {
return(missing_value)
}
)
}

#' Get the last modified date of an S3 object
#'
#' @param key The key of the S3 object.
#' @param bucket The name of the S3 bucket.
#' @param missing_value The value to return if the object is not found.
#'
#' @return The last modified date of the S3 object in POSIXct format.
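#'
#' @examples
#' \dontrun{
#' # Key is illustrative.
#' get_s3_object_last_modified("archive/nhsn_archive.parquet", "forecasting-team-data")
#' }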
get_s3_object_last_modified <- function(key, bucket, missing_value = MIN_TIMESTAMP) {
metadata <- suppressMessages(aws.s3::head_object(key, bucket = bucket))
if (!metadata) {
return(missing_value)
}
# Format looks like "Fri, 31 Jan 2025 22:01:16 GMT"
attr(metadata, "last-modified") %>%
str_replace_all(" GMT", "") %>%
as.POSIXct(format = "%a, %d %b %Y %H:%M:%S", tz = "UTC")
}

#' Get the last updated date of a Socrata dataset
#'
#' Note: this hits a cache layer, which is only updated roughly every 4 hours.
#'
#' @param dataset_url The URL of the Socrata dataset.
#' @param missing_value The value to return if the request fails.
#'
#' @return The last updated date of the Socrata dataset in POSIXct format.
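#'
#' @examples
#' \dontrun{
#' # Dataset id is illustrative; Socrata serves dataset metadata at
#' # https://<domain>/api/views/<dataset-id>.json.
#' get_socrata_updated_at("https://data.cdc.gov/api/views/ua7e-t2fy.json")
#' }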
get_socrata_updated_at <- function(dataset_url, missing_value = MAX_TIMESTAMP) {
tryCatch(
{
httr::with_config(httr::config(timeout = 5), httr::RETRY("GET", dataset_url, times = 5, pause_min = 5, pause_cap = 5)) %>%
httr::content() %>%
# This field comes in as integer seconds since epoch, so we need to convert it.
pluck("rowsUpdatedAt") %>%
as.POSIXct(origin = "1970-01-01", tz = "UTC")
},
error = function(cond) {
return(missing_value)
}
)
}
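
# A minimal staleness-check sketch tying the helpers above together
# (dataset_url, key, and bucket are hypothetical). The sentinel defaults fail
# safe: a failed upstream lookup returns MAX_TIMESTAMP (assume new data
# exists) and a missing S3 object returns MIN_TIMESTAMP (assume no cached
# copy), so either failure mode triggers a refetch.
cache_is_stale <- function(dataset_url, key, bucket) {
  get_socrata_updated_at(dataset_url) > get_s3_object_last_modified(key, bucket)
}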