Skip to content

Commit 7d46cdb

Browse files
committed
Added support for retrieving the data from a data store
Added support for running callbacks after registration, ping and deregistration Ref: wanaku-ai/wanaku#637
1 parent 82f4204 commit 7d46cdb

File tree

4 files changed

+214
-14
lines changed

4 files changed

+214
-14
lines changed
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package ai.wanaku.capabilities.sdk.common.exceptions;
2+
3+
import ai.wanaku.api.exceptions.WanakuException;
4+
5+
/**
6+
* Exception thrown when web-related operations fail in the Wanaku capabilities SDK.
7+
* <p>
8+
* This exception is used to indicate failures in HTTP/web communications, including
9+
* network errors, HTTP errors, or other web-related issues that occur during
10+
* capability operations. It captures the HTTP status code associated with the failure.
11+
* </p>
12+
*/
13+
public class WanakuWebException extends WanakuException {
14+
private final int statusCode;
15+
16+
/**
17+
* Constructs a new WanakuWebException with the specified HTTP status code.
18+
*
19+
* @param statusCode the HTTP status code associated with this exception
20+
*/
21+
public WanakuWebException(int statusCode) {
22+
this.statusCode = statusCode;
23+
}
24+
25+
/**
26+
* Constructs a new WanakuWebException with the specified detail message and HTTP status code.
27+
*
28+
* @param message the detail message explaining the reason for the exception
29+
* @param statusCode the HTTP status code associated with this exception
30+
*/
31+
public WanakuWebException(String message, int statusCode) {
32+
super(message);
33+
this.statusCode = statusCode;
34+
}
35+
36+
/**
37+
* Constructs a new WanakuWebException with the specified detail message, cause, and HTTP status code.
38+
*
39+
* @param message the detail message explaining the reason for the exception
40+
* @param cause the underlying cause of this exception
41+
* @param statusCode the HTTP status code associated with this exception
42+
*/
43+
public WanakuWebException(String message, Throwable cause, int statusCode) {
44+
super(message, cause);
45+
this.statusCode = statusCode;
46+
}
47+
48+
/**
49+
* Constructs a new WanakuWebException with the specified cause and HTTP status code.
50+
*
51+
* @param cause the underlying cause of this exception
52+
* @param statusCode the HTTP status code associated with this exception
53+
*/
54+
public WanakuWebException(Throwable cause, int statusCode) {
55+
super(cause);
56+
this.statusCode = statusCode;
57+
}
58+
59+
/**
60+
* Constructs a new WanakuWebException with the specified detail message, cause,
61+
* suppression enabled or disabled, writable stack trace enabled or disabled, and HTTP status code.
62+
*
63+
* @param message the detail message explaining the reason for the exception
64+
* @param cause the underlying cause of this exception
65+
* @param enableSuppression whether or not suppression is enabled or disabled
66+
* @param writableStackTrace whether or not the stack trace should be writable
67+
* @param statusCode the HTTP status code associated with this exception
68+
*/
69+
public WanakuWebException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace, int statusCode) {
70+
super(message, cause, enableSuppression, writableStackTrace);
71+
this.statusCode = statusCode;
72+
}
73+
74+
/**
75+
* Returns the HTTP status code associated with this exception.
76+
*
77+
* @return the HTTP status code
78+
*/
79+
public int getStatusCode() {
80+
return statusCode;
81+
}
82+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package ai.wanaku.capabilities.sdk.discovery;
2+
3+
import ai.wanaku.api.discovery.DiscoveryCallback;
4+
import ai.wanaku.api.discovery.RegistrationManager;
5+
import ai.wanaku.api.types.providers.ServiceTarget;
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
9+
class DiscoveryLogCallback implements DiscoveryCallback {
10+
private static final Logger LOG = LoggerFactory.getLogger(DiscoveryLogCallback.class);
11+
12+
@Override
13+
public void onPing(RegistrationManager manager, ServiceTarget target, int status) {
14+
if (status != 200) {
15+
LOG.warn("Pinging router failed with status {}", status);
16+
} else {
17+
LOG.trace("Pinging router completed successfully");
18+
}
19+
}
20+
21+
@Override
22+
public void onRegistration(RegistrationManager manager, ServiceTarget target) {
23+
LOG.debug("The service {} successfully registered with ID {}.", target.getService(), target.getId());
24+
}
25+
26+
@Override
27+
public void onDeregistration(RegistrationManager manager, ServiceTarget target, int status) {
28+
if (status != 200) {
29+
LOG.warn("De-registering service {} failed with status {}", target.getServiceType().asValue(), status);
30+
}
31+
}
32+
}

capabilities-discovery/src/main/java/ai/wanaku/capabilities/sdk/discovery/ZeroDepRegistrationManager.java

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import jakarta.ws.rs.WebApplicationException;
44
import jakarta.ws.rs.core.Response;
55

