视频1 视频21 视频41 视频61 视频文章1 视频文章21 视频文章41 视频文章61 推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37 推荐39 推荐41 推荐43 推荐45 推荐47 推荐49 关键词1 关键词101 关键词201 关键词301 关键词401 关键词501 关键词601 关键词701 关键词801 关键词901 关键词1001 关键词1101 关键词1201 关键词1301 关键词1401 关键词1501 关键词1601 关键词1701 关键词1801 关键词1901 视频扩展1 视频扩展6 视频扩展11 视频扩展16 文章1 文章201 文章401 文章601 文章801 文章1001 资讯1 资讯501 资讯1001 资讯1501 标签1 标签501 标签1001 关键词1 关键词501 关键词1001 关键词1501 专题2001
MR总结(三)-MapReduce组件自定义
2020-11-09 12:58:50 责编:小采
文档

自定义InputFormat InputFormat主要包括: ? ? ? ? ? InputSplit和RecordReader ? ? InputSplit用于定义Map的数目和确定最合适的执行节点(位置) ? ?RecordReader负责从输入文件里读取数据记录,并把数据提交给Mapper处理。 ? 一个自定义分片实现要继承与抽

自定义InputFormat

InputFormat主要包括:

? ? ? ? ? InputSplit和RecordReader

? ?InputSplit用于定义Map的数目和确定最合适的执行节点(位置)

? ?RecordReader负责从输入文件里读取数据记录,并把数据提交给Mapper处理。

? 一个自定义分片实现要继承与抽象类InputSplit,通过定义输入的长度和位置。分片的位置暗示调度器如何是放置一个分片的执行器(即,选择一个合适的TaskTracker)

? JobTracker处理分片的算法大致是:

  1. 通过TaskTracker节点的心跳获取可用的map槽资源
  2. 从排队等候的分片中找出那些可用的节点是本地的
  3. 向TaskTracker提交分片

? 基于存储机制和执行策略,分片的大小和位置是有不同的意思。例如在HDFS上,一个分片和一个物理数据块是一致的,分片的位置是这个数据块的物理存放位置的一个集合。

? 下面是FileInputFormat工作的机制:

  1. 继承InputSplit,计算文件的信息。如文件中数据块的开始位置和块的长度
  2. 获取文件的数据块的一个集合
  3. 创建一个数据分片,这个分片的长度和块大小一样,分片位置为这个快的位置。此外还含有文件位置,块位移,长度等信息。

下面是FileInputFormat创建分片的代码:


/**
 * Generate the list of files and make them into FileSplits.
 * @param job the job context
 * @throws IOException
 */
 public List getSplits(JobContext job) throws IOException {
 Stopwatch sw = new Stopwatch().start();
 long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
 long maxSize = getMaxSplitSize(job);

 // generate splits
 List splits = new ArrayList();
 List files = listStatus(job);
 for (FileStatus file: files) {
 Path path = file.getPath();
 long length = file.getLen();
 if (length != 0) {
 BlockLocation[] blkLocations;
 if (file instanceof LocatedFileStatus) {
 blkLocations = ((LocatedFileStatus) file).getBlockLocations();
 } else {
 FileSystem fs = path.getFileSystem(job.getConfiguration());
 blkLocations = fs.getFileBlockLocations(file, 0, length);
 }

 if (isSplitable(job, path)) {
 long blockSize = file.getBlockSize();
 long splitSize = computeSplitSize(blockSize, minSize, maxSize);

 long bytesRemaining = length;
 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
 int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
 splits.add(makeSplit(path, length-bytesRemaining, splitSize,
 blkLocations[blkIndex].getHosts(),
 blkLocations[blkIndex].getCachedHosts()));
 bytesRemaining -= splitSize;
 }

 if (bytesRemaining != 0) {
 int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
 splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
 blkLocations[blkIndex].getHosts(),
 blkLocations[blkIndex].getCachedHosts()));
 }
 } else { // not splitable
 splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
 blkLocations[0].getCachedHosts()));
 }
 } else {
 //Create empty hosts array for zero length files
 splits.add(makeSplit(path, 0, length, new String[0]));
 }
 }
 // Save the number of input files for metrics/loadgen
 job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
 sw.stop();
 if (LOG.isDebugEnabled()) {
 LOG.debug("Total # of splits generated by getSplits: " + splits.size()
 + ", TimeTaken: " + sw.elapsedMillis());
 }
 return splits;
 }

方法创建分片过程主要做了下面几件事:1、首先从Job对象中获取输入文件的状态信息FileStatus。2.然后针对每个文件获取块信息。3.根据文件是否可分割按照分片大小进行切割,如果不能则不分割。

?案例:实现计算密集型InputFormat

有一种比较常见的MapReduce程序:计算密集型程序。

计算密集型MR程序即指:对于与每一个输入的键值对需要复杂的计算算法去处理,主要特征是每一个map处理函数需要比获取该处理数据更长的时间,至少一个量级。比如人脸识别程序。

