老物重识-Quartz

初次写作尝试:本文试以问答形式对quartz做一些介绍。

Ⅰ Quartz是什么?为什么要有这样一篇文章?

Quartz 是一个完全由 Java 编写的开源作业调度框架,为在 Java 应用程序中进行作业调度提供了简单却强大的机制。Quartz最早的issue记录在jira.terracotta.org,时间可以追溯到大约2010年的2月。Quartz年代久远难以满足技术人的新奇感,也没有hadoop、spark那么重量、时髦,但深入了解一下这位老前辈的设计思想,仍能使我们获得一些开发方面的参考,这也正是本文的初衷。

Ⅱ 作业调度是什么?

想象一下,所有程序都是被动触发的,比如对于web程序,一次外部请求就是一次触发。有一类程序需要由时间触发,比如抢火车票的场景,每隔一段时间查询一次余票,最简单的实现,就是构造一个死循环,不断查询余票,每次启动主程序时,同时启动这个线程,这样就有了一个随时间周期性自动运行的程序。周期性触发称为一种调度规则,“由时间触发程序运行”就称为“作业调度”。

Ⅲ 我们为什么需要作业调度?

时间的流动不知疲倦永不停歇,由时间触发程序能自动化大量业务过程。

Ⅳ Quartz怎样实现作业调度?

仔细考虑一下我们需要的作业调度应该有些什么:

1. 一个“调度者”,

2. 一个“调度规则”,

3. 一个被调度的“作业”

这三者可以构成一个有基本功能的作业调度,在quartz中,Scheduler对应“调度者”, Trigger对应“调度规则”,Job对应“作业”。

调度规则和作业配置在quartz的保存方式有很多实现,基于内存的RAMJobStore、基于JDBC的JobStoreSupport等等。在Scheduler和JobStore之间还有一层DriveDelegate,DriveDelegate非常像JobStore,只是扩展了基于不同database实现时的一些实现。Listener则提供了Scheduler、Trigger、Job运行时,一些时间点的通知。

自绘业务架构图:

Ⅴ 描述一下Quartz的一些基本活动?

首先是Quartz启动时的活动图:

看一下对应的源码片段:

启动的入口在QuartzScheduler的start:

1. 在QuartzScheduler.start()前,主调度线程SchedulerThread和其他配置通过StdSchedulerFactory初始化,源代码在StdSchedulerFactory.instantiate(),接近800行,此处不列举。

2. 检查Scheduler是否正在关闭或者已关闭。

3. 通知SchedulerListener正在启动。

4. 启动Scheduler,启动插件。

5. 如果Scheduler从暂停恢复运行,通知JobSupport恢复Scheduler。

6. 通知主调度线程开始运行。

7. 通知SchedulerListener启动完成。

public class QuartzScheduler implements RemotableQuartzScheduler {
        public void start() throws SchedulerException {
            // 检查Scheduler是否正在关闭或者已关闭
            if (shuttingDown|| closed) {
                throw new SchedulerException(
                        "The Scheduler cannot be restarted after shutdown() has been called.");
            }

            //通知SchedulerListener正在启动
            notifySchedulerListenersStarting();

            if (initialStart == null) {
                initialStart = new Date();
//启动Scheduler
                this.resources.getJobStore().schedulerStarted();
//启动各种插件
                startPlugins();
            } else {
//如果Scheduler从暂停恢复运行,通知JobSupport恢复Scheduler
                resources.getJobStore().schedulerResumed();
            }
            //通知主调度线程可以开始运行
            schedThread.togglePause(false);
            //通知SchedulerListener启动完成
            notifySchedulerListenersStarted();
        }
    }

通知监听:创建SchedulerListener的列表并逐个通知。

public class QuartzScheduler implements RemotableQuartzScheduler {
    public void notifySchedulerListenersStarting() {
        // 创建SchedulerListener的列表.
        List<SchedulerListener> schedListeners = buildSchedulerListenerList();
        // 逐个通知Listener
        for (SchedulerListener sl : schedListeners) {
            sl.schedulerStarting();
        }
    }
}

