Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions client/src/lib/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
DownloadUrlResponse,
GetDownloadUrlPayload,
GetUploadUrlPayload,
HeartbeatResponsePayload,
RegisterResponse,
RegisterOffboardingResponse,
RegisterPushToken,
Expand Down Expand Up @@ -160,6 +161,12 @@ export const submitInvoice = (payload: SubmitInvoicePayload & { k1?: string }) =
payload,
);

export const heartbeatResponse = (payload: HeartbeatResponsePayload & { k1?: string }) =>
post<HeartbeatResponsePayload & { k1?: string }, DefaultSuccessPayload>(
"/heartbeat_response",
payload,
);

export const getK1 = async (): Promise<Result<string, Error>> => {
const headers: Record<string, string> = {
"Content-Type": "application/json",
Expand Down
21 changes: 20 additions & 1 deletion client/src/lib/pushNotifications.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
submitInvoice,
triggerBackupTask,
} from "./tasks";
import { registerPushToken, reportJobStatus } from "~/lib/api";
import { registerPushToken, reportJobStatus, heartbeatResponse } from "~/lib/api";
import { err, ok, Result, ResultAsync } from "neverthrow";
import { NotificationsData, ReportType } from "~/types/serverTypes";

Expand Down Expand Up @@ -127,6 +127,25 @@ TaskManager.defineTask<Notifications.NotificationTaskPayload>(
}
const result = await offboardTask(notificationData.offboarding_request_id);
await handleTaskCompletion("offboarding", result, notificationData.k1);
} else if (notificationData.notification_type === "heartbeat") {
log.i("Received heartbeat notification", [notificationData]);
if (!notificationData.notification_id || !notificationData.k1) {
log.w("Invalid heartbeat notification - missing notification_id or k1", [
notificationData,
]);
return;
}

const heartbeatResult = await heartbeatResponse({
notification_id: notificationData.notification_id,
k1: notificationData.k1,
});

if (heartbeatResult.isErr()) {
log.w("Failed to respond to heartbeat", [heartbeatResult.error]);
} else {
log.d("Successfully responded to heartbeat", [notificationData.notification_id]);
}
}
})(),
(e) =>
Expand Down
6 changes: 4 additions & 2 deletions client/src/types/serverTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ export type GetDownloadUrlPayload = { backup_version: number | null, };

export type GetUploadUrlPayload = { backup_version: number, };

export type NotificationTypes = "background_sync" | "maintenance" | "lightning_invoice_request" | "backup_trigger" | "offboarding";
export type HeartbeatResponsePayload = { notification_id: string, };

export type NotificationsData = { notification_type: NotificationTypes, k1: string | null, transaction_id: string | null, amount: number | null, offboarding_request_id: string | null, };
export type NotificationTypes = "background_sync" | "maintenance" | "lightning_invoice_request" | "backup_trigger" | "offboarding" | "heartbeat";

export type NotificationsData = { notification_type: NotificationTypes, k1: string | null, transaction_id: string | null, amount: number | null, offboarding_request_id: string | null, notification_id: string | null, };

export type RegisterOffboardingResponse = { success: boolean, request_id: string, };

Expand Down
2 changes: 2 additions & 0 deletions server/src/ark_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ pub async fn maintenance(app_state: AppState) -> anyhow::Result<()> {
transaction_id: None,
amount: None,
offboarding_request_id: None,
notification_id: None,
};

if let Err(e) = send_push_notification_with_unique_k1(app_state, notification_data, None).await
Expand Down Expand Up @@ -174,6 +175,7 @@ pub async fn handle_offboarding_requests(app_state: AppState) -> anyhow::Result<
transaction_id: None,
amount: None,
offboarding_request_id: Some(request.request_id.clone()),
notification_id: None,
};

