2019-03-02 13:42  阅读(2433)
文章分类:Hadoop 学习之旅 文章标签:大数据HadoopHadoop 学习
©  原文作者:扎心了,老铁 原文地址:https://www.cnblogs.com/qingyunzong/category/1169344.html

作者:扎心了,老铁

出处:https://www.cnblogs.com/qingyunzong/category/1169344.html


MapReduce 多 Job 串联

需求

一个稍复杂点的处理逻辑往往需要多个 MapReduce 程序串联处理,多 job 的串联可以借助 MapReduce 框架的 JobControl 实现

实例

以下有两个 MapReduce 任务,分别是 Flow 的 SumMR 和 SortMR,其中有依赖关系:SumMR 的输出是 SortMR 的输入,所以 SortMR 的启动得在 SumMR 完成之后

Configuration conf1 = new Configuration();
        Configuration conf2 = new Configuration();

        Job job1 = Job.getInstance(conf1);
        Job job2 = Job.getInstance(conf2);

        job1.setJarByClass(MRScore3.class);
        job1.setMapperClass(MRMapper3_1.class);
        //job.setReducerClass(ScoreReducer3.class);

        job1.setMapOutputKeyClass(IntWritable.class);
        job1.setMapOutputValueClass(StudentBean.class);
        job1.setOutputKeyClass(IntWritable.class);
        job1.setOutputValueClass(StudentBean.class);

        job1.setPartitionerClass(CoursePartitioner2.class);

        job1.setNumReduceTasks(4);

        Path inputPath = new Path("D:\\MR\\hw\\work3\\input");
        Path outputPath = new Path("D:\\MR\\hw\\work3\\output_hw3_1");

        FileInputFormat.setInputPaths(job1, inputPath);
        FileOutputFormat.setOutputPath(job1, outputPath);

        job2.setMapperClass(MRMapper3_2.class);
        job2.setReducerClass(MRReducer3_2.class);

        job2.setMapOutputKeyClass(IntWritable.class);
        job2.setMapOutputValueClass(StudentBean.class);
        job2.setOutputKeyClass(StudentBean.class);
        job2.setOutputValueClass(NullWritable.class);

        Path inputPath2 = new Path("D:\\MR\\hw\\work3\\output_hw3_1");
        Path outputPath2 = new Path("D:\\MR\\hw\\work3\\output_hw3_end");

        FileInputFormat.setInputPaths(job2, inputPath2);
        FileOutputFormat.setOutputPath(job2, outputPath2);

        **JobControl control** **= new JobControl("Score3");

        ControlledJob aJob = new ControlledJob(job1.getConfiguration());
        ControlledJob bJob = new** **ControlledJob(job2.getConfiguration());**
        // 设置作业依赖关系
        **bJob.addDependingJob(aJob);

        control.addJob(aJob);
        control.addJob(bJob);

        Thread thread** **= new Thread(control);
        thread.start();

        while(!control.allFinished()) {
            thread.sleep(1000);
        }
        System.exit(0);**

MapReduce 全局计数器

MapReduce计数器是什么?

计数器是用来记录job的执行进度和状态的。它的作用可以理解为日志。我们可以在程序的某个位置插入计数器,记录数据或者进度的变化情况。

MapReduce计数器能做什么?

MapReduce 计数器(Counter)为我们提供一个窗口,用于观察 MapReduce Job 运行期的各种细节数据。对MapReduce性能调优很有帮助,MapReduce性能优化的评估大部分都是基于这些 Counter 的数值表现出来的。

MapReduce 都有哪些内置计数器?

MapReduce 自带了许多默认Counter,现在我们来分析这些默认 Counter 的含义,方便大家观察 Job 结果,如输入的字节数、输出的字节数、Map端输入/输出的字节数和条数、Reduce端的输入/输出的字节数和条数等。下面我们只需了解这些内置计数器,知道计数器组名称(groupName)和计数器名称(counterName),以后使用计数器会查找groupName和counterName即可。

1、任务计数器

在任务执行过程中,任务计数器采集任务的相关信息,每个作业的所有任务的结果会被聚集起来。例如,MAP_INPUT_RECORDS 计数器统计每个map任务输入记录的总数,并在一个作业的所有map任务上进行聚集,使得最终数字是整个作业的所有输入记录的总数。任务计数器由其关联任务维护,并定期发送给TaskTracker,再由TaskTracker发送给 JobTracker。因此,计数器能够被全局地聚集。下面我们分别了解各种任务计数器。

1)MapReduce 任务计数器

MapReduce 任务计数器的 groupName为org.apache.hadoop.mapreduce.TaskCounter,它包含的计数器如下表所示

