-
|
Hey team, I'm currently running into an issue while rolling out a new sharded projection (events by tag; JDBC with Postgres). It processes envelopes so fast that the DB CPU becomes saturated, which causes other parts of the application to be throttled. I've tried the following things so far:

1. Artificially throttling each projector instance

I thought that I could slow down the projector requesting new events from the journal by sleeping somewhere. I can't really do it in the handler because this would block the JDBC transaction. Therefore, I tried to do it right before session creation, in the session creator callback:

    return JdbcProjection.groupedWithin(
        ProjectionId.of(PROJECTION_NAME, tag),
        sourceProvider,
        {
            // The session creator is called on each run of the projector to create the session (transaction).
            // By sleeping here, we can delay the run of the projector without blocking the DB connection/transaction.
            maybeSleep()
            HibernateSession(...)
        },
        { },
        system
    )

Unfortunately, this only works partially because it blocks the full thread (via

2. Reducing the number of concurrent projector instances

We have more than 100 tags, so I thought that if I manually restricted the number of instances to, let's say, 20, and had each of these 20 handle more than one tag, it could keep the impact low. However, I'm not seeing any API to do that.

Am I maybe missing something crucial here? |
-
|
Pekko-Projections is not an area that I claim expertise in. I know that the active Pekko volunteers generally don't look at it a great deal, and it doesn't appear to be heavily used. Hopefully, someone with more Projections experience may be able to help. Here are some devil's advocate suggestions that you might consider. Have you checked your DB instance to see if its performance issues can be addressed? Maybe some DB tuning could help. |
-
|
For |
-
|
Might be a bit of a radical idea, but as a long-term solution it seems like r2dbc in conjunction with pekko streams would properly solve this issue (r2dbc implements the reactive streams protocol, which means converting them to pekko-streams is trivial)? There is already pekko-projection-r2dbc, although it's not implemented via pure streams (instead it materializes the transactions prematurely into |
-
|
@rsmidt Maybe sourceProvider could be wrapped to add throttling to the source?

    JdbcProjection.atLeastOnce(
      ProjectionId.of(PROJECTION_NAME, tag),
      new SourceProvider[Offset, Envelope] {
        private val wrapped = sourceProvider
        // Rate-limit the wrapped source to one envelope per second.
        // Needs scala.concurrent.duration._ and an implicit ExecutionContext in scope for map.
        override def source(offset: () => Future[Option[Offset]]): Future[Source[Envelope, NotUsed]] =
          wrapped.source(offset).map(_.throttle(1, 1.second))
        override def extractOffset(envelope: Envelope): Offset = wrapped.extractOffset(envelope)
        // Delegate to the wrapped provider's creation time, not its offset.
        override def extractCreationTime(envelope: Envelope): Long = wrapped.extractCreationTime(envelope)
      },
      ...
    ) |
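The wrapping idea above could probably be pulled out into a small reusable class so that the rate is configurable per projection. What follows is only a sketch against the Scala DSL: `ThrottledSourceProvider` and its `elements`/`per` parameters are made-up names for illustration and not part of Pekko Projection, and it assumes the wrapped provider needs nothing beyond the three `SourceProvider` methods shown.

```scala
import org.apache.pekko.NotUsed
import org.apache.pekko.projection.scaladsl.SourceProvider
import org.apache.pekko.stream.scaladsl.Source

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.FiniteDuration

// Hypothetical helper (not a Pekko class): delegates everything to `wrapped`
// and only rate-limits the envelope stream that `wrapped` produces.
final class ThrottledSourceProvider[Offset, Envelope](
    wrapped: SourceProvider[Offset, Envelope],
    elements: Int,      // assumed knob: envelopes allowed per interval
    per: FiniteDuration // assumed knob: length of the interval
)(implicit ec: ExecutionContext)
    extends SourceProvider[Offset, Envelope] {

  // Backpressure from throttle slows the upstream query instead of blocking a thread.
  override def source(offset: () => Future[Option[Offset]]): Future[Source[Envelope, NotUsed]] =
    wrapped.source(offset).map(_.throttle(elements, per))

  override def extractOffset(envelope: Envelope): Offset =
    wrapped.extractOffset(envelope)

  override def extractCreationTime(envelope: Envelope): Long =
    wrapped.extractCreationTime(envelope)
}
```

It would then be passed wherever the original `sourceProvider` went, for example `new ThrottledSourceProvider(sourceProvider, 50, 1.second)`, where the rate of 50 per second is an arbitrary placeholder to tune against the DB load. Since throttling applies per projection instance, the overall rate scales with the number of running instances.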