Skip to content

Commit 19681b4

Browse files
authored
Merge pull request #5396 from Sage-Bionetworks/release-562
Release 562
2 parents 98f2781 + d13ab02 commit 19681b4

File tree

17 files changed

+256
-141
lines changed

17 files changed

+256
-141
lines changed

lib/lib-grid-db/src/main/java/org/sagebionetworks/grid/db/GridIndexDao.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.sagebionetworks.grid.db;
22

33
import java.sql.Timestamp;
4+
import java.time.Duration;
45
import java.util.List;
56
import java.util.Optional;
67

@@ -249,7 +250,17 @@ List<ArrayNode> getArrayNodesInOrder(String sessionIdString, Long replicaId, Log
249250
* @param setMethod
250251
* @return
251252
*/
252-
MessageChain createMessageChain(MessageChain setMethod);
253+
MessageChain createMessageChain(MessageChain setMethod, Duration expires);
254+
255+
/**
256+
* Refresh the expiration of the provided message chain.
257+
* @param sessionId
258+
* @param replicaId
259+
* @param chainId
260+
* @param expires
261+
* @return
262+
*/
263+
boolean refreshMessageChain(String sessionId, Long replicaId, Integer chainId, Duration expires);
253264

254265
/**
255266
* Get a {@link MessageChain} if it exists.
@@ -260,6 +271,17 @@ List<ArrayNode> getArrayNodesInOrder(String sessionIdString, Long replicaId, Log
260271
* @return Optional.empty() if the chain no longer exists
261272
*/
262273
Optional<MessageChain> getMessageChain(String sessionId, Long replicaId, Integer chainId);
274+
275+
/**
276+
* Determine if a non-expired message chain already exists for the given method name.
277+
*
278+
* @param sessionId
279+
* @param replicaId
280+
* @param method
281+
* @return
282+
*/
283+
Optional<MessageChain> getNonExpiredMessageChain(String sessionId, Long replicaId, String method);
284+
263285

264286
/**
265287
* Delete a message chain upon completion. This will free up the ID to be

lib/lib-grid-db/src/main/java/org/sagebionetworks/grid/db/GridIndexDaoImpl.java

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.sql.ResultSet;
88
import java.sql.SQLException;
99
import java.sql.Timestamp;
10+
import java.time.Duration;
1011
import java.util.Collections;
1112
import java.util.List;
1213
import java.util.Optional;
@@ -623,17 +624,20 @@ public Integer createNextMessageId(String sessionIdString, Long replicaId, int m
623624

624625
@Override
625626
@GridTransaction(readOnly = false)
626-
public MessageChain createMessageChain(MessageChain chain) {
627+
public MessageChain createMessageChain(MessageChain chain, Duration expires) {
627628
ValidateArgument.required(chain, "chain");
628629
ValidateArgument.required(chain.getId(), "chain.id");
629630
ValidateArgument.required(chain.getSessionId(), "chain.sessionId");
630631
ValidateArgument.required(chain.getReplicaId(), "chain.replicaId");
631632
ValidateArgument.required(chain.getMethod(), "chain.method");
633+
ValidateArgument.required(expires, "chain.expires");
632634
Long sessionId = validateReplica(chain.getSessionId(), chain.getReplicaId());
633635
jdbcTemplate.update(
634-
"INSERT INTO GRID_REPLICA_MESSAGE (SESSION_ID, REPLICA_ID, MESSAGE_ID, METHOD_NAME, CREATED_ON)"
635-
+ " VALUES (?,?,?,?,NOW()) ON DUPLICATE KEY UPDATE METHOD_NAME = ?, CREATED_ON = NOW()",
636-
sessionId, chain.getReplicaId(), chain.getId(), chain.getMethod(), chain.getMethod());
636+
"INSERT INTO GRID_REPLICA_MESSAGE (SESSION_ID, REPLICA_ID, MESSAGE_ID, METHOD_NAME, CREATED_ON, EXPIRES_On)"
637+
+ " VALUES (?,?,?,?,NOW(),NOW() + INTERVAL ? SECOND) ON DUPLICATE KEY"
638+
+ " UPDATE METHOD_NAME = ?, CREATED_ON = NOW(), EXPIRES_ON = NOW() + INTERVAL ? SECOND",
639+
sessionId, chain.getReplicaId(), chain.getId(), chain.getMethod(), expires.getSeconds(),
640+
chain.getMethod(), expires.getSeconds());
637641
return getMessageChain(chain.getSessionId(), chain.getReplicaId(), chain.getId()).get();
638642
}
639643

@@ -650,6 +654,31 @@ public Optional<MessageChain> getMessageChain(String sessionIdString, Long repli
650654
}
651655
}
652656

657+
@Override
658+
@GridTransaction(readOnly = false)
659+
public boolean refreshMessageChain(String sessionIdString, Long replicaId, Integer chainId, Duration expires) {
660+
Long sessionId = validateReplica(sessionIdString, replicaId);
661+
ValidateArgument.required(expires, "expires");
662+
return jdbcTemplate.update(
663+
"UPDATE GRID_REPLICA_MESSAGE SET EXPIRES_ON = NOW() + INTERVAL ? SECOND WHERE"
664+
+ " SESSION_ID = ? AND REPLICA_ID = ? AND MESSAGE_ID = ?",
665+
expires.getSeconds(), sessionId, replicaId, chainId) > 0;
666+
}
667+
668+
@Override
669+
public Optional<MessageChain> getNonExpiredMessageChain(String sessionIdString, Long replicaId, String method) {
670+
Long sessionId = validateReplica(sessionIdString, replicaId);
671+
ValidateArgument.required(method, "method");
672+
try {
673+
return jdbcTemplate
674+
.query("SELECT * FROM GRID_REPLICA_MESSAGE WHERE SESSION_ID = ? AND REPLICA_ID = ? AND METHOD_NAME = ?"
675+
+ " AND EXPIRES_ON > NOW() LIMIT 1", MESSAGE_CHAIN_MAPPER, sessionId, replicaId, method)
676+
.stream().findFirst();
677+
} catch (EmptyResultDataAccessException e) {
678+
return Optional.empty();
679+
}
680+
}
681+
653682
@Override
654683
@GridTransaction(readOnly = false)
655684
public void deleteMessageChain(String sessionIdString, Long replicaId, Integer chainId) {
@@ -707,4 +736,6 @@ public Long getClockSequenceMaximum(String sessionIdString, Long replicaId) {
707736
Long.class, sessionId, replicaId);
708737
return max != null ? max : 1L;
709738
}
739+
740+
710741
}

lib/lib-grid-db/src/main/java/org/sagebionetworks/grid/db/GridIndexManager.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.sagebionetworks.grid.db;
22

3+
import java.time.Duration;
34
import java.util.List;
45
import java.util.Map;
56
import java.util.Optional;
@@ -60,6 +61,26 @@ public interface GridIndexManager {
6061
* @param chainId
6162
*/
6263
void completeMessageChain(String sessionId, Long replicaId, Integer chainId);
64+
65+
/**
66+
* Refresh the expiration on the provided message chain.
67+
*
68+
* @param sessionId
69+
* @param replicaId
70+
* @param chainId
71+
* @return
72+
*/
73+
boolean refreshMessageChain(String sessionId, Long replicaId, Integer chainId);
6374

75+
/**
76+
* Determine if a non-expired message chain already exists for the given method name.
77+
*
78+
* @param sessionId
79+
* @param replicaId
80+
* @param method
81+
* @return
82+
*/
83+
Optional<MessageChain> getNonExpiredMessageChain(String sessionId, Long replicaId, String method);
84+
6485
void truncateAll();
6586
}