如果使用默认的FileInputFormat去处理该类型应用的话,很大情况下会出现部门机器cpu负载过高,而其他的则比较闲。(可以通过ganglia监控分析)

默认情况下由于FileInputFormat的实现会根据数据的本地性来创建分片数据。然而对于计算密集型的程序数据的本地性可能不适合了。那我们应该如何做出改变呢?我们获取所有可用服务器各自计算能力的情况,根据服务器来分配创建分片。

因此我们需要重载分片函数。

下面我们重载SequenceFileInputFormat来演示如何实现上述要求:

ComputeIntensiveSequenceFileInputFormat继承SequenceFileInputFormat函数,重载gitSplits函数:

//重写分片函数
 @Override
 public List getSplits(JobContext job) throws IOException {
 String[] servers = getActiveServersList(job);
 if (servers == null)
 return null;
 List splits = new ArrayList();
 List files = listStatus(job);
 int currentServer = 0;
 for (FileStatus file : files) {
 Path path = file.getPath();
 long length = file.getLen();
 if ((length != 0) && isSplitable(job, path)) {
 long blockSize = file.getBlockSize();
 long splitSize = computeSplitSize(blockSize, minSize, maxSize);
 long bytesRemaining = length;
 while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
 splits.add(new FileSplit(path, length - bytesRemaining,
 splitSize, new String[] { servers[currentServer] }));
 currentServer = getNextServer(currentServer, servers.length);
 bytesRemaining -= splitSize;
 }
 } else if (length != 0) {
 splits.add(new FileSplit(path, 0, length,
 new String[] { servers[currentServer] }));
 currentServer = getNextServer(currentServer, servers.length);
 }
 }
 // Save the number of input files in the job-conf
 job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
 return splits;
 }
 //获取服务器列表
 private String[] getActiveServersList(JobContext context) {
 String[] servers = null;
 try {
 JobClient jc = new JobClient((JobConf) context.getConfiguration());
 ClusterStatus status = jc.getClusterStatus(true);
 Collection atc = status.getActiveTrackerNames();
 servers = new String[atc.size()];
 int s = 0;
 for (String serverInfo : atc) {
 StringTokenizer st = new StringTokenizer(serverInfo, ":");
 String trackerName = st.nextToken();
 StringTokenizer st1 = new StringTokenizer(trackerName, "_");
 st1.nextToken();
 servers[s++] = st1.nextToken();
 }
 } catch (IOException e) {
 e.printStackTrace();
 }
 return servers;
 }
 //选择一个服务器
 private static int getNextServer(int current, int max) {
 current++;
 if (current >= max)
 current = 0;
 return current;
 }

这个类继承于SequenceFileInputFormat,重写了getSplits()函数。计算分片的和FileInputFormat一样。只是原来数据的物理本地性由可用的服务器资源代替。两个主要函数:
getActiveServersList() 查询集群状态,计算一个可用的服务器名字列表
getNextServer() 获取一个服务器

优化:
上面方案是否就已经很完美,没有问题了呢?答案是否定的。
在上面我们替换了数据物理本地性这个特性,那么会导致更多的数据传输的问题,会给网络io带来压力,影响io性能。

因此我们想到可以把两个策略综合起来。首先放置尽可能多的任务为本地,分发剩下的到其他节点。
下面是实现这个方案的程序ComputeIntensiveLocalizedSequenceFileInputFormat:

@Override
 public List getSplits(JobContext job) throws IOException {
 List originalSplits = super.getSplits(job);
 String[] servers = getActiveServersList(job);
 if (servers == null)
 return null;
 List splits = new ArrayList(
 originalSplits.size());
 int numSplits = originalSplits.size();
 int currentServer = 0;
 for (int i = 0; i < numSplits; i++, currentServer = getNextServer(
 currentServer, servers.length)) {
 String server = servers[currentServer]; // Current server
 boolean replaced = false;
 for (InputSplit split : originalSplits) {
 FileSplit fs = (FileSplit) split;
 for (String l : fs.getLocations()) {
 if (l.equals(server)) {
 splits.add(new FileSplit(fs.getPath(), fs.getStart(),
 fs.getLength(), new String[] { server }));
 originalSplits.remove(split);
 replaced = true;
 break;
 }
 }
 if (replaced)
 break;
 }
 if (!replaced) {
 FileSplit fs = (FileSplit) splits.get(0);
 splits.add(new FileSplit(fs.getPath(), fs.getStart(), fs
 .getLength(), new String[] { server }));
 originalSplits.remove(0);
 }
 }
 return splits;
 }

这里第一步利用父类(FileInputFormat))获取分片来确保数据本地性。对于每一个服务器,首先试着去指定本地的split给它。其他没有本地分片的则随机分配剩下的分片。

总结:

MapReduce的输入格式的重写主要要主要两个组件:InputFormat和Recordreader。本文主要讲述InputFormat的原理及怎么来重写InputFormat,根据业务的特点选择创建分片的策略。

下载本文
显示全文
专题