Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion solana-programs/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion solana-programs/programs/cron/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "cron"
version = "0.2.4"
version = "0.2.5"
description = "Created with Anchor"
edition = "2021"

Expand Down
125 changes: 70 additions & 55 deletions solana-programs/programs/cron/src/instructions/queue_cron_tasks_v0.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,18 @@ use tuktuk_program::{
RunTaskReturnV0, TaskQueueV0, TaskReturnV0, TransactionSourceV0, TriggerV0,
};

use crate::{
error::ErrorCode,
state::{CronJobTransactionV0, CronJobV0},
};
use crate::state::{CronJobTransactionV0, CronJobV0};

pub const QUEUED_TASKS_PER_QUEUE: u8 = 3;
// Queue tasks 5 minutes before the cron job is scheduled to run
// This way we don't take up space in the task queue for tasks that are running
// A long time from now
pub const QUEUE_TASK_DELAY: i64 = 60 * 5;

#[derive(Accounts)]
pub struct QueueCronTasksV0<'info> {
#[account(
mut,
has_one = task_queue
)]
pub cron_job: Box<Account<'info, CronJobV0>>,
/// CHECK: Checked via require in handler
#[account(mut)]
pub cron_job: UncheckedAccount<'info>,
pub task_queue: Box<Account<'info, TaskQueueV0>>,
/// CHECK: Used to write return data
#[account(
Expand All @@ -48,58 +42,82 @@ pub struct QueueCronTasksV0<'info> {
pub system_program: Program<'info, System>,
}

#[macro_export]
macro_rules! try_from {
($ty: ty, $acc: expr) => {{
let account_info = $acc.as_ref();
<$ty>::try_from(unsafe {
core::mem::transmute::<
&anchor_lang::prelude::AccountInfo<'_>,
&anchor_lang::prelude::AccountInfo<'_>,
>(account_info)
})
}};
}

