Asynchronous ServiceTask with 'no Exclusice' option executed more than once

christopher1
Champ in-the-making
Hi,
I have a problem when using asynchronous service tasks with the "non-exclusive" option.
Some tasks are executed more than once (sometimes twice or three times).
Process definition:
Start Event -> Parallel Gateway -> four parallel Service Tasks (No1..4) -> Parallel Gateway -> End Event
The result of a single process execution is in the attached log file.
Every service task execution is logged: serviceNo1..4.executed
"initial.result" and "final.result" show the process state that was read:
initial - when the process starts
final - after 10 seconds (if the process is still running)

   public ProcessResult readProcessResult(String processId) {
      HistoricProcessInstance hs = historyService.createHistoricProcessInstanceQuery().processInstanceId(processId)
            .includeProcessVariables().singleResult();
      Map<String, Object> variables = hs.getProcessVariables();

      ProcessResult result = new ProcessResult();
      result.setProcessId(hs.getId());
      result.setDurationInMillis(hs.getDurationInMillis());
      result.setResultExec1((String) variables.get("ExecuteS1"));
      result.setResultExec2((String) variables.get("ExecuteS2"));
      result.setResultExec3((String) variables.get("ExecuteS3"));
      result.setResultExec4((String) variables.get("ExecuteS4"));

      return result;
   }


Services code:

@Service("serviceNo1")
public class ServiceNo1Impl implements ServiceNo1 {

   private static final Logger LOG = LoggerFactory.getLogger(ServiceNo1Impl.class);

   @Override
   public void execute(DelegateExecution execution) throws Exception {
      // do something…
      execution.setVariable("ExecuteS1", "OK");
      LOG.debug("serviceNo1.executed");
   }
}


Spring Boot Request code:

   @RequestMapping(value = "/postAppExcl", method = RequestMethod.POST, produces = MediaType.APPLICATION_JSON_VALUE)
   public ProcessResult processUserApplicationExcl(@RequestBody JobApplication jobApplication)
         throws InterruptedException {

      Map<String, Object> vars = Collections.<String, Object> singletonMap("jobApplication", jobApplication);
      String executionId = null;

      // start Activiti process
      identityService.setAuthenticatedUserId(activitiUsername);
      ProcessInstance process = runtimeService.startProcessInstanceByKey("myProcessJobAppExcl", vars);
      executionId = process.getProcessInstanceId();
      LOG.debug("myProcessJobApp: executionId=" + executionId);

      // read process result
      ProcessResult result = serviceResult.readProcessResult(executionId);
      LOG.debug("initial.result=" + result);

      // try up to 10 times, waiting 1 second between reads of the process state
      if (result.getDurationInMillis() == null) {
         for (int i = 1; i <= 10; i++) {
            Thread.sleep(1000);
            result = serviceResult.readProcessResult(executionId);
            // LOG.debug(i + ". result=" + result);
            if (result.getDurationInMillis() != null) {
               LOG.debug("End process with executionId=" + executionId);
               break;
            }
         }
         if (result.getDurationInMillis() == null) {
            LOG.debug("Process executionId=" + executionId + " not completed… yet ");
         }
      }
      LOG.debug("final.result=" + result);

      return result;
   }


I can attach an example source project in which all ServiceTask executions are written to the output log.


1 ACCEPTED ANSWER

warper
Star Contributor
Hi Christopher!
The problem is in the process definition.
Take a look at this process:
<code>
    <process id="testProcess2" name="testProcess2" isExecutable="true">
        <startEvent id="startevent1" name="Start"></startEvent>
        <parallelGateway id="parallelgateway1" name="Split" activiti:async="true" activiti:exclusive="false"></parallelGateway>
        <sequenceFlow id="flow1" sourceRef="startevent1" targetRef="parallelgateway1"></sequenceFlow>
        <scriptTask id="scripttask1" name="Script Task 1" activiti:async="true" activiti:exclusive="false" scriptFormat="groovy" activiti:autoStoreVariables="false">
            <script>System.out.println(Thread.currentThread().getName()+"task1 - phase 1");
                execution.setVariable("exec1","1");
                System.out.println(Thread.currentThread().getName()+"task1 - phase 2");</script>
        </scriptTask>
        <scriptTask id="scripttask2" name="Script Task 2" activiti:async="true" activiti:exclusive="false" scriptFormat="groovy" activiti:autoStoreVariables="false">
            <script>System.out.println(Thread.currentThread().getName()+"task2 - phase 1");
                execution.setVariable("exec2","2");
                System.out.println(Thread.currentThread().getName()+"task2 - phase 2");</script>
        </scriptTask>
        <scriptTask id="scripttask3" name="Script Task 3" activiti:async="true" activiti:exclusive="false" scriptFormat="groovy" activiti:autoStoreVariables="false">
            <script>System.out.println(Thread.currentThread().getName()+"task3 - phase 1");
                execution.setVariable("exec3","3");
                System.out.println(Thread.currentThread().getName()+"task3 - phase 2");</script>
        </scriptTask>
        <scriptTask id="scripttask4" name="Script Task 4" activiti:async="true" activiti:exclusive="false" scriptFormat="groovy" activiti:autoStoreVariables="false">
            <script>System.out.println(Thread.currentThread().getName()+"task4 - phase 1");
                execution.setVariable("exec4","4");
                System.out.println(Thread.currentThread().getName()+"task4 - phase 2");</script>
        </scriptTask>
        <sequenceFlow id="flow2" sourceRef="parallelgateway1" targetRef="scripttask1"></sequenceFlow>
        <sequenceFlow id="flow3" sourceRef="parallelgateway1" targetRef="scripttask2"></sequenceFlow>
        <sequenceFlow id="flow4" sourceRef="parallelgateway1" targetRef="scripttask3"></sequenceFlow>
        <sequenceFlow id="flow5" sourceRef="parallelgateway1" targetRef="scripttask4"></sequenceFlow>
        <parallelGateway id="parallelgateway2" name="Merge" activiti:async="true"></parallelGateway>
        <sequenceFlow id="flow6" sourceRef="scripttask1" targetRef="parallelgateway2"></sequenceFlow>
        <sequenceFlow id="flow7" sourceRef="scripttask2" targetRef="parallelgateway2"></sequenceFlow>
        <sequenceFlow id="flow8" sourceRef="scripttask3" targetRef="parallelgateway2"></sequenceFlow>
        <sequenceFlow id="flow9" sourceRef="scripttask4" targetRef="parallelgateway2"></sequenceFlow>
        <endEvent id="endevent1" name="End"></endEvent>
        <sequenceFlow id="flow10" sourceRef="parallelgateway2" targetRef="endevent1"></sequenceFlow>
    </process>

</code>

It works for me as long as the "Merge" parallel gateway is asynchronous. If it is not marked async, the process stalls.
My money is on overlapping transaction contexts in this case, since each of the 4 working executions tries to process the parallel gateway as if it were its own step, modifying the overall process state.

And process variables… They are shared among all executions. If your workers set the same variables, some executions can't proceed: they get an OptimisticLockingException and retry until they succeed or the number of retries runs out. It's a "first commit wins" scenario. If you have access to the activiti/hibernate logs, you can see which variables/states your transactions try to modify in the database.

If you have to modify a common variable, do it in an exclusive part of the process, that is, in an async service/script/etc. task with the exclusive flag. Do you have any common variables involved?
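
The "first commit wins" behaviour described above can be illustrated with a small simulation. This is only a sketch using the Java standard library, not Activiti code: the class `OptimisticLockDemo`, the `VersionedRow` type and the method `runTwoOverlappingJobs` are hypothetical names, and the revision counter merely stands in for whatever the engine uses to detect concurrent updates. It shows why a retried job logs its "executed" line a second time.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Stdlib-only simulation of "first commit wins" optimistic locking.
 * This is NOT Activiti code; all names here are hypothetical.
 */
class OptimisticLockDemo {

    /** A row with a revision counter, loosely like a process instance row. */
    static final class VersionedRow {
        private int revision = 1;

        int read() {
            return revision;
        }

        /** Commits only if nobody else committed since expectedRevision was read. */
        boolean commit(int expectedRevision) {
            if (revision != expectedRevision) {
                return false; // stale read: another transaction won
            }
            revision++;
            return true;
        }
    }

    /**
     * Two jobs read the same revision, both run their side effect, then both
     * try to commit. The loser is retried, so its side effect runs again.
     * Returns the log of side effects in execution order.
     */
    static List<String> runTwoOverlappingJobs() {
        VersionedRow processInstance = new VersionedRow();
        List<String> log = new ArrayList<>();

        // Both jobs start before either commits, so they see the same revision.
        int seenByJob1 = processInstance.read();
        int seenByJob2 = processInstance.read();

        log.add("serviceNo1.executed");
        log.add("serviceNo2.executed");

        boolean job1Committed = processInstance.commit(seenByJob1); // wins
        boolean job2Committed = processInstance.commit(seenByJob2); // stale, fails

        // Retry the losing job: re-read, re-run the side effect, commit again.
        while (!job2Committed) {
            int fresh = processInstance.read();
            log.add("serviceNo2.executed");
            job2Committed = processInstance.commit(fresh);
        }
        if (!job1Committed) {
            throw new IllegalStateException("job 1 should have committed first");
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runTwoOverlappingJobs());
    }
}
```

In this simulation the second job's log line appears twice even though it commits only once, which matches the symptom in the question. Anything a task does outside the rolled-back transaction, such as writing a log line, is repeated on retry.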

6 REPLIES 6

christopher1
Champ in-the-making
The problem occurs only when the service tasks have the attribute activiti:exclusive="false".
I made a mistake in the header; it should be: "no Exclusive"
Process definition:

<code>
<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:activiti="http://activiti.org/bpmn" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:omgdc="http://www.omg.org/spec/DD/20100524/DC" xmlns:omgdi="http://www.omg.org/spec/DD/20100524/DI" typeLanguage="http://www.w3.org/2001/XMLSchema" expressionLanguage="http://www.w3.org/1999/XPath" targetNamespace="http://www.activiti.org/test">
  <process id="myProcessJobAppExcl" name="My process Job Application Exclusive" isExecutable="true">
    <startEvent id="startevent1" name="Start"></startEvent>
    <sequenceFlow id="flow1" sourceRef="startevent1" targetRef="parallelgateway1"></sequenceFlow>
    <serviceTask id="servicetask1" name="Service Task No1" activiti:async="true" activiti:exclusive="false" activiti:expression="#{serviceNo1.execute(execution)}"></serviceTask>
    <serviceTask id="servicetask2" name="Service Task No2" activiti:async="true" activiti:exclusive="false" activiti:expression="#{serviceNo2.execute(execution)}"></serviceTask>
    <serviceTask id="servicetask3" name="Service Task No3" activiti:async="true" activiti:exclusive="false" activiti:expression="#{serviceNo3.execute(execution)}"></serviceTask>
    <serviceTask id="servicetask4" name="Service Task No4" activiti:async="true" activiti:exclusive="false" activiti:expression="#{serviceNo4.execute(execution)}"></serviceTask>
    <sequenceFlow id="flow2" sourceRef="parallelgateway1" targetRef="servicetask1"></sequenceFlow>
    <sequenceFlow id="flow3" sourceRef="parallelgateway1" targetRef="servicetask2"></sequenceFlow>
    <sequenceFlow id="flow4" sourceRef="parallelgateway1" targetRef="servicetask3"></sequenceFlow>
    <sequenceFlow id="flow5" sourceRef="parallelgateway1" targetRef="servicetask4"></sequenceFlow>
    <parallelGateway id="parallelgateway2" name="Parallel Gateway"></parallelGateway>
    <parallelGateway id="parallelgateway1" name="Parallel Gateway"></parallelGateway>
    <sequenceFlow id="flow6" sourceRef="servicetask1" targetRef="parallelgateway2"></sequenceFlow>
    <sequenceFlow id="flow7" sourceRef="servicetask2" targetRef="parallelgateway2"></sequenceFlow>
    <sequenceFlow id="flow8" sourceRef="servicetask3" targetRef="parallelgateway2"></sequenceFlow>
    <sequenceFlow id="flow9" sourceRef="servicetask4" targetRef="parallelgateway2"></sequenceFlow>
    <endEvent id="endevent1" name="End"></endEvent>
    <sequenceFlow id="flow10" sourceRef="parallelgateway2" targetRef="endevent1"></sequenceFlow>
  </process>
</definitions>
</code>

warper
Star Contributor
Hi Christopher!
It looks like your process hit several OptimisticLockingExceptions.
Turn on logging for the Activiti engine and Hibernate.

christopher1
Champ in-the-making
Thanks for the answer.
I set log level="DEBUG" for name="activit.org" and found a few messages:
Optimistic locking exception : org.activiti.engine.ActivitiOptimisticLockingException: ProcessInstance[8] was updated by another transaction concurrently anymore

In the Activiti user guide I found:
18.1.4. Async executor configuration

I don't know how to set the asyncExecutor bean in a Spring Boot application…
To configure the database I use the application.yml file, for example:
<code>
spring:
  datasource:
    driverClassName: org.postgresql.Driver
    url: jdbc:postgresql://localhost:5432/activiti?currentSchema=actsb
    username: …
    password: …
</code>

I configure the service beans directly in the SpringBootApplication class:
<code>
@SpringBootApplication
public class ActivitiApp {

public static void main(String[] args) {
  SpringApplication.run(ActivitiApp.class, args);
}

@Bean
public RestOperations restOperations(RestTemplateBuilder builder) {
  return builder.build();
}

@Bean
public ServiceNo1 serviceNo1() {
  return new ServiceNo1Impl();
}

@Bean
public ServiceNo2 serviceNo2() {
  return new ServiceNo2Impl();
}

@Bean
public ServiceNo3 serviceNo3() {
  return new ServiceNo3Impl();
}

@Bean
public ServiceNo4 serviceNo4() {
  return new ServiceNo4Impl();
}

@Bean
public ServiceResult serviceResult() {
  return new ServiceResultImpl();
}
}
</code>
Now I will try to configure "asyncExecutor"…
I will ask for help again if I can't find a solution to my problem.

christopher1
Champ in-the-making
I added the Async executor configuration (following the instructions at http://www.activiti.org/userguide/#_advantages_of_the_async_executor):

<code>
@Bean
public SpringProcessEngineConfiguration processEngineConfiguration(DataSource dataSource,
   PlatformTransactionManager transactionManager, EntityManagerFactory entityManagerFactory,
   DefaultAsyncJobExecutor asyncExecutor) {
  SpringProcessEngineConfiguration engineConfiguration = new SpringProcessEngineConfiguration();
  engineConfiguration.setDataSource(dataSource);
  engineConfiguration.setTransactionManager(transactionManager);
  engineConfiguration.setJpaEntityManagerFactory(entityManagerFactory);
  engineConfiguration.setAsyncExecutor(asyncExecutor);
  engineConfiguration.setAsyncExecutorActivate(true);
  engineConfiguration.setAsyncExecutorEnabled(true);
  engineConfiguration.setHistory(HistoryLevel.FULL.getKey());
  engineConfiguration.setDeploymentResources(deploymentResources());
  engineConfiguration.setDeploymentMode("single-resource");
  engineConfiguration.setDatabaseSchemaUpdate("true");
  return engineConfiguration;
}

@Bean
public DefaultAsyncJobExecutor asyncExecutor() {
  DefaultAsyncJobExecutor asyncExecutor = new DefaultAsyncJobExecutor();
  asyncExecutor.setCorePoolSize(10);
  asyncExecutor.setMaxPoolSize(50);
  asyncExecutor.setKeepAliveTime(3000);
  asyncExecutor.setQueueSize(200);
  asyncExecutor.setMaxTimerJobsPerAcquisition(2);
  asyncExecutor.setMaxAsyncJobsDuePerAcquisition(2);
  asyncExecutor.setDefaultAsyncJobAcquireWaitTimeInMillis(1000);
  asyncExecutor.setDefaultTimerJobAcquireWaitTimeInMillis(1000);
  asyncExecutor.setTimerLockTimeInMillis(60000);
  asyncExecutor.setAsyncJobLockTimeInMillis(60000);
  return asyncExecutor;
}

private Resource[] deploymentResources() {
  Resource[] processResource = new Resource[1];
  processResource[0] = new ClassPathResource("processes/MyProcessApplicationExclusive.bpmn");
  return processResource;
}
</code>

but my ServiceTasks are still executed more than once.
LOG:
Optimistic locking exception : org.activiti.engine.ActivitiOptimisticLockingException: ProcessInstance[5] was updated by another transaction concurrently

Maybe some additional parameters need to be set in the configuration above?
I have no idea what is wrong.

christopher1
Champ in-the-making
Warper, you are great!
I changed the "Merge" parallel gateways to async in my process definitions that use non-exclusive asynchronous tasks, and all my problems are gone.
My service tasks use separate variables, which are grouped together as a result after the "Merge" parallel gateway.
Thanks a lot for your help!
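
For reference, a minimal sketch of that change applied to the myProcessJobAppExcl definition posted earlier in the thread, assuming nothing else in the file changes. Only the joining gateway gains activiti:async="true", mirroring the "Merge" gateway in warper's example. The activiti:exclusive attribute is left unset on the gateway, as in that example.

```xml
<!-- Joining gateway marked async. Assumption: this puts the join in its own job,
     so a retry at the join should not re-run the preceding service task. -->
<parallelGateway id="parallelgateway2" name="Parallel Gateway" activiti:async="true"></parallelGateway>
```

The four service tasks keep activiti:async="true" activiti:exclusive="false" exactly as posted.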