Live data preview and node transition frequency on Flink minicluster #8047


Open: wants to merge 39 commits into staging

Conversation

@mgoworko (Contributor) commented May 7, 2025

Describe your changes

  • added a listener that collects live data processed by the scenario
  • the data is stored in a ConcurrentHashMap-based ring buffer; samples corresponding to the last N contextIds are kept
  • added node transition frequency calculation: node transition counts are stored in buckets, one per epoch second, and aggregated over time windows

The data can be fetched using /api/scenarioTesting/{scenario_name}/liveData

At the moment this works only for the Flink minicluster, because it runs as part of the same JVM and we can pass the listener to the job.

Checklist before merge

  • Related issue ID is placed at the beginning of PR title in [brackets] (can be GH issue or Nu Jira issue)
  • Code is cleaned from temporary changes and commented out lines
  • Parts of the code that are not easy to understand are documented in the code
  • Changes are covered by automated tests
  • Showcase in dev-application.conf added to demonstrate the feature
  • Documentation added or updated
  • Added entry in Changelog.md describing the change from the perspective of a public distribution user
  • Added MigrationGuide.md entry in the appropriate subcategory if introducing a breaking change
  • Verify that PR will be squashed during merge

mgoworko added 2 commits May 12, 2025 15:48
# Conflicts:
#	engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/StubbedFlinkProcessCompilerDataFactory.scala
@mgoworko mgoworko marked this pull request as ready for review May 13, 2025 09:33
mgoworko added 3 commits May 13, 2025 11:34
# Conflicts:
#	components-api/src/main/scala/pl/touk/nussknacker/engine/ModelConfig.scala
#	engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/FlinkProcessCompilerDataFactory.scala
@mgoworko mgoworko changed the title Draft, do not review - Flink minicluster live data preview In progress - Flink minicluster live data preview May 13, 2025
@github-actions github-actions bot added the docs label May 18, 2025
@mgoworko mgoworko changed the title In progress - Flink minicluster live data preview Live data preview and node transition frequency on Flink minicluster May 18, 2025
@github-actions github-actions bot added client client main fe ui labels May 18, 2025
@TouK TouK deleted a comment from github-actions bot May 19, 2025
@mgoworko mgoworko requested a review from arkadius May 19, 2025 07:58
* @param id correlation id/trace id used for tracing (logs, error presentation) and for tests mechanism, it should be always defined
* @param variables variables available in evaluation
* @param parentContext context used for scopes handling, mainly for fragment invocation purpose
*/
case class Context(
initialId: String,
mgoworko (Contributor, Author):
This change will no longer be needed after the ContextId refactor. The initialId is basically the ContextId without the prefix.

Member:
So let's add a TODO/FIXME

mgoworko (Contributor, Author):

I removed the initialId field. Instead, the initial context id is now recognized by regex. There is a TODO added about it. After the ContextId refactor the regex won't be necessary, because we will be able to compare parts of the ContextId case class instead.
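As a hypothetical sketch of what "recognized by regex" could look like: the `"<initialId>-<suffix>"` layout, the `InitialContextId` object, and the `initialIdOf` method below are illustration-only assumptions, not the actual Nussknacker ContextId format or code.

```scala
object InitialContextId {

  // Assumed id layout for illustration: "<initialId>" optionally followed
  // by "-<suffix>"; the real ContextId format may differ.
  private val WithSuffix = "([^-]+)(?:-.*)?".r

  // TODO: after the ContextId refactor, compare parts of the ContextId
  // case class instead of matching with a regex.
  def initialIdOf(contextId: String): String = contextId match {
    case WithSuffix(initial) => initial
    case other               => other
  }
}
```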


final case class LiveDataPreview(
liveDataSamples: TestResults[Json],
nodeTransitionFrequency: Map[NodeTransition, BigDecimal],
Member:

I think that using the word "throughput" is more natural in this context. "Frequency" is used when we talk about a cyclic mechanism, to show how often it is triggered.


def getLiveData(
processIdWithName: ProcessIdWithName,
): Future[Option[LiveDataPreview]]
Member:

This Option is mysterious and could be interpreted in many ways. Let's use Either with an explicit error instead. For deployed scenarios, we should always return a defined, possibly empty LiveDataPreview. The error should be returned when someone asks for a scenario that is not deployed or is finished/stopped.

mgoworko (Contributor, Author):

Done, Either with the error in Left. The error only says that the information is not available for the scenario; in that place we do not have detailed information as to why.
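The agreed shape can be sketched like this; all names below (`LiveDataError`, `LiveDataNotAvailable`, `LiveDataProvider`, the simplified `LiveDataPreview`) are illustrative stand-ins, not the PR's actual definitions:

```scala
import scala.concurrent.Future

// Illustrative sketch of the Either-based API discussed above.
sealed trait LiveDataError
case object LiveDataNotAvailable extends LiveDataError

// Simplified stand-in for the real LiveDataPreview DTO.
final case class LiveDataPreview(samples: List[String])

class LiveDataProvider(deployedScenarios: Set[String]) {

  // A deployed scenario always yields a defined (possibly empty) preview;
  // anything else yields an explicit error in Left.
  def getLiveData(scenarioName: String): Future[Either[LiveDataError, LiveDataPreview]] =
    Future.successful {
      if (deployedScenarios.contains(scenarioName)) Right(LiveDataPreview(Nil))
      else Left(LiveDataNotAvailable)
    }
}
```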

}

case class ContextId(value: String)

/**
* Context is container for variables used in expression evaluation
*
* @param initialId the initial id of the context (id may change during the processing, but the initial id is preserved in this field)
Member:
Can you add one more sentence to the comment explaining why we need this field with the preserved id? Also, maybe we could replace the case class with a normal class to make changes of initialId statically impossible?

mgoworko (Contributor, Author):
I removed this field; details are in the other comment.

Any
] =
baseNuApiEndpoint
.summary("Perform test")
Member:
What test does this endpoint perform? :P

.summary("Perform test")
.tag("Testing")
.get
.in("scenarioTesting" / path[ProcessName]("scenarioName") / "liveData")
Member:
Why is the endpoint nested under the scenarioTesting resource? Let's create a new liveData endpoint. I know that under the hood we reuse some features from the scenario testing mechanism, but we don't need to expose that in the interface.

Option(listenerStorages.getIfPresent(processName.value)).map(_.getLiveDataPreview)
}

private[livedata] def cleanResults(processName: ProcessName): Unit = {
Member:
I assume that you don't clean the results after the job finishes because you want to keep them available to the user for some period of time? Maybe let's write a comment about that, because at first glance it looks like a memory leak.

mgoworko (Contributor, Author):
Done, the comment is now above this method and describes when we want to clean the data.

val isNewKey = !results.containsKey(key)
if (isNewKey) orderOfResults.add(key)
results.put(key, newValue)
while (orderOfResults.size() > maxNumberOfSamples) {
@arkadius (Member) commented May 20, 2025:

size has O(n) complexity. Have you considered using a ready-made solution such as EvictingQueue from Guava? We do synchronization anyway, so I can't see much added value in using thread-safe structures. Keeping the information in two data structures, results and orderOfResults, is hard to analyze in code; e.g. as a reviewer I can't guarantee that it is cleaned correctly.

mgoworko (Contributor, Author):

Unfortunately Guava's EvictingQueue won't work here: it does not support modifying elements by key. But my solution was not good either.

I changed the solution: I created a RingBuffer util, with unit tests. It is based on LinkedHashMap, with added synchronization.
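A minimal sketch of a fixed-capacity buffer built on `java.util.LinkedHashMap` with synchronization, in the spirit described above; the class name and API are assumptions for illustration, not the PR's actual RingBuffer util.

```scala
// Fixed-capacity, insertion-ordered buffer. Updating an existing key keeps
// its position and does not evict anything; inserting a new key beyond
// maxSize evicts the oldest entry.
final class RingBuffer[K, V](maxSize: Int) {

  private val underlying = new java.util.LinkedHashMap[K, V]() {
    // Called by put(); returning true evicts the eldest (oldest) entry.
    override def removeEldestEntry(eldest: java.util.Map.Entry[K, V]): Boolean =
      size() > maxSize
  }

  def put(key: K, value: V): Unit = underlying.synchronized {
    underlying.put(key, value)
    ()
  }

  def get(key: K): Option[V] = underlying.synchronized {
    Option(underlying.get(key))
  }

  def size: Int = underlying.synchronized(underlying.size())
}
```

This avoids the two-structure bookkeeping the reviewer objected to: ordering and key lookup live in one structure, and eviction is handled by `removeEldestEntry`.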


private val results = new ConcurrentHashMap[K, V]()
private val orderOfResults = new ConcurrentLinkedQueue[K]()
private val transitionsByEpochSecond = new ConcurrentHashMap[Long, ConcurrentLinkedQueue[NodeTransition]]()
Member:
Maybe we could just use a MultiMap to make it simpler? Or a Map[NodeTransition, InstantRateMeter]?

mgoworko (Contributor, Author):

Unfortunately there is no ready-to-use util that does exactly what is needed here. I asked LLMs about it too :) . (The main difficulty is that we have to drop the transitions that are older than the threshold.)

I created a util class SlidingWindowCounter. It wraps the logic and I think it is quite simple and easy to understand. I also added unit tests for that util.

@github-actions github-actions bot removed ui client client main fe labels May 21, 2025

created: #8133
⚠️ Be careful! Snapshot changes are not necessarily the cause of the error. Check the logs.

mgoworko added 10 commits May 22, 2025 15:33
# Conflicts:
#	e2e-tests/src/test/scala/pl/touk/nussknacker/BatchDataGenerationSpec.scala
# Conflicts:
#	designer/server/src/main/scala/pl/touk/nussknacker/ui/api/ScenarioTestingApiHttpService.scala
#	designer/server/src/main/scala/pl/touk/nussknacker/ui/api/description/scenarioTesting/Dtos.scala
#	designer/server/src/main/scala/pl/touk/nussknacker/ui/api/description/scenarioTesting/ScenarioTestingApiEndpoints.scala
#	designer/server/src/test/scala/pl/touk/nussknacker/ui/api/testing/ScenarioTestingApiHttpServiceSpec.scala
#	designer/server/src/test/scala/pl/touk/nussknacker/ui/api/testing/SchemalessKafkaJsonTypeTests.scala
#	docs-internal/api/nu-designer-openapi.yaml
#	docs/Changelog.md
# Conflicts:
#	components/openapi/src/main/scala/pl/touk/nussknacker/openapi/enrichers/OpenAPIEnricherFactory.scala
@@ -2023,6 +2024,20 @@ lazy val deploymentManagerApi = (project in file("designer/deployment-manager-ap
)
.dependsOn(extensionsApi, testUtils % Test)

lazy val liveDataCollector = (project in file("designer/live-data-collector"))
mgoworko (Contributor, Author):
I created a new module, reusable by other DMs, instead of adding it to the Flink engine module.
