JobExecutor - Thread management

slash117
Champ in-the-making
Hi everyone,
I'm running into the following problem, which I believe is caused by Activiti's JobExecutor, when running load tests on my application:
the Java process's memory increases constantly until it saturates (I then get an OutOfMemory error once the Java process exceeds 4 GB, and the process is killed by the OS).
Inside the JVM everything seems to be fine: the garbage collector is working well and the heap shows no saturation.
So I think the problem is that Activiti doesn't reuse the threads of its thread pool. That is, every time a task is launched with the JobExecutor, a new thread is created (pool1- thread XXX).
With JProfiler I can see that these threads are killed once the task assigned to each of them has finished, but it seems that the threads and their memory are not well managed at the OS level, which causes the Java process to crash.

1. Wouldn't it be better if the size of the thread pool in the JobExecutor stayed constant? That way we could be sure the threads are reused (corePoolSize = maxPoolSize = 10, for example).
2. Is it possible to override the DefaultJobExecutor class and develop our own implementation of the JobExecutor?
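
To make question 1 concrete, here is a minimal sketch of a fixed-size pool written against plain java.util.concurrent, not against Activiti's own classes. The class name FixedPoolSketch, the unbounded LinkedBlockingQueue and the task count are assumptions made for illustration only; the real JobExecutor may use a different queue and rejection policy.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FixedPoolSketch {

    /**
     * Runs taskCount short tasks on a pool whose core size equals its
     * maximum size and returns how many distinct threads did the work.
     */
    public static int distinctWorkerThreads(int poolSize, int taskCount)
            throws InterruptedException {
        final Set<String> threadNames =
                Collections.synchronizedSet(new HashSet<String>());

        // corePoolSize == maximumPoolSize: the pool never grows past
        // poolSize and never shrinks below it while it is running.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()); // assumption: unbounded queue

        for (int i = 0; i < taskCount; i++) {
            pool.execute(new Runnable() {
                public void run() {
                    // Record which worker thread picked up this task.
                    threadNames.add(Thread.currentThread().getName());
                }
            });
        }

        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return threadNames.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("distinct worker threads: "
                + distinctWorkerThreads(10, 100));
    }
}
```

As long as no task throws, the number of distinct worker threads stays bounded by the pool size, however many tasks are submitted.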

Platform: Activiti 5.12.1, JDK 1.6, JOnAS 5.2.2, Spring 3.2.3

Regards,
Gilbert E.H.
5 REPLIES 5

jbarrez
Star Contributor
1. The default job executor has these settings:

<code>
  protected int queueSize = 3;
  protected int corePoolSize = 3;
  private int maxPoolSize = 10;

  protected BlockingQueue<Runnable> threadPoolQueue;
  protected ThreadPoolExecutor threadPoolExecutor;
</code>

So it's very weird that you are seeing this behavior, as it should throttle itself when it reaches 10 threads.
So just to confirm: if you have, say, 100 async tasks, do you see 100 threads being created?

2. Yes - the whole JobExecutor infrastructure is pluggable

slash117
Champ in-the-making
Hello,
It's true that the number of threads doesn't exceed 10 (= maxPoolSize), but because corePoolSize < maxPoolSize, whenever there are no more async tasks to execute, the number of threads in the pool drops from 10 to 3. This repeated release of threads apparently saturates the native memory of the Java process (native memory = total process memory - heap memory - PermGen memory).
So instead of reusing the threads it has already created, the application keeps creating and releasing threads as tasks need to be executed.

Regards,
Gilbert E.H.

jbarrez
Star Contributor
But wouldn't that be the desired behaviour? I mean, if the load is low, you don't want to keep those 10 threads spinning?

Probably a better solution would be to have an 'idle period' on them before they are put back into the thread pool?
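
In plain java.util.concurrent terms, an 'idle period' corresponds to the keepAliveTime argument of ThreadPoolExecutor: threads above the core size are only released after they have been idle for that long. The sketch below illustrates this outside of Activiti. The class name IdlePeriodSketch, the SynchronousQueue and the 500 ms value are assumptions chosen for the demo; the thread does not say which keep-alive value the JobExecutor actually uses.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class IdlePeriodSketch {

    /**
     * Returns { pool size right after a burst of work,
     *           pool size once the idle period has elapsed }.
     */
    public static int[] poolSizesAfterBurst(int core, int max, long keepAliveMillis)
            throws InterruptedException {
        // A SynchronousQueue hands tasks straight to a thread, so the pool
        // grows up to max while all existing threads are busy.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, keepAliveMillis, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>());

        final CountDownLatch release = new CountDownLatch(1);
        final CountDownLatch done = new CountDownLatch(max);

        for (int i = 0; i < max; i++) {
            pool.execute(new Runnable() {
                public void run() {
                    try {
                        release.await(); // keep this thread busy for now
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        done.countDown();
                    }
                }
            });
        }

        release.countDown();
        done.await();
        // The extra threads are idle but still alive during the idle period.
        int rightAfterBurst = pool.getPoolSize();

        // Wait (with a generous timeout) for the pool to shrink back.
        long deadline = System.currentTimeMillis() + keepAliveMillis + 5000;
        while (pool.getPoolSize() > core && System.currentTimeMillis() < deadline) {
            Thread.sleep(50);
        }
        int afterIdlePeriod = pool.getPoolSize();

        pool.shutdown();
        return new int[] { rightAfterBurst, afterIdlePeriod };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] sizes = poolSizesAfterBurst(1, 3, 500L);
        System.out.println("right after burst: " + sizes[0]
                + ", after idle period: " + sizes[1]);
    }
}
```

A longer keep-alive means that bursts arriving close together reuse the same threads instead of creating new ones each time.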

slash117
Champ in-the-making
Hello Joram,
I think both solutions are good (constant thread pool size or idle period), or at least better than a variable thread pool size without an idle period. I chose to implement the constant thread pool size (corePoolSize = maxPoolSize = 10).

But now I have another problem: if everything worked as expected, I should have the same 10 threads from the launch of my application until its shutdown. Unfortunately this is not the case. In fact, every time Activiti throws an exception (functional error or ActivitiLockingException), the thread is killed and replaced by another one.
At which level do you think I should catch and handle these exceptions to avoid this behavior?
I tried adding a catch block for Exception in the executeJobs method (JobExecutor), but it didn't work. The thread is killed before…

<code>
  public void executeJobs(List<String> jobIds) {
    try {
      threadPoolExecutor.execute(new ExecuteJobsRunnable(this, jobIds));
    } catch (RejectedExecutionException ree) {
      rejectedJobsHandler.jobsRejected(this, jobIds);
    } catch (Exception e) {
      log.error("An error occurred while executing the jobs : " + jobIds.toString(), e);
    }
  }
</code>
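
A likely reason the catch block above never fires: ThreadPoolExecutor.execute() only hands the Runnable to the pool and returns, so an exception thrown later inside run() surfaces in the worker thread, not in the calling thread. The following standalone sketch shows this with plain java.util.concurrent. The class name CallerCatchSketch and the simulated failure are illustrative assumptions, not Activiti code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerCatchSketch {

    /** Returns { caught by the caller of execute(), seen in the worker thread }. */
    public static boolean[] whereDoesTheExceptionGo() throws InterruptedException {
        final CountDownLatch seenInWorker = new CountDownLatch(1);

        // Worker threads report uncaught exceptions through this handler.
        ThreadFactory factory = new ThreadFactory() {
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, "sketch-worker");
                t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                    public void uncaughtException(Thread thread, Throwable ex) {
                        seenInWorker.countDown();
                    }
                });
                return t;
            }
        };

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(), factory);

        boolean caughtByCaller = false;
        try {
            pool.execute(new Runnable() {
                public void run() {
                    throw new IllegalStateException("simulated job failure");
                }
            });
        } catch (Exception e) {
            // Only reached if execute() itself fails, e.g. on rejection.
            caughtByCaller = true;
        }

        boolean workerSawIt = seenInWorker.await(5, TimeUnit.SECONDS);
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return new boolean[] { caughtByCaller, workerSawIt };
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] result = whereDoesTheExceptionGo();
        System.out.println("caught by caller: " + result[0]
                + ", seen in worker thread: " + result[1]);
    }
}
```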

Regards,
Gilbert E.H.


slash117
Champ in-the-making
Hello,
I fixed the problem by adding a catch block in the ExecuteJobsRunnable class, so the exception is caught before it reaches the ThreadPoolExecutor.

<code>
public void run() {
        final JobExecutorContext jobExecutorContext = new JobExecutorContext();
        final List<String> currentProcessorJobQueue = jobExecutorContext.getCurrentProcessorJobQueue();
        final CommandExecutor commandExecutor = jobExecutor.getCommandExecutor();

        currentProcessorJobQueue.addAll(jobIds);

        Context.setJobExecutorContext(jobExecutorContext);
        try {
            while (!currentProcessorJobQueue.isEmpty()) {
                commandExecutor.execute(new ExecuteJobsCmd(currentProcessorJobQueue.remove(0)));
            }
        } catch (Exception e) {
            log.error("An error occurred while executing the jobs : " + jobIds.toString(), e);
        } finally {
            Context.removeJobExecutorContext();
        }
    }
</code>
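
The effect of catching inside run() can be reproduced outside of Activiti. The sketch below counts how many threads a single-threaded pool has to create for a series of failing tasks, with and without a guard around run(). GuardedRunnable and CatchInsideRunSketch are hypothetical names used only for this illustration, and the empty catch stands in for the log.error call above.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CatchInsideRunSketch {

    /** Hypothetical wrapper that keeps any exception from escaping run(). */
    static class GuardedRunnable implements Runnable {
        private final Runnable delegate;

        GuardedRunnable(Runnable delegate) {
            this.delegate = delegate;
        }

        public void run() {
            try {
                delegate.run();
            } catch (Exception e) {
                // A real job runner would log the failure here.
            }
        }
    }

    /** Returns how many threads the pool created to run the failing tasks. */
    public static int threadsCreated(boolean guard, int failingTasks)
            throws InterruptedException {
        final AtomicInteger created = new AtomicInteger();

        ThreadFactory factory = new ThreadFactory() {
            public Thread newThread(Runnable r) {
                created.incrementAndGet();
                Thread t = new Thread(r);
                t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                    public void uncaughtException(Thread thread, Throwable ex) {
                        // Silenced so the demo output stays readable.
                    }
                });
                return t;
            }
        };

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(), factory);

        Runnable failing = new Runnable() {
            public void run() {
                throw new IllegalStateException("simulated job failure");
            }
        };

        for (int i = 0; i < failingTasks; i++) {
            pool.execute(guard ? new GuardedRunnable(failing) : failing);
        }

        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return created.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("guarded:   " + threadsCreated(true, 5) + " thread(s)");
        System.out.println("unguarded: " + threadsCreated(false, 5) + " thread(s)");
    }
}
```

With the guard in place the single worker thread survives every failure; without it, each escaped exception ends the worker and the pool has to start a replacement.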

Regards,
Gilbert E.H.