计数器名称 说明
map输入的记录数(MAP_INPUT_RECORDS) 作业中所有map已处理的输入记录数。每次RecorderReader读到一条记录并将其传给map的map()函数时,该计数器的值增加。
map跳过的记录数(MAP_SKIPPED_RECORDS) 作业中所有map跳过的输入记录数。
map输入的字节数(MAP_INPUT_BYTES) 作业中所有map已处理的未经压缩的输入数据的字节数。每次RecorderReader读到一条记录并将其传给map的map()函数时,该计数器的值增加
分片split的原始字节数(SPLIT_RAW_BYTES) 由map读取的输入-分片对象的字节数。这些对象描述分片元数据(文件的位移和长度),而不是分片的数据自身,因此总规模是小的
map输出的记录数(MAP_OUTPUT_RECORDS) 作业中所有map产生的map输出记录数。每次某一个map的Context调用write()方法时,该计数器的值增加
map输出的字节数(MAP_OUTPUT_BYTES) 作业中所有map产生的未经压缩的输出数据的字节数。每次某一个map的Context调用write()方法时,该计数器的值增加。
map输出的物化字节数(MAP_OUTPUT_MATERIALIZED_BYTES) map输出后确实写到磁盘上的字节数;若map输出压缩功能被启用,则会在计数器值上反映出来
combine输入的记录数(COMBINE_INPUT_RECORDS) 作业中所有Combiner(如果有)已处理的输入记录数。Combiner的迭代器每次读一个值,该计数器的值增加。
combine输出的记录数(COMBINE_OUTPUT_RECORDS) 作业中所有Combiner(如果有)已产生的输出记录数。每当一个Combiner的Context调用write()方法时,该计数器的值增加。
reduce输入的组(REDUCE_INPUT_GROUPS) 作业中所有reducer已经处理的不同的码分组的个数。每当某一个reducer的reduce()被调用时,该计数器的值增加。
reduce输入的记录数(REDUCE_INPUT_RECORDS) 作业中所有reducer已经处理的输入记录的个数。每当某个reducer的迭代器读一个值时,该计数器的值增加。如果所有reducer已经处理完所有输入,则该计数器的值与计数器“map输出的记录”的值相同
reduce输出的记录数(REDUCE_OUTPUT_RECORDS) 作业中所有map已经产生的reduce输出记录数。每当某一个reducer的Context调用write()方法时,该计数器的值增加。
reduce跳过的组数(REDUCE_SKIPPED_GROUPS) 作业中所有reducer已经跳过的不同的码分组的个数。
reduce跳过的记录数(REDUCE_SKIPPED_RECORDS) 作业中所有reducer已经跳过输入记录数。
reduce经过shuffle的字节数(REDUCE_SHUFFLE_BYTES) shuffle将map的输出数据复制到reducer中的字节数。
溢出的记录数(SPILLED_RECORDS) 作业中所有map和reduce任务溢出到磁盘的记录数
CPU毫秒(CPU_MILLISECONDS) 总计的CPU时间,以毫秒为单位,由/proc/cpuinfo获取
物理内存字节数(PHYSICAL_MEMORY_BYTES) 一个任务所用物理内存的字节数,由/proc/cpuinfo获取
虚拟内存字节数(VIRTUAL_MEMORY_BYTES) 一个任务所用虚拟内存的字节数,由/proc/cpuinfo获取
有效的堆字节数(COMMITTED_HEAP_BYTES) 在JVM中的总有效内存量(以字节为单位),可由Runtime().getRuntime().totaoMemory()获取。
GC运行时间毫秒数(GC_TIME_MILLIS) 在任务执行过程中,垃圾收集器(garbagecollection)花费的时间(以毫秒为单位),可由GarbageCollectorMXBean.getCollectionTime()获取;该计数器并未出现在1.x版本中。
由shuffle传输的map输出数(SHUFFLED_MAPS) 有shuffle传输到reducer的map输出文件数。
失败的shuffle数(SHUFFLE_MAPS) 在shuffle过程中,发生拷贝错误的map输出文件数,该计数器并没有包含在1.x版本中。
被合并的map输出数 在shuffle过程中,在reduce端被合并的map输出文件数,该计数器没有包含在1.x版本中。

2)文件系统计数器

文件系统计数器的 groupName为org.apache.hadoop.mapreduce.FileSystemCounter,它包含的计数器如下表所示

计数器名称 说明
文件系统的读字节数(BYTES_READ) 由map和reduce等任务在各个文件系统中读取的字节数,各个文件系统分别对应一个计数器,可以是Local、HDFS、S3和KFS等。
文件系统的写字节数(BYTES_WRITTEN) 由map和reduce等任务在各个文件系统中写的字节数。

3)FileInputFormat 计数器

FileInputFormat 计数器的 groupName为org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter,它包含的计数器如下表所示,计数器名称列的括号()内容即为counterName

计数器名称 说明
读取的字节数(BYTES_READ) 由map任务通过FileInputFormat读取的字节数

4)FileOutputFormat 计数器

FileOutputFormat 计数器的 groupName为org.apache.hadoop.mapreduce.lib.input.FileOutputFormatCounter,它包含的计数器如下表所示

