Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion solana-programs/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion solana-programs/programs/cron/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "cron"
version = "0.2.4"
version = "0.2.5"
description = "Created with Anchor"
edition = "2021"

Expand Down
125 changes: 70 additions & 55 deletions solana-programs/programs/cron/src/instructions/queue_cron_tasks_v0.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,18 @@ use tuktuk_program::{
RunTaskReturnV0, TaskQueueV0, TaskReturnV0, TransactionSourceV0, TriggerV0,
};

use crate::{
error::ErrorCode,
state::{CronJobTransactionV0, CronJobV0},
};
use crate::state::{CronJobTransactionV0, CronJobV0};

pub const QUEUED_TASKS_PER_QUEUE: u8 = 3;
// Queue tasks 5 minutes before the cron job is scheduled to run
// This way we don't take up space in the task queue for tasks that are running
// A long time from now
pub const QUEUE_TASK_DELAY: i64 = 60 * 5;

#[derive(Accounts)]
pub struct QueueCronTasksV0<'info> {
#[account(
mut,
has_one = task_queue
)]
pub cron_job: Box<Account<'info, CronJobV0>>,
/// CHECK: Checked via require in handler
#[account(mut)]
pub cron_job: UncheckedAccount<'info>,
pub task_queue: Box<Account<'info, TaskQueueV0>>,
/// CHECK: Used to write return data
#[account(
Expand All @@ -48,58 +42,82 @@ pub struct QueueCronTasksV0<'info> {
pub system_program: Program<'info, System>,
}

#[macro_export]
macro_rules! try_from {
($ty: ty, $acc: expr) => {{
let account_info = $acc.as_ref();
<$ty>::try_from(unsafe {
core::mem::transmute::<
&anchor_lang::prelude::AccountInfo<'_>,
&anchor_lang::prelude::AccountInfo<'_>,
>(account_info)
})
}};
}

