@@ -673,59 +673,6 @@ gen_ili_data <- function(default_day_of_week = 1) {
    as_epi_archive(compactify = TRUE)
}

-#' Remove duplicate files from S3
-#'
-#' Removes duplicate files from S3 by keeping only the earliest LastModified
-#' file for each ETag. You can modify the logic of keep_df if this doesn't suit
-#' your needs.
-#'
-#' @param bucket The name of the S3 bucket.
-#' @param prefix The prefix of the files to remove duplicates from.
-#' @param dry_run Whether to actually delete the files.
-#' @param .progress Whether to show a progress bar.
-delete_duplicates_from_s3_by_etag <- function(bucket, prefix, dry_run = TRUE, .progress = TRUE) {
-  # Get a list of all new dataset snapshots from S3
-  files_df <- aws.s3::get_bucket_df(bucket = bucket, prefix = prefix) %>% as_tibble()
-
-  # Create a list of all the files to keep by keeping the earliest timestamp file for each ETag
-  keep_df <- files_df %>%
-    group_by(ETag) %>%
-    slice_min(LastModified) %>%
-    ungroup()
-
-  # Create a list of all the files to delete by taking the complement of keep_df
-  delete_df <- files_df %>%
-    anti_join(keep_df, by = "Key")
-
-  if (nrow(delete_df) == 0) {
-    return(invisible(delete_df))
-  }
-
-  if (dry_run) {
-    cli::cli_alert_info("Would delete {nrow(delete_df)} files from {bucket} with prefix {prefix}")
-    print(delete_df)
-    return(invisible(delete_df))
-  }
-
-  # Delete
-  delete_files_from_s3(bucket = bucket, keys = delete_df$Key, .progress = .progress)
-
-  return(invisible(delete_df))
-}
-
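For reference, a typical invocation of this removed helper would have started with a dry run; the bucket and prefix below are illustrative placeholders, not values from this repo:

# Preview which duplicate snapshots would be deleted (nothing is removed yet)
dupes <- delete_duplicates_from_s3_by_etag(
  bucket = "example-forecasting-bucket", # hypothetical bucket name
  prefix = "example/prefix/",            # hypothetical prefix
  dry_run = TRUE
)
# After reviewing the printed tibble, rerun with dry_run = FALSE to delete.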
-#' Delete files from S3
-#'
-#' Faster than calling aws.s3::delete_object once per file when there are many
-#' files to delete (thousands).
-#'
-#' @param bucket The name of the S3 bucket.
-#' @param keys The keys of the files to delete, as a character vector.
-#' @param batch_size The number of files to delete in each batch.
-#' @param .progress Whether to show a progress bar.
-delete_files_from_s3 <- function(keys, bucket, batch_size = 500, .progress = TRUE) {
-  # Split the keys into batches of batch_size and pass each batch to a single
-  # delete_object call
-  split(keys, ceiling(seq_along(keys) / batch_size)) %>%
-    purrr::walk(~ aws.s3::delete_object(bucket = bucket, object = .x), .progress = .progress)
-}
-
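The batching step is plain base R; a minimal sketch of just the split logic, with made-up keys:

keys <- sprintf("snapshots/file_%04d.csv", 1:1250) # hypothetical keys
batches <- split(keys, ceiling(seq_along(keys) / 500))
lengths(batches) # 500, 500, 250: one delete_object call per batch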
#' Get the NHSN data archive from S3
#'
#' If you want to avoid downloading the archive from S3 every time, you can
@@ -763,67 +710,3 @@ up_to_date_nssp_state_archive <- function(disease = c("covid", "influenza")) {
    mutate(time_value = time_value + 3) %>%
    as_epi_archive(compactify = TRUE)
}
-
-MIN_TIMESTAMP <- as.POSIXct("2000-01-01 00:00:00", tz = "UTC")
-MAX_TIMESTAMP <- as.POSIXct("2040-01-01 00:00:00", tz = "UTC")
-
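The asymmetric defaults appear deliberate, going by the functions below: upstream checks fall back to MAX_TIMESTAMP ("assume there is new data") while the cached-copy check falls back to MIN_TIMESTAMP ("assume we have nothing"), so either failure mode triggers a re-download rather than silently skipping one. A sketch with illustrative URL, key, and bucket:

upstream <- get_socrata_updated_at("https://example.org/api/views/xxxx-xxxx") # illustrative URL
cached <- get_s3_object_last_modified("archive.parquet", "example-bucket")    # illustrative key/bucket
if (upstream > cached) message("upstream looks newer; re-download")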
-#' Get the last time a covidcast signal was updated.
-#'
-#' @param source The source of the signal.
-#' @param signal The name of the signal.
-#' @param geo_type The geo type of the signal.
-#' @param missing_value The value to return if the signal is not found.
-#'
-#' @return The last time the signal was updated in POSIXct format.
-get_covidcast_signal_last_update <- function(source, signal, geo_type, missing_value = MAX_TIMESTAMP) {
-  tryCatch(
-    {
-      pub_covidcast_meta() %>%
-        filter(source == !!source, signal == !!signal, geo_type == !!geo_type) %>%
-        pull(last_update) %>%
-        as.POSIXct()
-    },
-    error = function(cond) {
-      return(missing_value)
-    }
-  )
-}
-
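A hypothetical freshness check built on this helper; the source/signal pair is one real covidcast example, but any triple listed by pub_covidcast_meta() works, and the archive key and bucket are placeholders:

library(epidatr) # assumed: pub_covidcast_meta() comes from epidatr
last_update <- get_covidcast_signal_last_update(
  source = "nssp", signal = "pct_ed_visits_covid", geo_type = "state"
)
if (last_update > get_s3_object_last_modified("nssp_archive.parquet", "example-bucket")) {
  # upstream signal is newer than our cached archive; rebuild it
}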
-#' Get the last modified date of an S3 object
-#'
-#' @param bucket The name of the S3 bucket.
-#' @param key The key of the S3 object.
-#'
-#' @return The last modified date of the S3 object in POSIXct format.
-get_s3_object_last_modified <- function(key, bucket, missing_value = MIN_TIMESTAMP) {
-  metadata <- suppressMessages(head_object(key, bucket = bucket))
-  if (!metadata) {
-    return(missing_value)
-  }
-  # Format looks like "Fri, 31 Jan 2025 22:01:16 GMT"
-  attr(metadata, "last-modified") %>%
-    str_replace_all("GMT", "") %>%
-    as.POSIXct(format = "%a, %d %b %Y %H:%M:%S", tz = "UTC")
-}
-
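The header parsing can be checked in isolation; note that %a and %b parsing is locale-dependent, so this assumes an English locale:

# "Fri, 31 Jan 2025 22:01:16 GMT" with "GMT" stripped parses as:
as.POSIXct("Fri, 31 Jan 2025 22:01:16 ", format = "%a, %d %b %Y %H:%M:%S", tz = "UTC")
# [1] "2025-01-31 22:01:16 UTC"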
-#' Get the last updated date of a Socrata dataset
-#'
-#' FYI: This hits a cache layer, which is only updated ~every 4 hours.
-#'
-#' @param dataset_url The URL of the Socrata dataset.
-#'
-#' @return The last updated date of the Socrata dataset in POSIXct format.
-get_socrata_updated_at <- function(dataset_url, missing_value = MAX_TIMESTAMP) {
-  tryCatch(
-    {
-      httr::with_config(httr::config(timeout = 5), httr::RETRY("GET", dataset_url, times = 5, pause_min = 5, pause_cap = 5)) %>%
-        httr::content() %>%
-        # This field comes in as integer seconds since epoch, so we need to convert it.
-        pluck("rowsUpdatedAt") %>%
-        as.POSIXct(origin = "1970-01-01", tz = "UTC")
-    },
-    error = function(cond) {
-      return(missing_value)
-    }
-  )
-}
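Here dataset_url is the Socrata views metadata endpoint rather than the data endpoint; an illustrative call, with a placeholder 4x4 dataset id, plus the epoch conversion on its own:

updated_at <- get_socrata_updated_at("https://data.cdc.gov/api/views/xxxx-xxxx") # placeholder id
# rowsUpdatedAt arrives as integer seconds since the Unix epoch, e.g.:
as.POSIXct(1738360876, origin = "1970-01-01", tz = "UTC")
# [1] "2025-01-31 22:01:16 UTC"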