Async service task retry after success

pineapple
Champ in-the-making
Hi,

I executed the following model (instance diagram picture: https://s13.postimg.org/x3g0j6oiv/Async_task_sync_problem.jpg):
Start Event -> Service Task -> Inclusive Parallel Gateway with Service Tasks -> Service Task -> End Event.

When the workflow starts, all the service tasks are instantiated properly, and task execution completes successfully for all of them.
However, in the logs we see that some successful tasks (inside the inclusive gateway scope) are executed again (3 times), and as you can see in the instance diagram picture there is a problem with service task "C".

I found the following topic, which describes a similar problem: https://forums.activiti.org/content/async-service-tasks-fails-update-status
How do I set org.activiti.engine.impl.interceptor.CommandConfig#contextReusePossible to false?
Is that the correct solution?

Thanks.
9 REPLIES 9

martin_grofcik
Confirmed Champ
Hi,

Could you reproduce the issue in a JUnit test?
https://forums.activiti.org/content/sticky-how-write-unit-test
I would say there is an exception in the async execution path, and that is why the job is retried 3 times.

Regards
Martin

pineapple
Champ in-the-making
I will try to reproduce it in a JUnit test.
If you think there is an exception in the async execution path, why is there no indication of it in the logs?
Meanwhile, can you explain how to use the workaround suggested here: https://forums.activiti.org/comment/23347#comment-23347 ?

martin_grofcik
Confirmed Champ

org.activiti.engine.impl.interceptor.CommandConfig#CommandConfig(boolean)
org.activiti.engine.impl.interceptor.CommandConfig#setContextReusePossible

pineapple
Champ in-the-making
Where should I put those lines?

pineapple
Champ in-the-making
OK, I wrote a JUnit test that simulates the issue, and I got the following exception:

<code>
org.activiti.engine.ActivitiOptimisticLockingException: ProcessInstance[4] was updated by another transaction concurrently

at org.activiti.engine.impl.db.DbSqlSession$CheckedDeleteOperation.execute(DbSqlSession.java:298)
at org.activiti.engine.impl.db.DbSqlSession.flushRegularDeletes(DbSqlSession.java:929)
at org.activiti.engine.impl.db.DbSqlSession.flushDeletes(DbSqlSession.java:895)
at org.activiti.engine.impl.db.DbSqlSession.flush(DbSqlSession.java:620)
at org.activiti.engine.impl.interceptor.CommandContext.flushSessions(CommandContext.java:212)
at org.activiti.engine.impl.interceptor.CommandContext.close(CommandContext.java:138)
at org.activiti.engine.impl.interceptor.CommandContextInterceptor.execute(CommandContextInterceptor.java:66)
at org.activiti.engine.impl.interceptor.LogInterceptor.execute(LogInterceptor.java:37)
at org.activiti.engine.impl.cfg.CommandExecutorImpl.execute(CommandExecutorImpl.java:40)
at org.activiti.engine.impl.cfg.CommandExecutorImpl.execute(CommandExecutorImpl.java:35)
at org.activiti.engine.impl.RepositoryServiceImpl.deleteDeployment(RepositoryServiceImpl.java:91)
at org.activiti.engine.impl.test.TestHelper.annotationDeploymentTearDown(TestHelper.java:116)
at org.activiti.engine.test.ActivitiRule.finished(ActivitiRule.java:265)
at org.activiti.engine.test.ActivitiRule.finishedQuietly(ActivitiRule.java:179)
at org.activiti.engine.test.ActivitiRule.access$400(ActivitiRule.java:86)
at org.activiti.engine.test.ActivitiRule$1.evaluate(ActivitiRule.java:135)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:253)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:84)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)


</code>

The process model "Process2.bpmn20.xml"
<code>
<?xml version="1.0" encoding="UTF-8"?>

<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:activiti="http://activiti.org/bpmn"
             xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:omgdc="http://www.omg.org/spec/DD/20100524/DC"
             xmlns:omgdi="http://www.omg.org/spec/DD/20100524/DI" typeLanguage="http://www.w3.org/2001/XMLSchema"
             expressionLanguage="http://www.w3.org/1999/XPath" targetNamespace="http://www.activiti.org/test">

  <process id="Process2" name="Process2" isExecutable="true">
    <startEvent id="startevent1" name="Start"></startEvent>
    <sequenceFlow id="flow1" sourceRef="startevent1" targetRef="parallelgateway1"></sequenceFlow>
    <serviceTask id="Task2" name="Task 2" activiti:async="true" activiti:exclusive="false" activiti:class="org.activiti.DummyActivity"></serviceTask>
    <sequenceFlow id="flow2" sourceRef="parallelgateway1" targetRef="Task2"></sequenceFlow>
    <serviceTask id="Task1" name="Task 1" activiti:async="true" activiti:exclusive="false" activiti:class="org.activiti.DummyActivity"></serviceTask>
    <sequenceFlow id="flow3" sourceRef="parallelgateway1" targetRef="Task1"></sequenceFlow>
    <sequenceFlow id="flow4" sourceRef="Task1" targetRef="parallelgateway2"></sequenceFlow>
    <sequenceFlow id="flow5" sourceRef="Task2" targetRef="parallelgateway2"></sequenceFlow>
    <inclusiveGateway id="parallelgateway1" name="Parallel Gateway"></inclusiveGateway>
    <inclusiveGateway id="parallelgateway2" name="Parallel Gateway"></inclusiveGateway>
    <serviceTask id="sid-537E92FE-5A63-43DE-AAAC-824790627465" name="Task 5" activiti:async="true" activiti:exclusive="false" activiti:class="org.activiti.DummyActivity"></serviceTask>
    <sequenceFlow id="sid-5735A7E3-58B2-4D9F-8A6E-9B2CF029EDC9" sourceRef="parallelgateway7" targetRef="sid-537E92FE-5A63-43DE-AAAC-824790627465"></sequenceFlow>
    <serviceTask id="sid-C821F6ED-BFEE-486B-82F8-23BDCE34C454" name="Task 4" activiti:async="true" activiti:exclusive="false" activiti:class="org.activiti.DummyActivity"></serviceTask>
    <sequenceFlow id="sid-24B4F490-094B-49E9-B29F-DA1FEF0EBEB6" sourceRef="parallelgateway7" targetRef="sid-C821F6ED-BFEE-486B-82F8-23BDCE34C454"></sequenceFlow>
    <sequenceFlow id="sid-0B593128-D779-4B44-95B2-C22B47B872C5" sourceRef="sid-C821F6ED-BFEE-486B-82F8-23BDCE34C454" targetRef="parallelgateway8"></sequenceFlow>
    <sequenceFlow id="sid-B7923CE8-D24C-4FB5-A741-9197A30C9653" sourceRef="sid-537E92FE-5A63-43DE-AAAC-824790627465" targetRef="parallelgateway8"></sequenceFlow>
    <endEvent id="sid-0F728BCD-7FF8-4014-AC9B-711063FEDF77" name="End"></endEvent>
    <inclusiveGateway id="parallelgateway7" name="Parallel Gateway"></inclusiveGateway>
    <inclusiveGateway id="parallelgateway8" name="Parallel Gateway"></inclusiveGateway>
    <serviceTask id="sid-D43765C1-9B5C-435C-8A83-01CF01F5CF95" name="Task 3" activiti:class="org.activiti.DummyActivity"></serviceTask>
    <sequenceFlow id="flow6" sourceRef="parallelgateway2" targetRef="sid-D43765C1-9B5C-435C-8A83-01CF01F5CF95"></sequenceFlow>
    <sequenceFlow id="sid-7AA33207-8D98-4692-85C6-8D7553532F6B" sourceRef="sid-D43765C1-9B5C-435C-8A83-01CF01F5CF95" targetRef="parallelgateway7"></sequenceFlow>
    <serviceTask id="sid-1355E685-5066-4B75-ABC1-2C2297897363" name="Task 6" activiti:class="org.activiti.DummyActivity"></serviceTask>
    <sequenceFlow id="sid-28B4A5B2-51AE-45CE-AA05-937C33E52EDF" sourceRef="parallelgateway8" targetRef="sid-1355E685-5066-4B75-ABC1-2C2297897363"></sequenceFlow>
    <sequenceFlow id="sid-FD662489-42DE-4D82-8A77-B819C554948E" sourceRef="sid-1355E685-5066-4B75-ABC1-2C2297897363" targetRef="sid-0F728BCD-7FF8-4014-AC9B-711063FEDF77"></sequenceFlow>
  </process>
</definitions>
</code>

Unit test class "MyUnitTest.java"

<java>
package org.activiti;

import org.activiti.engine.impl.jobexecutor.JobExecutor;
import org.activiti.engine.runtime.ProcessInstance;
import org.activiti.engine.test.ActivitiRule;
import org.activiti.engine.test.Deployment;
import org.junit.Rule;
import org.junit.Test;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;

public class MyUnitTest
{

    private static ConcurrentHashMap<String, Integer> taskCounter = new ConcurrentHashMap<String, Integer>();
    @Rule
    public ActivitiRule activitiRule = new ActivitiRule();

    public static synchronized void incrementTaskCounter(String taskName)
    {
        boolean isTaskExecuted = taskCounter.containsKey(taskName);
        String exceptionMessage = "Task '" + taskName + "' had been already executed!";
        assertFalse(exceptionMessage, isTaskExecuted);

        System.out.println("Incrementing task counter for '" + taskName + "'");
        taskCounter.put(taskName, 1);
    }

    @Test
    @Deployment(resources = {"org/activiti/test/Process2.bpmn20.xml"})
    public void test()
    {
        System.out.println("Creating instance of Process2");
        Map<String, Object> params = new HashMap<String, Object>();
        params.put("param1", "value1");
        params.put("param2", "value2");
        params.put("param3", "value3");

        ProcessInstance processInstance = activitiRule.getRuntimeService()
                                                      .startProcessInstanceByKey("Process2", params);

        assertNotNull(processInstance);

        activitiRule.getProcessEngine().getProcessEngineConfiguration().setAsyncExecutorActivate(true);
        activitiRule.getProcessEngine().getProcessEngineConfiguration().setAsyncExecutorEnabled(true);
        JobExecutor jobExecutor = activitiRule.getProcessEngine().getProcessEngineConfiguration().getJobExecutor();
        jobExecutor.start();

        long jobCount = activitiRule.getManagementService().createJobQuery().count();
        System.out.println("CreateJobQuery count: " + jobCount);

    }


}

</java>

Executed Java Service Task "DummyActivity.java"

<java>
package org.activiti;

import org.activiti.engine.delegate.DelegateExecution;
import org.activiti.engine.delegate.JavaDelegate;

import java.util.Map;

public class DummyActivity implements JavaDelegate
{
    public void execute(DelegateExecution execution) throws Exception
    {
        String activityName = execution.getCurrentActivityName();
        MyUnitTest.incrementTaskCounter(activityName);

        System.out.println("—– [ Start: " + activityName + " ] —–");
        Map<String, Object> vars = execution.getVariables();
        for (Map.Entry<String, Object> stringObjectEntry : vars.entrySet())
        {
            System.out.println("[Parameter in " + activityName + " ] " + stringObjectEntry.getKey() + " = " + stringObjectEntry
                    .getValue()
                    .toString());
        }

        System.out.println("—– [ End: " + activityName + " ] —–");
    }
}

</java>

martin_grofcik
Confirmed Champ
Can you provide a link to a GitHub fork?

pineapple
Champ in-the-making
GitHub fork:
https://github.com/g2124458/activiti-unit-test-template

BTW, can you explain exactly where I should put or configure these lines:
<code>
org.activiti.engine.impl.interceptor.CommandConfig#CommandConfig(boolean)
org.activiti.engine.impl.interceptor.CommandConfig#setContextReusePossible
</code>

pineapple
Champ in-the-making
Any help?

jbarrez
Star Contributor
OK, I ran your test. What you are seeing (the optimistic locking exception) is correct and is how Activiti is designed: your service tasks are async, so they run in parallel threads. These threads all try to move the same execution forward (typically at the join), and all but one fail, because only one can win.

To avoid this scenario, make your service tasks exclusive, so that the engine guarantees only one of them runs at a time.
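
For reference, a minimal sketch of what that change might look like for one of the tasks in the posted model. It only flips the `activiti:exclusive` attribute that the posted XML already sets to "false"; the other async service tasks (Task 2, Task 4, Task 5) would presumably need the same change. As far as I know, exclusive is also the default for async jobs, so simply removing the attribute should have the same effect, but treat that as an assumption to verify against your Activiti version.

```xml
<!-- Task 1 from the posted Process2 model, still async but now exclusive:
     the engine should not run it concurrently with other exclusive jobs
     of the same process instance, so the join is no longer contended. -->
<serviceTask id="Task1" name="Task 1"
             activiti:async="true"
             activiti:exclusive="true"
             activiti:class="org.activiti.DummyActivity"></serviceTask>
```

The trade-off is that the tasks inside the gateway no longer execute truly in parallel within one process instance; different process instances can still be processed concurrently.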