cancel
Showing results for 
Search instead for 
Did you mean: 

Jobs and Clusters

yblazart
Champ in-the-making
Champ in-the-making
Hi ! I want to install multiple instance of activity, sharing the same database.

In my workflows, I will have some timers for example. I made some test.
As I read, it's seems that JobExecutor do a 'lock' in the database for each job to execute, so this ensure that the job executor on the other server will not attempt to execute it (AcquireJobsCmd,AcquireJobsRunnable,JobExecutor). Ok that's great.

But in my tests, I see a lot of stacktraces like this :

18 juil. 2012 12:07:01 org.activiti.engine.impl.jobexecutor.AcquireJobsRunnable run
GRAVE: exception during job acquisition: TimerEntity[246a2666-d0c0-11e1-8fa6-001b63a220cd] was updated by another transaction concurrently
org.activiti.engine.ActivitiOptimisticLockingException: TimerEntity[246a2666-d0c0-11e1-8fa6-001b63a220cd] was updated by another transaction concurrently
   at org.activiti.engine.impl.db.DbSqlSession.flushUpdates(DbSqlSession.java:452)
   at org.activiti.engine.impl.db.DbSqlSession.flush(DbSqlSession.java:348)
   at org.activiti.engine.impl.interceptor.CommandContext.flushSessions(CommandContext.java:149)
   at org.activiti.engine.impl.interceptor.CommandContext.close(CommandContext.java:105)
   at org.activiti.engine.impl.interceptor.CommandContextInterceptor.execute(CommandContextInterceptor.java:49)
   at org.activiti.engine.impl.interceptor.LogInterceptor.execute(LogInterceptor.java:33)
   at org.activiti.engine.impl.jobexecutor.AcquireJobsRunnable.run(AcquireJobsRunnable.java:57)
   at java.lang.Thread.run(Thread.java:680)

For me, this ActivitiOptimisticLockingException is not an error (Sever or not) in this case, we should not see a such thing in logs.

Have you plan something to this, or can I propose a patch ? The code would be located in AcquireJobsRunnable line :



public class AcquireJobsRunnable implements Runnable {

    private static Logger log = Logger.getLogger(AcquireJobsRunnable.class.getName());
    protected final JobExecutor jobExecutor;
    protected volatile boolean isInterrupted = false;
    protected volatile boolean isJobAdded = false;
    protected final Object MONITOR = new Object();
    protected final AtomicBoolean isWaiting = new AtomicBoolean(false);

    public AcquireJobsRunnable(JobExecutor jobExecutor) {
        this.jobExecutor = jobExecutor;
    }

    public synchronized void run() {
        log.info(jobExecutor.getName() + " starting to acquire jobs");

        final CommandExecutor commandExecutor = jobExecutor.getCommandExecutor();
        long millisToWait = 0;
        float waitIncreaseFactor = 2;
        long maxWait = 60 * 1000;

        while (!isInterrupted) {
            int maxJobsPerAcquisition = jobExecutor.getMaxJobsPerAcquisition();

            try {
                AcquiredJobs acquiredJobs = null;
                boolean jobsLockAcquired;
                try {
                    acquiredJobs = commandExecutor.execute(jobExecutor.getAcquireJobsCmd());
                    jobsLockAcquired = true;
                } catch (ActivitiOptimisticLockingException lockException) {
                    // lock can't be acquired,
                    jobsLockAcquired = false;
                    log.log(Level.INFO,"some Jobs are already locked");
                }
                if (jobsLockAcquired) {
                    for (List<String> jobIds : acquiredJobs.getJobIdBatches()) {
                        jobExecutor.executeJobs(jobIds);
                    }

                    // if all jobs were executed
                    millisToWait = jobExecutor.getWaitTimeInMillis();
                    int jobsAcquired = acquiredJobs.getJobIdBatches().size();
                    if (jobsAcquired < maxJobsPerAcquisition) {

                        isJobAdded = false;

                        // check if the next timer should fire before the normal sleep time is over
                        Date duedate = new Date(ClockUtil.getCurrentTime().getTime() + millisToWait);
                        List<TimerEntity> nextTimers = commandExecutor.execute(new GetUnlockedTimersByDuedateCmd(duedate, new Page(0, 1)));

                        if (!nextTimers.isEmpty()) {
                            long millisTillNextTimer = nextTimers.get(0).getDuedate().getTime() - ClockUtil.getCurrentTime().getTime();
                            if (millisTillNextTimer < millisToWait) {
                                millisToWait = millisTillNextTimer;
                            }
                        }

                    } else {
                        millisToWait = 0;
                    }
                }
            } catch (Exception e) {
                log.log(Level.SEVERE, "exception during job acquisition: " + e.getMessage(), e);
                millisToWait *= waitIncreaseFactor;
                if (millisToWait > maxWait) {
                    millisToWait = maxWait;
                } else if (millisToWait == 0) {
                    millisToWait = jobExecutor.getWaitTimeInMillis();
                }
            }

            if ((millisToWait > 0) && (!isJobAdded)) {
                try {
                    log.fine("job acquisition thread sleeping for " + millisToWait + " millis");
                    synchronized (MONITOR) {
                        if (!isInterrupted) {
                            isWaiting.set(true);
                            MONITOR.wait(millisToWait);
                        }
                    }
                    log.fine("job acquisition thread woke up");
                } catch (InterruptedException e) {
                    log.fine("job acquisition wait interrupted");
                } finally {
                    isWaiting.set(false);
                }
            }
        }
        log.info(jobExecutor.getName() + " stopped job acquisition");
    }

    public void stop() {
        synchronized (MONITOR) {
            isInterrupted = true;
            if (isWaiting.compareAndSet(true, false)) {
                MONITOR.notifyAll();
            }
        }
    }

    public void jobWasAdded() {
        isJobAdded = true;
        if (isWaiting.compareAndSet(true, false)) {
            // ensures we only notify once
            // I am OK with the race condition     
            synchronized (MONITOR) {
                MONITOR.notifyAll();
            }
        }
    }
}

2 REPLIES 2

frederikherema1
Star Contributor
Star Contributor
What exactly are you proposing? The monitor lock object? This only works when engines are in the same JVM.

If it's not an error and you don't want to see it logged, can't you exclude it from being logged?

yblazart
Champ in-the-making
Champ in-the-making
the fact that some jobs are the lock acquired in another JVM is not an error, this is the normal behavior. So, having some stack traces for that in logs will be annoying for the production team.  Yes we can make a filter in java.util.logging, but  I just think that would be managed in the code. I just put an 'example', but now while trying to make my own patch, it seems to be a little bit complicated….