
Concurrent signals to same process instance.

mhanrahan
Confirmed Champ
Hi

I have a process that makes heavy use of the Camel integration within Activiti. It waits at a receive task for a message to come in via Camel, does some processing on the event, then loops back around to the receive task and waits for the next message. The activity after the receive task is marked as asynchronous.

As part of this, I have implemented some behaviour so that if the process is not waiting at the receive task (because a message has already come in and is still being processed), the message is retried later. This relies on detecting that the signal() has failed.
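
For illustration, the retry-later behaviour described above might look roughly like the sketch below. It is not the actual implementation: the class name RetryLater, the method signalWithRetry and its parameters are hypothetical, and the Runnable stands in for whatever call ends up invoking signal(). It assumes a failed signal surfaces as a RuntimeException.

```java
public class RetryLater {

    /**
     * Runs the given signal action, retrying after a fixed delay when it
     * throws a RuntimeException. Returns the attempt number that succeeded,
     * or rethrows the last failure once maxAttempts is exhausted.
     */
    public static int signalWithRetry(Runnable signal, int maxAttempts, long delayMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                signal.run();      // hypothetical stand-in for the signal() call
                return attempt;
            } catch (RuntimeException e) {
                last = e;          // remember the failure and try again later
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(delayMillis);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException("Interrupted while waiting to retry", ie);
                    }
                }
            }
        }
        throw last;
    }
}
```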

My issue is that when multiple messages come in very close to each other (which is a common scenario), I get the following exception (more than once):


org.activiti.engine.ActivitiOptimisticLockingException: ByteArrayEntity[id=58975, name=var-camelBody, size=0] was updated by another transaction concurrently
        at org.activiti.engine.impl.db.DbSqlSession.flushUpdates(DbSqlSession.java:779)[193:org.activiti.engine:5.16.3]
        at org.activiti.engine.impl.db.DbSqlSession.flush(DbSqlSession.java:590)[193:org.activiti.engine:5.16.3]
        at org.activiti.engine.impl.interceptor.CommandContext.flushSessions(CommandContext.java:211)[193:org.activiti.engine:5.16.3]
        at org.activiti.engine.impl.interceptor.CommandContext.close(CommandContext.java:137)[193:org.activiti.engine:5.16.3]
        at org.activiti.engine.impl.interceptor.CommandContextInterceptor.execute(CommandContextInterceptor.java:66)[193:org.activiti.engine:5.16.3]
        at org.activiti.engine.impl.interceptor.LogInterceptor.execute(LogInterceptor.java:31)[193:org.activiti.engine:5.16.3]
        at org.activiti.engine.impl.cfg.CommandExecutorImpl.execute(CommandExecutorImpl.java:40)[193:org.activiti.engine:5.16.3]
        at org.activiti.engine.impl.cfg.CommandExecutorImpl.execute(CommandExecutorImpl.java:35)[193:org.activiti.engine:5.16.3]
        at org.activiti.engine.impl.RuntimeServiceImpl.signal(RuntimeServiceImpl.java:227)[193:org.activiti.engine:5.16.3]
        at Proxybcddd436_0b02_498d_9638_8c0ea392a730.signal(Unknown Source)[:]
        at org.activiti.camel.ActivitiProducer.signal(ActivitiProducer.java:103)[196:org.activiti.camel:5.16.3]
        at org.activiti.camel.ActivitiProducer.process(ActivitiProducer.java:68)[196:org.activiti.camel:5.16.3]
        at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)[74:org.apache.camel.camel-core:2.12.3]
        at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:398)[74:org.apache.camel.camel-core:2.12.3]
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[74:org.apache.camel.camel-core:2.12.3]
        at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:574)[74:org.apache.camel.camel-core:2.12.3]
        at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:507)[74:org.apache.camel.camel-core:2.12.3]
        at org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:216)[74:org.apache.camel.camel-core:2.12.3]
        at org.apache.camel.processor.RecipientList.sendToRecipientList(RecipientList.java:153)[74:org.apache.camel.camel-core:2.12.3]
        at org.apache.camel.processor.RecipientList.process(RecipientList.java:112)[74:org.apache.camel.camel-core:2.12.3]
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)[74:org.apache.camel.camel-core:2.12.3]
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)[74:org.apache.camel.camel-core:2.12.3]
        at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)[74:org.apache.camel.camel-core:2.12.3]
        at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:398)[74:org.apache.camel.camel-core:2.12.3]
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[74:org.apache.camel.camel-core:2.12.3]
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)[74:org.apache.camel.camel-core:2.12.3]
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)[74:org.apache.camel.camel-core:2.12.3]
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[74:org.apache.camel.camel-core:2.12.3]
        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:105)[74:org.apache.camel.camel-core:2.12.3]
        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:87)[74:org.apache.camel.camel-core:2.12.3]


The behaviour I'm looking for is the same as if the messages had arrived some time apart: the first message gets processed, and if the process hasn't yet returned to the receive task, there is an easily detectable way of handling that.

Do you have any advice on how I could achieve this?

I've tried catching the ActivitiOptimisticLockingException on the Camel end, but it is sometimes thrown for the first message, so my process only gets the second or third message.

Regards,
Michael Hanrahan
1 REPLY

jbarrez
Star Contributor
I don't think there's an easy solution for this, except putting a queue in front of the receive task.
The thing is, at the lowest level you've got two transactions trying to do the same thing, and one wins, leading to the optimistic locking exception. That's fundamental to Activiti and not easy to circumvent.
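
One possible reading of the "queue in front of the receive task" idea is sketched below. It is only an illustration under stated assumptions, not an Activiti or Camel API: the class ReceiveTaskQueue and its methods are hypothetical. The BooleanSupplier is assumed to answer "is the process currently waiting at the receive task?" (for example via an execution query filtered on the process instance and activity id), and the Consumer is assumed to wrap the actual signal call. Messages are buffered and handed over one at a time, so only a single transaction ever tries to signal the execution.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

/** Buffers incoming messages and delivers them to the process one at a time. */
public class ReceiveTaskQueue {

    private final Queue<String> pending = new ArrayDeque<>();
    private final BooleanSupplier waitingAtReceiveTask; // hypothetical "is it waiting?" check
    private final Consumer<String> signaller;           // hypothetical wrapper around the signal call

    public ReceiveTaskQueue(BooleanSupplier waitingAtReceiveTask, Consumer<String> signaller) {
        this.waitingAtReceiveTask = waitingAtReceiveTask;
        this.signaller = signaller;
    }

    /** Called for every incoming message; never touches the process directly. */
    public synchronized void offer(String message) {
        pending.add(message);
    }

    /**
     * Delivers at most one buffered message, and only if the process is
     * waiting at the receive task. Returns true if a message was delivered.
     */
    public synchronized boolean deliverNext() {
        if (pending.isEmpty() || !waitingAtReceiveTask.getAsBoolean()) {
            return false;
        }
        // Peek first so the message stays queued if the signal throws.
        signaller.accept(pending.peek());
        pending.remove();
        return true;
    }

    /** Number of messages still waiting to be delivered. */
    public synchronized int size() {
        return pending.size();
    }
}
```

In this sketch deliverNext() would be called from a single place, such as a timer or a listener fired when the process returns to the receive task, so that messages arriving close together are serialized rather than racing each other.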