Skip to content

Commit 7c8b9d9

Browse files
committed
Initial implementation of fine-grained authorisation for MQTT
1 parent a4537c0 commit 7c8b9d9

File tree

30 files changed

+586
-151
lines changed

30 files changed

+586
-151
lines changed

FROST-Server.Auth.Basic/src/main/java/de/fraunhofer/iosb/ilt/frostserver/auth/basic/BasicAuthProvider.java

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,14 @@
3434
import de.fraunhofer.iosb.ilt.frostserver.settings.annotation.DefaultValueInt;
3535
import de.fraunhofer.iosb.ilt.frostserver.util.AuthProvider;
3636
import de.fraunhofer.iosb.ilt.frostserver.util.LiquibaseUser;
37+
import de.fraunhofer.iosb.ilt.frostserver.util.UserCaches;
3738
import de.fraunhofer.iosb.ilt.frostserver.util.exception.UpgradeFailedException;
3839
import de.fraunhofer.iosb.ilt.frostserver.util.user.PrincipalExtended;
3940
import de.fraunhofer.iosb.ilt.frostserver.util.user.UserClientInfo;
4041
import de.fraunhofer.iosb.ilt.frostserver.util.user.UserData;
4142
import java.io.IOException;
4243
import java.io.Writer;
4344
import java.util.Map;
44-
import java.util.concurrent.ConcurrentHashMap;
4545
import org.slf4j.Logger;
4646
import org.slf4j.LoggerFactory;
4747

@@ -93,8 +93,7 @@ public class BasicAuthProvider implements AuthProvider, LiquibaseUser, ConfigDef
9393
private int maxPassLength = MAX_PASSWORD_LENGTH;
9494
private int maxNameLength = MAX_USERNAME_LENGTH;
9595

96-
private final Map<String, UserClientInfo> clientidToUserinfo = new ConcurrentHashMap<>();
97-
private final Map<String, UserClientInfo> usernameToUserinfo = new ConcurrentHashMap<>();
96+
private final UserCaches userCaches = new UserCaches();
9897

