Skip to content

Commit b047dea

Browse files
committed
- Rework location cache to have 2 hashmap and not cleanup anymore
- Remove location_validation_timestamp from cbrs_heartbeats - Handle GatewayResolution::GatewayNotAsserted for cbrs and insert in cache
1 parent aac6330 commit b047dea

File tree

5 files changed

+96
-93
lines changed

5 files changed

+96
-93
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
11
ALTER TABLE cbrs_heartbeats
2-
ADD COLUMN location_validation_timestamp TIMESTAMPTZ,
32
ADD COLUMN lat DOUBLE PRECISION NOT NULL DEFAULT 0.0,
43
ADD COLUMN lon DOUBLE PRECISION NOT NULL DEFAULT 0.0;

mobile_verifier/src/heartbeats/location_cache.rs

+61-46
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@ use file_store::radio_location_estimates::Entity;
33
use helium_crypto::PublicKeyBinary;
44
use sqlx::PgPool;
55
use std::{collections::HashMap, sync::Arc};
6-
use tokio::sync::Mutex;
7-
use tracing::info;
6+
use tokio::sync::{Mutex, MutexGuard};
87

98
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
109
pub enum LocationCacheKey {
@@ -29,56 +28,57 @@ impl LocationCacheValue {
2928
}
3029
}
3130

31+
type LocationCacheData = HashMap<LocationCacheKey, LocationCacheValue>;
32+
3233
/// A cache WiFi/Cbrs heartbeat locations
3334
#[derive(Clone)]
3435
pub struct LocationCache {
3536
pool: PgPool,
36-
data: Arc<Mutex<HashMap<LocationCacheKey, LocationCacheValue>>>,
37+
wifi: Arc<Mutex<LocationCacheData>>,
38+
cbrs: Arc<Mutex<LocationCacheData>>,
3739
}
3840