if let Err(e) = send_push_notification_with_unique_k1(
Expand Down
4 changes: 3 additions & 1 deletion server/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,6 @@ pub const DEFAULT_PRIVATE_PORT: &str = "3099";
pub const DEFAULT_LNURL_DOMAIN: &str = "localhost";
pub const DEFAULT_SERVER_NETWORK: &str = "regtest";
pub const DEFAULT_BACKUP_CRON: &str = "every 2 hours";
pub const DEFAULT_BACKGROUND_SYNC_CRON: &str = "every 2 hours";
pub const DEFAULT_BACKGROUND_SYNC_CRON: &str = "every 48 hours";
pub const DEFAULT_HEARTBEAT_CRON: &str = "every 48 hours";
pub const DEFAULT_DEREGISTER_CRON: &str = "every 12 hours";
116 changes: 115 additions & 1 deletion server/src/cron.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use crate::{
AppState,
constants::{self, EnvVariables},
db::backup_repo::BackupRepository,
db::{
backup_repo::BackupRepository, heartbeat_repo::HeartbeatRepository,
offboarding_repo::OffboardingRepository, push_token_repo::PushTokenRepository,
},
push::{send_push_notification, send_push_notification_with_unique_k1},
types::{NotificationTypes, NotificationsData},
};
Expand All @@ -17,6 +20,7 @@ async fn background_sync(app_state: AppState) {
transaction_id: None,
amount: None,
offboarding_request_id: None,
notification_id: None,
})
.unwrap(),
priority: "high".to_string(),
Expand Down Expand Up @@ -44,6 +48,7 @@ pub async fn send_backup_notifications(app_state: AppState) -> anyhow::Result<()
transaction_id: None,
amount: None,
offboarding_request_id: None,
notification_id: None,
};
if let Err(e) = send_push_notification_with_unique_k1(
app_state.clone(),
Expand All @@ -59,6 +64,91 @@ pub async fn send_backup_notifications(app_state: AppState) -> anyhow::Result<()
Ok(())
}

pub async fn send_heartbeat_notifications(app_state: AppState) -> anyhow::Result<()> {
let conn = app_state.db.connect()?;
let heartbeat_repo = HeartbeatRepository::new(&conn);

let active_users = heartbeat_repo.get_active_users().await?;
tracing::info!(
"Sending heartbeat notifications to {} active users",
active_users.len()
);

for pubkey in active_users {
let notification_id = heartbeat_repo.create_notification(&pubkey).await?;

let notification_data = NotificationsData {
notification_type: NotificationTypes::Heartbeat,
k1: None,
transaction_id: None,
amount: None,
offboarding_request_id: None,
notification_id: Some(notification_id),
};

if let Err(e) = send_push_notification_with_unique_k1(
app_state.clone(),
notification_data,
Some(pubkey.clone()),
)
.await
{
tracing::error!("Failed to send heartbeat notification to {}: {}", pubkey, e);
}
}

// Cleanup old notifications
heartbeat_repo.cleanup_old_notifications().await?;

Ok(())
}

pub async fn check_and_deregister_inactive_users(app_state: AppState) -> anyhow::Result<()> {
let conn = app_state.db.connect()?;
let heartbeat_repo = HeartbeatRepository::new(&conn);

let users_to_deregister = heartbeat_repo.get_users_to_deregister().await?;

if users_to_deregister.is_empty() {
return Ok(());
}

tracing::info!("Deregistering {} inactive users", users_to_deregister.len());

for pubkey in users_to_deregister {
tracing::info!("Deregistering inactive user: {}", pubkey);

// Use a transaction to ensure all or nothing is deleted
let tx = conn.transaction().await?;

if let Err(e) = PushTokenRepository::delete_by_pubkey(&tx, &pubkey).await {
tracing::error!("Failed to delete push token for {}: {}", pubkey, e);
continue;
}

if let Err(e) = OffboardingRepository::delete_by_pubkey(&tx, &pubkey).await {
tracing::error!(
"Failed to delete offboarding requests for {}: {}",
pubkey,
e
);
continue;
}

if let Err(e) = tx.commit().await {
tracing::error!(
"Failed to commit deregistration transaction for {}: {}",
pubkey,
e
);
} else {
tracing::info!("Successfully deregistered inactive user: {}", pubkey);
}
}

Ok(())
}

pub async fn cron_scheduler(app_state: AppState) -> anyhow::Result<JobScheduler> {
let sched = JobScheduler::new().await?;

Expand Down Expand Up @@ -86,5 +176,29 @@ pub async fn cron_scheduler(app_state: AppState) -> anyhow::Result<JobScheduler>
})?;
sched.add(backup_job).await?;

// Heartbeat notifications - every 48 hours
let heartbeat_app_state = app_state.clone();
let heartbeat_job = Job::new_async(constants::DEFAULT_HEARTBEAT_CRON, move |_, _| {
let app_state = heartbeat_app_state.clone();
Box::pin(async move {
if let Err(e) = send_heartbeat_notifications(app_state).await {
tracing::error!("Failed to send heartbeat notifications: {}", e);
}
})
})?;
sched.add(heartbeat_job).await?;

// Check for inactive users - every 12 hours
let inactive_check_app_state = app_state.clone();
let inactive_check_job = Job::new_async(constants::DEFAULT_DEREGISTER_CRON, move |_, _| {
let app_state = inactive_check_app_state.clone();
Box::pin(async move {
if let Err(e) = check_and_deregister_inactive_users(app_state).await {
tracing::error!("Failed to check and deregister inactive users: {}", e);
}
})
})?;
sched.add(inactive_check_job).await?;

Ok(sched)
}
127 changes: 127 additions & 0 deletions server/src/db/heartbeat_repo.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
use anyhow::Result;
use uuid::Uuid;

