Question about parallel gateway

vladi
Champ in-the-making
Hi,

I tried a very simple process with a parallel gateway and one user task on each path:

<process id="concurency">
      <startEvent id="start" />
      <parallelGateway id="fork" />
      <userTask name="task_1" id="task_1" />
      <parallelGateway id="join" />
      <userTask name="task_2" id="task_2" />
      <userTask name="ready" id="ready" />
      <endEvent id="end" />
      
      <sequenceFlow sourceRef="start" targetRef="fork"/>
      <sequenceFlow sourceRef="fork" targetRef="task_1"/>
      <sequenceFlow sourceRef="fork" targetRef="task_2"/>
      <sequenceFlow sourceRef="task_1" targetRef="join"/>
      <sequenceFlow sourceRef="task_2" targetRef="join"/>
      <sequenceFlow sourceRef="join" targetRef="ready"/>
      <sequenceFlow sourceRef="ready" targetRef="end"/>
   </process>

My goal was to see how Activiti behaves when the user tasks are completed asynchronously. Each user task is completed in its own thread. After the parallel paths join, I expect to get the "ready" task, but I get no task at all. Activiti uses the database for synchronisation, and in this case it does not seem to work.

I have probably misunderstood how Activiti is meant to be used for such tasks…

Any idea?

Thanks,
Vladimir

The JUnit test is:

public class CuncurencyTest {

    @Rule
    public LogInitializer logSetup = new LogInitializer();
    @Rule
    public ProcessDeployer deployer = new ProcessDeployer();
    private ScheduledExecutorService executor;

    @Before
    public void setUp() throws Exception {
        this.executor = Executors.newScheduledThreadPool(10);
    }

    @After
    public void tearDown() throws Exception {
        this.executor.shutdownNow();
        this.executor.awaitTermination(10, TimeUnit.SECONDS);
    }

    @Test
    @ProcessDeclared
    public void testConcurency() throws Exception {
        ProcessInstance pi = this.deployer.getProcessService().startProcessInstanceByKey(
                "concurency");
        TaskQuery query = this.deployer.getTaskService().createTaskQuery().processInstance(
                pi.getId());

        List<Task> tasks = query.list();
        List<Callable<Object>> taskWorkers = new ArrayList<Callable<Object>>();
        // complete user tasks asynchronously
        for (final Task task : tasks) {
            taskWorkers.add(new Callable<Object>() {
                @Override
                public Object call() throws Exception {
                    CuncurencyTest.this.deployer.getTaskService().complete(task.getId());

                    return null;
                }
            });
        }
        List<Future<Object>> futures = this.executor.invokeAll(taskWorkers);
        // Wait until all task completions have finished
        for (final Future<Object> future : futures) {
            future.get();
        }

        tasks = query.list();
        // I would expect the user task "ready" here!
        assertEquals(1, tasks.size());
        assertEquals("ready", tasks.get(0).getName());
    }
}
6 REPLIES

tombaeyens
Champ in-the-making
Process concurrency != multithreaded program execution.

As you state correctly, the Activiti engine uses the DB for synchronization. A process is executed in transactional steps. Each step is calculated with a single thread.

Because this question is asked frequently, I wrote the answer in an FAQ: http://activiti.org/faq.html#WhatIsTheDifferenceBetweenProcessConcurrencyAndJavaConcurrency

Let me know if this helps you further.

vladi
Champ in-the-making
Thanks for this link, but unfortunately it does not clarify the situation. The described use case is exactly what I want to do: I have a process that is driven by asynchronous events.

If I were working with human user tasks, synchronisation through the database would certainly be good enough. But in my case the events are produced by a measuring system, and the probability of such a clash is significantly higher.

The problem I see is that, in principle, the database cannot provide synchronisation in real time… :( What is needed is for the Activiti engine to synchronize the joining parallel paths with a Java monitor object or a semaphore. Is that realistic in any way? Can I, as a user of the API, influence it?


Regards, Vladimir

tombaeyens
Champ in-the-making
What is needed is for the Activiti engine to synchronize the joining parallel paths with a Java monitor object or a semaphore. Is that realistic in any way?

Nope. That doesn't work in a cluster. Databases have decades of research on optimisations for handling concurrent access. They are much better than anything we could provide in that respect.

Your process and software design can also greatly reduce the number of potential optimistic locking failures. I don't yet see how leveraging the DB for the purposes it is good at (isolation and concurrency) can be bad.
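
To make the term "optimistic locking failure" concrete, here is a minimal, self-contained sketch of the idea. It is not Activiti code: the class `OptimisticJoinSketch`, the `Row` snapshot, and `StaleRowException` are hypothetical names, and an in-memory `AtomicReference` stands in for a database row with a revision column. Two overlapping "transactions" read the same snapshot; the first commit wins and the second is rejected and has to retry on a fresh read.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: none of these names come from Activiti.
class OptimisticJoinSketch {

    /** Immutable snapshot of one "row": a revision plus the number of paths that reached the join. */
    static final class Row {
        final int revision;
        final int arrived;

        Row(int revision, int arrived) {
            this.revision = revision;
            this.arrived = arrived;
        }
    }

    /** Stand-in for an optimistic locking failure. */
    static final class StaleRowException extends RuntimeException {
        StaleRowException(String message) {
            super(message);
        }
    }

    private final AtomicReference<Row> row = new AtomicReference<>(new Row(0, 0));

    /** Start of a "transaction": read the current snapshot. */
    Row read() {
        return row.get();
    }

    /** End of a "transaction": commit only if the row is still the snapshot that was read. */
    void arrive(Row seen) {
        Row next = new Row(seen.revision + 1, seen.arrived + 1);
        if (!row.compareAndSet(seen, next)) {
            throw new StaleRowException("row changed since revision " + seen.revision);
        }
    }

    /** Two overlapping completions; returns {paths arrived, conflicts detected}. */
    static int[] demo() {
        OptimisticJoinSketch join = new OptimisticJoinSketch();
        Row seenByA = join.read();
        Row seenByB = join.read();     // B reads before A commits
        join.arrive(seenByA);          // A commits first
        int conflicts = 0;
        try {
            join.arrive(seenByB);      // B's snapshot is stale now
        } catch (StaleRowException e) {
            conflicts++;
            join.arrive(join.read());  // retry on a fresh snapshot
        }
        return new int[] { join.read().arrived, conflicts };
    }

    public static void main(String[] args) {
        int[] result = demo();
        System.out.println("arrived=" + result[0] + ", conflicts=" + result[1]);
    }
}
```

The point of the sketch is that no update is silently lost: the stale writer is told about the conflict and can try again, which is the behaviour one would hope for at the join.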

ruediger_hain
Champ in-the-making
The test case above shows the process execution is stuck at the merging gateway when both tasks are completed at the same time.
It seems there is no exception thrown so we can't detect there is a problem with process execution.
There is no rollback done on one of the task completions either, otherwise we would still have one of the user tasks in the list, right?
@Tom: do you think this is supposed to work with the current version of Activiti (and the test case is wrong, or we are having a configuration issue, etc.)?

tombaeyens
Champ in-the-making
Try remodelling your test code without the workers and make it run with a single test thread. Then you should be able to get more insight, or otherwise better explain your problem here.

vladi
Champ in-the-making
Imagine two user tasks being completed by two different users at the same time. The test case above simulates this situation. Or two asynchronous messages arrive at the same time. You can't run this in a single thread in your application; it can be two different web sessions or the like.

I would expect Activiti to throw a proper exception signaling the conflict, so that I can retry the operation.
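
The retry described here could be sketched roughly as follows. This is only an illustration under stated assumptions: `RetryOnConflict`, `withRetry`, and `ConflictException` are hypothetical names, and `ConflictException` stands in for whatever exception the engine would throw on a conflict. Later Activiti releases do have an `ActivitiOptimisticLockingException`, but this thread does not establish whether the version discussed here throws it in this scenario.

```java
import java.util.function.Supplier;

// Hypothetical sketch: ConflictException and withRetry are illustrative names, not Activiti API.
class RetryOnConflict {

    /** Stand-in for the conflict exception the engine is expected to throw. */
    static final class ConflictException extends RuntimeException {
        ConflictException(String message) {
            super(message);
        }
    }

    /** Runs the operation, repeating it after a conflict, at most maxAttempts times in total. */
    static <T> T withRetry(int maxAttempts, Supplier<T> operation) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        ConflictException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.get();
            } catch (ConflictException e) {
                last = e; // remember the conflict and try again
            }
        }
        throw last; // every attempt ran into a conflict
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated operation: conflicts on the first two calls, then succeeds.
        String result = withRetry(3, () -> {
            if (++calls[0] < 3) {
                throw new ConflictException("simulated conflict");
            }
            return "completed";
        });
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

In the test case above, the body of each `Callable` would be the operation passed to such a helper, which only works if the engine actually reports the conflict instead of failing silently.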