温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Hadoop如何实现job提交

发布时间:2021-12-09 15:13:12 来源:亿速云 阅读:132 作者:小新 栏目:云计算

小编给大家分享一下Hadoop如何实现job提交,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!

从如下地方开始,就要进行job的提交了

boolean isSuccess = job.waitForCompletion(true);

之后,进入Job类的waitForCompletion方法。

public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    //----
    return isSuccessful();
  }> > 这里输入引用文本

之后调用Job类的submit方法,

public void submit() 
         throws IOException, InterruptedException, ClassNotFoundException {
    connect();
    final JobSubmitter submitter = 
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException, 
      ClassNotFoundException {
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
     
   }

connect方法负责初始化集群信息:

  private synchronized void connect()
          throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {
      cluster = 
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                   public Cluster run()
                          throws IOException, InterruptedException, 
                                 ClassNotFoundException {
                     return new Cluster(getConfiguration());
                   }
                 });
    }
  }

集群信息cluster,包括什么,应该很清晰:

private ClientProtocolProvider clientProtocolProvider; private ClientProtocol client;

private UserGroupInformation ugi; private Configuration conf; private FileSystem fs = null;

private Path sysDir = null; private Path stagingAreaDir = null; private Path jobHistoryDir = null;

略微分析下, ClientProtocolProvider是客户端协议的生产者,对应的客户端是ClientProtocol。

ClientProtocolProvider规定了2个方法:

  • create

  • close 分别也用来创建和关闭客户端ClientProtocol。

而,ClientProtocolProvider的具体实现类有2个。

Hadoop如何实现job提交

可以看到,有两个协议生产者,分别是yarn和local的。

那么,对应的客户端ClientProtocol,也会有两个。 Hadoop如何实现job提交

ClientProtocol是个接口,里面规定了如下几个方法: Hadoop如何实现job提交

那么,不同的客户端yarn或者local,实现其中的方法即可。 因为,我们是本地Eclipse运行,直接看local即可,yarn的原理差不多,

OK,经过connect方法之后,cluster中这几个就有啦,即使没有的话,get的时候,也会初始化的。

之后, 使用集群的,FileSystem和client创建一个submiter。

 final JobSubmitter submitter = 
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());

然后,调用submitter 的submitJobInternal方法提交作业,OK,进入submitJobInternal方法。

JobSubmiter类的submitJobInternal方法大致过程如下:

checkSpecs(job); 检查作业输出路径。

//获得staging路径,注意:集群cluster中有这个路径的名称,只不过这里需要创建路径。 Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

//在staging路径下创建一个以jobid为标示的文件夹 JobID jobId = submitClient.getNewJobID(); Path submitJobDir = new Path(jobStagingArea, jobId.toString());

//job需要的一些文件和jar包之类的,都放到刚才的那个submitJobDir路径下 copyAndConfigureFiles(job, submitJobDir);

具体的东西包括:

 String files = conf.get("tmpfiles");
 String libjars = conf.get("tmpjars");
 String archives = conf.get("tmparchives");

//写入job输入的分片信息 int maps = writeSplits(job, submitJobDir);

split信息包括两个部分。 首先调用Inputformat获得分片的个数,具体如何获得,后续讲。 将返回的分片数组逐个遍历并持久化到一个文件。

SplitMetaInfo[] info = writeNewSplits(conf, splits, out); 而writeNewSplits代码主要就是写分片信息到文件中。 Hadoop如何实现job提交

之后,将split的分片信息持久化一个元数据文件。 writeJobSplitMetaInfo方法。 Hadoop如何实现job提交

//将job的描述信息,写到一个job.xml放到相应的staging目录下的jobid目录。 Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir); writeConf(conf, submitJobFile);

 FSDataOutputStream out = 
      FileSystem.create(jtFs, jobFile, 
                        new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
    try {
      conf.writeXml(out);
    } finally {
      out.close();
    }

//提交作业 status = submitClient.submitJob( jobId, submitJobDir.toString(), job.getCredentials());

OK,提交作业部分的代码就到这,后续写写,app master运行的过程。

总结,提交作业的主要功能。

  • 创建staging路径

  • 在staging路径下面创建作业id的路径

  • 把job相关的文件拷贝到路径下

  • 将job的split信息序列化到文件中

  • 将job的xml写到路径下

这些东西都放到hdfs,作为所有节点共享访问的地方。之后,app master会访问这个目录,copy job的配置文件到本地并创建job对象,并根据split的信息,创建对应的maptaskrunable。运行。

但是,总的job信息依然在hdfs上。

以上是“Hadoop如何实现job提交”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注亿速云行业资讯频道!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI