
Dedicated job-executor for each web application possible?

ollib
Champ in-the-making
Hi,

We are planning a scenario with multiple web applications running on the same JBoss instance and sharing the same Activiti database.

Each web application handles one type of process. The web applications are packaged as WAR files.

Is it possible to have a dedicated job executor for each web application?

The job executor in web app "A" would then execute only the asynchronous tasks for process "A",
the job executor in web app "B" would execute only the asynchronous tasks for process "B", etc.

How can this be done?

Best regards
Oliver Bruening
3 REPLIES

martin_grofcik
Confirmed Champ
Hi Oliver.

The DefaultJobExecutor implementation does not support this kind of filtering.

You can try implementing your own JobExecutor that does this kind of filtering.

Regards
Martin

ollib
Champ in-the-making
I came up with this solution. Not ideal, I know. Perhaps some kind of configurable filter can be implemented in the next version of Activiti 😉


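// Imports assume Activiti 5.x package names and java.util.logging.
import java.util.logging.Logger;

import org.activiti.engine.impl.jobexecutor.AcquireJobsRunnable;
import org.activiti.engine.impl.jobexecutor.DefaultJobExecutor;

public class MyJobExecutor extends DefaultJobExecutor {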
  private static Logger logger = Logger.getLogger(MyJobExecutor.class.getName());
 
  private String processDefinitionId;
 
  @Override
  protected void ensureInitialization() {
    // Swap in the filtering acquire command; the acquisition thread itself is unchanged.
    acquireJobsCmd = new MyAcquireJobsCmd(this, processDefinitionId);
    acquireJobsRunnable = new AcquireJobsRunnable(this);
  }

  public void setProcessDefinitionId(String processDefinitionId) {
    this.processDefinitionId = processDefinitionId;
  }

  public String getProcessDefinitionId() {
    return processDefinitionId;
  }
 
}


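// Imports assume Activiti 5.x package names, java.util.logging and Apache Commons Lang 2.
import java.util.ArrayList;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.List;
import java.util.logging.Logger;

import org.activiti.engine.impl.Page;
import org.activiti.engine.impl.interceptor.Command;
import org.activiti.engine.impl.interceptor.CommandContext;
import org.activiti.engine.impl.jobexecutor.AcquiredJobs;
import org.activiti.engine.impl.jobexecutor.JobExecutor;
import org.activiti.engine.impl.persistence.entity.JobEntity;
import org.activiti.engine.impl.util.ClockUtil;
import org.apache.commons.lang.StringUtils;

public class MyAcquireJobsCmd implements Command<AcquiredJobs> {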
 
  private static Logger logger = Logger.getLogger(MyAcquireJobsCmd.class.getName());
 
  private final JobExecutor jobExecutor;
 
  private String processDefinitionId;

  public MyAcquireJobsCmd(JobExecutor jobExecutor, String processDefinitionId) {
    this.jobExecutor = jobExecutor;
    this.processDefinitionId = processDefinitionId;
  }
 

  public AcquiredJobs execute(CommandContext commandContext) {

    logger.info("--> execute");
   
    String lockOwner = jobExecutor.getLockOwner();
    int lockTimeInMillis = jobExecutor.getLockTimeInMillis();
    int maxNonExclusiveJobsPerAcquisition = jobExecutor.getMaxJobsPerAcquisition();
   
    AcquiredJobs acquiredJobs = new AcquiredJobs();
    List<JobEntity> jobs = commandContext
      .getJobEntityManager()
      .findNextJobsToExecute(new Page(0, maxNonExclusiveJobsPerAcquisition));

    for (JobEntity job: jobs) {
     
      List<String> jobIds = new ArrayList<String>();
      if (job != null &&
          !acquiredJobs.contains(job.getId()) &&
          StringUtils.startsWith(job.getProcessDefinitionId(), processDefinitionId)) {
        if (job.isExclusive() && job.getProcessInstanceId() != null) {
          // acquire all exclusive jobs in the same process instance
          // (includes the current job)
          List<JobEntity> exclusiveJobs = commandContext.getJobEntityManager()
            .findExclusiveJobsToExecute(job.getProcessInstanceId());
          for (JobEntity exclusiveJob : exclusiveJobs) {
           
            if(exclusiveJob != null) {
              lockJob(exclusiveJob, lockOwner, lockTimeInMillis);
              jobIds.add(exclusiveJob.getId());
            }
          }
        } else {
          lockJob(job, lockOwner, lockTimeInMillis);
          jobIds.add(job.getId());
        }
       
      }

      // Skip jobs rejected by the filter so that no empty batch is queued.
      if (!jobIds.isEmpty()) {
        acquiredJobs.addJobIdBatch(jobIds);
      }
    }

    return acquiredJobs;
  }
 
  protected void lockJob(JobEntity job, String lockOwner, int lockTimeInMillis) {   
    job.setLockOwner(lockOwner);
    GregorianCalendar gregorianCalendar = new GregorianCalendar();
    gregorianCalendar.setTime(ClockUtil.getCurrentTime());
    gregorianCalendar.add(Calendar.MILLISECOND, lockTimeInMillis);
    job.setLockExpirationTime(gregorianCalendar.getTime());   
  }


  public void setProcessDefinitionId(String processDefinitionId) {
    this.processDefinitionId = processDefinitionId;
  }


  public String getProcessDefinitionId() {
    return processDefinitionId;
  }

}
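
A note on the filter in MyAcquireJobsCmd: it is a plain prefix match on the job's process definition ID. Assuming IDs follow Activiti's usual key:version:id pattern (for example myProcess:1:4), a value such as "myProcess" also matches jobs of a process whose key merely starts with the same text. The sketch below is a minimal, standalone illustration of that difference; the class and method names are hypothetical and not part of Activiti.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Activiti: compares two ways of matching
// a job's process definition ID against a configured value.
class ProcessDefinitionFilterSketch {

  // Plain prefix match, like the StringUtils.startsWith call in MyAcquireJobsCmd.
  static boolean matchesPrefix(String processDefinitionId, String prefix) {
    return processDefinitionId != null && prefix != null
        && processDefinitionId.startsWith(prefix);
  }

  // Stricter variant: treat the configured value as a process key and
  // require the ':' separator directly after it.
  static boolean matchesKey(String processDefinitionId, String key) {
    return processDefinitionId != null && key != null
        && processDefinitionId.startsWith(key + ":");
  }

  public static void main(String[] args) {
    // Assumed ID format: key:version:id
    List<String> ids = Arrays.asList("myProcess:1:4", "myProcess:2:17", "myProcessB:1:23");
    for (String id : ids) {
      System.out.println(id + " prefix=" + matchesPrefix(id, "myProcess")
          + " key=" + matchesKey(id, "myProcess"));
    }
  }
}
```

If the process keys of the web applications never share a prefix, the plain prefix match is sufficient; otherwise the stricter check may be the safer choice.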





  <bean id="processEngineConfiguration" class="org.activiti.spring.SpringProcessEngineConfiguration">
    <property name="databaseType" value="oracle"></property>
    <property name="dataSource" ref="datasource"></property>
    <property name="transactionManager" ref="transactionManager"></property>
    <property name="databaseSchemaUpdate" value="false"></property>
    <property name="jpaEntityManagerFactory" ref="entityManagerFactory"></property>
    <property name="deploymentResources" value="classpath*:/org/olli/mvc/firstProcess.bpmn" />
    <property name="jobExecutor" ref="jobExecutor"></property>
    <property name="jobExecutorActivate" value="true" />
  </bean>

  <bean id="processEngine" class="org.activiti.spring.ProcessEngineFactoryBean">
    <property name="processEngineConfiguration" ref="processEngineConfiguration"></property>
  </bean>
 
  <bean id="jobExecutor" class="org.olli.mvc.activiti.MyJobExecutor">
    <property name="processDefinitionId" value="myProcess"></property>
    <property name="waitTimeInMillis" value="30000" />
  </bean>
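
One caveat with this approach: the acquire command first fetches a page of at most maxJobsPerAcquisition due jobs and only then applies the filter. If that page happens to contain only jobs of other processes, an acquisition cycle may come back empty even though matching jobs are waiting. The following is a small simulation of that effect using plain strings in place of jobs; it is a sketch with hypothetical names, not Activiti code, and the order in which the real query returns jobs is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation, no Activiti classes involved: each string stands
// for the process definition ID of one due job, in the order the query returns them.
class AcquisitionPagingSketch {

  // Mimics the acquire command: take the first pageSize jobs, then filter by prefix.
  static List<String> acquire(List<String> dueJobs, int pageSize, String prefix) {
    List<String> page = dueJobs.subList(0, Math.min(pageSize, dueJobs.size()));
    List<String> acquired = new ArrayList<String>();
    for (String processDefinitionId : page) {
      if (processDefinitionId.startsWith(prefix)) {
        acquired.add(processDefinitionId);
      }
    }
    return acquired;
  }

  public static void main(String[] args) {
    List<String> dueJobs = Arrays.asList(
        "processB:1:8", "processB:1:8", "processB:1:8", "processA:1:4");
    // The page of three holds only processB jobs, so nothing is acquired.
    System.out.println(acquire(dueJobs, 3, "processA"));
    // A page of four reaches the processA job.
    System.out.println(acquire(dueJobs, 4, "processA"));
  }
}
```

In practice the other web applications' executors lock their own jobs, which removes them from later pages, so this mainly matters when one of those executors is stopped or slow.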

frederikherema1
Star Contributor
Thanks for sharing the solution. We played around with a similar idea, but it's not really doable in a clean/generic way, so we decided this use case should be covered by the users themselves, using all available pluggability points.