9998
@Override
10099
public InitResult init(CoreSettings coreSettings) {
@@ -120,6 +119,11 @@ public InitResult init(CoreSettings coreSettings) {
120119
return InitResult.INIT_OK;
121120
}
122121

122+
@Override
123+
public UserCaches getUserCaches() {
124+
return userCaches;
125+
}
126+
123127
@Override
124128
public void addFilter(Object context, CoreSettings coreSettings) {
125129
BasicAuthFilterHelper.createFilters(context, coreSettings);
@@ -136,14 +140,13 @@ public boolean isValidUser(String clientId, String userName, String password) {
136140
boolean admin = userData.roles.contains(roleAdmin);
137141

138142
final PrincipalExtended userPrincipal = new PrincipalExtended(userData.userName, admin, userData.roles);
139-
final UserClientInfo userInfo = usernameToUserinfo.computeIfAbsent(userData.userName, t -> new UserClientInfo());
143+
final UserClientInfo userInfo = userCaches.getOrCreateUserInfo(userData.userName);
140144
userInfo.setUserPrincipal(userPrincipal);
141-
142-
String oldClientId = userInfo.addClientId(clientId, maxClientsPerUser);
143-
if (oldClientId != null) {
144-
clientidToUserinfo.remove(oldClientId);
145+
if (userPrincipal.getUserKey() >= 0) {
146+
userCaches.registerPrincipal(userPrincipal.getUserKey(), userPrincipal);
145147
}
146-
clientidToUserinfo.put(clientId, userInfo);
148+
userCaches.registerClientId(userInfo, clientId);
149+
147150
return validUser;
148151
}
149152

@@ -157,15 +160,6 @@ public boolean userHasRole(String clientId, String userName, String roleName) {
157160
.userHasRole(userName, roleName);
158161
}
159162

160-
@Override
161-
public PrincipalExtended getUserPrincipal(String clientId) {
162-
UserClientInfo userInfo = clientidToUserinfo.get(clientId);
163-
if (userInfo == null) {
164-
return PrincipalExtended.ANONYMOUS_PRINCIPAL;
165-
}
166-
return userInfo.getUserPrincipal();
167-
}
168-
169163
@Override
170164
public Map<String, Object> createLiqibaseParams(PersistenceManager pm, Map<String, Object> target) {
171165
final DatabaseHandler dbHandler = DatabaseHandler.getInstance(coreSettings);

FROST-Server.Auth.Keycloak/src/main/java/de/fraunhofer/iosb/ilt/frostserver/auth/keycloak/KeycloakAuthProvider.java

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import de.fraunhofer.iosb.ilt.frostserver.settings.CoreSettings;
3434
import de.fraunhofer.iosb.ilt.frostserver.settings.Settings;
3535
import de.fraunhofer.iosb.ilt.frostserver.util.AuthProvider;
36+
import de.fraunhofer.iosb.ilt.frostserver.util.UserCaches;
3637
import de.fraunhofer.iosb.ilt.frostserver.util.exception.UpgradeFailedException;
3738
import de.fraunhofer.iosb.ilt.frostserver.util.user.PrincipalExtended;
3839
import de.fraunhofer.iosb.ilt.frostserver.util.user.UserClientInfo;
@@ -72,15 +73,13 @@ public class KeycloakAuthProvider implements AuthProvider {
7273

7374
private CoreSettings coreSettings;
7475
private String roleAdmin;
75-
private int maxClientsPerUser;
7676
private boolean registerUserLocally;
7777
private boolean authenticateOnly;
7878
private DatabaseHandler databaseHandler;
7979
private int maxPassLength = MAX_PASSWORD_LENGTH;
8080
private int maxNameLength = MAX_USERNAME_LENGTH;
8181

82-
private final Map<String, UserClientInfo> clientidToUserinfo = new ConcurrentHashMap<>();
83-
private final Map<String, UserClientInfo> usernameToUserinfo = new ConcurrentHashMap<>();
82+
private final UserCaches userCaches = new UserCaches();
8483

8584
/**
8685
* The map of clients. We need those to determine the authorisation.
@@ -95,7 +94,7 @@ public InitResult init(CoreSettings coreSettings) {
9594
OPTIONS.put("keycloak-config-file", FROST_SERVER_KEYCLOAKJSON);
9695
final Settings authSettings = coreSettings.getAuthSettings();
9796
roleAdmin = authSettings.get(TAG_AUTH_ROLE_ADMIN, CoreSettings.class);
98-
maxClientsPerUser = authSettings.getInt(TAG_MAX_CLIENTS_PER_USER, KeycloakSettings.class);
97+
userCaches.setMaxClientsPerUser(authSettings.getInt(TAG_MAX_CLIENTS_PER_USER, KeycloakSettings.class));
9998
maxPassLength = authSettings.getInt(TAG_MAX_PASSWORD_LENGTH, KeycloakSettings.class);
10099
maxNameLength = authSettings.getInt(TAG_MAX_USERNAME_LENGTH, KeycloakSettings.class);
101100
registerUserLocally = authSettings.getBoolean(TAG_REGISTER_USER_LOCALLY, KeycloakSettings.class);
@@ -115,6 +114,11 @@ public InitResult init(CoreSettings coreSettings) {
115114
return InitResult.INIT_OK;
116115
}
117116

117+
@Override
118+
public UserCaches getUserCaches() {
119+
return userCaches;
120+
}
121+
118122
@Override
119123
public void addFilter(Object context, CoreSettings coreSettings) {
120124
KeycloakFilterHelper.createFilters(context, coreSettings);
@@ -141,14 +145,12 @@ public boolean isValidUser(String clientId, String username, String password) {
141145
boolean admin = userData.roles.contains(roleAdmin);
142146

143147
final PrincipalExtended userPrincipal = new PrincipalExtended(userData.userName, admin, userData.roles);
144-
final UserClientInfo userInfo = usernameToUserinfo.computeIfAbsent(userData.userName, t -> new UserClientInfo());
148+
final UserClientInfo userInfo = userCaches.getOrCreateUserInfo(userData.userName);
145149
userInfo.setUserPrincipal(userPrincipal);
146-
147-
String oldClientId = userInfo.addClientId(clientId, maxClientsPerUser);
148-
if (oldClientId != null) {
149-
clientidToUserinfo.remove(oldClientId);
150+
if (userPrincipal.getUserKey() >= 0) {
151+
userCaches.registerPrincipal(userPrincipal.getUserKey(), userPrincipal);
150152
}
151-
clientidToUserinfo.put(clientId, userInfo);
153+
userCaches.registerClientId(userInfo, clientId);
152154

153155
return validUser;
154156
}
@@ -202,15 +204,6 @@ public boolean userHasRole(String clientId, String userName, String roleName) {
202204
return hasRole;
203205
}
204206

205-
@Override
206-
public PrincipalExtended getUserPrincipal(String clientId) {
207-
UserClientInfo userInfo = clientidToUserinfo.get(clientId);
208-
if (userInfo == null) {
209-
return PrincipalExtended.ANONYMOUS_PRINCIPAL;
210-
}
211-
return userInfo.getUserPrincipal();
212-
}
213-
214207
@Override
215208
public Map<String, Object> createLiqibaseParams(PersistenceManager pm, Map<String, Object> target) {
216209
return target;

FROST-Server.Core.Model/src/main/java/de/fraunhofer/iosb/ilt/frostserver/model/DefaultEntity.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ public int hashCode() {
318318

319319
@Override
320320
public String toString() {
321-
return "Entity: " + entityType;
321+
return "Entity: " + entityType + ": " + getPrimaryKeyValues();
322322
}
323323

324324
}

FROST-Server.Core.Model/src/main/java/de/fraunhofer/iosb/ilt/frostserver/model/core/Entity.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,6 @@
3030

3131
/**
3232
* Interface defining basic methods of an Entity.
33-
*
34-
* @author jab
35-
* @author scf
3633
*/
3734
public interface Entity extends NavigableElement {
3835

FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/mqtt/MqttManager.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,7 @@
6565
import org.slf4j.LoggerFactory;
6666

6767
/**
68-
*
69-
* @author Michael Jacoby
70-
* @author scf
68+
* The main manager for all MQTT related things.
7169
*/
7270
public class MqttManager implements SubscriptionListener, MessageListener, EntityCreateListener {
7371

@@ -80,8 +78,8 @@ public class MqttManager implements SubscriptionListener, MessageListener, Entit
8078

8179
private final Map<EntityType, SubscriptionManager> subscriptions = new HashMap<>();
8280
private final CoreSettings settings;
83-
private final SubscriptionFactory subscriptionFactory;
8481

82+
private SubscriptionFactory subscriptionFactory;
8583
private MqttServer server;
8684

8785
private int entityChangedQueueSize;
@@ -112,7 +110,6 @@ public MqttManager(CoreSettings settings) {
112110
throw new IllegalArgumentException("setting must be non-null");
113111
}
114112
this.settings = settings;
115-
subscriptionFactory = new SubscriptionFactory(settings);
116113

117114
init();
118115
}
@@ -152,6 +149,7 @@ private void init() {
152149
entityCreateProcessors);
153150
// start MQTT server
154151
server = MqttServerFactory.get(settings);
152+
subscriptionFactory = new SubscriptionFactory(settings, server.getUserCaches());
155153
server.addSubscriptionListener(this);
156154
server.addEntityCreateListener(this);
157155
server.start();
@@ -193,12 +191,14 @@ private void handleEntityChangedEvent(EntityChangedMessage message) {
193191
}
194192
// check if there is any subscription, if not do not publish at all
195193
if (!subscriptions.containsKey(entityType)) {
194+
LOGGER.trace(" No subscriptions for {}.", entityType);
196195
return;
197196
}
198197

199198
Entity entity = message.getEntity();
200199
Set<Property> fields = message.getFields();
201200
try (PersistenceManager persistenceManager = PersistenceManagerFactory.getInstance(settings).create()) {
201+
LOGGER.trace(" Checking Subscriptions for {}.", entityType);
202202
subscriptions.get(entityType).handleEntityChanged(persistenceManager, entity, fields);
203203
} catch (Exception ex) {
204204
LOGGER.error("error handling MQTT subscriptions", ex);
@@ -208,6 +208,7 @@ private void handleEntityChangedEvent(EntityChangedMessage message) {
208208
public void notifySubscription(Subscription subscription, Entity entity) {
209209
final String topic = subscription.getTopic();
210210
try {
211+
LOGGER.trace(" Notifying {} of change in {}", topic, entity);
211212
String payload = subscription.formatMessage(entity);
212213
server.publish(topic, payload, settings.getMqttSettings().getQosLevel());
213214
} catch (IOException ex) {

FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/mqtt/MqttServer.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import de.fraunhofer.iosb.ilt.frostserver.mqtt.create.EntityCreateListener;
2121
import de.fraunhofer.iosb.ilt.frostserver.mqtt.subscription.SubscriptionListener;
2222
import de.fraunhofer.iosb.ilt.frostserver.settings.CoreSettings;
23+
import de.fraunhofer.iosb.ilt.frostserver.util.UserCaches;
2324

2425
/**
2526
*
@@ -33,6 +34,8 @@ public interface MqttServer {
3334

3435
public void stop();
3536

37+
public UserCaches getUserCaches();
38+
3639
public void publish(String topic, String payload, int qos);
3740

3841
public void addSubscriptionListener(SubscriptionListener listener);

FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/mqtt/SubscriptionManager.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,16 @@
2727
import java.util.Map;
2828
import java.util.Set;
2929
import java.util.concurrent.atomic.AtomicInteger;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
3032

3133
/**
3234
* A manger for subscriptions for a single entity type.
33-
*
34-
* @author scf
3535
*/
3636
class SubscriptionManager {
3737

38+
private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionManager.class.getName());
39+
3840
/**
3941
* The main entity type of the subscriptions in this manager.
4042
*/
@@ -65,9 +67,11 @@ public EntityType getEntityType() {
6567

6668
public void handleEntityChanged(PersistenceManager persistenceManager, Entity entity, Set<Property> fields) {
6769
for (SubscriptionSetDirectParent subSet : parentedSubscriptions.values()) {
70+
LOGGER.trace(" Direct subscription for {}.", entityType);
6871
subSet.handleEntityChanged(persistenceManager, entity, fields);
6972
}
7073
for (Subscription subscription : complexSubscriptions.getSubscriptions().keySet()) {
74+
LOGGER.trace(" Complex subscription for {}.", entityType);
7175
if (subscription.matches(persistenceManager, entity, fields)) {
7276
Entity newEntity = subscription.fetchExpand(persistenceManager, entity);
7377
mqttManager.notifySubscription(subscription, newEntity);

FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/mqtt/SubscriptionSetDirectParent.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@
3232

3333
/**
3434
* A set of subscriptions that are keyed to a direct parent for faster access.
35-
*
36-
* @author scf
3735
*/
3836
class SubscriptionSetDirectParent {
3937

@@ -53,16 +51,21 @@ public SubscriptionSetDirectParent(MqttManager mqttManager, NavigationPropertyMa
5351
public void handleEntityChanged(PersistenceManager persistenceManager, Entity entity, Set<Property> fields) {
5452
Entity parent = (Entity) entity.getProperty(relationToParent);
5553
if (parent == null) {
54+
LOGGER.trace(" No parent");
5655
return;
5756
}
5857
PkValue pkValue = parent.getPrimaryKeyValues();
5958
SubscriptionSet subsForParent = subscriptions.get(pkValue);
6059
if (subsForParent == null) {
60+
LOGGER.trace(" No subs for parent {}", parent);
6161
return;
6262
}
6363
// for each subscription on EntityType check match
64+
LOGGER.trace(" Matching {} subs with parent {}", topicCount, parent);
6465
for (Subscription subscription : subsForParent.getSubscriptions().keySet()) {
66+
LOGGER.trace(" Matching parent {} to {}", parent, subscription);
6567
if (subscription.matches(persistenceManager, entity, fields)) {
68+
LOGGER.trace(" Match for parent {}", parent);
6669
Entity newEntity = subscription.fetchExpand(persistenceManager, entity);
6770
mqttManager.notifySubscription(subscription, newEntity);
6871
}
@@ -79,6 +82,9 @@ public boolean addSubscription(Subscription subscription) {
7982
}
8083
SubscriptionSet subsForParent = subscriptions.computeIfAbsent(parentPk, t -> new SubscriptionSet(topicCount));
8184
subsForParent.addSubscription(subscription);
85+
if (LOGGER.isTraceEnabled()) {
86+
LOGGER.trace(" Added subscription {} for {}", subscriptions.size(), subscription);
87+
}
8288
return true;
8389
}
8490
}

0 commit comments

Comments
 (0)