3941
impl LocationCache {
4042
pub fn new(pool: &PgPool) -> Self {
41-
let data = Arc::new(Mutex::new(
42-
HashMap::<LocationCacheKey, LocationCacheValue>::new(),
43-
));
44-
let data_clone = data.clone();
45-
tokio::spawn(async move {
46-
loop {
47-
// Sleep 1 hour
48-
let duration = core::time::Duration::from_secs(60 * 60);
49-
tokio::time::sleep(duration).await;
50-
51-
let now = Utc::now();
52-
// Set the 12-hour threshold
53-
let twelve_hours_ago = now - Duration::hours(12);
54-
55-
let mut data = data_clone.lock().await;
56-
let size_before = data.len() as f64;
57-
58-
// Retain only values that are within the last 12 hours
59-
data.retain(|_, v| v.timestamp > twelve_hours_ago);
60-
61-
let size_after = data.len() as f64;
62-
info!("cleaned {}", size_before - size_after);
63-
}
64-
});
43+
let wifi = Arc::new(Mutex::new(HashMap::new()));
44+
let cbrs = Arc::new(Mutex::new(HashMap::new()));
6545
// TODO: We could spawn an hydrate from DB here?
6646
Self {
6747
pool: pool.clone(),
68-
data,
48+
wifi,
49+
cbrs,
6950
}
7051
}
7152

7253
pub async fn get(&self, key: LocationCacheKey) -> anyhow::Result<Option<LocationCacheValue>> {
7354
{
74-
let data = self.data.lock().await;
55+
let data = self.key_to_lock(&key).await;
56+
if let Some(&value) = data.get(&key) {
57+
return Ok(Some(value));
58+
}
59+
}
60+
match key {
61+
LocationCacheKey::WifiPubKey(pub_key_bin) => {
62+
self.fetch_wifi_and_insert(pub_key_bin).await
63+
}
64+
LocationCacheKey::CbrsId(id) => self.fetch_cbrs_and_insert(id).await,
65+
}
66+
}
67+
68+
pub async fn get_recent(
69+
&self,
70+
key: LocationCacheKey,
71+
when: Duration,
72+
) -> anyhow::Result<Option<LocationCacheValue>> {
73+
{
74+
let data = self.key_to_lock(&key).await;
7575
if let Some(&value) = data.get(&key) {
7676
let now = Utc::now();
77-
let twelve_hours_ago = now - Duration::hours(12);
78-
if value.timestamp > twelve_hours_ago {
79-
return Ok(None);
80-
} else {
77+
let before = now - when;
78+
if value.timestamp > before {
8179
return Ok(Some(value));
80+
} else {
81+
return Ok(None);
8282
}
8383
}
8484
}
@@ -90,28 +90,41 @@ impl LocationCache {
9090
}
9191
}
9292

93-
pub async fn get_all(&self) -> HashMap<LocationCacheKey, LocationCacheValue> {
94-
let data = self.data.lock().await;
95-
data.clone()
93+
pub async fn get_all(&self) -> LocationCacheData {
94+
let wifi_data = self.wifi.lock().await;
95+
let mut wifi_data_cloned = wifi_data.clone();
96+
97+
let cbrs_data = self.cbrs.lock().await;
98+
let cbrs_data_cloned = cbrs_data.clone();
99+
100+
wifi_data_cloned.extend(cbrs_data_cloned);
101+
wifi_data_cloned
96102
}
97103

98104
pub async fn insert(
99105
&self,
100106
key: LocationCacheKey,
101107
value: LocationCacheValue,
102108
) -> anyhow::Result<()> {
103-
let mut data = self.data.lock().await;
109+
let mut data = self.key_to_lock(&key).await;
104110
data.insert(key, value);
105111
Ok(())
106112
}
107113

108114
/// Only used for testing.
109115
pub async fn remove(&self, key: LocationCacheKey) -> anyhow::Result<()> {
110-
let mut data = self.data.lock().await;
116+
let mut data = self.key_to_lock(&key).await;
111117
data.remove(&key);
112118
Ok(())
113119
}
114120

121+
async fn key_to_lock(&self, key: &LocationCacheKey) -> MutexGuard<'_, LocationCacheData> {
122+
match key {
123+
LocationCacheKey::WifiPubKey(_) => self.wifi.lock().await,
124+
LocationCacheKey::CbrsId(_) => self.cbrs.lock().await,
125+
}
126+
}
127+
115128
async fn fetch_wifi_and_insert(
116129
&self,
117130
pub_key_bin: PublicKeyBinary,
@@ -134,8 +147,9 @@ impl LocationCache {
134147
match sqlx_return {
135148
None => Ok(None),
136149
Some(value) => {
137-
let mut data = self.data.lock().await;
138-
data.insert(LocationCacheKey::WifiPubKey(pub_key_bin), value);
150+
let key = LocationCacheKey::WifiPubKey(pub_key_bin);
151+
let mut data = self.key_to_lock(&key).await;
152+
data.insert(key, value);
139153
Ok(Some(value))
140154
}
141155
}
@@ -147,12 +161,12 @@ impl LocationCache {
147161
) -> anyhow::Result<Option<LocationCacheValue>> {
148162
let sqlx_return: Option<LocationCacheValue> = sqlx::query_as(
149163
r#"
150-
SELECT lat, lon, location_validation_timestamp AS timestamp
164+
SELECT lat, lon, latest_timestamp AS timestamp
151165
FROM cbrs_heartbeats
152-
WHERE location_validation_timestamp IS NOT NULL
153-
AND location_validation_timestamp >= $1
166+
WHERE latest_timestamp IS NOT NULL
167+
AND latest_timestamp >= $1
154168
AND hotspot_key = $2
155-
ORDER BY location_validation_timestamp DESC
169+
ORDER BY latest_timestamp DESC
156170
LIMIT 1
157171
"#,
158172
)
@@ -164,8 +178,9 @@ impl LocationCache {
164178
match sqlx_return {
165179
None => Ok(None),
166180
Some(value) => {
167-
let mut data = self.data.lock().await;
168-
data.insert(LocationCacheKey::CbrsId(cbsd_id), value);
181+
let key = LocationCacheKey::CbrsId(cbsd_id);
182+
let mut data = self.key_to_lock(&key).await;
183+
data.insert(key, value);
169184
Ok(Some(value))
170185
}
171186
}

mobile_verifier/src/heartbeats/mod.rs

+32-37
Original file line numberDiff line numberDiff line change
@@ -482,7 +482,6 @@ impl ValidatedHeartbeat {
482482
proto::HeartbeatValidity::UnsupportedLocation,
483483
));
484484
}
485-
486485
match gateway_info_resolver
487486
.resolve_gateway(&heartbeat.hotspot_key)
488487
.await?
@@ -495,34 +494,6 @@ impl ValidatedHeartbeat {
495494
Some(coverage_object.meta),
496495
proto::HeartbeatValidity::InvalidDeviceType,
497496
)),
498-
// TODO do we get there when CBRS?
499-
// Should I then update location form here then?
500-
GatewayResolution::GatewayNotFound if heartbeat.hb_type == HbType::Cbrs => {
501-
if let (Some(location_validation_timestamp), Some(cbsd_id)) = (
502-
heartbeat.location_validation_timestamp,
503-
heartbeat.cbsd_id.clone(),
504-
) {
505-
location_cache
506-
.insert(
507-
LocationCacheKey::CbrsId(cbsd_id),
508-
LocationCacheValue::new(
509-
heartbeat.lat,
510-
heartbeat.lon,
511-
location_validation_timestamp,
512-
),
513-
)
514-
.await?;
515-
};
516-
517-
Ok(Self::new(
518-
heartbeat,
519-
cell_type,
520-
dec!(0),
521-
None,
522-
Some(coverage_object.meta),
523-
proto::HeartbeatValidity::GatewayNotFound,
524-
))
525-
}
526497
GatewayResolution::GatewayNotFound => Ok(Self::new(
527498
heartbeat,
528499
cell_type,
@@ -531,22 +502,47 @@ impl ValidatedHeartbeat {
531502
Some(coverage_object.meta),
532503
proto::HeartbeatValidity::GatewayNotFound,
533504
)),
534-
GatewayResolution::GatewayNotAsserted if heartbeat.hb_type == HbType::Wifi => {
535-
Ok(Self::new(
505+
GatewayResolution::GatewayNotAsserted => match heartbeat.hb_type {
506+
HbType::Wifi => Ok(Self::new(
536507
heartbeat,
537508
cell_type,
538509
dec!(0),
539510
None,
540511
Some(coverage_object.meta),
541512
proto::HeartbeatValidity::GatewayNotAsserted,
542-
))
543-
}
513+
)),
514+
HbType::Cbrs => {
515+
if let Some(cbsd_id) = heartbeat.cbsd_id.clone() {
516+
location_cache
517+
.insert(
518+
LocationCacheKey::CbrsId(cbsd_id),
519+
LocationCacheValue::new(
520+
heartbeat.lat,
521+
heartbeat.lon,
522+
heartbeat.timestamp,
523+
),
524+
)
525+
.await?;
526+
};
527+
Ok(Self::new(
528+
heartbeat,
529+
cell_type,
530+
dec!(1.0),
531+
None,
532+
Some(coverage_object.meta),
533+
proto::HeartbeatValidity::Valid,
534+
))
535+
}
536+
},
544537
GatewayResolution::AssertedLocation(location) if heartbeat.hb_type == HbType::Wifi => {
545538
let asserted_latlng: LatLng = CellIndex::try_from(location)?.into();
546539
let is_valid = match heartbeat.location_validation_timestamp {
547540
None => {
548541
if let Some(last_location) = location_cache
549-
.get(LocationCacheKey::WifiPubKey(heartbeat.hotspot_key.clone()))
542+
.get_recent(
543+
LocationCacheKey::WifiPubKey(heartbeat.hotspot_key.clone()),
544+
Duration::hours(12),
545+
)
550546
.await?
551547
{
552548
heartbeat.lat = last_location.lat;
@@ -702,8 +698,8 @@ impl ValidatedHeartbeat {
702698
let truncated_timestamp = self.truncated_timestamp()?;
703699
sqlx::query(
704700
r#"
705-
INSERT INTO cbrs_heartbeats (cbsd_id, hotspot_key, cell_type, latest_timestamp, truncated_timestamp, coverage_object, location_trust_score_multiplier, location_validation_timestamp, lat, lon)
706-
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
701+
INSERT INTO cbrs_heartbeats (cbsd_id, hotspot_key, cell_type, latest_timestamp, truncated_timestamp, coverage_object, location_trust_score_multiplier, lat, lon)
702+
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
707703
ON CONFLICT (cbsd_id, truncated_timestamp) DO UPDATE SET
708704
latest_timestamp = EXCLUDED.latest_timestamp,
709705
coverage_object = EXCLUDED.coverage_object
@@ -716,7 +712,6 @@ impl ValidatedHeartbeat {
716712
.bind(truncated_timestamp)
717713
.bind(self.heartbeat.coverage_object)
718714
.bind(self.location_trust_score_multiplier)
719-
.bind(self.heartbeat.location_validation_timestamp)
720715
.bind(self.heartbeat.lat)
721716
.bind(self.heartbeat.lon)
722717
.execute(&mut *exec)

mobile_verifier/src/rewarder.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -533,15 +533,15 @@ async fn reward_poc(
533533
fn is_within_radius(
534534
loc_lat: f64,
535535
loc_lon: f64,
536-
comparators: Vec<(Decimal, Decimal, Decimal)>,
536+
estimates: Vec<(Decimal, Decimal, Decimal)>,
537537
) -> anyhow::Result<bool> {
538538
let resolution = Resolution::Twelve;
539539

540540
let point_a = LatLng::new(loc_lat, loc_lon)
541541
.map_err(|e| anyhow::anyhow!("Invalid LatLng for A: {}", e))?;
542542
let h3_index_a = point_a.to_cell(resolution);
543543

544-
for (radius_meters, lat, lon) in comparators {
544+
for (radius_meters, lat, lon) in estimates {
545545
let lat_f64 = lat
546546
.to_f64()
547547
.ok_or_else(|| anyhow::anyhow!("Failed to convert lat_b to f64"))?;

mobile_verifier/tests/integrations/last_location.rs

+1-7
Original file line numberDiff line numberDiff line change
@@ -181,8 +181,7 @@ async fn heartbeat_does_not_use_last_good_location_when_more_than_12_hours(
181181

182182
let validated_heartbeat_1 = ValidatedHeartbeat::validate(
183183
heartbeat(&hotspot, &coverage_object)
184-
.location_validation_timestamp(Utc::now())
185-
.timestamp(Utc::now() - Duration::hours(12) - Duration::seconds(1))
184+
.location_validation_timestamp(Utc::now() - Duration::hours(12) - Duration::seconds(1))
186185
.build(),
187186
&GatewayClientAllOwnersValid,
188187
&coverage_objects,
@@ -248,11 +247,6 @@ impl HeartbeatBuilder {
248247
self
249248
}
250249

251-
fn timestamp(mut self, ts: DateTime<Utc>) -> Self {
252-
self.timestamp = Some(ts);
253-
self
254-
}
255-
256250
fn build(self) -> Heartbeat {
257251
let (lat, lon) = self.latlng.unwrap_or_else(|| {
258252
let lat_lng: LatLng = self

0 commit comments

Comments
 (0)