
Parallel gateway fork doesn't execute service tasks concurrently

craig_wu9
Champ in-the-making
I have two service tasks named task1 and task2, both with activiti:async="true" set.
I have also activated the JobExecutor in activiti.cfg.xml, but the two tasks are executed sequentially:
it prints Task1:0 to Task1:49, pauses, continues with Task1:50 to Task1:99, and only after that starts printing Task2:0 to Task2:9.

I debugged the source code of the Activiti Engine and found that, in my case, the following method of DefaultJobExecutor is hit:
public void executeJobs(List<String> jobIds) {
  try {
    // the whole batch of job ids is handed to a single runnable
    threadPoolExecutor.execute(new ExecuteJobsRunnable(this, jobIds));
  } catch (RejectedExecutionException e) {
    rejectedJobsHandler.jobsRejected(this, jobIds);
  }
}
The parameter jobIds contains two entries, and I checked the database: the two job ids correspond exactly to task1 and task2. So, in my opinion, the two tasks are executed within a single runnable. Is that true?
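The batching behaviour can be sketched outside Activiti. This is plain Java with hypothetical names, not Activiti code: when a whole list of job ids is handed to one Runnable, the second job cannot start before the first finishes, no matter how many threads the pool has.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-Java sketch: a batch of job ids inside ONE Runnable is processed
// strictly in order, even though the pool has several threads available.
public class BatchDemo {

    static List<String> runBatch(List<String> jobIds) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(3); // 3 threads available
        List<String> log = new CopyOnWriteArrayList<>();
        // One Runnable for the whole batch, analogous to ExecuteJobsRunnable:
        pool.submit(() -> {
            for (String id : jobIds) {
                log.add(id + "@" + Thread.currentThread().getName());
            }
        }).get(); // wait for the batch to complete
        pool.shutdown();
        return log;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runBatch(Arrays.asList("job1", "job2")));
    }
}
```

Both entries in the log end up tagged with the same thread name, which is exactly the sequential behaviour described above.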

Or have I made some mistake?


my activiti.cfg.xml
<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans   http://www.springframework.org/schema/beans/spring-beans.xsd">
  <bean id="processEngineConfiguration" class="org.activiti.engine.impl.cfg.StandaloneInMemProcessEngineConfiguration">
    <property name="jdbcUrl" value="jdbc:mysql://localhost:3306/act2" />
    <property name="jdbcDriver" value="com.mysql.jdbc.Driver" />
    <property name="jdbcUsername" value="act" />
    <property name="jdbcPassword" value="act" />
   
    <!-- Database configurations -->
    <property name="databaseSchemaUpdate" value="true" />
   
    <!-- job executor configurations -->
    <property name="jobExecutorActivate" value="true" />
   
    <!-- mail server configurations -->
    <property name="mailServerPort" value="5025" />   
  </bean>

</beans>

my bpmn xml file
<?xml version="1.0" encoding="UTF-8"?>
<definitions id="definitions"
  xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
  xmlns:activiti="http://activiti.org/bpmn"
  targetNamespace="Examples">

  <process id="test">

    <startEvent id="start" />
   
    <parallelGateway id="fork" />
   
    <sequenceFlow id="flow1" sourceRef="start" targetRef="fork" />
   
    <serviceTask id="task1" name="Task1" activiti:async="true" activiti:class="psn.wxf.test.paralleltest.Task1" />
    <serviceTask id="task2" name="Task2" activiti:async="true" activiti:class="psn.wxf.test.paralleltest.Task2" />
   
   
    <sequenceFlow id="flow2" sourceRef="fork" targetRef="task1" />
    <sequenceFlow id="flow3" sourceRef="fork" targetRef="task2" />
   
    <parallelGateway id="join" />
   
    <sequenceFlow id="flow4" sourceRef="task1" targetRef="join" />
    <sequenceFlow id="flow5" sourceRef="task2" targetRef="join" />
   
    <serviceTask id="task3" name="Task3" activiti:class="psn.wxf.test.paralleltest.Task3" />
   
    <sequenceFlow id="flow7" sourceRef="join" targetRef="task3" />
   
    <sequenceFlow id="flow8" sourceRef="task3" targetRef="theEnd" />

    <endEvent id="theEnd" />

  </process>

</definitions>

my Task1 java class
import org.activiti.engine.delegate.DelegateExecution;
import org.activiti.engine.delegate.JavaDelegate;

public class Task1 implements JavaDelegate {

   @Override
   public void execute(DelegateExecution execution) throws Exception {
      for (int i = 0; i < 100; i++) {
         if (i == 50) Thread.sleep(1000); // pause halfway to give task2 a chance to interleave
         System.out.println("Task1:" + i);
      }
   }
}
my task2 java class
import org.activiti.engine.delegate.DelegateExecution;
import org.activiti.engine.delegate.JavaDelegate;

public class Task2 implements JavaDelegate {

   @Override
   public void execute(DelegateExecution execution) throws Exception {
      for (int j = 0; j < 10; j++) {
         if (j == 5) {
            j = 5; // no-op, presumably a leftover breakpoint anchor
         }
         System.out.println("Task2:" + j);
      }
   }
}

my main Java class (the BPMN file was deployed beforehand)
import org.activiti.engine.ProcessEngine;
import org.activiti.engine.ProcessEngines;
import org.activiti.engine.RuntimeService;

public class EngineTest {
   public static void main(String[] args) {
      ProcessEngine processEngine = ProcessEngines.getDefaultProcessEngine();
      RuntimeService runtimeService = processEngine.getRuntimeService();
      runtimeService.startProcessInstanceByKey("test");
      // note: closing the engine right away may shut down the job executor
      // before the async jobs have run
      processEngine.close();
   }
}
9 Replies

frederikherema1
Star Contributor
How do you "know" they run sequentially? The first task will probably be put on the queue first and immediately wait for one second (1000 ms). Have you tried a longer sleep time? Does task2 still get executed only after task1?

If so, double-check that you're using an Activiti version that supports async (I think 5.8 or 5.9). If this doesn't do the trick, debug some more: see if the "executeJobs" method is called twice, and check what pool size is used…

craig_wu9
Champ in-the-making
Thanks for your reply.

The version of the activiti I used is 5.9.

I changed task1 to sleep 10 seconds with Thread.sleep(10000); the result is the same.

The method "executeJobs" is called twice.
The first time, the parameter jobIds has two elements (as I mentioned, they are the job ids of task1 and task2 in the table act_ru_job).
The second time, the parameter jobIds is an empty list, and when I pause at a breakpoint there, I can already see the output of task1 and task2.
This means that both tasks were executed during the first call to "executeJobs".

I checked the "threadPoolExecutor"; its size is the default setting: corePoolSize=3, maximumPoolSize=10.

frederikherema1
Star Contributor
I didn't get the first time that it was TWO ids passed in the list. So it makes sense that they are executed sequentially; the question is why they are offered to the job executor in one batch instead of one job at a time.

See http://activiti.org/userguide/index.html#exclusiveJobs: jobs in one process instance are exclusive by default; read that section for an insight into why this is.

ronald_van_kuij
Champ on-the-rise
Didn't Daniel Meyer implement an 'optimistic-locking-exception-job-executer-retry-to-commit-only' mechanism to circumvent the optimistic locking thing written in the 'why exclusive jobs' part?

http://jira.codehaus.org/browse/ACT-436

Yes, I know Activiti is not a multi-threading system, but it limits functionality in that you have to start sending messages to e.g. an external JMS if you want concurrency, or???

frederikherema1
Star Contributor
Sure, you can have them execute in parallel: just set "exclusive" to false on those async tasks in the same process.
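For reference, the attribute goes on the service tasks themselves. A minimal sketch of the relevant BPMN lines (using the ids from the process above; async jobs are exclusive by default, so exclusive="false" must be set explicitly):

```xml
<serviceTask id="task1" name="Task1" activiti:async="true" activiti:exclusive="false"
             activiti:class="psn.wxf.test.paralleltest.Task1" />
<serviceTask id="task2" name="Task2" activiti:async="true" activiti:exclusive="false"
             activiti:class="psn.wxf.test.paralleltest.Task2" />
```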

ronald_van_kuij
Champ on-the-rise
Yes, I know that, but to circumvent troubles due to the OLE, you could configure this 'retry' mechanism, right? (And I do not mean the normal job retry mechanism.)

frederikherema1
Star Contributor
Ronald, I'm not aware of this kind of mechanism being in place, but you may well be right…

craig_wu9
Champ in-the-making
Thanks for your reply.
I got what I wanted, but I also got an exception:
org.activiti.engine.ActivitiOptimisticLockingException: ExecutionEntity[6201] was updated by another transaction concurrently
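One generic way to cope with such a conflict on the client side, absent a built-in mechanism, is to retry the command a few times. This is only a sketch, not an Activiti API; the optimistic-locking conflict is modeled here by a plain RuntimeException:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

// Generic retry sketch: re-run a command that may fail with an
// optimistic-locking style conflict (modeled as a RuntimeException).
public class RetryDemo {

    static <T> T retry(Callable<T> cmd, int maxAttempts) throws Exception {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return cmd.call();
            } catch (RuntimeException e) {
                last = e; // conflict: optionally back off here, then try again
            }
        }
        throw last; // give up after maxAttempts
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger calls = new AtomicInteger();
        // fails twice (simulated concurrent update), succeeds on the 3rd try
        String result = retry(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new RuntimeException("updated by another transaction concurrently");
            }
            return "done";
        }, 5);
        System.out.println(result + " after " + calls.get() + " attempts");
    }
}
```

Whether to swallow and retry or to let the exception propagate depends on whether the command is idempotent, which is something to check per task.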

ronald_van_kuij
Champ on-the-rise
"Ronald, I'm not aware of this kind of mechanism being in place, but you can be right though…"

I'll try to find it… and let you know.