pub fn handler(ctx: Context<QueueCronTasksV0>) -> Result<RunTaskReturnV0> {
let stale_task_age = ctx.accounts.task_queue.stale_task_age;
let now = Clock::get()?.unix_timestamp;
if ctx.accounts.cron_job.data_is_empty() {
msg!("Cron job was closed, completing task");
return Ok(RunTaskReturnV0 {
tasks: vec![],
accounts: vec![],
});
}
let mut cron_job = try_from!(Account<CronJobV0>, &ctx.accounts.cron_job)?;
require_eq!(cron_job.task_queue, ctx.accounts.task_queue.key());

// Only proceed if we're within the queue window of the next execution
if (now + QUEUE_TASK_DELAY) < ctx.accounts.cron_job.current_exec_ts {
if (now + QUEUE_TASK_DELAY) < cron_job.current_exec_ts {
msg!("Too early to queue tasks, current time {} is not within {} seconds of next execution {}",
now, QUEUE_TASK_DELAY, ctx.accounts.cron_job.current_exec_ts);
return Err(error!(ErrorCode::TooEarly));
now, QUEUE_TASK_DELAY, cron_job.current_exec_ts);

// Return Ok so this task closes
return Ok(RunTaskReturnV0 {
tasks: vec![],
accounts: vec![],
});
}

if now - ctx.accounts.cron_job.current_exec_ts > stale_task_age as i64 {
if now - cron_job.current_exec_ts > stale_task_age as i64 {
msg!("Cron job is stale, resetting");
ctx.accounts.cron_job.current_exec_ts = now;
ctx.accounts.cron_job.current_transaction_id = 0;
cron_job.current_exec_ts = now;
cron_job.current_transaction_id = 0;
}

let max_num_tasks_remaining =
ctx.accounts.cron_job.next_transaction_id - ctx.accounts.cron_job.current_transaction_id;
let max_num_tasks_remaining = cron_job.next_transaction_id - cron_job.current_transaction_id;
let num_tasks_to_queue =
(ctx.accounts.cron_job.num_tasks_per_queue_call as u32).min(max_num_tasks_remaining);
ctx.accounts.cron_job.current_transaction_id += num_tasks_to_queue;
(cron_job.num_tasks_per_queue_call as u32).min(max_num_tasks_remaining);
cron_job.current_transaction_id += num_tasks_to_queue;

let trigger = TriggerV0::Timestamp(ctx.accounts.cron_job.current_exec_ts);
let trigger = TriggerV0::Timestamp(cron_job.current_exec_ts);

// If we reached the end this time, reset to 0 and move the next execution time forward
if ctx.accounts.cron_job.current_transaction_id == ctx.accounts.cron_job.next_transaction_id {
ctx.accounts.cron_job.current_transaction_id = 0;
let schedule = Schedule::from_str(&ctx.accounts.cron_job.schedule).unwrap();
if cron_job.current_transaction_id == cron_job.next_transaction_id {
cron_job.current_transaction_id = 0;
let schedule = Schedule::from_str(&cron_job.schedule).unwrap();
// Find the next execution time after the last one
let ts = ctx.accounts.cron_job.current_exec_ts;
let ts = cron_job.current_exec_ts;
let date_time_ts = &DateTime::<Utc>::from_naive_utc_and_offset(
DateTime::from_timestamp(ts, 0).unwrap().naive_utc(),
Utc,
);
ctx.accounts.cron_job.current_exec_ts =
schedule.next_after(date_time_ts).unwrap().timestamp();
cron_job.current_exec_ts = schedule.next_after(date_time_ts).unwrap().timestamp();
msg!(
"Will have finished execution ts: {}, moving to {}",
ts,
ctx.accounts.cron_job.current_exec_ts
cron_job.current_exec_ts
);
}

let remaining_accounts = (ctx.accounts.cron_job.current_transaction_id
..ctx.accounts.cron_job.current_transaction_id
+ ctx.accounts.cron_job.num_tasks_per_queue_call as u32)
let remaining_accounts = (cron_job.current_transaction_id
..cron_job.current_transaction_id + cron_job.num_tasks_per_queue_call as u32)
.map(|i| {
Pubkey::find_program_address(
&[
b"cron_job_transaction",
ctx.accounts.cron_job.key().as_ref(),
cron_job.key().as_ref(),
&i.to_le_bytes(),
],
&crate::ID,
Expand All @@ -113,7 +131,7 @@ pub fn handler(ctx: Context<QueueCronTasksV0>) -> Result<RunTaskReturnV0> {
program_id: crate::ID,
accounts: [
crate::__cpi_client_accounts_queue_cron_tasks_v0::QueueCronTasksV0 {
cron_job: ctx.accounts.cron_job.to_account_info(),
cron_job: cron_job.to_account_info(),
task_queue: ctx.accounts.task_queue.to_account_info(),
task_return_account_1: ctx.accounts.task_return_account_1.to_account_info(),
task_return_account_2: ctx.accounts.task_return_account_2.to_account_info(),
Expand All @@ -130,19 +148,13 @@ pub fn handler(ctx: Context<QueueCronTasksV0>) -> Result<RunTaskReturnV0> {
}],
vec![],
)?;
let free_tasks_per_transaction = ctx.accounts.cron_job.free_tasks_per_transaction;
let trunc_name = ctx
.accounts
.cron_job
.name
.chars()
.take(32)
.collect::<String>();
let free_tasks_per_transaction = cron_job.free_tasks_per_transaction;
let trunc_name = cron_job.name.chars().take(32).collect::<String>();
let tasks = std::iter::once(TaskReturnV0 {
trigger: TriggerV0::Timestamp(ctx.accounts.cron_job.current_exec_ts - QUEUE_TASK_DELAY),
trigger: TriggerV0::Timestamp(cron_job.current_exec_ts - QUEUE_TASK_DELAY),
transaction: TransactionSourceV0::CompiledV0(queue_tx),
crank_reward: None,
free_tasks: ctx.accounts.cron_job.num_tasks_per_queue_call + 1,
free_tasks: cron_job.num_tasks_per_queue_call + 1,
description: format!("queue {}", trunc_name),
})
.chain((0..num_tasks_to_queue as usize).filter_map(|i| {
Expand All @@ -164,26 +176,26 @@ pub fn handler(ctx: Context<QueueCronTasksV0>) -> Result<RunTaskReturnV0> {
}));

// Past all the CronJobTransaction are the free tasks
ctx.accounts.cron_job.next_schedule_task =
ctx.remaining_accounts[ctx.accounts.cron_job.num_tasks_per_queue_call as usize].key();
cron_job.next_schedule_task =
ctx.remaining_accounts[cron_job.num_tasks_per_queue_call as usize].key();

let res = write_return_tasks(WriteReturnTasksArgs {
program_id: crate::ID,
payer_info: PayerInfo::PdaPayer(ctx.accounts.cron_job.to_account_info()),
payer_info: PayerInfo::PdaPayer(cron_job.to_account_info()),
accounts: vec![
AccountWithSeeds {
account: ctx.accounts.task_return_account_1.to_account_info(),
seeds: vec![
b"task_return_account_1".to_vec(),
ctx.accounts.cron_job.key().as_ref().to_vec(),
cron_job.key().as_ref().to_vec(),
vec![ctx.bumps.task_return_account_1],
],
},
AccountWithSeeds {
account: ctx.accounts.task_return_account_2.to_account_info(),
seeds: vec![
b"task_return_account_2".to_vec(),
ctx.accounts.cron_job.key().as_ref().to_vec(),
cron_job.key().as_ref().to_vec(),
vec![ctx.bumps.task_return_account_2],
],
},
Expand All @@ -200,28 +212,30 @@ pub fn handler(ctx: Context<QueueCronTasksV0>) -> Result<RunTaskReturnV0> {
msg!("Queued {} tasks", total_tasks);

// Transfer needed lamports from the cron job to the task queue
let cron_job_info = ctx.accounts.cron_job.to_account_info();
let cron_job_info = cron_job.to_account_info();
let cron_job_min_lamports = Rent::get()?.minimum_balance(cron_job_info.data_len());
let lamports = ctx.accounts.task_queue.min_crank_reward * total_tasks as u64;
if cron_job_info.lamports() < cron_job_min_lamports + lamports {
msg!(
"Not enough lamports to fund tasks. Please requeue cron job when you have enough lamports. {}",
cron_job_info.lamports()
);
ctx.accounts.cron_job.removed_from_queue = true;
ctx.accounts.cron_job.next_schedule_task = Pubkey::default();
cron_job.removed_from_queue = true;
cron_job.next_schedule_task = Pubkey::default();
cron_job.exit(&crate::ID)?;
Ok(RunTaskReturnV0 {
tasks: vec![],
accounts: vec![],
})
} else {
ctx.accounts.cron_job.removed_from_queue = false;
cron_job.removed_from_queue = false;
cron_job_info.sub_lamports(lamports)?;
ctx.accounts
.task_queue
.to_account_info()
.add_lamports(lamports)?;

cron_job.exit(&crate::ID)?;
Ok(RunTaskReturnV0 {
tasks: vec![],
accounts: used_accounts,
Expand All @@ -231,10 +245,11 @@ pub fn handler(ctx: Context<QueueCronTasksV0>) -> Result<RunTaskReturnV0> {
Err(e) if e.to_string().contains("rent exempt") => {
msg!(
"Not enough lamports to fund tasks. Please requeue cron job when you have enough lamports. {}",
ctx.accounts.cron_job.to_account_info().lamports()
cron_job.to_account_info().lamports()
);
ctx.accounts.cron_job.removed_from_queue = true;
ctx.accounts.cron_job.next_schedule_task = Pubkey::default();
cron_job.removed_from_queue = true;
cron_job.next_schedule_task = Pubkey::default();
cron_job.exit(&crate::ID)?;
Ok(RunTaskReturnV0 {
tasks: vec![],
accounts: vec![],
Expand Down
2 changes: 1 addition & 1 deletion tuktuk-cli/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tuktuk-cli"
version = "0.2.10"
version = "0.2.11"
description = "A cli for tuktuk"
homepage.workspace = true
repository.workspace = true
Expand Down
6 changes: 5 additions & 1 deletion tuktuk-cli/src/cmd/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,11 @@ async fn simulate_task(client: &CliClient, task_key: Pubkey) -> Result<Option<Si
}
}
Err(tuktuk_sdk::error::Error::AccountNotFound) => Ok(None),
Err(e) => Err(e.into()),
Err(e) => Ok(Some(SimulationResult {
error: Some(e.to_string()),
logs: None,
compute_units: None,
})),
}
}

Expand Down
2 changes: 1 addition & 1 deletion tuktuk-crank-turner/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tuktuk-crank-turner"
version = "0.2.21"
version = "0.2.22"
authors.workspace = true
edition.workspace = true
license.workspace = true
Expand Down
4 changes: 3 additions & 1 deletion tuktuk-crank-turner/src/task_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ impl TimedTask {
..self.clone()
})
.await?;

TASKS_IN_PROGRESS
.with_label_values(&[self.task_queue_name.as_str()])
.dec();
return Ok(());
}
};
Expand Down
Loading