cancel
Showing results for 
Search instead for 
Did you mean: 

Recommended approach for long running tasks

lofi
Champ in-the-making
Champ in-the-making
What is the recommended approach to run a long running task? Currently I'm using a java service task which starts the external processes via ProcessBuilder. In extreme cases these processes can run for hours and days.

From what I found on this forum regarding the same question it may be that some activiti connections remain open which will cause a timeout. In order to prevent this, the task is flagged with async=true. However that means that the task is flagged as "finished" and activiti continues the workflow while the process is actually still running, but it should wait until the external process is finished.

Is there a mechanism to run long running tasks without system resources being occupied and with activiti waiting for the task to finish?

Thank you very much for the feedback!
15 REPLIES 15

lofi
Champ in-the-making
Champ in-the-making
In addition here's the code I'm using, unfortunately I'm not allowed to edit my post.

Creation of the task:


ServiceTask task = new ServiceTask();
task.setAsynchronous( true);
task.setImplementationType(ImplementationType.IMPLEMENTATION_TYPE_CLASS);
task.setImplementation( "application.ShellTask");

List<FieldExtension> fields = new ArrayList<>();

FieldExtension field = new FieldExtension();
field.setFieldName( "shellCommand");
field.setStringValue( "notepad.exe");
fields.add( field);

task.setFieldExtensions( fields);

and the process invoker, it simply executes the process asynchronouosly, here simply using Runtime's exec method:


package application;

import org.activiti.engine.delegate.DelegateExecution;
import org.activiti.engine.delegate.Expression;
import org.activiti.engine.delegate.JavaDelegate;

public class ShellTask implements JavaDelegate {

  Expression shellCommand;

  public void execute(DelegateExecution execution) throws Exception {

    String command = (String) shellCommand.getValue( execution);

    Runtime.getRuntime().exec( command);

  }
}

martin_grofcik
Confirmed Champ
Confirmed Champ
Hi Lofi,

Yes there is, you have to make your shell script asynchronous. It means that it will just get signal from activiti to do something and immediately returns.
After that process execution can wait in signal/message intermediate catching event to process further.

Regards
Martin

lofi
Champ in-the-making
Champ in-the-making
Thank you. I read that up. If I understand it correctly, that means I'd have to model it explicitly in the BPMN workflow, doesn't it? Is there no other way, i. e. internally via code?

What I would like to have in terms of the bpmn model:

Start Event -> Java Service Task which executes a Process asynchronously in another thread -> End Event

where End Event isn't reached until the external (async) process is finished. I'd prefer that you wouldn't have to model another element between the Service Task and the End Event, if possible. Because if I have 10 elements in sequence, that intermediate catching event necessity would make 20 elements of it. Which normally wouldn't be a problem, but it increases complexity for the ones who model the workflow.

martin_grofcik
Confirmed Champ
Confirmed Champ
Hi Lofi,

The proposed changes affect process model and service implementation too.

If the only issue is the complexity of the model, you have several possibilities to cope with that: e.g.
1. Encapsulate call into subprocess,
2. create custom task to do the job.

Regards
Martin

lofi
Champ in-the-making
Champ in-the-making
Thank you Martin. "Encapsulating call into subprocess" is my 2nd option, thanks to your help I now know how to wait for the async task completion using the eventing mechanism. However, I'd rather have your "create custom task" suggestion, but I don't exactly know what you mean.

Should I extend from AbstractCustomServiceTask, implement my own ActivitiBehaviour or create my own custom extension as described here:

http://www.activiti.org/userguide/#bpmnCustomExtensions

Again, thank you very much for you help!

martin_grofcik
Confirmed Champ
Confirmed Champ
Hi Lofi,

It depends on your requirements, how far you go. 🙂
Let's start with the simple custom task and you can extend it further….

Regards
Martin

lofi
Champ in-the-making
Champ in-the-making
So after searching how to create a simple custom task I ended up with the information that instead of a JavaDelegate I'd have to implement SignallableActivityBehavior.

The problem I'm facing now that the signal doesn't arrive. All I could send is signal( executionId), but I need a custom signal (success, error, etc), without knowing the execution id.

My code simplified:


public class MyCustomTask implements SignallableActivityBehavior {