获取监听列表:从ListenerManager获取Listener,这里需要开发者主动将自己的Listener注册到ListenerManager。

public class QuartzScheduler implements RemotableQuartzScheduler {
        private List<SchedulerListener> buildSchedulerListenerList() {
            List<SchedulerListener> allListeners = new LinkedList<SchedulerListener>();
// 从ListenerManager获取Listener 这里需要开发者主动将自己的Listener注册到ListenerManager
            allListeners.addAll(getListenerManager().getSchedulerListeners());
            allListeners.addAll(getInternalSchedulerListeners());
            return allListeners;
        }
    }

正式启动Scheduler

1.   集群部署时初始化ClusterManager线程并启动。

2.   单机部署时恢复Job。

3.   初始化MisFire处理线程并启动。

public abstract class JobStoreSupportimplements JobStore,Constants {

        public void schedulerStarted() throws SchedulerException {
            // 集群部署时初始化ClusterManager线程并启动
            if (isClustered()) {
                clusterManagementThread = new ClusterManager();
                clusterManagementThread.initialize();
            } else {
//单机部署直接恢复Job
                recoverJobs();
            }
            //初始化MisFire处理线程并启动
            misfireHandler = new MisfireHandler();
            misfireHandler.initialize();
        }
    }

首先初始化ClusterManager,把ClusterManager放进线程池执行。

class ClusterManager extends Thread {
        public void initialize() {
            // 初始化ClusterManager
            this.manage();
            ThreadExecutor executor = getThreadExecutor();
// 把ClusterManager放进线程池执行
            executor.execute(ClusterManager.this);
        }
    }

ClusterManage 进一步doCheckin。

class ClusterManager extends Thread {
    private boolean manage() {
        boolean res = false;
        // 节点登入集群
        res = doCheckin();
        return res;
    }
}

Checkin 细节:

1.   每次checkin都检查是否有意外中断的作业。

2.   从db获取锁后,再恢复作业。

public abstract class JobStoreSupportimplements JobStore,Constants {
        protected boolean doCheckin() throws JobPersistenceException {
            boolean recovered = false;
            Connection conn = getNonManagedTXConnection();
            try {
                // 每次都要检查是否有意外中断的作业
                List<SchedulerStateRecord> failedRecords = null;
                if (!firstCheckIn) {
                    failedRecords = clusterCheckIn(conn);
                    commitConnection(conn);
                }

                if (firstCheckIn || (failedRecords.size() > 0)) {
// 从db获取锁
                    getLockHandler().obtainLock(conn, LOCK_STATE_ACCESS);
                    transStateOwner = true;
                    failedRecords = (firstCheckIn) ? clusterCheckIn(conn) : findFailedInstances(conn);
                    if (failedRecords.size() > 0) {
                        getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS);
                        // 恢复中断的作业
                        clusterRecover(conn, failedRecords);
                        recovered = true;
                    }
                }
                commitConnection(conn);
            } catch (JobPersistenceException e) {
                rollbackConnection(conn);
            } finally {
            }
            firstCheckIn = false;
            return recovered;
        }
    }

恢复作业:首先找到意外中断的调度记录,保存更新节点checkin的时间。

public abstract class JobStoreSupportimplements JobStore,Constants {
    protected List<SchedulerStateRecord> clusterCheckIn(Connection conn) throws JobPersistenceException {
        //找到意外中断的调度记录
        List<SchedulerStateRecord> failedInstances = findFailedInstances(conn);
        try {
            //  保存更新节点checkin的时间
            lastCheckin = System.currentTimeMillis();
            if (getDelegate().updateSchedulerState(conn, getInstanceId(), lastCheckin) == 0) {
                getDelegate().insertSchedulerState(conn, getInstanceId(),
                        lastCheckin, getClusterCheckinInterval());
            }
        } catch (Exception e) {
        }
        return failedInstances;
    }
}

恢复细节:查询获得一个集群中所有失败节点的列表,如果当前节点首次checkin,列表里会有当前节点:

1.   查询所有节点的调度记录。

2.   找到当前节点的调度记录。

3.   找到所有调度意外中断的节点的记录。

