cancel
Showing results for 
Search instead for 
Did you mean: 

Correct way to signal a receiveTask

exp2345
Champ in-the-making
Champ in-the-making
I am attempting to signal a receive task from a delegate Java thread started from a Service Task and have run into two issues with the implementation.

I have already read the threads http://forums.activiti.org/content/error-while-signaling-activity-receive-task and http://forums.activiti.org/content/signalling-process-fails-intermittently and my sample code is much simpler (no JMS) but the scenario is similar.

Two approaches tried are:

POLLING

Repeatedly calling RuntimeService.signal(executionId) until it succeeds. It works, but I'm not happy with the robustness of this solution.

RuntimeService.signal() throws a NullPointerException instead of an ActivitiException. I'm not sure if this is a bug or expected behaviour.

EVENT LISTENER

I expect that registering an event listener on ACTIVITI_STARTED before leaving the Service Task will allow me to wait until the receive state has started before starting the delegate thread. When I query for the Execution using the executionId from ActivitiEvent RuntimeService returns a null execution which prevents me from verifying that the correct task has started. Is this expected behaviour?

SERVICE TASK


import org.activiti.engine.ProcessEngine;
import org.activiti.engine.ProcessEngines;
import org.activiti.engine.RuntimeService;
import org.activiti.engine.delegate.event.ActivitiEvent;
import org.activiti.engine.delegate.event.ActivitiEventListener;
import org.activiti.engine.delegate.event.ActivitiEventType;
import org.activiti.engine.impl.bpmn.behavior.TaskActivityBehavior;
import org.activiti.engine.impl.pvm.delegate.ActivityExecution;
import org.activiti.engine.runtime.Execution;
import org.activiti.engine.runtime.ExecutionQuery;

public class ThreadDelegate extends TaskActivityBehavior {

    @Override
    public void execute(ActivityExecution execution) throws Exception {

        System.out.println("Started thread delegate for execution id " + execution.getId());

        Object counterObj = execution.getVariable("counter");
        if(counterObj == null) {
            execution.setVariable("counter", 0);
            System.out.println("Counter initialised at 0");
        } else {
            int counter = (int) counterObj;
            counter++;
            execution.setVariable("counter", counter);
            System.out.println("Counter incremented to " + counter);
        }
        final String executionId = execution.getId();

        final Thread t = new Thread() {
            @Override
            public void run() {
                System.out.println("Thread running");

//                Forcing the thread to sleep causes the process the execute normally
//                try {
//                    Thread.sleep(5000);
//                } catch (InterruptedException e) { }

                signalWaitState();

                /* KLUDGE alert!
                 * This does work but I hope to achieve signalling without a polling loop.
                 */
                /*
                int retries = 0;
                while(retries < 5) {
                    try {
                        signalWaitState();
                        return;
                    } catch (NullPointerException e) {
                        retries++;
                        System.out.println("Waiting before retry " + retries);
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e2) { }
                    }
                }
                */
            }

            /*
              * Sometimes a NPE exception is thrown
              *     at org.activiti.engine.impl.persistence.entity.ExecutionEntity.signal(ExecutionEntity.java:375)
              * unless the thread is forced to sleep first. Presumably this is because the wait state has not been
              * executed yet.
              */
            private void signalWaitState() {
                ProcessEngine processEngine = ProcessEngines.getDefaultProcessEngine();
                RuntimeService runtimeService = processEngine.getRuntimeService();
                runtimeService.signal(executionId);
                System.out.println("Signalled exeuection id " + executionId);
            }
        };

        /*
         * The purpose of this event listener was to ensure that the waitState has been
         * entered before starting the delegate thread. This was intended to prevent the
         * intermittent NPE when called signal() when delegate thread completes execution.
         *
         * Even though the executionId is valid the query returns a null execution.
         */


        ProcessEngine processEngine = ProcessEngines.getDefaultProcessEngine();
        RuntimeService runtimeService = processEngine.getRuntimeService();
        runtimeService.addEventListener(new ActivitiEventListener() {
            @Override
            public void onEvent(ActivitiEvent event) {
                RuntimeService runtimeService = event.getEngineServices().getRuntimeService();
                System.out.println("Event type: " + event.getType() + " executionId: " + event.getExecutionId() + " piid: " + event.getProcessInstanceId());
                // TODO: filter on executionId
                try {
                    ExecutionQuery q = runtimeService.createExecutionQuery();
                    q.executionId(event.getExecutionId());
                    Execution execution = q.singleResult();
                    String activityId = execution.getActivityId();
                    System.out.println("Activity ID: " + activityId);
                    if("waitState".equals(activityId)) {
                        t.start();
                    }
                    System.out.println("Started delegate thread.");
                } finally {
                    runtimeService.removeEventListener(this);
                }
            }

            @Override
            public boolean isFailOnException() {
                return true;
            }
        });


        this.leave(execution);
        System.out.println("Left service task");

        // Start the thread here unless using the event listener above
//        t.start();
        System.out.println("Started delegate thread.");
    }
}


BPMN

<blockcode>
<process id="simpleThreaded" name="Simple Threaded" isExecutable="true">
    <startEvent id="sid-E3A6E22C-879A-445A-A41E-54D32380A289"/>
   
    <serviceTask id="threadDelegate" activiti:class="ThreadDelegate"/>
   
    <receiveTask id="waitState"/>
   
   <exclusiveGateway id="sid-35B8B065-DCB0-44B1-B14B-DE14789F0194"/>

    <intermediateCatchEvent id="sid-8BD44534-5684-4B80-AA0B-7FA1E4C2F7A5">
      <timerEventDefinition>
        <timeDuration>PT5S</timeDuration>
      </timerEventDefinition>
    </intermediateCatchEvent>
   
   <endEvent id="sid-3E6882EB-427F-42B9-AB41-973B95024CBE"/>

    <sequenceFlow id="sid-E62C4877-7B6B-4BAE-9708-89540EC67DE8" sourceRef="sid-8BD44534-5684-4B80-AA0B-7FA1E4C2F7A5" targetRef="threadDelegate"/>
    <sequenceFlow id="sid-C86D79E2-521F-4B97-8695-4EC1315D6232" sourceRef="waitState" targetRef="sid-35B8B065-DCB0-44B1-B14B-DE14789F0194"/>
    <sequenceFlow id="sid-C3EB1FBA-316A-41FE-B29C-B9B6051B6C59" sourceRef="sid-E3A6E22C-879A-445A-A41E-54D32380A289" targetRef="threadDelegate"/>
    <sequenceFlow id="out" sourceRef="threadDelegate" targetRef="waitState"/>
    <sequenceFlow id="sid-43094329-90FD-4E8E-890F-2667044EBD2B" sourceRef="sid-35B8B065-DCB0-44B1-B14B-DE14789F0194" targetRef="sid-3E6882EB-427F-42B9-AB41-973B95024CBE">
      <conditionExpression xsi:type="tFormalExpression"><![CDATA[${counter > 3}]]></conditionExpression>
    </sequenceFlow>
    <sequenceFlow id="sid-CE41E03C-2AB5-4B71-A04D-AD22ED8A06F0" sourceRef="sid-35B8B065-DCB0-44B1-B14B-DE14789F0194" targetRef="sid-8BD44534-5684-4B80-AA0B-7FA1E4C2F7A5">
      <conditionExpression xsi:type="tFormalExpression"><![CDATA[${counter <= 3}]]></conditionExpression>
    </sequenceFlow>
  </process>
</blockcode>
5 REPLIES 5

jbarrez
Star Contributor
Star Contributor
> I expect that registering an event listener on ACTIVITI_STARTED before leaving the Service Task will allow me to wait until the receive state > has started before starting the delegate thread. When I query for the Execution using the executionId from ActivitiEvent RuntimeService
> returns a null execution which prevents me from verifying that the correct task has started. Is this expected behaviour?

Yes - the event is fired but probably the transaction hasn't been committed to the database.

If the call is so fast, why make the call async? Doing the call synchronous would be logical here.
Anyway, in this case polling is probably easiest.

exp2345
Champ in-the-making
Champ in-the-making
Thanks for your reply. Many of the calls do take a long time, but I created this example to reproduce behaviour I observed then the calls happened to return immediately.

I'll go ahead with polling and look into patching signal() to throw something other than NullPointerException when the execution hasn't been committed yet.

Edit: According to the API it should return ActivitiObjectNotFoundException (http://activiti.org/javadocs/org/activiti/engine/RuntimeService.html#signal(java.lang.String))

exp2345
Champ in-the-making
Champ in-the-making
Is it expected behaviour for the thread that calls signal() to continue executing the process even if that thread is not part of the executor pool?

The documentation indicates that this is not the case:

"A process instance can have various wait states and this service contains various operations to 'signal' the instance that the external trigger is received and the process instance can be continued."

I'd expect the 'signal' to immediately return yielding control back to the external trigger instead of executing further commands within the external trigger's thread. Please find attached a log file demonstrating this behaviour using the class and bpmn above.

exp2345
Champ in-the-making
Champ in-the-making
Is it expected behaviour for the thread that calls signal() to continue executing the process even if that thread is not part of the executor pool?

The documentation indicates that this is not the case:

"A process instance can have various wait states and this service contains various operations to 'signal' the instance that the external trigger is received and the process instance can be continued."

I'd expect the 'signal' to immediately return yielding control back to the external trigger instead of executing further commands within the external trigger's thread. Please find attached a log file demonstrating this behaviour using the class and bpmn above.

trademak
Star Contributor
Star Contributor
Yes this is expected behaviour. It will execute until the next wait state.
If this is not the desired behaviour you can add an asynchronous element in the process definition so the thread will only execute until this asynchronous element.

Best regards,