Synchronization puzzle

fritz128
Champ in-the-making
To understand why I decided to use a semaphore for launching asynchronous processes, read the whole post. Otherwise, you can read just the Question section.
Question:
Can I launch an asynchronous subprocess synchronously? That is (see the code below), if I release the semaphore inside an asynchronous subprocess, can I make the for-loop wait until the async subprocess has started?

for (int i = 0; i < 10; i++) {
  semaphore.acquire();
  runtimeService.signalEventReceived(…); //Launch async subprocess, inside which I release the semaphore
}


My workflow diagram:
http://s1023.photobucket.com/user/bahmatjuk/media/processdiagram_zps180927cd.png.html?sort=3&o=0

What I have:
Multiple asynchronous subprocesses that are started by a signal.
I launch my subprocesses in a for-loop from the 'B' user task, but each subprocess should contain variables with the same name. This is very important: when I start subprocesses in a for-loop with the signalEventReceived method, I can only write variables to the global context and then copy them to the local context. This means I need some synchronization to ensure that the next launch won't overwrite the global context before the previous subprocess has copied the variables to its own local context.

Synchronization:
I achieve synchronization with the help of a semaphore.
Handler bean (outside of the Activiti process):

for (int intVar: intVarList) {
  semaphore.acquire(); //ACQUIRE SEMAPHORE
  Map<String, Object> globalVars = new HashMap<String, Object>();
  globalVars.put("var", intVar);
  runtimeService.signalEventReceived("launchCandidateSubprocess", mainProcessExecution.getId(), globalVars);
}

And then in each subprocess:

public class FirstSubprocessService implements JavaDelegate {
  @Override
  public void execute(DelegateExecution execution) {
     //……
     // Copy the process-level (global) variable into this subprocess's local scope
     Integer localVar = (Integer) execution.getVariable("var");
     execution.setVariableLocal("var", localVar);
     semaphore.release(); // RELEASE THE SAME SEMAPHORE
  }
}


The problem:
I acquire the semaphore in the main thread, but I have to release it in the async subprocess! Activiti doesn't run the new async subprocess directly from the runtimeService.signalEventReceived method. This means the 'ACQUIRE SEMAPHORE' line is reached again before the async subprocess is launched, and therefore before it can release the semaphore.
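
The handoff described above can be sketched in plain Java, without any Activiti classes. This is only a simulation under stated assumptions: the class `SemaphoreHandoffSketch`, the method `launchAll`, and the `timeoutMillis` parameter are hypothetical names, a `java.util.concurrent.Executor` stands in for whatever actually runs the async subprocess, and a one-element array stands in for the global variable "var". The timed `tryAcquire` is a swapped-in variation on the plain `acquire()` in the post, so that the loop fails loudly instead of blocking forever when nothing ever releases the semaphore.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Plain-Java simulation of the acquire-in-loop / release-in-subprocess handoff.
// All names here are hypothetical; no Activiti API is used.
class SemaphoreHandoffSketch {

    // Launches one simulated subprocess per value and returns what each one copied.
    static List<Integer> launchAll(List<Integer> values, Executor jobExecutor, long timeoutMillis) {
        Semaphore semaphore = new Semaphore(1);
        int[] globalVar = new int[1]; // stands in for the global variable "var"
        List<Integer> copied = Collections.synchronizedList(new ArrayList<>());

        for (int value : values) {
            // Blocks until the previous subprocess has copied the variable
            acquireOrFail(semaphore, timeoutMillis);
            globalVar[0] = value;
            jobExecutor.execute(() -> {
                copied.add(globalVar[0]); // global -> local copy
                semaphore.release();      // lets the loop continue
            });
        }
        // Wait for the last subprocess to finish its copy
        acquireOrFail(semaphore, timeoutMillis);
        return new ArrayList<>(copied);
    }

    private static void acquireOrFail(Semaphore semaphore, long timeoutMillis) {
        try {
            if (!semaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("semaphore was never released; did the async work start?");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the semaphore", e);
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println(launchAll(Arrays.asList(1, 2, 3), pool, 2000));
        } finally {
            pool.shutdown();
        }
    }
}
```

Passing an executor that never runs its tasks, for example `r -> {}`, reproduces the situation from this post in miniature: the second pass through the loop cannot acquire the semaphore, and with the timeout it throws instead of hanging.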

FAQ:
Q1: Why not multi-instance?
A1: Because I don't know in advance how many subprocesses I need to start. I may want to start additional subprocesses at any moment.
Q2: Are the semaphores in the handler and in FirstSubprocessService the same?
A2: Yes. I manage this with the help of SingletonSemaphorefactoryBean.
5 REPLIES

trademak
Star Contributor
I don't think I understand it yet. If you want each subprocess to have its own scope of variables, I would recommend using a call activity. Is that an option for you?

Best regards,

fritz128
Champ in-the-making
As I understand it, I can't simultaneously launch multiple async call activities from one user task.
Question 1: Can I?

Main point:
And another question. I've already written a demo with the same Activiti/Spring configuration, the same workflow and the same way of calling asynchronous subprocesses. And it works! But the same thing doesn't work in my application!
I debugged it, and it appears that in my app the call to signalEventReceived is not followed by an invocation of the following method of the java.lang.Thread class. In my demo, where everything works fine, processing of the new async subprocess begins from that method:
<java>
@Override
    public void run() {
        if (target != null) {
            target.run();
        }
    }
</java>
Question 2:
How does signalEventReceived work? Does it invoke the Thread.run method directly?

jbarrez
Star Contributor
signalEventReceived uses the current thread, i.e. the thread which calls that method.

In case of async, it will hand it over to the job executor. Is the job executor running?
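
The difference between running on the calling thread and handing work over to an executor can be illustrated with a small plain-Java sketch. It is an analogy only and does not use or reproduce Activiti's internals: `CallerVsExecutorSketch` and `executingThread` are hypothetical names, the direct executor `Runnable::run` plays the role of the synchronous case, and a single-thread pool plays the role of a running job executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Analogy only: shows which thread executes a task for two kinds of executor.
// All names are hypothetical; no Activiti API is used.
class CallerVsExecutorSketch {

    // Runs a task through the executor and reports the name of the thread that ran it.
    // Note: join() blocks forever if the executor never runs the task.
    static String executingThread(Executor executor) {
        CompletableFuture<String> threadName = new CompletableFuture<>();
        executor.execute(() -> threadName.complete(Thread.currentThread().getName()));
        return threadName.join();
    }

    public static void main(String[] args) {
        Executor callerThread = Runnable::run;                      // synchronous: runs in the caller
        ExecutorService pool = Executors.newSingleThreadExecutor(); // asynchronous: runs in a worker
        try {
            System.out.println("caller thread:   " + Thread.currentThread().getName());
            System.out.println("direct executor: " + executingThread(callerThread));
            System.out.println("thread pool:     " + executingThread(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

In the asynchronous case the caller has no control over when the worker picks the task up. If no worker is running, the task never starts, which matches the symptom described in this thread.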

fritz128
Champ in-the-making
How can I check whether the JobExecutor is running?
I've set the following option: <property name="jobExecutorActivate" value="true" />
As I mentioned above, in the case where it doesn't work, the Thread.run and ExecuteJobRunnable.run methods are never invoked!

What is VERY STRANGE:
I have the same code in two places in my application: in the first it works, in the second it doesn't!

The first (works fine):
<java>
runMainProcess();
for (int i = 0; i < 10; i++)  {
   SemaphoreWrapper.acquire();
   runtimeService.signalEventReceived("signal", executionId);
}
</java>

But if I move the for-loop and runMainProcess() into two different requests, it DOESN'T work. I've checked in the debugger: executionId, commandExecutor and all other parameters are the same!

In which cases does Activiti not run the new async subprocess before the next iteration of the for-loop?

jbarrez
Star Contributor
That sounds very, very weird … I can't really explain why the exact same code does two different things depending on the setup.