5.2 示例一:采用老API的ValueAggregatorJob
通过JobClient类的runJob()方法提交作业,属于Hadoop的老API。Tom White为此在其Hadoop:The Definitive Guide一书的第二章中给出了一个示例——OldMaxTemperature。以前在老版本的Hadoop源码中也曾经有过好几个类似的示例,然而在新版本,即2.0版以后的Hadoop源码中已经难以找到采用这种方法的示例了。但是JobClient这个类还在,runJob()这个方法还在,老的API还在,想要直接使用这个方法也还是可以的。事实上,新版的Hadoop源码中至今(至少到2.7.1版)仍在./hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop下面保留了mapreduce和mapred两个分支,后者就是采用老API的,其中有一个可以直接作为进程运行的类ValueAggregatorJob,我们正好把它用作示例。其实Hadoop 2.6.0版的源码中在上述的mapreduce和mapred两个分支上分别有一个ValueAggregatorJob类。我们在这里采用的是mapred分支上的那一个。
Hadoop源码的目录结构嵌套很深,通向具体源码文件的路径往往很长,有时书页上的一个整行还容不下一个完整的路径,读者还不如用类名搜索一下(例如在Linux上用find命令)反倒方便。Hadoop源码的文件名很规则,一个类(嵌套在别的类内部的不算)就是一个文件,类名是什么,文件名就是什么。
我们知道,Aggregate是“汇总、合计”的意思,所以这个类的功用一定与此有关。这个类的源码文件是ValueAggregatorJob.java,在mapred分支上的lib/aggregate目录中,其摘要大致是这样的:
class ValueAggregatorJob {} ]createValueAggregatorJobs() ]createValueAggregatorJob() ]setAggregatorDescriptors() ]main()
在Java程序中,如果一个类提供了方法main(),这个类就是“有源”的,可以从命令行直接启动作为一个Java虚拟机(JVM)进程运行,这跟C/C++程序的情况是相似的。
注意,这个类中并没有提供相应的Mapper和Reducer,所以一定来自外部,但是其源码一般都会在同一目录中,编译之后也会被打在同一个Java包(package)中。事实上,在这个源码文件的开头有这么一行:
package org.apache.hadoop.mapred.l ib.aggregate;
而同一目录中还有好几个源码文件,其中就有ValueAggregatorMapper.java和ValueAggregatorReducer.j ava,而且这些源码文件的头部也都指定属于同一个package。
可想而知,如果这个应用是用户自行开发的,那么用户一般就得提供自己的Mapper和Reducer。复杂一些也可能还需要提供Combiner,这就要视具体情况而定了。Combiner用于Reducer之前的数据整合。
凡是提供了main()的类,Java虚拟机在装载和初始化了这个类之后就会执行其main()方法。所以我们就从ValueAggregatorJob的main()开始。
[ValueAggregatorJob.main()] public static void main(String args[])throws IOException { JobConf j ob=ValueAggregatorJob.createValueAggregatorJob(args); JobClient.runJob(j ob); }
这里的方法(函数)名那一行最后有“throws IOException”,那是用于出错处理的,表示在这个函数的执行过程中可能发生类型为IOException的异常,但是函数内部没有加以捕捉和处理,所以这个函数的调用者应该加以捕捉和处理。IOException也是一个类,是由Java语言提供的,凡要使用这个机制的源文件需要在文件头部加上“import j ava.io.IOException”。对于由main()产生的异常,Java虚拟机自会加以捕捉和处理。以后,为了集中我们的注意力,也为了减小篇幅,我们在遇到throws什么异常的时候也许就用省略号代替,或者连省略号也省略不用了,因为一般而言这与主旋律没有什么关系。
这个main()方法中先通过本类内部提供的createValueAggregatorJob()方法创建一个JobConf类对象job,然后以此为参数调用JobClient.runJob()。注意,这里的JobClient只是一个类,而不是一个具体的对象,但是我们仍可直接调用这个类所提供的方法。这个类是在文件头部通过import语句导入的,不过我们在这里并未创建任何具体的JobClient对象。
我们先看createValueAggregatorJob()做了些什么。
Java是面向对象的程序设计语言,由于其多态性,在ValueAggregatorJob这个类中其实有好几个createValueAggregatorJob(),它们的调用参数各不相同,编译器会根据调用参数确定实际调用的是哪一个方法。显然,这里的调用参数就是args,即命令行参数的数组。
[ValueAggregatorJob.main()> createValueAggregatorJob(args)] public static JobConf createValueAggregatorJob(String args[])throws IOException { return createValueAggregatorJob(args, ValueAggregator.class); }
这个方法只做了一件事,就是调用另一个同名的方法。但是,此时调用参数表不同了,现在有两个参数,在原有参数的后面添上了ValueAggregator.class。在同一个Package的代码中搜索一下,可知ValueAggregator是个interface,就是“界面”,或者称为“接口”,而ValueAggregator.class是由Java编译器生成的对于ValueAggregator这个界面或类的描述,这种描述本身就是一个Class类的对象。
接下去看那个有两个参数的createValueAggregatorJob()。为了帮助读者阅读,我在源码中加上了一些中文注释。
[ValueAggregatorJob.main()> createValueAggregatorJob()> createValueAggregatorJob()] public static JobConf createValueAggregatorJob(String args[], Class<? > caller)… { Configuration conf=new Configuration(); //创建一个Configuration类对象conf GenericOptionsParser genericParser=new GenericOptionsParser(conf, args); //用来解析命令行参数,其中args[0]是应用的类名本身 args=genericParser.getRemainingArgs(); //获取命令行中其余的参数(除args[0]以外) if (args.length < 2){ //后面至少还要有两个参数,即输入目录和输出目录 System.out.println("usage:inputDirs outDir" +"[numOfReducer [textinputformat|seq[specfile [jobName]]]]"); GenericOptionsParser.printGenericCommandUsage(System.out); System.exit(1); } String inputDir=args[0]; //输入目录,注意,原来的args[1]现在变成了args[0] String outputDir=args[1]; //输出目录 int numOfReducers=1; //先假定只有一个Reducer
if (args.length > 2){ numOfReducers=Integer.parseInt(args[2]); //若命令行中另有规定就加以调整 } Class<? extends InputFormat> theInputFormat=TextInputFormat.class; if (args.length > 3&&args[3].compareToIgnoreCase("textinputformat")==0){ theInputFormat=TextInputFormat.class; //允许在命令行中指定的输入格式 }else { theInputFormat=SequenceFileInputFormat.class; //默认的输入格式 } … //处理其余的命令行参数,对我们不太重要,故而从略 JobConf theJob=new JobConf(conf); //将Configuration对象扩展成JobConf对象 if (specFile! =null){ theJob.addResource(specFile); } … //这里会用到参数caller,用于确定应该装载何种Java类 //而且caller也可以是空(null),如果是空就强制为ValueAggregatorJob theJob.setJobName("ValueAggregatorJob:"+j obName); FileInputFormat.addInputPaths(theJob, inputDir); //设置输入路径 theJob.setInputFormat(theInputFormat); //设置输入格式 theJob.setMapperClass(ValueAggregatorMapper.class); //设置mapper类 FileOutputFormat.setOutputPath(theJob, new Path(outputDir)); //设置输出目录路径 theJob.setOutputFormat(TextOutputFormat.class); //设置输出格式 theJob.setMapOutputKeyClass(Text.class); //设置mapper输出的键类型 theJob.setMapOutputValueClass(Text.class); //设置mapper输出的值类型 theJob.setOutputKeyClass(Text.class); //设置最后(reducer)输出的键类型 theJob.setOutputValueClass(Text.class); //设置最后(reducer)输出的值类型 theJob.setReducerClass(ValueAggregatorReducer.class); //设置reducer类 theJob.setCombinerClass(ValueAggregatorCombiner.class); //设置combiner类 theJob.setNumMapTasks(1); //mapper只需一个 theJob.setNumReduceTasks(numOfReducers); //reducer的个数取决于命令行参数 return theJob; }
先看调用参数。“Class<? > caller”是个模版式类型说明,表示任何类都可以,所以也称为“泛型”,只要是Class就行,在这里实际上是对于ValueAggregator界面的描述,包括其名称。
这个函数的代码,重点在于对JobConf类对象theJob的设置。JobConf类是对Configuration类的扩充,具中的对象就是关于某个具体作业的参数配置块。我们可以把这个对象更多地看成数据结构,对这个数据结构的内容进行设置,就好像是在填写申请表或作业单。在Hadoop的术语中,作业(Job)就是对某个应用(Application)的可执行映像的一次运行。
在这个作业单中,有些信息是固定的,有些信息直接来自平台上的某些xml配置文件,那些都属于Configuration原有的那一部分。而新扩充出来的那一部分,就是与具体作业相关的了,所以扩充以后称为JobConf。
可以说,要在Hadoop上执行一个作业,用户需要做的事情主要有:
(1)提供具体的Mapper、Reducer,也许还有Combiner等程序模块(Java类)。
(2)填写作业单,在作业单中写明所使用的Mapper、Reducer等模块及其他有关信息。
(3)提交作业单。
对于我们现在这个具体的作业,Mapper和Reducer都来自体外,因为ValueAggregatorJob内部并未定义内嵌的Mapper和Reducer。不过在这个示例中这二者同样也是由Hadoop提供的,就是同一目录中的源码文件ValueAggregatorMapper.java和ValueAggregatorReducer.j ava。
现在我们正在做的是第二项。那么具体填写、设置了些什么呢?例如把Mapper设置成ValueAggregatorMapper.class,把Reducer设置成ValueAggregatorReducer.class。这个作业还安排了使用Combiner,所以还要把Combiner设置成ValueAggregatorCombiner.class。根据前述可知,这里ValueAggregatorMapper.class是对ValueAggregatorMapper类的描述,余可类推。从代码中可见,需要设置的内容还有不少,例如所需mapper和reducer的数量,Map阶段的输出类型,最终的输出类型即整个MapReduce计算的输出类型,等等。
填写完这个作业单,程序就依次返回到main(),用户的准备工作业已完成,下一步就是通过JobClient.runJob()提交作业了。我们的目的是要了解Hadoop内部的运转,因而需要顺着程序的流程深入进去。
这里对runJob()的调用是直接通过类进行的,而不是通过这个类的具体对象进行的,程序中迄今尚未创建具体的JobClient类对象。其实都一样,总之是通过JobClient这个类的函数跳转表进行的。我们直接看runJob()的代码。
[ValueAggregatorJob.main()> JobCl ient.runJob()] public static RunningJob runJob(JobConf job)throws IOException { JobCl ient j c=new JobClient(j ob); //创建JobCl ient类的对象 j c RunningJob rj=j c.submitJob(j ob); //提交作业,返回一个RunningJob对象rj try{ if (! jc.monitorAndPrintJob(job, rj)){//监视并显示作业rj 的运行情况,直至作业结束 throw new IOException("Job failed! "); //如果返回值是 false就说明运行失败 } }catch(InterruptedException ie){ //如果运行过程中 InterruptedException异常 Thread.currentThread().interrupt(); //就中断该线程的执行
} return rj; }
这里创建具体的JobClient对象了,而作为参数传下来的JobConf类对象,即参数配置块job,则用作创建JobClient对象的参数。JobClient的体量较大,其结构大致如下:
class JobClient extends CLI{} ]static final StringMAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY= "mapreduce.jobclient.retry.policy.enabled" ]…//还有好几个类似于这样的字符串 ]static{ConfigUtil.loadResources(); } //静态初始化 ]UserGroupInformation clientUgi //UserGroupInformation类的对象clientUgi,用来表明客户身份 ]publ ic JobClient(JobConf conf) //本类对象的构造函数 > init(conf) //创建JobCl ient对象时要调用其 init()方法 ]public void init(JobConf conf) > setConf(conf) > cluster=new Cluster(conf) > clientUgi=UserGroupInformation.getCurrentUser() //获取客户的身份信息 ]… //跳过若干此刻不感兴趣的方法 ]public synchronized FileSystem getFs() > return cluster.getFileSystem() ]public RunningJob submitJob(String jobFile) //提交作业,以文件名为参数 > job=new JobConf(jobFile) > return submitJob(j ob) ]public RunningJob submitJob(final JobConf conf) //提交作业,以JobConf对象为参数 > return submitJobInternal(conf) //submitJob()通过 submitJobInternal()完成作业提交 ]public RunningJob submitJobInternal(final JobConf conf) //实际提交作业的过程 > … //详见下述 ]… //跳过更多此刻不感兴趣的方法 ]public static RunningJob runJob(JobConf job) //提交并监视作业的运行,直至其结束 ]public boolean monitorAndPrintJob(JobConf conf, RunningJob job) > return ((NetworkedJob)j ob).monitorAndPrintJob() ]public void setTaskOutputFilter(TaskStatusFilter newValue) ]… ]public static void main(String argv[]) > res=ToolRunner.run(new JobCl ient(), argv) > System.exit(res) ]static class NetworkedJob implements RunningJob{} //JobClient内部定义的类 ]]Job j ob //NetworkedJob里面有一个Job对象
]]publ ic NetworkedJob(JobStatus status, Cluster cluster) //该类对象的构建函数 ]]public Configuration getConfiguration() ]]public String getJobName() ]]…//NetworkedJob类中还有些我们暂时不感兴趣的方法
这里要对上述内容做一些解释。
首先,JobClient这个类是对另一个类CLI的扩展,而CLI是“Command Line Interface”即“命令行界面”的缩写,这个类将来我们还会碰到。所以,JobClient实质上是个经过扩充的Hadoop命令行使用界面。这个类有自己的main()函数,因而是可以直接通过命令行(例如“java JobClient …”)启动作为一个JVM进程运行的;但是如果通过new加以创建,那程序就不走它的main(),而走它的构造函数JobClient(),那就不另起一个Java虚拟机了。
如前所述,“类”(以及由类的实体化而来的“对象”)既包含数据,又包含用来处理这些数据的程序即“方法”,每一项数据可以是原始类型的数据,如整数、浮点数、布尔量等,也可以是某个类的对象。例如这里就有个String类的对象MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY,注意这是个作为String对象的变量名,而不是像C程序中所使用的宏定义。这个变量的值是“mapreduce.jobclient.retry.policy.enabled”,而且是final,即不可更改的,实际上这是用来查询配置文件的,平台上的某个.xml配置文件中应该有以此为键名的值,要是更改了就查询不到了。此外,这个对象的定义前面有个static标志,说明只存在于类的Class数据结构中,而不存在于具体对象的数据结构中。换言之,同属JobClient类的所有对象都共享此项数据。
然后,这里有个形如static{… }的无名方法,称为“静态初始化”方法,用于整个类的初始化。当Java虚拟机按import语句的指示装载JobClient这个类的时候,要执行这个静态初始化方法,这里面只有一个语句——ConfigUtil.loadResources(),那就是执行ConfigUtil类所提供的方法loadResources(),以装载相关的资源,实际上就是一些采用XML语言的配置文件,包括mapred-default.xml、mapred-site.xml、yarn-default.xml、yarn-site.xml。
JobClient中还内嵌定义了一个成员类NetworkedJob,这是为远程管理而设置的,我们在这里不感兴趣。
接着是一个UserGroupInformation类的对象clientUgi。这个成分对于用户访问权限,从而对于系统的安全性起着关键的作用,本书后面还要详述。顺便说明,像clientUgi这样的变量形式的成分,并不代表着整个UserGroupInformation类对象的实体,而只是个“指引(Reference)”,实质上就是个结构指针,所以这里只是为这个指针分配内存空间。较之C语言中的结构指针,Java语言中Reference的特殊之处在于:凡有给这个指针赋值的语句时,编译器(或解释器)会检查所赋之值是否确实指向一个该类对象,并且不允许在Java程序中对此进行运算,所以比C语言中的指针安全。
除上面所述的这些以外,剩下的就是JobClient类所提供的种种方法函数了。注意,这里所列的各种方法其实都可能有多个版本,方法名相同但参数表不同,这就是多态。
那么,在我们这个情景中,通过new操作创建JobClient对象时在其构造方法中都做了些什么呢?从上面的摘要中可以看出,执行的是JobClient类所提供的方法init()。而这个init(),则做了三件事情。一是setConf(conf),将创建对象时传下的conf设置成为本对象的配置块,也就是从其创建者那里继承了它的配置块。二是创建一个Cluster类的对象cluster,这是对于运行着Hadoop的计算机集群的一个笼统的描述,实际上只是对于如何与外界联系的描述,其内容主要来自配置块,而并不涉及集群中那些具体的节点。注意这里被赋值的变量cluster,我们在JobClient类的定义中找不到这个变量,当然它也不可能是全局变量,因为Java语言不允许有全局变量。实际上,这个变量是从JobClient的父类CLI那里继承下来的, JobClient是对CLI类的扩充。再看第三件事,那就是通过UserGroupInformation类的方法getCurrentUser()获取当前用户的身份信息,那是一个UserGroupInformation类(缩写成UGI)的对象。前面我们已经看到JobClient内部有个该类对象clientUgi,但是并未见到赋值,所以只是一个空白的对象指引,相当于一个未经初始化的变量,现在经过getCurrentUser()才有了相关的信息。
做完这些事情之后,前面runJob()中创建JobClient对象jc这一步就完成了。
接着是调用jc.submitJob(job),就是调用JobClient类的方法submitJob()。从前面的摘要中可见,这个方法所做的事情就只是调用submitJobInternal(conf),那才是关键所在。
由于submitJobInternal()这个方法相对比较复杂,我们在摘要中没有加以展开,需要专门加以分析。当然,这个方法也是由JobClient提供的。
[ValueAggregatorJob.main()> JobCl ient.runJob() > JobClient.submitJob > JobClient.submitJobInternal()] public RunningJob submitJobInternal(final JobConf conf)… { try{ conf.setBooleanIfUnset("mapred.mapper.new-api", false); //mapper,老API conf.setBooleanIfUnset("mapred.reducer.new-api", false); //reducer,老API Job job=clientUgi.doAs (new PrivilegedExceptionAction<Job> (){ //以实际用户的权限行事 @Override//表示下面这个run()是对于从父类继承的同名方法函数的覆盖 public Job run()throws IOException, ClassNotFoundException, InterruptedException{ Job job=Job.getInstance(conf); job.submit(); //以实际用户的权限,通过Job.submit()提交作业 return j ob; } }); //cl ientUgi.doAs()结束于此,回到原来的身份和权限 //update our Cluster instance with the one created by Job for submission //(we can't pass our Cluster instance to Job, since Job wraps the config //instance, and the two configs would then diverge) cluster=j ob.getCluster(); return new NetworkedJob(j ob); //创建一个NetworkedJob对象,以便了解作业的进展 }catch(InterruptedException ie){ throw new IOException("interrupted", ie); } }
在软件运行的过程中,用户的身份是根据其属于哪一个“组(group)”来判定的,这种身份决定了具体的用户有些什么权限。在单机上这个问题相对比较简单,然而在跨机器节点的“服务/客户”结构的系统中就比较复杂了。作为服务端的进程很可能是由系统管理员启动的,因而具有“超级用户”的权限,但是做的事情、提供的服务,却可能是为普通用户的,需要以普通用户的名义去做。打个比方,我们去某个政府部门办事,办事的人是公务员,当然有比较高的权限,例如可以查看许多信息,可是如果要把这些信息发送给申请人,那就不可以了,因为申请人按规定没有那样高的权限。所以,办事的人必须区分,哪些事是以公务员的身份在做,哪些事是以申请人的名义、按申请人的身份在做。通常,以申请人的身份意味着权限的降低。
但是也有倒过来的,有时候会碰到一些需要更高权限才能完成的事情,这时候需要暂时赋予用户更高的权限,使其得以完成相关的操作。当然,这个更高的权限也不是随便就给、谁都给的,通常会在屏幕上弹出一个小窗口要求用户输入密码,核对密码无误后才能往下走,到完成了需要更高权限的操作之后,就又降低到用户原先的权限。Unix/Linux的“su”机制就是这样。
Java语言 (Java虚拟机)也提供了这么一种机制。这里的clientUgi.doAs(),即UserGroupInformation.doAs()就属于此种机制,意为以谁谁谁的身份做什么什么。以谁的身份呢?以具体UserGroupInformation对象所述的那个组的身份。以这个身份做什么呢?做调用参数PrivilegedExceptionAction所定义的行动。这是需要严格查验和控制操作权限的行动(如果启用了安全模式的话)。可是这个行动究竟是什么呢?那就是后面花括号里面的那个run()函数。实际上PrivilegedExceptionAction是JDK中定义的一个界面,凡是实现这个界面的类,都必须提供一个run()函数,供doAs()调用。对于类的定义和创建,通常我们采用静态定义,那就是通过例如“class MyAction implements PrivilegedExceptionAction {…}”这样的形式加以定义,然后就通过“new MyAction(…)”加以创建。但是也可以采用像这里的动态定义,把定义和创建都放在一个复合语句中。这里的“new PrivilegedExceptionAction”表示创建一个实现了PrivilegedExceptionAction界面的某个无名类对象,而后面的花括号中就是这个类的定义。当然,动态定义只适用于形式上比较简单的类,像这里只有一个方法,就是run();复杂一些的类就不适合动态定义了,因为那会使程序的可读性变差。
另外,界面PrivilegedExceptionAction<Job>的定义是一种模板,说明像这样同名的界面可以有好多,其中之一是针对Job类的。这样,“new PrivilegedExceptionAction<Job>()”就是创建一个实现了针对Job类的PrivilegedExceptionAction界面的某类对象。
还有个问题,这里的clientUgi是怎么来的?前面说了,创建JobClient对象时的init()阶段所做的事情之一就是通过UserGroupInformation.getCurrentUser()获取当前用户的UGI。但是这个getCurrentUser()又怎么能得到应该放在UGI中的那些信息呢?在用户这一边,这些信息来自宿主操作系统。以ValueAggregatorJob这个作业为例,用户是在某个节点机的宿主操作系统上通过类似于“java ValueAggregatorJob …”这样的命令行发起一个JVM进程,来执行这个类的main()函数,以提交作业的。在宿主操作系统上,这个JVM进程属于当前用户,而当前用户是通过login过程进入系统的,所以操作系统知道这个用户是谁,属于哪一个组。所以,UserGroupInformation.getCurrentUser()通过操作系统可以获取当前用户的身份信息。
不仅如此,这里所创建的UGI将会伴随着这个作业的整个生命周期,不管这个作业被提交到哪里,指派到哪里,构成这作业的任务被分配到哪里,这个UGI都会如影相随。
回到所创建PrivilegedExceptionAction对象的run()函数:
[ValueAggregatorJob.main()> JobClient.runJob()> JobClient.submitJob > JobClient.submitJobInternal()> UserGroupInformation.doAs()> PrivilegedExceptionAction.run()] publ ic Job run(){ Job job=Job.getInstance(conf); job.submit(); //提交作业 return j ob; }
这里先通过Job类的getInstance()方法获取代表着本作业的Job类对象,实际上就是创建一个Job类的对象job。注意用作getInstance()参数的conf是个JobConf类对象,这是从上面一路传下来的,有关本次作业的所有信息都在这个对象(数据结构)中。然后,就调用这个Job类对象的submit()函数,提交这个作业。
值得注意的是,在Hadoop的代码中,有两个关于Job类的定义,一个在mapred分支上,另一个在mapreduce分支上。而submitJobInternal()也有两个,我们正在分析的是作为JobClient类的一个方法函数的submitJobInternal(),也在mapred分支上,所以是“老”的作业提交方法。那么,按说这里引用的Job类也应该是定义于mapred分支上的那一个了?然而偏不是。要知道这里引用的是哪一个Job类,最可靠的办法是看看源码文件头部的import,看它导入的究竟是哪一个Job类。事实上在JobClient.java文件头部有这么一行:
import org.apache.hadoop.mapreduce.Job;
所以这里引用的Job类是定义在mapreduce分支上的那一个。也就是说,程序的流程转到新API上去了。
这又是一个体量挺大的类型,定义了许多方法,下面的定义摘要只列出了我们马上就要用到的成分,其余的则在将来要用到的时候再加说明:
class Job extends JobContextImpl implements JobContext {} ]… //数据部分,还有更多的数据在父类JobContextImpl 中 ]static {ConfigUtil.loadResources(); } //本类的静态初始化 ]Job(JobConf conf) //Job类的构建方法 > super(conf, null) //调用父类JobContextImpl 的构建方法 > this.credentials.mergeAll(this.ugi.getCredentials()) ]getInstance(Configuration conf) //根据Configuration创建Job对象 > j obConf=new JobConf(conf) > new Job(j obConf) //创建Job类对象 ]getCluster() ]setUseNewAPI() ]connect() ]getJobSubmitter()
]submit() > … //详见后述 ]… //更多的操作方法
这个Job类提供一个方法getInstance(Configuration conf),它要求的调用参数是Configuration类对象,而上面run()给出的conf是JobConf类的。不过这倒不要紧,因为JobConf是对Configuration的扩充,可以被用作Configuration。可是getInstance()所要做的其实是创建一个Job类对象,创建时所需的参数却又是JobConf类对象,所以这里又根据Configuration类对象再创建起一个JobConf类对象。其实这里只要做一下类型转换(cast),或者在Job类中增加一个以JobConf为参数的getInstance(),就不需要这么来回绕了。至于Job类对象的创建,我们这里就不追下去了。
回到前面run()的代码中,现在已经根据上面传下来的JobConf创建了一个Job对象,下一步就是调用它的方法submit(),即Job.submit(),进而完成作业的提交。
而Job.submit(),则正是前面所说三种作业提交方法的汇聚点。从调用Job.submit()开始,在“地方”上的流程就进入了第二阶段。