Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions client/src/lib/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
DownloadUrlResponse,
GetDownloadUrlPayload,
GetUploadUrlPayload,
HeartbeatResponsePayload,
RegisterResponse,
RegisterOffboardingResponse,
RegisterPushToken,
Expand Down Expand Up @@ -160,6 +161,12 @@ export const submitInvoice = (payload: SubmitInvoicePayload & { k1?: string }) =
payload,
);

export const heartbeatResponse = (payload: HeartbeatResponsePayload & { k1?: string }) =>
post<HeartbeatResponsePayload & { k1?: string }, DefaultSuccessPayload>(
"/heartbeat_response",
payload,
);

export const getK1 = async (): Promise<Result<string, Error>> => {
const headers: Record<string, string> = {
"Content-Type": "application/json",
Expand Down
21 changes: 20 additions & 1 deletion client/src/lib/pushNotifications.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
submitInvoice,
triggerBackupTask,
} from "./tasks";
import { registerPushToken, reportJobStatus } from "~/lib/api";
import { registerPushToken, reportJobStatus, heartbeatResponse } from "~/lib/api";
import { err, ok, Result, ResultAsync } from "neverthrow";
import { NotificationsData, ReportType } from "~/types/serverTypes";

Expand Down Expand Up @@ -127,6 +127,25 @@ TaskManager.defineTask<Notifications.NotificationTaskPayload>(
}
const result = await offboardTask(notificationData.offboarding_request_id);
await handleTaskCompletion("offboarding", result, notificationData.k1);
} else if (notificationData.notification_type === "heartbeat") {
log.i("Received heartbeat notification", [notificationData]);
if (!notificationData.notification_id || !notificationData.k1) {
log.w("Invalid heartbeat notification - missing notification_id or k1", [
notificationData,
]);
return;
}

const heartbeatResult = await heartbeatResponse({
notification_id: notificationData.notification_id,
k1: notificationData.k1,
});

if (heartbeatResult.isErr()) {
log.w("Failed to respond to heartbeat", [heartbeatResult.error]);
} else {
log.d("Successfully responded to heartbeat", [notificationData.notification_id]);
}
}
})(),
(e) =>
Expand Down
6 changes: 4 additions & 2 deletions client/src/types/serverTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ export type GetDownloadUrlPayload = { backup_version: number | null, };

export type GetUploadUrlPayload = { backup_version: number, };

export type NotificationTypes = "background_sync" | "maintenance" | "lightning_invoice_request" | "backup_trigger" | "offboarding";
export type HeartbeatResponsePayload = { notification_id: string, };

export type NotificationsData = { notification_type: NotificationTypes, k1: string | null, transaction_id: string | null, amount: number | null, offboarding_request_id: string | null, };
export type NotificationTypes = "background_sync" | "maintenance" | "lightning_invoice_request" | "backup_trigger" | "offboarding" | "heartbeat";

export type NotificationsData = { notification_type: NotificationTypes, k1: string | null, transaction_id: string | null, amount: number | null, offboarding_request_id: string | null, notification_id: string | null, };

export type RegisterOffboardingResponse = { success: boolean, request_id: string, };

Expand Down
2 changes: 2 additions & 0 deletions server/src/ark_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ pub async fn maintenance(app_state: AppState) -> anyhow::Result<()> {
transaction_id: None,
amount: None,
offboarding_request_id: None,
notification_id: None,
};

if let Err(e) = send_push_notification_with_unique_k1(app_state, notification_data, None).await
Expand Down Expand Up @@ -174,6 +175,7 @@ pub async fn handle_offboarding_requests(app_state: AppState) -> anyhow::Result<
transaction_id: None,
amount: None,
offboarding_request_id: Some(request.request_id.clone()),
notification_id: None,
};

if let Err(e) = send_push_notification_with_unique_k1(
Expand Down
4 changes: 3 additions & 1 deletion server/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,6 @@ pub const DEFAULT_PRIVATE_PORT: &str = "3099";
pub const DEFAULT_LNURL_DOMAIN: &str = "localhost";
pub const DEFAULT_SERVER_NETWORK: &str = "regtest";
pub const DEFAULT_BACKUP_CRON: &str = "every 2 hours";
pub const DEFAULT_BACKGROUND_SYNC_CRON: &str = "every 2 hours";
pub const DEFAULT_BACKGROUND_SYNC_CRON: &str = "every 48 hours";
pub const DEFAULT_HEARTBEAT_CRON: &str = "every 48 hours";
pub const DEFAULT_DEREGISTER_CRON: &str = "every 12 hours";
134 changes: 133 additions & 1 deletion server/src/cron.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use crate::{
AppState,
constants::{self, EnvVariables},
db::backup_repo::BackupRepository,
db::{
backup_repo::BackupRepository, heartbeat_repo::HeartbeatRepository,
offboarding_repo::OffboardingRepository, push_token_repo::PushTokenRepository,
},
push::{send_push_notification, send_push_notification_with_unique_k1},
types::{NotificationTypes, NotificationsData},
};
Expand All @@ -17,6 +20,7 @@ async fn background_sync(app_state: AppState) {
transaction_id: None,
amount: None,
offboarding_request_id: None,
notification_id: None,
})
.unwrap(),
priority: "high".to_string(),
Expand Down Expand Up @@ -44,6 +48,7 @@ pub async fn send_backup_notifications(app_state: AppState) -> anyhow::Result<()
transaction_id: None,
amount: None,
offboarding_request_id: None,
notification_id: None,
};
if let Err(e) = send_push_notification_with_unique_k1(
app_state.clone(),
Expand All @@ -59,6 +64,109 @@ pub async fn send_backup_notifications(app_state: AppState) -> anyhow::Result<()
Ok(())
}