如果是当前节点第一次checkIn,还要把有触发记录但丢失调度记录的补全,构造虚拟的调度记录。

public abstract class JobStoreSupportimplements JobStore,Constants {
        protected List<SchedulerStateRecord> findFailedInstances(Connection conn) throws JobPersistenceException {
            try {
                List<SchedulerStateRecord> failedInstances = new LinkedList<SchedulerStateRecord>();
                long timeNow = System.currentTimeMillis();
// 查询所有节点的调度记录
                List<SchedulerStateRecord> states = getDelegate().selectSchedulerStateRecords(conn, null);
                for (SchedulerStateRecord rec : states) {
                    // 找到当前节点的记录
                    if (rec.getSchedulerInstanceId().equals(getInstanceId())) {
                        if (firstCheckIn) {
                            failedInstances.add(rec);
                        }
                    } else {
                        // 找到所有调度意外中断的节点的记录
                        if (calcFailedIfAfter(rec) < timeNow) {
                            failedInstances.add(rec);
                        }
                    }
                }
// 如果是当前节点第一次checkIn,还要把有触发记录但丢失调度记录的补全,构造虚拟的调度记录.
                if (firstCheckIn) {
                    failedInstances.addAll(findOrphanedFailedInstances(conn, states));
                }
                return failedInstances;
            } catch (Exception e) {
            }
        }
    }

细节:针对有Trigger调度记录但没有Scheduler调度记录的,创建虚拟的Scheduler调度记录

public abstract class JobStoreSupportimplements JobStore,Constants {
    private List<SchedulerStateRecord> findOrphanedFailedInstances(
            Connection conn,
            List<SchedulerStateRecord> schedulerStateRecords)
            throws SQLException, NoSuchDelegateException {
        List<SchedulerStateRecord> orphanedInstances = new ArrayList<SchedulerStateRecord>();
        Set<String> allFiredTriggerInstanceNames = getDelegate().selectFiredTriggerInstanceNames(conn);
        if (!allFiredTriggerInstanceNames.isEmpty()) {
            for (SchedulerStateRecord rec : schedulerStateRecords) {
                allFiredTriggerInstanceNames.remove(rec.getSchedulerInstanceId());
            }
            for (String inst : allFiredTriggerInstanceNames) {
                SchedulerStateRecord orphanedInstance = new SchedulerStateRecord();
                orphanedInstance.setSchedulerInstanceId(inst);
                orphanedInstances.add(orphanedInstance);
            }
        }
        return orphanedInstances;
    }
}

完成作业恢复:然后用这些调度记录创建SimpleTriggerImpl,恢复对应的Trigger,通知主调度线程调度。

