cancel
Showing results for 
Search instead for 
Did you mean: 

asyncJobLockTimeInMillis not work in DefaultAsyncJobExecutor

chenyh
Champ in-the-making
Champ in-the-making
Hello, I recently integrated the latest Activiti 5.20.0 with my Web App using spring. part of my config file is just as follow:

        <bean id="asyncExecutor" class="org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor">
      <property name="corePoolSize" value="10" />
      <property name="maxPoolSize" value="100" />
      <property name="keepAliveTime" value="5000" />
      <property name="queueSize" value="100" />
      <property name="maxTimerJobsPerAcquisition" value="30" />
      <property name="maxAsyncJobsDuePerAcquisition" value="30" />
      <property name="defaultAsyncJobAcquireWaitTimeInMillis" value="1000" />
      <property name="defaultTimerJobAcquireWaitTimeInMillis" value="1000" />
      <property name="timerLockTimeInMillis" value="300000" />
      <property name="asyncJobLockTimeInMillis" value="300000" />
   </bean>

    <bean id="processEngineConfiguration" class="org.activiti.spring.SpringProcessEngineConfiguration">
        <property name="dataSource" ref="actDataSource" />
        <property name="transactionManager" ref="actTxManager" />
        <property name="jobExecutorActivate" value="false" />
        <property name="asyncExecutorEnabled" value="true" />
        <property name="asyncExecutorActivate" value="true" />
        <property name="dbHistoryUsed" value="false" />
        <property name="dbIdentityUsed" value="false" />
        <property name="history" value="activity" />
        <property name="databaseType" value="mssql" />
        <property name="bulkInsertEnabled" value="false" />
        <property name="deploymentResources" value="classpath*:/diagrams/autodeploy/*.bpmn" />
        <property name="deploymentMode" value="single-resource" />
        <property name="asyncExecutor" ref="asyncExecutor" />
   </bean>

All runs well, but when I try to shut down the activiti engine when it runs a async job in java service task:
<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:activiti="http://activiti.org/bpmn" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:omgdc="http://www.omg.org/spec/DD/20100524/DC" xmlns:omgdi="http://www.omg.org/spec/DD/20100524/DI" typeLanguage="http://www.w3.org/2001/XMLSchema" expressionLanguage="http://www.w3.org/1999/XPath" targetNamespace="http://www.activiti.org/test">
  <process id="myProcess" name="My process" isExecutable="true">
    <startEvent id="startevent1" name="Start"></startEvent>
    <serviceTask id="servicetask1" name="Service Task" activiti:async="true" activiti:expression="#{singlePayOrderService.virtualAcctTransD2Settle(singlePayOrderId)}"></serviceTask>
    <endEvent id="endevent1" name="End"></endEvent>
    <sequenceFlow id="flow1" sourceRef="startevent1" targetRef="servicetask1"></sequenceFlow>
    <sequenceFlow id="flow2" sourceRef="servicetask1" targetRef="endevent1"></sequenceFlow>
  </process>
</definitions>

the service task's logic can be ignored, because I just debug the app, and shut down the engine when it run into the java code. After the lock time expired, I restart my activie engine, but no retry will executed!
So I checked the code, the async job executor use "AcquireTimerJobsRunnable" and "AcquireAsyncJobsDueRunnable" to fetch the "to be executed jobs", and use "AcquireTimerJobsCmd" and "AcquireAsyncJobsDueCmd" to execute the search query.
however, when I deep it to the sql file in "job.xml", it shows the sql id is "selectNextTimerJobsToExecute","selectAsyncJobsDueToExecute". Neither of the 2 sql can select the LOCK_EXP_TIME_ with the TYPE_="message".
Moreover, I try to switch the async executor to the legacy jobExecutor, it works fine. I mean when I restart the activiti engine, it will retry the job when lock time expired. I still check the code, which use the "selectNextJobsToExecute" sql, and works file.
Please kindly advise whether I should do it my self or there should be some reason to disable the auto retry mechanism.
Thanks in advance.
11 REPLIES 11

chenyh
Champ in-the-making
Champ in-the-making
Any comment on my topic would be high appreciated!
I think this case is in common.

