Skip to content

Commit 827edea

Browse files
committed
fix: list invocations pagination
1 parent 0b6de09 commit 827edea

File tree

6 files changed

+162
-88
lines changed

6 files changed

+162
-88
lines changed

server/src/http_objects.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,8 @@ pub struct CreateNamespaceResponse {
421421
#[derive(Debug, Serialize, Deserialize, ToSchema)]
422422
pub struct GraphInvocations {
423423
pub invocations: Vec<Invocation>,
424-
pub cursor: Option<String>,
424+
pub prev_cursor: Option<String>,
425+
pub next_cursor: Option<String>,
425426
}
426427

427428
#[derive(Debug, Serialize, Deserialize, ToSchema)]

server/src/routes.rs

+6-4
Original file line numberDiff line numberDiff line change
@@ -636,7 +636,7 @@ async fn graph_invocations(
636636
Some(CursorDirection::Backward) => Some(state_store::scanner::CursorDirection::Backward),
637637
None => None,
638638
};
639-
let (invocation_ctxs, cursor) = state
639+
let (invocation_ctxs, prev_cursor, next_cursor) = state
640640
.indexify_state
641641
.reader()
642642
.list_invocations(
@@ -651,11 +651,13 @@ async fn graph_invocations(
651651
for invocation_ctx in invocation_ctxs {
652652
invocations.push(invocation_ctx.into());
653653
}
654-
let cursor = cursor.map(|c| BASE64_STANDARD.encode(c));
654+
let prev_cursor = prev_cursor.map(|c| BASE64_STANDARD.encode(c));
655+
let next_cursor = next_cursor.map(|c| BASE64_STANDARD.encode(c));
655656

656657
Ok(Json(GraphInvocations {
657658
invocations,
658-
cursor,
659+
prev_cursor,
660+
next_cursor,
659661
}))
660662
}
661663

@@ -971,7 +973,7 @@ async fn find_invocation(
971973
Ok(Json(invocation_ctx.into()))
972974
}
973975

974-
/// Delete a specific invocation
976+
/// Delete a specific invocation
975977
#[utoipa::path(
976978
delete,
977979
path = "/namespaces/{namespace}/compute_graphs/{compute_graph}/invocations/{invocation_id}",

server/state_store/src/scanner.rs

+63-15
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use data_model::{
1717
TaskOutputsIngestedEvent,
1818
UnprocessedStateChanges,
1919
};
20+
use indexify_utils::get_epoch_time_in_ms;
2021
use metrics::Timer;
2122
use opentelemetry::KeyValue;
2223
use rocksdb::{Direction, IteratorMode, ReadOptions, TransactionDB};
@@ -493,21 +494,21 @@ impl StateReader {
493494
cursor: Option<&[u8]>,
494495
limit: usize,
495496
direction: Option<CursorDirection>,
496-
) -> Result<(Vec<GraphInvocationCtx>, Option<Vec<u8>>)> {
497+
) -> Result<(Vec<GraphInvocationCtx>, Option<Vec<u8>>, Option<Vec<u8>>)> {
497498
let kvs = &[KeyValue::new("op", "list_invocations")];
498499
let _timer = Timer::start_with_labels(&self.metrics.state_read, kvs);
499-
let key_prefix = [namespace.as_bytes(), b"|", compute_graph.as_bytes()].concat();
500+
let key_prefix = [namespace.as_bytes(), b"|", compute_graph.as_bytes(), b"|"].concat();
500501

501502
let direction = direction.unwrap_or(CursorDirection::Forward);
502503
let mut read_options = ReadOptions::default();
503504
read_options.set_readahead_size(10_194_304);
504505

505506
let mut upper_bound = key_prefix.clone();
506-
upper_bound.push(0xff);
507+
upper_bound.extend(&get_epoch_time_in_ms().to_be_bytes());
507508
read_options.set_iterate_upper_bound(upper_bound);
508509

509510
let mut lower_bound = key_prefix.clone();
510-
lower_bound.push(0x00);
511+
lower_bound.extend(&(0 as u64).to_be_bytes());
511512
read_options.set_iterate_lower_bound(lower_bound);
512513

513514
let mut iter = self.db.raw_iterator_cf_opt(
@@ -516,29 +517,67 @@ impl StateReader {
516517
);
517518

518519
match cursor {
519-
Some(cursor) => iter.seek(cursor),
520-
None => iter.seek_to_last(),
520+
Some(cursor) => {
521+
match direction {
522+
CursorDirection::Backward => iter.seek(cursor),
523+
CursorDirection::Forward => iter.seek_for_prev(cursor),
524+
}
525+
// Skip the first item (cursor position)
526+
if iter.valid() {
527+
match direction {
528+
CursorDirection::Forward => iter.prev(),
529+
CursorDirection::Backward => iter.next(),
530+
}
531+
}
532+
}
533+
None => match direction {
534+
CursorDirection::Backward => iter.seek_to_first(), // Start at beginning of range
535+
CursorDirection::Forward => iter.seek_to_last(), // Start at end of range
536+
},
521537
}
522538

523539
let mut rows = Vec::new();
524540
let mut next_cursor = None;
541+
let mut prev_cursor = None;
525542

526543
// Collect results
527-
while iter.valid() {
544+
while iter.valid() && rows.len() < limit {
528545
if let Some((key, _v)) = iter.item() {
529-
if rows.len() < limit {
530-
rows.push(key.to_vec());
531-
} else {
532-
next_cursor = Some(key.to_vec());
533-
break;
534-
}
546+
rows.push(key.to_vec());
535547
} else {
536-
break;
548+
break; // No valid item found
537549
}
538550

551+
// Move the iterator after capturing the current item
539552
match direction {
540-
CursorDirection::Backward => iter.next(),
541553
CursorDirection::Forward => iter.prev(),
554+
CursorDirection::Backward => iter.next(),
555+
}
556+
}
557+
558+
// Check if there are more items after our limit
559+
if iter.valid() {
560+
let key = rows.last().cloned();
561+
match direction {
562+
CursorDirection::Forward => {
563+
next_cursor = key;
564+
}
565+
CursorDirection::Backward => {
566+
prev_cursor = key;
567+
}
568+
}
569+
}
570+
571+
// Set the previous cursor if we have a valid item
572+
if cursor.is_some() {
573+
let key = rows.first().cloned();
574+
match direction {
575+
CursorDirection::Forward => {
576+
prev_cursor = key;
577+
}
578+
CursorDirection::Backward => {
579+
next_cursor = key;
580+
}
542581
}
543582
}
544583

@@ -556,13 +595,22 @@ impl StateReader {
556595
}
557596
}
558597

598+
match direction {
599+
CursorDirection::Forward => {}
600+
CursorDirection::Backward => {
601+
// We keep the ordering the same even if we traverse in the opposite direction
602+
invocation_prefixes.reverse();
603+
}
604+
}
605+
559606
let invocations = self.get_rows_from_cf_multi_key::<GraphInvocationCtx>(
560607
invocation_prefixes.iter().map(|v| v.as_slice()).collect(),
561608
IndexifyObjectsColumns::GraphInvocationCtx,
562609
)?;
563610

564611
Ok((
565612
invocations.into_iter().filter_map(|v| v).collect(),
613+
prev_cursor,
566614
next_cursor,
567615
))
568616
}

server/ui/src/routes/Namespace/IndividualComputeGraphPage.tsx

+62-56
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,12 @@
1-
import { Box, Breadcrumbs, Typography, Stack, Chip, Button, CircularProgress } from '@mui/material'
1+
import {
2+
Box,
3+
Breadcrumbs,
4+
Typography,
5+
Stack,
6+
Chip,
7+
Button,
8+
CircularProgress,
9+
} from '@mui/material'
210
import { TableDocument } from 'iconsax-react'
311
import NavigateNextIcon from '@mui/icons-material/NavigateNext'
412
import NavigateBeforeIcon from '@mui/icons-material/NavigateBefore'
@@ -14,13 +22,18 @@ import axios from 'axios'
1422
import { getIndexifyServiceURL } from '../../utils/helpers'
1523

1624
const IndividualComputeGraphPage = () => {
17-
const { invocationsList, computeGraph, namespace, cursor } = useLoaderData() as IndividualComputeGraphLoaderData
25+
const {
26+
invocationsList,
27+
computeGraph,
28+
namespace,
29+
prevCursor: prevCursorLoader,
30+
nextCursor: nextCursorLoader,
31+
} = useLoaderData() as IndividualComputeGraphLoaderData
1832

1933
const [invocations, setInvocations] = useState<Invocation[]>(invocationsList)
2034
const [isLoading, setIsLoading] = useState(false)
21-
const [currentCursor, setCurrentCursor] = useState<string | null>(null)
22-
const [nextCursor, setNextCursor] = useState<string | null>(cursor)
23-
const [cursorHistory, setCursorHistory] = useState<string[]>([])
35+
const [prevCursor, setPrevCursor] = useState<string | null>(prevCursorLoader)
36+
const [nextCursor, setNextCursor] = useState<string | null>(nextCursorLoader)
2437

2538
const handleDelete = useCallback((updatedList: Invocation[]) => {
2639
const sortedList = [...updatedList].sort(
@@ -29,56 +42,49 @@ const IndividualComputeGraphPage = () => {
2942
setInvocations(sortedList)
3043
}, [])
3144

32-
const fetchInvocations = useCallback(async (cursor: string | null, direction: 'forward' | 'backward') => {
33-
setIsLoading(true)
34-
try {
35-
const serviceURL = getIndexifyServiceURL()
36-
const limit = 20
37-
const url = `${serviceURL}/namespaces/${namespace}/compute_graphs/${computeGraph.name}/invocations?limit=${limit}${
38-
cursor ? `&cursor=${cursor}` : ''
39-
}&direction=${direction}`
40-
41-
const response = await axios.get(url)
42-
const data = response.data
43-
44-
setInvocations([...data.invocations].sort(
45-
(a, b) => (b.created_at ?? 0) - (a.created_at ?? 0)
46-
))
47-
48-
if (direction === 'forward') {
49-
if (cursor) {
50-
setCursorHistory(prev => [...prev, cursor])
51-
}
52-
setCurrentCursor(cursor)
53-
setNextCursor(data.cursor || null)
54-
} else {
55-
if (cursorHistory.length > 0) {
56-
setCursorHistory(prev => prev.slice(0, -1))
57-
}
58-
setCurrentCursor(cursorHistory.length > 1 ? cursorHistory[cursorHistory.length - 2] : null)
59-
setNextCursor(data.cursor || null)
45+
const fetchInvocations = useCallback(
46+
async (cursor: string | null, direction: 'forward' | 'backward') => {
47+
setIsLoading(true)
48+
try {
49+
const serviceURL = getIndexifyServiceURL()
50+
const limit = 20
51+
const url = `${serviceURL}/namespaces/${namespace}/compute_graphs/${
52+
computeGraph.name
53+
}/invocations?limit=${limit}${
54+
cursor ? `&cursor=${cursor}` : ''
55+
}&direction=${direction}`
56+
57+
const response = await axios.get(url)
58+
const data = response.data
59+
60+
setInvocations([...data.invocations])
61+
62+
setPrevCursor(data.prev_cursor)
63+
setNextCursor(data.next_cursor)
64+
console.log(direction, {
65+
prevCursor: data.prev_cursor,
66+
nextCursor: data.next_cursor,
67+
})
68+
} catch (error) {
69+
console.error('Error fetching invocations:', error)
70+
} finally {
71+
setIsLoading(false)
6072
}
61-
} catch (error) {
62-
console.error("Error fetching invocations:", error)
63-
} finally {
64-
setIsLoading(false)
65-
}
66-
}, [namespace, computeGraph.name, cursorHistory])
73+
},
74+
[namespace, computeGraph.name]
75+
)
6776

6877
const handleNextPage = useCallback(() => {
69-
const cursor = nextCursor || currentCursor
70-
if (cursor) {
71-
fetchInvocations(cursor, 'forward')
78+
if (nextCursor) {
79+
fetchInvocations(nextCursor, 'forward')
7280
}
73-
}, [nextCursor, currentCursor, fetchInvocations])
81+
}, [nextCursor, fetchInvocations])
7482

7583
const handlePreviousPage = useCallback(() => {
76-
if (cursorHistory.length > 0) {
77-
const prevCursor = cursorHistory[cursorHistory.length - 1]
84+
if (prevCursor) {
7885
fetchInvocations(prevCursor, 'backward')
7986
}
80-
}, [cursorHistory, fetchInvocations])
81-
87+
}, [prevCursor, fetchInvocations])
8288
return (
8389
<Stack direction="column" spacing={3}>
8490
<Breadcrumbs
@@ -132,29 +138,29 @@ const IndividualComputeGraphPage = () => {
132138
onDelete={handleDelete}
133139
/>
134140

135-
<Box
136-
sx={{
137-
display: 'flex',
138-
justifyContent: 'space-between',
141+
<Box
142+
sx={{
143+
display: 'flex',
144+
justifyContent: 'space-between',
139145
mt: 2,
140-
alignItems: 'center'
146+
alignItems: 'center',
141147
}}
142148
>
143149
<Button
144150
startIcon={<NavigateBeforeIcon />}
145151
onClick={handlePreviousPage}
146-
disabled={cursorHistory.length === 0 || isLoading}
152+
disabled={!prevCursor || isLoading}
147153
variant="outlined"
148154
>
149155
Previous
150156
</Button>
151-
157+
152158
{isLoading && <CircularProgress size={24} />}
153-
159+
154160
<Button
155161
endIcon={<NavigateNextIcon />}
156162
onClick={handleNextPage}
157-
disabled={isLoading}
163+
disabled={!nextCursor || isLoading}
158164
variant="outlined"
159165
>
160166
Next

server/ui/src/routes/Namespace/types.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ export interface ComputeGraphLoaderData extends NamespaceLoaderData {
1919
export interface IndividualComputeGraphLoaderData extends NamespaceLoaderData {
2020
invocationsList: Invocation[]
2121
computeGraph: ComputeGraph
22-
cursor: string | null;
22+
prevCursor: string | null
23+
nextCursor: string | null
2324
direction?: string
2425
}
2526

0 commit comments

Comments
 (0)