Quartz源码分析

第一部分:What

暂略

第二部分:How

暂略(主要介绍Quartz的基本使用、几个基本组件以及各个组件之间的关系)

第三部分:Why

从上文可以知道,使用Quartz的入口点是Scheduler,那么我们就从Scheduler开始分析。

3.1 Scheduler

Scheduler是个接口,定义了一个最基本的调度器应该提供的能力。
Quartz提供了多个Scheduler的实现,但是我们最常用的就是StdScheduler,所以我们主要从StdScheduler开始分析。
上面的demo中我们首先通过代码

Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();

获取到了Scheduler。深入到StdSchedulerFactory.getDefaultScheduler()方法中看到

public static Scheduler getDefaultScheduler() throws SchedulerException {
        StdSchedulerFactory fact = new StdSchedulerFactory();

        return fact.getScheduler();
    }

实际上就是一个工厂方法,该方法实现如下

public Scheduler getScheduler() throws SchedulerException {
        if (cfg == null) {
            initialize();
        }

        // 1.  先从本地缓存中查找 
        SchedulerRepository schedRep = SchedulerRepository.getInstance();

        Scheduler sched = schedRep.lookup(getSchedulerName());

        if (sched != null) {
            if (sched.isShutdown()) {
                // 如果找到且已经关闭,则从缓存中删除
                schedRep.remove(getSchedulerName());
            } else {
                //  缓存中查找到且还在使用,则直接返回
                return sched;
            }
        }
        // 初始化        
        sched = instantiate();

        return sched;
    }

从上面的代码中可以看到,当调用getScheduler()方法获取Scheduler的时候,首先会从本地缓存中找,如果找到正在使用的则直接返回,没有找到则调用instantiate()方法初始化一个。继续查看instantiate()方法,该方法比较复杂,主要就是从配置文件中读取相应的数据然后初始化各个组件。由于这个方法太长,我们分多个部分进行分析。首先是各主要组件的定义

JobStore js = null;  // Job仓库,用来保存Job、Trigger的信息
ThreadPool tp = null; // 线程池,用来执行Job
QuartzScheduler qs = null; // Quartz调度器,Quartz的枢纽和心脏所在
DBConnectionManager dbMgr = null;
String instanceIdGeneratorClass = null;
Properties tProps = null;
String userTXLocation = null;
boolean wrapJobInTx = false;
boolean autoId = false;
long idleWaitTime = -1;
long dbFailureRetry = 15000L; // 15 secs
String classLoadHelperClass;
String jobFactoryClass; // 任务工厂,用来实例化具体的任务
ThreadExecutor threadExecutor; // 线程执行器

其中,QuartzScheduler是最主要的主见,也是Quartz的心脏枢纽所在。StdScheduler只是对QuartzScheduler的简单的封装。而JobStoreJob的仓库,用来保存执行job所必须的数据,包括JobTrigger的定义。线程池ThreadPool用来执行客户端的任务。任务工厂JobFactory用来生成客户端定义的Job对象。
既然有定义那就一定有初始化的地方,继续往下看instantiate()方法。

// 如果配置文件中没有指定要使用的类,则使用默认的实现类,否则使用指定的类
String tpClass = cfg.getStringProperty(PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName());

if (tpClass == null) {
    initException = new SchedulerException(
            "ThreadPool class not specified. ");
    throw initException;
}

try {
    tp = (ThreadPool) loadHelper.loadClass(tpClass).newInstance();
} catch (Exception e) {
    initException = new SchedulerException("ThreadPool class '"
            + tpClass + "' could not be instantiated.", e);
    throw initException;
}
tProps = cfg.getPropertyGroup(PROP_THREAD_POOL_PREFIX, true);
try {
    setBeanProps(tp, tProps);
} catch (Exception e) {
    initException = new SchedulerException("ThreadPool class '"
            + tpClass + "' props could not be configured.", e);
    throw initException;
}

// Get JobStore Properties
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

String jsClass = cfg.getStringProperty(PROP_JOB_STORE_CLASS,
        RAMJobStore.class.getName());

if (jsClass == null) {
    initException = new SchedulerException(
            "JobStore class not specified. ");
    throw initException;
}

try {
    js = (JobStore) loadHelper.loadClass(jsClass).newInstance();
} catch (Exception e) {
    initException = new SchedulerException("JobStore class '" + jsClass
            + "' could not be instantiated.", e);
    throw initException;
}

SchedulerDetailsSetter.setDetails(js, schedName, schedInstId);

tProps = cfg.getPropertyGroup(PROP_JOB_STORE_PREFIX, true, new String[] {PROP_JOB_STORE_LOCK_HANDLER_PREFIX});
try {
    setBeanProps(js, tProps);
} catch (Exception e) {
    initException = new SchedulerException("JobStore class '" + jsClass
            + "' props could not be configured.", e);
    throw initException;
}

