DB not updated in time during process execution

kayj
Champ in-the-making
Hi,

I need to get notified (without adding ExecutionListeners to the BPMN model) about events. I'm using ActivitiEventListener to get notified.

mProcessEngine.getRuntimeService().addEventListener( new ActivitiEventListener() {
  @Override
  public void onEvent(ActivitiEvent event) {
  }
  …
});

Unfortunately it doesn't work as expected. It seems that the Activiti DB is not updated until a blocking element is reached (e.g. one waiting for a signal). As a result, I can't read variables that a ScriptTask in the BPMN model has already set but that have not yet been written to the DB. Is this intended behaviour? Is it possible to force an update? Or are there other options for getting notified without adding listeners to the BPMN model?

Thanks in advance,

Kathrin
10 REPLIES

vasile_dirla
Star Contributor
Do you have a JUnit test for this scenario?
If so, please upload it here.

kayj
Champ in-the-making
I created a simple test project to show the issue.

The application initializes the Activiti engine and starts a simple process. The initial variable map contains one variable, clientIp = "0.0.0.0". The process uses a ScriptTask to set clientIp to a new value, "192.168.1.1". Then it reaches an event-based gateway that waits for a signal.
After 10 seconds the signal is sent.

As you can see in the log, "clientIp" is not available at all until the signal has been sent.
The behaviour I'm expecting is that the data can already be accessed (it must obviously be cached somewhere in the engine) even before it is persisted to the DB. Currently it doesn't make sense that the engine throws these events if data such as the variable map can't be accessed.

Java Application:

<code>
package activit.event;

import java.util.HashMap;
import java.util.Map;

import org.activiti.engine.ProcessEngine;
import org.activiti.engine.ProcessEngineConfiguration;
import org.activiti.engine.delegate.event.ActivitiEvent;
import org.activiti.engine.delegate.event.ActivitiEventListener;
import org.activiti.engine.delegate.event.impl.ActivitiActivityCancelledEventImpl;
import org.activiti.engine.delegate.event.impl.ActivitiActivityEventImpl;
import org.activiti.engine.delegate.event.impl.ActivitiEntityEventImpl;
import org.activiti.engine.delegate.event.impl.ActivitiEntityExceptionEventImpl;
import org.activiti.engine.delegate.event.impl.ActivitiEntityWithVariablesEventImpl;
import org.activiti.engine.delegate.event.impl.ActivitiErrorEventImpl;
import org.activiti.engine.delegate.event.impl.ActivitiEventDispatcherImpl;
import org.activiti.engine.delegate.event.impl.ActivitiEventImpl;
import org.activiti.engine.delegate.event.impl.ActivitiMessageEventImpl;
import org.activiti.engine.delegate.event.impl.ActivitiProcessCancelledEventImpl;
import org.activiti.engine.delegate.event.impl.ActivitiSequenceFlowTakenEventImpl;
import org.activiti.engine.delegate.event.impl.ActivitiVariableEventImpl;
import org.activiti.engine.history.HistoricVariableInstance;
import org.activiti.engine.runtime.Execution;
import org.activiti.engine.runtime.ProcessInstance;

public class Application {

private static ProcessEngine mProcessEngine;

private static String PROCESS_PATH = "SimpleProcess.bpmn20.xml";
private static String CLIENT_IP = "clientIp";
private static String PROCESS_ID = "SimpleProcessId";
private static String GATEWAY_ID = "EventGatewayId";
private static String NO_ERROR_SIGNAL ="noerror";

public static void main(String[] args) {
  initializeEngine();
}

private static void initializeEngine(){
  mProcessEngine = ProcessEngineConfiguration
    .createStandaloneInMemProcessEngineConfiguration()
    .buildProcessEngine();

  mProcessEngine.getRepositoryService().createDeployment()
    .addClasspathResource( PROCESS_PATH)
    .deploy();

  addListener();
 
  Map<String, Object> variableMap = new HashMap<String, Object>();
  variableMap.put(CLIENT_IP, "0.0.0.0");      
  ProcessInstance processInstance = mProcessEngine.getRuntimeService().startProcessInstanceByKey(PROCESS_ID, variableMap);

  triggerSignal(processInstance);
}

private static void triggerSignal(final ProcessInstance processInstance) {
  new Thread( new Runnable(){
   public void run() {
    try {
     Thread.sleep(10000);
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
    System.out.println("Signaling Task! --------------------------");

    final Execution execution = mProcessEngine.getRuntimeService().createExecutionQuery()
           .processInstanceId( processInstance.getId() )
           .activityId( GATEWAY_ID)
           .singleResult();
   
    mProcessEngine.getRuntimeService().signalEventReceived(NO_ERROR_SIGNAL, execution.getId());
   }
  }).start();
}

private static void addListener() {
  mProcessEngine.getRuntimeService().addEventListener( new ActivitiEventListener() {

   public void onEvent(ActivitiEvent event) {
    String result = "TEST: " +event.getType().toString() + ": ";
   
    if ( event.getClass() == ActivitiActivityEventImpl.class){
     ActivitiActivityEventImpl bla = (ActivitiActivityEventImpl)event;
     result += (bla.getActivityId() + ": " + bla.getActivityName());
    }else if ( event.getClass() == ActivitiVariableEventImpl.class){
     ActivitiVariableEventImpl blub = (ActivitiVariableEventImpl)event;
     result += (blub.getVariableName() + ": " +blub.getVariableValue());
    }else if(event.getClass() == ActivitiActivityCancelledEventImpl.class){
     ActivitiActivityCancelledEventImpl blub = (ActivitiActivityCancelledEventImpl)event;
     result += (blub.getActivityId());
    }else if(event.getClass() == ActivitiEntityEventImpl.class){
     ActivitiEntityEventImpl blub = (ActivitiEntityEventImpl)event;
     result += (blub.getExecutionId());
    }else if(event.getClass() == ActivitiEntityExceptionEventImpl.class){
     ActivitiEntityExceptionEventImpl blub = (ActivitiEntityExceptionEventImpl)event;
     result += (blub.getExecutionId());
    }else if(event.getClass() == ActivitiEntityWithVariablesEventImpl.class){
     ActivitiEntityWithVariablesEventImpl blub = (ActivitiEntityWithVariablesEventImpl)event;
     result += (blub.getExecutionId());
    }else if(event.getClass() == ActivitiErrorEventImpl.class){
     ActivitiErrorEventImpl blub = (ActivitiErrorEventImpl)event;
     result += (blub.getActivityId());
    }else if(event.getClass() == ActivitiSequenceFlowTakenEventImpl.class){
     ActivitiSequenceFlowTakenEventImpl blub = (ActivitiSequenceFlowTakenEventImpl)event;
     result += (blub.getId());
    }else if(event.getClass() == ActivitiProcessCancelledEventImpl.class){
     ActivitiProcessCancelledEventImpl blub = (ActivitiProcessCancelledEventImpl)event;
     result += (blub.getExecutionId());
    }else if(event.getClass() == ActivitiMessageEventImpl.class){
     ActivitiMessageEventImpl blub = (ActivitiMessageEventImpl)event;
     result += (blub.getActivityId());
    }else if(event.getClass() == ActivitiEventDispatcherImpl.class){
     ActivitiEventDispatcherImpl blub = (ActivitiEventDispatcherImpl)event;
     result += blub.isEnabled();
    }else if(event.getClass() == ActivitiEventImpl.class){
     ActivitiEventImpl blub = (ActivitiEventImpl)event;
     result += (blub.getExecutionId());
    }
   
    result += ", " +printClientIp(event);
    System.out.println(result);
   }

   public boolean isFailOnException() {
    return false;
   }
  });
}


protected static String printClientIp(ActivitiEvent event) {
  HistoricVariableInstance clientIpRaw = mProcessEngine.getHistoryService().createHistoricVariableInstanceQuery()
    .processInstanceId(event.getProcessInstanceId())
    .variableName(CLIENT_IP)
    .singleResult();
 
  if ( clientIpRaw != null){
   return (String) clientIpRaw.getValue();
  }else{
   return "''";
  }
}
}

</code>

BPMN Model:
<code>
<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:activiti="http://activiti.org/bpmn" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:omgdc="http://www.omg.org/spec/DD/20100524/DC" xmlns:omgdi="http://www.omg.org/spec/DD/20100524/DI" typeLanguage="http://www.w3.org/2001/XMLSchema" expressionLanguage="http://www.w3.org/1999/XPath" targetNamespace="http://www.activiti.org/test">
  <signal id="error" name="error"></signal>
  <signal id="noerror" name="noerror"></signal>
  <process id="SimpleProcessId" name="Signal Handling" isExecutable="true">
    <startEvent id="ProcessStartId" name="Process Start"></startEvent>
    <eventBasedGateway id="EventGatewayId" name="Event Gateway"></eventBasedGateway>
    <intermediateCatchEvent id="SignalCatchEventErrorId" name="SignalCatchEvent">
      <signalEventDefinition signalRef="error"></signalEventDefinition>
    </intermediateCatchEvent>
    <intermediateCatchEvent id="SignalCatchEventNoError" name="SignalCatchEvent">
      <signalEventDefinition signalRef="noerror"></signalEventDefinition>
    </intermediateCatchEvent>
    <sequenceFlow id="flow11" sourceRef="EventGatewayId" targetRef="SignalCatchEventNoError"></sequenceFlow>
    <sequenceFlow id="flow12" sourceRef="EventGatewayId" targetRef="SignalCatchEventErrorId"></sequenceFlow>
    <sequenceFlow id="flow13" sourceRef="SignalCatchEventErrorId" targetRef="SignalHandlingEndErrorId"></sequenceFlow>
    <endEvent id="SignalHandlingEndNoErrorId" name="Signal Handling End No Error"></endEvent>
    <endEvent id="SignalHandlingEndErrorId" name="Signal Handling End Error"></endEvent>
    <sequenceFlow id="flow16" sourceRef="ProcessStartId" targetRef="ScriptTaskId"></sequenceFlow>
    <scriptTask id="ScriptTaskId" name="Script Task" scriptFormat="groovy" activiti:autoStoreVariables="false">
      <script>out.println "write client ip: 192.168.1.1 -----------------------------------------";
execution.setVariable("clientIp","192.168.1.1");</script>
    </scriptTask>
    <sequenceFlow id="flow19" sourceRef="ScriptTaskId" targetRef="EventGatewayId"></sequenceFlow>
    <sequenceFlow id="flow20" sourceRef="SignalCatchEventNoError" targetRef="SignalHandlingEndNoErrorId"></sequenceFlow>
  </process>
  <bpmndi:BPMNDiagram id="BPMNDiagram_SimpleProcessId">
    <bpmndi:BPMNPlane bpmnElement="SimpleProcessId" id="BPMNPlane_SimpleProcessId">
      <bpmndi:BPMNShape bpmnElement="ProcessStartId" id="BPMNShape_ProcessStartId">
        <omgdc:Bounds height="35.0" width="35.0" x="70.0" y="270.0"></omgdc:Bounds>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape bpmnElement="EventGatewayId" id="BPMNShape_EventGatewayId">
        <omgdc:Bounds height="40.0" width="40.0" x="320.0" y="267.0"></omgdc:Bounds>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape bpmnElement="SignalCatchEventErrorId" id="BPMNShape_SignalCatchEventErrorId">
        <omgdc:Bounds height="35.0" width="35.0" x="323.0" y="400.0"></omgdc:Bounds>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape bpmnElement="SignalCatchEventNoError" id="BPMNShape_SignalCatchEventNoError">
        <omgdc:Bounds height="35.0" width="35.0" x="500.0" y="270.0"></omgdc:Bounds>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape bpmnElement="SignalHandlingEndNoErrorId" id="BPMNShape_SignalHandlingEndNoErrorId">
        <omgdc:Bounds height="35.0" width="35.0" x="640.0" y="270.0"></omgdc:Bounds>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape bpmnElement="SignalHandlingEndErrorId" id="BPMNShape_SignalHandlingEndErrorId">
        <omgdc:Bounds height="35.0" width="35.0" x="323.0" y="550.0"></omgdc:Bounds>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape bpmnElement="ScriptTaskId" id="BPMNShape_ScriptTaskId">
        <omgdc:Bounds height="55.0" width="105.0" x="150.0" y="260.0"></omgdc:Bounds>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNEdge bpmnElement="flow11" id="BPMNEdge_flow11">
        <omgdi:waypoint x="360.0" y="287.0"></omgdi:waypoint>
        <omgdi:waypoint x="500.0" y="287.0"></omgdi:waypoint>
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge bpmnElement="flow12" id="BPMNEdge_flow12">
        <omgdi:waypoint x="340.0" y="307.0"></omgdi:waypoint>
        <omgdi:waypoint x="340.0" y="400.0"></omgdi:waypoint>
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge bpmnElement="flow13" id="BPMNEdge_flow13">
        <omgdi:waypoint x="340.0" y="435.0"></omgdi:waypoint>
        <omgdi:waypoint x="340.0" y="550.0"></omgdi:waypoint>
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge bpmnElement="flow16" id="BPMNEdge_flow16">
        <omgdi:waypoint x="105.0" y="287.0"></omgdi:waypoint>
        <omgdi:waypoint x="150.0" y="287.0"></omgdi:waypoint>
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge bpmnElement="flow19" id="BPMNEdge_flow19">
        <omgdi:waypoint x="255.0" y="287.0"></omgdi:waypoint>
        <omgdi:waypoint x="320.0" y="287.0"></omgdi:waypoint>
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge bpmnElement="flow20" id="BPMNEdge_flow20">
        <omgdi:waypoint x="535.0" y="287.0"></omgdi:waypoint>
        <omgdi:waypoint x="640.0" y="287.0"></omgdi:waypoint>
      </bpmndi:BPMNEdge>
    </bpmndi:BPMNPlane>
  </bpmndi:BPMNDiagram>
</definitions>
</code>

Maven dependencies:
<code>
<dependencies>
    <dependency>
        <groupId>org.codehaus.groovy</groupId>
        <artifactId>groovy-all</artifactId>
        <version>2.1.3</version>
    </dependency>
    <dependency>
        <groupId>org.activiti</groupId>
        <artifactId>activiti-engine</artifactId>
        <version>5.17.0</version>
    </dependency>
    <dependency>
        <groupId>com.h2database</groupId>
        <artifactId>h2</artifactId>
        <version>1.3.168</version>
    </dependency>
</dependencies>
</code>

jbarrez
Star Contributor
Activiti flushes at the end of the transaction, so yes, newly created variables won't be visible.

There is a way to get them… but it's very hacky. You would need to get the DbSqlSession from the Activiti Context and see what's in the cache (inserted/updated list).

Where are you catching this event? Does the event have an execution? If so, you might be able to get the variables through it.
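
To make the flush-at-end-of-transaction behaviour concrete, here is a minimal toy model in plain Java. It is not Activiti's code: the class `FlushAtCommitSketch` and all of its methods are hypothetical, and the two maps merely stand in for the database and the session cache mentioned above. It only illustrates why a query against the database can miss a variable that a read inside the running transaction already sees.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only, NOT Activiti's implementation. All names are hypothetical.
class FlushAtCommitSketch {

    // Stands in for the database tables that queries read from.
    private final Map<String, String> database = new HashMap<>();

    // Stands in for the per-transaction session cache of pending writes.
    private final Map<String, String> pendingWrites = new HashMap<>();

    // A write during the transaction only lands in the session cache.
    void setVariable(String name, String value) {
        pendingWrites.put(name, value);
    }

    // A query-style read looks at the database only, so it misses pending writes.
    String queryDatabase(String name) {
        return database.get(name);
    }

    // A read through the current session checks pending writes first.
    String readThroughSession(String name) {
        return pendingWrites.containsKey(name)
                ? pendingWrites.get(name)
                : database.get(name);
    }

    // Models the end of the transaction (e.g. a wait state): flush pending writes.
    void commit() {
        database.putAll(pendingWrites);
        pendingWrites.clear();
    }

    public static void main(String[] args) {
        FlushAtCommitSketch tx = new FlushAtCommitSketch();
        tx.setVariable("clientIp", "192.168.1.1");
        System.out.println("query before commit:   " + tx.queryDatabase("clientIp"));
        System.out.println("session before commit: " + tx.readThroughSession("clientIp"));
        tx.commit();
        System.out.println("query after commit:    " + tx.queryDatabase("clientIp"));
    }
}
```

In this model, a history or runtime query issued from inside an event listener corresponds to `queryDatabase`, which is presumably why the test listener printed an empty value until the process reached its wait state.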

kayj
Champ in-the-making
For some of the ActivitiEvents the execution ID can be accessed. But since nothing is written to the DB before signaling in this example, there is no way to get the execution using the "createXXXQuery" methods provided by Activiti. How else can I access these variables? Is there a not-so-hacky solution?

kayj
Champ in-the-making
This did the trick!

String clientIp = (String)mProcessEngine.getRuntimeService().getVariable(event.getExecutionId(),  CLIENT_IP);
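
A slightly more defensive version of that call, as a sketch. It assumes Java 8 or later, and that events without an execution (engine-level events, for example) report a null execution ID, in which case `getVariable` is expected to reject the argument. `ClientIpLookupSketch`, `readClientIp` and the `lookup` parameter are hypothetical names; `lookup` stands in for `RuntimeService.getVariable(executionId, variableName)` so that the snippet runs without the engine.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical helper, not part of Activiti.
class ClientIpLookupSketch {

    static final String CLIENT_IP = "clientIp";

    // 'lookup' stands in for RuntimeService.getVariable(executionId, variableName).
    static String readClientIp(String executionId,
                               BiFunction<String, String, Object> lookup) {
        if (executionId == null) {
            // Assumption: the event is not tied to an execution, so skip the lookup.
            return "''";
        }
        Object value = lookup.apply(executionId, CLIENT_IP);
        // Same empty placeholder as printClientIp in the test project.
        return value != null ? value.toString() : "''";
    }

    public static void main(String[] args) {
        // Fake variable store keyed by "executionId:variableName", for demonstration only.
        Map<String, Object> fakeVariables = new HashMap<>();
        fakeVariables.put("exec-1:" + CLIENT_IP, "192.168.1.1");
        BiFunction<String, String, Object> fakeLookup =
                (id, name) -> fakeVariables.get(id + ":" + name);

        System.out.println(readClientIp("exec-1", fakeLookup));
        System.out.println(readClientIp(null, fakeLookup));
    }
}
```

Inside the listener from the test project, the call would then look something like `readClientIp(event.getExecutionId(), mProcessEngine.getRuntimeService()::getVariable)`.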

abhishek1
Champ in-the-making
I am having a similar issue with the Activiti runtime service. The runtime table in the Activiti DB does not update, so I cannot get the process ID.

kayj
Champ in-the-making
The table doesn't update instantly. Unfortunately it takes some time… This is what we noticed…

jbarrez
Star Contributor
> "The table doesn't update instantly. Unfortunately it takes some time"

Can you expand upon that? The table will be updated from the moment the process instance reaches a wait state. Normally, this is instant, unless you have heavy logic in between.

kayj
Champ in-the-making
This is what I meant. It updates when it reaches a wait state, which is not instant. We often ran into the issue that, for instance, a variable had not yet been written to the DB when it was accessed.