Skip to content

Commit ad60bed

Browse files
committed
concord-server: refactor WebSocketChannelManager, allow message sources in plugins
1 parent 8022ff0 commit ad60bed

File tree

21 files changed

+378
-276
lines changed

21 files changed

+378
-276
lines changed

server/impl/src/main/java/com/walmartlabs/concord/server/ApiServerModule.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import com.walmartlabs.concord.server.boot.servlets.FormServletHolder;
2929
import com.walmartlabs.concord.server.boot.statics.StaticResourcesConfigurator;
3030
import com.walmartlabs.concord.server.boot.validation.ValidationModule;
31-
import com.walmartlabs.concord.server.websocket.ConcordWebSocketServlet;
31+
import com.walmartlabs.concord.server.agent.websocket.ConcordWebSocketServlet;
3232
import org.apache.shiro.mgt.SecurityManager;
3333
import org.apache.shiro.web.mgt.WebSecurityManager;
3434
import org.eclipse.jetty.ee8.servlet.FilterHolder;

server/impl/src/main/java/com/walmartlabs/concord/server/ServerResource.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import com.walmartlabs.concord.server.boot.BackgroundTasks;
2626
import com.walmartlabs.concord.server.sdk.rest.Resource;
2727
import com.walmartlabs.concord.server.task.TaskScheduler;
28-
import com.walmartlabs.concord.server.websocket.WebSocketChannelManager;
28+
import com.walmartlabs.concord.server.agent.message.MessageChannelManager;
2929
import org.jooq.Configuration;
3030