pub struct HeartbeatRepository<'a> {
conn: &'a libsql::Connection,
}

impl<'a> HeartbeatRepository<'a> {
pub fn new(conn: &'a libsql::Connection) -> Self {
Self { conn }
}

/// Creates a new heartbeat notification record
pub async fn create_notification(&self, pubkey: &str) -> Result<String> {
let notification_id = Uuid::new_v4().to_string();

self.conn
.execute(
"INSERT INTO heartbeat_notifications (pubkey, notification_id, status) VALUES (?, ?, 'pending')",
libsql::params![pubkey, notification_id.clone()],
)
.await?;

Ok(notification_id)
}

/// Marks a heartbeat notification as responded
pub async fn mark_as_responded(&self, notification_id: &str) -> Result<bool> {
let result = self.conn
.execute(
"UPDATE heartbeat_notifications SET responded_at = CURRENT_TIMESTAMP, status = 'responded' WHERE notification_id = ? AND status = 'pending'",
libsql::params![notification_id],
)
.await?;

Ok(result > 0)
}

/// Counts consecutive missed heartbeats for a user (most recent first)
pub async fn count_consecutive_missed(&self, pubkey: &str) -> Result<i32> {
let mut rows = self.conn
.query(
"SELECT status FROM heartbeat_notifications WHERE pubkey = ? ORDER BY sent_at DESC LIMIT 10",
libsql::params![pubkey],
)
.await?;

let mut consecutive_missed = 0;
while let Some(row) = rows.next().await? {
let status: String = row.get(0)?;
if status == "pending" {
consecutive_missed += 1;
} else {
break;
}
}

Ok(consecutive_missed)
}

/// Gets all users who have push tokens (active users)
pub async fn get_active_users(&self) -> Result<Vec<String>> {
let mut rows = self.conn
.query(
"SELECT DISTINCT pt.pubkey FROM push_tokens pt INNER JOIN users u ON pt.pubkey = u.pubkey",
(),
)
.await?;

let mut pubkeys = Vec::new();
while let Some(row) = rows.next().await? {
let pubkey: String = row.get(0)?;
pubkeys.push(pubkey);
}

Ok(pubkeys)
}

/// Cleans up old heartbeat notifications (keeps only last 15 per user)
pub async fn cleanup_old_notifications(&self) -> Result<()> {
self.conn
.execute(
"DELETE FROM heartbeat_notifications WHERE id NOT IN (
SELECT id FROM (
SELECT id, ROW_NUMBER() OVER (PARTITION BY pubkey ORDER BY sent_at DESC) as rn
FROM heartbeat_notifications
) ranked WHERE rn <= 15
)",
(),
)
.await?;

Ok(())
}

/// Gets users who have missed 10 or more consecutive heartbeats
pub async fn get_users_to_deregister(&self) -> Result<Vec<String>> {
let mut rows = self
.conn
.query(
"WITH recent_heartbeats AS (
SELECT pubkey, status, sent_at,
ROW_NUMBER() OVER (PARTITION BY pubkey ORDER BY sent_at DESC) as rn
FROM heartbeat_notifications
),
consecutive_missed AS (
SELECT pubkey,
COUNT(*) as missed_count
FROM recent_heartbeats
WHERE rn <= 10 AND status = 'pending'
GROUP BY pubkey
HAVING COUNT(*) = 10
)
SELECT pubkey FROM consecutive_missed",
(),
)
.await?;

let mut pubkeys = Vec::new();
while let Some(row) = rows.next().await? {
let pubkey: String = row.get(0)?;
pubkeys.push(pubkey);
}

Ok(pubkeys)
}
}
15 changes: 15 additions & 0 deletions server/src/db/migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,21 @@ const MIGRATIONS: &[&str] = &[
UPDATE devices SET updated_at = CURRENT_TIMESTAMP WHERE pubkey = OLD.pubkey;
END;
"#,
r#"
CREATE TABLE heartbeat_notifications (
id INTEGER PRIMARY KEY AUTOINCREMENT,
pubkey TEXT NOT NULL,
notification_id TEXT NOT NULL UNIQUE,
sent_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
responded_at TIMESTAMP,
status TEXT NOT NULL DEFAULT 'pending',
FOREIGN KEY (pubkey) REFERENCES users(pubkey)
);

CREATE INDEX idx_heartbeat_notifications_pubkey ON heartbeat_notifications(pubkey);
CREATE INDEX idx_heartbeat_notifications_status ON heartbeat_notifications(status);
CREATE INDEX idx_heartbeat_notifications_sent_at ON heartbeat_notifications(sent_at);
"#,
];

/// Applies all pending migrations to the database.
Expand Down
Loading