public abstract class JobStoreSupportimplements JobStore,Constants {
    protected void clusterRecover(Connection conn, List<SchedulerStateRecord> failedInstances) throws JobPersistenceException {
        if (failedInstances.size() > 0) {
            long recoverIds = System.currentTimeMillis();
            try {
                for (SchedulerStateRecord rec : failedInstances) {
                    List<FiredTriggerRecord> firedTriggerRecs = getDelegate().selectInstancesFiredTriggerRecords(conn, rec.getSchedulerInstanceId());
                    int acquiredCount = 0;
                    int recoveredCount = 0;
                    int otherCount = 0;
                    Set<TriggerKey> triggerKeys = new HashSet<TriggerKey>();
                    for (FiredTriggerRecord ftRec : firedTriggerRecs) {
                        TriggerKey tKey = ftRec.getTriggerKey();
                        JobKey jKey = ftRec.getJobKey();
                        triggerKeys.add(tKey);
                        if (ftRec.getFireInstanceState().equals(STATE_BLOCKED)) {
                            getDelegate().updateTriggerStatesForJobFromOtherState(conn, jKey, STATE_WAITING, STATE_BLOCKED);
                        } else if (ftRec.getFireInstanceState().equals(STATE_PAUSED_BLOCKED)) {
                            getDelegate().updateTriggerStatesForJobFromOtherState(conn, jKey, STATE_PAUSED, STATE_PAUSED_BLOCKED);
                        }
                        if (ftRec.getFireInstanceState().equals(STATE_ACQUIRED)) {
                            getDelegate().updateTriggerStateFromOtherState(conn, tKey, STATE_WAITING, STATE_ACQUIRED);
                            acquiredCount++;
                        } else if (ftRec.isJobRequestsRecovery()) {
                            if (jobExists(conn, jKey)) {
                                SimpleTriggerImpl rcvryTrig = new SimpleTriggerImpl("recover_" + rec.getSchedulerInstanceId() + "_" + String.valueOf(recoverIds++), Scheduler.DEFAULT_RECOVERY_GROUP, new Date(ftRec.getScheduleTimestamp()));
                                rcvryTrig.setJobName(jKey.getName());
                                rcvryTrig.setJobGroup(jKey.getGroup());
                                rcvryTrig.setMisfireInstruction(SimpleTrigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY);
                                rcvryTrig.setPriority(ftRec.getPriority());
                                JobDataMap jd = getDelegate().selectTriggerJobDataMap(conn, tKey.getName(), tKey.getGroup());
                                jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_NAME, tKey.getName());
                                jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_GROUP, tKey.getGroup());
                                jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_FIRETIME_IN_MILLISECONDS, String.valueOf(ftRec.getFireTimestamp()));
                                jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_SCHEDULED_FIRETIME_IN_MILLISECONDS, String.valueOf(ftRec.getScheduleTimestamp()));
                                rcvryTrig.setJobDataMap(jd);
                                rcvryTrig.computeFirstFireTime(null);
                                storeTrigger(conn, rcvryTrig, null, false, STATE_WAITING, false, true);
                                recoveredCount++;
                            } else {
                                otherCount++;
                            }
                        } else {
                            otherCount++;
                        }
                        if (ftRec.isJobDisallowsConcurrentExecution()) {
                            getDelegate().updateTriggerStatesForJobFromOtherState(conn, jKey, STATE_WAITING, STATE_BLOCKED);
                            getDelegate().updateTriggerStatesForJobFromOtherState(conn, jKey, STATE_PAUSED, STATE_PAUSED_BLOCKED);
                        }
                    }
                    getDelegate().deleteFiredTriggers(conn, rec.getSchedulerInstanceId());
                    int completeCount = 0;
                    for (TriggerKey triggerKey : triggerKeys) {
                        if (getDelegate().selectTriggerState(conn, triggerKey).equals(STATE_COMPLETE)) {
                            List<FiredTriggerRecord> firedTriggers = getDelegate().selectFiredTriggerRecords(conn, triggerKey.getName(), triggerKey.getGroup());
                            if (firedTriggers.isEmpty()) {
                                if (removeTrigger(conn, triggerKey)) {
                                    completeCount++;
                                }
                            }
                        }
                    }
                    if (!rec.getSchedulerInstanceId().equals(getInstanceId())) {
                        getDelegate().deleteSchedulerState(conn, rec.getSchedulerInstanceId());
                    }
                }
            } catch (Throwable e) {
            }
        }
    }
}

此外,ClusterManager运行时也会周期性地恢复其他异常节点调度的Trigger,并且立即通知当前节点的调度线程插入这些立即触发的Trigger。

class ClusterManager extends Thread {
        public void run() {
            while (!shutdown) {
                if (!shutdown) {
                    long timeToSleep = getClusterCheckinInterval();
                    long transpiredTime = (System.currentTimeMillis() - lastCheckin);
                    timeToSleep = timeToSleep - transpiredTime;
                    if (timeToSleep <= 0) {
                        timeToSleep = 100L;
                    }
                    if (numFails > 0) {
                        timeToSleep = Math.max(getDbRetryInterval(), timeToSleep);
                    }
                    try {
                        Thread.sleep(timeToSleep);
                    } catch (Exception ignore) {
                    }
                }
                if (!shutdown && this.manage()) {
//通知当前节点的主调度线程插入一批新的Trigger触发
                    signalSchedulingChangeImmediately(0L);
                }
            }
        }
    }

错过了Trigger的触发时间会怎样?