3131
import javax.inject.Inject;
@@ -42,18 +42,18 @@ public class ServerResource implements Resource {
4242

4343
private final TaskScheduler taskScheduler;
4444
private final BackgroundTasks backgroundTasks;
45-
private final WebSocketChannelManager webSocketChannelManager;
45+
private final MessageChannelManager messageChannelManager;
4646
private final PingDao pingDao;
4747

4848
@Inject
4949
public ServerResource(TaskScheduler taskScheduler,
5050
BackgroundTasks backgroundTasks,
51-
WebSocketChannelManager webSocketChannelManager,
51+
MessageChannelManager messageChannelManager,
5252
PingDao pingDao) {
5353

5454
this.taskScheduler = taskScheduler;
5555
this.backgroundTasks = backgroundTasks;
56-
this.webSocketChannelManager = webSocketChannelManager;
56+
this.messageChannelManager = messageChannelManager;
5757
this.pingDao = pingDao;
5858
}
5959

@@ -78,7 +78,7 @@ public VersionResponse version() {
7878
public void maintenanceMode() {
7979
backgroundTasks.stop();
8080

81-
webSocketChannelManager.shutdown();
81+
messageChannelManager.shutdown();
8282
taskScheduler.stop();
8383
}
8484

server/impl/src/main/java/com/walmartlabs/concord/server/agent/AgentManager.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,12 @@
2020
* =====
2121
*/
2222

23+
import com.walmartlabs.concord.server.agent.message.MessageChannel;
24+
import com.walmartlabs.concord.server.agent.message.MessageChannelManager;
25+
import com.walmartlabs.concord.server.agent.websocket.WebSocketChannel;
2326
import com.walmartlabs.concord.server.queueclient.message.MessageType;
2427
import com.walmartlabs.concord.server.queueclient.message.ProcessRequest;
2528
import com.walmartlabs.concord.server.sdk.ProcessKey;
26-
import com.walmartlabs.concord.server.websocket.WebSocketChannel;
27-
import com.walmartlabs.concord.server.websocket.WebSocketChannelManager;
2829
import org.jooq.DSLContext;
2930

3031
import javax.inject.Inject;
@@ -38,23 +39,24 @@
3839
public class AgentManager {
3940

4041
private final AgentCommandsDao commandQueue;
41-
private final WebSocketChannelManager channelManager;
42+
private final MessageChannelManager channelManager;
4243

4344
@Inject
4445
public AgentManager(AgentCommandsDao commandQueue,
45-
WebSocketChannelManager channelManager) {
46+
MessageChannelManager channelManager) {
4647

4748
this.commandQueue = commandQueue;
4849
this.channelManager = channelManager;
4950
}
5051

5152
public Collection<AgentWorkerEntry> getAvailableAgents() {
52-
Map<WebSocketChannel, ProcessRequest> reqs = channelManager.getRequests(MessageType.PROCESS_REQUEST);
53+
Map<MessageChannel, ProcessRequest> reqs = channelManager.getRequests(MessageType.PROCESS_REQUEST);
5354
return reqs.entrySet().stream()
55+
.filter(r -> r.getKey() instanceof WebSocketChannel) // TODO a better way
5456
.map(r -> AgentWorkerEntry.builder()
5557
.channelId(r.getKey().getChannelId())
5658
.agentId(r.getKey().getAgentId())
57-
.userAgent(r.getKey().getUserAgent())
59+
.userAgent(((WebSocketChannel) r.getKey()).getUserAgent())
5860
.capabilities(r.getValue().getCapabilities())
5961
.build())
6062
.collect(Collectors.toList());

server/impl/src/main/java/com/walmartlabs/concord/server/agent/AgentWorkerEntry.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,14 @@
2727

2828
import javax.annotation.Nullable;
2929
import java.util.Map;
30-
import java.util.UUID;
3130

3231
@Value.Immutable
3332
@JsonInclude(JsonInclude.Include.NON_EMPTY)
3433
@JsonSerialize(as = ImmutableAgentWorkerEntry.class)
3534
@JsonDeserialize(as = ImmutableAgentWorkerEntry.class)
3635
public interface AgentWorkerEntry {
3736

38-
UUID channelId();
37+
String channelId();
3938

4039
@Nullable // backward-compatibility with old agent versions
4140
String agentId();

server/impl/src/main/java/com/walmartlabs/concord/server/agent/dispatcher/Dispatcher.java

Lines changed: 9 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,14 @@
2929
import com.walmartlabs.concord.server.PeriodicTask;
3030
import com.walmartlabs.concord.server.agent.AgentCommand;
3131
import com.walmartlabs.concord.server.agent.Commands;
32+
import com.walmartlabs.concord.server.agent.message.MessageChannel;
33+
import com.walmartlabs.concord.server.agent.message.MessageChannelManager;
3234
import com.walmartlabs.concord.server.cfg.AgentConfiguration;
3335
import com.walmartlabs.concord.server.jooq.tables.records.AgentCommandsRecord;
3436
import com.walmartlabs.concord.server.queueclient.message.CommandRequest;
3537
import com.walmartlabs.concord.server.queueclient.message.CommandResponse;
3638
import com.walmartlabs.concord.server.queueclient.message.MessageType;
3739
import com.walmartlabs.concord.server.sdk.metrics.WithTimer;
38-
import com.walmartlabs.concord.server.websocket.WebSocketChannel;
39-
import com.walmartlabs.concord.server.websocket.WebSocketChannelManager;
4040
import org.jooq.Configuration;
4141
import org.jooq.DSLContext;
4242
import org.slf4j.Logger;
@@ -61,12 +61,12 @@ public class Dispatcher extends PeriodicTask {
6161
private static final int BATCH_SIZE = 10;
6262

6363
private final DispatcherDao dao;
64-
private final WebSocketChannelManager channelManager;
64+
private final MessageChannelManager channelManager;
6565

6666
@Inject
6767
public Dispatcher(AgentConfiguration cfg,
6868
DispatcherDao dao,
69-
WebSocketChannelManager channelManager) {
69+
MessageChannelManager channelManager) {
7070

7171
super(cfg.getCommandPollDelay().toMillis(), ERROR_DELAY);
7272
this.dao = dao;
@@ -75,7 +75,7 @@ public Dispatcher(AgentConfiguration cfg,
7575

7676
@Override
7777
protected boolean performTask() {
78-
Map<WebSocketChannel, CommandRequest> requests = this.channelManager.getRequests(MessageType.COMMAND_REQUEST);
78+
Map<MessageChannel, CommandRequest> requests = this.channelManager.getRequests(MessageType.COMMAND_REQUEST);
7979
if (requests.isEmpty()) {
8080
return false;
8181
}
@@ -148,7 +148,6 @@ private AgentCommand findCandidate(CommandRequest req, List<AgentCommand> candid
148148
}
149149

150150
private void sendResponse(Match match, AgentCommand response) {
151-
WebSocketChannel channel = match.request.channel;
152151
long correlationId = match.request.request.getCorrelationId();
153152

154153
CommandType type = CommandType.valueOf((String) response.getData().remove(Commands.TYPE_KEY));
@@ -157,7 +156,8 @@ private void sendResponse(Match match, AgentCommand response) {
157156
payload.put("type", type.toString());
158157
payload.putAll(response.getData());
159158

160-
boolean success = channelManager.sendResponse(channel.getChannelId(), CommandResponse.cancel(correlationId, payload));
159+
MessageChannel channel = match.request.channel;
160+
boolean success = channelManager.sendMessage(channel.getChannelId(), CommandResponse.cancel(correlationId, payload));
161161
if (success) {
162162
log.info("sendResponse ['{}'] -> done", correlationId);
163163
} else {
@@ -223,25 +223,10 @@ private AgentCommand convert(AgentCommandsRecord r) {
223223
}
224224
}
225225

226-
private static final class Match {
226+
private record Match(Request request, AgentCommand command) {
227227

228-
private final Request request;
229-
private final AgentCommand command;
230-
231-
private Match(Request request, AgentCommand command) {
232-
this.request = request;
233-
this.command = command;
234-
}
235228
}
236229

237-
private static final class Request {
238-
239-
private final WebSocketChannel channel;
240-
private final CommandRequest request;
241-
242-
private Request(WebSocketChannel channel, CommandRequest request) {
243-
this.channel = channel;
244-
this.request = request;
245-
}
230+
private record Request(MessageChannel channel, CommandRequest request) {
246231
}
247232
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package com.walmartlabs.concord.server.agent.message;
2+
3+
/*-
4+
* *****
5+
* Concord
6+
* -----
7+
* Copyright (C) 2017 - 2024 Walmart Inc.
8+
* -----
9+
* Licensed under the Apache License, Version 2.0 (the "License");
10+
* you may not use this file except in compliance with the License.
11+
* You may obtain a copy of the License at
12+
*
13+
* http://www.apache.org/licenses/LICENSE-2.0
14+
*
15+
* Unless required by applicable law or agreed to in writing, software
16+
* distributed under the License is distributed on an "AS IS" BASIS,
17+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
* See the License for the specific language governing permissions and
19+
* limitations under the License.
20+
* =====
21+
*/
22+
23+
24+
import com.walmartlabs.concord.server.queueclient.message.Message;
25+
import com.walmartlabs.concord.server.queueclient.message.MessageType;
26+
27+
import java.io.Closeable;
28+
import java.util.Optional;
29+
30+
/**
31+
* Represents an open message channel between the server and a remote agent.
32+
*/
33+
public interface MessageChannel extends Closeable {
34+
35+
/**
36+
* A unique identifier of a channel.
37+
*/
38+
String getChannelId();
39+
40+
/**
41+
* An identifier of the agent represented by this MessageChannel.
42+
*/
43+
String getAgentId();
44+
45+
/**
46+
* Attempts to send a message.
47+
* @return {@code true} if the message was sent successfully.
48+
* Returns {@code false} if the message cannot be sent at the moment.
49+
* @apiNote The implementors must expect the server to re-attempt
50+
* the sending of the same message.
51+
*/
52+
boolean offerMessage(Message msg) throws Exception;
53+
54+
/**
55+
* Attempts to grab a received message.
56+
* @return an empty value if no messages of the specified type can be returned
57+
* at the moment.
58+
*/
59+
Optional<Message> getMessage(MessageType messageType) throws Exception;
60+
}
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
package com.walmartlabs.concord.server.agent.message;
2+
3+
/*-
4+
* *****
5+
* Concord
6+
* -----
7+
* Copyright (C) 2017 - 2018 Walmart Inc.
8+
* -----
9+
* Licensed under the Apache License, Version 2.0 (the "License");
10+
* you may not use this file except in compliance with the License.
11+
* You may obtain a copy of the License at
12+
*
13+
* http://www.apache.org/licenses/LICENSE-2.0
14+
*
15+
* Unless required by applicable law or agreed to in writing, software
16+
* distributed under the License is distributed on an "AS IS" BASIS,
17+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
* See the License for the specific language governing permissions and
19+
* limitations under the License.
20+
* =====
21+
*/
22+
23+
import com.walmartlabs.concord.server.queueclient.message.Message;
24+
import com.walmartlabs.concord.server.queueclient.message.MessageType;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
27+
28+
import javax.inject.Named;
29+
import javax.inject.Singleton;
30+
import java.util.Collections;
31+
import java.util.HashMap;
32+
import java.util.Map;
33+
import java.util.Optional;
34+
import java.util.concurrent.ConcurrentHashMap;
35+
36+
@Named
37+
@Singleton
38+
public class MessageChannelManager {
39+
40+
private static final Logger log = LoggerFactory.getLogger(MessageChannelManager.class);
41+
42+
private final Map<String, MessageChannel> channels = new ConcurrentHashMap<>();
43+
44+
private volatile boolean isShutdown;
45+
46+
public boolean isShutdown() {
47+
return isShutdown;
48+
}
49+
50+
public void shutdown() {
51+
isShutdown = true;
52+
53+
channels.forEach((uuid, channel) -> {
54+
try {
55+
channel.close();
56+
} catch (Exception e) {
57+
log.warn("shutdown -> failed on channel {}: {}", channel.getClass(), e.getMessage());
58+
}
59+
});
60+
log.info("shutdown -> done");
61+
}
62+
63+
public void close(String channelId) {
64+
MessageChannel channel = channels.remove(channelId);
65+
if (channel == null) {
66+
log.warn("close ['{}'] -> channel not found", channelId);
67+
return;
68+
}
69+
70+
try {
71+
channel.close();
72+
} catch (Exception e) {
73+
throw new RuntimeException(e);
74+
}
75+
76+
log.info("close ['{}'] -> done", channelId);
77+
}
78+
79+
public boolean sendMessage(String channelId, Message response) {
80+
MessageChannel channel = channels.get(channelId);
81+
if (channel == null) {
82+
log.warn("sendResponse ['{}', '{}'] -> channel not found", channelId, response);
83+
return false;
84+
}
85+
86+
try {
87+
return channel.offerMessage(response);
88+
} catch (Exception e) {
89+
log.warn("sendResponse ['{}', '{}'] -> failed: {}", channelId, response, e.getMessage());
90+
return false;
91+
}
92+
}
93+
94+
@SuppressWarnings("unchecked")
95+
public <M extends Message> Map<MessageChannel, M> getRequests(MessageType requestType) {
96+
Map<MessageChannel, M> result = new HashMap<>();
97+
channels.forEach((channelId, channel) -> {
98+
try {
99+
channel.getMessage(requestType).ifPresent(msg -> {
100+
result.put(channel, (M) msg);
101+
});
102+
} catch (Exception e) {
103+
log.warn("getRequests ['{}'] -> failed on channel {}: {}", requestType, channel.getClass(), e.getMessage());
104+
}
105+
});
106+
return result;
107+
}
108+
109+
public void add(MessageChannel channel) {
110+
channels.put(channel.getChannelId(), channel);
111+
}
112+
113+
public int connectedClientsCount() {
114+
return channels.size();
115+
}
116+
117+
public Map<String, MessageChannel> getChannels() {
118+
return Collections.unmodifiableMap(channels);
119+
}
120+
121+
@SuppressWarnings("unchecked")
122+
public <C extends MessageChannel> Optional<C> getChannel(String channelId, Class<C> klass) {
123+
return Optional.ofNullable(channels.get(channelId))
124+
.filter(c -> klass.isAssignableFrom(c.getClass()))
125+
.map(c -> (C) c);
126+
}
127+
}

0 commit comments

Comments
 (0)