-
-
Notifications
You must be signed in to change notification settings - Fork 12
Expand file tree
/
Copy paththinking_clock.rs
More file actions
355 lines (303 loc) · 10.7 KB
/
thinking_clock.rs
File metadata and controls
355 lines (303 loc) · 10.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
//! Thinking Clock — periodic reflection with multi-model orchestration.
//!
//! The Thinking Clock provides ambient awareness by periodically running a
//! cheap/local model to assess whether the agent should proactively take
//! action. This mirrors OpenClaw's "Thinking Clock" feature:
//!
//! - A **ticker** fires at a configurable interval (e.g., every 5 minutes).
//! - A **cheap model** (e.g., local Ollama, economy tier) evaluates the
//! current context and decides if any action is needed.
//! - If the cheap model detects something worth acting on, it **escalates**
//! to the primary (more capable) model for actual execution.
//!
//! Use cases:
//! - Monitor cron job results and alert on failures.
//! - Check for pending messages that need follow-up.
//! - Periodic status summaries in messenger channels.
//! - Background awareness of system health.
use serde::{Deserialize, Serialize};
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info};
/// Configuration for the Thinking Clock.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThinkingClockConfig {
/// Whether the thinking clock is enabled.
#[serde(default)]
pub enabled: bool,
/// Interval in seconds between ticks (default: 300 = 5 minutes).
#[serde(default = "default_interval")]
pub interval_secs: u64,
/// Model ID for the cheap/ambient check (e.g., "ollama/llama3.2:3b").
/// If not set, uses the cheapest available model from the registry.
#[serde(default)]
pub ambient_model: Option<String>,
/// Model ID for escalation (primary model).
/// If not set, uses the active model.
#[serde(default)]
pub escalation_model: Option<String>,
/// System prompt for the ambient check.
#[serde(default = "default_ambient_prompt")]
pub ambient_prompt: String,
/// Maximum tokens for the ambient check (keep low to stay cheap).
#[serde(default = "default_ambient_max_tokens")]
pub ambient_max_tokens: u32,
/// Keywords/phrases that trigger escalation from the ambient model's
/// response (e.g., ["ESCALATE", "ACTION_NEEDED"]).
#[serde(default = "default_escalation_triggers")]
pub escalation_triggers: Vec<String>,
/// Whether to log ambient check results (even when no action taken).
#[serde(default)]
pub verbose_logging: bool,
}
fn default_interval() -> u64 {
300
}
fn default_ambient_prompt() -> String {
"You are an ambient awareness monitor. Review the current context and \
determine if any action is needed. If action is needed, respond with \
'ESCALATE: <reason>'. If no action is needed, respond with 'OK'."
.to_string()
}
fn default_ambient_max_tokens() -> u32 {
256
}
fn default_escalation_triggers() -> Vec<String> {
vec![
"ESCALATE".to_string(),
"ACTION_NEEDED".to_string(),
"ALERT".to_string(),
]
}
impl Default for ThinkingClockConfig {
fn default() -> Self {
Self {
enabled: false,
interval_secs: default_interval(),
ambient_model: None,
escalation_model: None,
ambient_prompt: default_ambient_prompt(),
ambient_max_tokens: default_ambient_max_tokens(),
escalation_triggers: default_escalation_triggers(),
verbose_logging: false,
}
}
}
/// Result of an ambient check.
#[derive(Debug, Clone, Serialize)]
pub struct AmbientCheckResult {
/// The ambient model's response.
pub response: String,
/// Whether escalation was triggered.
pub escalated: bool,
/// The reason for escalation (if any).
pub escalation_reason: Option<String>,
/// Duration of the check.
pub duration_ms: u64,
}
/// Check if an ambient response should trigger escalation.
pub fn should_escalate(response: &str, triggers: &[String]) -> Option<String> {
// Use case-insensitive search directly on the original string to avoid
// byte-position misalignment between `to_uppercase()` and the original
// (multi-byte characters like ß→SS can change byte lengths).
let response_lower = response.to_lowercase();
for trigger in triggers {
let trigger_lower = trigger.to_lowercase();
if let Some(pos) = response_lower.find(&trigger_lower) {
// `pos` is a byte offset into `response_lower` which has the
// same byte length as `response` (lowercasing ASCII-range
// characters preserves byte length for the trigger keywords
// we care about: ESCALATE, ACTION_NEEDED, ALERT).
let after = &response[pos + trigger.len()..];
let reason = after
.trim_start_matches(':')
.trim_start_matches(' ')
.trim();
let reason = if reason.is_empty() {
response.to_string()
} else {
reason.to_string()
};
debug!(trigger = %trigger, reason = %reason, "Escalation triggered");
return Some(reason);
}
}
None
}
/// State for the thinking clock tick loop.
pub struct ThinkingClock {
config: ThinkingClockConfig,
tick_count: u64,
}
impl ThinkingClock {
/// Create a new thinking clock.
pub fn new(config: ThinkingClockConfig) -> Self {
Self {
config,
tick_count: 0,
}
}
/// Get the tick interval.
pub fn interval(&self) -> Duration {
Duration::from_secs(self.config.interval_secs)
}
/// Get the ambient model ID.
pub fn ambient_model(&self) -> Option<&str> {
self.config.ambient_model.as_deref()
}
/// Get the escalation model ID.
pub fn escalation_model(&self) -> Option<&str> {
self.config.escalation_model.as_deref()
}
/// Get the ambient prompt.
pub fn ambient_prompt(&self) -> &str {
&self.config.ambient_prompt
}
/// Get max tokens for ambient check.
pub fn ambient_max_tokens(&self) -> u32 {
self.config.ambient_max_tokens
}
/// Record a tick and return the tick count.
pub fn tick(&mut self) -> u64 {
self.tick_count += 1;
if self.config.verbose_logging {
debug!(tick = self.tick_count, "Thinking clock tick");
}
self.tick_count
}
/// Process an ambient model response.
pub fn process_response(&self, response: &str, duration_ms: u64) -> AmbientCheckResult {
let escalation = should_escalate(response, &self.config.escalation_triggers);
let escalated = escalation.is_some();
if escalated {
info!(
reason = ?escalation,
"Thinking clock: escalation triggered"
);
} else if self.config.verbose_logging {
debug!(response = %response, "Thinking clock: no action needed");
}
AmbientCheckResult {
response: response.to_string(),
escalated,
escalation_reason: escalation,
duration_ms,
}
}
/// Check if the thinking clock is enabled.
pub fn is_enabled(&self) -> bool {
self.config.enabled
}
/// Get the current tick count.
pub fn tick_count(&self) -> u64 {
self.tick_count
}
}
/// Run the thinking clock loop.
///
/// This is a skeleton that the gateway integrates with its model dispatch.
/// The actual model calls are performed by the gateway using the ambient
/// and escalation model IDs from the config.
pub async fn run_thinking_clock_loop(
config: ThinkingClockConfig,
cancel: CancellationToken,
// The gateway provides a callback for each tick
on_tick: impl Fn(u64) + Send + 'static,
) {
if !config.enabled {
debug!("Thinking clock disabled");
return;
}
let mut clock = ThinkingClock::new(config);
let interval = clock.interval();
info!(
interval_secs = interval.as_secs(),
"Thinking clock started"
);
loop {
tokio::select! {
_ = cancel.cancelled() => {
info!("Thinking clock stopped");
break;
}
_ = tokio::time::sleep(interval) => {
let tick = clock.tick();
on_tick(tick);
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_config_defaults() {
let config = ThinkingClockConfig::default();
assert!(!config.enabled);
assert_eq!(config.interval_secs, 300);
assert_eq!(config.ambient_max_tokens, 256);
assert!(!config.escalation_triggers.is_empty());
}
#[test]
fn test_should_escalate_yes() {
let triggers = vec!["ESCALATE".to_string(), "ALERT".to_string()];
let result = should_escalate("ESCALATE: server is down", &triggers);
assert!(result.is_some());
assert_eq!(result.unwrap(), "server is down");
}
#[test]
fn test_should_escalate_no() {
let triggers = vec!["ESCALATE".to_string()];
let result = should_escalate("OK - all systems normal", &triggers);
assert!(result.is_none());
}
#[test]
fn test_should_escalate_case_insensitive() {
let triggers = vec!["ESCALATE".to_string()];
let result = should_escalate("escalate: need attention", &triggers);
assert!(result.is_some());
}
#[test]
fn test_thinking_clock_tick() {
let config = ThinkingClockConfig {
enabled: true,
..Default::default()
};
let mut clock = ThinkingClock::new(config);
assert_eq!(clock.tick_count(), 0);
assert_eq!(clock.tick(), 1);
assert_eq!(clock.tick(), 2);
assert_eq!(clock.tick_count(), 2);
}
#[test]
fn test_process_response_no_escalation() {
let config = ThinkingClockConfig::default();
let clock = ThinkingClock::new(config);
let result = clock.process_response("OK - everything is fine", 50);
assert!(!result.escalated);
assert!(result.escalation_reason.is_none());
}
#[test]
fn test_process_response_with_escalation() {
let config = ThinkingClockConfig::default();
let clock = ThinkingClock::new(config);
let result = clock.process_response("ESCALATE: cron job failed", 100);
assert!(result.escalated);
assert_eq!(result.escalation_reason.unwrap(), "cron job failed");
}
#[test]
fn test_thinking_clock_disabled() {
let config = ThinkingClockConfig::default(); // disabled by default
let clock = ThinkingClock::new(config);
assert!(!clock.is_enabled());
}
#[test]
fn test_interval() {
let config = ThinkingClockConfig {
interval_secs: 60,
..Default::default()
};
let clock = ThinkingClock::new(config);
assert_eq!(clock.interval(), Duration::from_secs(60));
}
}