Skip to content

Commit 6256b08

Browse files
committed
Prevent unwanted spinning on migration jobs
Motivation ---------- A busy-wait loop prevents migration jobs from being cancelled while waiting for: - Invalid source or destination lists. - Insufficient eligible pools in pool list to satisfy replication request (when `waitForTargets` is `true`). Modification ------------ - Modify state machine to enter `SLEEPING` state when appropriate rather than entering a cycle of spawning tasks that immediately fail. Result ------ Jobs where `waitForTargets` is `true` with insufficient eligible destination pools are now cancellable, and will resume when (if) more eligible pools become available. Ticket: - Acked-by: Tigran Mkrtchyan Target: trunk Request: - Require-book: no Require-notes: yes
1 parent 3d91759 commit 6256b08

File tree

4 files changed

+270
-16
lines changed

4 files changed

+270
-16
lines changed

modules/dcache/src/main/java/org/dcache/pool/migration/Job.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -498,11 +498,16 @@ private void schedule() {
498498
setState(State.FINISHED);
499499
} else if (_state == State.RUNNING &&
500500
(!_definition.sourceList.isValid() ||
501-
!_definition.poolList.isValid())) {
501+
!_definition.poolList.isValid() ||
502+
(_definition.waitForTargets && _definition.poolList.getPools().isEmpty()))) {
502503
setState(State.SLEEPING);
503504
} else if (_state == State.RUNNING) {
504505
Iterator<PnfsId> i = _queued.iterator();
505506
while ((_running.size() < _concurrency) && i.hasNext()) {
507+
if (_state != State.RUNNING) {
508+
break;
509+
}
510+
506511
Expression stopWhen = _definition.stopWhen;
507512
if (stopWhen != null && evaluateLifetimePredicate(stopWhen)) {
508513
stop();
@@ -515,7 +520,8 @@ private void schedule() {
515520
}
516521

517522
PnfsId pnfsId = i.next();
518-
if (!_context.lock(pnfsId)) {
523+
boolean locked = _context.lock(pnfsId);
524+
if (!locked) {
519525
addNote(new Note(0, pnfsId, "File is locked"));
520526
continue;
521527
}
@@ -735,13 +741,7 @@ public void taskFailed(Task task, int rc, String msg) {
735741
_queued.add(pnfsId);
736742
_context.unlock(pnfsId);
737743
}
738-
739-
if (_state == State.RUNNING) {
740-
setState(State.SLEEPING);
741-
} else {
742-
schedule();
743-
}
744-
744+
schedule();
745745
addNote(new Note(task.getId(), pnfsId, msg));
746746
} finally {
747747
_lock.unlock();

modules/dcache/src/main/java/org/dcache/pool/migration/Task.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,10 @@ public boolean getMustMovePins() {
9999
return !_pinsToMove.isEmpty();
100100
}
101101

102+
public boolean getWaitForTargets() {
103+
return _parameters.waitForTargets;
104+
}
105+
102106
public long getId() {
103107
return _id;
104108
}

modules/dcache/src/main/smc/org/dcache/pool/migration/Task.sm

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,16 @@ Entry
7070
{
7171
}
7272
query_success
73-
[ !ctxt.isMetaOnly() ]
73+
[ !ctxt.isMetaOnly() && ctxt.moreReplicasPossible() ]
7474
InitiatingCopy
7575
{
7676
}
77+
query_success
78+
[ !ctxt.isMetaOnly() ]
79+
Done
80+
{
81+
notifyCompletedWithInsufficientReplicas();
82+
}
7783
query_success
7884
Failed
7985
{
@@ -95,7 +101,7 @@ Entry
95101
{
96102
}
97103
copy_timeout
98-
[ ctxt.isEager() && !ctxt.isMetaOnly() ]
104+
[ ctxt.isEager() && !ctxt.isMetaOnly() && ctxt.moreReplicasPossible() ]
99105
InitiatingCopy
100106
{
101107
}
@@ -111,7 +117,7 @@ Entry
111117
updateExistingReplica();
112118
}
113119
copy_noroute
114-
[ ctxt.isEager() && !ctxt.isMetaOnly() ]
120+
[ ctxt.isEager() && !ctxt.isMetaOnly() && ctxt.moreReplicasPossible() ]
115121
InitiatingCopy
116122
{
117123
}
@@ -133,7 +139,7 @@ Entry
133139
updateExistingReplica();
134140
}
135141
copy_failure(rc: Integer, cause: Object)
136-
[ !ctxt.isMetaOnly() ]
142+
[ !ctxt.isMetaOnly() && ctxt.moreReplicasPossible() ]
137143
InitiatingCopy
138144
{
139145
}
@@ -198,9 +204,21 @@ Entry
198204
{
199205
}
200206
copy_nopools
207+
[ !ctxt.getWaitForTargets() ]
208+
Done
209+
{
210+
notifyCompletedWithInsufficientReplicas();
211+
}
212+
copy_nopools
213+
[ ctxt.getWaitForTargets() ]
201214
Failed
202215
{
203-
fail(NO_POOL_ONLINE, "No targets");
216+
fail(NO_POOL_ONLINE, "No targets available");
217+
}
218+
copy_nopools
219+
Done
220+
{
221+
failPermanently(NO_POOL_ONLINE, "No targets available");
204222
}
205223
copy_noroute
206224
Failed
@@ -281,19 +299,31 @@ WaitingForCopyReplicaReply
281299
{
282300
}
283301
copy_success
284-
[ !ctxt.isMetaOnly() ]
302+
[ !ctxt.isMetaOnly() && ctxt.moreReplicasPossible() ]
285303
InitiatingCopy
286304
{
287305
}
306+
copy_success
307+
[ !ctxt.isMetaOnly() ]
308+
Done
309+
{
310+
notifyCompletedWithInsufficientReplicas();
311+
}
288312
copy_success
289313
Failed
290314
{
291315
failPermanently(FILE_NOT_IN_REPOSITORY, "File skipped because it does not have enough existing replicas");
292316
}
317+
copy_nopools
318+
[ !ctxt.getWaitForTargets() ]
319+
Done
320+
{
321+
notifyCompletedWithInsufficientReplicas();
322+
}
293323
copy_nopools
294324
Failed
295325
{
296-
fail(NO_POOL_ONLINE, "No targets");
326+
fail(NO_POOL_ONLINE, "No targets available");
297327
}
298328
copy_noroute
299329
Failed
Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
package org.dcache.pool.migration;

import java.util.Collections;
import java.util.Comparator;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

import org.dcache.cells.CellStub;
import org.dcache.pool.repository.CacheEntry;
import org.dcache.pool.repository.Repository;
import org.dcache.vehicles.FileAttributes;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableList;

import diskCacheV111.util.PnfsId;
import dmg.cells.nucleus.CellAddressCore;
import dmg.cells.nucleus.CellPath;

/**
 * Regression tests for the migration-job busy-wait ("spinning") fix.
 *
 * <p>Previously a job whose pool list was valid but contained no eligible
 * target pools would repeatedly spawn tasks that failed immediately,
 * preventing the job from being cancelled.  These tests verify that:
 * <ul>
 *   <li>a job with no matching work finishes without sleeping or spinning,</li>
 *   <li>with {@code waitForTargets == false} a task completes (with
 *       insufficient replicas) instead of retrying, and</li>
 *   <li>with {@code waitForTargets == true} the job enters {@code SLEEPING}
 *       immediately rather than starting a task.</li>
 * </ul>
 *
 * <p>All collaborators are Mockito mocks; the executor is mocked so the
 * tests can capture and run the job's scheduled work synchronously.
 */
public class MigrationModuleSpinTest {

    private MigrationContext context;
    private ScheduledExecutorService executor;
    private RefreshablePoolList poolList;
    private RefreshablePoolList sourceList;
    private JobDefinition definition;
    private Repository repository;
    private Predicate<CacheEntry> filter;

    /**
     * Builds a minimal {@link JobDefinition} wired to the mocked source and
     * target pool lists.
     *
     * @param waitForTargets whether the job should wait for eligible target
     *        pools to appear instead of failing the task
     * @return a job definition suitable for driving {@link Job} in tests
     */
    private JobDefinition createJobDefinition(boolean waitForTargets) {
        CacheEntryMode mode = new CacheEntryMode(CacheEntryMode.State.SAME,
              Collections.emptyList());
        return new JobDefinition(
              filter,                             // filter
              mode,                               // sourceMode
              mode,                               // targetMode
              mock(PoolSelectionStrategy.class),  // selectionStrategy
              mock(Comparator.class),             // comparator
              sourceList,                         // sourceList
              poolList,                           // poolList
              0,                                  // refreshPeriod
              false,                              // isPermanent
              false,                              // isEager
              false,                              // isMetaOnly
              1,                                  // replicas
              false,                              // mustMovePins
              false,                              // computeChecksumOnUpdate
              false,                              // maintainAtime
              null,                               // pauseWhen
              null,                               // stopWhen
              false,                              // forceSourceMode
              waitForTargets                      // waitForTargets
        );
    }

    /**
     * Common fixture: mocks all collaborators, makes both pool lists valid
     * but empty (no eligible targets), and installs a default job definition
     * with {@code waitForTargets == false}.  Individual tests override the
     * repository contents and, where needed, the definition.
     */
    @Before
    public void setUp() {
        context = mock(MigrationContext.class);
        executor = mock(ScheduledExecutorService.class);
        poolList = mock(RefreshablePoolList.class);
        sourceList = mock(RefreshablePoolList.class);
        repository = mock(Repository.class);
        filter = mock(Predicate.class);

        when(context.getExecutor()).thenReturn(executor);
        when(context.getPoolStub()).thenReturn(mock(CellStub.class));
        when(context.getPnfsStub()).thenReturn(mock(CellStub.class));

        // Task construction resolves the pin manager's destination path, so
        // the stub must return a real CellPath rather than null.
        CellStub pinManagerStub = mock(CellStub.class);
        CellAddressCore cellAddress = new CellAddressCore("PinManager");
        CellPath cellPath = new CellPath(cellAddress);
        when(pinManagerStub.getDestinationPath()).thenReturn(cellPath);
        when(context.getPinManagerStub()).thenReturn(pinManagerStub);
        when(context.lock(any(PnfsId.class))).thenReturn(true);

        when(context.getRepository()).thenReturn(repository);
        when(repository.iterator()).thenReturn(Collections.emptyIterator());

        definition = createJobDefinition(false);

        when(poolList.isValid()).thenReturn(true);
        when(sourceList.isValid()).thenReturn(true);

        // Valid but empty pool lists: the condition under test is "no
        // eligible target pools available".
        when(poolList.getPools()).thenReturn(ImmutableList.of());
        when(poolList.getOfflinePools()).thenReturn(ImmutableList.of());
    }

    /**
     * A job with a single repository entry and no eligible pools must finish
     * without entering a sleep/retry cycle when {@code waitForTargets} is
     * {@code false}.
     */
    @Test(timeout = 5000)
    public void testSpin() throws Exception {
        PnfsId pnfsId = new PnfsId("000000000000000000000001");
        CacheEntry entry = mock(CacheEntry.class);
        when(entry.getPnfsId()).thenReturn(pnfsId);

        FileAttributes attributes = new FileAttributes();
        attributes.setLocations(Collections.emptyList());
        when(entry.getFileAttributes()).thenReturn(attributes);

        when(repository.iterator()).thenReturn(Collections.singletonList(pnfsId).iterator());
        when(repository.getEntry(pnfsId)).thenReturn(entry);

        when(filter.test(entry)).thenReturn(true);

        Job job = new Job(context, definition);
        job.start();

        // Capture and run the initialization task submitted to the executor.
        ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
        verify(executor).submit(captor.capture());
        captor.getValue().run();

        // With the fix, the job finishes without sleeping/spinning: no
        // delayed re-schedule may have been issued.
        verify(executor, times(0)).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
    }

    /**
     * With {@code waitForTargets == false} and an empty (but valid) pool
     * list, the task completes with insufficient replicas instead of
     * retrying, and the job does not go to sleep.
     */
    @Test(timeout = 5000)
    public void testInsufficientPools_WaitForTargetsFalse() throws Exception {
        PnfsId pnfsId = new PnfsId("000000000000000000000001");
        CacheEntry entry = mock(CacheEntry.class);
        when(entry.getPnfsId()).thenReturn(pnfsId);

        when(repository.iterator()).thenReturn(Collections.singletonList(pnfsId).iterator());
        when(repository.getEntry(pnfsId)).thenReturn(entry);

        FileAttributes attributes = new FileAttributes();
        attributes.setLocations(Collections.emptyList());
        when(entry.getFileAttributes()).thenReturn(attributes);

        when(poolList.isValid()).thenReturn(true);
        when(poolList.getPools()).thenReturn(ImmutableList.of());

        when(filter.test(entry)).thenReturn(true);

        Job job = new Job(context, definition);
        job.start();

        // Capture and run the initialization task.
        ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
        verify(executor).submit(captor.capture());
        captor.getValue().run();

        // The task must have consulted the pool list at least once …
        verify(poolList, atLeast(1)).getPools();

        // … and the job must NOT have scheduled a sleep.
        verify(executor, times(0)).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
    }

    /**
     * With {@code waitForTargets == true} and an empty (but valid) pool
     * list, no task is started at all: the job sleeps immediately and is
     * therefore cancellable, resuming if eligible pools appear later.
     */
    @Test(timeout = 5000)
    public void testInsufficientPools_WaitForTargetsTrue() throws Exception {
        PnfsId pnfsId = new PnfsId("000000000000000000000001");
        CacheEntry entry = mock(CacheEntry.class);
        when(entry.getPnfsId()).thenReturn(pnfsId);

        when(repository.iterator()).thenReturn(Collections.singletonList(pnfsId).iterator());
        when(repository.getEntry(pnfsId)).thenReturn(entry);

        FileAttributes attributes = new FileAttributes();
        attributes.setLocations(Collections.emptyList());
        when(entry.getFileAttributes()).thenReturn(attributes);

        when(poolList.isValid()).thenReturn(true);
        when(poolList.getPools()).thenReturn(ImmutableList.of());

        when(filter.test(entry)).thenReturn(true);

        // Override the default definition: this test needs waitForTargets.
        definition = createJobDefinition(true);

        Job job = new Job(context, definition);
        job.start();

        // Capture and run the initialization task.
        ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
        verify(executor).submit(captor.capture());
        captor.getValue().run();

        // The job must have scheduled exactly one sleep …
        verify(executor, times(1)).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));

        // … and no task may have been started.
        verify(executor, times(0)).execute(any(Runnable.class));
    }
}

0 commit comments

Comments
 (0)