Move reusable code from the Camel Integration Capability#68
Move reusable code from the Camel Integration Capability#68orpiske merged 1 commit intowanaku-ai:mainfrom
Conversation
So that we can create a Camel Context plugin that can be loaded in build time and in runtime. Ref: wanaku-ai/camel-integration-capability#56
Reviewer's GuideIntroduces a new modular runtimes layout and a Camel-specific runtime plugin: the generic runtime is split into a capabilities-runtimes parent with a common module and a Camel runtime aggregator, and a new Camel plugin module is added that can be discovered via Camel’s ContextServicePlugin SPI, load configuration, download routes/rules/dependencies, start a gRPC server for tools/resources, and register itself with Wanaku discovery/services. Sequence diagram for CamelIntegrationPlugin load lifecyclesequenceDiagram
actor CamelApp
participant CamelContext
participant CamelIntegrationPlugin
participant PluginConfiguration
participant InitializerFactory
participant Initializer
participant ServicesHttpClient
participant DownloaderFactory
participant ResourceListBuilder
participant ResourceDownloaderCallback
participant RegistrationManager
participant DiscoveryService
participant McpRulesManager
participant GrpcServer
CamelApp->>CamelContext: start()
CamelContext->>CamelIntegrationPlugin: load(camelContext)
activate CamelIntegrationPlugin
CamelIntegrationPlugin->>PluginConfiguration: load()
PluginConfiguration-->>CamelIntegrationPlugin: PluginConfiguration
CamelIntegrationPlugin->>PluginConfiguration: validate()
CamelIntegrationPlugin->>CamelIntegrationPlugin: create dataDir
CamelIntegrationPlugin->>InitializerFactory: createInitializer(initFrom, dataDir)
InitializerFactory-->>Initializer: Initializer
CamelIntegrationPlugin->>Initializer: initialize()
CamelIntegrationPlugin->>ServicesHttpClient: new(serviceConfig)
ServicesHttpClient-->>CamelIntegrationPlugin: httpClient
CamelIntegrationPlugin->>DownloaderFactory: new(httpClient, dataDir)
CamelIntegrationPlugin->>ResourceListBuilder: newBuilder()
ResourceListBuilder-->>CamelIntegrationPlugin: builder
CamelIntegrationPlugin->>ResourceListBuilder: addRoutesRef(routesRef)
CamelIntegrationPlugin->>ResourceListBuilder: addRulesRef(rulesRef)
CamelIntegrationPlugin->>ResourceListBuilder: addDependenciesRef(dependenciesRef)
ResourceListBuilder-->>CamelIntegrationPlugin: buildForPlugin(): resources
CamelIntegrationPlugin->>ResourceDownloaderCallback: new(downloaderFactory, resources)
CamelIntegrationPlugin->>CamelIntegrationPlugin: newServiceTarget()
CamelIntegrationPlugin->>CamelIntegrationPlugin: newRegistrationManager(target, callback, serviceConfig)
CamelIntegrationPlugin->>RegistrationManager: start()
RegistrationManager->>DiscoveryService: register(serviceTarget)
DiscoveryService-->>RegistrationManager: registration ok
RegistrationManager->>ResourceDownloaderCallback: onRegistration(...)
activate ResourceDownloaderCallback
ResourceDownloaderCallback->>ResourceDownloaderCallback: downloadResources()
ResourceDownloaderCallback-->>ResourceDownloaderCallback: downloadedResources
deactivate ResourceDownloaderCallback
CamelIntegrationPlugin->>ResourceDownloaderCallback: waitForDownloads()
alt downloads ok
CamelIntegrationPlugin->>McpRulesManager: new(serviceName, rulesPath)
CamelIntegrationPlugin->>McpRulesManager: loadMcpSpecAndRegister(toolTransformer, resourceTransformer)
McpRulesManager-->>CamelIntegrationPlugin: McpSpec
CamelIntegrationPlugin->>GrpcServer: new().addService(CamelTool, CamelResource, ProvisionBase)
GrpcServer-->>CamelIntegrationPlugin: server
CamelIntegrationPlugin->>GrpcServer: start()
else failure
CamelIntegrationPlugin-->>CamelContext: throw IllegalStateException
end
deactivate CamelIntegrationPlugin
Class diagram for CamelIntegrationPlugin, configuration, and initializationclassDiagram
class CamelIntegrationPlugin {
-Server grpcServer
-RegistrationManager registrationManager
-PluginConfiguration config
+load(CamelContext camelContext) void
+unload(CamelContext camelContext) void
-newServiceTarget() ServiceTarget
-newRegistrationManager(ServiceTarget serviceTarget, ResourceDownloaderCallback resourcesDownloaderCallback, ServiceConfig serviceConfig) RegistrationManager
-createMcpSpec(ServicesHttpClient servicesClient, Map downloadedResources) McpSpec
}
class PluginConfiguration {
-String registrationUrl
-int grpcPort
-String registrationAnnounceAddress
-String serviceName
-String routesRef
-String rulesRef
-String tokenEndpoint
-String clientId
-String clientSecret
-String dependenciesRef
-String repositoriesList
-String dataDir
-String initFrom
-int retries
-int retryWaitSeconds
-long initialDelay
-long period
-boolean noWait
+static load() PluginConfiguration
-static getConfigValue(Properties props, String propertyKey, String envKey, String defaultValue) String
+validate() void
+getRegistrationUrl() String
+getGrpcPort() int
+getRegistrationAnnounceAddress() String
+getServiceName() String
+getRoutesRef() String
+getRulesRef() String
+getTokenEndpoint() String
+getClientId() String
+getClientSecret() String
+getDependenciesRef() String
+getRepositoriesList() String
+getDataDir() String
+getInitFrom() String
+getRetries() int
+getRetryWaitSeconds() int
+getInitialDelay() long
+getPeriod() long
+isNoWait() boolean
}
class InitializerFactory {
+createInitializer(String initFrom, Path dataDir) Initializer
}
class Initializer {
<<interface>>
+initialize() void
}
class GitInitializer {
-String gitRepoUrl
-Path dataDir
-Path clonedRepoPath
+GitInitializer(String gitRepoUrl, Path dataDir)
+initialize() void
+getClonedRepoPath() Path
}
class NoOpInitializer {
+NoOpInitializer()
+initialize() void
}
class McpRulesManager {
-String name
-String routesRules
+McpRulesManager(String name, String routesRules)
-loadMcpSpec() McpSpec
-registerDefinitions(Map definitions, RulesTransformer transformer) void
+loadMcpSpecAndRegister(RulesTransformer toolTransformer, RulesTransformer resourceTransformer) McpSpec
}
class McpRulesReader {
+readMcpSpecFromFile(String filePath) McpSpec
+readMcpSpecFromFile(File file) McpSpec
+readMcpSpecFromInputStream(InputStream inputStream) McpSpec
+readMcpSpecFromString(String yamlContent) McpSpec
+readFromFile(String filePath) Tool
+readFromFile(File file) Tool
+readFromInputStream(InputStream inputStream) Tool
+readFromString(String yamlContent) Tool
}
class ProvisionBase {
+ProvisionBase(String name)
+provision(ProvisionRequest request, StreamObserver responseObserver) void
+properties() Map
}
CamelIntegrationPlugin --> PluginConfiguration : uses
CamelIntegrationPlugin --> InitializerFactory : uses
InitializerFactory --> Initializer : creates
GitInitializer ..|> Initializer
NoOpInitializer ..|> Initializer
CamelIntegrationPlugin --> McpRulesManager : uses
McpRulesManager --> McpRulesReader : uses
CamelIntegrationPlugin ..> ProvisionBase : creates
Class diagram for downloaders, MCP model, gRPC services, and utilitiesclassDiagram
class Downloader {
<<interface>>
+downloadResource(ResourceRefs resourceName, Map downloadedResources) void
}
class DownloaderFactory {
-ServicesHttpClient servicesHttpClient
-Path dataDir
-FileDownloader fileDownloader
-DataStoreDownloader dataStoreDownloader
+DownloaderFactory(ServicesHttpClient servicesHttpClient, Path dataDir)
+getDownloader(URI uri) Downloader
-getDataStoreDownloader() DataStoreDownloader
-getFileDownloader() FileDownloader
}
class FileDownloader {
-Path dataDir
+FileDownloader(Path dataDir)
+downloadResource(ResourceRefs resourceName, Map downloadedResources) void
}
class DataStoreDownloader {
-ServicesHttpClient servicesHttpClient
-Path dataDir
+DataStoreDownloader(ServicesHttpClient servicesHttpClient, Path dataDir)
+downloadResource(ResourceRefs resourceName, Map downloadedResources) void
}
class ResourceRefs {
<<record>>
+resourceType() ResourceType
+ref() URI
+newRoutesRef(String routesRef) ResourceRefs
+newRulesRef(String rulesRef) ResourceRefs
+newDependencyRef(String dependencyRef) ResourceRefs
}
class ResourceType {
<<enum>>
ROUTES_REF
RULES_REF
DEPENDENCY_REF
}
class ResourceDownloaderCallback {
-List resources
-CountDownLatch countDownLatch
-DownloaderFactory downloaderFactory
-Map downloadedResources
+ResourceDownloaderCallback(DownloaderFactory downloaderFactory, List resources)
+onPing(RegistrationManager manager, ServiceTarget target, int status) void
+onRegistration(RegistrationManager manager, ServiceTarget target) void
+onDeregistration(RegistrationManager manager, ServiceTarget target, int status) void
-downloadResources() void
+waitForDownloads() boolean
+getDownloadedResources() Map
}
class ResourceListBuilder {
-List resources
-boolean hasRoutesRef
+newBuilder() ResourceListBuilder
+addRoutesRef(String routesRef) ResourceListBuilder
+addRulesRef(String rulesRef) ResourceListBuilder
+addDependenciesRef(String dependenciesRef) ResourceListBuilder
+build() List
+buildForPlugin() List
}
class McpSpec {
-McpContent mcp
+McpSpec()
+getMcp() McpContent
+setMcp(McpContent mcp) void
}
class McpContent {
-McpEntityWrapper tools
-McpEntityWrapper resources
+McpContent()
+getTools() McpEntityWrapper
+setTools(McpEntityWrapper tools) void
+getResources() McpEntityWrapper
+setResources(McpEntityWrapper resources) void
}
class McpEntityWrapper {
-Map tools
+McpEntityWrapper()
+getDefinitions() Map
+setTools(Map tools) void
}
class McpEntityDeserializer
class Definition {
-Route route
-String description
-List properties
-String namespace
+Definition()
+getRoute() Route
+setRoute(Route route) void
+getDescription() String
+setDescription(String description) void
+getProperties() List
+setProperties(List properties) void
+getNamespace() String
+setNamespace(String namespace) void
}
class Route {
-String uri
-String id
+Route()
+getUri() String
+setUri(String uri) void
+getId() String
+setId(String id) void
}
class Property {
-String name
-String type
-String description
-boolean required
-Mapping mapping
+Property()
+getName() String
+setName(String name) void
+getType() String
+setType(String type) void
+getDescription() String
+setDescription(String description) void
+isRequired() boolean
+setRequired(boolean required) void
+getMapping() Mapping
+setMapping(Mapping mapping) void
}
class Mapping {
-String type
-String name
+Mapping()
+getType() String
+setType(String type) void
+getName() String
+setName(String name) void
}
class Tool {
-McpEntityWrapper toolsWrapper
+Tool()
+getTools() Map
+setToolsWrapper(McpEntityWrapper toolsWrapper) void
}
class CamelTool {
-McpSpec mcpSpec
-CamelContext camelContext
+CamelTool(CamelContext camelContext, McpSpec spec)
+getTools(McpSpec mcpSpec) Map
+invokeTool(ToolInvokeRequest request, StreamObserver responseObserver) void
-reportRouteFailure(StreamObserver responseObserver, Exception e, Definition toolDefinition) void
-resolveEndpoint(Definition toolDefinition, CamelContext camelContext) String
-extractHeaderParameters(ToolInvokeRequest request, Definition toolDefinition) Map
}
class CamelResource {
-McpSpec mcpSpec
-CamelContext camelContext
+CamelResource(CamelContext camelContext, McpSpec mcpSpec)
+getResources(McpSpec mcpSpec) Map
+resourceAcquire(ResourceRequest request, StreamObserver responseObserver) void
-resolveEndpoint(Definition definition, CamelContext camelContext) String
-reportRouteFailure(StreamObserver responseObserver, Exception e, Definition definition) void
}
class WanakuRoutesLoader {
-String dependenciesList
-String repositoriesList
-DependencyDownloaderClassLoader cl
-MavenDependencyDownloader downloader
+WanakuRoutesLoader(String dependenciesList, String repositoriesList)
+loadRoute(CamelContext context, String path) void
-downloadDependencies(CamelContext camelContext) void
-createDownloader(DependencyDownloaderClassLoader cl) MavenDependencyDownloader
-createClassLoader() DependencyDownloaderClassLoader
}
class FileUtil {
+untilAvailable(File input, boolean isDirectory) boolean
}
class GavUtil {
+group(String gav) String
+artifact(String gav) String
+version(String gav) String
}
DownloaderFactory --> Downloader : returns
FileDownloader ..|> Downloader
DataStoreDownloader ..|> Downloader
ResourceRefs --> ResourceType
ResourceDownloaderCallback --> DownloaderFactory
ResourceDownloaderCallback --> ResourceRefs
McpSpec o-- McpContent
McpContent o-- McpEntityWrapper
McpEntityWrapper o-- Definition
Definition o-- Route
Definition o-- Property
Property o-- Mapping
CamelTool --> McpSpec
CamelTool --> Definition
CamelResource --> McpSpec
CamelResource --> Definition
WanakuRoutesLoader --> GavUtil
FileDownloader --> FileUtil
File-Level Changes
Possibly linked issues
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Hey - I've found 2 issues, and left some high level feedback:
- In
WanakuToolTransformer, therequiredfield is an instance-level list that accumulates required properties across multiple tools; make this a local list insidetransform(orparseProperties) so each tool gets an isolatedrequiredlist. - Both
CamelTool.resolveEndpointandCamelResource.resolveEndpointblindly look up the route bytoolDefinition.getRoute().getId(); consider handling the common case where only a URI is defined (e.g. fall back to using theuridirectly or validating thatidis present) to avoidnullroute lookups or NPEs with the current YAML examples. - In
CamelIntegrationPluginTest.testLoadFailsWithoutConfiguration,plugin.load(null)will eventually dereference theCamelContext; either construct a realCamelContext(e.g.DefaultCamelContext) or mock it so the test is exercising configuration failure rather than causing aNullPointerException.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `WanakuToolTransformer`, the `required` field is an instance-level list that accumulates required properties across multiple tools; make this a local list inside `transform` (or `parseProperties`) so each tool gets an isolated `required` list.
- Both `CamelTool.resolveEndpoint` and `CamelResource.resolveEndpoint` blindly look up the route by `toolDefinition.getRoute().getId()`; consider handling the common case where only a URI is defined (e.g. fall back to using the `uri` directly or validating that `id` is present) to avoid `null` route lookups or NPEs with the current YAML examples.
- In `CamelIntegrationPluginTest.testLoadFailsWithoutConfiguration`, `plugin.load(null)` will eventually dereference the `CamelContext`; either construct a real `CamelContext` (e.g. `DefaultCamelContext`) or mock it so the test is exercising configuration failure rather than causing a `NullPointerException`.
## Individual Comments
### Comment 1
<location> `capabilities-runtimes/capabilities-runtimes-camel/capabilities-runtimes-camel-common/src/main/java/ai/wanaku/capabilities/sdk/runtime/camel/downloader/ResourceDownloaderCallback.java:42-43` </location>
<code_context>
+ public void onDeregistration(RegistrationManager manager, ServiceTarget target, int status) {}
+
+ private void downloadResources() {
+ if (resources == null || resources.isEmpty()) {
+ LOG.debug("No resources to download");
+ return;
+ }
</code_context>
<issue_to_address>
**issue (bug_risk):** Empty resources list causes waitForDownloads() to block indefinitely.
When `resources` is null or empty, this method returns without decrementing the latch, so `waitForDownloads()` will block indefinitely and plugin initialization will never finish. Ensure the latch is decremented even when there are no resources, and decide at the call site whether that case should be treated as success or failure.
</issue_to_address>
### Comment 2
<location> `capabilities-runtimes/capabilities-runtimes-camel/capabilities-runtimes-camel-common/src/main/java/ai/wanaku/capabilities/sdk/runtime/camel/util/FileUtil.java:21-30` </location>
<code_context>
+ WatchService watchService = FileSystems.getDefault().newWatchService();
</code_context>
<issue_to_address>
**issue (bug_risk):** WatchService is never closed, which can leak system resources.
The `WatchService` created here is never closed, so repeated calls to `untilAvailable` can leak OS resources. Please wrap its usage in try/finally or try-with-resources and invoke `watchService.close()` when finished, including on exceptions.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| if (resources == null || resources.isEmpty()) { | ||
| LOG.debug("No resources to download"); |
There was a problem hiding this comment.
issue (bug_risk): Empty resources list causes waitForDownloads() to block indefinitely.
When resources is null or empty, this method returns without decrementing the latch, so waitForDownloads() will block indefinitely and plugin initialization will never finish. Ensure the latch is decremented even when there are no resources, and decide at the call site whether that case should be treated as success or failure.
| WatchService watchService = FileSystems.getDefault().newWatchService(); | ||
| Path path = isDirectory ? input.toPath() : input.getParentFile().toPath(); | ||
|
|
||
| if (input.exists()) { | ||
| LOG.info("File {} already available", input); | ||
| return true; | ||
| } | ||
|
|
||
| // We watch for both the file creation and truncation | ||
| path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_MODIFY); |
There was a problem hiding this comment.
issue (bug_risk): WatchService is never closed, which can leak system resources.
The WatchService created here is never closed, so repeated calls to untilAvailable can leak OS resources. Please wrap its usage in try/finally or try-with-resources and invoke watchService.close() when finished, including on exceptions.
So that we can create a Camel Context plugin that can be loaded in build time and in runtime.
Ref: wanaku-ai/camel-integration-capability#56
Summary by Sourcery
Introduce a new Camel-specific runtime structure and plugin for the capabilities SDK, extracting shared runtime logic into reusable modules and wiring them into the build.
New Features:
Enhancements:
Tests: