Skip to content

Commit 934a80f

Browse files
authored
fix(interchain-indexer): Fix zero-count expansion for filtered message paths (#1663)
* Tune message-paths logic * Fix codereview issues
1 parent d544f76 commit 934a80f

3 files changed

Lines changed: 171 additions & 44 deletions

File tree

interchain-indexer/.memory-bank/research/stats-subsystem.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -413,7 +413,10 @@ Zero-chain visibility is service-wide and configurable:
413413
- message-path endpoints drive the query from `chains` (excluding the
414414
selected chain) and left-join `stats_messages` / aggregated
415415
`stats_messages_days`
416-
- explicit `counterparty_chain_ids` still suppresses synthesized zero rows
416+
- with explicit `counterparty_chain_ids`, message-path endpoints still drive
417+
from `chains`, restrict rows to the requested counterparties, exclude the
418+
selected chain itself, and return zero-valued rows for requested
419+
counterparties that exist in `chains` but have no aggregate row
417420
- when `false`: both families return only rows with positive aggregated stats
418421
- `/stats/chains` filters
419422
`COALESCE(sc.unique_transfer_users_count, 0) > 0 OR COALESCE(sc.unique_message_users_count, 0) > 0)`

interchain-indexer/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ Defines which bridges (cross-chain mechanisms) to index. Each entry is one bridg
104104
| `INTERCHAIN_INDEXER__BUFFER_SETTINGS__MAINTENANCE_INTERVAL` | | | `500` |
105105
| `INTERCHAIN_INDEXER__STATS__BACKFILL_ON_START` | | Recalculate the statistics tables for messages and transfers (`stats_messages`, `stats_asset*`) on service startup. This is needed only after the first application of the `m20260312_175120_add_stats_tables` migration, and only if there are existing DB records before it. This option should normally be disabled after the migration to reduce service startup time. | `false` |
106106
| `INTERCHAIN_INDEXER__STATS__CHAINS_RECALCULATION_PERIOD_SECS` | | Interval in seconds between full recomputations of per-chain distinct user counters in `stats_chains` (from `crosschain_messages` / `crosschain_transfers`, any status). Only chains with at least one counted user address keep a row; stale rows are deleted. Set to `0` to disable the background task. | `3600` |
107-
| `INTERCHAIN_INDEXER__STATS__INCLUDE_ZERO_CHAINS` | | When `true`, stats endpoints (`/api/v1/stats/chains` and `/api/v1/stats/chain/{chain_id}/messages-paths/*`) include known chains from `chains` even when the aggregated stats row is missing or has a zero value. Disable it to return only chains with positive aggregated stats. | `true` |
107+
| `INTERCHAIN_INDEXER__STATS__INCLUDE_ZERO_CHAINS` | | When `true`, stats endpoints (`/api/v1/stats/chains` and `/api/v1/stats/chain/{chain_id}/messages-paths/*`) include known chains from `chains` even when the aggregated stats row is missing or has a zero value. For message paths with `counterparty_chain_ids`, zero rows are still returned for the explicitly requested counterparties that exist in `chains`, and no other counterparties are added. Disable it to return only chains with positive aggregated stats. | `true` |
108108

109109
[anchor]: <> (anchors.envs.end.service)
110110

interchain-indexer/interchain-indexer-logic/src/database.rs

Lines changed: 166 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -173,25 +173,29 @@ enum MessagePathDirection {
173173
Incoming,
174174
}
175175

176-
fn should_expand_message_paths(
177-
include_zero_chains: bool,
178-
counterparty_chain_ids: Option<&[i64]>,
179-
) -> bool {
180-
include_zero_chains
181-
&& counterparty_chain_ids
182-
.map(|chain_ids| chain_ids.is_empty())
183-
.unwrap_or(true)
184-
}
185-
186176
fn build_all_time_message_paths_query(
187177
chain_id: i64,
188178
direction: MessagePathDirection,
189179
counterparty_chain_ids: Option<&[i64]>,
190180
include_zero_chains: bool,
191181
) -> (String, Vec<Value>) {
192-
if should_expand_message_paths(include_zero_chains, counterparty_chain_ids) {
182+
if include_zero_chains {
183+
let mut values = vec![Value::BigInt(Some(chain_id))];
184+
let mut where_parts = vec![
185+
"c.id <> $1".to_string(),
186+
"EXISTS (SELECT 1 FROM chains WHERE id = $1)".to_string(),
187+
];
188+
189+
if let Some(ids) = counterparty_chain_ids.filter(|chain_ids| !chain_ids.is_empty()) {
190+
let placeholders: Vec<String> = (0..ids.len()).map(|i| format!("${}", i + 2)).collect();
191+
where_parts.push(format!("c.id IN ({})", placeholders.join(", ")));
192+
for &id in ids {
193+
values.push(Value::BigInt(Some(id)));
194+
}
195+
}
196+
193197
let sql = match direction {
194-
MessagePathDirection::Outgoing => {
198+
MessagePathDirection::Outgoing => format!(
195199
r#"
196200
SELECT $1::bigint AS src_chain_id,
197201
c.id AS dst_chain_id,
@@ -200,12 +204,12 @@ FROM chains c
200204
LEFT JOIN stats_messages sm
201205
ON sm.src_chain_id = $1
202206
AND sm.dst_chain_id = c.id
203-
WHERE c.id <> $1
204-
AND EXISTS (SELECT 1 FROM chains WHERE id = $1)
207+
WHERE {}
205208
ORDER BY messages_count DESC, src_chain_id ASC, dst_chain_id ASC
206-
"#
207-
}
208-
MessagePathDirection::Incoming => {
209+
"#,
210+
where_parts.join("\n AND ")
211+
),
212+
MessagePathDirection::Incoming => format!(
209213
r#"
210214
SELECT c.id AS src_chain_id,
211215
$1::bigint AS dst_chain_id,
@@ -214,14 +218,14 @@ FROM chains c
214218
LEFT JOIN stats_messages sm
215219
ON sm.src_chain_id = c.id
216220
AND sm.dst_chain_id = $1
217-
WHERE c.id <> $1
218-
AND EXISTS (SELECT 1 FROM chains WHERE id = $1)
221+
WHERE {}
219222
ORDER BY messages_count DESC, src_chain_id ASC, dst_chain_id ASC
220-
"#
221-
}
223+
"#,
224+
where_parts.join("\n AND ")
225+
),
222226
};
223227

224-
return (sql.to_string(), vec![Value::BigInt(Some(chain_id))]);
228+
return (sql, values);
225229
}
226230

227231
let filter_column = match direction {
@@ -265,7 +269,7 @@ fn build_bounded_message_paths_query(
265269
counterparty_chain_ids: Option<&[i64]>,
266270
include_zero_chains: bool,
267271
) -> (String, Vec<Value>) {
268-
if should_expand_message_paths(include_zero_chains, counterparty_chain_ids) {
272+
if include_zero_chains {
269273
let mut aggregate_where_parts = vec![match direction {
270274
MessagePathDirection::Outgoing => "src_chain_id = $1".to_string(),
271275
MessagePathDirection::Incoming => "dst_chain_id = $1".to_string(),
@@ -282,6 +286,28 @@ fn build_bounded_message_paths_query(
282286
if let Some(to_date) = to_date {
283287
aggregate_where_parts.push(format!("date < ${placeholder}"));
284288
values.push(Value::ChronoDate(Some(Box::new(to_date))));
289+
placeholder += 1;
290+
}
291+
292+
let mut where_parts = vec![
293+
"c.id <> $1".to_string(),
294+
"EXISTS (SELECT 1 FROM chains WHERE id = $1)".to_string(),
295+
];
296+
297+
if let Some(ids) = counterparty_chain_ids.filter(|chain_ids| !chain_ids.is_empty()) {
298+
let placeholders: Vec<String> = (0..ids.len())
299+
.map(|i| format!("${}", placeholder + i))
300+
.collect();
301+
let placeholders = placeholders.join(", ");
302+
let counterparty_column = match direction {
303+
MessagePathDirection::Outgoing => "dst_chain_id",
304+
MessagePathDirection::Incoming => "src_chain_id",
305+
};
306+
aggregate_where_parts.push(format!("{counterparty_column} IN ({placeholders})"));
307+
where_parts.push(format!("c.id IN ({placeholders})"));
308+
for &id in ids {
309+
values.push(Value::BigInt(Some(id)));
310+
}
285311
}
286312

287313
let sql = match direction {
@@ -298,11 +324,11 @@ LEFT JOIN (
298324
WHERE {}
299325
GROUP BY dst_chain_id
300326
) sm ON sm.dst_chain_id = c.id
301-
WHERE c.id <> $1
302-
AND EXISTS (SELECT 1 FROM chains WHERE id = $1)
327+
WHERE {}
303328
ORDER BY messages_count DESC, src_chain_id ASC, dst_chain_id ASC
304329
"#,
305-
aggregate_where_parts.join(" AND ")
330+
aggregate_where_parts.join(" AND "),
331+
where_parts.join("\n AND ")
306332
),
307333
MessagePathDirection::Incoming => format!(
308334
r#"
@@ -317,11 +343,11 @@ LEFT JOIN (
317343
WHERE {}
318344
GROUP BY src_chain_id
319345
) sm ON sm.src_chain_id = c.id
320-
WHERE c.id <> $1
321-
AND EXISTS (SELECT 1 FROM chains WHERE id = $1)
346+
WHERE {}
322347
ORDER BY messages_count DESC, src_chain_id ASC, dst_chain_id ASC
323348
"#,
324-
aggregate_where_parts.join(" AND ")
349+
aggregate_where_parts.join(" AND "),
350+
where_parts.join("\n AND ")
325351
),
326352
};
327353

@@ -6160,8 +6186,8 @@ mod tests {
61606186

61616187
#[tokio::test]
61626188
#[ignore = "needs database to run"]
6163-
async fn message_paths_include_zero_counterparty_does_not_synthesize_missing_rows() {
6164-
let _db = init_db("message_paths_include_zero_counterparty_missing").await;
6189+
async fn message_paths_include_zero_counterparty_expands_requested_known_rows_only() {
6190+
let _db = init_db("message_paths_include_zero_counterparty_expand").await;
61656191
let interchain_db = InterchainDatabase::new(_db.client());
61666192
interchain_db
61676193
.upsert_chains(vec![
@@ -6180,19 +6206,43 @@ mod tests {
61806206
name: Set("C".into()),
61816207
..Default::default()
61826208
},
6209+
chains::ActiveModel {
6210+
id: Set(4),
6211+
name: Set("D".into()),
6212+
..Default::default()
6213+
},
61836214
])
61846215
.await
61856216
.unwrap();
61866217
interchain_db
61876218
.create_or_update_stats_messages(1, 2, 5)
61886219
.await
61896220
.unwrap();
6221+
interchain_db
6222+
.create_or_update_stats_messages(1, 4, 7)
6223+
.await
6224+
.unwrap();
61906225

61916226
let rows = interchain_db
6192-
.get_outgoing_message_paths(1, None, None, Some(&[3]), true)
6227+
.get_outgoing_message_paths(1, None, None, Some(&[1, 3, 4, 999]), true)
61936228
.await
61946229
.unwrap();
6195-
assert!(rows.is_empty());
6230+
assert_eq!(
6231+
rows,
6232+
vec![
6233+
MessagePathStatsRow {
6234+
src_chain_id: 1,
6235+
dst_chain_id: 4,
6236+
messages_count: 7
6237+
},
6238+
MessagePathStatsRow {
6239+
src_chain_id: 1,
6240+
dst_chain_id: 3,
6241+
messages_count: 0
6242+
},
6243+
]
6244+
);
6245+
assert!(rows.iter().all(|row| row.dst_chain_id != 1));
61966246
}
61976247

61986248
#[tokio::test]
@@ -6245,8 +6295,8 @@ mod tests {
62456295

62466296
#[tokio::test]
62476297
#[ignore = "needs database to run"]
6248-
async fn message_paths_bounded_counterparty_filter_applies_in_sql() {
6249-
let _db = init_db("message_paths_bounded_counterparty").await;
6298+
async fn message_paths_include_zero_incoming_counterparty_expands_requested_known_rows_only() {
6299+
let _db = init_db("message_paths_include_zero_incoming_counterparty_expand").await;
62506300
let interchain_db = InterchainDatabase::new(_db.client());
62516301
interchain_db
62526302
.upsert_chains(vec![
@@ -6265,13 +6315,79 @@ mod tests {
62656315
name: Set("C".into()),
62666316
..Default::default()
62676317
},
6318+
chains::ActiveModel {
6319+
id: Set(4),
6320+
name: Set("D".into()),
6321+
..Default::default()
6322+
},
6323+
])
6324+
.await
6325+
.unwrap();
6326+
interchain_db
6327+
.create_or_update_stats_messages(1, 3, 4)
6328+
.await
6329+
.unwrap();
6330+
interchain_db
6331+
.create_or_update_stats_messages(4, 3, 8)
6332+
.await
6333+
.unwrap();
6334+
6335+
let rows = interchain_db
6336+
.get_incoming_message_paths(3, None, None, Some(&[2, 3, 4, 999]), true)
6337+
.await
6338+
.unwrap();
6339+
assert_eq!(
6340+
rows,
6341+
vec![
6342+
MessagePathStatsRow {
6343+
src_chain_id: 4,
6344+
dst_chain_id: 3,
6345+
messages_count: 8
6346+
},
6347+
MessagePathStatsRow {
6348+
src_chain_id: 2,
6349+
dst_chain_id: 3,
6350+
messages_count: 0
6351+
},
6352+
]
6353+
);
6354+
assert!(rows.iter().all(|row| row.src_chain_id != 3));
6355+
}
6356+
6357+
#[tokio::test]
6358+
#[ignore = "needs database to run"]
6359+
async fn message_paths_include_zero_bounded_counterparty_expands_requested_known_rows_only() {
6360+
let _db = init_db("message_paths_include_zero_bounded_counterparty").await;
6361+
let interchain_db = InterchainDatabase::new(_db.client());
6362+
interchain_db
6363+
.upsert_chains(vec![
6364+
chains::ActiveModel {
6365+
id: Set(1),
6366+
name: Set("A".into()),
6367+
..Default::default()
6368+
},
6369+
chains::ActiveModel {
6370+
id: Set(2),
6371+
name: Set("B".into()),
6372+
..Default::default()
6373+
},
6374+
chains::ActiveModel {
6375+
id: Set(3),
6376+
name: Set("C".into()),
6377+
..Default::default()
6378+
},
6379+
chains::ActiveModel {
6380+
id: Set(4),
6381+
name: Set("D".into()),
6382+
..Default::default()
6383+
},
62686384
])
62696385
.await
62706386
.unwrap();
62716387

62726388
for (date, src, dst, count) in [
62736389
(NaiveDate::from_ymd_opt(2026, 3, 8).unwrap(), 1, 2, 2),
6274-
(NaiveDate::from_ymd_opt(2026, 3, 8).unwrap(), 1, 3, 7),
6390+
(NaiveDate::from_ymd_opt(2026, 3, 8).unwrap(), 1, 4, 7),
62756391
] {
62766392
stats_messages_days::Entity::insert(stats_messages_days::ActiveModel {
62776393
date: Set(date),
@@ -6290,19 +6406,27 @@ mod tests {
62906406
1,
62916407
Some(NaiveDate::from_ymd_opt(2026, 3, 8).unwrap()),
62926408
Some(NaiveDate::from_ymd_opt(2026, 3, 9).unwrap()),
6293-
Some(&[2]),
6409+
Some(&[1, 2, 3, 999]),
62946410
true,
62956411
)
62966412
.await
62976413
.unwrap();
62986414
assert_eq!(
62996415
rows,
6300-
vec![MessagePathStatsRow {
6301-
src_chain_id: 1,
6302-
dst_chain_id: 2,
6303-
messages_count: 2
6304-
}]
6416+
vec![
6417+
MessagePathStatsRow {
6418+
src_chain_id: 1,
6419+
dst_chain_id: 2,
6420+
messages_count: 2
6421+
},
6422+
MessagePathStatsRow {
6423+
src_chain_id: 1,
6424+
dst_chain_id: 3,
6425+
messages_count: 0
6426+
},
6427+
]
63056428
);
6429+
assert!(rows.iter().all(|row| row.dst_chain_id != 1));
63066430
}
63076431

63086432
#[tokio::test]

0 commit comments

Comments
 (0)