
ActivitiOptimisticLockingException from RuntimeService.signal()

flowmonkey
Champ in-the-making
Hello,
I have what seems to be a common use case: round-robin distribution of processing.

An Activiti process holds a collection of items to process. The items are processed by an external service, and processing can run in parallel on a number of external processors. I created a multi-instance subprocess; each instance removes one item from the parent's collection and sends it, along with the instance's execution ID, to its corresponding external processor. The instance then waits in a receive task for processing to complete, and loops if any items remain in the parent's collection.

Once an external processor completes an item, it sends back a message containing the execution ID. A thread waiting for these messages (external to the engine) then calls the RuntimeService.signal() method to resume the corresponding execution. Because the multi-instance executes exclusively, there is at most one instance of the subprocess running at a time, and that running instance modifies the parent's collection (removes the next item to process).

What I occasionally see is an ActivitiOptimisticLockingException thrown to the thread calling RuntimeService.signal(), complaining that the parent's collection was modified concurrently. Granted, it was likely modified while this particular execution was waiting in the receive task, but there should be no concurrent modifications of any kind, because there are no asynchronous non-exclusive tasks or subprocesses anywhere in the process definition.

I also noticed that signal() flushes updates to the database (that is where the exception is thrown from). Is this intended? If the main process thread holds the lock (running a different instance of the multi-instance) and the external thread calling signal() writes to the database, then the exception makes sense. But isn't signal() only supposed to mark the execution to be picked up by the job executor?
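A hedged note on the flush question: in Activiti 5, RuntimeService.signal() is not a hand-off to the job executor. It runs as a command in the calling thread, continues the process until the next wait state (or async boundary), and flushes its changes when the command context closes; the CommandContext.close and DbSqlSession.flushUpdates frames in the stack trace posted later in this thread reflect that. Updates are checked against a revision number, so when two transactions have loaded the same revision of a row (here the serialized testCases variable), the second one to flush finds the revision changed and fails. The class below is a simplified in-memory model of that check, not Activiti's code; VersionedStore, Snapshot, and OptimisticLockException are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified model of revision-checked updates (hypothetical; not Activiti's implementation). */
public class OptimisticLockDemo {

    /** Hypothetical stand-in for ActivitiOptimisticLockingException. */
    static class OptimisticLockException extends RuntimeException {
        OptimisticLockException(String message) {
            super(message);
        }
    }

    /** A row value together with the revision it had when it was read. */
    static final class Snapshot {
        final String value;
        final int revision;

        Snapshot(String value, int revision) {
            this.value = value;
            this.revision = revision;
        }
    }

    /** Hypothetical table in which every row carries a revision counter. */
    static class VersionedStore {
        private final Map<String, Snapshot> rows = new HashMap<>();

        synchronized void insert(String id, String value) {
            rows.put(id, new Snapshot(value, 1));
        }

        synchronized Snapshot read(String id) {
            return rows.get(id);
        }

        /**
         * Comparable to "UPDATE ... WHERE ID = ? AND REV = ?": the write succeeds only if
         * the row still has the revision the caller read; otherwise it is rejected.
         */
        synchronized void update(String id, String newValue, int expectedRevision) {
            Snapshot current = rows.get(id);
            if (current == null || current.revision != expectedRevision) {
                throw new OptimisticLockException(id + " was updated by another transaction concurrently");
            }
            rows.put(id, new Snapshot(newValue, expectedRevision + 1));
        }
    }

    public static void main(String[] args) {
        VersionedStore store = new VersionedStore();
        store.insert("var-testCases", "[tc1, tc2, tc3]");

        // Two transactions (for example, two overlapping signal() calls) both read revision 1.
        Snapshot first = store.read("var-testCases");
        Snapshot second = store.read("var-testCases");

        // The first flush succeeds and moves the row to revision 2.
        store.update("var-testCases", "[tc2, tc3]", first.revision);

        // The second flush still expects revision 1 and is rejected. Without the check,
        // both transactions would have removed tc1 and the same item would be dispatched twice.
        try {
            store.update("var-testCases", "[tc2, tc3]", second.revision);
        } catch (OptimisticLockException e) {
            System.out.println("second flush rejected: " + e.getMessage());
        }
    }
}
```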

What am I doing wrong?

Thanks!
7 replies

trademak
Star Contributor
So you are running the multi-instance sub process in sequential mode?
Could you post the exception stack trace and the bpmn snippet showing the multi instance sub process definition?

Thanks,

flowmonkey
Champ in-the-making
Tijs, thanks for replying. I'm not running the subprocess sequentially, but rather as a non-sequential exclusive one. I need the external service requests to be sent out all at once, and then each corresponding execution to resume once its external processing is complete. My understanding, based on the parallel gateway description in your book and a few forum posts I read on the subject, is that with exclusive execution a single thread performs a depth-first search to find a branch that is not in a wait state (or is coming out of one) and runs it. In a sense, it's a form of queuing to a single processing thread, which makes a lot of sense for avoiding synchronization issues and the performance cost of synchronization. So if the main processing thread is the "consumer" of this "queue", then a "producer" is any thread that signals an execution in a wait state to resume by calling RuntimeService.signal(executionId). So while my subprocess instances execute "in parallel", meaning any one of them can be told to resume at any time, the non-wait portion of each instance still executes sequentially on a single main processing thread.

So, as I understand it, unless the job executor is running multiple threads, which it shouldn't unless there are explicit asynchronous non-exclusive tasks (not a good idea in my opinion; I wonder why this is even allowed), I shouldn't be seeing any concurrent modification of any data. What I'm seeing instead is an ActivitiOptimisticLockingException thrown to a "producer" thread when it tries to mark an instance of the subprocess to be picked up the next time the main processing thread gets a chance to run it.

Thanks for your help!

Here's the exception I'm getting:

org.activiti.engine.ActivitiOptimisticLockingException: ByteArrayEntity[id=220, name=var-testCases, size=693] was updated by another transaction concurrently
at org.activiti.engine.impl.db.DbSqlSession.flushUpdates(DbSqlSession.java:620)
at org.activiti.engine.impl.db.DbSqlSession.flush(DbSqlSession.java:502)
at org.activiti.engine.impl.interceptor.CommandContext.flushSessions(CommandContext.java:175)
at org.activiti.engine.impl.interceptor.CommandContext.close(CommandContext.java:122)
at org.activiti.engine.impl.interceptor.CommandContextInterceptor.execute(CommandContextInterceptor.java:66)
at org.activiti.engine.impl.interceptor.LogInterceptor.execute(LogInterceptor.java:31)
at org.activiti.engine.impl.cfg.CommandExecutorImpl.execute(CommandExecutorImpl.java:40)
at org.activiti.engine.impl.cfg.CommandExecutorImpl.execute(CommandExecutorImpl.java:35)
at org.activiti.engine.impl.RuntimeServiceImpl.signal(RuntimeServiceImpl.java:214)
at com.huawei.rndtools.wise.ebuswebsandbox.DemoServer$1.run(DemoServer.java:87)
at java.lang.Thread.run(Thread.java:744)



Here is the subprocess definition:

    <subProcess id="subprocess1" name="Sub Process">
      <multiInstanceLoopCharacteristics isSequential="false" activiti:collection="gtrList" activiti:elementVariable="gtr"></multiInstanceLoopCharacteristics>
      <startEvent id="startevent2" name="Start"></startEvent>
      <sequenceFlow id="flow23" sourceRef="startevent2" targetRef="servicetask6"></sequenceFlow>
      <serviceTask id="servicetask6" name="Run Test Case" activiti:class="com.huawei.rndtools.wise.designersandbox.demo.services.RunTestCaseService">
        <extensionElements>
          <activiti:executionListener event="end" class="com.huawei.rndtools.wise.useragent.ActivitiEngineImpl"></activiti:executionListener>
        </extensionElements>
      </serviceTask>
      <sequenceFlow id="flow24" sourceRef="servicetask6" targetRef="receivetask1"></sequenceFlow>
      <receiveTask id="receivetask1" name="Wait for completion"></receiveTask>
      <sequenceFlow id="flow25" sourceRef="receivetask1" targetRef="servicetask7"></sequenceFlow>
      <exclusiveGateway id="exclusivegateway1" name="Exclusive Gateway"></exclusiveGateway>
      <sequenceFlow id="flow26" sourceRef="servicetask7" targetRef="exclusivegateway1"></sequenceFlow>
      <sequenceFlow id="flow27" sourceRef="exclusivegateway1" targetRef="servicetask6">
        <conditionExpression xsi:type="tFormalExpression"><![CDATA[${!testCases.isEmpty()}]]></conditionExpression>
      </sequenceFlow>
      <serviceTask id="servicetask7" name="Process Test Case Completion" activiti:class="com.huawei.rndtools.wise.designersandbox.demo.services.ProcessTestCaseCompletionService">
        <extensionElements>
          <activiti:executionListener event="end" class="com.huawei.rndtools.wise.useragent.ActivitiEngineImpl"></activiti:executionListener>
        </extensionElements>
      </serviceTask>
      <endEvent id="endevent2" name="End"></endEvent>
      <sequenceFlow id="flow28" sourceRef="exclusivegateway1" targetRef="endevent2">
        <conditionExpression xsi:type="tFormalExpression"><![CDATA[${testCases.isEmpty()}]]></conditionExpression>
      </sequenceFlow>
    </subProcess>

flowmonkey
Champ in-the-making
Just trying to resuscitate this thread. Thanks!

jbarrez
Star Contributor
Sorry for not responding earlier, but I didn't know how to help when I read it. To be honest, I have no clue why you're seeing this if everything is synchronous.

Do you have a simple unit test of your process where this is happening?

flowmonkey
Champ in-the-making
Joram, thanks for responding. The problem is intermittent by nature, so I have no test that would reproduce it reliably. Maybe we can attack it from a different angle. Assuming this is not a bug, let me describe what I'm doing and perhaps you can give me some pointers as to how to do it better.

The problem:
- I'm running a fully automated flow that does not involve user tasks
- I need to distribute a large number of long-running service tasks to a set of external processors and execute them concurrently
- I need to use Activiti essentially as a scheduler for these tasks
- The model is simple: given the number of external processors dispatch a task to each one and when a task completes on a processor dispatch the next task to this processor; do this until there are no more tasks and terminate the process.
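The dispatch model in the last bullet can be sketched without Activiti as a plain Java simulation: every processor gets one task up front, and each completion pulls the next queued task for the processor that just finished. Processor and task names, and the completionOrder parameter (which fixes the order of the simulated completion callbacks so the run is deterministic), are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Plain-Java simulation of the dispatch model (hypothetical names; no Activiti involved). */
public class DispatchModel {

    /**
     * Gives each processor one task, then hands the next queued task to whichever
     * processor reports completion, until the queue is empty. completionOrder
     * simulates the order of completion callbacks. Returns the tasks each processor ran.
     */
    static Map<String, List<String>> run(List<String> processors, List<String> tasks,
                                         List<String> completionOrder) {
        Deque<String> queue = new ArrayDeque<>(tasks);           // the shared task queue
        Map<String, List<String>> assigned = new LinkedHashMap<>();
        Set<String> busy = new HashSet<>();

        // Initial fan-out: one task per processor, if there are enough tasks.
        for (String p : processors) {
            assigned.put(p, new ArrayList<>());
            dispatchNext(p, queue, assigned, busy);
        }

        // Each completion frees that processor; it immediately gets the next task, if any.
        for (String p : completionOrder) {
            if (!busy.remove(p)) {
                continue;                                        // ignore callbacks from idle processors
            }
            dispatchNext(p, queue, assigned, busy);
        }
        return assigned;
    }

    private static void dispatchNext(String p, Deque<String> queue,
                                     Map<String, List<String>> assigned, Set<String> busy) {
        String task = queue.poll();                              // null when the queue is empty
        if (task != null) {
            assigned.get(p).add(task);
            busy.add(p);
        }
    }

    public static void main(String[] args) {
        Map<String, List<String>> result = run(
                List.of("p1", "p2"),
                List.of("t1", "t2", "t3", "t4", "t5"),
                List.of("p2", "p1", "p2", "p1", "p2"));
        System.out.println(result); // {p1=[t1, t4], p2=[t2, t3, t5]}
    }
}
```

A processor that reports completion when the queue is already empty simply goes idle, which matches the "until there are no more tasks" condition.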

Implementation (Activiti part):
- Create a multi-instance async exclusive sub-process, each instance corresponding to an external processor; the exclusivity should guarantee that a single thread runs any instance of the sub-process at any given time
- Keep a queue of tasks in the parent process; again, exclusivity should guarantee that even though this queue is "shared" among all instances of the sub-process any access to it still happens from a single thread
- Run a loop in the sub-process: check that the queue is not empty; dequeue a task; send it to the processor; wait in a receive task. The execution ID serves as the transaction ID

Implementation (external to Activiti):
- Implement a callback to process external task completions (a separate thread or threads external to Activiti); the transaction ID (the Activiti execution ID) is passed in the callback
- Any time an external task completes call signal() on the Runtime Service passing the corresponding execution ID.

The issue:
ActivitiOptimisticLockingException is thrown intermittently to the thread listening for callback events (external to Activiti) when it calls signal() (see the stack trace above).

What am I doing wrong, or is there a better, more "Activiti-like" way of doing this?

Side note: this description does not explain why I need the multi-instance sub-process to be async. The same algorithm should produce similar behavior whether the sub-process is sync or async exclusive. The reason is that there is another branch, parallel to the sub-process, with a wait state, and I want that wait state to be reached before any of the sub-process instances start executing.

Thanks!

trademak
Star Contributor
What I think is happening is that multiple signal events are being executed at the same time for the same process instance. This will throw an ActivitiOptimisticLockingException for the second event. One solution could be to use a queue to make sure only one event is handled at a time. Another could be to retry after a short time period.
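A minimal sketch of the retry option, assuming the caller retries only on the optimistic-locking failure and that re-signaling is valid because the failed transaction was rolled back (the execution should still be waiting in the receive task). SignalCall, withRetry, ConflictException, and the attempt and delay values are hypothetical; in real code the call would be something like () -> runtimeService.signal(executionId) and the conflict type ActivitiOptimisticLockingException.class, but the type is passed in here so the example compiles without Activiti on the classpath.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Retry-on-conflict helper (hypothetical sketch; not part of the Activiti API). */
public class RetryingSignal {

    /** Stand-in for a call such as () -> runtimeService.signal(executionId). */
    @FunctionalInterface
    interface SignalCall {
        void run();
    }

    /** Hypothetical conflict type used by the demo below. */
    static class ConflictException extends RuntimeException {
    }

    /**
     * Runs the call and, if it throws conflictType, waits and tries again, doubling the
     * delay each time. Other exceptions, or a conflict on the last attempt, propagate.
     */
    static void withRetry(SignalCall call, Class<? extends RuntimeException> conflictType,
                          int maxAttempts, long initialDelayMillis) {
        long delay = initialDelayMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                call.run();
                return;                                          // success
            } catch (RuntimeException e) {
                if (!conflictType.isInstance(e) || attempt >= maxAttempts) {
                    throw e;                                     // not a conflict, or out of attempts
                }
            }
            try {
                Thread.sleep(delay);                             // back off before the next attempt
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();              // keep the interrupt status
                throw new IllegalStateException("interrupted while waiting to retry", ie);
            }
            delay *= 2;
        }
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Simulated signal that conflicts twice and then succeeds.
        withRetry(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new ConflictException();
            }
        }, ConflictException.class, 5, 10);
        System.out.println("succeeded after " + calls.get() + " attempts"); // succeeded after 3 attempts
    }
}
```

One caveat worth checking in this process: any non-transactional side effect that ran inside the rolled-back transaction (for example, a service task that already sent the next item to an external processor before the flush failed) may run again on retry.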

Best regards,

flowmonkey
Champ in-the-making
Thanks Tijs! Funny that, as I was typing the previous post, this part: "Implement a callback to process external task completions (separate thread or threads external to Activiti)" gave me pause. It made me wonder whether multiple threads calling signal() could be the problem. I went back to look at the code, and it *appears* to be a single thread, but I need to do more digging to know for sure. I also made a mental note to synchronize around the call to signal() and see if the problem goes away. And now you are essentially saying the same thing.
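A minimal sketch of serializing the calls, assuming every completion callback in this JVM goes through one shared object: a single-thread executor plays the role of the queue Tijs describes, so at most one signal runs at a time regardless of how many callback threads report completions. SerializedSignaler and its Consumer&lt;String&gt; parameter are hypothetical; in real code the consumer would be something like runtimeService::signal. This only orders calls made through this object. It may not cover work the job executor is doing on the same process instance (for example, still starting async sub-process instances), so combining it with a retry is probably still prudent.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Funnels signal calls from any number of callback threads through one worker thread. */
public class SerializedSignaler implements AutoCloseable {

    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final Consumer<String> signal;   // in real code, e.g. runtimeService::signal

    SerializedSignaler(Consumer<String> signal) {
        this.signal = signal;
    }

    /** Queues a signal for the execution; queued calls run one at a time, in FIFO order. */
    Future<?> submit(String executionId) {
        return worker.submit(() -> signal.accept(executionId));
    }

    /** Stops accepting new signals and waits for the queued ones to finish. */
    @Override
    public void close() {
        worker.shutdown();
        try {
            worker.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger active = new AtomicInteger();
        AtomicInteger maxActive = new AtomicInteger();
        List<String> seen = Collections.synchronizedList(new ArrayList<>());

        // Fake signal that records how many calls overlap.
        SerializedSignaler signaler = new SerializedSignaler(id -> {
            maxActive.accumulateAndGet(active.incrementAndGet(), Math::max);
            seen.add(id);
            active.decrementAndGet();
        });

        // Several callback threads report completions at roughly the same time.
        List<Thread> callbacks = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            String executionId = "exec-" + i;
            Thread t = new Thread(() -> signaler.submit(executionId));
            callbacks.add(t);
            t.start();
        }
        for (Thread t : callbacks) {
            t.join();
        }
        signaler.close();

        System.out.println(seen.size() + " signals, max concurrent = " + maxActive.get());
    }
}
```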

I'll make sure that the call to signal() is thread-safe and post what I see.

Thanks again!