Skip to content

Commit 4472eed

Browse files
fix(reindex): Fix lock treated as stale on reindexing (#911)
Implements: MSEARCH-1181
1 parent 5f17fba commit 4472eed

File tree

4 files changed

+100
-0
lines changed

4 files changed

+100
-0
lines changed

src/main/java/org/folio/search/service/reindex/ReindexStatusService.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,17 @@ public boolean isMergeCompleted() {
108108
return statusRepository.isMergeCompleted();
109109
}
110110

111+
/**
112+
* Checks if any reindex operation is currently in progress (merge or upload).
113+
*
114+
* @return true if any entity type has a status of MERGE_IN_PROGRESS or UPLOAD_IN_PROGRESS
115+
*/
116+
public boolean isReindexInProgress() {
117+
return statusRepository.getReindexStatuses().stream()
118+
.anyMatch(status -> status.getStatus() == ReindexStatus.MERGE_IN_PROGRESS
119+
|| status.getStatus() == ReindexStatus.UPLOAD_IN_PROGRESS);
120+
}
121+
111122
private List<ReindexStatusEntity> constructNewStatusRecords(List<ReindexEntityType> entityTypes,
112123
ReindexStatus status) {
113124
return entityTypes.stream()

src/main/java/org/folio/search/service/scheduled/ScheduledInstanceSubResourcesService.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.folio.search.service.InstanceChildrenResourceService;
2020
import org.folio.search.service.ResourceService;
2121
import org.folio.search.service.reindex.ReindexConstants;
22+
import org.folio.search.service.reindex.ReindexStatusService;
2223
import org.folio.search.service.reindex.jdbc.InstanceChildResourceRepository;
2324
import org.folio.search.service.reindex.jdbc.ItemRepository;
2425
import org.folio.search.service.reindex.jdbc.MergeInstanceRepository;
@@ -42,6 +43,7 @@ public class ScheduledInstanceSubResourcesService {
4243
private final TenantRepository tenantRepository;
4344
private final Map<ReindexEntityType, ReindexJdbcRepository> repositories;
4445
private final SubResourcesLockRepository subResourcesLockRepository;
46+
private final ReindexStatusService reindexStatusService;
4547
private final SystemUserScopedExecutionService executionService;
4648
private final int subResourceBatchSize;
4749
private final long staleLockThresholdMs;
@@ -51,6 +53,7 @@ public ScheduledInstanceSubResourcesService(ResourceService resourceService,
5153
TenantRepository tenantRepository,
5254
List<ReindexJdbcRepository> repositories,
5355
SubResourcesLockRepository subResourcesLockRepository,
56+
ReindexStatusService reindexStatusService,
5457
SystemUserScopedExecutionService executionService,
5558
MergeInstanceRepository instanceRepository,
5659
ItemRepository itemRepository,
@@ -59,6 +62,7 @@ public ScheduledInstanceSubResourcesService(ResourceService resourceService,
5962
this.tenantRepository = tenantRepository;
6063
this.repositories = buildRepositoriesMap(repositories, instanceRepository, itemRepository);
6164
this.subResourcesLockRepository = subResourcesLockRepository;
65+
this.reindexStatusService = reindexStatusService;
6266
this.executionService = executionService;
6367
this.subResourceBatchSize = searchConfigurationProperties.getIndexing().getSubResourceBatchSize();
6468
this.staleLockThresholdMs = searchConfigurationProperties.getIndexing().getStaleLockThresholdMs();
@@ -115,13 +119,28 @@ private void processEntityTypeWithLock(ReindexEntityType entityType, String tena
115119
}
116120

117121
private void handleLockAcquisitionFailure(ReindexEntityType entityType, String tenant) {
122+
if (isReindexInProgress()) {
123+
log.info("persistChildren::Skipping stale lock check for entity type {} in tenant {} - reindex is in progress",
124+
entityType, tenant);
125+
return;
126+
}
127+
118128
if (subResourcesLockRepository.checkAndReleaseStaleLock(entityType, tenant, staleLockThresholdMs)) {
119129
log.warn("persistChildren::Released stale lock for entity type {} in tenant {}. "
120130
+ "Lock was older than threshold of {} ms",
121131
entityType, tenant, staleLockThresholdMs);
122132
}
123133
}
124134

135+
private boolean isReindexInProgress() {
136+
try {
137+
return reindexStatusService.isReindexInProgress();
138+
} catch (Exception e) {
139+
log.warn("persistChildren::Failed to check reindex status, assuming no reindex in progress", e);
140+
return false;
141+
}
142+
}
143+
125144
private void processSubResources(ReindexEntityType entityType, String tenant, Timestamp timestamp) {
126145
SubResourceResult result = null;
127146
String lastId = null;

src/test/java/org/folio/search/service/reindex/ReindexStatusServiceTest.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,4 +179,46 @@ void updateReindexMergeInProgress() {
179179
// assert
180180
verify(statusRepository).setMergeInProgress(entityTypes);
181181
}
182+
183+
@Test
184+
void isReindexInProgress_trueWhenMerge() {
185+
// given
186+
when(statusRepository.getReindexStatuses()).thenReturn(List.of(
187+
new ReindexStatusEntity(ReindexEntityType.INSTANCE, ReindexStatus.MERGE_IN_PROGRESS),
188+
new ReindexStatusEntity(ReindexEntityType.HOLDINGS, ReindexStatus.MERGE_COMPLETED)));
189+
190+
// act
191+
var actual = service.isReindexInProgress();
192+
193+
// assert
194+
assertThat(actual).isTrue();
195+
}
196+
197+
@Test
198+
void isReindexInProgress_trueWhenUpload() {
199+
// given
200+
when(statusRepository.getReindexStatuses()).thenReturn(List.of(
201+
new ReindexStatusEntity(ReindexEntityType.INSTANCE, ReindexStatus.UPLOAD_IN_PROGRESS),
202+
new ReindexStatusEntity(ReindexEntityType.HOLDINGS, ReindexStatus.MERGE_COMPLETED)));
203+
204+
// act
205+
var actual = service.isReindexInProgress();
206+
207+
// assert
208+
assertThat(actual).isTrue();
209+
}
210+
211+
@Test
212+
void isReindexInProgress_false() {
213+
// given
214+
when(statusRepository.getReindexStatuses()).thenReturn(List.of(
215+
new ReindexStatusEntity(ReindexEntityType.INSTANCE, ReindexStatus.UPLOAD_COMPLETED),
216+
new ReindexStatusEntity(ReindexEntityType.HOLDINGS, ReindexStatus.MERGE_COMPLETED)));
217+
218+
// act
219+
var actual = service.isReindexInProgress();
220+
221+
// assert
222+
assertThat(actual).isFalse();
223+
}
182224
}

src/test/java/org/folio/search/service/scheduled/ScheduledInstanceSubResourcesServiceTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.folio.search.model.types.ReindexEntityType;
2424
import org.folio.search.service.InstanceChildrenResourceService;
2525
import org.folio.search.service.ResourceService;
26+
import org.folio.search.service.reindex.ReindexStatusService;
2627
import org.folio.search.service.reindex.jdbc.ItemRepository;
2728
import org.folio.search.service.reindex.jdbc.MergeInstanceRepository;
2829
import org.folio.search.service.reindex.jdbc.SubResourceResult;
@@ -45,6 +46,7 @@ class ScheduledInstanceSubResourcesServiceTest {
4546
private @Mock ResourceService resourceService;
4647
private @Mock TenantRepository tenantRepository;
4748
private @Mock SubResourcesLockRepository subResourcesLockRepository;
49+
private @Mock ReindexStatusService reindexStatusService;
4850
private @Mock SystemUserScopedExecutionService executionService;
4951
private @Mock InstanceChildrenResourceService instanceChildrenResourceService;
5052
private @Mock SubjectRepository subjectRepository;
@@ -65,6 +67,7 @@ void setUp() {
6567
tenantRepository,
6668
List.of(subjectRepository),
6769
subResourcesLockRepository,
70+
reindexStatusService,
6871
executionService,
6972
instanceRepository,
7073
itemRepository,
@@ -179,13 +182,15 @@ void persistChildren_ShouldReleaseStaleLockWhenLockAcquisitionFails() {
179182
.when(executionService).executeSystemUserScoped(anyString(), any());
180183
when(tenantRepository.fetchDataTenantIds()).thenReturn(List.of(TENANT_ID));
181184
when(subResourcesLockRepository.lockSubResource(any(), eq(TENANT_ID))).thenReturn(Optional.empty());
185+
when(reindexStatusService.isReindexInProgress()).thenReturn(false);
182186
when(subResourcesLockRepository.checkAndReleaseStaleLock(any(), eq(TENANT_ID), anyLong())).thenReturn(true);
183187

184188
// Act
185189
service.persistChildren();
186190

187191
// Assert
188192
verify(subResourcesLockRepository, times(3)).lockSubResource(any(), eq(TENANT_ID));
193+
verify(reindexStatusService, times(3)).isReindexInProgress();
189194
verify(subResourcesLockRepository, times(3)).checkAndReleaseStaleLock(any(), eq(TENANT_ID), anyLong());
190195
verify(subResourcesLockRepository, never()).unlockSubResource(any(), any(), any());
191196
verify(subjectRepository, never()).fetchByTimestamp(anyString(), any(), anyInt());
@@ -199,19 +204,42 @@ void persistChildren_ShouldNotProcessWhenLockFailsAndNoStaleLock() {
199204
.when(executionService).executeSystemUserScoped(anyString(), any());
200205
when(tenantRepository.fetchDataTenantIds()).thenReturn(List.of(TENANT_ID));
201206
when(subResourcesLockRepository.lockSubResource(any(), eq(TENANT_ID))).thenReturn(Optional.empty());
207+
when(reindexStatusService.isReindexInProgress()).thenReturn(false);
202208
when(subResourcesLockRepository.checkAndReleaseStaleLock(any(), eq(TENANT_ID), anyLong())).thenReturn(false);
203209

204210
// Act
205211
service.persistChildren();
206212

207213
// Assert
208214
verify(subResourcesLockRepository, times(3)).lockSubResource(any(), eq(TENANT_ID));
215+
verify(reindexStatusService, times(3)).isReindexInProgress();
209216
verify(subResourcesLockRepository, times(3)).checkAndReleaseStaleLock(any(), eq(TENANT_ID), anyLong());
210217
verify(subResourcesLockRepository, never()).unlockSubResource(any(), any(), any());
211218
verify(subjectRepository, never()).fetchByTimestamp(anyString(), any(), anyInt());
212219
verifyNoInteractions(instanceRepository, itemRepository, resourceService);
213220
}
214221

222+
@Test
223+
void persistChildren_ShouldSkipStaleLockCheckWhenReindexInProgress() {
224+
// Arrange
225+
doAnswer(invocation -> invocation.<Callable<?>>getArgument(1).call())
226+
.when(executionService).executeSystemUserScoped(anyString(), any());
227+
when(tenantRepository.fetchDataTenantIds()).thenReturn(List.of(TENANT_ID));
228+
when(subResourcesLockRepository.lockSubResource(any(), eq(TENANT_ID))).thenReturn(Optional.empty());
229+
when(reindexStatusService.isReindexInProgress()).thenReturn(true);
230+
231+
// Act
232+
service.persistChildren();
233+
234+
// Assert
235+
verify(subResourcesLockRepository, times(3)).lockSubResource(any(), eq(TENANT_ID));
236+
verify(reindexStatusService, times(3)).isReindexInProgress();
237+
verify(subResourcesLockRepository, never()).checkAndReleaseStaleLock(any(), any(), anyLong());
238+
verify(subResourcesLockRepository, never()).unlockSubResource(any(), any(), any());
239+
verify(subjectRepository, never()).fetchByTimestamp(anyString(), any(), anyInt());
240+
verifyNoInteractions(instanceRepository, itemRepository, resourceService);
241+
}
242+
215243
private void mockSubResourceResult(String tenantId, Timestamp timestamp) {
216244
when(subjectRepository.fetchByTimestamp(tenantId, timestamp, 3))
217245
.thenReturn(new SubResourceResult(List.of(Map.of("id", "1", "tenantId", tenantId)), null));

0 commit comments

Comments
 (0)