有一个专门处理错过触发时间超过一定阈值(60s)的线程,活动图如下:

Misfire 处理线程启动:

1.   查询错过触发时间阈值的作业。

2.   通知主调度线程插入这些重新调度的作业。

class MisfireHandler extends Thread {
        @Override
        public void run() {
            while (!shutdown) {
                long sTime = System.currentTimeMillis();
// 查询错过触发阈值的作业
                RecoverMisfiredJobsResult recoverMisfiredJobsResult = manage();
// 通知主调度线程插入这些重新调度的作业
                if (recoverMisfiredJobsResult.getProcessedMisfiredTriggerCount() > 0) {
                    signalSchedulingChangeImmediately(recoverMisfiredJobsResult.getEarliestNewTime());
                }
                if (!shutdown) {
                    long timeToSleep = 50l;
                    //sleep直到下个循环
                    if (!recoverMisfiredJobsResult.hasMoreMisfiredTriggers()) {
                        timeToSleep = getMisfireThreshold() - (System.currentTimeMillis() - sTime);
                        if (timeToSleep <= 0) {
                            timeToSleep = 50l;
                        }
                        if (numFails > 0) {
                            timeToSleep = Math.max(getDbRetryInterval(), timeToSleep);
                        }
                    }
                    try {
                        Thread.sleep(timeToSleep);
                    } catch (Exception ignore) {
                    }
                }//while !shutdown
            }
        }
    }

manage 继续调用doRecoverMisfires。

class MisfireHandler extends Thread {
    private RecoverMisfiredJobsResult manage() {
        try {
            //查询错过触发的job
            RecoverMisfiredJobsResult res = doRecoverMisfires();
            return res;
        } catch (Exception e) {
        }
        return RecoverMisfiredJobsResult.NO_OP;
    }
}

Count 是否有错过触发需要重新调度的作业,再获取集群锁,然后再获取作业。

class MisfireHandler extends Thread {
        protected RecoverMisfiredJobsResult doRecoverMisfires() throws JobPersistenceException {
            boolean transOwner = false;
            Connection conn = getNonManagedTXConnection();
            try {
                RecoverMisfiredJobsResult result = RecoverMisfiredJobsResult.NO_OP;
// Count是否有错过触发需要重新调度的作业,再获取集群锁,然后再获取作业
                int misfireCount = (getDoubleCheckLockMisfireHandler()) ?
                        getDelegate().countMisfiredTriggersInState(conn, STATE_WAITING, getMisfireTime()) :
                        Integer.MAX_VALUE;
                if (misfireCount == 0) {
                } else {
                    transOwner = getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS);
                    result = recoverMisfiredJobs(conn, false);
                }
                commitConnection(conn);
                return result;
            } finally {
            }
        }
    }

最后通知主调度线程正式启动。

public class QuartzSchedulerThread extends Thread {
    void togglePause(boolean pause) {
        synchronized (sigLock) {
            paused = pause;
            if (paused) {
                signalSchedulingChange(0);
            } else {
                sigLock.notifyAll();
            }
        }
    }
}

然后是调度作业的活动图:

再来看主调度线程的代码片段:

1.   检查是否关闭、暂停主调度线程,然后wait。

2.   DB 有异常时稍微等待再继续。

3.   获取空闲线程

4.   获取一批即将调度的Trigger。

5.   距触发时间有一段时间时,检查是否有其他插入的Trigger,wait

6.   如果找到了其他插入的Trigger,释放当前的一批Trigger,重新循环。

7.   通知JobStore  trigger已经被触发,获取触发结果。

8.   Trigger 触发结果是失败时释放这个Trigger。

9.   Trigger 触发成功时,创建JobRunShell对象,JobRunlShell初始化获取调度作业。

10. 线程池运行调度作业。

