shell service tasks not showing in runtime tables

aoliver
Champ in-the-making
Hello,
I am an Activiti noob and am having issues with a Shell Task implementation. Since Shell Tasks are not available in the Activiti Designer yet, I created a Service Task (Java Delegate) that mimics the Shell Task behavior. These Shell Tasks run BI processes and take several minutes. I'm using the Activiti REST API to invoke the workflows. Activiti-Explorer and Activiti-REST are both configured to connect to an Oracle database.

3 Questions:
1) While the Shell Tasks are running, I see no information in the ACT_RU_EXECUTION table. I would expect to see my process instance row there while these tasks are running.
2) How can I get the REST call to return as soon as the workflow is invoked, instead of returning after the workflow has completed?
3) In Activiti Explorer, when I run the attached workflow, no other activity is allowed in the activiti-explorer app. I am even unable to log into another session. Why is Activiti Explorer locking up like this?


Here is a snippet of the service task xml:
    <serviceTask id="NetGStageServiceTask" name="NetGStageService Task" activiti:class="com.scrippsnetworks.ibi.revenueWorkflows.serviceTasks.ShellServiceTask">
      <extensionElements>
        <activiti:field name="command">
          <activiti:expression>${ShellScriptRoot}/ADSD/adsd_staging${ShellScriptSuffix}</activiti:expression>
        </activiti:field>
        <activiti:field name="wait">
          <activiti:string>true</activiti:string>
        </activiti:field>
        <activiti:field name="outputVariable">
          <activiti:string>netGStageStatus</activiti:string>
        </activiti:field>
        <activiti:field name="errorCodeVariable">
          <activiti:string>netGStageReturnCode</activiti:string>
        </activiti:field>
        <activiti:field name="arg1">
          <activiti:expression>${DartRunDate}</activiti:expression>
        </activiti:field>
      </extensionElements>
    </serviceTask>


Here is the java delegate code:
public class ShellServiceTask implements JavaDelegate {

   final static Logger logger = Logger.getLogger(ShellServiceTask.class.getName());

   protected Expression command;
   protected Expression wait;
   protected Expression arg1;
   protected Expression arg2;
   protected Expression arg3;
   protected Expression arg4;
   protected Expression arg5;
   protected Expression outputVariable;
   protected Expression errorCodeVariable;
   protected Expression redirectError;
   protected Expression cleanEnv;
   protected Expression directory;

   String commandStr;
   String arg1Str;
   String arg2Str;
   String arg3Str;
   String arg4Str;
   String arg5Str;
   String waitStr;
   String resultVariableStr;
   String errorCodeVariableStr;
   Boolean waitFlag;
   Boolean redirectErrorFlag;
   Boolean cleanEnvBoolan;
   String directoryStr;

   @Override
   public void execute(DelegateExecution execution) throws Exception {
      logger.info("In " + command.getValue(execution) + " ShellTask script with variables = " + execution.getVariables());
      executeScript(execution);
      // create a new value for execution
      
      logger.info("Leaving " + command.getValue(execution) + " ShellTask script with variables = " + execution.getVariables());

   }

   private void readFields(DelegateExecution execution) {
       commandStr = getStringFromField(command, execution);
       arg1Str = getStringFromField(arg1, execution);
       arg2Str = getStringFromField(arg2, execution);
       arg3Str = getStringFromField(arg3, execution);
       arg4Str = getStringFromField(arg4, execution);
       arg5Str = getStringFromField(arg5, execution);
       waitStr = getStringFromField(wait, execution);
       resultVariableStr = getStringFromField(outputVariable, execution);
       errorCodeVariableStr = getStringFromField(errorCodeVariable, execution);

       String redirectErrorStr = getStringFromField(redirectError, execution);
       String cleanEnvStr = getStringFromField(cleanEnv, execution);

       waitFlag = waitStr == null || waitStr.equals("true");
       redirectErrorFlag = redirectErrorStr != null && redirectErrorStr.equals("true");
       cleanEnvBoolan = cleanEnvStr != null && cleanEnvStr.equals("true");
       directoryStr = getStringFromField(directory, execution);
   }

   public String toString() {
      return "command="+command+", arg1="+arg1+", arg2="+arg2+", arg3="+arg3+", arg4="+arg4+", arg5="+arg5+
            ", wait="+wait+", outputVariable="+outputVariable+", errorCodeVariable="+errorCodeVariable;
   }
   
   public void executeScript(DelegateExecution execution) {
      logger.info("starting execute");
       
      readFields(execution);
       
      List<String> argList = new ArrayList<String>();
         argList.add(commandStr);
       
         if (arg1Str != null)
            argList.add(arg1Str);
         if (arg2Str != null)
            argList.add(arg2Str);
         if (arg3Str != null)
            argList.add(arg3Str);
         if (arg4Str != null)
            argList.add(arg4Str);
         if (arg5Str != null)
            argList.add(arg5Str);
       
         ProcessBuilder processBuilder = new ProcessBuilder(argList);
       
         try {
            processBuilder.redirectErrorStream(redirectErrorFlag);
            if (cleanEnvBoolan) {
               Map<String, String> env = processBuilder.environment();
               env.clear();
            }
          
            if (directoryStr != null && directoryStr.length() > 0)
               processBuilder.directory(new File(directoryStr));

            Process process = processBuilder.start();

            if (waitFlag) {
               int errorCode = process.waitFor();

               if (resultVariableStr != null) {
                  String result = convertStreamToStr(process.getInputStream());
                  execution.setVariable(resultVariableStr, result);
               }

               if (errorCodeVariableStr != null) {
                  execution.setVariable(errorCodeVariableStr, Integer.toString(errorCode));
               }

            }
         } catch (Exception e) {
            logger.info("Could not execute shell command with error = " + e);
            throw new ActivitiException("Could not execute shell command" , e);
         }

   }
      
   public static String convertStreamToStr(InputStream is) throws IOException {
      if (is != null) {
         Writer writer = new StringWriter();
         char[] buffer = new char[1024];
         try {
            Reader reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
            int n;
            while ((n = reader.read(buffer)) != -1) {
               writer.write(buffer, 0, n);
            }
         } finally {
            is.close();
         }
         return writer.toString();
      } else {
         return "";
      }
   }

   protected String getStringFromField(Expression expression, DelegateExecution execution) {
      if (expression != null) {
         Object value = expression.getValue(execution);
         if (value != null) {
            return value.toString();
         }
      }
      return null;
   }
}


What can I change to solve these 3 issues?

thanks
15 REPLIES

frederikherema1
Star Contributor
I'm not gonna review your whole code ;-) But I notice you're using member variables in a JavaDelegate (String arg1Str; etc.). This is valid in an ActivityBehavior (each activity has its own instance), but it's NOT safe with JavaDelegates, as the user guide states:

Note: there will be only one instance of that Java class created for the serviceTask it is defined on. All process-instances share the same class instance that will be used to call execute(DelegateExecution). This means that the class must not use any member variables and must be thread-safe, since it can be executed simultaneously from different threads. This also influences the way Field injection is handled.

So instead of reading them and storing, you should read them on every "execute()" call…

aoliver
Champ in-the-making
Ah ok, thanks for reminding me of that.  I've removed the non-Expression member vars.

I understand about not reviewing all the code, but I was hoping you might just verify that I'm handling the Activiti parts correctly in the long-running thread class, including signalling appropriately. The class name is: ShellTaskThread

I've pasted in the cleaner and simplified code:


public class ShellServiceTask implements JavaDelegate {

final static Logger logger = Logger.getLogger(ShellServiceTask.class.getName());

protected Expression command;
protected Expression wait;
protected Expression arg1;
protected Expression arg2;
protected Expression arg3;
protected Expression arg4;
protected Expression arg5;
protected Expression outputVariable;
protected Expression errorCodeVariable;
protected Expression redirectError;
protected Expression cleanEnv;
protected Expression directory;

@Override
public void execute(DelegateExecution execution) throws Exception {
  logger.info("In " + command.getValue(execution) + " ShellTask script with variables = " + execution.getVariables());
  String commandStr = getStringFromField(command, execution);
  String arg1Str = getStringFromField(arg1, execution);
  String arg2Str = getStringFromField(arg2, execution);
  String arg3Str = getStringFromField(arg3, execution);
  String arg4Str = getStringFromField(arg4, execution);
  String arg5Str = getStringFromField(arg5, execution);
  String waitStr = getStringFromField(wait, execution);
  String resultVariableStr = getStringFromField(outputVariable, execution);
  String errorCodeVariableStr = getStringFromField(errorCodeVariable, execution);

  String redirectErrorStr = getStringFromField(redirectError, execution);
  String cleanEnvStr = getStringFromField(cleanEnv, execution);
  String directoryStr = getStringFromField(directory, execution);

  Boolean waitFlag = waitStr == null || waitStr.equals("true");
  Boolean redirectErrorFlag = redirectErrorStr != null && redirectErrorStr.equals("true");
  Boolean cleanEnvBoolean = cleanEnvStr != null && cleanEnvStr.equals("true");

  // Build commands for shell command
  List<String> argList = new ArrayList<String>();
  argList.add(commandStr);
  
  if (arg1Str != null)
   argList.add(arg1Str);
  if (arg2Str != null)
   argList.add(arg2Str);
  if (arg3Str != null)
   argList.add(arg3Str);
  if (arg4Str != null)
   argList.add(arg4Str);
  if (arg5Str != null)
   argList.add(arg5Str);
 
     // Start async thread
  logger.info("Firing executor with processInstanceId = " + execution.getProcessInstanceId());
 
  Thread executor = new Thread(new ShellTaskThread(argList, waitFlag, redirectErrorFlag, cleanEnvBoolean, directoryStr,
    resultVariableStr, errorCodeVariableStr, execution.getProcessInstanceId()));
  executor.start();
  logger.info("Executor started…leaving workflow");
      
  logger.info("Leaving " + command.getValue(execution) + " ShellTask script with variables = " + execution.getVariables());

}

public String toString() {
  return "command="+command+", arg1="+arg1+", arg2="+arg2+", arg3="+arg3+", arg4="+arg4+", arg5="+arg5+
    ", wait="+wait+", outputVariable="+outputVariable+", errorCodeVariable="+errorCodeVariable;
}

protected String getStringFromField(Expression expression, DelegateExecution execution) {
  if (expression != null) {
   Object value = expression.getValue(execution);
   if (value != null) {
    return value.toString();
   }
  }
  return null;
}
}

class ShellTaskThread implements Runnable {
final static Logger logger = Logger.getLogger(ShellTaskThread.class.getName());

List<String> command;
boolean waitFlag;
String resultVariableStr;
String errorCodeVariableStr;
boolean cleanEnv;
boolean redirectErrorFlag;
String directoryStr;
String processId;

public ShellTaskThread(List<String> command, boolean waitFlag, boolean redirectErrorFlag,
   boolean cleanEnv, String directoryStr,
   String resultVariableStr, String errorCodeVariableStr, String processId) {
  this.command = command;
  this.waitFlag = waitFlag;
  this.resultVariableStr = resultVariableStr;
  this.errorCodeVariableStr = errorCodeVariableStr;
  this.redirectErrorFlag = redirectErrorFlag;
  this.cleanEnv = cleanEnv;
  this.directoryStr = directoryStr;
  this.processId = processId;
}

@Override
public void run() {

  logger.info("running async thread with command: " + command);
  logger.info("other vars: resultVariableStr: " + resultVariableStr + " errorCodeVariableStr: " +
     errorCodeVariableStr + " processId: " + processId);
 
  // set activiti context
  ProcessEngine processEngine = ProcessEngines.getDefaultProcessEngine();
  RuntimeService runtimeService = processEngine.getRuntimeService();
  Execution execution = runtimeService.createExecutionQuery().processInstanceId(processId).singleResult();

  logger.info("execution = " + execution.getId() + ": " + execution.getProcessInstanceId());
  ProcessBuilder processBuilder = new ProcessBuilder(command);
   
  try {
   processBuilder.redirectErrorStream(redirectErrorFlag);
   if (cleanEnv) {
    Map<String, String> env = processBuilder.environment();
    env.clear();
   }
   
   if (directoryStr != null && directoryStr.length() > 0)
    processBuilder.directory(new File(directoryStr));
   
   Process process = processBuilder.start();
  
   if (waitFlag) {
    try {
     int errorCode = process.waitFor();
     logger.info("errorCode = " + errorCode);

     if (resultVariableStr != null) {
      String result = convertStreamToStr(process.getInputStream());
      runtimeService.setVariable(execution.getId(), resultVariableStr, result);
     }
    
     if (errorCodeVariableStr != null) {
      runtimeService.setVariable(execution.getId(), errorCodeVariableStr, Integer.toString(errorCode));
     }
    } catch (InterruptedException e) {
     if (resultVariableStr != null) {
      runtimeService.setVariable(execution.getId(), resultVariableStr, e);
     }
     if (errorCodeVariableStr != null) {
      runtimeService.setVariable(execution.getId(), errorCodeVariableStr, "1");
     }
    
     throw e;
    }

   }
  
  } catch (Exception e) {
   logger.info("Could not execute shell command " + command + " with error = " + e);
  } finally {
   logger.info("Leaving thread with process variables = " + runtimeService.getVariables(execution.getId()));
   runtimeService.signal(execution.getId());
  }
}
 
public static String convertStreamToStr(InputStream is) throws IOException {
  if (is != null) {
   Writer writer = new StringWriter();
   char[] buffer = new char[1024];
   try {
    Reader reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
    int n;
    while ((n = reader.read(buffer)) != -1) {
     writer.write(buffer, 0, n);
    }
   } finally {
    is.close();
   }
   return writer.toString();
  } else {
   return "";
  }
}

}

Thank you for your time and patience on this matter.  Again, you have been most helpful.

frederikherema1
Star Contributor
The thread uses the Activiti API correctly: it sets the process variables and finally signals the execution/process. Looks good.
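
One detail in the posted ShellTaskThread may still be worth a second look: it calls process.waitFor() and only afterwards reads process.getInputStream(). The java.lang.Process documentation warns that a child process can block if its output is not read promptly, because the pipe buffer is limited, so a script that prints a lot could hang. A minimal sketch of reading first and waiting second, using only the JDK, is shown here. The class name DrainThenWait, the Result holder, and the choice to always merge stderr into stdout are assumptions for illustration, not part of the original code:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical helper: drains the child's output before waiting for its exit code.
public class DrainThenWait {

    /** Exit code and captured output of a finished child process. */
    public static final class Result {
        public final int exitCode;
        public final String output;

        Result(int exitCode, String output) {
            this.exitCode = exitCode;
            this.output = output;
        }
    }

    public static Result run(List<String> command) throws IOException, InterruptedException {
        ProcessBuilder builder = new ProcessBuilder(command);
        // Assumption: merge stderr into stdout so only one pipe has to be drained.
        builder.redirectErrorStream(true);
        Process process = builder.start();

        StringBuilder out = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
            char[] buffer = new char[1024];
            int n;
            // read() returns -1 once the child closes its output, i.e. when it is done writing.
            while ((n = reader.read(buffer)) != -1) {
                out.append(buffer, 0, n);
            }
        }

        // The pipe is fully drained, so waitFor() can no longer block on a full buffer.
        int exitCode = process.waitFor();
        return new Result(exitCode, out.toString());
    }
}
```

In the thread class, the captured output and exit code would then be passed to runtimeService.setVariable(...) as before. If stderr is not merged, it would need its own reader, since an undrained stderr pipe can block the child in the same way.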

aoliver
Champ in-the-making
I have noticed a rare error in this design that has occurred at least twice. In the long-running thread, before the ProcessBuilder is created, I have been getting NullPointerExceptions (note: the long-running thread is in a jar deployed to both activiti-explorer and activiti-rest). It must occur on one of these three lines:


  ProcessEngine processEngine = ProcessEngines.getDefaultProcessEngine();
  RuntimeService runtimeService = processEngine.getRuntimeService();
  Execution execution = runtimeService.createExecutionQuery().processInstanceId(processId).singleResult();

I've added better logging to attempt to catch it.

Could it be that the long-running thread is executing this code before the calling service task has moved on to the receive task and persisted the execution state?

Should I move this code to after the ProcessBuilder command has executed?

Thanks again for your assistance.

frederikherema1
Star Contributor
In which call exactly do you get the null pointer? The code above can only throw an NPE if the process engine or the runtime service is null. Even if the execution doesn't exist (yet), singleResult() will just return null, but no NPE will be thrown.
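
If the lookup does return null because the execution has not been persisted yet, the logging statement right after it in the posted run() method, which calls execution.getId(), would be one place an NPE could come from. A rough sketch of polling until the lookup yields a value, written against plain JDK types so it stands alone; the class AwaitNonNull, its method, and the attempt/delay parameters are hypothetical names, not Activiti API:

```java
import java.util.function.Supplier;

// Hypothetical helper: retries a lookup a bounded number of times until it returns non-null.
public class AwaitNonNull {

    /**
     * Calls lookup up to maxAttempts times, sleeping delayMillis between attempts.
     * Returns the first non-null value, or null if every attempt returned null.
     */
    public static <T> T awaitNonNull(Supplier<T> lookup, int maxAttempts, long delayMillis)
            throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            T value = lookup.get();
            if (value != null) {
                return value;
            }
            if (attempt < maxAttempts) {
                Thread.sleep(delayMillis); // give the calling transaction time to commit
            }
        }
        return null; // caller must still handle the "never appeared" case
    }
}
```

In the thread class, the supplier would wrap the existing query, for example () -> runtimeService.createExecutionQuery().processInstanceId(processId).singleResult(), and the result would be checked for null before execution.getId() is used. Whether a timing issue is really the cause here is not confirmed in this thread.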

aoliver
Champ in-the-making
It is in one of those three (well, I guess two) statements, but I do not know specifically which one. I've only seen the error twice, while running in the dev app server (tomcat6 on Linux). I've never seen the error in local unit tests (eclipse/tomcat and standalone tomcat for rest-api). Additionally, I see no other errors suggesting that the ProcessEngine failed to initialize. Also, subsequent workflow runs were successful.

I have better logging in place now. When I see the error again, I will provide you with more information.

Thanks again for your help.