
Running parallel tasks with Activiti

hellowrakesh123
Champ in-the-making
Hi,
I am new to Activiti and am trying to get parallel gateways to work, but I couldn't get them working the way I expected. Am I doing something wrong here?

I have 2 tasks, Build-1 and Build-2, and I want to fork off both tasks at the same time as soon as the workflow starts.

When I run the workflow, it first starts the first task (Build-1) and, only after that completes, starts the other (Build-2). I am not sure what is wrong here. I tried using activiti:async but it didn't work for me.

<process id="ccaProcess" name="ccaProcess">
      <startEvent id="startevent1" name="Start"></startEvent>
       <serviceTask id="servicetask1" name="Build-1" activiti:class="com.mycomp.step.cca.bpm.CcaDelegate"></serviceTask>
       <serviceTask id="servicetask2" name="Build-2" activiti:class="com.mycomp.step.cca.bpm.CcaDelegate"></serviceTask>
       <parallelGateway id="parallelgateway2" name="Parallel Gateway"></parallelGateway>
       <endEvent id="endevent1" name="End"></endEvent>
       <sequenceFlow id="flow4" name="" sourceRef="servicetask1" targetRef="parallelgateway2"></sequenceFlow>
       <sequenceFlow id="flow5" name="" sourceRef="servicetask2" targetRef="parallelgateway2"></sequenceFlow>
       <sequenceFlow id="flow6" name="" sourceRef="parallelgateway2" targetRef="endevent1"></sequenceFlow>
       <parallelGateway id="parallelgateway3" name="Parallel Gateway"></parallelGateway>
       <sequenceFlow id="flow8" name="" sourceRef="parallelgateway3" targetRef="servicetask1"></sequenceFlow>
       <sequenceFlow id="flow9" name="" sourceRef="parallelgateway3" targetRef="servicetask2"></sequenceFlow>
       <sequenceFlow id="flow10" name="" sourceRef="startevent1" targetRef="parallelgateway3"></sequenceFlow>
   </process>

Here is the Java code I used to start the process:
runtimeService.startProcessInstanceByKey("ccaProcess");

– Assume runtimeService is a spring managed bean.
– The Delegates implement JavaDelegate.
12 REPLIES 12

frederikherema1
Star Contributor
It seems like you want to have the 2 service-tasks executed in parallel?

The parallel gateway in BPMN indicates that the path of execution splits into several paths. This doesn't necessarily mean they are executed by different threads. When you call startProcessInstance(), the calling thread executes the workflow until a wait state is reached. On reaching a parallel gateway, two "concurrent execution paths" are created, and each of them is executed until it reaches a wait state, an end event or a parallel join. This is done by THE SAME THREAD, so the paths are executed sequentially internally. However, by the time startProcessInstance() returns, all parallel executions have been executed and are either waiting or finished. So they are executed in parallel within the same API call, but not in parallel in the multi-threaded sense.

If you want to execute the 2 service-tasks in parallel, they should both have activiti:async="true".
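
To make the distinction concrete, here is a plain-Java analogy (not Activiti code; the class and method names are made up for illustration). The first method mimics the default behaviour, where the calling thread walks both paths one after the other. The second hands each path to a thread pool, which is roughly the role the job executor plays for async service-tasks.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain-Java analogy only; none of these names are Activiti API.
class ForkSketch {

    // Stand-in for a service task: records which thread ran it.
    static Runnable task(String name, List<String> log) {
        return () -> log.add(name + "@" + Thread.currentThread().getName());
    }

    // Default fork: the calling thread walks each path in turn.
    static List<String> runOnCallingThread() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        task("Build-1", log).run();
        task("Build-2", log).run();
        return log;
    }

    // Async fork: each path is handed to a pool thread.
    static List<String> runOnPool() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(task("Build-1", log));
        pool.execute(task("Build-2", log));
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runOnCallingThread()); // both entries name the calling thread
        System.out.println(runOnPool());          // entries name pool threads, order may vary
    }
}
```

In the pooled variant the order of the two log entries is not guaranteed, which is the same property that later makes the parallel join a point of contention.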

hellowrakesh123
Champ in-the-making
Thank you for the post. This works, but I have another problem. I am running Activiti as a standalone application that is launched from the command line. The problem with async=true is that my program exits as soon as the Activiti process is started. Is there a way to block until the whole process is done?
I can work around this by starting it in a thread and waiting until the process moves to the history, but it would be great if there were an API or a more robust way of doing it.
Thank you.!
  Rakesh

frederikherema1
Star Contributor
Most of the time, your application will run in some kind of environment (web server, OSGi, ...) that is not just a simple main thread and that creates at least one non-daemon thread to keep the app alive. You should google for solutions around this; it's a common Java question.
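
For a standalone command-line program, one simple option is to poll until the process instance is gone, with a timeout. The sketch below is plain Java and keeps the engine out of the picture: `finished` is a hypothetical check supplied by the caller. With Activiti it would presumably wrap something like `runtimeService.createProcessInstanceQuery().processInstanceId(id).singleResult() == null`, but that wiring is an assumption about your setup and is not shown here.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Illustrative helper; the class and method names are not Activiti API.
class WaitForCompletion {

    // Polls `finished` until it returns true or the timeout elapses.
    // Returns true if completion was observed, false on timeout or interrupt.
    static boolean await(BooleanSupplier finished, long timeoutMs, long pollMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (!finished.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Stand-in for the engine query: "finishes" after about 200 ms.
        BooleanSupplier finished = () -> System.currentTimeMillis() - start > 200;
        System.out.println("finished: " + await(finished, 5_000, 50));
    }
}
```

Because `await` blocks the main thread, the JVM stays alive while the job executor's threads do the work. The timeout keeps a stuck process from hanging the program forever.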

hellowrakesh123
Champ in-the-making
Hi,
I tried running the same application in Tomcat but still can't get activiti:async="true" working. Here is the BPMN definition I have. When I set activiti:async="true", nothing gets executed; when I remove activiti:async="true", everything gets executed, but in sequential order. Am I missing something here?

<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
xmlns:activiti="http://activiti.org/bpmn" targetNamespace="Examples">
<process id="cca-workflow-process" name="cca-workflow-process">
  <documentation>Place documentation for the 'mtrose-bpm-workflow'
   process here.</documentation>
  <startEvent id="startevent1" name="Start"></startEvent>
  <parallelGateway id="parallelgateway1" name="Parallel Gateway"></parallelGateway>
  <serviceTask id="build" name="build"
   activiti:class="com.cisco.step.cca.engine.delegate.BuildTaskDelegate" activiti:async="true"></serviceTask>
  <parallelGateway id="parallelgateway2" name="Parallel Gateway"></parallelGateway>
  <serviceTask id="hss_sanity_c4" name="hss_sanity_c4"
   activiti:class="com.cisco.step.cca.engine.delegate.SanityTaskDelegate"></serviceTask>
  <serviceTask id="sa" name="sa"
   activiti:class="com.cisco.step.cca.engine.delegate.StaticAnalysisTaskDelegate"></serviceTask>
  <serviceTask id="cr" name="cr"
   activiti:class="com.cisco.step.cca.engine.delegate.CodeReviewTaskDelegate" activiti:async="true"></serviceTask>
  <serviceTask id="ut" name="ut"
   activiti:class="com.cisco.step.cca.engine.delegate.UnitTestTaskDelegate" activiti:async="true"></serviceTask>
  <sequenceFlow id="flow1" name="" sourceRef="startevent1"
   targetRef="parallelgateway1"></sequenceFlow>
  <parallelGateway id="parallelgateway3" name="Parallel Gateway"></parallelGateway>
  <endEvent id="endevent1" name="End"></endEvent>
  <sequenceFlow id="flow5" name="" sourceRef="build"
   targetRef="parallelgateway2"></sequenceFlow>
  <sequenceFlow id="flow6" name="" sourceRef="parallelgateway2"
   targetRef="hss_sanity_c4"></sequenceFlow>
  <sequenceFlow id="flow7" name="" sourceRef="parallelgateway2"
   targetRef="sa"></sequenceFlow>
  <parallelGateway id="parallelgateway4" name="Parallel Gateway"></parallelGateway>
  <sequenceFlow id="flow10" name="" sourceRef="parallelgateway3"
   targetRef="parallelgateway4"></sequenceFlow>
  <sequenceFlow id="flow11" name="" sourceRef="cr"
   targetRef="parallelgateway4"></sequenceFlow>
  <sequenceFlow id="flow12" name="" sourceRef="ut"
   targetRef="parallelgateway4"></sequenceFlow>
  <sequenceFlow id="flow13" name="" sourceRef="parallelgateway4"
   targetRef="endevent1"></sequenceFlow>
  <sequenceFlow id="flow14" name="" sourceRef="parallelgateway1"
   targetRef="build"></sequenceFlow>
  <sequenceFlow id="flow15" name="" sourceRef="parallelgateway1"
   targetRef="cr"></sequenceFlow>
  <sequenceFlow id="flow16" name="" sourceRef="parallelgateway1"
   targetRef="ut"></sequenceFlow>
  <receiveTask id="sanity_response" name="sanity_response" activiti:class="com.cisco.step.cca.engine.delegate.SanityStatusTaskDelegate"></receiveTask>
  <sequenceFlow id="flow17" name="" sourceRef="hss_sanity_c4"
   targetRef="sanity_response"></sequenceFlow>
  <sequenceFlow id="flow18" name="" sourceRef="sanity_response"
   targetRef="parallelgateway3"></sequenceFlow>
  <sequenceFlow id="flow19" name="" sourceRef="sa"
   targetRef="parallelgateway3"></sequenceFlow>
</process>
</definitions>

My delegates are implemented using JavaDelegate:

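import org.activiti.engine.delegate.DelegateExecution;
import org.activiti.engine.delegate.JavaDelegate;

public class BuildTaskDelegate implements JavaDelegate {

    public void execute(DelegateExecution execution) throws Exception {
        System.out.println("before…");
        // Launches the external command; note that this does not wait for it to finish.
        Process process = Runtime.getRuntime().exec("sleep 10");
        System.out.println("after…");
    }
}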

frederikherema1
Star Contributor
Make sure your job-executor is active. You can verify this by checking your engine settings: jobExecutorActivate should be true. Alternatively, you can double-check in code by getting the processEngineConfiguration object from the ProcessEngine (a cast to the Impl class may be needed) and calling getJobExecutor().isActive().

hellowrakesh123
Champ in-the-making
Oh, so that's the issue: it was set to false. It worked once I changed it to jobExecutorActivate=true.
Thank you so much for the help.
Maybe this is something you might want to add to the documentation where you explain async.

frederikherema1
Star Contributor
Glad it helped ;-)

The section on async continuations states that the job executor is used for this:

This background thread is the activiti job executor (actually a thread pool) which periodically polls the database for jobs. So behind the scenes, when we reach the "generate invoice" task, we are creating a job "message" for activiti to continue the process later and persisting it into the database. This job is then picked up by the job executor and executed. We are also giving the local job executor a little hint that there is a new job, to improve performance.

hellowrakesh123
Champ in-the-making
Hi,
Another issue: I have read about this, but how do we handle it (that is, how do we make sure it doesn't happen)? With async=true and exclusiveJob=false it works as expected, but now I am seeing exceptions:
org.activiti.engine.ActivitiOptimisticLockingException: ExecutionEntity[4] was updated by another transaction concurrently

This happens when more than one task tries to update the parallel join node. When it fails, the engine retries and runs the task again. This is undesirable; how can I make sure the task doesn't get executed again?
I have a dirty workaround: create one more task in front of each parallel task that does nothing and just passes through, so that even if it is executed again, it doesn't affect anything.
Thanks in advance.!
  Rakesh

frederikherema1
Star Contributor
When both tasks are "async", they both get acquired by the job-executor at the same time (since it has multiple threads available) and are actually executed concurrently. When the two paths reach the join, one of them will get an exception while trying to update the shared execution that forked off the 2 parallel paths. The job-executor will retry a job up to 3 times before it stops.

I'm afraid this is just how the engine works. Concurrent updates of the same entity in different transactions can't be handled differently; smart merging or updating can lead to stale state or even data loss, so that's not desirable.
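
A small plain-Java simulation of the mechanism (not engine code; `Row`, `runJob` and the hook are invented for illustration). A shared row carries a version number, an update only succeeds if the version is unchanged since it was read, and a failed job is rerun from the start. The limit of 3 attempts is taken from the description above.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative simulation of optimistic locking with job retries; not Activiti code.
class OptimisticLockSketch {

    // Shared "execution" row with a version (revision) column.
    static final class Row {
        private int version = 0;
        private int arrived = 0;

        synchronized int readVersion() { return version; }

        // Succeeds only if nobody changed the row since expectedVersion was read.
        synchronized boolean update(int expectedVersion) {
            if (version != expectedVersion) return false;
            version++;
            arrived++;
            return true;
        }

        synchronized int arrived() { return arrived; }
    }

    // Reruns the whole job on conflict; returns the number of attempts used.
    static int runJob(Row row, Runnable work, Runnable beforeCommit, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            int seen = row.readVersion();
            work.run();         // the service task's side effects happen here
            beforeCommit.run(); // hook used below to simulate a concurrent commit
            if (row.update(seen)) return attempt;
        }
        throw new IllegalStateException("gave up after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) {
        Row join = new Row();
        AtomicInteger workRuns = new AtomicInteger();
        boolean[] interfered = {false};
        // The other path commits at the join just before our first commit.
        Runnable otherPathCommits = () -> {
            if (!interfered[0]) {
                interfered[0] = true;
                join.update(join.readVersion());
            }
        };
        int attempts = runJob(join, workRuns::incrementAndGet, otherPathCommits, 3);
        // prints attempts=2 workRuns=2
        System.out.println("attempts=" + attempts + " workRuns=" + workRuns.get());
    }
}
```

The output shows why the retry hurts: the work runs twice even though only the final update conflicted. Side effects that do not roll back with the transaction are repeated.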

What you can do to minimize the effect of this is make sure the services YOU call in the service-task use the same transaction, so the rollback caused by the failing execution cleans up any work performed. If this is not possible (other DB, non-transactional operations, …) you should consider something like this:

(read full post at http://forums.activiti.org/en/viewtopic.php?f=6&t=4139&p=16108&hilit=exeternal%20thread%20pool)
  • The service-task prepares (e.g. reads variables from the process) and creates the "job" or runnable to be executed by the external thread pool.
  • The service-task queues the job (saving a reference to the process-instance id) and finishes.
  • Right after the service-task, there's a receive-task that waits, releasing the calling thread and not holding the transaction open.
  • The long-running operation completes and signals the process-instance, perhaps setting some variables on the process based on the results before signaling.

  • Also interesting info about signaling/jobexecutor:
    http://forums.activiti.org/en/viewtopic.php?f=6&t=4138
    http://forums.activiti.org/en/viewtopic.php?f=6&t=4165&hilit=signal
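
The external-thread-pool steps listed above can be sketched in plain Java as follows. This is only an outline under assumptions: `ExternalPoolSketch` and its `signal` callback are hypothetical names. In a real setup the callback would presumably set process variables and signal the waiting receive-task through the engine's runtime service, which is not shown here.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Illustrative outline of "queue the work, wait in a receive-task, signal when done".
class ExternalPoolSketch {

    private final ExecutorService pool = Executors.newFixedThreadPool(2);
    // Hypothetical hook: (processInstanceId, result) -> signal the waiting process.
    private final BiConsumer<String, String> signal;

    ExternalPoolSketch(BiConsumer<String, String> signal) {
        this.signal = signal;
    }

    // Called from the service-task: queues the work and returns immediately,
    // so the engine's thread and transaction are not held open.
    void queue(String processInstanceId, Callable<String> longRunning) {
        pool.execute(() -> {
            String result;
            try {
                result = longRunning.call();
            } catch (Exception e) {
                result = "FAILED: " + e.getMessage();
            }
            signal.accept(processInstanceId, result);
        });
    }

    void shutdown() {
        pool.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch signaled = new CountDownLatch(1);
        ExternalPoolSketch sketch = new ExternalPoolSketch((id, result) -> {
            System.out.println("signal " + id + " with result " + result);
            signaled.countDown();
        });
        sketch.queue("pi-1", () -> {
            Thread.sleep(100); // stand-in for the long-running build
            return "ok";
        });
        signaled.await(5, TimeUnit.SECONDS);
        sketch.shutdown();
    }
}
```

The long-running work executes once, outside any engine transaction, so a retry at the join no longer repeats it.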