cancel
Showing results for 
Search instead for 
Did you mean: 

Proposed changes to the job handling mechanism

marcus1
Champ in-the-making
Champ in-the-making
Hi Activiti team,

I have recently added support for batch processing to a project that uses Activiti. It is implemented as an alternative way to handle multi-instance behaviour. Every batch has for example 100 elements and each batch is implemented as a job. Functionally this works quite well, but I had to copy classes from Activiti and modify them that way to make it work.

These are several improvements to the async job executor mechanism that would make this much cleaner and which I think would benefit Activiti in general:

  1. Bugfix: The mapping for JobEntity misses the duedate_ column. This causes the due date to be reset to null each time a job is locked. (As a result the job will not be retried after an application restart.)

  2. Extract an abstract super class "AbstractAsyncJobExecutor" from DefaultAsyncJobExecutor that contains everything but the ExecutorService-specific functionality. Implementations that do not use an ExecutorService (but for example a WorkManager) should not inherit the ES-related fields and methods.

  3. Currently when the JobExecutor's queue is full, new jobs will still be offered and executed immediately due to the CallerRunsPolicy (which will not work in all environments, for example Websphere, because it required support for suspending transactions). I think this defeats the purpose of having a finite queue and instead would propose to stop offering new jobs once the queue is (almost) full. I would add a method to the job executor that indicates whether or not the job executor still has capacity (e.g. boolean isFull). If there is no room the AcquireAsyncJobsDueRunnable should not acquire new jobs for a certain time.

  4. Refactor ExecuteAsyncRunnable: simplify the run method by extracting methods lock, execute and unlock. This makes the code more readable and allows overriding only the execute part for example. Also fix indentation by replacing tabs with spaces.

  5. ExecuteAsyncRunnable: simplify locking by using a while (true) loop rather than recursively calling retryAsyncJob.

  6. AcquireAsyncJobsDueRunnable: only acquire jobs when there is room in the JobExecutor (see #3).

  7. When a new message job is added (JobEntityManager.send(MessageEntity message)) the "hintAsyncExecutor" is called causing the job to be executed immediately after the transaction is committed. When there are a lot of jobs (i.e. batches) the JobExecutor's queue will overflow, causing the same problem as described in point 3. I'm not entirely sure how to fix this yet. Perhaps by adding an alternative method instead of send which does not call hintAsyncExecutor.
Please let me know what you think. If you agree I'll make a pull request.

If you are interested I can contribute the batch functionality itself as a next step.
1 REPLY 1

jbarrez
Star Contributor
Star Contributor
Marcus, all these points sound like fanastic things. A PR on this would of course be welcomed.

FYI:
-point 4 has been amended, I believe: https://github.com/Activiti/Activiti/blob/master/modules/activiti-engine/src/main/java/org/activiti/...
- point 7 is something I actually was looking into today … but didn't find a decent solution yet too. But you probably you'll see a commit in this area around this in one of the next days.