
Dynamic async flows

mreiterer
Champ in-the-making
Hi all,

I'm new to Activiti and am trying to figure out the best way to model my requirements, but I have not succeeded so far.

My Requirements:

Step 1: Call external system

Step 2: Receive messages from the external system. A message can be one of two types: "fileReceived" or "success".
The "fileReceived" message can be delivered any number of times by the external system, and every message should trigger an async flow (the flows are long-running and should therefore be executed in parallel).
The "success" message is delivered after all "fileReceived" messages. After receiving "success", the main flow should check that all sub-flows have ended and then proceed to the next steps.


I tried to model the sub-flow as an event subprocess triggered by a message event, but after triggering the subprocess from the main flow via messageEventReceived, the whole process instance is ended.




Thanks in advance,
Markus
7 REPLIES

martin_grofcik
Confirmed Champ
Hi Markus,

An event subprocess can be interrupting or non-interrupting.

Does that solve your issue?

Regards
Martin

mreiterer
Champ in-the-making
Hi Martin,

Thanks for your answer.

Can you explain what exactly "interrupting" means?

My solution for now is to start a sub-process from a Java service task via runtimeService.startProcessInstanceByMessage.
This task is marked as async, and the first step in the sub-process is also marked as async.

Regards
Markus
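
A rough BPMN sketch of the workaround described above, for readers who want to try it. This is not Markus's actual model: the delegate class `com.example.StartFileFlowDelegate`, the process id `fileFlow`, the message id and name, and all element ids are hypothetical, and the delegate is assumed to call runtimeService.startProcessInstanceByMessage.

```xml
<!-- Sketch only: ids, names and the delegate class are hypothetical -->
<message id="fileReceivedMessage" name="fileReceived"/>

<!-- Main process: async service task whose delegate is assumed to call
     runtimeService.startProcessInstanceByMessage with the message name -->
<serviceTask id="startFileFlow" name="Start file flow"
             activiti:async="true"
             activiti:class="com.example.StartFileFlowDelegate"/>

<!-- Separate process definition that is started by the message -->
<process id="fileFlow" name="File flow" isExecutable="true">
  <startEvent id="fileFlowStart">
    <messageEventDefinition messageRef="fileReceivedMessage"/>
  </startEvent>
  <sequenceFlow id="toWork" sourceRef="fileFlowStart" targetRef="processFile"/>
  <!-- First step is async so the caller does not wait for the long-running work -->
  <scriptTask id="processFile" name="Process file" scriptFormat="groovy" activiti:async="true">
    <script>// long-running work goes here</script>
  </scriptTask>
  <sequenceFlow id="toEnd" sourceRef="processFile" targetRef="fileFlowEnd"/>
  <endEvent id="fileFlowEnd"/>
</process>
```

Process instances started this way are independent of the main process instance, so the main flow still needs its own way to find out when they have all finished.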

mreiterer
Champ in-the-making
Hi Martin,

The user guide states:
Activiti only supports interrupting Event Sub-Processes.

Is this still true?

martin_grofcik
Confirmed Champ
Hi Markus,

Yes, that's true. Sorry for my previous answer: I saw that there is an interrupting flag in the behavior and assumed that it works.
However, there is a failing JUnit test for the non-interrupting behavior: org.activiti.engine.test.bpmn.event.message.MessageEventSubprocessTest#FAILING_testNonInterruptingUnderProcessDefinition.

(If you want to, you can fix it and create a pull request.)

Regards
Martin
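
For context on the "interrupting" question, here is a minimal sketch of a message-triggered event subprocess. In BPMN, `isInterrupting` on the start event defaults to true, and an interrupting event subprocess cancels the executions that are active in its enclosing scope when it is triggered. That would be consistent with the main flow disappearing as reported in the original post. All ids and the message name are hypothetical.

```xml
<!-- Sketch only: ids and the message name are hypothetical -->
<message id="fileReceivedMessage" name="fileReceived"/>

<subProcess id="fileEventSubProcess" triggeredByEvent="true">
  <!-- isInterrupting="true" is the BPMN default: the enclosing scope is
       cancelled when the message arrives. Per this thread, setting it to
       "false" does not work in Activiti. -->
  <startEvent id="fileMessageStart" isInterrupting="true">
    <messageEventDefinition messageRef="fileReceivedMessage"/>
  </startEvent>
  <sequenceFlow id="toHandleFile" sourceRef="fileMessageStart" targetRef="handleFile"/>
  <scriptTask id="handleFile" name="Handle file" scriptFormat="groovy">
    <script>// handle the received file here</script>
  </scriptTask>
  <sequenceFlow id="toSubEnd" sourceRef="handleFile" targetRef="fileSubEnd"/>
  <endEvent id="fileSubEnd"/>
</subProcess>
```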

spartan
Champ in-the-making
Hey!

I currently have the exact same need and was really relying on Activiti to handle the synchronization between the main process and the async updates. Still, non-interrupting event subprocesses are not supported.

Do you have any ideas or a workaround for this?

warper
Star Contributor
You can spawn as many executions as you like with a ParallelGateway.
After that, all you need is to carefully avoid optimistic locking exceptions. In the example this is done with an event-based gateway, an exclusive synchronous increment of the number of expected results, and then an asynchronous non-exclusive script task that promptly sets a uniquely named process variable.

Take a look at this example process.
<code>
<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:activiti="http://activiti.org/bpmn" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:omgdc="http://www.omg.org/spec/DD/20100524/DC" xmlns:omgdi="http://www.omg.org/spec/DD/20100524/DI" typeLanguage="http://www.w3.org/2001/XMLSchema" expressionLanguage="http://www.w3.org/1999/XPath" targetNamespace="http://www.activiti.org/test">
  <signal id="fileReveived" name="fileReveived" activiti:scope="global"></signal>
  <signal id="success" name="success" activiti:scope="global"></signal>
  <signal id="anotherFileCompleted" name="anotherFileCompleted" activiti:scope="global"></signal>
  <process id="testProcess2" name="testProcess2" isExecutable="true">
    <startEvent id="startevent1" name="Start"></startEvent>
    <scriptTask id="scripttask1" name="Increment number of flows" scriptFormat="groovy" activiti:autoStoreVariables="false">
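      <script>// assumption: increment the counter that servicetask1 initialises to 0
execution.setVariable("nrOfFilesReceived", execution.getVariable("nrOfFilesReceived") + 1);
</script>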
    </scriptTask>
    <intermediateCatchEvent id="signalintermediatecatchevent1" name="SignalCatchEvent">
      <signalEventDefinition signalRef="fileReveived"></signalEventDefinition>
    </intermediateCatchEvent>
    <sequenceFlow id="flow11" sourceRef="signalintermediatecatchevent1" targetRef="parallelgateway2"></sequenceFlow>
    <intermediateCatchEvent id="signalintermediatecatchevent2" name="SignalCatchEvent">
      <signalEventDefinition signalRef="fileReveived"></signalEventDefinition>
    </intermediateCatchEvent>
    <parallelGateway id="parallelgateway2" name="Parallel Gateway"></parallelGateway>
    <sequenceFlow id="flow12" sourceRef="parallelgateway2" targetRef="scripttask1"></sequenceFlow>
    <endEvent id="endevent1" name="End"></endEvent>
    <sequenceFlow id="flow15" sourceRef="parallelgateway2" targetRef="eventgateway1"></sequenceFlow>
    <scriptTask id="scripttask2" name="count results" scriptFormat="groovy" activiti:autoStoreVariables="false">
      <script>// check for completion of all filereceive tasks
// you have number of tasks in nrOfFilesReceived
// also you have results in variables resultXXXX
// query engine for process variables, count them</script>
    </scriptTask>
    <sequenceFlow id="flow16" sourceRef="signalintermediatecatchevent2" targetRef="scripttask2"></sequenceFlow>
    <scriptTask id="scripttask5" name="Script Task" activiti:async="true" activiti:exclusive="false" scriptFormat="groovy" activiti:autoStoreVariables="false">
      <script>//do your job here
//after all work is done, set result
execution.setVariable("result${execution.getId()}", 1);

//try to send async signal anotherFileCompleted to result collection execution, catch and ignore exceptions</script>
    </scriptTask>
    <sequenceFlow id="flow18" sourceRef="scripttask1" targetRef="scripttask5"></sequenceFlow>
    <sequenceFlow id="flow19" sourceRef="scripttask5" targetRef="endevent1"></sequenceFlow>
    <serviceTask id="servicetask1" name="Init number of files" activiti:expression="${0}" activiti:resultVariableName="nrOfFilesReceived"></serviceTask>
    <sequenceFlow id="flow20" sourceRef="startevent1" targetRef="servicetask1"></sequenceFlow>
    <eventBasedGateway id="eventgateway1" name="Event Gateway"></eventBasedGateway>
    <sequenceFlow id="flow21" sourceRef="eventgateway1" targetRef="signalintermediatecatchevent2"></sequenceFlow>
    <sequenceFlow id="flow22" sourceRef="eventgateway1" targetRef="signalintermediatecatchevent1"></sequenceFlow>
    <sequenceFlow id="flow23" sourceRef="servicetask1" targetRef="eventgateway1"></sequenceFlow>
    <exclusiveGateway id="exclusivegateway1" name="Exclusive Gateway"></exclusiveGateway>
    <endEvent id="endevent2" name="End"></endEvent>
    <sequenceFlow id="flow25" name="all done" sourceRef="exclusivegateway1" targetRef="endevent2"></sequenceFlow>
    <sequenceFlow id="flow26" sourceRef="scripttask2" targetRef="exclusivegateway1"></sequenceFlow>
    <intermediateCatchEvent id="signalintermediatecatchevent3" name="SignalCatchEvent">
      <signalEventDefinition signalRef="anotherFileCompleted"></signalEventDefinition>
    </intermediateCatchEvent>
    <sequenceFlow id="flow27" name="waiting for them" sourceRef="exclusivegateway1" targetRef="signalintermediatecatchevent3"></sequenceFlow>
    <sequenceFlow id="flow28" sourceRef="signalintermediatecatchevent3" targetRef="scripttask2"></sequenceFlow>
  </process>
  <bpmndi:BPMNDiagram id="BPMNDiagram_testProcess2">
    <bpmndi:BPMNPlane bpmnElement="testProcess2" id="BPMNPlane_testProcess2">
      <bpmndi:BPMNShape bpmnElement="startevent1" id="BPMNShape_startevent1">
        <omgdc:Bounds height="35.0" width="35.0" x="10.0" y="210.0"></omgdc:Bounds>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape bpmnElement="scripttask1" id="BPMNShape_scripttask1">
        <omgdc:Bounds height="55.0" width="131.0" x="480.0" y="151.0"></omgdc:Bounds>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape bpmnElement="signalintermediatecatchevent1" id="BPMNShape_signalintermediatecatchevent1">
        <omgdc:Bounds height="35.0" width="35.0" x="310.0" y="161.0"></omgdc:Bounds>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape bpmnElement="signalintermediatecatchevent2" id="BPMNShape_signalintermediatecatchevent2">
        <omgdc:Bounds height="35.0" width="35.0" x="310.0" y="260.0"></omgdc:Bounds>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape bpmnElement="parallelgateway2" id="BPMNShape_parallelgateway2">
        <omgdc:Bounds height="40.0" width="40.0" x="390.0" y="158.0"></omgdc:Bounds>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape bpmnElement="endevent1" id="BPMNShape_endevent1">
        <omgdc:Bounds height="35.0" width="35.0" x="790.0" y="161.0"></omgdc:Bounds>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape bpmnElement="scripttask2" id="BPMNShape_scripttask2">
        <omgdc:Bounds height="55.0" width="105.0" x="400.0" y="250.0"></omgdc:Bounds>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape bpmnElement="scripttask5" id="BPMNShape_scripttask5">
        <omgdc:Bounds height="55.0" width="105.0" x="650.0" y="151.0"></omgdc:Bounds>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape bpmnElement="servicetask1" id="BPMNShape_servicetask1">
        <omgdc:Bounds height="55.0" width="105.0" x="80.0" y="200.0"></omgdc:Bounds>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape bpmnElement="eventgateway1" id="BPMNShape_eventgateway1">
        <omgdc:Bounds height="40.0" width="40.0" x="220.0" y="207.0"></omgdc:Bounds>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape bpmnElement="exclusivegateway1" id="BPMNShape_exclusivegateway1">
        <omgdc:Bounds height="40.0" width="40.0" x="560.0" y="257.0"></omgdc:Bounds>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape bpmnElement="endevent2" id="BPMNShape_endevent2">
        <omgdc:Bounds height="35.0" width="35.0" x="730.0" y="260.0"></omgdc:Bounds>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape bpmnElement="signalintermediatecatchevent3" id="BPMNShape_signalintermediatecatchevent3">
        <omgdc:Bounds height="35.0" width="35.0" x="563.0" y="330.0"></omgdc:Bounds>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNEdge bpmnElement="flow11" id="BPMNEdge_flow11">
        <omgdi:waypoint x="345.0" y="178.0"></omgdi:waypoint>
        <omgdi:waypoint x="390.0" y="178.0"></omgdi:waypoint>
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge bpmnElement="flow12" id="BPMNEdge_flow12">
        <omgdi:waypoint x="430.0" y="178.0"></omgdi:waypoint>
        <omgdi:waypoint x="480.0" y="178.0"></omgdi:waypoint>
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge bpmnElement="flow15" id="BPMNEdge_flow15">
        <omgdi:waypoint x="410.0" y="158.0"></omgdi:waypoint>
        <omgdi:waypoint x="409.0" y="126.0"></omgdi:waypoint>
        <omgdi:waypoint x="188.0" y="126.0"></omgdi:waypoint>
        <omgdi:waypoint x="188.0" y="226.0"></omgdi:waypoint>
        <omgdi:waypoint x="220.0" y="227.0"></omgdi:waypoint>
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge bpmnElement="flow16" id="BPMNEdge_flow16">
        <omgdi:waypoint x="345.0" y="277.0"></omgdi:waypoint>
        <omgdi:waypoint x="400.0" y="277.0"></omgdi:waypoint>
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge bpmnElement="flow18" id="BPMNEdge_flow18">
        <omgdi:waypoint x="611.0" y="178.0"></omgdi:waypoint>
        <omgdi:waypoint x="650.0" y="178.0"></omgdi:waypoint>
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge bpmnElement="flow19" id="BPMNEdge_flow19">
        <omgdi:waypoint x="755.0" y="178.0"></omgdi:waypoint>
        <omgdi:waypoint x="790.0" y="178.0"></omgdi:waypoint>
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge bpmnElement="flow20" id="BPMNEdge_flow20">
        <omgdi:waypoint x="45.0" y="227.0"></omgdi:waypoint>
        <omgdi:waypoint x="80.0" y="227.0"></omgdi:waypoint>
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge bpmnElement="flow21" id="BPMNEdge_flow21">
        <omgdi:waypoint x="240.0" y="247.0"></omgdi:waypoint>
        <omgdi:waypoint x="240.0" y="277.0"></omgdi:waypoint>
        <omgdi:waypoint x="310.0" y="277.0"></omgdi:waypoint>
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge bpmnElement="flow22" id="BPMNEdge_flow22">
        <omgdi:waypoint x="240.0" y="207.0"></omgdi:waypoint>
        <omgdi:waypoint x="240.0" y="178.0"></omgdi:waypoint>
        <omgdi:waypoint x="310.0" y="178.0"></omgdi:waypoint>
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge bpmnElement="flow23" id="BPMNEdge_flow23">
        <omgdi:waypoint x="185.0" y="227.0"></omgdi:waypoint>
        <omgdi:waypoint x="220.0" y="227.0"></omgdi:waypoint>
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge bpmnElement="flow25" id="BPMNEdge_flow25">
        <omgdi:waypoint x="600.0" y="277.0"></omgdi:waypoint>
        <omgdi:waypoint x="730.0" y="277.0"></omgdi:waypoint>
        <bpmndi:BPMNLabel>
          <omgdc:Bounds height="14.0" width="37.0" x="611.0" y="260.0"></omgdc:Bounds>
        </bpmndi:BPMNLabel>
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge bpmnElement="flow26" id="BPMNEdge_flow26">
        <omgdi:waypoint x="505.0" y="277.0"></omgdi:waypoint>
        <omgdi:waypoint x="560.0" y="277.0"></omgdi:waypoint>
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge bpmnElement="flow27" id="BPMNEdge_flow27">
        <omgdi:waypoint x="580.0" y="297.0"></omgdi:waypoint>
        <omgdi:waypoint x="580.0" y="330.0"></omgdi:waypoint>
        <bpmndi:BPMNLabel>
          <omgdc:Bounds height="14.0" width="78.0" x="597.0" y="316.0"></omgdc:Bounds>
        </bpmndi:BPMNLabel>
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge bpmnElement="flow28" id="BPMNEdge_flow28">
        <omgdi:waypoint x="563.0" y="347.0"></omgdi:waypoint>
        <omgdi:waypoint x="452.0" y="347.0"></omgdi:waypoint>
        <omgdi:waypoint x="452.0" y="305.0"></omgdi:waypoint>
      </bpmndi:BPMNEdge>
    </bpmndi:BPMNPlane>
  </bpmndi:BPMNDiagram>
</definitions>
</code>

P.S. Waiting for results this way is not ideal and needs testing, and there are other possibilities. To be honest, I haven't tested it.
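
One detail the example leaves open is how the exclusive gateway chooses between "all done" and "waiting for them": neither outgoing flow carries a condition. A possible sketch, assuming the "count results" script task sets a boolean process variable, is shown below. The variable name `allDone` is hypothetical and not part of the example above.

```xml
<!-- Sketch only: "allDone" is a hypothetical boolean process variable
     that scripttask2 ("count results") would have to set -->
<sequenceFlow id="flow25" name="all done" sourceRef="exclusivegateway1" targetRef="endevent2">
  <conditionExpression xsi:type="tFormalExpression"><![CDATA[${allDone}]]></conditionExpression>
</sequenceFlow>
<sequenceFlow id="flow27" name="waiting for them" sourceRef="exclusivegateway1" targetRef="signalintermediatecatchevent3">
  <conditionExpression xsi:type="tFormalExpression"><![CDATA[${!allDone}]]></conditionExpression>
</sequenceFlow>
```

These two elements would replace the existing flow25 and flow27 definitions. Like the rest of the example, this is untested.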

spartan
Champ in-the-making
Thank you Warper, this combo seems to be doing the trick.