lib/lib-grid-db/src/main/java/org/sagebionetworks/grid/db/GridIndexManagerImpl.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.sagebionetworks.grid.db;
22

3+
import java.time.Duration;
34
import java.util.Collections;
45
import java.util.List;
56
import java.util.Map;
@@ -19,6 +20,7 @@
1920
@GridTransaction(readOnly = true)
2021
public class GridIndexManagerImpl implements GridIndexManager {
2122

23+
public static final Duration MAX_MESSAGE_DURATION = Duration.ofSeconds(60);
2224
public static final int MAX_MESSAGE_ID = 65535;
2325

2426
private static final Logger log = LogManager.getLogger(GridIndexManagerImpl.class);
@@ -57,8 +59,8 @@ public Map<IndexType, Set<LogicalTimestamp>> applyPatch(String sessionId, Long r
5759
/*
5860
* Set the replica's clock to reflect the applied patch. For bootstrap patches
5961
* (created during grid initialization), we must be careful not to increment
60-
* this replica's sequence beyond other replicas' sequences, as this could
61-
* cause outstanding bootstrap patches to be ignored during synchronization.
62+
* this replica's sequence beyond other replicas' sequences, as this could cause
63+
* outstanding bootstrap patches to be ignored during synchronization.
6264
*/
6365
dao.setClock(sessionId, replicaId, patchClock);
6466
return changes;
@@ -98,7 +100,8 @@ public MessageChain startMessageChain(String sessionId, Long replicaId, String m
98100
createReplicaIfNotExist(sessionId, replicaId);
99101
Integer id = dao.createNextMessageId(sessionId, replicaId, MAX_MESSAGE_ID);
100102
return dao.createMessageChain(
101-
new MessageChain().setSessionId(sessionId).setReplicaId(replicaId).setMethod(method).setId(id));
103+
new MessageChain().setSessionId(sessionId).setReplicaId(replicaId).setMethod(method).setId(id),
104+
MAX_MESSAGE_DURATION);
102105
}
103106

104107
@Override
@@ -117,4 +120,15 @@ public void completeMessageChain(String sessionId, Long replicaId, Integer chain
117120
public void truncateAll() {
118121
dao.truncateAll();
119122
}
123+
124+
@Override
125+
@GridTransaction(readOnly = false)
126+
public boolean refreshMessageChain(String sessionId, Long replicaId, Integer chainId) {
127+
return dao.refreshMessageChain(sessionId, replicaId, chainId, MAX_MESSAGE_DURATION);
128+
}
129+
130+
@Override
131+
public Optional<MessageChain> getNonExpiredMessageChain(String sessionId, Long replicaId, String method) {
132+
return dao.getNonExpiredMessageChain(sessionId, replicaId, method);
133+
}
120134
}

lib/lib-grid-db/src/main/resources/schema/Grid-Message-ddl.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ CREATE TABLE IF NOT EXISTS GRID_REPLICA_MESSAGE (
44
MESSAGE_ID MEDIUMINT NOT NULL,
55
METHOD_NAME VARCHAR(256) CHARACTER SET latin1 COLLATE latin1_bin NOT NULL,
66
CREATED_ON TIMESTAMP(3) NOT NULL,
7+
EXPIRES_ON TIMESTAMP(3) DEFAULT NULL,
78
PRIMARY KEY (SESSION_ID, REPLICA_ID, MESSAGE_ID),
89
CONSTRAINT `GRID_MESSAGE_FK` FOREIGN KEY (SESSION_ID, REPLICA_ID) REFERENCES GRID_REPLICA(SESSION_ID, REPLICA_ID) ON DELETE CASCADE
910
)

lib/lib-grid-db/src/test/java/org/sagebionetworks/grid/db/GridIndexDaoImplTest.java

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import static org.junit.jupiter.api.Assertions.assertTrue;
88

99
import java.sql.Timestamp;
10+
import java.time.Duration;
1011
import java.util.Collections;
1112
import java.util.LinkedHashMap;
1213
import java.util.List;
@@ -778,23 +779,30 @@ public void testCreateNextMessageId() {
778779
}
779780

780781
@Test
781-
public void testMessageChainCRUD() {
782+
public void testMessageChainCRUD() throws InterruptedException {
782783
gridIndexDao.createReplicaIfNotExists(sessionIdOne, replicaIdOne);
783784
gridIndexDao.createReplicaIfNotExists(sessionIdTwo, replicaIdTwo);
784785
int maxValues = 100;
786+
Duration expires = Duration.ofSeconds(2);
785787
// one
786788
Integer idOne = gridIndexDao.createNextMessageId(sessionIdOne, replicaIdOne, maxValues);
787789
assertEquals(Optional.empty(), gridIndexDao.getMessageChain(sessionIdOne, replicaIdOne, idOne));
788790
MessageChain chainOne = new MessageChain().setSessionId(sessionIdOne).setReplicaId(replicaIdOne).setId(idOne)
789791
.setMethod("methodOne");
790-
MessageChain backOne = gridIndexDao.createMessageChain(chainOne);
792+
MessageChain backOne = gridIndexDao.createMessageChain(chainOne, expires);
791793
MessageChain expected = new MessageChain().setSessionId(sessionIdOne).setReplicaId(replicaIdOne).setId(idOne)
792794
.setMethod("methodOne").setCreatedOn(backOne.getCreatedOn());
793795
assertEquals(expected, backOne);
794796
assertEquals(Optional.of(expected), gridIndexDao.getMessageChain(sessionIdOne, replicaIdOne, idOne));
797+
assertEquals(Optional.of(expected),
798+
gridIndexDao.getNonExpiredMessageChain(sessionIdOne, replicaIdOne, chainOne.getMethod()));
799+
Thread.sleep(2001L);
800+
assertEquals(Optional.empty(),
801+
gridIndexDao.getNonExpiredMessageChain(sessionIdOne, replicaIdOne, chainOne.getMethod()));
802+
795803
// update
796804
chainOne.setMethod("updatedMethod");
797-
backOne = gridIndexDao.createMessageChain(chainOne);
805+
backOne = gridIndexDao.createMessageChain(chainOne, expires);
798806
expected = new MessageChain().setSessionId(sessionIdOne).setReplicaId(replicaIdOne).setId(idOne)
799807
.setMethod("updatedMethod").setCreatedOn(backOne.getCreatedOn());
800808
assertEquals(expected, backOne);
@@ -803,7 +811,7 @@ public void testMessageChainCRUD() {
803811
Integer idTwo = gridIndexDao.createNextMessageId(sessionIdTwo, replicaIdTwo, maxValues);
804812
MessageChain chainTwo = new MessageChain().setSessionId(sessionIdTwo).setReplicaId(replicaIdTwo).setId(idTwo)
805813
.setMethod("methodTwo");
806-
backOne = gridIndexDao.createMessageChain(chainTwo);
814+
backOne = gridIndexDao.createMessageChain(chainTwo, expires);
807815
expected = new MessageChain().setSessionId(sessionIdTwo).setReplicaId(replicaIdTwo).setId(idTwo)
808816
.setMethod("methodTwo").setCreatedOn(backOne.getCreatedOn());
809817
assertEquals(expected, backOne);
@@ -814,6 +822,28 @@ public void testMessageChainCRUD() {
814822
assertEquals(Optional.empty(), gridIndexDao.getMessageChain(sessionIdOne, replicaIdOne, idOne));
815823
assertEquals(Optional.of(expected), gridIndexDao.getMessageChain(sessionIdTwo, replicaIdTwo, idTwo));
816824
}
825+
826+
@Test
827+
public void testMessageChainExpiration() throws InterruptedException {
828+
gridIndexDao.createReplicaIfNotExists(sessionIdOne, replicaIdOne);
829+
int maxValues = 100;
830+
Duration expires = Duration.ofSeconds(1);
831+
// one
832+
Integer idOne = gridIndexDao.createNextMessageId(sessionIdOne, replicaIdOne, maxValues);
833+
assertEquals(Optional.empty(), gridIndexDao.getMessageChain(sessionIdOne, replicaIdOne, idOne));
834+
MessageChain chainOne = new MessageChain().setSessionId(sessionIdOne).setReplicaId(replicaIdOne).setId(idOne)
835+
.setMethod("methodOne");
836+
// call under test
837+
MessageChain backOne = gridIndexDao.createMessageChain(chainOne, expires);
838+
Thread.sleep(1000L);
839+
expires = Duration.ofSeconds(10);
840+
// call under test
841+
assertTrue(gridIndexDao.refreshMessageChain(sessionIdOne, replicaIdOne, idOne, expires));
842+
Thread.sleep(1000L);
843+
// all under test
844+
assertEquals(Optional.of(backOne),
845+
gridIndexDao.getNonExpiredMessageChain(sessionIdOne, replicaIdOne, chainOne.getMethod()));
846+
}
817847

818848
@Test
819849
public void testGetRootObject() {

lib/lib-grid-db/src/test/java/org/sagebionetworks/grid/db/GridIndexManagerImplTest.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,8 @@ public class GridIndexManagerImplTest {
5757
public void before() {
5858
sessionId = "sessionOne";
5959
replicaId = 123L;
60-
newConstant = new NewConstant(
61-
new LogicalTimestamp().setReplicaId(1L).setSequenceNumber(2L),
62-
new ConValue(ConType.BOOLEAN, true)
63-
);
60+
newConstant = new NewConstant(new LogicalTimestamp().setReplicaId(1L).setSequenceNumber(2L),
61+
new ConValue(ConType.BOOLEAN, true));
6462
newVector = new NewVector(new LogicalTimestamp().setReplicaId(3L).setSequenceNumber(4L));
6563
patch = new Patch().setPatchId(new LogicalTimestamp().setReplicaId(5L).setSequenceNumber(6L))
6664
.setOperations(List.of(newConstant, newVector));
@@ -210,4 +208,25 @@ public void testIsPatchAlreadyAppliedWithNewerReplica() {
210208
assertTrue(manager.isPatchAlreadyApplied(sessionId, replicaId, patch.getPatchId()));
211209
}
212210

211+
@Test
212+
public void testStartMessageChain() {
213+
int nextId = 101;
214+
String methodName = "some-method";
215+
when(mockDao.createNextMessageId(sessionId, replicaId, GridIndexManagerImpl.MAX_MESSAGE_ID)).thenReturn(nextId);
216+
MessageChain chain = new MessageChain().setId(nextId).setSessionId(sessionId).setReplicaId(replicaId)
217+
.setMethod(methodName);
218+
when(mockDao.createMessageChain(chain, GridIndexManagerImpl.MAX_MESSAGE_DURATION)).thenReturn(chain);
219+
// call under test
220+
assertEquals(chain, manager.startMessageChain(sessionId, replicaId, methodName));
221+
}
222+
223+
@Test
224+
public void testRefreshMessageChain() {
225+
int chainId = 111;
226+
when(mockDao.refreshMessageChain(sessionId, replicaId, chainId, GridIndexManagerImpl.MAX_MESSAGE_DURATION))
227+
.thenReturn(true);
228+
// call under test
229+
assertTrue(manager.refreshMessageChain(sessionId, replicaId, chainId));
230+
}
231+
213232
}

0 commit comments

Comments
 (0)