Skip to content

Commit 8b2921a

Browse files
committed
feat(processing_engine): Store triggers by DbId and TriggerId instead of name
1 parent f456fd4 commit 8b2921a

File tree

10 files changed

+133
-153
lines changed

10 files changed

+133
-153
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

influxdb3/tests/cli/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -2516,7 +2516,7 @@ def process_scheduled_call(influxdb3_local, schedule_time, args=None):
25162516
.unwrap();
25172517

25182518
// Wait for trigger to run several times
2519-
tokio::time::sleep(std::time::Duration::from_millis(3100)).await;
2519+
tokio::time::sleep(std::time::Duration::from_millis(5000)).await;
25202520

25212521
// Query to see what values were written before disabling
25222522
let first_query_result = server
@@ -2535,7 +2535,7 @@ def process_scheduled_call(influxdb3_local, schedule_time, args=None):
25352535
server.enable_trigger(db_name, trigger_name).run().unwrap();
25362536

25372537
// Wait for trigger to run again
2538-
tokio::time::sleep(std::time::Duration::from_millis(1100)).await;
2538+
tokio::time::sleep(std::time::Duration::from_millis(5000)).await;
25392539

25402540
// Query results after re-enabling
25412541
let second_query_result = server

influxdb3_catalog/src/catalog.rs

+3-4
Original file line numberDiff line numberDiff line change
@@ -300,9 +300,8 @@ impl Catalog {
300300
self.inner.read().db_exists(db_id)
301301
}
302302

303-
/// Get active triggers by database and trigger name
304-
// NOTE: this could be id-based in future
305-
pub fn active_triggers(&self) -> Vec<(Arc<str>, Arc<str>)> {
303+
/// Get active triggers by database ID and trigger ID
304+
pub fn active_triggers(&self) -> Vec<(DbId, TriggerId)> {
306305
let inner = self.inner.read();
307306
let result = inner
308307
.databases
@@ -314,7 +313,7 @@ impl Catalog {
314313
if trigger.disabled {
315314
None
316315
} else {
317-
Some((Arc::clone(&db.name), Arc::clone(&trigger.trigger_name)))
316+
Some((db.id, trigger.trigger_id))
318317
}
319318
})
320319
})

influxdb3_catalog/src/catalog/update.rs

+1
Original file line numberDiff line numberDiff line change
@@ -539,6 +539,7 @@ impl Catalog {
539539
db.id,
540540
db.name(),
541541
vec![DatabaseCatalogOp::CreateTrigger(TriggerDefinition {
542+
database_id: db.id,
542543
trigger_id,
543544
trigger_name: trigger_name.into(),
544545
plugin_filename: plugin_filename.to_string(),

influxdb3_catalog/src/log.rs

+1
Original file line numberDiff line numberDiff line change
@@ -642,6 +642,7 @@ pub struct TriggerDefinition {
642642
pub trigger_id: TriggerId,
643643
pub trigger_name: Arc<str>,
644644
pub plugin_filename: String,
645+
pub database_id: DbId,
645646
pub database_name: Arc<str>,
646647
pub node_id: Arc<str>,
647648
pub trigger: TriggerSpecificationDefinition,

influxdb3_catalog/src/snapshot.rs

+3
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,7 @@ impl Snapshot for TableDefinition {
211211

212212
#[derive(Debug, Serialize, Deserialize)]
213213
pub(crate) struct ProcessingEngineTriggerSnapshot {
214+
pub database_id: DbId,
214215
pub trigger_id: TriggerId,
215216
pub trigger_name: Arc<str>,
216217
pub node_id: Arc<str>,
@@ -227,6 +228,7 @@ impl Snapshot for TriggerDefinition {
227228

228229
fn snapshot(&self) -> Self::Serialized {
229230
Self::Serialized {
231+
database_id: self.database_id,
230232
trigger_id: self.trigger_id,
231233
trigger_name: Arc::clone(&self.trigger_name),
232234
node_id: Arc::clone(&self.node_id),
@@ -241,6 +243,7 @@ impl Snapshot for TriggerDefinition {
241243

242244
fn from_snapshot(snap: Self::Serialized) -> Self {
243245
Self {
246+
database_id: snap.database_id,
244247
trigger_id: snap.trigger_id,
245248
trigger_name: snap.trigger_name,
246249
node_id: snap.node_id,

influxdb3_processing_engine/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ hyper.workspace = true
1919
iox_time.workspace = true
2020
influxdb3_catalog = { path = "../influxdb3_catalog" }
2121
influxdb3_client = { path = "../influxdb3_client" }
22+
influxdb3_id = { path = "../influxdb3_id" }
2223
influxdb3_internal_api = { path = "../influxdb3_internal_api" }
2324
influxdb3_py_api = { path = "../influxdb3_py_api" }
2425
influxdb3_types = { path = "../influxdb3_types" }

0 commit comments

Comments
 (0)