
🔍 FT.AGGREGATE CURSOR  #708

@SylvainAssemat

Description

Hi Redis OM Spring team,

I am trying to find the best (or quickest) way to read some data (around 50,000 JSON documents), and I have tested a few mechanisms, such as pagination (skip/limit) and cursors.
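
For reference, the pagination variant is sketched below (illustrative only; it assumes SearchStream exposes the Stream-like skip(long)/limit(long) operators, and PAGE_SIZE/pageIndex are placeholder names, not copied from my actual test):

    // Sketch of the skip/limit pagination variant (same class as the cursor
    // method further down, i.e. it reuses the injected entityStream).
    // PAGE_SIZE and pageIndex are placeholder names; skip/limit are the
    // Stream-like operators on SearchStream, collected via
    // java.util.stream.Collectors.
    public List<RDChargingStation> readPage(int pageIndex) {
        return entityStream.of(RDChargingStation.class)
                .skip((long) pageIndex * PAGE_SIZE)
                .limit(PAGE_SIZE)
                .collect(Collectors.toList());
    }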

For my cursor unit test, I can see in my logs that the cursor id seems to be the same on each iteration.

Could it be a bug or something?

This way I do not get the expected element count... 72 items are missing.

   🚩 Starting cursor-based read of 47069 RDChargingStation
    - Cursor page 1: 10000 items retrieved (cursorId: 1179483090)
    - Cursor page 2: 10000 items retrieved (cursorId: 1179483090)
    - Cursor page 3: 10000 items retrieved (cursorId: 1179483090)
    - Cursor page 4: 10000 items retrieved (cursorId: 1179483090)
    - Cursor page 5: 6997 items retrieved (cursorId: 0)
   🏁 Cursor read completed: 46997 RDChargingStation retrieved in 5456 ms (expected: 47069, missing: 72)
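
Here is the test method; BATCH_READ_SIZE is 10,000, matching the page sizes in the log above: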
    public List<RDChargingStation> findAllWithCursor() {
        long start = System.currentTimeMillis();
        long expectedCount = count();
        log.info("🚩 Starting cursor-based read of {} RDChargingStation", expectedCount);

        List<RDChargingStation> allResults = new ArrayList<>((int) expectedCount);

        // Create a new SearchStream
        SearchStream<RDChargingStation> stream = entityStream.of(RDChargingStation.class);

        // Start the cursor
        AggregationResult page = stream.cursor(BATCH_READ_SIZE, Duration.ofMinutes(5))
                .loadAll()
                .aggregate();
        long cursorId = page.getCursorId();
        Gson gson = getGson();

        int batchCount = 1;

        // Add the first page
        List<RDChargingStation> batch = page.getResults().stream()
                .map(d -> gson.fromJson(d.get("$").toString(), RDChargingStation.class))
                .toList();
        allResults.addAll(batch);
        log.info(" - Cursor page {}: {} items retrieved (cursorId: {})", batchCount, batch.size(), cursorId);

        // Read the remaining pages via cursorRead until the cursor is exhausted (cursorId == 0)
        while (cursorId != 0) {
            batchCount++;

            AggregationResult nextPage = stream.getSearchOperations().cursorRead(cursorId, BATCH_READ_SIZE);
            batch = nextPage.getResults().stream()
                    .map(d -> gson.fromJson(d.get("$").toString(), RDChargingStation.class))
                    .toList();
            allResults.addAll(batch);

            cursorId = nextPage.getCursorId();
            log.info(" - Cursor page {}: {} items retrieved (cursorId: {})", batchCount, batch.size(), cursorId);
        }

        long duration = System.currentTimeMillis() - start;
        log.info("🏁 Cursor read completed: {} RDChargingStation retrieved in {} ms (expected: {}, missing: {})",
                allResults.size(), duration, expectedCount, expectedCount - allResults.size());

        return allResults;
    }
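
For completeness, the unit test essentially boils down to this check (an illustrative sketch, not the verbatim test; `service` and the AssertJ wiring are assumed):

    // Illustrative sketch of the failing assertion; `service` and the
    // AssertJ assertThat import are assumed, not copied from the real test.
    long expected = service.count();                           // 47069 in my run
    List<RDChargingStation> all = service.findAllWithCursor();
    assertThat(all).hasSize((int) expected);                   // fails: 46997 != 47069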
