Skip to content

Commit ad6ed81

Browse files
committed
Added support for running callbacks after registration, ping and deregistration
Ref: wanaku-ai/wanaku#637
1 parent 82f4204 commit ad6ed81

File tree

2 files changed

+62
-11
lines changed

2 files changed

+62
-11
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package ai.wanaku.capabilities.sdk.discovery;
2+
3+
import ai.wanaku.api.discovery.RegistrationCallback;
4+
import ai.wanaku.api.discovery.RegistrationManager;
5+
import ai.wanaku.api.types.providers.ServiceTarget;
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
9+
class RegistrationLogCallback implements RegistrationCallback {
10+
private static final Logger LOG = LoggerFactory.getLogger(RegistrationLogCallback.class);
11+
12+
@Override
13+
public void onPing(RegistrationManager manager, ServiceTarget target, int status) {
14+
if (status != 200) {
15+
LOG.warn("Pinging router failed with status {}", status);
16+
} else {
17+
LOG.trace("Pinging router completed successfully");
18+
}
19+
}
20+
21+
@Override
22+
public void onRegistration(RegistrationManager manager, ServiceTarget target) {
23+
LOG.debug("The service {} successfully registered with ID {}.", target.getService(), target.getId());
24+
}
25+
26+
@Override
27+
public void onDeregistration(RegistrationManager manager, ServiceTarget target, int status) {
28+
if (status != 200) {
29+
LOG.warn("De-registering service {} failed with status {}", target.getServiceType().asValue(), status);
30+
}
31+
}
32+
}

capabilities-discovery/src/main/java/ai/wanaku/capabilities/sdk/discovery/ZeroDepRegistrationManager.java

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import jakarta.ws.rs.WebApplicationException;
44
import jakarta.ws.rs.core.Response;
55

6+
import ai.wanaku.api.discovery.RegistrationCallback;
67
import ai.wanaku.api.discovery.RegistrationManager;
78
import ai.wanaku.api.exceptions.WanakuException;
89
import ai.wanaku.api.types.WanakuResponse;
@@ -15,10 +16,13 @@
1516
import com.fasterxml.jackson.core.type.TypeReference;
1617
import java.io.IOException;
1718
import java.net.http.HttpResponse;
19+
import java.util.List;
20+
import java.util.concurrent.CopyOnWriteArrayList;
1821
import java.util.concurrent.Executors;
1922
import java.util.concurrent.ScheduledExecutorService;
2023
import java.util.concurrent.ScheduledFuture;
2124
import java.util.concurrent.TimeUnit;
25+
import java.util.function.Consumer;
2226
import org.slf4j.Logger;
2327
import org.slf4j.LoggerFactory;
2428

@@ -39,6 +43,7 @@ public class ZeroDepRegistrationManager implements RegistrationManager {
3943
private volatile boolean registered;
4044
private final ScheduledExecutorService scheduler;
4145
private ScheduledFuture<?> registrationTask;
46+
private final List<RegistrationCallback> callbacks = new CopyOnWriteArrayList<>();
4247

4348
/**
4449
* Constructs a {@code ZeroDepRegistrationManager}.
@@ -69,6 +74,8 @@ public ZeroDepRegistrationManager(DiscoveryServiceHttpClient client, ServiceTarg
6974
throw new WanakuException(e);
7075
}
7176
}
77+
78+
callbacks.add(new RegistrationLogCallback());
7279
}
7380

7481
/**
@@ -104,7 +111,7 @@ private void tryRegistering() {
104111
target.setId(entity.data().getId());
105112
instanceDataManager.writeEntry(target);
106113
registered = true;
107-
LOG.debug("The service {} successfully registered with ID {}.", target.getService(), target.getId());
114+
runCallBack(c -> c.onRegistration(this, target));
108115
break;
109116
} catch (WebApplicationException e) {
110117
if (LOG.isDebugEnabled()) {
@@ -166,9 +173,8 @@ private void tryDeregistering() {
166173
if (target != null && target.getId() != null) {
167174
try {
168175
final HttpResponse<String> response = client.deregister(target);
169-
if (response.statusCode() != 200) {
170-
LOG.warn("De-registering service {} failed with status {}", target.getServiceType().asValue(), response.statusCode());
171-
}
176+
final int status = response.statusCode();
177+
runCallBack(c -> c.onDeregistration(this, target, status));
172178
} catch (Exception e) {
173179
if (LOG.isDebugEnabled()) {
174180
LOG.warn("De-registering failed with {}", e.getMessage(), e);
@@ -201,13 +207,8 @@ public void ping() {
201207
try {
202208
// Assuming client.ping(target.getId()) exists and returns HttpResponse<String>
203209
final HttpResponse<String> response = client.ping(target.getId());
204-
if (response.statusCode() != 200) {
205-
LOG.warn("Pinging router failed with status {}", response.statusCode());
206-
}
207-
208-
if (LOG.isDebugEnabled()) {
209-
LOG.trace("Pinging router completed successfully");
210-
}
210+
final int status = response.statusCode();
211+
runCallBack(c -> c.onPing(this, target, status));
211212
} catch (Exception e) {
212213
if (LOG.isDebugEnabled()) {
213214
LOG.warn("Pinging router failed with {}", e.getMessage(), e);
@@ -285,6 +286,24 @@ private void stop() {
285286
} finally {
286287
scheduler.shutdown();
287288
}
289+
}
288290

291+
private void runCallBack(Consumer<RegistrationCallback> registrationManagerConsumer) {
292+
for (RegistrationCallback callback : callbacks) {
293+
try {
294+
registrationManagerConsumer.accept(callback);
295+
} catch (Exception e) {
296+
if (LOG.isDebugEnabled()) {
297+
LOG.warn("Unable to run callback {} due to {}", callback.getClass().getName(), e.getMessage(), e);
298+
} else {
299+
LOG.warn("Unable to run callback {}} due to {}", callback.getClass().getName(), e.getMessage());
300+
}
301+
}
302+
}
303+
}
304+
305+
@Override
306+
public void addCallBack(RegistrationCallback callback) {
307+
callbacks.add(callback);
289308
}
290309
}

0 commit comments

Comments
 (0)