chenyh
Champ in-the-making
Champ in-the-making
sorry, I detail my question:
When the activiti engine stopped during executing a service task(I marked the service task as asynchronized). in the RU_JOB table,  a job data will like below:
TYPE_="message", DUE_DATA_=null, RETRIES_>0(default=3), LOCK_OWNER_=null and LOCK_EXP_TIME_=<some time.
When I restart the activiti engine after the LOCK_EXP_TIME_ , the JOB could not be selected by the async job executor. That means this process instance is hang there forever.
But if I switch the async job executor to jobexecutor, when the current time is later than LOCK_EXP_TIME_, the the job could be selected out and execute.
I deep the code, and find the job select sql for async job executor is below:
<code>
<select id="selectNextTimerJobsToExecute" parameterType="org.activiti.engine.impl.db.ListQueryParameterObject" resultMap="jobResultMap">
    ${limitBefore}
    select
      RES.* ${limitBetween}      
    from ${prefix}ACT_RU_JOB RES   
      LEFT OUTER JOIN ${prefix}ACT_RU_EXECUTION PI ON PI.ID_ = RES.PROCESS_INSTANCE_ID_
    where (RES.RETRIES_ &gt; 0)
      and (RES.DUEDATE_ is null or RES.DUEDATE_ &lt;= #{parameter, jdbcType=TIMESTAMP})
      and (RES.LOCK_OWNER_ is null or RES.LOCK_EXP_TIME_ &lt;= #{parameter, jdbcType=TIMESTAMP})
      and TYPE_ = 'timer'
      and (
          (RES.EXECUTION_ID_ is null)
          or
          (PI.SUSPENSION_STATE_ = 1)
      ) 
    ${limitAfter}    
  </select>
 
  <select id="selectAsyncJobsDueToExecute" parameterType="org.activiti.engine.impl.db.ListQueryParameterObject" resultMap="jobResultMap">
    ${limitBefore}
    select
      RES.* ${limitBetween}      
    from ${prefix}ACT_RU_JOB RES   
      LEFT OUTER JOIN ${prefix}ACT_RU_EXECUTION PI ON PI.ID_ = RES.PROCESS_INSTANCE_ID_
    where RES.RETRIES_ &gt; 0
      and RES.DUEDATE_ is not null
      and RES.DUEDATE_ &lt;= #{parameter, jdbcType=TIMESTAMP}
      and (RES.LOCK_EXP_TIME_ is null or RES.LOCK_EXP_TIME_ &lt;= #{parameter, jdbcType=TIMESTAMP})
      and TYPE_ = 'message'
      and (
          (RES.EXECUTION_ID_ is null)
          or
          (PI.SUSPENSION_STATE_ = 1)
      ) 
    ${limitAfter}    
  </select>
</code>
and the sql for jobexecutor is below:
<code>
<select id="selectNextJobsToExecute" parameterType="org.activiti.engine.impl.db.ListQueryParameterObject" resultMap="jobResultMap">
   ${limitBefore}
    select
     RES.* ${limitBetween}     
    from ${prefix}ACT_RU_JOB RES   
     LEFT OUTER JOIN ${prefix}ACT_RU_EXECUTION PI ON PI.ID_ = RES.PROCESS_INSTANCE_ID_
    where (RES.RETRIES_ &gt; 0)
      and (RES.DUEDATE_ is null or RES.DUEDATE_ &lt;= #{parameter, jdbcType=TIMESTAMP})
      and (RES.LOCK_OWNER_ is null or RES.LOCK_EXP_TIME_ &lt;= #{parameter, jdbcType=TIMESTAMP})
     and (
        (RES.EXECUTION_ID_ is null)
       or
       (PI.SUSPENSION_STATE_ = 1)
      ) 
    ${limitAfter}    
  </select>
</code>

So my question is why they are different, is there any reason for the async job executor not to select the expired job out?

6pker
Champ in-the-making
Champ in-the-making
To be honest, I do want to do some help, but it's quite difficult to understand your question.

"After the lock time expired, I restart my activie engine, but no retry will executed!"

Only the jobs with RETRIES_ >= 0 and DUDATE_ is not null in ACT_RU_JOB will be retried.


martin_grofcik
Confirmed Champ
Confirmed Champ
Hi chenyh,

Is it possible to reproduce the issue in the jUnit test please?
https://forums.activiti.org/content/sticky-how-write-unit-test

Regards
Martin

Hi Martin, my issue use the async job executor and non memory db (H2), and I need to shutdown the engine during the service task executing.
So I think it's hard to write all these in a jUnit test case.
How about write some sample? Or package these in spring boot? (It's really need some works)

jbarrez
Star Contributor
Star Contributor
Sure, samples that clearly demonstrate a problem are always valuable!

louisj
Champ in-the-making
Champ in-the-making
I am experiencing the exact same problem when using the Activiti async job executor.
Looks like service tasks which were busy executing at the time that my server is rebooted, do not get picked up by the async job executor when the Activiti engine starts up again. I am using Activiti 5.20.0.

jbarrez
Star Contributor
Star Contributor
> Looks like service tasks which were busy executing at the time that my server is rebooted, do not get picked up by the async job executor when the Activiti engine starts up again. I am using Activiti 5.20.0.

That shouldn't happen: if that situation arises, the current transaction is rolled back to the previous wait state and the job will be retried later on.

Is there anything special about this service task? or is it a regular java call with setting/getting some vars.?

louisj
Champ in-the-making
Champ in-the-making
Task is asynchronous and exclusive
Task type = expression
Expression = call to method on Spring bean annotated with @Component and @Transactional
Inside the Spring bean method I:
- Read process variables from the DelegateExecution object that is passed in
- Call a stored procedure on Oracle database
- Persist records to a table
- Set variables on the DelegateExecution object that is passed in