Skip to content

Commit 9d608be

Browse files
committed
[#4] Allow job removal. Also implement clustering support for memory storage
1 parent ab529fe commit 9d608be

19 files changed

Lines changed: 927 additions & 81 deletions

File tree

core/api/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
<dependency>
1515
<groupId>${project.groupId}</groupId>
1616
<artifactId>blaze-actor-core-api</artifactId>
17-
<version>${version.blaze-notify-actor}</version>
17+
<version>${version.blaze-actor}</version>
1818
</dependency>
1919
<!-- Test dependencies -->
2020
<dependency>

core/api/src/main/java/com/blazebit/job/JobContext.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
import java.util.ArrayList;
3232
import java.util.Collection;
33+
import java.util.Collections;
3334
import java.util.HashMap;
3435
import java.util.Iterator;
3536
import java.util.List;
@@ -89,6 +90,21 @@ public interface JobContext extends ServiceProvider, ConfigurationSource {
8990
*/
9091
<T extends JobInstance<?>> JobInstanceProcessor<?, T> getJobInstanceProcessor(T job);
9192

93+
/**
94+
* Returns all partition keys.
95+
*
96+
* @return The list of all partition keys
97+
*/
98+
Collection<PartitionKey> getPartitionKeys();
99+
100+
/**
101+
* Returns the matching partition keys for the given job instance.
102+
*
103+
* @param jobInstance The job instance
104+
* @return The list of matching partition keys
105+
*/
106+
Collection<PartitionKey> getPartitionKeys(JobInstance<?> jobInstance);
107+
92108
/**
93109
* Refreshes the job instance schedules for the given job instance.
94110
*
@@ -731,6 +747,7 @@ protected DefaultJobContext(TransactionSupport transactionSupport, JobManagerFac
731747
Collection<PartitionKey> defaultTriggerPartitionKeys = this.partitionKeyProvider.getDefaultTriggerPartitionKeys();
732748
if (partitionKeyEntries.isEmpty()) {
733749
Collection<PartitionKey> instancePartitionKeys = this.partitionKeyProvider.getDefaultJobInstancePartitionKeys();
750+
734751
this.jobSchedulers = new HashMap<>(defaultTriggerPartitionKeys.size() + instancePartitionKeys.size());
735752
for (PartitionKey instancePartitionKey : instancePartitionKeys) {
736753
JobScheduler jobInstanceScheduler = jobSchedulerFactory.createJobScheduler(this, actorContext, DEFAULT_JOB_INSTANCE_ACTOR_NAME + "/" + instancePartitionKey, DEFAULT_JOB_INSTANCE_PROCESS_COUNT, instancePartitionKey);
@@ -818,7 +835,13 @@ public void refreshJobInstanceSchedules(JobInstance<?> jobInstance) {
818835
}
819836
}
820837

821-
private List<PartitionKey> getPartitionKeys(JobInstance<?> jobInstance) {
838+
@Override
839+
public Collection<PartitionKey> getPartitionKeys() {
840+
return Collections.unmodifiableSet(jobSchedulers.keySet());
841+
}
842+
843+
@Override
844+
public List<PartitionKey> getPartitionKeys(JobInstance<?> jobInstance) {
822845
return jobInstanceClassToPartitionKeysMapping.computeIfAbsent(jobInstance.getClass(), (k) -> {
823846
List<PartitionKey> v = new ArrayList<>(jobSchedulers.keySet().size());
824847
for (PartitionKey partitionKey : jobSchedulers.keySet()) {

core/api/src/main/java/com/blazebit/job/JobInstanceState.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ public enum JobInstanceState {
4343
/**
4444
* A job instance that reached its maximum defer count and doesn't need further processing.
4545
*/
46-
DROPPED;
46+
DROPPED,
47+
/**
48+
* A job instance with that state is going to be remove during an update.
49+
*/
50+
REMOVED;
4751

4852
}

core/api/src/main/java/com/blazebit/job/JobManager.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.time.Instant;
2020
import java.util.List;
21+
import java.util.Set;
2122

2223
/**
2324
* A manager for adding, updating and querying job instances.
@@ -41,6 +42,23 @@ public interface JobManager {
4142
*/
4243
void updateJobInstance(JobInstance<?> jobInstance);
4344

45+
/**
46+
* Removes the given job instance.
47+
*
48+
* @param jobInstance The job instance to remove
49+
*/
50+
void removeJobInstance(JobInstance<?> jobInstance);
51+
52+
/**
53+
* Removes job instances that are in one of the given states and where the last execution time is lower than the given one.
54+
*
55+
* @param states The states for which job instances to remove
56+
* @param executionTimeOlderThan Job instance with a last execution lower than this instant are removed. May be <code>null</code>
57+
* @param partitionKey The partition key
58+
* @return the number of removed job instances
59+
*/
60+
int removeJobInstances(Set<JobInstanceState> states, Instant executionTimeOlderThan, PartitionKey partitionKey);
61+
4462
/**
4563
* Returns a schedule time ordered list of job instances that need to be processed for the given partition.
4664
*

core/impl/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,12 @@
1919
<dependency>
2020
<groupId>${project.groupId}</groupId>
2121
<artifactId>blaze-actor-core-api</artifactId>
22-
<version>${version.blaze-notify-actor}</version>
22+
<version>${version.blaze-actor}</version>
2323
</dependency>
2424
<dependency>
2525
<groupId>${project.groupId}</groupId>
2626
<artifactId>blaze-actor-core-impl</artifactId>
27-
<version>${version.blaze-notify-actor}</version>
27+
<version>${version.blaze-actor}</version>
2828
<scope>runtime</scope>
2929
</dependency>
3030
<!-- Test dependencies -->

jpa/model/src/main/java/com/blazebit/job/jpa/model/JpaPartitionKey.java

Lines changed: 52 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.blazebit.job.jpa.model;
1818

1919
import com.blazebit.job.JobInstance;
20+
import com.blazebit.job.JobInstanceState;
2021
import com.blazebit.job.PartitionKey;
2122

2223
import java.util.ArrayList;
@@ -55,6 +56,13 @@ public interface JpaPartitionKey extends PartitionKey {
5556
*/
5657
String getScheduleAttributeName();
5758

59+
/**
60+
* Returns the attribute name of the last execution attribute of the entity type as given in {@link #getJobInstanceType()}.
61+
*
62+
* @return the attribute name of the last execution attribute
63+
*/
64+
String getLastExecutionAttributeName();
65+
5866
/**
5967
* Returns the attribute name of the partition key attribute of the entity type as given in {@link #getJobInstanceType()}.
6068
*
@@ -70,13 +78,21 @@ public interface JpaPartitionKey extends PartitionKey {
7078
*/
7179
String getStatePredicate(String jobAlias);
7280

81+
/**
82+
* Returns the expression for the state of a job.
83+
*
84+
* @param jobAlias The FROM clause alias for the job
85+
* @return The state JPQL expression or an empty string
86+
*/
87+
String getStateExpression(String jobAlias);
88+
7389
/**
7490
* Returns the state value for ready jobs that must be bound in a query to the parameter name "readyState".
7591
* A <code>null</code> value means that no parameter should be bound.
7692
*
7793
* @return The ready state value of <code>null</code>
7894
*/
79-
Object getReadyStateValue();
95+
Function<JobInstanceState, Object> getStateValueMappingFunction();
8096

8197
/**
8298
* Returns the join fetches that should be applied to a query when fetching a job for this partition.
@@ -138,6 +154,14 @@ interface JpaPartitionKeyBuilder {
138154
*/
139155
JpaPartitionKeyBuilder withScheduleAttributeName(String scheduleAttributeName);
140156

157+
/**
158+
* Sets the given job last execution attribute name.
159+
*
160+
* @param lastExecutionAttributeName The job last execution attribute name
161+
* @return this for chaining
162+
*/
163+
JpaPartitionKeyBuilder withLastExecutionAttributeName(String lastExecutionAttributeName);
164+
141165
/**
142166
* Sets the given job partition key attribute name.
143167
*
@@ -155,12 +179,12 @@ interface JpaPartitionKeyBuilder {
155179
JpaPartitionKeyBuilder withStateAttributeName(String stateAttributeName);
156180

157181
/**
158-
* Sets the given job ready state value.
182+
* Sets the given state value mapping function.
159183
*
160-
* @param readyStateValue The job ready state value
184+
* @param stateValueMappingFunction The state value mapping function
161185
* @return this for chaining
162186
*/
163-
JpaPartitionKeyBuilder withReadyStateValue(Object readyStateValue);
187+
JpaPartitionKeyBuilder withStateValueMappingFunction(Function<JobInstanceState, Object> stateValueMappingFunction);
164188

165189
/**
166190
* Sets the given job attributes to fetch.
@@ -190,9 +214,10 @@ static JpaPartitionKeyBuilder builder() {
190214
Function<String, String> partitionPredicateProvider0;
191215
String idAttributeName0;
192216
String scheduleAttributeName0;
217+
String lastExecutionAttributeName0;
193218
String partitionKeyAttributeName0;
194219
String stateAttributeName0;
195-
Object readyStateValue0;
220+
Function<JobInstanceState, Object> stateValueMappingFunction0;
196221
List<String> fetches0 = new ArrayList<>();
197222

198223
@Override
@@ -225,6 +250,12 @@ public JpaPartitionKeyBuilder withScheduleAttributeName(String scheduleAttribute
225250
return this;
226251
}
227252

253+
@Override
254+
public JpaPartitionKeyBuilder withLastExecutionAttributeName(String lastExecutionAttributeName) {
255+
this.lastExecutionAttributeName0 = lastExecutionAttributeName;
256+
return this;
257+
}
258+
228259
@Override
229260
public JpaPartitionKeyBuilder withPartitionKeyAttributeName(String partitionKeyAttributeName) {
230261
this.partitionKeyAttributeName0 = partitionKeyAttributeName;
@@ -238,8 +269,8 @@ public JpaPartitionKeyBuilder withStateAttributeName(String stateAttributeName)
238269
}
239270

240271
@Override
241-
public JpaPartitionKeyBuilder withReadyStateValue(Object readyStateValue) {
242-
this.readyStateValue0 = readyStateValue;
272+
public JpaPartitionKeyBuilder withStateValueMappingFunction(Function<JobInstanceState, Object> stateValueMappingFunction) {
273+
this.stateValueMappingFunction0 = stateValueMappingFunction;
243274
return this;
244275
}
245276

@@ -257,9 +288,10 @@ public JpaPartitionKey build() {
257288
private final Function<String, String> partitionPredicateProvider = partitionPredicateProvider0;
258289
private final String idAttributeName = idAttributeName0;
259290
private final String scheduleAttributeName = scheduleAttributeName0;
291+
private final String lastExecutionAttributeName = lastExecutionAttributeName0;
260292
private final String partitionKeyAttributeName = partitionKeyAttributeName0;
261293
private final String stateAttributeName = stateAttributeName0;
262-
private final Object readyStateValue = readyStateValue0;
294+
private final Function<JobInstanceState, Object> stateValueMappingFunction = stateValueMappingFunction0;
263295
private final String[] fetches = fetches0.toArray(new String[fetches0.size()]);
264296

265297
@Override
@@ -282,6 +314,11 @@ public String getScheduleAttributeName() {
282314
return scheduleAttributeName;
283315
}
284316

317+
@Override
318+
public String getLastExecutionAttributeName() {
319+
return lastExecutionAttributeName;
320+
}
321+
285322
@Override
286323
public String getPartitionKeyAttributeName() {
287324
return partitionKeyAttributeName;
@@ -293,8 +330,13 @@ public String getStatePredicate(String jobAlias) {
293330
}
294331

295332
@Override
296-
public Object getReadyStateValue() {
297-
return readyStateValue;
333+
public String getStateExpression(String jobAlias) {
334+
return jobAlias + "." + stateAttributeName;
335+
}
336+
337+
@Override
338+
public Function<JobInstanceState, Object> getStateValueMappingFunction() {
339+
return stateValueMappingFunction;
298340
}
299341

300342
@Override

jpa/storage/src/main/java/com/blazebit/job/jpa/storage/JpaJobManager.java

Lines changed: 54 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import com.blazebit.job.spi.TransactionSupport;
3434

3535
import javax.persistence.EntityManager;
36+
import javax.persistence.Query;
3637
import javax.persistence.TypedQuery;
3738
import javax.persistence.metamodel.EntityType;
3839
import java.io.Serializable;
@@ -42,6 +43,7 @@
4243
import java.util.List;
4344
import java.util.Map;
4445
import java.util.Set;
46+
import java.util.function.Function;
4547

4648
/**
4749
* A JPA based implementation of the {@link JobManager} interface.
@@ -257,21 +259,21 @@ public List<JobInstance<?>> getJobInstancesToProcess(int partition, int partitio
257259
String partitionKeyAttributeName = jpaPartitionKey.getPartitionKeyAttributeName();
258260
String scheduleAttributeName = jpaPartitionKey.getScheduleAttributeName();
259261
String statePredicate = jpaPartitionKey.getStatePredicate("e");
260-
Object readyStateValue = jpaPartitionKey.getReadyStateValue();
262+
Function<JobInstanceState, Object> stateValueMappingFunction = jpaPartitionKey.getStateValueMappingFunction();
261263
String joinFetches = jpaPartitionKey.getJoinFetches("e");
262264
TypedQuery<? extends JobInstance<?>> typedQuery = entityManager.createQuery(
263265
"SELECT e FROM " + jobInstanceType.getName() + " e " +
264266
joinFetches + " " +
265-
"WHERE " + statePredicate + " " +
267+
"WHERE e." + scheduleAttributeName + " <= :now " +
266268
(partitionPredicate.isEmpty() ? "" : "AND " + partitionPredicate + " ") +
267269
(partitionCount > 1 ? "AND MOD(e." + partitionKeyAttributeName + ", " + partitionCount + ") = " + partition + " " : "") +
268-
"AND e." + scheduleAttributeName + " <= :now " +
270+
(statePredicate == null || statePredicate.isEmpty() ? "" : "AND " + statePredicate + " ") +
269271
"ORDER BY e." + scheduleAttributeName + " ASC, e." + idAttributeName + " ASC",
270272
jobInstanceType
271273
);
272274
typedQuery.setParameter("now", clock.instant());
273-
if (readyStateValue != null) {
274-
typedQuery.setParameter("readyState", readyStateValue);
275+
if (stateValueMappingFunction != null) {
276+
typedQuery.setParameter("readyState", stateValueMappingFunction);
275277
}
276278
List<JobInstance<?>> jobInstances = (List<JobInstance<?>>) (List) typedQuery
277279
// TODO: lockMode for update? advisory locks?
@@ -298,17 +300,18 @@ public Instant getNextSchedule(int partition, int partitionCount, PartitionKey p
298300
String partitionKeyAttributeName = jpaPartitionKey.getPartitionKeyAttributeName();
299301
String scheduleAttributeName = jpaPartitionKey.getScheduleAttributeName();
300302
String statePredicate = jpaPartitionKey.getStatePredicate("e");
301-
Object readyStateValue = jpaPartitionKey.getReadyStateValue();
303+
Function<JobInstanceState, Object> stateValueMappingFunction = jpaPartitionKey.getStateValueMappingFunction();
302304
TypedQuery<Instant> typedQuery = entityManager.createQuery(
303305
"SELECT e." + scheduleAttributeName + " FROM " + jobInstanceType.getName() + " e " +
304-
"WHERE " + statePredicate + " " +
306+
"WHERE 1=1 " +
307+
(statePredicate == null || statePredicate.isEmpty() ? "" : "AND " + statePredicate + " ") +
305308
(partitionPredicate.isEmpty() ? "" : "AND " + partitionPredicate + " ") +
306309
(partitionCount > 1 ? "AND MOD(e." + partitionKeyAttributeName + ", " + partitionCount + ") = " + partition + " " : "") +
307310
"ORDER BY e." + scheduleAttributeName + " ASC, e." + idAttributeName + " ASC",
308311
Instant.class
309312
);
310-
if (readyStateValue != null) {
311-
typedQuery.setParameter("readyState", readyStateValue);
313+
if (stateValueMappingFunction != null) {
314+
typedQuery.setParameter("readyState", stateValueMappingFunction);
312315
}
313316

314317
List<Instant> nextSchedule = typedQuery.setMaxResults(1).getResultList();
@@ -325,8 +328,50 @@ public void updateJobInstance(JobInstance<?> jobInstance) {
325328
}
326329
if (!entityManager.contains(jobInstance)) {
327330
entityManager.merge(jobInstance);
331+
if (jobInstance.getState() == JobInstanceState.REMOVED) {
332+
entityManager.flush();
333+
removeJobInstance(jobInstance);
334+
}
335+
} else if (jobInstance.getState() == JobInstanceState.REMOVED) {
336+
removeJobInstance(jobInstance);
328337
}
329338

330339
entityManager.flush();
331340
}
341+
342+
@Override
343+
public void removeJobInstance(JobInstance<?> jobInstance) {
344+
entityManager.remove(jobInstance);
345+
}
346+
347+
@Override
348+
public int removeJobInstances(Set<JobInstanceState> states, Instant executionTimeOlderThan, PartitionKey partitionKey) {
349+
JpaPartitionKey jpaPartitionKey = (JpaPartitionKey) partitionKey;
350+
String stateExpression = jpaPartitionKey.getStateExpression("i");
351+
if (stateExpression != null && !stateExpression.isEmpty() && !states.isEmpty()) {
352+
StringBuilder sb = new StringBuilder();
353+
sb.append("DELETE FROM ").append(partitionKey.getJobInstanceType().getName()).append(" i ")
354+
.append("WHERE ").append(stateExpression).append(" IN (");
355+
int i = 0;
356+
int size = states.size();
357+
for (; i != size; i++) {
358+
sb.append("param").append(i).append(',');
359+
}
360+
sb.setCharAt(sb.length() - 1, ')');
361+
if (executionTimeOlderThan != null) {
362+
sb.append(" AND i.").append(jpaPartitionKey.getLastExecutionAttributeName()).append(" < :lastExecution");
363+
}
364+
Query query = entityManager.createQuery(sb.toString());
365+
i = 0;
366+
for (JobInstanceState state : states) {
367+
query.setParameter("param" + i, jpaPartitionKey.getStateValueMappingFunction().apply(state));
368+
i++;
369+
}
370+
if (executionTimeOlderThan != null) {
371+
query.setParameter("lastExecution", executionTimeOlderThan);
372+
}
373+
return query.executeUpdate();
374+
}
375+
return 0;
376+
}
332377
}

0 commit comments

Comments
 (0)