Description
Expected Behavior
An R2DBC-based application should be able to stream large amounts of data from a database while running with a small memory footprint.
Actual Behaviour
When attempting to read DTOs from the database, Micronaut loads all records into a list structure first, leading to eventual OutOfMemoryErrors when handling large numbers of records, and to slow time-to-first-byte even when the heap is large enough.
Steps To Reproduce
Below is a heavily cut-down application that demonstrates the issue.
Run a Postgres database locally and start the Micronaut application with very restrictive memory parameters (in our real application we run in a Kubernetes environment where resources are tightly controlled):
-Xms32m -Xmx32m -Djdk.nio.maxCachedBufferSize=10m -XX:MaxDirectMemorySize=10m
Then hitting the "/find-no-wrapper" endpoint runs successfully and processes all of the rows, as does hitting "/find-with-tuple", which returns a different mapper type.
However, hitting "/find-with-wrapper" fails with OutOfMemoryErrors.
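The difference between the streaming and the collect-first behaviour can be illustrated without Micronaut or a database. The sketch below uses plain java.util.stream (all class and method names are hypothetical, for illustration only): the lazy pipeline touches only the rows the consumer actually pulls, while the collect-first pipeline materialises every row before the caller sees anything — the shape of the failing endpoint.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class LazyVsEagerDemo {
    // Lazy path: the terminal operation pulls rows on demand, so asking for
    // the first result touches a single row and memory stays flat.
    static int lazyTouched(int rows) {
        AtomicInteger seen = new AtomicInteger();
        IntStream.range(0, rows)
                .peek(i -> seen.incrementAndGet())
                .filter(i -> true)
                .findFirst();
        return seen.get();
    }

    // Eager path: every row is materialised into a list before the caller
    // sees the first element -- the shape of the failing wrapper endpoint.
    static int eagerTouched(int rows) {
        AtomicInteger seen = new AtomicInteger();
        IntStream.range(0, rows)
                .peek(i -> seen.incrementAndGet())
                .boxed()
                .collect(Collectors.toList());
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("lazy rows touched:  " + lazyTouched(10_000));
        System.out.println("eager rows touched: " + eagerTouched(10_000));
    }
}
```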
//
// NonExistentEntity.java
//
@MappedEntity
public class NonExistentEntity {
    @Id
    String neverMappedId;
}
//
// MyWrapper.java
//
@Serdeable
record MyWrapper(String text) {}
//
// MyController.java
//
@Controller
public class MyController {
    private final Logger log = LoggerFactory.getLogger(getClass());

    @Inject
    MyRepository repository;

    // This endpoint is fully streamed and does not suffer from the memory
    // issue (fetchSize had to be set to avoid direct-buffer issues).
    @Post("/find-no-wrapper")
    public Flux<String> findNoWrapper() {
        return process(repository.findNoWrapper(), "findNoWrapper");
    }

    // This endpoint fails with an OutOfMemoryError, as it creates ALL of the
    // DTOs in memory before any processing happens.
    @Post("/find-with-wrapper")
    public Flux<String> findWithWrapper() {
        return process(repository.findWithWrapper(), "findWithWrapper");
    }

    // This endpoint also works, because Tuple results go through a different mapper.
    @Post("/find-with-tuple")
    public Flux<String> findWithTuple() {
        return process(repository.findWithTuple(), "findWithTuple");
    }

    private Flux<String> process(Flux<?> flux, String method) {
        log.info("!!!!{} ... start", method);
        AtomicInteger counter = new AtomicInteger();
        return flux
                .filter(x -> {
                    var i = counter.incrementAndGet();
                    if (i % 10000 == 0) {
                        var value = (x instanceof Tuple t) ? Arrays.asList(t.toArray()) : x;
                        log.info("!!!!{} ... {} -> {}", method, i, value);
                    }
                    return false;
                })
                .doOnComplete(() -> log.info("!!!!{}(complete) ... {}", method, counter.get()))
                .thenMany(Flux.just(method + "(complete)"));
    }
}
//
// MyRepository.java
//
@R2dbcRepository(dialect = Dialect.POSTGRES, dataSource = "ds")
public interface MyRepository extends GenericRepository<NonExistentEntity, String> {

    // This generates 10,000,000 rows of 800 characters each.
    String QUERY = "SELECT lpad('hi', 800, 'x') AS text FROM generate_series(1, 10000000)";

    @Query(QUERY)
    Flux<String> findNoWrapper();

    @Query(QUERY)
    Flux<MyWrapper> findWithWrapper();

    @Query(QUERY)
    Flux<Tuple> findWithTuple();
}
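A quick back-of-the-envelope check (the class name below is hypothetical, just a worked calculation) shows that any collect-all strategy must fail under the configured heap: the query's raw payload dwarfs -Xmx32m by a couple of orders of magnitude, before counting any per-object overhead.

```java
public class ReproDataVolume {
    // Rough size of the repro's result set.
    static long payloadChars() {
        long rows = 10_000_000L;   // generate_series(1, 10000000)
        long charsPerRow = 800L;   // lpad('hi', 800, 'x')
        return rows * charsPerRow; // 8,000,000,000 characters
    }

    public static void main(String[] args) {
        long heapBytes = 32L * 1024 * 1024; // -Xmx32m
        // Even at one byte per character (the minimum with compact strings,
        // ignoring String/array headers), the payload is hundreds of times
        // the entire heap.
        System.out.println("payload / heap = " + payloadChars() / heapBytes);
    }
}
```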
//
// application.yml
//
r2dbc:
  datasources:
    ds:
      options:
        applicationName: local
        fetchSize: 1000
        database: myds
        driver: postgresql
        host: localhost
        port: 5432
      username: postgresql
      password:
The underlying cause is in the DefaultR2dbcRepositoryOperations#findAll method: createMapper returns a SqlResultEntityTypeMapper, so the operation reads ALL of the rows into a list before converting back to a Flux. The list expands dynamically until the maximum available memory is exhausted, at which point the application crashes with OutOfMemoryErrors.
Lines 537 to 553 in 3628f3c
public <T, R> Flux<R> findAll(@NonNull PreparedQuery<T, R> pq) {
    SqlPreparedQuery<T, R> preparedQuery = getSqlPreparedQuery(pq);
    return executeReadFlux(preparedQuery, connection -> {
        Statement statement = prepareStatement(connection::createStatement, preparedQuery, false, false);
        preparedQuery.bindParameters(new R2dbcParameterBinder(connection, statement, preparedQuery));
        SqlTypeMapper<Row, R> mapper = createMapper(preparedQuery, Row.class);
        if (mapper instanceof SqlResultEntityTypeMapper<Row, R> entityTypeMapper) {
            SqlResultEntityTypeMapper.PushingMapper<Row, List<R>> rowsMapper = entityTypeMapper.readManyMapper();
            return executeAndMapEachRow(statement, row -> {
                rowsMapper.processRow(row);
                return "";
            }).collectList().flatMapIterable(ignore -> rowsMapper.getResult());
        }
        return executeAndMapEachRowNullable(statement, row -> mapper.map(row, preparedQuery.getResultType()));
    });
}
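The two branches of that method have fundamentally different memory profiles, which can be emulated in a standalone sketch. The names below mirror the excerpt but this is not Micronaut code, and any actual fix would need to work with Micronaut's real mapper APIs; the point is only that the entity branch buffers O(n) rows while the fallback branch holds one row at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class PushingMapperShape {
    // Mirrors the readManyMapper() shape: rows are pushed in and buffered,
    // and the result is only exposed after the whole result set is consumed.
    static class BufferingMapper<R> {
        private final List<R> buffer = new ArrayList<>();
        void processRow(R row) { buffer.add(row); } // every row retained
        List<R> getResult() { return buffer; }
    }

    // Drives n rows through the buffering path; peak memory grows with n.
    static int bufferedPeak(int rows) {
        BufferingMapper<Integer> mapper = new BufferingMapper<>();
        for (int i = 0; i < rows; i++) {
            mapper.processRow(i);
        }
        return mapper.getResult().size(); // == rows: O(n) memory
    }

    // Drives n rows through a per-row path, as the non-entity branch does:
    // each row is mapped and handed downstream immediately, so at most one
    // row is in flight at a time.
    static int streamedPeak(int rows, Consumer<Integer> downstream) {
        for (int i = 0; i < rows; i++) {
            downstream.accept(i); // nothing retained after this call
        }
        return 1; // only the current row is ever held
    }
}
```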
Environment Information
JDK 21, on Windows and Unix
Postgres 17
Example Application
No response
Version
4.9.4