pub fn handler(ctx: Context<QueueCronTasksV0>) -> Result<RunTaskReturnV0> {
let stale_task_age = ctx.accounts.task_queue.stale_task_age;
let now = Clock::get()?.unix_timestamp;
if ctx.accounts.cron_job.data_is_empty() {
msg!("Cron job was closed, completing task");
return Ok(RunTaskReturnV0 {
tasks: vec![],
accounts: vec![],
});
}
let mut cron_job = try_from!(Account<CronJobV0>, &ctx.accounts.cron_job)?;
require_eq!(cron_job.task_queue, ctx.accounts.task_queue.key());

// Only proceed if we're within the queue window of the next execution
if (now + QUEUE_TASK_DELAY) < ctx.accounts.cron_job.current_exec_ts {
if (now + QUEUE_TASK_DELAY) < cron_job.current_exec_ts {
msg!("Too early to queue tasks, current time {} is not within {} seconds of next execution {}",
now, QUEUE_TASK_DELAY, ctx.accounts.cron_job.current_exec_ts);
return Err(error!(ErrorCode::TooEarly));
now, QUEUE_TASK_DELAY, cron_job.current_exec_ts);

// Return Ok so this task closes
return Ok(RunTaskReturnV0 {
tasks: vec![],
accounts: vec![],
});
}

if now - ctx.accounts.cron_job.current_exec_ts > stale_task_age as i64 {
if now - cron_job.current_exec_ts > stale_task_age as i64 {
msg!("Cron job is stale, resetting");
ctx.accounts.cron_job.current_exec_ts = now;
ctx.accounts.cron_job.current_transaction_id = 0;
cron_job.current_exec_ts = now;
cron_job.current_transaction_id = 0;
}

let max_num_tasks_remaining =
ctx.accounts.cron_job.next_transaction_id - ctx.accounts.cron_job.current_transaction_id;
let max_num_tasks_remaining = cron_job.next_transaction_id - cron_job.current_transaction_id;
let num_tasks_to_queue =
(ctx.accounts.cron_job.num_tasks_per_queue_call as u32).min(max_num_tasks_remaining);
ctx.accounts.cron_job.current_transaction_id += num_tasks_to_queue;
(cron_job.num_tasks_per_queue_call as u32).min(max_num_tasks_remaining);
cron_job.current_transaction_id += num_tasks_to_queue;

let trigger = TriggerV0::Timestamp(ctx.accounts.cron_job.current_exec_ts);
let trigger = TriggerV0::Timestamp(cron_job.current_exec_ts);

// If we reached the end this time, reset to 0 and move the next execution time forward
if ctx.accounts.cron_job.current_transaction_id == ctx.accounts.cron_job.next_transaction_id {
ctx.accounts.cron_job.current_transaction_id = 0;
let schedule = Schedule::from_str(&ctx.accounts.cron_job.schedule).unwrap();
if cron_job.current_transaction_id == cron_job.next_transaction_id {
cron_job.current_transaction_id = 0;
let schedule = Schedule::from_str(&cron_job.schedule).unwrap();
// Find the next execution time after the last one
let ts = ctx.accounts.cron_job.current_exec_ts;
let ts = cron_job.current_exec_ts;
let date_time_ts = &DateTime::<Utc>::from_naive_utc_and_offset(
DateTime::from_timestamp(ts, 0).unwrap().naive_utc(),
Utc,
);
ctx.accounts.cron_job.current_exec_ts =
schedule.next_after(date_time_ts).unwrap().timestamp();
cron_job.current_exec_ts = schedule.next_after(date_time_ts).unwrap().timestamp();
msg!(
"Will have finished execution ts: {}, moving to {}",
ts,
ctx.accounts.cron_job.current_exec_ts
cron_job.current_exec_ts
);
}

let remaining_accounts = (ctx.accounts.cron_job.current_transaction_id
..ctx.accounts.cron_job.current_transaction_id
+ ctx.accounts.cron_job.num_tasks_per_queue_call as u32)
let remaining_accounts = (cron_job.current_transaction_id
..cron_job.current_transaction_id + cron_job.num_tasks_per_queue_call as u32)
.map(|i| {
Pubkey::find_program_address(
&[
b"cron_job_transaction",
ctx.accounts.cron_job.key().as_ref(),
cron_job.key().as_ref(),
&i.to_le_bytes(),
],
&crate::ID,
Expand All @@ -113,7 +131,7 @@ pub fn handler(ctx: Context<QueueCronTasksV0>) -> Result<RunTaskReturnV0> {
program_id: crate::ID,
accounts: [
crate::__cpi_client_accounts_queue_cron_tasks_v0::QueueCronTasksV0 {
cron_job: ctx.accounts.cron_job.to_account_info(),
cron_job: cron_job.to_account_info(),
task_queue: ctx.accounts.task_queue.to_account_info(),
task_return_account_1: ctx.accounts.task_return_account_1.to_account_info(),
task_return_account_2: ctx.accounts.task_return_account_2.to_account_info(),
Expand All @@ -130,19 +148,13 @@ pub fn handler(ctx: Context<QueueCronTasksV0>) -> Result<RunTaskReturnV0> {
}],
vec![],
)?;
let free_tasks_per_transaction = ctx.accounts.cron_job.free_tasks_per_transaction;
let trunc_name = ctx
.accounts
.cron_job
.name
.chars()
.take(32)
.collect::<String>();
let free_tasks_per_transaction = cron_job.free_tasks_per_transaction;
let trunc_name = cron_job.name.chars().take(32).collect::<String>();
let tasks = std::iter::once(TaskReturnV0 {
trigger: TriggerV0::Timestamp(ctx.accounts.cron_job.current_exec_ts - QUEUE_TASK_DELAY),
trigger: TriggerV0::Timestamp(cron_job.current_exec_ts - QUEUE_TASK_DELAY),
transaction: TransactionSourceV0::CompiledV0(queue_tx),
crank_reward: None,
free_tasks: ctx.accounts.cron_job.num_tasks_per_queue_call + 1,
free_tasks: cron_job.num_tasks_per_queue_call + 1,
description: format!("queue {}", trunc_name),
})
.chain((0..num_tasks_to_queue as usize).filter_map(|i| {
Expand All @@ -164,26 +176,26 @@ pub fn handler(ctx: Context<QueueCronTasksV0>) -> Result<RunTaskReturnV0> {
}));

// Past all the CronJobTransaction are the free tasks
ctx.accounts.cron_job.next_schedule_task =
ctx.remaining_accounts[ctx.accounts.cron_job.num_tasks_per_queue_call as usize].key();
cron_job.next_schedule_task =
ctx.remaining_accounts[cron_job.num_tasks_per_queue_call as usize].key();

let res = write_return_tasks(WriteReturnTasksArgs {
program_id: crate::ID,
payer_info: PayerInfo::PdaPayer(ctx.accounts.cron_job.to_account_info()),
payer_info: PayerInfo::PdaPayer(cron_job.to_account_info()),
accounts: vec![
AccountWithSeeds {
account: ctx.accounts.task_return_account_1.to_account_info(),
seeds: vec![
b"task_return_account_1".to_vec(),
ctx.accounts.cron_job.key().as_ref().to_vec(),
cron_job.key().as_ref().to_vec(),
vec![ctx.bumps.task_return_account_1],
],
},
AccountWithSeeds {
account: ctx.accounts.task_return_account_2.to_account_info(),
seeds: vec![
b"task_return_account_2".to_vec(),
ctx.accounts.cron_job.key().as_ref().to_vec(),
cron_job.key().as_ref().to_vec(),
vec![ctx.bumps.task_return_account_2],
],
},
Expand All @@ -200,28 +212,30 @@ pub fn handler(ctx: Context<QueueCronTasksV0>) -> Result<RunTaskReturnV0> {
msg!("Queued {} tasks", total_tasks);

// Transfer needed lamports from the cron job to the task queue
let cron_job_info = ctx.accounts.cron_job.to_account_info();
let cron_job_info = cron_job.to_account_info();
let cron_job_min_lamports = Rent::get()?.minimum_balance(cron_job_info.data_len());
let lamports = ctx.accounts.task_queue.min_crank_reward * total_tasks as u64;
if cron_job_info.lamports() < cron_job_min_lamports + lamports {
msg!(
"Not enough lamports to fund tasks. Please requeue cron job when you have enough lamports. {}",
cron_job_info.lamports()
);
ctx.accounts.cron_job.removed_from_queue = true;
ctx.accounts.cron_job.next_schedule_task = Pubkey::default();
cron_job.removed_from_queue = true;
cron_job.next_schedule_task = Pubkey::default();
cron_job.exit(&crate::ID)?;
Ok(RunTaskReturnV0 {
tasks: vec![],
accounts: vec![],
})
} else {
ctx.accounts.cron_job.removed_from_queue = false;
cron_job.removed_from_queue = false;
cron_job_info.sub_lamports(lamports)?;
ctx.accounts
.task_queue
.to_account_info()
.add_lamports(lamports)?;

cron_job.exit(&crate::ID)?;
Ok(RunTaskReturnV0 {
tasks: vec![],
accounts: used_accounts,
Expand All @@ -231,10 +245,11 @@ pub fn handler(ctx: Context<QueueCronTasksV0>) -> Result<RunTaskReturnV0> {
Err(e) if e.to_string().contains("rent exempt") => {
msg!(
"Not enough lamports to fund tasks. Please requeue cron job when you have enough lamports. {}",
ctx.accounts.cron_job.to_account_info().lamports()
cron_job.to_account_info().lamports()
);
ctx.accounts.cron_job.removed_from_queue = true;
ctx.accounts.cron_job.next_schedule_task = Pubkey::default();
cron_job.removed_from_queue = true;
cron_job.next_schedule_task = Pubkey::default();
cron_job.exit(&crate::ID)?;
Ok(RunTaskReturnV0 {
tasks: vec![],
accounts: vec![],
Expand Down
6 changes: 5 additions & 1 deletion tuktuk-cli/src/cmd/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,11 @@ async fn simulate_task(client: &CliClient, task_key: Pubkey) -> Result<Option<Si
}
}
Err(tuktuk_sdk::error::Error::AccountNotFound) => Ok(None),
Err(e) => Err(e.into()),
Err(e) => Ok(Some(SimulationResult {
error: Some(e.to_string()),
logs: None,
compute_units: None,
})),
}
}

Expand Down
4 changes: 3 additions & 1 deletion tuktuk-crank-turner/src/task_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ impl TimedTask {
..self.clone()
})
.await?;

TASKS_IN_PROGRESS
.with_label_values(&[self.task_queue_name.as_str()])
.dec();
return Ok(());
}
};
Expand Down
Loading