Skip to content

Commit

Permalink
[Fix apache/incubator-kie-issues#1857] Adding triggerCount
Browse files Browse the repository at this point in the history
Adds triggerCount to ProcessInstanceNode events
  • Loading branch information
fjtirado committed Mar 6, 2025
1 parent 83476d7 commit 2b77285
Show file tree
Hide file tree
Showing 20 changed files with 553 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public T doRetriggerInstanceInError(String processId, String processInstanceId)
processInstance.error().get().retrigger();

if (processInstance.status() == ProcessInstance.STATE_ERROR) {
throw new ProcessInstanceExecutionException(processInstance.id(), processInstance.error().get().failedNodeId(), processInstance.error().get().errorMessage());
throw ProcessInstanceExecutionException.fromError(processInstance);
} else {
return buildOkResponse(processInstance.variables());
}
Expand All @@ -181,7 +181,7 @@ public T doSkipInstanceInError(String processId, String processInstanceId) {
processInstance.error().get().skip();

if (processInstance.status() == ProcessInstance.STATE_ERROR) {
throw new ProcessInstanceExecutionException(processInstance.id(), processInstance.error().get().failedNodeId(), processInstance.error().get().errorMessage());
throw ProcessInstanceExecutionException.fromError(processInstance);
} else {
return buildOkResponse(processInstance.variables());
}
Expand All @@ -194,7 +194,7 @@ public T doTriggerNodeInstanceId(String processId, String processInstanceId, Str
processInstance.triggerNode(nodeId);

if (processInstance.status() == ProcessInstance.STATE_ERROR) {
throw new ProcessInstanceExecutionException(processInstance.id(), processInstance.error().get().failedNodeId(), processInstance.error().get().errorMessage());
throw ProcessInstanceExecutionException.fromError(processInstance);
} else {
return buildOkResponse(processInstance.variables());
}
Expand All @@ -207,7 +207,7 @@ public T doRetriggerNodeInstanceId(String processId, String processInstanceId, S
processInstance.retriggerNodeInstance(nodeInstanceId);

if (processInstance.status() == ProcessInstance.STATE_ERROR) {
throw new ProcessInstanceExecutionException(processInstance.id(), processInstance.error().get().failedNodeId(), processInstance.error().get().errorMessage());
throw ProcessInstanceExecutionException.fromError(processInstance);
} else {
return buildOkResponse(processInstance.variables());
}
Expand All @@ -220,7 +220,7 @@ public T doCancelNodeInstanceId(String processId, String processInstanceId, Stri
processInstance.cancelNodeInstance(nodeInstanceId);

if (processInstance.status() == ProcessInstance.STATE_ERROR) {
throw new ProcessInstanceExecutionException(processInstance.id(), processInstance.error().get().failedNodeId(), processInstance.error().get().errorMessage());
throw ProcessInstanceExecutionException.fromError(processInstance);
} else {
return buildOkResponse(processInstance.variables());
}
Expand All @@ -233,7 +233,7 @@ public T doCancelProcessInstanceId(String processId, String processInstanceId) {
processInstance.abort();

if (processInstance.status() == ProcessInstance.STATE_ERROR) {
throw new ProcessInstanceExecutionException(processInstance.id(), processInstance.error().get().failedNodeId(), processInstance.error().get().errorMessage());
throw ProcessInstanceExecutionException.fromError(processInstance);
} else {
return buildOkResponse(processInstance.variables());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ void testMapProcessInstanceDuplicatedException() {

@Test
void testMapProcessInstanceExecutionException() {
Object response = tested.mapException(new ProcessInstanceExecutionException("processInstanceId", "nodeId", "message"));
Object response = tested.mapException(new ProcessInstanceExecutionException("processInstanceId", "nodeId", "nodeInstanceId", "message"));
assertThat(response).isEqualTo(internalErrorResponse);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,11 @@ default boolean isCancelled() {
default Map<String, Object> getMetaData() {
return Collections.emptyMap();
}

/**
* Returns the number of times the node has been triggered
*
* @return trigger count
*/
int triggerCount();
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ public interface ProcessError {

String failedNodeId();

String failedNodeInstanceId();

String errorMessage();

default Throwable errorCause() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ public interface ProcessInstance<T> {
default ProcessInstance<T> checkError() {
Optional<ProcessError> error = error();
if (error.isPresent()) {
throw new ProcessInstanceExecutionException(id(), error.get().failedNodeId(), error.get().errorMessage(), error.get().errorCause());
ProcessInstanceExecutionException.fromError(this);
}
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,18 @@ public class ProcessInstanceExecutionException extends RuntimeException {

private final String processInstanceId;
private final String failedNodeId;
private final String failedNodeInstanceId;
private final String errorMessage;

public ProcessInstanceExecutionException(String processInstanceId, String failedNodeId, String errorMessage) {
this(processInstanceId, failedNodeId, errorMessage, null);
public ProcessInstanceExecutionException(String processInstanceId, String failedNodeId, String failedNodeInstanceId, String errorMessage) {
this(processInstanceId, failedNodeId, failedNodeInstanceId, errorMessage, null);
}

public ProcessInstanceExecutionException(String processInstanceId, String failedNodeId, String errorMessage, Throwable rootCause) {
public ProcessInstanceExecutionException(String processInstanceId, String failedNodeId, String failedNodeInstanceId, String errorMessage, Throwable rootCause) {
super("Process instance with id " + processInstanceId + " failed because of " + errorMessage, rootCause);
this.processInstanceId = processInstanceId;
this.failedNodeId = failedNodeId;
this.failedNodeInstanceId = failedNodeInstanceId;
this.errorMessage = errorMessage;
}

Expand Down Expand Up @@ -70,4 +72,12 @@ public String getErrorMessage() {
return errorMessage;
}

public String getFailedNodeInstanceId() {
return failedNodeInstanceId;
}

public static ProcessInstanceExecutionException fromError(ProcessInstance<?> processInstance) {
ProcessError error = processInstance.error().get();
return new ProcessInstanceExecutionException(processInstance.id(), error.failedNodeId(), error.failedNodeInstanceId(), error.errorMessage(), error.errorCause());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ public class ProcessInstanceNodeEventBody implements KogitoMarshallEventSupport,

private Date slaDueDate;

private int triggerCount;

private Map<String, Object> data;

@Override
Expand Down Expand Up @@ -121,6 +123,10 @@ public Date getEventDate() {
return eventDate;
}

public int triggerCount() {
return triggerCount;
}

public String getEventUser() {
return eventUser;
}
Expand Down Expand Up @@ -258,6 +264,11 @@ public Builder nodeName(String nodeName) {
return this;
}

public Builder triggerCount(int triggerCount) {
instance.triggerCount = triggerCount;
return this;
}

public Builder nodeType(String nodeType) {
instance.nodeType = nodeType;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@

import java.util.Date;
import java.util.Map;
import java.util.function.BiConsumer;

import org.kie.api.event.process.ProcessEvent;
import org.kie.api.event.process.ProcessNodeEvent;
import org.kie.api.event.process.ProcessNodeTriggeredEvent;
import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent;
import org.kie.kogito.event.process.ProcessInstanceNodeEventBody;
import org.kie.kogito.event.process.ProcessInstanceStateDataEvent;
Expand Down Expand Up @@ -91,7 +93,16 @@ protected ProcessInstanceStateDataEvent adapt(ProcessEvent event, Integer eventT
return piEvent;
}

protected ProcessInstanceNodeDataEvent toProcessInstanceNodeEvent(ProcessNodeTriggeredEvent event, int eventType) {
return toProcessInstanceNodeEvent(event, eventType, (k, v) -> k.triggerCount(v.triggerCount()));
}

protected ProcessInstanceNodeDataEvent toProcessInstanceNodeEvent(ProcessNodeEvent event, int eventType) {
return toProcessInstanceNodeEvent(event, eventType, (k, v) -> {
});
}

protected <T extends ProcessNodeEvent> ProcessInstanceNodeDataEvent toProcessInstanceNodeEvent(T event, int eventType, BiConsumer<ProcessInstanceNodeEventBody.Builder, T> consumer) {
Map<String, Object> metadata = AdapterHelper.buildProcessMetadata((KogitoWorkflowProcessInstance) event.getProcessInstance());
KogitoWorkflowProcessInstance pi = (KogitoWorkflowProcessInstance) event.getProcessInstance();
KogitoNodeInstance nodeInstance = (KogitoNodeInstance) event.getNodeInstance();
Expand All @@ -107,7 +118,7 @@ protected ProcessInstanceNodeDataEvent toProcessInstanceNodeEvent(ProcessNodeEve
.nodeInstanceId(event.getNodeInstance().getId())
.nodeDefinitionId(event.getNodeInstance().getNode().getUniqueId())
.slaDueDate(nodeInstance.getSlaDueDate());

consumer.accept(builder, event);
if (event.getNodeInstance() instanceof KogitoWorkItemNodeInstance workItemNodeInstance && workItemNodeInstance.getWorkItem() != null) {
KogitoWorkItem workItem = workItemNodeInstance.getWorkItem();
builder.workItemId(workItem.getStringId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,22 @@

import org.kie.api.event.process.ProcessNodeTriggeredEvent;
import org.kie.api.runtime.KieRuntime;
import org.kie.api.runtime.process.NodeInstance;
import org.kie.kogito.internal.process.runtime.KogitoNodeInstance;

public class KogitoProcessNodeTriggeredEventImpl extends AbstractProcessNodeEvent implements ProcessNodeTriggeredEvent {

private static final long serialVersionUID = 510l;

public KogitoProcessNodeTriggeredEventImpl(NodeInstance nodeInstance, KieRuntime kruntime, String identity) {
private int triggerCount;

public KogitoProcessNodeTriggeredEventImpl(KogitoNodeInstance nodeInstance, KieRuntime kruntime, String identity) {
super(nodeInstance, nodeInstance.getProcessInstance(), kruntime, identity);
this.triggerCount = nodeInstance.triggerCount();
}

@Override
public int triggerCount() {
return triggerCount;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public abstract class NodeInstanceImpl implements org.jbpm.workflow.instance.Nod
protected String slaTimerId;
protected Date triggerTime;
protected Date leaveTime;
protected int triggerCount;

protected transient CancelType cancelType;

Expand All @@ -112,6 +113,11 @@ public String getStringId() {
return this.id;
}

@Override
public int triggerCount() {
return triggerCount;
}

public void setNodeId(WorkflowElementIdentifier nodeId) {
this.nodeId = nodeId;
}
Expand All @@ -132,6 +138,10 @@ public String getNodeDefinitionId() {
return getNode().getUniqueId();
}

public boolean isRetrigger() {
return triggerCount > 1;
}

@Override
public int getLevel() {
return this.level;
Expand Down Expand Up @@ -220,6 +230,7 @@ public void cancel(CancelType cancelType) {

@Override
public final void trigger(KogitoNodeInstance from, String type) {
triggerCount++;
boolean hidden = false;
if (getNode().getMetaData().get(HIDDEN) != null) {
hidden = true;
Expand Down Expand Up @@ -257,7 +268,7 @@ public final void trigger(KogitoNodeInstance from, String type) {
return;
} else {
logger.error("Node instance causing process instance error in id {} in a transactional environment (Wrapping)", this.getStringId());
throw new ProcessInstanceExecutionException(this.getProcessInstance().getId(), this.getNodeDefinitionId(), e.getMessage(), e);
throw new ProcessInstanceExecutionException(this.getProcessInstance().getId(), this.getNodeDefinitionId(), this.getId(), e.getMessage(), e);
}
// stop after capturing error
}
Expand Down Expand Up @@ -743,4 +754,9 @@ protected void mapDynamicOutputData(Map<String, Object> results) {
}
}
}

public void internalSetTriggerCount(int triggerCount) {
this.triggerCount = triggerCount;

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1321,13 +1321,18 @@ public void internalSetErrorNodeId(String errorNodeId) {
this.nodeIdInError = errorNodeId;
}

public void internalSetErrorNodeInstanceId(String errorNodeInstanceId) {
this.nodeInstanceIdInError = errorNodeInstanceId;
}

public void internalSetErrorMessage(String errorMessage) {
this.errorMessage = errorMessage;
this.errorCause = Optional.empty();
}

public void internalSetError(ProcessInstanceExecutionException e) {
this.nodeIdInError = e.getFailedNodeId();
this.nodeInstanceIdInError = e.getFailedNodeInstanceId();
Throwable rootException = getRootException(e);
this.errorMessage = rootException instanceof MessageException ? rootException.getMessage() : rootException.getClass().getCanonicalName() + " - " + rootException.getMessage();
this.errorCause = Optional.of(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,6 @@ public void retriggerNodeInstance(String nodeInstanceId) {
.filter(ni -> ni.getStringId().equals(nodeInstanceId))
.findFirst()
.orElseThrow(() -> new NodeInstanceNotFoundException(this.id, nodeInstanceId));

((NodeInstanceImpl) nodeInstance).retrigger(true);
removeOnFinish();
}
Expand Down Expand Up @@ -698,6 +697,7 @@ protected ProcessError buildProcessError() {

final String errorMessage = pi.getErrorMessage();
final String nodeInError = pi.getNodeIdInError();
final String nodeInstanceInError = pi.getNodeInstanceIdInError();
final Throwable errorCause = pi.getErrorCause().orElse(null);
return new ProcessError() {

Expand All @@ -706,6 +706,11 @@ public String failedNodeId() {
return nodeInError;
}

@Override
public String failedNodeInstanceId() {
return nodeInstanceInError;
}

@Override
public String errorMessage() {
return errorMessage;
Expand All @@ -719,10 +724,8 @@ public Throwable errorCause() {
@Override
public void retrigger() {
WorkflowProcessInstanceImpl pInstance = (WorkflowProcessInstanceImpl) processInstance();
NodeInstance ni = pInstance.getByNodeDefinitionId(nodeInError, pInstance.getNodeContainer());
pInstance.setState(STATE_ACTIVE);
pInstance.internalSetErrorNodeId(null);
pInstance.internalSetErrorMessage(null);
NodeInstance ni = pInstance.getNodeInstance(pi.getNodeInstanceIdInError(), true);
clearError(pInstance);
org.kie.api.runtime.process.NodeInstanceContainer nodeInstanceContainer = ni.getNodeInstanceContainer();
if (nodeInstanceContainer instanceof NodeInstance) {
((NodeInstance) nodeInstanceContainer).internalSetTriggerTime(new Date());
Expand All @@ -734,12 +737,17 @@ public void retrigger() {
@Override
public void skip() {
WorkflowProcessInstanceImpl pInstance = (WorkflowProcessInstanceImpl) processInstance();
NodeInstance ni = pInstance.getByNodeDefinitionId(nodeInError, pInstance.getNodeContainer());
NodeInstance ni = pInstance.getNodeInstance(pi.getNodeInstanceIdInError(), true);
clearError(pInstance);
((NodeInstanceImpl) ni).triggerCompleted(Node.CONNECTION_DEFAULT_TYPE, true);
removeOnFinish();
}

private void clearError(WorkflowProcessInstanceImpl pInstance) {
pInstance.setState(STATE_ACTIVE);
pInstance.internalSetErrorNodeId(null);
pInstance.internalSetErrorNodeInstanceId(null);
pInstance.internalSetErrorMessage(null);
((NodeInstanceImpl) ni).triggerCompleted(Node.CONNECTION_DEFAULT_TYPE, true);
removeOnFinish();
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ private RuleFlowProcessInstance buildWorkflow(KogitoProcessInstanceProtobuf.Proc
processInstance.internalSetErrorNodeId(processInstanceProtobuf.getErrorNodeId());
}

if (processInstanceProtobuf.hasErrorNodeInstanceId()) {
processInstance.internalSetErrorNodeInstanceId(processInstanceProtobuf.getErrorNodeInstanceId());
}

if (processInstanceProtobuf.hasErrorMessage()) {
processInstance.internalSetErrorMessage(processInstanceProtobuf.getErrorMessage());
}
Expand Down Expand Up @@ -195,6 +199,8 @@ private void setCommonNodeInstanceData(RuleFlowProcessInstance processInstance,
nodeInstanceImpl.setProcessInstance(processInstance);
}

nodeInstanceImpl.internalSetTriggerCount(nodeInstanceProtobuf.getTriggerCount());

nodeInstanceImpl.setLevel(nodeInstanceProtobuf.getLevel() == 0 ? 1 : nodeInstanceProtobuf.getLevel());
}

Expand Down
Loading

0 comments on commit 2b77285

Please sign in to comment.