  public static String MY_CUSTOM_TASK_SIGNAL = "TaskFinishedSignal";
 
  @Override
  public void execute(ActivityExecution execution) throws Exception {

    Thread thread = new Thread( new MyRunnable());
    thread.start();
   
  }

  @Override
  public void signal(ActivityExecution execution, String signalEvent, Object signalData) throws Exception {

    if( !MY_CUSTOM_TASK_SIGNAL.equals( signalEvent))
      return;
   
    PvmTransition transition = execution.getActivity().getOutgoingTransitions().get( 0);
    execution.take( transition);
   
  }

}

public class MyRunnable implements Runnable {

@Override
public void run() {

   try {
 
  System.out.println( "Start MyRunnable");
 
  Thread.sleep( 10000);
 
  System.out.println( "Stop MyRunnable");
 
   } catch( InterruptedException e) {
 
  e.printStackTrace();
 
   } finally {
 
  Map<String,Object> map = new HashMap<>();
  processEngine.getRuntimeService().signal( MY_CUSTOM_TASK_SIGNAL, map);
 
   }

}

}

I can't hand over the execution id to the runnable because otherwise the MyRunnable class would have to manage it. All it should have to do is to simply send a signal that the task is finished.

Does anyone know how to do that? Or is there a better approach, i. e. can I set a wait state in the execute method and send a signal afterwards?

martin_grofcik
Confirmed Champ
Confirmed Champ
Hi Lofi,

I see 2 possibilities:
-> send executionId to the parallel thread and call RuntimeService.signal(execId, signalName, data)
-> subscribe to signal (similar to intermediate signal catch event) and start parallel thread afterwards.

Regards
Martin

lofi
Champ in-the-making
Champ in-the-making
Thank you very much, Martin! I think the first suggestion is the one I should be going with. Although there is no method with the signature signal( execId, signalName, data). All I've seen is that I can only hand over a map to the signal and evaluate that one. The signalEvent and the signalData of

public void signal(ActivityExecution execution, String signalEvent, Object signalData)

are null.

I'm using a process manager now which uses a registry with system process id <> activiti execution id mapping.

Basically this:


public class ProcessManager {

  public static Map<Integer,String> registry = new HashMap<>();
 
  public static void register( Integer processId, String executionId) {
    registry.put( processId, executionId);
  }

  public static void unregister( Integer processId) {
    registry.remove( processId);
  }
 
}



public class ShellTask implements SignallableActivityBehavior {
 
  @Override
  public void execute(ActivityExecution execution) throws Exception {
   
    Thread thread = new Thread( new ProcessExecutor( execution.getId()));
    thread.start();
   
  }

  @Override
  public void signal(ActivityExecution execution, String signalEvent, Object signalData) throws Exception {

    Map<String,Object> map = execution.getVariables();
   
    System.out.println( "EXECUTION_RESULT = " + map.get( "EXECUTION_RESULT"));
   
    if( !map.containsKey( "EXECUTION_RESULT")
return;

    PvmTransition transition = execution.getActivity().getOutgoingTransitions().get( 0);
    execution.take( transition);
   
  }

}



public class ProcessExecutor implements Runnable {

  private Integer processId;
  private String executionId;
 
  public ProcessExecutor( String executionId) {
    this.executionId = executionId;
  }
 
  @Override
  public void run() {

    try {
     
      Process process = Runtime.getRuntime().exec( "notepad.exe");
     
      processId = process.getId();
     
      ProcessManager.register( processId, executionId);

      process.waitFor(); // just for testing; the actual process will run asynchronously
     
    } catch( IOException | InterruptedException e) {
     
      e.printStackTrace();
     
    } finally {

      // send signal after process finishes
      // just for testing; this block will be executed in a process monitor which checks if the async process has completed
      Map<String,Object> map = new HashMap<>();
      map.put( "EXECUTION_RESULT", "SUCCESS");
     
      processEngine.getRuntimeService().signal( executionId, map);
     
      ProcessManager.unregister( processId);
     
    }

  }

}


Of course the registry should store the data in the database in order to survive a server restart.

If there's a better approach or something like this that's already available in Activiti, I'd be glad to hear it.