pub async fn send_heartbeat_notifications(app_state: AppState) -> anyhow::Result<()> {
let conn = app_state.db.connect()?;
let heartbeat_repo = HeartbeatRepository::new(&conn);

let active_users = heartbeat_repo.get_active_users().await?;
tracing::info!(
"Sending heartbeat notifications to {} active users",
active_users.len()
);

for pubkey in active_users {
let notification_id = heartbeat_repo.create_notification(&pubkey).await?;

let notification_data = NotificationsData {
notification_type: NotificationTypes::Heartbeat,
k1: None,
transaction_id: None,
amount: None,
offboarding_request_id: None,
notification_id: Some(notification_id.clone()),
};

if let Err(e) = send_push_notification_with_unique_k1(
app_state.clone(),
notification_data,
Some(pubkey.clone()),
)
.await
{
tracing::error!("Failed to send heartbeat notification to {}: {}", pubkey, e);
// Rollback the created notification record
if let Err(delete_err) = heartbeat_repo.delete_notification(&notification_id).await {
tracing::error!(
"Failed to delete orphaned heartbeat notification {}: {}",
notification_id,
delete_err
);
}
}
}

// Cleanup old notifications
heartbeat_repo.cleanup_old_notifications().await?;

Ok(())
}

pub async fn check_and_deregister_inactive_users(app_state: AppState) -> anyhow::Result<()> {
let conn = app_state.db.connect()?;
let heartbeat_repo = HeartbeatRepository::new(&conn);

let users_to_deregister = heartbeat_repo.get_users_to_deregister().await?;

if users_to_deregister.is_empty() {
return Ok(());
}

tracing::info!("Deregistering {} inactive users", users_to_deregister.len());

for pubkey in users_to_deregister {
tracing::info!("Deregistering inactive user: {}", pubkey);

// Use a transaction to ensure all or nothing is deleted
let tx = conn.transaction().await?;

if let Err(e) = PushTokenRepository::delete_by_pubkey(&tx, &pubkey).await {
tracing::error!("Failed to delete push token for {}: {}", pubkey, e);
continue;
}

if let Err(e) = OffboardingRepository::delete_by_pubkey(&tx, &pubkey).await {
tracing::error!(
"Failed to delete offboarding requests for {}: {}",
pubkey,
e
);
continue;
}

let heartbeat_repo = HeartbeatRepository::new(&tx);
if let Err(e) = heartbeat_repo.delete_by_pubkey(&pubkey).await {
tracing::error!(
"Failed to delete heartbeat notifications for {}: {}",
pubkey,
e
);
continue;
}

if let Err(e) = tx.commit().await {
tracing::error!(
"Failed to commit deregistration transaction for {}: {}",
pubkey,
e
);
} else {
tracing::info!("Successfully deregistered inactive user: {}", pubkey);
}
}

Ok(())
}

pub async fn cron_scheduler(app_state: AppState) -> anyhow::Result<JobScheduler> {
let sched = JobScheduler::new().await?;

Expand Down Expand Up @@ -86,5 +194,29 @@ pub async fn cron_scheduler(app_state: AppState) -> anyhow::Result<JobScheduler>
})?;
sched.add(backup_job).await?;

// Heartbeat notifications - every 48 hours
let heartbeat_app_state = app_state.clone();
let heartbeat_job = Job::new_async(constants::DEFAULT_HEARTBEAT_CRON, move |_, _| {
let app_state = heartbeat_app_state.clone();
Box::pin(async move {
if let Err(e) = send_heartbeat_notifications(app_state).await {
tracing::error!("Failed to send heartbeat notifications: {}", e);
}
})
})?;
sched.add(heartbeat_job).await?;

// Check for inactive users - every 12 hours
let inactive_check_app_state = app_state.clone();
let inactive_check_job = Job::new_async(constants::DEFAULT_DEREGISTER_CRON, move |_, _| {
let app_state = inactive_check_app_state.clone();
Box::pin(async move {
if let Err(e) = check_and_deregister_inactive_users(app_state).await {
tracing::error!("Failed to check and deregister inactive users: {}", e);
}
})
})?;
sched.add(inactive_check_job).await?;

Ok(sched)
}
Loading