对上述变量进行初始化的过程都是一致的:首先看配置文件中有没有注定要使用的类,如果没有,则使用默认的实现类,反之则使用用户在配置文件中指定的实现类,并且同时设置配置文件中给定的属性。
从上面的代码可以看出,
***ThreadPool默认使用SimpleThreadPool,而JobStore默认使用RAMJobStore。***
本文也就是以这两个默认实现类进行分析。
除了RAMJobStoreQuartz还支持数据库形式的JobStore,所以如果用户需要使用数据库来保存Job信息,就需要指定数据源,所以instantiate()方法接下来的操作是读取并设置数据源(如果设置了的话):

String[] dsNames = cfg.getPropertyGroups(PROP_DATASOURCE_PREFIX);
for (int i = 0; i < dsNames.length; i++) {
    PropertiesParser pp = new PropertiesParser(cfg.getPropertyGroup(
            PROP_DATASOURCE_PREFIX + "." + dsNames[i], true));

    String cpClass = pp.getStringProperty(PROP_CONNECTION_PROVIDER_CLASS, null);

    // custom connectionProvider...
    if(cpClass != null) {
        ConnectionProvider cp = null;
        try {
            cp = (ConnectionProvider) loadHelper.loadClass(cpClass).newInstance();
        } catch (Exception e) {
            initException = new SchedulerException("ConnectionProvider class '" + cpClass
                    + "' could not be instantiated.", e);
            throw initException;
        }

        try {
            // remove the class name, so it isn't attempted to be set
            pp.getUnderlyingProperties().remove(
                    PROP_CONNECTION_PROVIDER_CLASS);

            setBeanProps(cp, pp.getUnderlyingProperties());
            cp.initialize();
        } catch (Exception e) {
            initException = new SchedulerException("ConnectionProvider class '" + cpClass
                    + "' props could not be configured.", e);
            throw initException;
        }

        dbMgr = DBConnectionManager.getInstance();
        dbMgr.addConnectionProvider(dsNames[i], cp);
    } else {
        String dsJndi = pp.getStringProperty(PROP_DATASOURCE_JNDI_URL, null);

        if (dsJndi != null) {
            boolean dsAlwaysLookup = pp.getBooleanProperty(
                    PROP_DATASOURCE_JNDI_ALWAYS_LOOKUP);
            String dsJndiInitial = pp.getStringProperty(
                    PROP_DATASOURCE_JNDI_INITIAL);
            String dsJndiProvider = pp.getStringProperty(
                    PROP_DATASOURCE_JNDI_PROVDER);
            String dsJndiPrincipal = pp.getStringProperty(
                    PROP_DATASOURCE_JNDI_PRINCIPAL);
            String dsJndiCredentials = pp.getStringProperty(
                    PROP_DATASOURCE_JNDI_CREDENTIALS);
            Properties props = null;
            if (null != dsJndiInitial || null != dsJndiProvider
                    || null != dsJndiPrincipal || null != dsJndiCredentials) {
                props = new Properties();
                if (dsJndiInitial != null) {
                    props.put(PROP_DATASOURCE_JNDI_INITIAL,
                            dsJndiInitial);
                }
                if (dsJndiProvider != null) {
                    props.put(PROP_DATASOURCE_JNDI_PROVDER,
                            dsJndiProvider);
                }
                if (dsJndiPrincipal != null) {
                    props.put(PROP_DATASOURCE_JNDI_PRINCIPAL,
                            dsJndiPrincipal);
                }
                if (dsJndiCredentials != null) {
                    props.put(PROP_DATASOURCE_JNDI_CREDENTIALS,
                            dsJndiCredentials);
                }
            }
            JNDIConnectionProvider cp = new JNDIConnectionProvider(dsJndi,
                    props, dsAlwaysLookup);
            dbMgr = DBConnectionManager.getInstance();
            dbMgr.addConnectionProvider(dsNames[i], cp);
        } else {
            String dsDriver = pp.getStringProperty(PoolingConnectionProvider.DB_DRIVER);
            String dsURL = pp.getStringProperty(PoolingConnectionProvider.DB_URL);

            if (dsDriver == null) {
                initException = new SchedulerException(
                        "Driver not specified for DataSource: "
                                + dsNames[i]);
                throw initException;
            }
            if (dsURL == null) {
                initException = new SchedulerException(
                        "DB URL not specified for DataSource: "
                                + dsNames[i]);
                throw initException;
            }
            try {
                PoolingConnectionProvider cp = new PoolingConnectionProvider(pp.getUnderlyingProperties());
                dbMgr = DBConnectionManager.getInstance();
                dbMgr.addConnectionProvider(dsNames[i], cp);
            } catch (SQLException sqle) {
                initException = new SchedulerException(
                        "Could not initialize DataSource: " + dsNames[i],
                        sqle);
                throw initException;
            }
        }
    }
}