public class QuartzSchedulerThread extends Thread {
        public void run() {
            int acquiresFailed = 0;
// 检查是否关闭、暂停主调度线程,然后wait
            while (!halted.get()) {
                try {
                    synchronized (sigLock) {
                        while (paused && !halted.get()) {
                            sigLock.wait(1000L);
                            acquiresFailed = 0;
                        }
                        if (halted.get()) {
                            break;
                        }
                    }
// DB有异常时稍微等待再继续
                    if (acquiresFailed > 1) {
                        long delay = computeDelayForRepeatedErrors(qsRsrcs.getJobStore(), acquiresFailed);
                        Thread.sleep(delay);
                    }
// 获取空闲线程,但这里其实恒为true
                    int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
                    if (availThreadCount > 0) {
                        List<OperableTrigger> triggers;
                        long now = System.currentTimeMillis();
                        clearSignaledSchedulingChange();
                        try {
// 获取一批即将调度的Trigger
                            triggers = qsRsrcs.getJobStore().acquireNextTriggers(now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
                            acquiresFailed = 0;
                        } catch (JobPersistenceException jpe) {
                            if (acquiresFailed == 0) {
                                qs.notifySchedulerListenersError("An error occurred while scanning for the next triggers to fire.", jpe);
                            }
                            acquiresFailed++;
                            continue;
                        }
                        acquiresFailed++;
                        continue;
                    }
                    if (triggers != null && !triggers.isEmpty()) {
                        now = System.currentTimeMillis();
                        long triggerTime = triggers.get(0).getNextFireTime().getTime();
                        long timeUntilTrigger = triggerTime - now;
// 距触发时间有一段时间时,检查是否有其他插入的Trigger,wait
                        while (timeUntilTrigger > 2) {
                            synchronized (sigLock) {
                                if (halted.get()) {
                                    break;
                                }
                                if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
                                    sigLock.wait(timeUntilTrigger);
                                }
                            }
//如果找到了其他插入的Trigger,释放当前的一批Trigger,重新循环
                            if (releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
                                break;
                            }
                            now = System.currentTimeMillis();
                            timeUntilTrigger = triggerTime - now;
                        }
                        if (triggers.isEmpty())
                            continue;
                        List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();
                        boolean goAhead = true;
                        synchronized (sigLock) {
                            goAhead = !halted.get();
                        }
                        if (goAhead) {
                            try {
//通知JobStore  trigger已经被触发,获取触发结果
                                List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
                                if (res != null)
                                    bndles = res;
                            } catch (SchedulerException se) {
                                qs.notifySchedulerListenersError("An error occurred while firing triggers '" + triggers + "'", se);
                                for (int i = 0; i < triggers.size(); i++) {
                                    qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                }
                                continue;
                            }
                        }
                        for (int i = 0; i < bndles.size(); i++) {
                            TriggerFiredResult result = bndles.get(i);
                            TriggerFiredBundle bndle = result.getTriggerFiredBundle();
                            Exception exception = result.getException();
//Trigger触发结果是失败时释放这个Trigger
                            if (exception instanceof RuntimeException) {
                                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                continue;
                            }
                            if (bndle == null) {
                                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                continue;
                            }
//Trigger触发成功时,创建JobRunShell对象,JobRunlShell初始化获取调度作业
                            JobRunShell shell = null;
                            try {
                                shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                                shell.initialize(qs);
                            } catch (SchedulerException se) {
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                                continue;
                            }
//线程池运行调度作业,
                            if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                            }
                        }
                        continue;
                    } else {
                        continue; // while (!halted)
                    }
                    long now = System.currentTimeMillis();
                    long waitTime = now + getRandomizedIdleWaitTime();
                    long timeUntilContinue = waitTime - now;
                    synchronized (sigLock) {
                        if (!halted.get()) {
                            if (!isScheduleChanged()) {
                                sigLock.wait(timeUntilContinue);
                            }
                        }
                    }
                } catch (RuntimeException re) {
                }
            }
        }
    }

默认线程池运行作业:

1.   wait 空闲线程或继续执行。

2.   即使线程池被关闭,依然可以继续执行作业。