6+
import ai.wanaku.api.discovery.DiscoveryCallback;
67
import ai.wanaku.api.discovery.RegistrationManager;
78
import ai.wanaku.api.exceptions.WanakuException;
89
import ai.wanaku.api.types.WanakuResponse;
@@ -15,10 +16,13 @@
1516
import com.fasterxml.jackson.core.type.TypeReference;
1617
import java.io.IOException;
1718
import java.net.http.HttpResponse;
19+
import java.util.List;
20+
import java.util.concurrent.CopyOnWriteArrayList;
1821
import java.util.concurrent.Executors;
1922
import java.util.concurrent.ScheduledExecutorService;
2023
import java.util.concurrent.ScheduledFuture;
2124
import java.util.concurrent.TimeUnit;
25+
import java.util.function.Consumer;
2226
import org.slf4j.Logger;
2327
import org.slf4j.LoggerFactory;
2428

@@ -39,6 +43,7 @@ public class ZeroDepRegistrationManager implements RegistrationManager {
3943
private volatile boolean registered;
4044
private final ScheduledExecutorService scheduler;
4145
private ScheduledFuture<?> registrationTask;
46+
private final List<DiscoveryCallback> callbacks = new CopyOnWriteArrayList<>();
4247

4348
/**
4449
* Constructs a {@code ZeroDepRegistrationManager}.
@@ -69,6 +74,8 @@ public ZeroDepRegistrationManager(DiscoveryServiceHttpClient client, ServiceTarg
6974
throw new WanakuException(e);
7075
}
7176
}
77+
78+
callbacks.add(new DiscoveryLogCallback());
7279
}
7380

7481
/**
@@ -104,7 +111,7 @@ private void tryRegistering() {
104111
target.setId(entity.data().getId());
105112
instanceDataManager.writeEntry(target);
106113
registered = true;
107-
LOG.debug("The service {} successfully registered with ID {}.", target.getService(), target.getId());
114+
runCallBack(c -> c.onRegistration(this, target));
108115
break;
109116
} catch (WebApplicationException e) {
110117
if (LOG.isDebugEnabled()) {
@@ -166,9 +173,8 @@ private void tryDeregistering() {
166173
if (target != null && target.getId() != null) {
167174
try {
168175
final HttpResponse<String> response = client.deregister(target);
169-
if (response.statusCode() != 200) {
170-
LOG.warn("De-registering service {} failed with status {}", target.getServiceType().asValue(), response.statusCode());
171-
}
176+
final int status = response.statusCode();
177+
runCallBack(c -> c.onDeregistration(this, target, status));
172178
} catch (Exception e) {
173179
if (LOG.isDebugEnabled()) {
174180
LOG.warn("De-registering failed with {}", e.getMessage(), e);
@@ -201,13 +207,8 @@ public void ping() {
201207
try {
202208
// Assuming client.ping(target.getId()) exists and returns HttpResponse<String>
203209
final HttpResponse<String> response = client.ping(target.getId());
204-
if (response.statusCode() != 200) {
205-
LOG.warn("Pinging router failed with status {}", response.statusCode());
206-
}
207-
208-
if (LOG.isDebugEnabled()) {
209-
LOG.trace("Pinging router completed successfully");
210-
}
210+
final int status = response.statusCode();
211+
runCallBack(c -> c.onPing(this, target, status));
211212
} catch (Exception e) {
212213
if (LOG.isDebugEnabled()) {
213214
LOG.warn("Pinging router failed with {}", e.getMessage(), e);
@@ -285,6 +286,24 @@ private void stop() {
285286
} finally {
286287
scheduler.shutdown();
287288
}
289+
}
288290

291+
private void runCallBack(Consumer<DiscoveryCallback> registrationManagerConsumer) {
292+
for (DiscoveryCallback callback : callbacks) {
293+
try {
294+
registrationManagerConsumer.accept(callback);
295+
} catch (Exception e) {
296+
if (LOG.isDebugEnabled()) {
297+
LOG.warn("Unable to run callback {} due to {}", callback.getClass().getName(), e.getMessage(), e);
298+
} else {
299+
LOG.warn("Unable to run callback {}} due to {}", callback.getClass().getName(), e.getMessage());
300+
}
301+
}
302+
}
303+
}
304+
305+
@Override
306+
public void addCallBack(DiscoveryCallback callback) {
307+
callbacks.add(callback);
289308
}
290309
}

capabilities-services-client/src/main/java/ai/wanaku/capabilities/sdk/services/ServicesHttpClient.java

Lines changed: 70 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@
33
import jakarta.ws.rs.core.MediaType;
44

55
import ai.wanaku.api.exceptions.WanakuException;
6+
import ai.wanaku.api.types.DataStore;
67
import ai.wanaku.api.types.ForwardReference;
78
import ai.wanaku.api.types.Namespace;
89
import ai.wanaku.api.types.ResourceReference;
910
import ai.wanaku.api.types.ToolReference;
1011
import ai.wanaku.api.types.WanakuResponse;
1112
import ai.wanaku.api.types.io.ResourcePayload;
1213
import ai.wanaku.api.types.io.ToolPayload;
14+
import ai.wanaku.capabilities.sdk.common.exceptions.WanakuWebException;
1315
import ai.wanaku.capabilities.sdk.common.serializer.Serializer;
1416
import ai.wanaku.capabilities.sdk.services.config.ServicesClientConfig;
1517
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -87,7 +89,7 @@ private <T, R> T executePost(String path, R payload, TypeReference<T> typeRefere
8789
if (response.statusCode() >= 200 && response.statusCode() < 300) {
8890
return objectMapper.readValue(response.body(), typeReference);
8991
} else {
90-
throw new WanakuException("HTTP error: " + response.statusCode() + " - " + response.body());
92+
throw new WanakuWebException("HTTP error: " + response.statusCode() + " - " + response.body(), response.statusCode());
9193
}
9294
} catch (JsonProcessingException e) {
9395
throw new WanakuException("JSON processing error", e);
@@ -122,7 +124,7 @@ private <R> void executePut(String path, R payload) {
122124
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
123125

124126
if (response.statusCode() < 200 || response.statusCode() >= 300) {
125-
throw new WanakuException("HTTP error: " + response.statusCode() + " - " + response.body());
127+
throw new WanakuWebException("HTTP error: " + response.statusCode() + " - " + response.body(), response.statusCode());
126128
}
127129
} catch (JsonProcessingException e) {
128130
throw new WanakuException("JSON processing error", e);
@@ -158,7 +160,7 @@ private <T> T executeGet(String path, TypeReference<T> typeReference) {
158160
if (response.statusCode() >= 200 && response.statusCode() < 300) {
159161
return objectMapper.readValue(response.body(), typeReference);
160162
} else {
161-
throw new WanakuException("HTTP error: " + response.statusCode() + " - " + response.body());
163+
throw new WanakuWebException("HTTP error: " + response.statusCode() + " - " + response.body(), response.statusCode());
162164
}
163165
} catch (JsonProcessingException e) {
164166
throw new WanakuException("JSON processing error", e);
@@ -371,4 +373,69 @@ public void removeForward(ForwardReference forwardReference) {
371373
public WanakuResponse<List<Namespace>> listNamespaces() {
372374
return executeGet("/api/v1/namespaces/list", new TypeReference<WanakuResponse<List<Namespace>>>() {});
373375
}
376+
377+
// ==================== DataStores API Methods ====================
378+
379+
/**
380+
* Adds a new data store entry.
381+
*
382+
* @param dataStore The {@link DataStore} to add.
383+
* @return The response containing the added data store.
384+
* @throws WanakuException If an error occurs during the request.
385+
*/
386+
public WanakuResponse<DataStore> addDataStore(DataStore dataStore) {
387+
return executePost("/api/v1/data-store/add", dataStore, new TypeReference<WanakuResponse<DataStore>>() {});
388+
}
389+
390+
/**
391+
* Lists all data stores.
392+
*
393+
* @return The response containing the list of all data stores.
394+
* @throws WanakuException If an error occurs during the request.
395+
*/
396+
public WanakuResponse<List<DataStore>> listDataStores() {
397+
return executeGet("/api/v1/data-store/list", new TypeReference<WanakuResponse<List<DataStore>>>() {});
398+
}
399+
400+
/**
401+
* Gets a data store by ID.
402+
*
403+
* @param id The ID of the data store to retrieve.
404+
* @return The response containing the data store.
405+
* @throws WanakuException If an error occurs during the request.
406+
*/
407+
public WanakuResponse<DataStore> getDataStoreById(String id) {
408+
return executeGet("/api/v1/data-store/get?id=" + id, new TypeReference<WanakuResponse<DataStore>>() {});
409+
}
410+
411+
/**
412+
* Gets data stores by name.
413+
*
414+
* @param name The name of the data stores to retrieve.
415+
* @return The response containing the list of data stores.
416+
* @throws WanakuException If an error occurs during the request.
417+
*/
418+
public WanakuResponse<List<DataStore>> getDataStoresByName(String name) {
419+
return executeGet("/api/v1/data-store/get?name=" + name, new TypeReference<WanakuResponse<List<DataStore>>>() {});
420+
}
421+
422+
/**
423+
* Removes a data store by ID.
424+
*
425+
* @param id The ID of the data store to remove.
426+
* @throws WanakuException If an error occurs during the request.
427+
*/
428+
public void removeDataStore(String id) {
429+
executeDelete("/api/v1/data-store/remove?id=" + id);
430+
}
431+
432+
/**
433+
* Removes data stores by name.
434+
*
435+
* @param name The name of the data stores to remove.
436+
* @throws WanakuException If an error occurs during the request.
437+
*/
438+
public void removeDataStoresByName(String name) {
439+
executeDelete("/api/v1/data-store/remove?name=" + name);
440+
}
374441
}

0 commit comments

Comments
 (0)