Quartz另一大特使就是支持监听器和插件(SchedulerPluginsJobListenersTriggerListener),instantiate()方法接下来会判断配置文件中有没有配置插件和监听器,如果有的话,则会读取并保存相应的数据。

上面的过程都是从配置文件中读取配置,然后按照配置分别初始化各个单独的组件,接下来要做的事情就是把各个独立的组件串联在一起,代码如下:

try {
    JobRunShellFactory jrsf = null; // Create correct run-shell factory...    
    // ...其他省略与主线无关代码
    // QuartzSchedulerResources封装了Quartz的运行环境
    QuartzSchedulerResources rsrcs = new QuartzSchedulerResources();
    rsrcs.setName(schedName);
    rsrcs.setThreadName(threadName);
    rsrcs.setInstanceId(schedInstId);
    rsrcs.setJobRunShellFactory(jrsf);
    rsrcs.setMakeSchedulerThreadDaemon(makeSchedulerThreadDaemon);
    rsrcs.setThreadsInheritInitializersClassLoadContext(threadsInheritInitalizersClassLoader);
    rsrcs.setRunUpdateCheck(!skipUpdateCheck);
    rsrcs.setBatchTimeWindow(batchTimeWindow);  // 设置批处理时间窗口,选择要出发的 Triggers使用
    rsrcs.setMaxBatchSize(maxBatchSize); // 设置一次可处理的最大任务数量,选择要出发的 Triggers使用
    rsrcs.setInterruptJobsOnShutdown(interruptJobsOnShutdown);
    rsrcs.setInterruptJobsOnShutdownWithWait(interruptJobsOnShutdownWithWait);
    // ...  省略与主线无关代码
    rsrcs.setThreadExecutor(threadExecutor); // 设置线程执行器
    threadExecutor.initialize(); // 初始化线程执行器

    rsrcs.setThreadPool(tp); // 设置执行任务的线程池
    if(tp instanceof SimpleThreadPool) {
        if(threadsInheritInitalizersClassLoader)
            ((SimpleThreadPool)tp).setThreadsInheritContextClassLoaderOfInitializingThread(threadsInheritInitalizersClassLoader);
    }
    tp.initialize(); // 初始化线程池
    tpInited = true;
    rsrcs.setJobStore(js);

    // add plugins
    for (int i = 0; i < plugins.length; i++) {
        rsrcs.addSchedulerPlugin(plugins[i]);
    }

    // 最核心的地方,生成QuartzScheduler,参数为rsrcs,则所有的数据都有了
    qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);
    qsInited = true;

    // Create Scheduler ref...
    Scheduler scheduler = instantiate(rsrcs, qs);

    // set job factory if specified
    if(jobFactory != null) {
        qs.setJobFactory(jobFactory);
    }

    // Initialize plugins now that we have a Scheduler instance.
    for (int i = 0; i < plugins.length; i++) {
        plugins[i].initialize(pluginNames[i], scheduler, loadHelper);
    }

    // add listeners
    for (int i = 0; i < jobListeners.length; i++) {
        qs.getListenerManager().addJobListener(jobListeners[i], EverythingMatcher.allJobs());
    }
    for (int i = 0; i < triggerListeners.length; i++) {
        qs.getListenerManager().addTriggerListener(triggerListeners[i], EverythingMatcher.allTriggers());
    }

    // set scheduler context data...
    for(Object key: schedCtxtProps.keySet()) {
        String val = schedCtxtProps.getProperty((String) key);    
        scheduler.getContext().put((String)key, val);
    }

    // fire up job store, and runshell factory

    js.setInstanceId(schedInstId);
    js.setInstanceName(schedName);
    js.setThreadPoolSize(tp.getPoolSize());
    js.initialize(loadHelper, qs.getSchedulerSignaler());

    jrsf.initialize(scheduler);

    // 初始化QuartzScheduler对象    
    qs.initialize();
    // prevents the repository from being garbage collected
    qs.addNoGCObject(schedRep); // 为了防止被垃圾回收,保存对应的引用
    // prevents the db manager from being garbage collected
    if (dbMgr != null) {
        qs.addNoGCObject(dbMgr);
    }
    schedRep.bind(scheduler); // 保存缓存
    return scheduler;
}
catch(SchedulerException e) {
    shutdownFromInstantiateException(tp, qs, tpInited, qsInited);
    throw e;
}
catch(RuntimeException re) {
    shutdownFromInstantiateException(tp, qs, tpInited, qsInited);
    throw re;
}
catch(Error re) {
    shutdownFromInstantiateException(tp, qs, tpInited, qsInited);
    throw re;
}

上面的代码,首先会生成一个QuartzSchedulerResources的对象。

先分析核心点,后续再串起来.

3.2 ThreadPool

ThreadPoolQuartz内部定义的一个线程池,用来执行用户定义的Job,默认实现为SimpleThreadPool

2018-04-07 17:1554