public class SimpleThreadPool implements ThreadPool {
        public boolean runInThread(Runnable runnable) {
            synchronized (nextRunnableLock) {
                handoffPending = true;
                // wait空闲线程或继续执行
// Wait until a worker thread is available
                while ((availWorkers.size() < 1) && !isShutdown) {
                    nextRunnableLock.wait(500);
                }
                if (!isShutdown) {
                    WorkerThread wt = (WorkerThread) availWorkers.removeFirst();
                    busyWorkers.add(wt);
                    wt.run(runnable);
                } else {
// 即使线程池被关闭,依然可以继续执行作业
                    WorkerThread wt = new WorkerThread(this, threadGroup, "WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
                    busyWorkers.add(wt);
                    workers.add(wt);
                    wt.start();
                }
                nextRunnableLock.notifyAll();
                handoffPending = false;
            }
            return true;
        }
    }

这里的run只是唤醒当前对象在另一个线程里的wait。

class WorkerThread extends Thread {
    public void run(Runnable newRunnable) {
        synchronized (lock) {
            runnable = newRunnable;
            lock.notifyAll();
        }
    }
}

JobRunShell 在这里执行。

class WorkerThread extends Thread {
        public void run() {
            boolean ran = false;
            while (run.get()) {
                try {
                    synchronized (lock) {
// 没有作业时wait空循环
                        while (runnable == null && run.get()) {
                            lock.wait(500);
                        }
// 有作业时执行
                        if (runnable != null) {
                            ran = true;
                            runnable.run();
                        }
                    }
                } finally {
                    if (getPriority() != tp.getThreadPriority()) {
                        setPriority(tp.getThreadPriority());
                    }
                    if (runOnce) {
                        run.set(false);
                        clearFromBusyWorkersList(this);
                    } else if (ran) {
                        ran = false;
                        makeAvailable(this);
                    }
                }
            }
        }
    }

JobShell 内部实现:

1. 如果有事务,开启事务。

2. 通知TriggerListener和JobListener。

3. 传入Job的上下文运行Job。

4. 事务控制的Job在事务异常时才允许重复运行job。

5. 结束事务。

public class JobRunShell extends SchedulerListenerSupport implements Runnable {
        public void run() {

            qs.addInternalSchedulerListener(this);
            try {
                OperableTrigger trigger = (OperableTrigger) jec.getTrigger();
                JobDetail jobDetail = jec.getJobDetail();
                do {
                    JobExecutionException jobExEx = null;
                    Job job = jec.getJobInstance();
                    try {
// 如果有事务,开启事务
                        begin();
                    } catch (SchedulerException se) {
                        break;
                    }
// 通知TriggerListener和JobListener
                    try {
                        if (!notifyListenersBeginning(jec)) {
                            break;
                        }
                    } catch (VetoedException ve) {
                        try {
                            CompletedExecutionInstruction instCode = trigger.executionComplete(jec, null);
                            qs.notifyJobStoreJobVetoed(trigger, jobDetail, instCode);
                            if (jec.getTrigger().getNextFireTime() == null) {
                                qs.notifySchedulerListenersFinalized(jec.getTrigger());
                            }
                            complete(true);
                        } catch (SchedulerException se) {
                            qs.notifySchedulerListenersError("Error during veto of Job (" + jec.getJobDetail().getKey()
                                    + ": couldn't finalize execution.", se);
                        }
                        break;
                    }
                    long startTime = System.currentTimeMillis();
                    long endTime = startTime;
// 传入Job的上下文运行Job
// execute the job
                    try {
                        job.execute(jec);
                        endTime = System.currentTimeMillis();
                    } catch (JobExecutionException jee) {
                        endTime = System.currentTimeMillis();
                        jobExEx = jee;
                    } catch (Throwable e) {
                        endTime = System.currentTimeMillis();
                        qs.notifySchedulerListenersError("Job (" + jec.getJobDetail().getKey() + " threw an exception.", se);
                        jobExEx = new JobExecutionException(se, false);
                    }
                    jec.setJobRunTime(endTime - startTime);
                    if (!notifyJobListenersComplete(jec, jobExEx)) {
                        break;
                    }
                    CompletedExecutionInstruction instCode = CompletedExecutionInstruction.NOOP;
                    try {
                        instCode = trigger.executionComplete(jec, jobExEx);
                    } catch (Exception e) {
                        SchedulerException se = new SchedulerException("Trigger threw an unhandled exception.", e);
                        qs.notifySchedulerListenersError("Please report this error to the Quartz developers.", se);
                    }
                    if (!notifyTriggerListenersComplete(jec, instCode)) {
                        break;
                    }
// 事务控制的Job在事务异常时才允许重复运行job.
                    if (instCode == CompletedExecutionInstruction.RE_EXECUTE_JOB) {
                        jec.incrementRefireCount();
                        try {
                            complete(false);
                        } catch (SchedulerException se) {
                            qs.notifySchedulerListenersError("Error executing Job (" + jec.getJobDetail().getKey() + ": couldn't finalize execution.", se);
                        }
                        continue;
                    }
                    try {
// 结束事务.
                        complete(true);
                    } catch (SchedulerException se) {
                        qs.notifySchedulerListenersError("Error executing Job (" + jec.getJobDetail().getKey() + ": couldn't finalize execution.", se);
                        continue;
                    }
                    qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode);
                    break;
                } while (true);
            } finally {
                qs.removeInternalSchedulerListener(this);
            }
        }
    }