计数器名称 说明
写的字节数(BYTES_WRITTEN) 由map任务(针对仅含map的作业)或者reduce任务通过FileOutputFormat写的字节数。

2、作业计数器

作业计数器由 JobTracker(或者 YARN)维护,因此无需在网络间传输数据,这一点与包括 “用户定义的计数器” 在内的其它计数器不同。这些计数器都是作业级别的统计量,其值不会随着任务运行而改变。 作业计数器计数器的 groupName为org.apache.hadoop.mapreduce.JobCounter,它包含的计数器如下表所示

*计数器名称 说明
启用的map任务数(TOTAL_LAUNCHED_MAPS) 启动的map任务数,包括以“推测执行”方式启动的任务。
启用的reduce任务数(TOTAL_LAUNCHED_REDUCES) 启动的reduce任务数,包括以“推测执行”方式启动的任务
失败的map任务数(NUM_FAILED_MAPS) 失败的map任务数。
失败的reduce任务数(NUM_FAILED_REDUCES) 失败的reduce任务数。
数据本地化的map任务数(DATA_LOCAL_MAPS) 与输入数据在同一节点的map任务数。
机架本地化的map任务数(RACK_LOCAL_MAPS) 与输入数据在同一机架范围内、但不在同一节点上的map任务数。
其它本地化的map任务数(OTHER_LOCAL_MAPS) 与输入数据不在同一机架范围内的map任务数。由于机架之间的宽带资源相对较少,Hadoop会尽量让map任务靠近输入数据执行,因此该计数器值一般比较小。
map任务的总运行时间(SLOTS_MILLIS_MAPS) map任务的总运行时间,单位毫秒。该计数器包括以推测执行方式启动的任务。
reduce任务的总运行时间(SLOTS_MILLIS_REDUCES) educe任务的总运行时间,单位毫秒。该值包括以推测执行方式启动的任务。
在保留槽之后,map任务等待的总时间(FALLOW_SLOTS_MILLIS_MAPS) 在为map任务保留槽之后所花费的总等待时间,单位是毫秒。
在保留槽之后,reduce任务等待的总时间(FALLOW_SLOTS_MILLIS_REDUCES) 在为reduce任务保留槽之后,花在等待上的总时间,单位为毫秒。

计数器的该如何使用?

下面我们来介绍如何使用计数器。

1、定义计数器

  1)枚举声明计数器

 // 自定义枚举变量Enum
Counter counter = context.getCounter(Enum enum)

 2)自定义计数器

 / 自己命名groupName和counterName
Counter counter = context.getCounter(String groupName,String counterName)

 2、为计数器赋值

  1)初始化计数器

 counter.setValue(long value);// 设置初始值

 2)计数器自增

 counter.increment(long incr);// 增加计数

 3、获取计数器的值

  1. 获取枚举计数器的值


    Configuration conf = new Configuration(); Job job = new Job(conf, "MyCounter"); job.waitForCompletion(true); Counters counters=job.getCounters(); Counter counter=counters.findCounter(LOG_PROCESSOR_COUNTER.BAD_RECORDS_LONG);// 查找枚举计数器,假如Enum的变量为BAD_RECORDS_LONG long value=counter.getValue();//获取计数值

  2. 获取自定义计数器的值


    Configuration conf = new Configuration(); Job job = new Job(conf, "MyCounter"); job.waitForCompletion(true); Counters counters = job.getCounters(); Counter counter=counters.findCounter("ErrorCounter","toolong");// 假如groupName为ErrorCounter,counterName为toolong long value = counter.getValue();// 获取计数值

  3. 获取内置计数器的值


    Configuration conf = new Configuration(); Job job = new Job(conf, "MyCounter"); job.waitForCompletion(true); Counters counters=job.getCounters(); // 查找作业运行启动的reduce个数的计数器,groupName和counterName可以从内置计数器表格查询(前面已经列举有) Counter counter=counters.findCounter("org.apache.hadoop.mapreduce.JobCounter","TOTAL_LAUNCHED_REDUCES");// 假如groupName为org.apache.hadoop.mapreduce.JobCounter,counterName为TOTAL_LAUNCHED_REDUCES long value=counter.getValue();// 获取计数值

  4. 获取所有计数器的值


    Configuration conf = new Configuration(); Job job = new Job(conf, "MyCounter"); Counters counters = job.getCounters(); for (CounterGroup group : counters) { for (Counter counter : group) { System.out.println(counter.getDisplayName() + ": " + counter.getName() + ": "+ counter.getValue()); } }

点赞(0)
版权归原创作者所有,任何形式转载请联系作者; Java 技术驿站 >> Hadoop学习之路(十五)MapReduce的多Job串联和全局计数器
上一篇
Hadoop学习之路(十四)MapReduce的核心运行机制
下一篇
Hadoop学习之路(十七)MapReduce框架Partitoner分区