Running parallel tasks with Activiti

hellowrakesh123
Champ in-the-making
Hi,
I am new to Activiti and am trying to get parallel gateways to work, but I can't get them to behave the way I expected. Am I doing something wrong here?

I have two tasks, Build-1 and Build-2, and I want to fork both off at the same time as soon as the workflow starts.

When I run the workflow, it starts the first task (Build-1), and only after that completes does it start the other (Build-2). I am not sure what is wrong here. I tried using activiti:async, but it didn't work for me.

<process id="ccaProcess" name="ccaProcess">
    <startEvent id="startevent1" name="Start"></startEvent>
    <serviceTask id="servicetask1" name="Build-1" activiti:class="com.mycomp.step.cca.bpm.CcaDelegate"></serviceTask>
    <serviceTask id="servicetask2" name="Build-2" activiti:class="com.mycomp.step.cca.bpm.CcaDelegate"></serviceTask>
    <parallelGateway id="parallelgateway2" name="Parallel Gateway"></parallelGateway>
    <endEvent id="endevent1" name="End"></endEvent>
    <sequenceFlow id="flow4" name="" sourceRef="servicetask1" targetRef="parallelgateway2"></sequenceFlow>
    <sequenceFlow id="flow5" name="" sourceRef="servicetask2" targetRef="parallelgateway2"></sequenceFlow>
    <sequenceFlow id="flow6" name="" sourceRef="parallelgateway2" targetRef="endevent1"></sequenceFlow>
    <parallelGateway id="parallelgateway3" name="Parallel Gateway"></parallelGateway>
    <sequenceFlow id="flow8" name="" sourceRef="parallelgateway3" targetRef="servicetask1"></sequenceFlow>
    <sequenceFlow id="flow9" name="" sourceRef="parallelgateway3" targetRef="servicetask2"></sequenceFlow>
    <sequenceFlow id="flow10" name="" sourceRef="startevent1" targetRef="parallelgateway3"></sequenceFlow>
</process>

Here is the Java code I used to start the process:
runtimeService.startProcessInstanceByKey("ccaProcess");

– Assume runtimeService is a spring managed bean.
– The Delegates implement JavaDelegate.
12 REPLIES 12

hellowrakesh123
Champ in-the-making
Thank you for the response. I read the threads and have managed to make the workflow work with receive tasks. However, it has raised a doubt about how the workflow will behave; please correct me if my understanding is wrong.

The BPMN specification defines parallel workflows, and Activiti implements them as well. Going over the documentation and your suggestions, *real* parallel execution, where tasks run concurrently, isn't there by default, for the reasons mentioned in the documentation and by you. This means that even if we express the workflow with parallel gateways, it is really sequential. Receive tasks give us a way to achieve concurrency, but they keep us from using normal service tasks. So even if the tasks are not long-running, as long as they are non-DB we have to choose receive tasks, which doesn't look right. Receive tasks also have a problem when two tasks running in parallel reach the join at the same time: the locking issue is still there, but now we can synchronize manually to make sure we don't hit it during the signal. So the problem is the same, but it works with receive tasks because we know when to synchronize, which we can't do with a normal service task. The way I solve it is to lock a common monitor in the thread spawned by the service task, which then signals the receive task. I acquire the class lock, so every thread waits for the lock before it signals.
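
The common-monitor approach described above can be sketched in plain Java. This is only an illustration under stated assumptions: SerializedSignaller and runAll are hypothetical names, and the signal callback is a stand-in for whatever engine call signals the receive task; nothing here calls Activiti itself.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Sketch: branch work runs concurrently, the signal step is serialized on one class-wide monitor. */
final class SerializedSignaller {

    private SerializedSignaller() {}

    /**
     * Starts one worker thread per execution id. Each worker does its work without
     * holding any lock, then takes the class lock before calling signal.
     * Returns once every worker has finished.
     */
    static void runAll(List<String> executionIds, Runnable work, Consumer<String> signal) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, executionIds.size()));
        for (String id : executionIds) {
            pool.execute(() -> {
                work.run();                                 // concurrent part, no lock held
                synchronized (SerializedSignaller.class) {  // common monitor shared by all branches
                    signal.accept(id);                      // hypothetical stand-in for the engine signal
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
    }

    public static void main(String[] args) {
        runAll(Arrays.asList("build-1", "build-2"),
               () -> System.out.println("working on " + Thread.currentThread().getName()),
               id -> System.out.println("signalling " + id));
    }
}
```

Holding one class-wide lock is the simplest choice, but it also serializes signals that belong to unrelated process instances; a lock per process instance would be narrower.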

So here are my questions:
1) Is there a way in service tasks to synchronize from our own code (by overriding some method), similar to the common monitor? This would solve the problem with async=true and exclusiveJob=false and allow truly concurrent parallel paths.
2) Isn't this something Activiti (the engine) should do by default, or am I missing something about how it is implemented? This is a common problem, and a workaround using receive tasks doesn't sound very convincing, although it's a simple solution. The problem with the approach you suggested is that the first service task is just a dummy task that queues the request. The other problem is that the service task has to know about the receive task that follows it, which means I can't have dynamically created workflows unless I add real code to make the service task aware of that receive task. In fact, we can work without service tasks and have the receive task do the work by extending ReceiveTaskActivityBehavior and starting a thread from it. Is there a specific reason for using a service task before the receive task, or is it only for queuing? Or am I missing something here?

I am looking at all the aspects and trying to understand whether I am making the right choice.
Thanks in advance!
  Rakesh

frederikherema1
Star Contributor
I suggest reading this thread, as it's a similar discussion…

http://forums.activiti.org/en/viewtopic.php?f=6&t=4138&start=20

hellowrakesh123
Champ in-the-making
Hi,
I read the discussion and understand the suggestions. My question is more about how to make sure a *pure* service task synchronizes on a monitor before completing. You gave the following example of mutex logic. How can this be implemented in the delegate, analogous to the signal for receive tasks? The code you gave requires me to write logic outside my delegate, which would put flow-control logic outside the delegate and will not scale in our case. We generate the workflow dynamically and run it. A few tasks are long-running; for those we use receive tasks, which work as we discussed. For the tasks that are not long-running and are implemented with JavaDelegate (another web service call, but non-DB-transactional, i.e. it can't be rolled back), a parallel gateway causes tasks to be re-executed with an OptimisticLockException. I want to make sure that when they join, I implicitly synchronize complete() on a common monitor, either in the delegate or somewhere generic for all JavaDelegates, so that I can use async=true and exclusiveJob=false for concurrent tasks.

So my requirement is simple: I have a set of parallel tasks and a dynamic workflow (I don't want to put too much code outside the delegate, as that would restrict us in generating workflows; the workflow could have a different number of tasks for each user each time). So the locking logic needs to be generic for service tasks. All of this is achieved with receive tasks by locking before the signal and releasing in a finally block, and it works quite well. How do we achieve the same for service tasks? Otherwise we would be restricted to receive tasks for all our requirements. I have also attached the workflow we are using.

public class YourCompanyService {

  private Set<String> processLocks = new HashSet<…>();

  public boolean printTheCard(String cardIdentifier) {
    // Correlate between card-id and task/process to use
    String processId = runtimeService.createtaskQuery()……singleResult();

    // JVM-wide lock for this specific use-case.
    boolean isSafe = false;
    synchronized(this) {
      // Safe only if no other thread currently holds this process
      isSafe = !processLocks.contains(processId);
      if(isSafe) {
        processLocks.add(processId);
      }
    }

    if(!isSafe) {
      throw new MyCompanyException("looks like you're too late…");
    }

    // important, release the lock in "finally"
    try {
      // Finish task
      taskService……
      ….
    }
    finally {
      synchronized(this) {
        processLocks.remove(processId);
      }
    }
  }

}
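
For reference, the per-process guard quoted above can be written as a small runnable helper. This is a minimal sketch, not the engine's API: ProcessGuard and completeExclusively are hypothetical names, and the Runnable stands in for the elided task-completion call. It relies on Set.add returning false when the id is already present, so the check and the insert happen in one atomic step.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch: allow at most one in-flight completion per process instance id. */
final class ProcessGuard {

    // Ids of process instances that currently have a completion in flight.
    private final Set<String> inFlight = ConcurrentHashMap.newKeySet();

    /**
     * Runs the completion only if no other caller currently holds the same process id.
     * Returns false without running it when the id is already held.
     */
    boolean completeExclusively(String processId, Runnable completion) {
        if (!inFlight.add(processId)) {   // add() is atomic; false means the id is already held
            return false;
        }
        try {
            completion.run();             // hypothetical stand-in for the task-completion call
            return true;
        } finally {
            inFlight.remove(processId);   // always release, even if the completion throws
        }
    }
}
```

Unlike the quoted code, this variant returns false instead of throwing when the process is already held, so the caller decides whether to retry or give up. Whether a guard of this kind can be applied generically to every JavaDelegate is exactly the open question in this post; the sketch only shows the locking pattern.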