Multi-instance sub process completion concurrency problem

ochrons
Champ in-the-making
I've got a multi-instance sub process defined which contains Java service tasks and timers. The timers are used for polling an external service (through a Java service task). If the results are not ready, the process will go back into the timer to wait.

Now, because of the timers, Activiti will use its internal thread pool to run the different executions in parallel in different threads. This is ok until the processes start getting completed at the same time. At completion, there are sometimes exceptions:

18-May-2012 20:37:35 org.activiti.engine.impl.interceptor.CommandContext close
SEVERE: Error while closing command context
org.activiti.engine.ActivitiOptimisticLockingException: VariableInstanceEntity[17] was updated by another transaction concurrently
   at org.activiti.engine.impl.db.DbSqlSession.flushUpdates(DbSqlSession.java:452)

The variable (17) that is being updated is actually the nrOfActiveInstances counter for the sub process. This behavior seems quite silly to me, since it's nothing I can control directly.
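
A minimal sketch of how this kind of conflict can arise, assuming the engine uses ordinary revision-based optimistic locking (read a row with its revision, then update only if the revision is unchanged). The `VersionedCounter` class and `simulateConcurrentCompletion` method are hypothetical stand-ins for illustration, not Activiti code:

```java
public class OptimisticLockSketch {

    /** Hypothetical stand-in for a versioned row such as the nrOfActiveInstances variable. */
    static class VersionedCounter {
        private int value;
        private int revision = 1;

        VersionedCounter(int value) {
            this.value = value;
        }

        /** Returns {value, revision} as seen by a transaction at read time. */
        synchronized int[] read() {
            return new int[] { value, revision };
        }

        /** Applies the update only if nobody changed the row since it was read. */
        synchronized boolean update(int newValue, int expectedRevision) {
            if (revision != expectedRevision) {
                return false; // this is where an optimistic locking exception would be raised
            }
            value = newValue;
            revision++;
            return true;
        }
    }

    /**
     * Two sub process instances finish at the same time: both read the counter
     * before either writes it back. Returns how many of the two updates succeeded.
     */
    static int simulateConcurrentCompletion(VersionedCounter counter) {
        int[] seenByA = counter.read();
        int[] seenByB = counter.read();
        int succeeded = 0;
        if (counter.update(seenByA[0] - 1, seenByA[1])) {
            succeeded++;
        }
        if (counter.update(seenByB[0] - 1, seenByB[1])) {
            succeeded++;
        }
        return succeeded;
    }

    public static void main(String[] args) {
        VersionedCounter counter = new VersionedCounter(2);
        int succeeded = simulateConcurrentCompletion(counter);
        System.out.println("updates that succeeded: " + succeeded);
        System.out.println("counter value afterwards: " + counter.read()[0]);
    }
}
```

In this sketch only the first writer wins; the second sees a stale revision and is rejected, which would match the exception above if the engine behaves this way.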

I suppose I could work around this issue by introducing a Java Receive Task just before the sub process end event, to synchronize all Activiti controlled threads into my own thread.

Any real solutions to this problem?

- Otto
1 REPLY

ochrons
Champ in-the-making
So, I solved this with a workaround involving an additional Java Receive Task and a separate thread in my code that just signals that state whenever it's encountered.

Task definition in the process XML:
<receiveTask id="syncProcess" name="Sync"></receiveTask>
Code that creates a thread and checks for any process executions in that wait state and signals them:

private RuntimeService runtimeService;
private Set<String> waitingProcesses = Collections.synchronizedSet(new HashSet<String>());
.
.
// create a thread for signaling waiting executions
Thread signalThread = new Thread(new Runnable() {
    @Override
    public void run() {
        while (true) {
            synchronized (waitingProcesses) {
                Iterator<String> iterator = waitingProcesses.iterator();
                while (iterator.hasNext()) {
                    String processId = iterator.next();
                    List<Execution> executions = runtimeService.createExecutionQuery().processInstanceId(processId)
                            .activityId("syncProcess").list();
                    for (Execution exec : executions) {
                        log.debug("Signaling waiting execution {}", exec.getId());
                        runtimeService.signal(exec.getId());
                    }
                }
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // restore the interrupt flag and stop polling
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
});
signalThread.start();
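
A possible variation on the loop above, sketched with a `ScheduledExecutorService` in place of the hand-rolled thread and `Thread.sleep`. The `WaitingExecutions` interface is a hypothetical stand-in for the `RuntimeService` query and signal calls, so the sketch compiles without the engine on the classpath; the class and method names are assumptions, not part of Activiti:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SyncSignaler {

    /** Hypothetical stand-in for the RuntimeService calls used in the loop above. */
    interface WaitingExecutions {
        /** Ids of executions of the given process instance parked in "syncProcess". */
        List<String> findWaiting(String processInstanceId);

        void signal(String executionId);
    }

    private final Set<String> waitingProcesses =
            Collections.synchronizedSet(new HashSet<String>());
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final WaitingExecutions engine;

    SyncSignaler(WaitingExecutions engine) {
        this.engine = engine;
    }

    void register(String processInstanceId) {
        waitingProcesses.add(processInstanceId);
    }

    /** One polling pass; returns the number of executions signaled. */
    int signalOnce() {
        List<String> snapshot;
        synchronized (waitingProcesses) {
            // copy under the lock so the set is not locked while the engine is called
            snapshot = new ArrayList<String>(waitingProcesses);
        }
        int signaled = 0;
        for (String processId : snapshot) {
            for (String executionId : engine.findWaiting(processId)) {
                engine.signal(executionId);
                signaled++;
            }
        }
        return signaled;
    }

    /** Runs signalOnce() once per second on a single thread, so signals never overlap. */
    void start() {
        scheduler.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    signalOnce();
                } catch (RuntimeException e) {
                    // swallow so a single failed pass does not cancel future passes
                }
            }
        }, 0, 1, TimeUnit.SECONDS);
    }

    void stop() {
        scheduler.shutdownNow();
    }
}
```

The single-threaded scheduler keeps the same property as the original thread: all signals come from one thread, one after another.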

Would be great if there was a real solution to this problem instead of this kind of hack :)

- Otto