diff --git a/Cargo.lock b/Cargo.lock index 3c36092..a262b33 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,20 +2,6 @@ # It is not intended for manual editing. version = 4 -[[package]] -name = "ahash" -version = "0.8.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" -dependencies = [ - "cfg-if", - "getrandom", - "once_cell", - "serde", - "version_check", - "zerocopy", -] - [[package]] name = "android_system_properties" version = "0.1.5" @@ -33,9 +19,12 @@ checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" [[package]] name = "arc-swap" -version = "1.7.1" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" +checksum = "51d03449bb8ca2cc2ef70869af31463d1ae5ccc8fa3e334b307203fbf815207e" +dependencies = [ + "rustversion", +] [[package]] name = "autocfg" @@ -51,15 +40,15 @@ checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3" [[package]] name = "bumpalo" -version = "3.19.0" +version = "3.19.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" +checksum = "5dd9dc738b7a8311c7ade152424974d8115f2cdad61e8dab8dac9f2362298510" [[package]] name = "cc" -version = "1.2.48" +version = "1.2.53" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c481bdbf0ed3b892f6f806287d72acd515b352a4ec27a208489b8c1bc839633a" +checksum = "755d2fce177175ffca841e9a06afdb2c4ab0f593d53b4dee48147dfaade85932" dependencies = [ "find-msvc-tools", "shlex", @@ -73,9 +62,9 @@ checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" [[package]] name = "chrono" -version = "0.4.42" +version = "0.4.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "145052bdd345b87320e369255277e3fb5152762ad123a901ef5c262dd38fe8d2" +checksum = "fac4744fb15ae8337dc853fee7fb3f4e48c0fbaa23d0afe49c447b4fab126118" dependencies = [ "iana-time-zone", "num-traits", @@ -90,21 +79,22 @@ checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" [[package]] name = "derive_more" -version = "2.0.1" +version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "093242cf7570c207c83073cf82f79706fe7b8317e98620a47d5be7c3d8497678" +checksum = "d751e9e49156b02b44f9c1815bcb94b984cdcc4396ecc32521c739452808b134" dependencies = [ "derive_more-impl", ] [[package]] name = "derive_more-impl" -version = "2.0.1" +version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bda628edc44c4bb645fbe0f758797143e4e07926f7ebf4e9bdfbd3d2ce621df3" +checksum = "799a97264921d8623a957f6c3b9011f3b5492f557bbb7a5a19b7fa6d06ba8dcb" dependencies = [ "proc-macro2", "quote", + "rustc_version", "syn", "unicode-xid", ] @@ -123,9 +113,9 @@ checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" [[package]] name = "find-msvc-tools" -version = "0.1.5" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a3076410a55c90011c298b04d0cfa770b00fa04e1e3c97d3f6c9de105a03844" +checksum = "8591b0bcc8a98a64310a2fae1bb3e9b8564dd10e381e6e28010fde8e8e8568db" [[package]] name = "fnv" @@ -183,9 +173,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.12.1" +version = "2.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ad4bb2b565bca0645f4d68c5c9af97fba094e9791da685bf83cb5f3ce74acf2" +checksum = "7714e70437a7dc3ac8eb7e6f8df75fd8eb422675fc7678aff7364301092b1017" dependencies = [ "equivalent", "hashbrown", @@ -193,15 +183,15 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.15" +version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" +checksum = "92ecc6618181def0457392ccd0ee51198e065e016d1d527a7ac1b6dc7c1f09d2" [[package]] name = "js-sys" -version = "0.3.83" +version = "0.3.85" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "464a3709c7f55f1f721e5389aa6ea4e3bc6aba669353300af094b29ffbdde1d8" +checksum = "8c942ebf8e95485ca0d52d97da7c5a2c387d0e7f0ba4c35e93bfcaee045955b3" dependencies = [ "once_cell", "wasm-bindgen", @@ -209,9 +199,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.177" +version = "0.2.180" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2874a2af47a2325c2001a6e6fad9b16a53b802102b528163885171cf92b15976" +checksum = "bcc35a38544a891a5f7c865aca548a982ccb3b8650a5b06d0fd33a10283c56fc" [[package]] name = "lock_api" @@ -224,11 +214,11 @@ dependencies = [ [[package]] name = "log" -version = "0.4.28" +version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432" +checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" dependencies = [ - "serde", + "serde_core", ] [[package]] @@ -329,10 +319,10 @@ dependencies = [ name = "pdslib" version = "0.3.0" dependencies = [ - "ahash", "anyhow", "log", "log4rs", + "rustc-hash", "serde", "thiserror", ] @@ -348,18 +338,18 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.103" +version = "1.0.105" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ee95bc4ef87b8d5ba32e8b7714ccc834865276eab0aed5c9958d00ec45f49e8" +checksum = "535d180e0ecab6268a3e718bb9fd44db66bbbc256257165fc699dadf70d16fe7" dependencies = [ "unicode-ident", ] [[package]] name = "quote" -version = "1.0.42" +version = "1.0.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a338cc41d27e6cc6dce6cefc13a0729dfbb81c262b1f519331575dd80ef3067f" +checksum = "dc74d9a594b72ae6656596548f56f667211f8a97b3d4c3d467150794690dc40a" dependencies = [ "proc-macro2", ] @@ -392,9 +382,9 @@ dependencies = [ [[package]] name = "rand_core" -version = "0.9.3" +version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" +checksum = "76afc826de14238e6e8c374ddcc1fa19e374fd8dd986b0d2af0d02377261d83c" dependencies = [ "getrandom", ] @@ -408,6 +398,21 @@ dependencies = [ "bitflags", ] +[[package]] +name = "rustc-hash" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" + +[[package]] +name = "rustc_version" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92" +dependencies = [ + "semver", +] + [[package]] name = "rustversion" version = "1.0.22" @@ -416,9 +421,9 @@ checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" [[package]] name = "ryu" -version = "1.0.20" +version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" +checksum = "a50f4cf475b65d88e057964e0e9bb1f0aa9bbb2036dc65c64596b42932536984" [[package]] name = "scopeguard" @@ -426,6 +431,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "semver" +version = "1.0.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2" + [[package]] name = "serde" version = "1.0.228" @@ -468,15 +479,15 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.145" +version = "1.0.149" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "402a6f66d8c709116cf22f558eab210f5a50187f702eb4d7e5ef38d9a7f1c79c" +checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" dependencies = [ "itoa", "memchr", - "ryu", "serde", "serde_core", + "zmij", ] [[package]] @@ -506,9 +517,9 @@ checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" [[package]] name = "syn" -version = "2.0.111" +version = "2.0.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "390cc9a294ab71bdb1aa2e99d13be9c753cd2d7bd6560c77118597410c4d2e87" +checksum = "d4d107df263a3013ef9b1879b0df87d706ff80f65a86ea879bd9c31f9b307c2a" dependencies = [ "proc-macro2", "quote", @@ -537,9 +548,9 @@ dependencies = [ [[package]] name = "thread-id" -version = "5.0.0" +version = "5.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99043e46c5a15af379c06add30d9c93a6c0e8849de00d244c4a2c417da128d80" +checksum = "2010d27add3f3240c1fef7959f46c814487b216baee662af53be645ba7831c07" dependencies = [ "libc", "windows-sys", @@ -587,26 +598,20 @@ version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861" -[[package]] -name = "version_check" -version = "0.9.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" - [[package]] name = "wasip2" -version = "1.0.1+wasi-0.2.4" +version = "1.0.2+wasi-0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0562428422c63773dad2c345a1882263bbf4d65cf3f42e90921f787ef5ad58e7" +checksum = "9517f9239f02c069db75e65f174b3da828fe5f5b945c4dd26bd25d89c03ebcf5" dependencies = [ "wit-bindgen", ] [[package]] name = "wasm-bindgen" -version = "0.2.106" +version = "0.2.108" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d759f433fa64a2d763d1340820e46e111a7a5ab75f993d1852d70b03dbb80fd" +checksum = "64024a30ec1e37399cf85a7ffefebdb72205ca1c972291c51512360d90bd8566" dependencies = [ "cfg-if", "once_cell", @@ -617,9 +622,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.106" +version = "0.2.108" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48cb0d2638f8baedbc542ed444afc0644a29166f1595371af4fecf8ce1e7eeb3" +checksum = "008b239d9c740232e71bd39e8ef6429d27097518b6b30bdf9086833bd5b6d608" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -627,9 +632,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.106" +version = "0.2.108" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cefb59d5cd5f92d9dcf80e4683949f15ca4b511f4ac0a6e14d4e1ac60c6ecd40" +checksum = "5256bae2d58f54820e6490f9839c49780dff84c65aeab9e772f15d5f0e913a55" dependencies = [ "bumpalo", "proc-macro2", @@ -640,9 +645,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.106" +version = "0.2.108" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbc538057e648b67f72a982e708d485b2efa771e1ac05fec311f9f63e5800db4" +checksum = "1f01b580c9ac74c8d8f0c0e4afb04eeef2acf145458e52c03845ee9cd23e3d12" dependencies = [ "unicode-ident", ] @@ -730,99 +735,41 @@ dependencies = [ [[package]] name = "windows-sys" -version = "0.59.0" +version = "0.61.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" +checksum = "ae137229bcbd6cdf0f7b80a31df61766145077ddf49416a728b02cb3921ff3fc" dependencies = [ - "windows-targets", -] - -[[package]] -name = "windows-targets" -version = "0.52.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" -dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_gnullvm", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows-link", ] -[[package]] -name = "windows_aarch64_gnullvm" -version = "0.52.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" - -[[package]] -name = "windows_aarch64_msvc" -version = "0.52.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" - -[[package]] -name = "windows_i686_gnu" -version = "0.52.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" - -[[package]] -name = "windows_i686_gnullvm" -version = "0.52.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" - -[[package]] -name = "windows_i686_msvc" -version = "0.52.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" - -[[package]] -name = "windows_x86_64_gnu" -version = "0.52.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" - -[[package]] -name = "windows_x86_64_gnullvm" -version = "0.52.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" - -[[package]] -name = "windows_x86_64_msvc" -version = "0.52.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" - [[package]] name = "wit-bindgen" -version = "0.46.0" +version = "0.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" +checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5" [[package]] name = "zerocopy" -version = "0.8.30" +version = "0.8.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ea879c944afe8a2b25fef16bb4ba234f47c694565e97383b36f3a878219065c" +checksum = "668f5168d10b9ee831de31933dc111a459c97ec93225beb307aed970d1372dfd" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.30" +version = "0.8.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf955aa904d6040f70dc8e9384444cb1030aed272ba3cb09bbc4ab9e7c1f34f5" +checksum = "2c7962b26b0a8685668b671ee4b54d007a67d4eaf05fda79ac0ecf41e32270f1" dependencies = [ "proc-macro2", "quote", "syn", ] + +[[package]] +name = "zmij" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd8f3f50b848df28f887acb68e41201b5aea6bc8a8dacc00fb40635ff9a72fea" diff --git a/Cargo.toml b/Cargo.toml index dcdcbbe..7e43c15 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,15 +10,15 @@ crate-type = ["cdylib", "lib"] [features] default = [] -experimental = [] # Experimental algorithms and APIs -ahash = ["dep:ahash"] # Use ahash for HashMap and HashSet +experimental = [] # Experimental algorithms and APIs +fxhash = ["dep:rustc-hash"] # Use fxhash for HashMap and HashSet [dependencies] thiserror = "2.0" anyhow = "1.0" log = "0.4" serde = { version = "1.0", features = ["derive"] } -ahash = { version = "0.8", features = ["serde"], optional = true } +rustc-hash = { version = "2.1.1", optional = true } [dev-dependencies] log4rs = "1.4" diff --git a/src/actions/hashmap_action_storage.rs b/src/actions/hashmap_action_storage.rs new file mode 100644 index 0000000..954ceb2 --- /dev/null +++ b/src/actions/hashmap_action_storage.rs @@ -0,0 +1,104 @@ +use std::{fmt::Debug, hash::BuildHasher}; + +use crate::{ + actions::traits::{ActionId, ActionStorage}, + events::traits::{EpochId, Uri}, + util::hashmap::{HashMap, HashSet, RandomState}, +}; + +#[derive(Debug, Clone)] +pub struct UserActionState { + pub accessed_sites: HashMap, S>, +} + +impl Default + for UserActionState +{ + fn default() -> Self { + Self { + accessed_sites: HashMap::with_hasher(S::default()), + } + } +} + +#[derive(Debug)] +pub struct HashMapActionStorage< + AID: ActionId, + E: EpochId, + U: Uri, + S: BuildHasher = RandomState, +> { + pub actions: HashMap, S>, + pub quota_limit: Option, +} + +impl Default + for HashMapActionStorage +{ + fn default() -> Self { + Self { + actions: HashMap::with_hasher(S::default()), + quota_limit: None, + } + } +} + +impl + HashMapActionStorage +{ + pub fn new(quota_limit: Option) -> Self { + Self::with_hashmap_capacity(quota_limit, 0) + } + + pub fn with_hashmap_capacity( + quota_limit: Option, + hashmap_capacity: usize, + ) -> Self { + Self { + actions: HashMap::with_capacity_and_hasher( + hashmap_capacity, + S::default(), + ), + quota_limit, + } + } +} + +impl ActionStorage for HashMapActionStorage +where + AID: ActionId, + E: EpochId, + U: Uri, + S: BuildHasher + Default, +{ + type ActionId = AID; + type EpochId = E; + type Uri = U; + type Error = anyhow::Error; + + fn try_record_site( + &mut self, + action_id: Self::ActionId, + epoch: Self::EpochId, + site: &Self::Uri, + ) -> Result { + let state = self.actions.entry(action_id).or_default(); + let epoch_sites = state + .accessed_sites + .entry(epoch) + .or_insert_with(|| HashSet::with_hasher(S::default())); + + if epoch_sites.contains(site) { + return Ok(true); + } + + if let Some(quota_limit) = self.quota_limit + && epoch_sites.len() >= quota_limit + { + return Ok(false); + } + + epoch_sites.insert(site.clone()); + Ok(true) + } +} diff --git a/src/actions/mod.rs b/src/actions/mod.rs new file mode 100644 index 0000000..0d3debf --- /dev/null +++ b/src/actions/mod.rs @@ -0,0 +1,2 @@ +pub mod hashmap_action_storage; +pub mod traits; diff --git a/src/actions/traits.rs b/src/actions/traits.rs new file mode 100644 index 0000000..8cf0695 --- /dev/null +++ b/src/actions/traits.rs @@ -0,0 +1,24 @@ +use std::{fmt::Debug, hash::Hash}; + +use crate::events::traits::{EpochId, Uri}; + +/// Marker trait for user-action context identifiers +pub trait ActionId: Copy + Eq + Hash + Debug {} +impl ActionId for T {} + +pub trait ActionStorage { + type ActionId: ActionId; + type EpochId: EpochId; + type Uri: Uri; + type Error: Debug; + + /// Checks if a site can be recorded for the given action and epoch. + /// Returns Ok(true) if allowed (and records it), Ok(false) if + /// quota exceeded. + fn try_record_site( + &mut self, + action_id: Self::ActionId, + epoch: Self::EpochId, + site: &Self::Uri, + ) -> Result; +} diff --git a/src/budget/hashmap_filter_storage.rs b/src/budget/hashmap_filter_storage.rs index 67d192a..e142c60 100644 --- a/src/budget/hashmap_filter_storage.rs +++ b/src/budget/hashmap_filter_storage.rs @@ -1,33 +1,54 @@ -use std::{fmt::Debug, hash::Hash}; +use std::{ + collections::hash_map::Entry, + fmt::Debug, + hash::{BuildHasher, Hash}, +}; -use serde::{ser::SerializeStruct, Serialize}; +use serde::{Serialize, ser::SerializeStruct}; use crate::{ budget::traits::{Filter, FilterCapacities, FilterStorage}, - util::hashmap::HashMap, + util::hashmap::{HashMap, RandomState}, }; /// Simple implementation of FilterStorage using a HashMap. /// Works for any Filter that implements the Filter trait. -#[derive(Debug, Default)] -pub struct HashMapFilterStorage +#[derive(Debug)] +pub struct HashMapFilterStorage where C: FilterCapacities, F: Filter, + S: BuildHasher + Default, { pub capacities: C, - pub filters: HashMap, + pub filters: HashMap, +} + +impl Default for HashMapFilterStorage +where + C: FilterCapacities + Default, + F: Filter, + S: BuildHasher + Default, +{ + fn default() -> Self { + Self { + capacities: C::default(), + filters: HashMap::with_hasher(S::default()), + } + } } -impl Serialize for HashMapFilterStorage +impl Serialize for HashMapFilterStorage where C: FilterCapacities + Serialize, F: Filter + Serialize, FID: Serialize + Eq + Hash + Debug, + S: BuildHasher + Default, + HashMap: Serialize, { - fn serialize(&self, serializer: S) -> Result + fn serialize(&self, serializer: Ser) -> Result where - S: serde::Serializer, + Ser: serde::Serializer, { let mut state = serializer.serialize_struct("HashMapFilterStorage", 2)?; @@ -37,11 +58,32 @@ where } } -impl FilterStorage for HashMapFilterStorage +impl HashMapFilterStorage +where + C: FilterCapacities, + F: Filter, + S: BuildHasher + Default, +{ + pub fn with_hashmap_capacity( + capacities: C, + hashmap_capacity: usize, + ) -> Self { + Self { + capacities, + filters: HashMap::with_capacity_and_hasher( + hashmap_capacity, + S::default(), + ), + } + } +} + +impl FilterStorage for HashMapFilterStorage where F: Filter + Clone, C: FilterCapacities, C::FilterId: Clone + Eq + Hash + Debug, + S: BuildHasher + Default, { type FilterId = C::FilterId; type Filter = F; @@ -53,11 +95,7 @@ where where Self: Sized, { - let this = Self { - capacities, - filters: HashMap::new(), - }; - Ok(this) + Ok(Self::with_hashmap_capacity(capacities, 0)) } fn capacities(&self) -> &Self::Capacities { @@ -80,6 +118,25 @@ where self.filters.insert(filter_id.clone(), filter); Ok(()) } + + /// Optimized version using Entry API + fn edit_filter_or_new( + &mut self, + filter_id: &Self::FilterId, + f: impl FnOnce(&mut Self::Filter) -> Result, + ) -> Result { + let entry = self.filters.entry(filter_id.clone()); + + let filter = match entry { + Entry::Occupied(occupied) => occupied.into_mut(), + Entry::Vacant(vacant) => { + let capacity = self.capacities.capacity(filter_id)?; + vacant.insert(Self::Filter::new(capacity)?) + } + }; + + f(filter) + } } #[cfg(test)] diff --git a/src/budget/pure_dp_filter.rs b/src/budget/pure_dp_filter.rs index 1b8efbc..cba2066 100644 --- a/src/budget/pure_dp_filter.rs +++ b/src/budget/pure_dp_filter.rs @@ -67,7 +67,10 @@ impl Filter for PureDPBudgetFilter { &mut self, budget: &PureDPBudget, ) -> Result { - debug!("The budget consumed in this epoch is {:?}, budget capacity for this epoch is {:?}, and we need to consume this much budget {:?}", self.consumed, self.capacity, budget); + debug!( + "The budget consumed in this epoch is {:?}, budget capacity for this epoch is {:?}, and we need to consume this much budget {:?}", + self.consumed, self.capacity, budget + ); let status = self.can_consume(budget)?; if status == FilterStatus::Continue { diff --git a/src/budget/traits.rs b/src/budget/traits.rs index 985d238..604b7a8 100644 --- a/src/budget/traits.rs +++ b/src/budget/traits.rs @@ -65,10 +65,10 @@ pub trait FilterStorage { type Budget: Budget; type Filter: Filter; type Capacities: FilterCapacities< - FilterId = Self::FilterId, - Budget = Self::Budget, - Error = Self::Error, - >; + FilterId = Self::FilterId, + Budget = Self::Budget, + Error = Self::Error, + >; type Error; /// Create a new filter storage with the given capacities for new filters. @@ -144,10 +144,7 @@ pub trait FilterStorage { filter_id: &Self::FilterId, budget: &Self::Budget, ) -> Result { - let mut filter = self.get_filter_or_new(filter_id)?; - let status = filter.try_consume(budget)?; - self.set_filter(filter_id, filter)?; - Ok(status) + self.edit_filter_or_new(filter_id, |filter| filter.try_consume(budget)) } /// Gets the remaining budget for a filter. diff --git a/src/events/hashmap_event_storage.rs b/src/events/hashmap_event_storage.rs index 025cea1..e6c8a30 100644 --- a/src/events/hashmap_event_storage.rs +++ b/src/events/hashmap_event_storage.rs @@ -1,29 +1,50 @@ +use std::hash::BuildHasher; + use crate::{ events::traits::{Event, EventStorage, RelevantEventSelector}, - util::hashmap::HashMap, + util::hashmap::{HashMap, RandomState}, }; /// A simple in-memory event storage. Stores a mapping of epoch id to epoch /// events, where each epoch events is just a vec of events. /// Clones events when asked to retrieve events for an epoch. -#[derive(Debug, Default)] -pub struct HashMapEventStorage { - pub epochs: HashMap>, +#[derive(Debug)] +pub struct HashMapEventStorage +where + S: BuildHasher, +{ + pub epochs: HashMap, S>, +} + +impl Default for HashMapEventStorage { + fn default() -> Self { + Self { + epochs: HashMap::with_hasher(S::default()), + } + } } /// Simple in-memory event storage. Stores a mapping of epoch id to events /// in that epoch. -impl HashMapEventStorage { +impl HashMapEventStorage { pub fn new() -> Self { + Self::with_hashmap_capacity(0) + } + + pub fn with_hashmap_capacity(hashmap_capacity: usize) -> Self { Self { - epochs: HashMap::new(), + epochs: HashMap::with_capacity_and_hasher( + hashmap_capacity, + S::default(), + ), } } } -impl EventStorage for HashMapEventStorage +impl EventStorage for HashMapEventStorage where E: Event + Clone, + S: BuildHasher + Default, { type Event = E; type Error = anyhow::Error; diff --git a/src/events/ppa_event.rs b/src/events/ppa_event.rs index 647cb9b..74d15ec 100644 --- a/src/events/ppa_event.rs +++ b/src/events/ppa_event.rs @@ -2,24 +2,23 @@ use std::fmt::Debug; use super::traits::Uri; use crate::{ + actions::traits::ActionId, events::traits::{Event, EventUris}, queries::ppa_histogram::{PpaBucketKey, PpaEpochId, PpaFilterData}, }; /// Impression event #[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct PpaEvent { +pub struct PpaEvent { /// Event ID, e.g., counter or random ID. Unused in Firefox but kept for /// debugging purposes. pub id: u64, - /// Timestamp, used to order events for last-touch attribution. + /// Used to order events for last-touch attribution. pub timestamp: u64, - pub epoch_number: PpaEpochId, - pub histogram_index: PpaBucketKey, - + pub user_action_id: Option, pub uris: EventUris, /// This field can contain bit-packed information about campaigns, ads, or @@ -30,9 +29,10 @@ pub struct PpaEvent { pub filter_data: PpaFilterData, } -impl Event for PpaEvent { +impl Event for PpaEvent { type EpochId = PpaEpochId; type Uri = U; + type ActionId = A; fn epoch_id(&self) -> Self::EpochId { self.epoch_number @@ -41,4 +41,8 @@ impl Event for PpaEvent { fn event_uris(&self) -> &EventUris { &self.uris } + + fn user_action_id(&self) -> Option { + self.user_action_id + } } diff --git a/src/events/relevant_events.rs b/src/events/relevant_events.rs index cf0f55d..7d8b5d3 100644 --- a/src/events/relevant_events.rs +++ b/src/events/relevant_events.rs @@ -18,7 +18,8 @@ impl RelevantEvents { epoch_ids: &[E::EpochId], selector: &impl RelevantEventSelector, ) -> Result { - let mut events_per_epoch = HashMap::with_capacity(epoch_ids.len()); + let mut events_per_epoch = HashMap::default(); + events_per_epoch.reserve(epoch_ids.len()); for epoch_id in epoch_ids { // fetch all relevant events at that epoch from storage @@ -41,7 +42,8 @@ impl RelevantEvents { } pub fn from_vec(events: Vec) -> Self { - let mut events_per_epoch: HashMap> = HashMap::new(); + let mut events_per_epoch: HashMap> = + HashMap::default(); for event in events { events_per_epoch diff --git a/src/events/simple_event.rs b/src/events/simple_event.rs index f6c01e6..2e461dc 100644 --- a/src/events/simple_event.rs +++ b/src/events/simple_event.rs @@ -16,6 +16,7 @@ pub struct SimpleEvent { impl Event for SimpleEvent { type EpochId = u64; type Uri = U; + type ActionId = (); fn epoch_id(&self) -> Self::EpochId { self.epoch_number @@ -24,6 +25,10 @@ impl Event for SimpleEvent { fn event_uris(&self) -> &EventUris { &self.uris } + + fn user_action_id(&self) -> Option { + None + } } #[cfg(test)] diff --git a/src/events/traits.rs b/src/events/traits.rs index 6e8cc17..8d6e4f6 100644 --- a/src/events/traits.rs +++ b/src/events/traits.rs @@ -3,7 +3,7 @@ use std::{ hash::{Hash, Hasher}, }; -use crate::events::uri_set::UriSet; +use crate::{actions::traits::ActionId, events::uri_set::UriSet}; /// Marker trait with bounds for epoch identifiers. pub trait EpochId: Clone + Copy + Debug + Eq + Hash {} @@ -41,10 +41,11 @@ impl Hash for EventUris { pub trait Event: Debug + Clone { type EpochId: EpochId; type Uri: Uri; + type ActionId: ActionId; fn epoch_id(&self) -> Self::EpochId; - fn event_uris(&self) -> &EventUris; + fn user_action_id(&self) -> Option; } /// Selector that can tag relevant events one by one or in bulk. diff --git a/src/lib.rs b/src/lib.rs index 266b975..704bd61 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ +pub mod actions; pub mod budget; pub mod events; pub mod mechanisms; diff --git a/src/pds/accounting.rs b/src/pds/accounting.rs index 35e75c9..1f14628 100644 --- a/src/pds/accounting.rs +++ b/src/pds/accounting.rs @@ -6,7 +6,7 @@ use crate::{ budget::pure_dp_filter::PureDPBudget, mechanisms::{NoiseScale, NormType}, queries::traits::EpochReportRequest, - util::hashmap::{HashMap, HashSet}, + util::hashmap::HashSet, }; /// Pure DP individual privacy loss, following @@ -36,7 +36,9 @@ pub fn compute_epoch_loss( } }; - debug!("Individual sensitivity: {individual_sensitivity} for {num_epochs} epochs"); + debug!( + "Individual sensitivity: {individual_sensitivity} for {num_epochs} epochs" + ); let NoiseScale::Laplace(noise_scale) = request.noise_scale(); @@ -61,9 +63,7 @@ pub fn compute_epoch_source_losses( epoch_event_sources: HashSet<&Q::Uri>, computed_attribution: &Q::Report, num_epochs: usize, -) -> HashMap { - let mut per_source_losses = HashMap::new(); - +) -> Vec<(Q::Uri, PureDPBudget)> { // Collect sources and noise scale from the request. let requested_sources = &request.report_uris().source_uris; let NoiseScale::Laplace(noise_scale) = request.noise_scale(); @@ -71,6 +71,8 @@ pub fn compute_epoch_source_losses( // Count requested sources for case analysis let num_requested_sources = requested_sources.len(); + let mut per_source_losses = Vec::with_capacity(num_requested_sources); + for source in requested_sources { let has_relevant_events = epoch_event_sources.contains(&source); @@ -109,14 +111,14 @@ pub fn compute_epoch_source_losses( // not related to privacy. if noise_scale.abs() < f64::EPSILON { per_source_losses - .insert(source.clone(), PureDPBudget::from(f64::INFINITY)); + .push((source.clone(), PureDPBudget::from(f64::INFINITY))); } else { // In Cookie Monster, we have `query_global_sensitivity` / // `requested_epsilon` instead of just `noise_scale`. - per_source_losses.insert( + per_source_losses.push(( source.clone(), PureDPBudget::from(individual_sensitivity / noise_scale), - ); + )); } } diff --git a/src/pds/aliases.rs b/src/pds/aliases.rs index 3ca4828..6a068f7 100644 --- a/src/pds/aliases.rs +++ b/src/pds/aliases.rs @@ -4,6 +4,7 @@ use super::{ quotas::{FilterId, StaticCapacities}, }; use crate::{ + actions::hashmap_action_storage::HashMapActionStorage, budget::{ hashmap_filter_storage::HashMapFilterStorage, pure_dp_filter::{PureDPBudget, PureDPBudgetFilter}, @@ -24,11 +25,27 @@ pub type SimpleFilterStorage = HashMapFilterStorage< PureDPBudgetFilter, StaticCapacities, PureDPBudget>, >; +pub type SimpleActionStorage = HashMapActionStorage<(), u64, String>; pub type SimpleEventStorage = HashMapEventStorage; -pub type SimplePdsCore = - PrivateDataServiceCore; -pub type SimplePds = - PrivateDataService; + +pub type SimplePdsCore = + PrivateDataServiceCore< + SimpleLastTouchHistogramRequest, + FS, + AS, + anyhow::Error, + >; +pub type SimplePds< + FS = SimpleFilterStorage, + AS = SimpleActionStorage, + ES = SimpleEventStorage, +> = PrivateDataService< + SimpleLastTouchHistogramRequest, + FS, + AS, + ES, + anyhow::Error, +>; // === PPA aliases === @@ -37,11 +54,18 @@ pub type PpaFilterStorage = HashMapFilterStorage< StaticCapacities, PureDPBudget>, >; pub type PpaEventStorage = HashMapEventStorage>; -pub type PpaPdsCore = - PrivateDataServiceCore, FS, ERR>; +pub type PpaActionStorage = HashMapActionStorage; + +pub type PpaPdsCore< + FS = PpaFilterStorage, + AS = PpaActionStorage, + U = String, + ERR = anyhow::Error, +> = PrivateDataServiceCore, FS, AS, ERR>; pub type PpaPds< FS = PpaFilterStorage, + AS = PpaActionStorage, ES = PpaEventStorage, U = String, ERR = anyhow::Error, -> = PrivateDataService, FS, ES, ERR>; +> = PrivateDataService, FS, AS, ES, ERR>; diff --git a/src/pds/batch_pds.rs b/src/pds/batch_pds.rs index 52d6285..f42bb04 100644 --- a/src/pds/batch_pds.rs +++ b/src/pds/batch_pds.rs @@ -17,13 +17,14 @@ use super::{ quotas::{PdsFilterStatus, StaticCapacities}, }; use crate::{ + actions::traits::ActionStorage, budget::{ pure_dp_filter::PureDPBudget, traits::{Filter, FilterStatus, FilterStorage, ReleaseFilter}, }, - events::traits::EventStorage, + events::traits::{Event, EventStorage}, mechanisms::NoiseScale, - pds::quotas::FilterId, + pds::{core::DropEpochReason, quotas::FilterId}, queries::traits::EpochReportRequest, util::hashmap::{HashMap, HashSet}, }; @@ -68,18 +69,23 @@ impl BatchedRequest { } /// [Experimental] Batch wrapper for private data service. -pub struct BatchPrivateDataService +pub struct BatchPrivateDataService where Q: EpochReportRequest, Q::Report: Clone, FS: FilterStorage< - Budget = PureDPBudget, - FilterId = FilterIdQ, - Capacities = StaticCapacities, PureDPBudget>, - >, + Budget = PureDPBudget, + FilterId = FilterIdQ, + Capacities = StaticCapacities, PureDPBudget>, + >, FS::Filter: ReleaseFilter, + AS: ActionStorage< + EpochId = Q::EpochId, + Uri = Q::Uri, + ActionId = ::ActionId, + >, ES: EventStorage, - ERR: From + From, + ERR: From + From + From, { /// Current scheduling interval. /// Used to release budget for the Global filter. @@ -113,7 +119,7 @@ where /// Base private data service. /// Filters need to have functionality to unlock budget. - pub pds: PrivateDataService, + pub pds: PrivateDataService, } /// Report for a batched request. Guaranteed to be returned after the number of @@ -130,22 +136,27 @@ pub struct BatchedReport { #[allow(type_alias_bounds)] type FilterIdQ = FilterId; -impl BatchPrivateDataService +impl BatchPrivateDataService where Q: EpochReportRequest, Q::Report: Clone, FS: FilterStorage< - Budget = PureDPBudget, - FilterId = FilterIdQ, - Capacities = StaticCapacities, PureDPBudget>, - >, + Budget = PureDPBudget, + FilterId = FilterIdQ, + Capacities = StaticCapacities, PureDPBudget>, + >, FS::Filter: ReleaseFilter, + AS: ActionStorage< + EpochId = Q::EpochId, + Uri = Q::Uri, + ActionId = ::ActionId, + >, ES: EventStorage, - ERR: From + From, + ERR: From + From + From, { /// Create a new batch private data service. pub fn new( - pds: PrivateDataService, + pds: PrivateDataService, n_releases: usize, ) -> Result { let capacities = pds.core.filter_storage.capacities().clone(); @@ -172,9 +183,9 @@ where current_scheduling_interval: 0, new_pending_requests: vec![], batched_requests: vec![], - delayed_reports: HashMap::new(), + delayed_reports: HashMap::default(), epochs: None, - sources_per_epoch: HashMap::new(), + sources_per_epoch: HashMap::default(), }) } @@ -454,18 +465,29 @@ where self.initialize_filters_for_request(&request.request)?; // Compute the actual report. It might be null though. - let report = self.pds.compute_report(&request.request)?; - - if !report.oob_filters.is_empty() { - for filter_id in report.oob_filters.iter() { - if let FilterId::SourceQuota(_, _) = filter_id { - // SourceQuota should never block a request if we - // have perfect upper - // bounds for the public filters. - panic!( - "Request {} was not allocated: {:?}. Final attempt? {}", - request.request_id, report.oob_filters, allocate_final_attempts - ); + let report = self + .pds + .compute_report(&request.request, None /* todo */)?; + + if !report.drop_epoch_reasons.is_empty() { + for drop_reason in report.drop_epoch_reasons.iter() { + let DropEpochReason::OutOfBudget(filters) = drop_reason + else { + continue; + }; + + for filter_id in filters { + if let FilterId::SourceQuota(_, _) = filter_id { + // SourceQuota should never block a request if + // we have perfect upper bounds for the public + // filters. + panic!( + "Request {} was not allocated: {:?}. Final attempt? {}", + request.request_id, + report.drop_epoch_reasons, + allocate_final_attempts + ); + } } } } @@ -583,7 +605,8 @@ where } debug!("Epochs across all requests: {all_epochs:?}"); - let mut budget_per_source: HashMap = HashMap::new(); + let mut budget_per_source: HashMap = + HashMap::default(); for source in &all_sources { let source = (*source).clone(); let mut source_total_budget: f64 = 0.0; @@ -676,33 +699,37 @@ where // We (mis-)use PDS's filters_to_consume() method to get a list of // filters that will be deducted for this request. for epoch_id in request.epoch_ids() { - let mut source_losses = HashMap::new(); + let mut source_losses = Vec::with_capacity(uris.source_uris.len()); for source in &uris.source_uris { - source_losses.insert(source.clone(), 0.0); + source_losses.push((source.clone(), 0.0)); } - let filter_ids = self.pds.core.filters_to_consume( - epoch_id, - &0.0, // just set to 0, we only care about the filter IDs - &source_losses, - request.report_uris(), - ); - self.initialize_filters(filter_ids.keys())?; + let filter_ids = self + .pds + .core + .filters_to_consume( + epoch_id, + &0.0, // just set to 0, we only care about the filter IDs + &source_losses, + request.report_uris(), + ) + .into_iter() + .map(|(fid, _)| fid); + + self.initialize_filters(filter_ids)?; } Ok(()) } /// Given a list of filter IDs, initialize them in the filter storage, /// such that non-global filters are unlocked and act as regular filters. - fn initialize_filters<'f, FID>( + fn initialize_filters( &mut self, filters: impl Iterator, ) -> Result<(), ERR> where // accept both owned and borrowed FilterIDs - FID: Borrow<&'f FilterIdQ> + 'f, - Q::EpochId: 'f, // required by borrow checker - Q::Uri: 'f, + FID: Borrow>, { for filter_id in filters { let filter_id = filter_id.borrow(); @@ -739,6 +766,7 @@ mod tests { use super::*; use crate::{ + actions::hashmap_action_storage::HashMapActionStorage, budget::{ hashmap_filter_storage::HashMapFilterStorage, release_filter::PureDPBudgetReleaseFilter, @@ -752,7 +780,7 @@ mod tests { queries::{ ppa_histogram::{ PpaHistogramConfig, PpaHistogramRequest, - PpaRelevantEventSelector, RequestedBuckets, + PpaRelevantEventSelector, }, traits::ReportRequestUris, }, @@ -792,6 +820,7 @@ mod tests { timestamp: 0, epoch_number: 1, histogram_index: 0, + user_action_id: None, uris: EventUris::mock(), filter_data: 1, }; @@ -799,8 +828,14 @@ mod tests { let filter_storage: HashMapFilterStorage = HashMapFilterStorage::new(capacities)?; - let pds: PrivateDataService<_, _, _, anyhow::Error> = - PrivateDataService::new(filter_storage, event_storage); + let action_storage: HashMapActionStorage = + HashMapActionStorage::new(None); + let pds: PrivateDataService<_, _, _, _, anyhow::Error> = + PrivateDataService::new( + filter_storage, + action_storage, + event_storage, + ); let mut batch_pds = BatchPrivateDataService::new(pds, 2)?; let mut request_config = PpaHistogramConfig { @@ -814,11 +849,8 @@ mod tests { let report_uris = ReportRequestUris::mock(); - let always_relevant_selector = || PpaRelevantEventSelector { - report_request_uris: report_uris.clone(), - is_matching_event: Box::new(|_: u64| true), - requested_buckets: RequestedBuckets::AllBuckets, - }; + let always_relevant_selector = + || PpaRelevantEventSelector::new(report_uris.clone()); // Request that will be answered in the first scheduling attempt. batch_pds.register_report_request(BatchedRequest::new( @@ -862,9 +894,9 @@ mod tests { for report in reports { assert!( - report.report.oob_filters.is_empty(), - "Report should not have any OOB filters. Got: {:?}", - report.report.oob_filters + report.report.drop_epoch_reasons.is_empty(), + "Report should not have any dropped epochs. Got: {:?}", + report.report.drop_epoch_reasons ); } @@ -873,9 +905,9 @@ mod tests { debug!("Reports again: {reports:?}"); assert!( - reports[0].report.oob_filters.is_empty(), - "Report should not have any OOB filters. Got: {:?}", - reports[0].report.oob_filters + reports[0].report.drop_epoch_reasons.is_empty(), + "Report should not have any dropped epochs. Got: {:?}", + reports[0].report.drop_epoch_reasons ); Ok(()) @@ -894,31 +926,34 @@ mod tests { } let trigger_uris: UriSet<_> = trigger_uris.into(); - // Event relevant to all the shoes websites. Could also register 10 - // different events, with one querier each. - let event1 = PpaEvent { + let event_template = PpaEvent { id: 1, timestamp: 0, epoch_number: 1, histogram_index: 0, + user_action_id: None, + uris: EventUris::mock(), + filter_data: 1, + }; + + // Event relevant to all the shoes websites. Could also register 10 + // different events, with one querier each. + let event1 = PpaEvent { + user_action_id: None, uris: EventUris { source_uri: "news.ex".to_string(), trigger_uris: trigger_uris.clone(), querier_uris: trigger_uris.clone(), }, - filter_data: 1, + ..event_template.clone() }; let event2 = PpaEvent { - id: 1, - timestamp: 0, - epoch_number: 1, - histogram_index: 0, uris: EventUris { source_uri: "blog.ex".to_string(), trigger_uris: ["hats-1.ex".to_string()].into(), querier_uris: ["hats-1.ex".to_string()].into(), }, - filter_data: 1, + ..event_template }; let event_storage = event_storage_with_events(vec![event1, event2]); @@ -926,8 +961,14 @@ mod tests { // Using a single release here. let filter_storage: HashMapFilterStorage = HashMapFilterStorage::new(capacities)?; - let pds: PrivateDataService<_, _, _, anyhow::Error> = - PrivateDataService::new(filter_storage, event_storage); + let action_storage: HashMapActionStorage = + HashMapActionStorage::new(None); + let pds: PrivateDataService<_, _, _, _, anyhow::Error> = + PrivateDataService::new( + filter_storage, + action_storage, + event_storage, + ); let mut batch_pds = BatchPrivateDataService::new(pds, 1)?; let mut request_config = PpaHistogramConfig { @@ -939,12 +980,9 @@ mod tests { histogram_size: 5, }; - let always_valid_selector = - |uris: ReportRequestUris| PpaRelevantEventSelector { - report_request_uris: uris, - is_matching_event: Box::new(|_: u64| true), - requested_buckets: RequestedBuckets::AllBuckets, - }; + let always_valid_selector = |uris: ReportRequestUris| { + PpaRelevantEventSelector::new(uris) + }; // Every single conversion sites gets a conversion. for i in 1..=9 { @@ -1027,9 +1065,9 @@ mod tests { // No report should be null for report in &reports { assert!( - report.report.oob_filters.is_empty(), + report.report.drop_epoch_reasons.is_empty(), "Report should not have an error cause. Got: {:?}", - report.report.oob_filters + report.report.drop_epoch_reasons ); } @@ -1069,6 +1107,7 @@ mod tests { timestamp: 0, epoch_number: 1, histogram_index: 0, + user_action_id: None, uris: EventUris { source_uri: "news.ex".to_string(), trigger_uris: trigger_uris.clone(), @@ -1089,6 +1128,7 @@ mod tests { timestamp: 0, epoch_number: 1, histogram_index: 0, + user_action_id: None, uris: EventUris { source_uri: "blog.ex".to_string(), trigger_uris: trigger_uris.clone(), @@ -1102,8 +1142,14 @@ mod tests { // Using a single release here. let filter_storage: HashMapFilterStorage = HashMapFilterStorage::new(capacities)?; - let pds: PrivateDataService<_, _, _, anyhow::Error> = - PrivateDataService::new(filter_storage, event_storage); + let action_storage: HashMapActionStorage = + HashMapActionStorage::new(None); + let pds: PrivateDataService<_, _, _, _, anyhow::Error> = + PrivateDataService::new( + filter_storage, + action_storage, + event_storage, + ); let mut batch_pds = BatchPrivateDataService::new(pds, 2)?; let mut request_config = PpaHistogramConfig { @@ -1123,7 +1169,7 @@ mod tests { request_config.requested_epsilon = if i == 3 { 0.99 // We want this request to be smaller than the others in - // the tests. + // the tests. } else { 0.99 + 0.0001 * i as f64 }; @@ -1133,15 +1179,11 @@ mod tests { 2, // Space for one more time. Easier to check the batch. PpaHistogramRequest::new( &request_config, - PpaRelevantEventSelector { - report_request_uris: ReportRequestUris { - trigger_uri: shoes_conv.clone(), - source_uris: ["news.ex".to_string()].into(), - querier_uris: [shoes_conv.clone()].into(), - }, - is_matching_event: Box::new(|_: u64| true), - requested_buckets: RequestedBuckets::AllBuckets, - }, + PpaRelevantEventSelector::new(ReportRequestUris { + trigger_uri: shoes_conv.clone(), + source_uris: ["news.ex".to_string()].into(), + querier_uris: [shoes_conv.clone()].into(), + }), )?, ))?; } @@ -1156,15 +1198,11 @@ mod tests { 2, PpaHistogramRequest::new( &request_config, - PpaRelevantEventSelector { - report_request_uris: ReportRequestUris { - trigger_uri: hats_conv.clone(), - source_uris: ["blog.ex".to_string()].into(), - querier_uris: [hats_conv.clone()].into(), - }, - is_matching_event: Box::new(|_: u64| true), - requested_buckets: RequestedBuckets::AllBuckets, - }, + PpaRelevantEventSelector::new(ReportRequestUris { + trigger_uri: hats_conv.clone(), + source_uris: ["blog.ex".to_string()].into(), + querier_uris: [hats_conv.clone()].into(), + }), )?, ))?; } @@ -1211,7 +1249,7 @@ mod tests { // Only 5 reports should be non-null. let mut n_non_null_reports = 0; for report in &reports { - if report.report.oob_filters.is_empty() { + if report.report.drop_epoch_reasons.is_empty() { n_non_null_reports += 1; } } diff --git a/src/pds/core.rs b/src/pds/core.rs index 387bfc5..0d5f84e 100644 --- a/src/pds/core.rs +++ b/src/pds/core.rs @@ -8,27 +8,31 @@ use super::{ quotas::{FilterId, PdsFilterStatus}, }; use crate::{ + actions::traits::ActionStorage, budget::{ pure_dp_filter::PureDPBudget, traits::{FilterStatus, FilterStorage}, }, events::relevant_events::RelevantEvents, queries::traits::{EpochReportRequest, Report, ReportRequestUris}, - util::hashmap::HashMap, }; -pub struct PrivateDataServiceCore +pub struct PrivateDataServiceCore where Q: EpochReportRequest, FS: FilterStorage< - FilterId = FilterId, - Budget = PureDPBudget, - >, - ERR: From, + FilterId = FilterId, + Budget = PureDPBudget, + >, + AS: ActionStorage, + ERR: From + From, { /// Filter storage interface. pub filter_storage: FS, + /// Action storage interface. + pub action_storage: AS, + /// This PhantomData serves two purposes: /// 1. It Defines the Q and ERR generics on the struct instead of on each /// individual function, reducing boilerplate @@ -37,31 +41,34 @@ where _phantom: PhantomData>, } -impl PrivateDataServiceCore +impl PrivateDataServiceCore where R: Report + Clone, Q: EpochReportRequest, FS: FilterStorage< - FilterId = FilterId, - Budget = PureDPBudget, - >, - ERR: From, + FilterId = FilterId, + Budget = PureDPBudget, + >, + AS: ActionStorage, + ERR: From + From, { - pub fn new(filter_storage: FS) -> Self { + pub fn new(filter_storage: FS, action_storage: AS) -> Self { Self { filter_storage, + action_storage, _phantom: PhantomData, } } /// Computes a report for the given report request. - /// This function follows `compute_attribution_report` from the Cookie - /// Monster Algorithm (https://arxiv.org/pdf/2405.16719, Code Listing 1) + /// This function follows Algorithm 2 from the Big Bird paper + /// (https://arxiv.org/abs/2506.05290, Alg. 2) pub fn compute_report( &mut self, request: &Q, // mutable, as we will drop out-of-budget epochs from it mut relevant_events: RelevantEvents, + action_id: Option, ) -> Result, ERR> { debug!("Computing report for request {request:?}"); @@ -79,8 +86,28 @@ where // Compute the raw report, useful for debugging and accounting. let unfiltered_report = request.compute_report(&relevant_events); + let mut drop_epoch_reasons: Vec> = vec![]; + + // First, enforce action quotas + if let Some(aid) = action_id { + let conv_site = &uris.trigger_uri; + + for &epoch_id in &epochs { + let allowed = self + .action_storage + .try_record_site(aid, epoch_id, conv_site)?; + + if !allowed { + // Oscar Paper: "If quota-count is exceeded in epoch e... + // nullifies only epoch e's data" + relevant_events.drop_epoch(&epoch_id); + drop_epoch_reasons + .push(DropEpochReason::CountQuotaExceeded); + } + } + } + // Browse epochs in the attribution window - let mut oob_filters = vec![]; for epoch_id in epochs { // Step 1. Get relevant events for the current epoch `epoch_id`. let epoch_relevant_events = relevant_events.for_epoch(&epoch_id); @@ -125,17 +152,20 @@ where )?; if consume_status != PdsFilterStatus::Continue { - panic!("ERR: Phase 2 failed unexpectedly wtih status {consume_status:?} after Phase 1 succeeded"); + panic!( + "ERR: Phase 2 failed unexpectedly wtih status {consume_status:?} after Phase 1 succeeded" + ); } } - PdsFilterStatus::OutOfBudget(mut filters) => { + PdsFilterStatus::OutOfBudget(filters) => { // Not enough budget, drop events without any filter // consumption relevant_events.drop_epoch(&epoch_id); // Keep track of why we dropped this epoch - oob_filters.append(&mut filters); + drop_epoch_reasons + .push(DropEpochReason::OutOfBudget(filters)); } } } @@ -152,7 +182,7 @@ where let report_with_metadata = PdsReport { filtered_report, unfiltered_report, - oob_filters, + drop_epoch_reasons, }; #[cfg(not(feature = "experimental"))] let report_with_metadata = PdsReport { @@ -165,13 +195,14 @@ where /// Calculate how much privacy to deduct from which filters, /// for the given epoch and losses. + #[allow(clippy::type_complexity)] pub fn filters_to_consume<'a>( &self, epoch_id: Q::EpochId, loss: &'a FS::Budget, - source_losses: &'a HashMap, + source_losses: &'a Vec<(Q::Uri, FS::Budget)>, uris: &ReportRequestUris, - ) -> HashMap, &'a PureDPBudget> { + ) -> Vec<(FilterId, &'a PureDPBudget)> { // Build the filter IDs for PerQuerier, Global and TriggerQuota let mut device_epoch_filter_ids = Vec::new(); for query_uri in &uris.querier_uris { @@ -182,18 +213,21 @@ where .push(FilterId::TriggerQuota(epoch_id, uris.trigger_uri.clone())); device_epoch_filter_ids.push(FilterId::Global(epoch_id)); + let mut filters_to_consume = Vec::with_capacity( + device_epoch_filter_ids.len() + source_losses.len(), + ); + // PerQuerier, Global and TriggerQuota all have the same device-epoch // level loss - let mut filters_to_consume = HashMap::new(); for filter_id in device_epoch_filter_ids { - filters_to_consume.insert(filter_id, loss); + filters_to_consume.push((filter_id, loss)); } // Add the SourceQuota filters with their own device-epoch-source level // loss for (source, loss) in source_losses { let fid = FilterId::SourceQuota(epoch_id, source.clone()); - filters_to_consume.insert(fid, loss); + filters_to_consume.push((fid, loss)); } filters_to_consume @@ -203,10 +237,7 @@ where #[allow(clippy::type_complexity)] pub fn deduct_budget( &mut self, - filters_to_consume: &HashMap< - FilterId, - &PureDPBudget, - >, + filters_to_consume: &Vec<(FilterId, &PureDPBudget)>, dry_run: bool, ) -> Result>, ERR> { // Try to consume the privacy loss from the filters @@ -230,3 +261,9 @@ where Ok(PdsFilterStatus::Continue) } } + +#[derive(Debug)] +pub enum DropEpochReason { + CountQuotaExceeded, + OutOfBudget(Vec), +} diff --git a/src/pds/cross_report.rs b/src/pds/cross_report.rs index af25270..dcde7a6 100644 --- a/src/pds/cross_report.rs +++ b/src/pds/cross_report.rs @@ -8,6 +8,7 @@ use super::{ quotas::{FilterId, PdsFilterStatus}, }; use crate::{ + actions::traits::ActionStorage, budget::{ pure_dp_filter::PureDPBudget, traits::{FilterStatus, FilterStorage}, @@ -18,7 +19,7 @@ use crate::{ traits::{Event as _, Uri}, }, mechanisms::NoiseScale, - pds::core::PrivateDataServiceCore, + pds::core::{DropEpochReason, PrivateDataServiceCore}, queries::{ histogram::HistogramRequest, ppa_histogram::{ @@ -27,7 +28,7 @@ use crate::{ }, traits::EpochReportRequest, }, - util::hashmap::{HashMap, HashSet}, + util::hashmap::HashSet, }; /// The attribution object that can be used to compute distinct @@ -50,11 +51,12 @@ pub struct AttributionObject { pub already_requested_buckets: RequestedBuckets, } -impl PrivateDataServiceCore, FS, ERR> +impl PrivateDataServiceCore, FS, AS, ERR> where U: Uri, FS: FilterStorage, Budget = PureDPBudget>, - ERR: From, + AS: ActionStorage, + ERR: From + From, { /// Attributes conversion value to events and deduct privacy loss from /// global filter and quotas. Creates an `AttributionObject` that @@ -90,7 +92,7 @@ where .source_uris .iter() .map(|source_uri| (source_uri.clone(), individual_privacy_loss)) - .collect::>(); + .collect::>(); // Try to consume budget from current epoch, drop events if OOB. // Two phase commit. @@ -102,12 +104,8 @@ where ); // Do not consume per-querier, that is done in get_report(). - for querier_uri in &uris.querier_uris { - filters_to_consume.remove(&FilterId::PerQuerier( - epoch_id, - querier_uri.clone(), - )); - } + filters_to_consume + .retain(|(fid, _)| !matches!(fid, FilterId::PerQuerier(_, _))); // Phase 1: dry run. let check_status = self.deduct_budget( @@ -152,7 +150,7 @@ where event_values, events: relevant_events, already_requested_buckets: RequestedBuckets::SpecificBuckets( - HashSet::new(), + HashSet::default(), ), }; @@ -232,7 +230,7 @@ impl AttributionObject> { .request .map_events_to_buckets(filtered_event_values.clone()); - let mut oob_filters = vec![]; + let mut drop_epoch_reasons = vec![]; let mut events_to_drop = HashSet::new(); for &epoch_id in &requested_epochs { let epoch_relevant_events = self.events.for_epoch(&epoch_id); @@ -255,7 +253,8 @@ impl AttributionObject> { events_to_drop.extend(epoch_relevant_events.iter()); // Keep track of dropped filters - oob_filters.push(filter_id); + drop_epoch_reasons + .push(DropEpochReason::OutOfBudget(vec![filter_id])); } } @@ -272,7 +271,7 @@ impl AttributionObject> { let report = PdsReport { filtered_report, unfiltered_report, - oob_filters, + drop_epoch_reasons, }; Ok(report) } @@ -284,7 +283,7 @@ mod tests { use crate::{ events::{ppa_event::PpaEvent, traits::EventUris, uri_set::UriSet}, pds::{ - aliases::{PpaFilterStorage, PpaPdsCore}, + aliases::{PpaActionStorage, PpaFilterStorage, PpaPdsCore}, quotas::StaticCapacities, }, queries::{ @@ -301,7 +300,8 @@ mod tests { // Create PDS with mock capacities let capacities = StaticCapacities::mock(); let filters = PpaFilterStorage::new(capacities.clone())?; - let mut pds = PpaPdsCore::<_>::new(filters); + let actions = PpaActionStorage::new(None); + let mut pds = PpaPdsCore::<_>::new(filters, actions); // Create test URIs let source_uri = "blog.example.com".to_string(); @@ -327,27 +327,31 @@ mod tests { querier_uris: querier_uris.clone(), }; + let event_template = PpaEvent { + id: 0, + timestamp: 0, + epoch_number: 1, + histogram_index: 0, + user_action_id: None, + uris: event_uris.clone(), + filter_data: 1, + }; + // Register an early event with bucket 1 - this should be overridden by // last-touch attribution let early_event = PpaEvent { - id: 1, timestamp: 100, - epoch_number: 1, histogram_index: 1, // r1.ex bucket - uris: event_uris.clone(), - filter_data: 1, + ..event_template.clone() }; // The event that should be attributed (latest timestamp in epoch 1) // We'll use a histogram index that's covered by both intermediaries (3) let main_event = PpaEvent { - id: 2, - timestamp: 200, /* Later timestamp so this event is picked by - * last-touch */ - epoch_number: 1, + timestamp: 200, /* Later timestamp so this event is picked by + * last-touch */ histogram_index: 2, // A bucket that will be kept and read by r2.ex - uris: event_uris.clone(), - filter_data: 1, + ..event_template.clone() }; let relevant_events = @@ -363,9 +367,8 @@ mod tests { }; let relevant_event_selector = |bucket: u64| PpaRelevantEventSelector { - report_request_uris: report_request_uris.clone(), - is_matching_event: Box::new(|_: u64| true), requested_buckets: vec![bucket].into(), + ..PpaRelevantEventSelector::new(report_request_uris.clone()) }; let request = @@ -464,23 +467,28 @@ mod tests { fn test_cross_epoch_last_touch() -> Result<(), anyhow::Error> { let capacities = StaticCapacities::mock(); let filters = PpaFilterStorage::new(capacities.clone())?; - let mut pds = PpaPdsCore::<_>::new(filters); + let actions = PpaActionStorage::new(None); + let mut pds = PpaPdsCore::<_>::new(filters, actions); - let event1 = PpaEvent { - id: 1, - timestamp: 100, + let event_template = PpaEvent { + id: 0, + timestamp: 0, epoch_number: 1, - histogram_index: 1, + histogram_index: 0, + user_action_id: None, uris: EventUris::mock(), filter_data: 1, }; + + let event1 = PpaEvent { + timestamp: 100, + epoch_number: 1, + ..event_template.clone() + }; let event2 = PpaEvent { - id: 2, timestamp: 200, // Later timestamp epoch_number: 2, - histogram_index: 1, // Same bucket as event1 - uris: EventUris::mock(), - filter_data: 1, + ..event_template.clone() }; // set epoch 2 PerQuerier filter to be OOB @@ -501,9 +509,8 @@ mod tests { histogram_size: 3, }, PpaRelevantEventSelector { - report_request_uris: ReportRequestUris::mock(), - is_matching_event: Box::new(|_| true), requested_buckets: vec![1].into(), + ..PpaRelevantEventSelector::new(ReportRequestUris::mock()) }, ) .unwrap(); @@ -514,9 +521,8 @@ mod tests { let report = attr_object.get_report( &querier_uri, &PpaRelevantEventSelector { - report_request_uris: ReportRequestUris::mock(), - is_matching_event: Box::new(|_| true), requested_buckets: RequestedBuckets::AllBuckets, + ..PpaRelevantEventSelector::new(ReportRequestUris::mock()) }, &mut pds.filter_storage, )?; diff --git a/src/pds/private_data_service.rs b/src/pds/private_data_service.rs index 2481118..d8d4128 100644 --- a/src/pds/private_data_service.rs +++ b/src/pds/private_data_service.rs @@ -5,14 +5,18 @@ use log::debug; use super::{core::PrivateDataServiceCore, quotas::FilterId}; use crate::{ + actions::traits::ActionStorage, budget::{pure_dp_filter::PureDPBudget, traits::FilterStorage}, - events::{relevant_events::RelevantEvents, traits::EventStorage}, + events::{ + relevant_events::RelevantEvents, + traits::{Event, EventStorage}, + }, + pds::core::DropEpochReason, queries::traits::EpochReportRequest, }; #[cfg(feature = "experimental")] use crate::{ pds::quotas::PdsFilterStatus, queries::traits::PassivePrivacyLossRequest, - util::hashmap::HashMap, }; /// Epoch-based private data service, using generic filter @@ -20,13 +24,14 @@ use crate::{ pub struct PrivateDataService< Q: EpochReportRequest, FS: FilterStorage< - Budget = PureDPBudget, - FilterId = FilterId, - >, + Budget = PureDPBudget, + FilterId = FilterId, + >, + AS: ActionStorage, ES: EventStorage, - ERR: From + From, + ERR: From + From + From, > { - pub core: PrivateDataServiceCore, + pub core: PrivateDataServiceCore, /// Event storage interface. pub event_storage: ES, @@ -38,9 +43,9 @@ pub struct PdsReport { pub filtered_report: Q::Report, pub unfiltered_report: Q::Report, - /// Store a list of the filter IDs that were out-of-budget in the atomic - /// check for any epoch in the attribution window. - pub oob_filters: Vec>, + /// Store a list of reasons for which all the events in an epoch were + /// dropped. This can include out-of-budget filters or the count-quota. + pub drop_epoch_reasons: Vec>>, } /// Default implementation for a null report @@ -49,25 +54,34 @@ impl Default for PdsReport { Self { filtered_report: Q::Report::default(), unfiltered_report: Q::Report::default(), - oob_filters: Vec::new(), + drop_epoch_reasons: Vec::new(), } } } /// API for the epoch-based PDS. -impl PrivateDataService +impl PrivateDataService where Q: EpochReportRequest, FS: FilterStorage< - Budget = PureDPBudget, - FilterId = FilterId, - >, + Budget = PureDPBudget, + FilterId = FilterId, + >, + AS: ActionStorage< + EpochId = Q::EpochId, + Uri = Q::Uri, + ActionId = ::ActionId, + >, ES: EventStorage, - ERR: From + From, + ERR: From + From + From, { - pub fn new(filter_storage: FS, event_storage: ES) -> Self { + pub fn new( + filter_storage: FS, + action_storage: AS, + event_storage: ES, + ) -> Self { Self { - core: PrivateDataServiceCore::new(filter_storage), + core: PrivateDataServiceCore::new(filter_storage, action_storage), event_storage, } } @@ -75,12 +89,34 @@ where /// Registers a new event. pub fn register_event(&mut self, event: Q::Event) -> Result<(), ERR> { debug!("Registering event {event:?}"); + + if let Some(aid) = event.user_action_id() { + let source_uri = &event.event_uris().source_uri; + + let allowed = self.core.action_storage.try_record_site( + aid, + event.epoch_id(), + source_uri, + )?; + + if !allowed { + debug!( + "Impression quota exceeded for action {aid:?}, dropping event." + ); + return Ok(()); + } + } + self.event_storage.add_event(event)?; Ok(()) } /// Computes a report for the given report request. - pub fn compute_report(&mut self, request: &Q) -> Result, ERR> { + pub fn compute_report( + &mut self, + request: &Q, + action_id: Option, + ) -> Result, ERR> { let relevant_event_selector = request.relevant_event_selector(); let relevant_events = RelevantEvents::from_event_storage( &mut self.event_storage, @@ -88,7 +124,8 @@ where relevant_event_selector, )?; - self.core.compute_report(request, relevant_events) + self.core + .compute_report(request, relevant_events, action_id) } /// [Experimental] Accounts for passive privacy loss. Can fail if the @@ -103,7 +140,7 @@ where &mut self, request: PassivePrivacyLossRequest, ) -> Result>, ERR> { - let source_losses = HashMap::new(); // Dummy. + let source_losses = vec![]; // Dummy. // For each epoch, try to consume the privacy budget. for epoch_id in request.epoch_ids { @@ -130,7 +167,8 @@ where )?; assert_eq!( - consume_status, PdsFilterStatus::Continue, + consume_status, + PdsFilterStatus::Continue, "ERR: Phase 2 failed unexpectedly with status {consume_status:?} after Phase 1 succeeded", ); diff --git a/src/pds/quotas.rs b/src/pds/quotas.rs index d7237ca..719f717 100644 --- a/src/pds/quotas.rs +++ b/src/pds/quotas.rs @@ -67,7 +67,7 @@ impl fmt::Display for FilterId { } /// Struct containing the default capacity for each type of filter. -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone, Serialize, PartialEq)] pub struct StaticCapacities { pub per_querier: B, pub global: B, diff --git a/src/pds/tests.rs b/src/pds/tests.rs index e9ad143..d706c64 100644 --- a/src/pds/tests.rs +++ b/src/pds/tests.rs @@ -14,11 +14,14 @@ use crate::{ #[test] #[cfg(feature = "experimental")] fn test_account_for_passive_privacy_loss() -> Result<(), anyhow::Error> { + use crate::pds::aliases::SimpleActionStorage; + let capacities: StaticCapacities = StaticCapacities::mock(); let filters = SimpleFilterStorage::new(capacities)?; + let actions = SimpleActionStorage::new(None); let events = SimpleEventStorage::new(); - let mut pds = SimplePds::new(filters, events); + let mut pds = SimplePds::new(filters, actions, events); let uris = ReportRequestUris::mock(); let querier_uri = uris.querier_uris.iter().next().unwrap().clone(); @@ -129,6 +132,8 @@ fn assert_remaining_budgets>( #[cfg(feature = "experimental")] fn test_budget_rollback_on_depletion() -> Result<(), anyhow::Error> { // PDS with several filters + + use crate::pds::aliases::SimpleActionStorage; let capacities: StaticCapacities = StaticCapacities::new( PureDPBudget::from(1.0), // PerQuerier @@ -138,8 +143,9 @@ fn test_budget_rollback_on_depletion() -> Result<(), anyhow::Error> { ); let filters = SimpleFilterStorage::new(capacities)?; + let actions = SimpleActionStorage::new(None); let events = SimpleEventStorage::new(); - let mut pds = SimplePds::new(filters, events); + let mut pds = SimplePds::new(filters, actions, events); // Create a sample request uris with multiple queriers let mut uris = ReportRequestUris::mock(); diff --git a/src/queries/histogram.rs b/src/queries/histogram.rs index ee97ab8..66af00b 100644 --- a/src/queries/histogram.rs +++ b/src/queries/histogram.rs @@ -22,7 +22,7 @@ impl BucketKey for T {} impl Default for HistogramReport { fn default() -> Self { Self { - bin_values: HashMap::new(), + bin_values: HashMap::default(), } } } @@ -72,7 +72,7 @@ where &'a self, event_values: impl IntoIterator, ) -> HistogramReport { - let mut bin_values: HashMap = HashMap::new(); + let mut bin_values: HashMap = HashMap::default(); let mut total_value: f64 = 0.0; // The event_values function selects the relevant events and assigns @@ -85,7 +85,7 @@ where // dropped by the contribution cap. `event_values` is in charge of // ordering the events from `relevant_events`. let mut report = HistogramReport { - bin_values: HashMap::new(), + bin_values: HashMap::default(), }; let mut early_stop = false; diff --git a/src/queries/ppa_histogram.rs b/src/queries/ppa_histogram.rs index 20144d1..2dac2f2 100644 --- a/src/queries/ppa_histogram.rs +++ b/src/queries/ppa_histogram.rs @@ -1,13 +1,14 @@ use std::vec; -use anyhow::{bail, Result}; +use anyhow::{Result, bail}; use crate::{ + actions::traits::ActionId, budget::pure_dp_filter::PureDPBudget, events::{ ppa_event::PpaEvent, relevant_events::RelevantEvents, - traits::{RelevantEventSelector, Uri}, + traits::{Event, RelevantEventSelector, Uri}, }, mechanisms::{NoiseScale, NormType}, queries::{ @@ -21,10 +22,13 @@ pub type PpaBucketKey = u64; pub type PpaEpochId = u64; pub type PpaFilterData = u64; -pub struct PpaRelevantEventSelector { +pub struct PpaRelevantEventSelector { /// source/trigger/querier URIs for this request pub report_request_uris: ReportRequestUris, + /// user action id, to filter out events from the same user-action context + pub user_action_id: Option, + /// Function to determine if an event is relevant based on its filter_data pub is_matching_event: Box bool>, @@ -33,6 +37,17 @@ pub struct PpaRelevantEventSelector { pub requested_buckets: RequestedBuckets, } +impl PpaRelevantEventSelector { + pub fn new(uris: ReportRequestUris) -> Self { + Self { + report_request_uris: uris, + user_action_id: None, + is_matching_event: Box::new(|_filter_data| true), + requested_buckets: RequestedBuckets::AllBuckets, + } + } +} + impl std::fmt::Debug for PpaRelevantEventSelector { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("PpaRelevantEventSelector") @@ -104,14 +119,35 @@ impl RelevantEventSelector for PpaRelevantEventSelector { type Event = PpaEvent; fn is_relevant_event(&self, event: &Self::Event) -> bool { - // Condition 1: Event's source URI should be in the allowed list by the + // Condition 1: Requests must be cross-site, so the impression site + // (source_uri) must be different from the conversion site + // (trigger_uri). + let trying_to_match_itself = + event.uris.source_uri == self.report_request_uris.trigger_uri; + if trying_to_match_itself { + return false; + } + + // Condition 2: If user_action_id is Some(aid), filter out events from + // the same user-action context. + let action_id_match = self + .user_action_id + .is_none_or(|aid| event.user_action_id() != Some(aid)); + if !action_id_match { + return false; + } + + // Condition 3: Event's source URI should be in the allowed list by the // report request source URIs. let source_match = self .report_request_uris .source_uris .contains(&event.uris.source_uri); + if !source_match { + return false; + } - // Condition 2: Every querier URI from the report must be in the event’s + // Condition 4: Every querier URI from the report must be in the event’s // querier URIs. // TODO(https://github.com/columbia/pdslib/issues/71): modify this for cross-report // loss optimization, where one querier is authorized but not others? @@ -120,18 +156,27 @@ impl RelevantEventSelector for PpaRelevantEventSelector { .querier_uris .iter() .all(|uri| event.uris.querier_uris.contains(uri)); + if !querier_match { + return false; + } - // Condition 3: The report’s trigger URI should be allowed by the event + // Condition 5: The report’s trigger URI should be allowed by the event // trigger URIs. let trigger_match = event .uris .trigger_uris .contains(&self.report_request_uris.trigger_uri); + if !trigger_match { + return false; + } + + // Condition 6: Apply the custom filter function on event's filter_data + let filter_data_match = (self.is_matching_event)(event.filter_data); + if !filter_data_match { + return false; + } - source_match - && querier_match - && trigger_match - && (self.is_matching_event)(event.filter_data) + true } } diff --git a/src/queries/traits.rs b/src/queries/traits.rs index d3d72ab..155acb1 100644 --- a/src/queries/traits.rs +++ b/src/queries/traits.rs @@ -3,7 +3,8 @@ use std::fmt::Debug; use crate::{ events::{ relevant_events::RelevantEvents, - traits::{EpochId, Event, RelevantEventSelector, Uri}, uri_set::UriSet, + traits::{EpochId, Event, RelevantEventSelector, Uri}, + uri_set::UriSet, }, mechanisms::{NoiseScale, NormType}, }; diff --git a/src/util/hashmap.rs b/src/util/hashmap.rs index 2b984e8..d836be3 100644 --- a/src/util/hashmap.rs +++ b/src/util/hashmap.rs @@ -1,5 +1,8 @@ -#[cfg(not(feature = "ahash"))] -pub use std::collections::{HashMap, HashSet}; +#[cfg(not(feature = "fxhash"))] +pub use std::collections::hash_map::RandomState; -#[cfg(feature = "ahash")] -pub use ahash::{AHashMap as HashMap, AHashSet as HashSet}; +#[cfg(feature = "fxhash")] +pub use rustc_hash::FxBuildHasher as RandomState; + +pub type HashMap = std::collections::HashMap; +pub type HashSet = std::collections::HashSet; diff --git a/tests/debug_reports_integration_test.rs b/tests/debug_reports_integration_test.rs index 67860bd..1fc5a70 100644 --- a/tests/debug_reports_integration_test.rs +++ b/tests/debug_reports_integration_test.rs @@ -7,7 +7,10 @@ mod experimental_feature_tests { traits::{EventStorage, EventUris}, }, pds::{ - aliases::{SimpleEventStorage, SimpleFilterStorage, SimplePds}, + aliases::{ + SimpleActionStorage, SimpleEventStorage, SimpleFilterStorage, + SimplePds, + }, quotas::{FilterId, StaticCapacities}, }, queries::{ @@ -29,6 +32,7 @@ mod experimental_feature_tests { ); let filters = SimpleFilterStorage::new(capacities)?; + let actions = SimpleActionStorage::new(None); let mut events = SimpleEventStorage::new(); // Add events across multiple epochs @@ -45,7 +49,7 @@ mod experimental_feature_tests { } } - Ok(SimplePds::new(filters, events)) + Ok(SimplePds::new(filters, actions, events)) } fn create_high_budget_request() -> SimpleLastTouchHistogramRequest { @@ -66,8 +70,8 @@ mod experimental_feature_tests { // data #[cfg(feature = "experimental")] #[test] - fn experimental_mode_provides_unfiltered_access( - ) -> Result<(), anyhow::Error> { + fn experimental_mode_provides_unfiltered_access() + -> Result<(), anyhow::Error> { use pdslib::queries::simple_last_touch_histogram::SimpleLastTouchHistogramReport; let mut pds = setup_constrained_pds()?; @@ -85,13 +89,13 @@ mod experimental_feature_tests { report_uris: ReportRequestUris::mock(), }; - let report = pds.compute_report(&simple_request)?; + let report = pds.compute_report(&simple_request, None)?; // In experimental mode, unfiltered_report should NOT be the default // empty report let default_report = SimpleLastTouchHistogramReport::default(); assert_ne!( - format!("{:?}", report.unfiltered_report), + format!("{:?}", report.unfiltered_report), format!("{:?}", default_report), "Experimental mode: unfiltered_report should contain actual data, not default empty report" ); @@ -103,14 +107,14 @@ mod experimental_feature_tests { // To activate the test, run `cargo test --no-default-features`. #[cfg(not(feature = "experimental"))] #[test] - fn production_mode_uses_default_unfiltered_report( - ) -> Result<(), anyhow::Error> { + fn production_mode_uses_default_unfiltered_report() + -> Result<(), anyhow::Error> { use pdslib::queries::simple_last_touch_histogram::SimpleLastTouchHistogramReport; let mut pds = setup_constrained_pds()?; let request = create_high_budget_request(); - let report = pds.compute_report(&request)?; + let report = pds.compute_report(&request, None)?; // In production mode, unfiltered_report should be the default report assert_eq!( @@ -124,12 +128,12 @@ mod experimental_feature_tests { // Test that core PDS behavior is consistent regardless of feature flag #[test] - fn core_behavior_and_budget_exhaustion_work_consistently( - ) -> Result<(), anyhow::Error> { + fn core_behavior_and_budget_exhaustion_work_consistently() + -> Result<(), anyhow::Error> { let mut pds = setup_constrained_pds()?; let request = create_high_budget_request(); - let report = pds.compute_report(&request)?; + let report = pds.compute_report(&request, None)?; // Core PDS behavior should work the same // filtered_report should always be populated correctly diff --git a/tests/ppa_bench.rs b/tests/ppa_bench.rs index 2b35a9a..7d72d8d 100644 --- a/tests/ppa_bench.rs +++ b/tests/ppa_bench.rs @@ -7,13 +7,14 @@ use pdslib::{ traits::{EventStorage as _, EventUris}, }, pds::{ - aliases::{PpaEventStorage, PpaFilterStorage, PpaPds}, + aliases::{ + PpaActionStorage, PpaEventStorage, PpaFilterStorage, PpaPds, + }, quotas::StaticCapacities, }, queries::{ ppa_histogram::{ PpaHistogramConfig, PpaHistogramRequest, PpaRelevantEventSelector, - RequestedBuckets, }, traits::ReportRequestUris, }, @@ -24,11 +25,14 @@ use pdslib::{ fn bench_compute_report() -> anyhow::Result<()> { let capacities = StaticCapacities::mock(); let filters = PpaFilterStorage::<&str>::new(capacities)?; + let actions = PpaActionStorage::<&str>::new(None); let events = PpaEventStorage::<&str>::new(); - let mut pds = - PpaPds::, PpaEventStorage<&str>, &str>::new( - filters, events, - ); + let mut pds = PpaPds::< + PpaFilterStorage<&str>, + PpaActionStorage<&str>, + PpaEventStorage<&str>, + &str, + >::new(filters, actions, events); let event_uris = EventUris { source_uri: "source", @@ -50,6 +54,7 @@ fn bench_compute_report() -> anyhow::Result<()> { timestamp: 1000 + epoch_id * 100 + event_id, epoch_number: epoch_id, histogram_index: event_id, + user_action_id: None, uris: event_uris.clone(), filter_data: 0, }; @@ -65,14 +70,10 @@ fn bench_compute_report() -> anyhow::Result<()> { requested_epsilon: 1.0, histogram_size: 1001, }; - let selector = PpaRelevantEventSelector { - report_request_uris: report_uris.clone(), - is_matching_event: Box::new(|_| true), - requested_buckets: RequestedBuckets::AllBuckets, - }; + let selector = PpaRelevantEventSelector::new(report_uris.clone()); let request = PpaHistogramRequest::new(&request_config, selector)?; - let report = pds.compute_report(&request)?; + let report = pds.compute_report(&request, None)?; assert!(!report.filtered_report.bin_values.is_empty()) } diff --git a/tests/ppa_demo.rs b/tests/ppa_demo.rs index b65ae24..37556b3 100644 --- a/tests/ppa_demo.rs +++ b/tests/ppa_demo.rs @@ -6,7 +6,9 @@ use pdslib::{ budget::traits::FilterStorage, events::{ppa_event::PpaEvent, traits::EventUris}, pds::{ - aliases::{PpaEventStorage, PpaFilterStorage, PpaPds}, + aliases::{ + PpaActionStorage, PpaEventStorage, PpaFilterStorage, PpaPds, + }, quotas::StaticCapacities, }, queries::{ @@ -22,9 +24,15 @@ fn main() -> Result<(), anyhow::Error> { logging::init_default_logging(); let capacities = StaticCapacities::mock(); let filters = PpaFilterStorage::new(capacities)?; + let actions = PpaActionStorage::new(None); let events = PpaEventStorage::new(); - let mut pds = PpaPds::<_>::new(filters, events); + let mut pds = PpaPds::< + PpaFilterStorage, + PpaActionStorage, + PpaEventStorage, + String, + >::new(filters, actions, events); let sample_event_uris = EventUris::mock(); let event_uris_irrelevant_due_to_source = EventUris { @@ -46,40 +54,38 @@ fn main() -> Result<(), anyhow::Error> { querier_uris: ["adtech.com".to_string()].into(), }; - let event1 = PpaEvent { + let default_event = PpaEvent { id: 1, timestamp: 0, epoch_number: 1, - histogram_index: 0x559, // 0x559 = "campaignCounts".to_string() | 0x400 + histogram_index: 0, + user_action_id: None, uris: sample_event_uris.clone(), filter_data: 1, }; + let event1 = PpaEvent { + histogram_index: 0x559, // 0x559 = "campaignCounts".to_string() | 0x400 + uris: sample_event_uris.clone(), + ..default_event.clone() + }; + let event_irr_1 = PpaEvent { - id: 1, - timestamp: 0, - epoch_number: 1, histogram_index: 0x559, // 0x559 = "campaignCounts".to_string() | 0x400 uris: event_uris_irrelevant_due_to_source.clone(), - filter_data: 1, + ..default_event.clone() }; let event_irr_2 = PpaEvent { - id: 1, - timestamp: 0, - epoch_number: 1, histogram_index: 0x559, // 0x559 = "campaignCounts".to_string() | 0x400 uris: event_uris_irrelevant_due_to_trigger.clone(), - filter_data: 1, + ..default_event.clone() }; let event_irr_3 = PpaEvent { - id: 1, - timestamp: 0, - epoch_number: 1, histogram_index: 0x559, // 0x559 = "campaignCounts".to_string() | 0x400 uris: event_uris_irrelevant_due_to_querier.clone(), - filter_data: 1, + ..default_event.clone() }; pds.register_event(event1.clone())?; @@ -98,16 +104,16 @@ fn main() -> Result<(), anyhow::Error> { histogram_size: 2048, }, PpaRelevantEventSelector { - report_request_uris: sample_report_request_uris.clone(), is_matching_event: Box::new(|event_filter_data: u64| { event_filter_data == 1 }), requested_buckets: vec![0x559].into(), + ..PpaRelevantEventSelector::new(sample_report_request_uris.clone()) }, // Not filtering yet. ) .unwrap(); - let report1 = pds.compute_report(&request1).unwrap(); + let report1 = pds.compute_report(&request1, None).unwrap(); info!("Report1: {report1:?}"); let bin_values1 = &report1.filtered_report.bin_values; @@ -128,11 +134,11 @@ fn main() -> Result<(), anyhow::Error> { histogram_size: 2048, }, PpaRelevantEventSelector { - report_request_uris: sample_report_request_uris.clone(), is_matching_event: Box::new(|event_filter_data: u64| { event_filter_data == 1 }), requested_buckets: vec![0x559].into(), + ..PpaRelevantEventSelector::new(sample_report_request_uris.clone()) }, // Not filtering yet. ); assert!(request2.is_err()); @@ -147,16 +153,16 @@ fn main() -> Result<(), anyhow::Error> { histogram_size: 2048, }, PpaRelevantEventSelector { - report_request_uris: sample_report_request_uris.clone(), is_matching_event: Box::new(|event_filter_data: u64| { event_filter_data != 1 }), requested_buckets: vec![0x559].into(), + ..PpaRelevantEventSelector::new(sample_report_request_uris.clone()) }, // Not filtering yet. ) .unwrap(); - let report3 = pds.compute_report(&request3).unwrap(); + let report3 = pds.compute_report(&request3, None).unwrap(); info!("Report3: {report3:?}"); // No event attributed because the lambda logic filters out the only diff --git a/tests/ppa_generic_uri.rs b/tests/ppa_generic_uri.rs index 8a4db0a..c9d7d3b 100644 --- a/tests/ppa_generic_uri.rs +++ b/tests/ppa_generic_uri.rs @@ -7,7 +7,10 @@ use pdslib::{ hashmap_event_storage::HashMapEventStorage, ppa_event::PpaEvent, traits::EventUris, }, - pds::{private_data_service::PrivateDataService, quotas::StaticCapacities}, + pds::{ + aliases::PpaActionStorage, private_data_service::PrivateDataService, + quotas::StaticCapacities, + }, queries::{ ppa_histogram::{ PpaHistogramConfig, PpaHistogramRequest, PpaRelevantEventSelector, @@ -37,9 +40,11 @@ fn main() -> Result<(), anyhow::Error> { let capacities = StaticCapacities::mock(); let filters: HashMapFilterStorage = HashMapFilterStorage::new(capacities)?; + let actions = PpaActionStorage::new(None); - let mut pds = - PrivateDataService::<_, _, _, anyhow::Error>::new(filters, events); + let mut pds = PrivateDataService::<_, _, _, _, anyhow::Error>::new( + filters, actions, events, + ); let event_uris = EventUris { source_uri: CustomUri {}, @@ -57,14 +62,15 @@ fn main() -> Result<(), anyhow::Error> { timestamp: 1, epoch_number: 1, histogram_index: 1, + user_action_id: None, uris: event_uris.clone(), filter_data: 1, }; let always_relevant_event_selector = TestRelevantEventSelector { - report_request_uris: report_uris.clone(), is_matching_event: Box::new(|_| true), requested_buckets: RequestedBuckets::AllBuckets, + ..TestRelevantEventSelector::new(report_uris.clone()) }; pds.register_event(event.clone())?; @@ -80,7 +86,7 @@ fn main() -> Result<(), anyhow::Error> { let report_request = TestHistogramRequest::new(&config, always_relevant_event_selector) .unwrap(); - let _report = pds.compute_report(&report_request)?; + let _report = pds.compute_report(&report_request, None)?; Ok(()) } diff --git a/tests/ppa_workflow.rs b/tests/ppa_workflow.rs index c2d8136..1abe028 100644 --- a/tests/ppa_workflow.rs +++ b/tests/ppa_workflow.rs @@ -5,7 +5,10 @@ use pdslib::{ budget::traits::FilterStorage, events::{simple_event::SimpleEvent, traits::EventUris}, pds::{ - aliases::{SimpleEventStorage, SimpleFilterStorage, SimplePds}, + aliases::{ + SimpleActionStorage, SimpleEventStorage, SimpleFilterStorage, + SimplePds, + }, quotas::StaticCapacities, }, queries::{ @@ -26,8 +29,9 @@ fn main() -> Result<(), anyhow::Error> { // Set up storage and Private Data Service. let capacities = StaticCapacities::mock(); let filters = SimpleFilterStorage::new(capacities)?; + let actions = SimpleActionStorage::new(None); let events = SimpleEventStorage::new(); - let mut pds = SimplePds::new(filters, events); + let mut pds = SimplePds::new(filters, actions, events); let sample_event_uris = EventUris::mock(); let sample_report_uris = ReportRequestUris { @@ -76,7 +80,7 @@ fn main() -> Result<(), anyhow::Error> { }; // Measure conversion. - let report = pds.compute_report(&report_request)?; + let report = pds.compute_report(&report_request, None)?; // Look at the histogram stored in the report (unencrypted here). assert_eq!( diff --git a/tests/simple_events_demo.rs b/tests/simple_events_demo.rs index 5df7939..d36793c 100644 --- a/tests/simple_events_demo.rs +++ b/tests/simple_events_demo.rs @@ -5,7 +5,10 @@ use pdslib::{ budget::{pure_dp_filter::PureDPBudget, traits::FilterStorage}, events::{simple_event::SimpleEvent, traits::EventUris}, pds::{ - aliases::{SimpleEventStorage, SimpleFilterStorage, SimplePds}, + aliases::{ + SimpleActionStorage, SimpleEventStorage, SimpleFilterStorage, + SimplePds, + }, quotas::StaticCapacities, }, queries::{ @@ -28,8 +31,9 @@ fn main() -> Result<(), anyhow::Error> { PureDPBudget::from(8.0), ); let filters = SimpleFilterStorage::new(capacities)?; + let actions = SimpleActionStorage::new(None); - let mut pds = SimplePds::new(filters, events); + let mut pds = SimplePds::new(filters, actions, events); let sample_event_uris = EventUris::mock(); let sample_report_uris = ReportRequestUris { @@ -77,7 +81,7 @@ fn main() -> Result<(), anyhow::Error> { is_relevant_event: always_relevant_event_selector, report_uris: sample_report_uris.clone(), }; - let report = pds.compute_report(&report_request)?; + let report = pds.compute_report(&report_request, None)?; let bucket = Some((event.event_key, 3.0)); assert_eq!(report.filtered_report.bin_value, bucket); @@ -87,17 +91,15 @@ fn main() -> Result<(), anyhow::Error> { let report_request2 = SimpleLastTouchHistogramRequest { epoch_start: 1, epoch_end: 1, //test restricting the end epoch - report_global_sensitivity: 0.1, /* Even 0.1 should be enough to go - * over the - * limit as the current budget left - * for - * epoch 1 is 0. */ + // Even 0.1 should be enough to go over the limit as the current budget + // left for epoch 1 is 0. + report_global_sensitivity: 0.1, query_global_sensitivity: 5.0, requested_epsilon: 5.0, is_relevant_event: always_relevant_event_selector, report_uris: sample_report_uris.clone(), }; - let report2 = pds.compute_report(&report_request2)?; + let report2 = pds.compute_report(&report_request2, None)?; // Allocated budget for epoch 1 is 3.0, but 3.0 has already been consumed in // the last request, so the budget is depleted. Now, the null report should // be returned for this additional query. @@ -112,7 +114,7 @@ fn main() -> Result<(), anyhow::Error> { is_relevant_event: always_relevant_event_selector, report_uris: sample_report_uris.clone(), }; - let report2 = pds.compute_report(&report_request2)?; + let report2 = pds.compute_report(&report_request2, None)?; let bucket2 = Some((event2.event_key, 3.0)); assert_eq!(report2.filtered_report.bin_value, bucket2); @@ -126,7 +128,7 @@ fn main() -> Result<(), anyhow::Error> { is_relevant_event: always_relevant_event_selector, report_uris: sample_report_uris.clone(), }; - let report3_empty = pds.compute_report(&report_request3_empty)?; + let report3_empty = pds.compute_report(&report_request3_empty, None)?; assert_eq!(report3_empty.filtered_report.bin_value, None); // Test restricting report_global_sensitivity @@ -141,7 +143,7 @@ fn main() -> Result<(), anyhow::Error> { report_uris: sample_report_uris.clone(), }; let report3_over_budget = - pds.compute_report(&report_request3_over_budget)?; + pds.compute_report(&report_request3_over_budget, None)?; assert_eq!(report3_over_budget.filtered_report.bin_value, None); // This tests the case where we meet the first event in epoch 3, below the @@ -155,7 +157,7 @@ fn main() -> Result<(), anyhow::Error> { is_relevant_event: always_relevant_event_selector, report_uris: sample_report_uris.clone(), }; - let report3 = pds.compute_report(&report_request3)?; + let report3 = pds.compute_report(&report_request3, None)?; let bucket3 = Some((event3.event_key, 3.0)); assert_eq!(report3.filtered_report.bin_value, bucket3); @@ -171,7 +173,7 @@ fn main() -> Result<(), anyhow::Error> { }, report_uris: sample_report_uris.clone(), }; - let report4 = pds.compute_report(&report_request4)?; + let report4 = pds.compute_report(&report_request4, None)?; let bucket4: Option<(u64, f64)> = None; assert_eq!(report4.filtered_report.bin_value, bucket4);