Ⅵ 对比一下Quartz和其他流行的调度作业框架?

58 的作业调度框架基于XXL-JOB,XXL-JOB早期基于Quartz实现调度。由于对XXL-JOB并不熟悉,因此直接参考了官方文档,XXL-JOB官方文档将自己和Quartz做了对比:

Quartz 作为开源作业调度中的佼佼者,是作业调度的首选。但是集群环境中Quartz采用API的方式对任务进行管理,从而可以避免上述问题,但是同样存在以下问题:

问题一:调用API的的方式操作任务,不人性化;

问题二:需要持久化业务QuartzJobBean到底层数据表中,系统侵入性相当严重。

问题三:调度逻辑和QuartzJobBean耦合在同一个项目中,这将导致一个问题,在调度任务数量逐       渐增多,同时调度任务逻辑逐渐加重的情况加,此时调度系统的性能将大大受限于业务;

问题四:quartz底层以“抢占式”获取DB锁并由抢占成功节点负责运行任务,会导致节点负载悬殊非        常大;而XXL-JOB通过执行器实现“协同分配式”运行任务,充分发挥集群优势,负载各节点均衡。

XXL-JOB 弥补了quartz的上述不足之处。

通过XXL-JOB官方文档我们主要了解到,XXL-JOB提供了一套界面且操作API管理调度作业,优化了各节点负载,但并非XXL-JOB能优化负载而Quartz不能,比如作业以Job为单元执行,将作业分散部署在多个集群,将作业量接近的Job部署在同一个集群内,节点内控制合适的线程池数量,负载问题可以缓解一大块。对于问题三,调度任务增多影响性能的问题,根源实际上在于将作业代码写在调度集群内,通过进程隔离解决这个问题并不难。对于问题二,持久化业务需要保存数据到数据表,事实上任何作业调度都无法避免,通过服务拆分和进程隔离,仍然可以一定程度缓解这个问题。总的来看,XXL-JOB对Quartz做出了一些优化,也不失为一个作业调度的选择。

最后总结一下本文:

本文从两方面初步介绍了Quartz的基本实现:

1.   Quartz 启动的主流程:

1) 通过配置初始化Scheduler和SchedulerThread主调度线程。

2) 以集群节点或单机的身份恢复作业调度。

3) 启动Misfire处理,检查恢复错过调度一定时间阈值的作业。

4) 在各个节点通知Listener。

2.   Quartz 调度作业的流程:

1) 获取Trigger和对应的Job。

2) 检查并立即调度插入的Trigger和Job。

3) 将Trigger和Job交给作业线程池执行。

4) 在各个节点通知Listener。

本文没有涉及的:

1.   Quartz 初始化配置的过程。

2.   Quartz 的JobStore的多种实现以及细节。

3.   Job 数据的持久化。

4.   Job 的事务控制。

5.   Listener 和Plugin。

6.   Cluster 的细节。

感兴趣的可以自己下载源码阅读。

我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章