Parquet split handling in MapReduce

While reading the Parquet-related chapters of "Hadoop: The Definitive Guide", I was reminded of the MapReduce split code I had looked at before. Back then I only covered the basic FileInputFormat, so this is a good opportunity to see how Parquet handles splitting.

File format

I won't go into the details of the Parquet file format here; there are plenty of good articles online worth referring to.

There is a quick-start article on the format; after reading it, my summary is as follows:

A Parquet file consists of three main parts:

  1. Header
  2. Block
  3. Footer

All of the file-level metadata lives in the Footer, while each Block is a Row Group, which in turn holds a group of Column Chunks.
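Put together, the on-disk layout looks roughly like this (a simplified sketch; details such as the 4-byte "PAR1" magic are irrelevant to the split logic):

Header:  "PAR1" magic number
Block 0: Row Group 0 -> Column Chunk a, Column Chunk b, ...
Block 1: Row Group 1 -> Column Chunk a, Column Chunk b, ...
...
Footer:  FileMetaData (schema, key/value metadata, metadata of every Block),
         footer length, "PAR1" magic number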

Source code analysis

With a rough picture of the Parquet file format in mind, let's look at the actual source code:

/**
 * {@inheritDoc}
 */
@Override
public List<InputSplit> getSplits(JobContext jobContext) throws IOException {
  Configuration configuration = ContextUtil.getConfiguration(jobContext);
  List<InputSplit> splits = new ArrayList<InputSplit>();

  if (isTaskSideMetaData(configuration)) {
    // Although not required by the API, some clients may depend on always
    // receiving ParquetInputSplit. Translation is required at some point.
    for (InputSplit split : super.getSplits(jobContext)) {
      Preconditions.checkArgument(split instanceof FileSplit,
          "Cannot wrap non-FileSplit: " + split);
      splits.add(ParquetInputSplit.from((FileSplit) split));
    }
    return splits;

  } else {
    splits.addAll(getSplits(configuration, getFooters(jobContext)));
  }

  return splits;
}

public static boolean isTaskSideMetaData(Configuration configuration) {
    return configuration.getBoolean(TASK_SIDE_METADATA, TRUE);
}

The code first branches on TASK_SIDE_METADATA, i.e. the parquet.task.side.metadata property. When it is true (the default), splitting is delegated to the parent class FileInputFormat, which divides files by split size (128 MB by default). When it is false, a different splitting strategy kicks in, and that is what we are going to look at here.
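As a side note, here is a minimal sketch of how a job could opt into the client-side (footer-based) strategy; the property names come straight from the code above, but the class name and split sizes are made up for illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ParquetSplitConfigDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // parquet.task.side.metadata defaults to true; false switches
    // ParquetInputFormat to the footer-based, row-group-aware splitting.
    conf.setBoolean("parquet.task.side.metadata", false);
    // The footer-based strategy reads these two limits (see the getSplits
    // overload further down).
    conf.setLong("mapred.max.split.size", 256L * 1024 * 1024);
    conf.setLong("mapred.min.split.size", 128L * 1024 * 1024);
    Job job = Job.getInstance(conf, "parquet-split-demo");
    System.out.println(job.getConfiguration().getBoolean("parquet.task.side.metadata", true));
  }
}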

Let's start with the getFooters method:

public List<Footer> getFooters(JobContext jobContext) throws IOException {
  List<FileStatus> statuses = listStatus(jobContext);
  if (statuses.isEmpty()) {
    return Collections.emptyList();
  }
  Configuration config = ContextUtil.getConfiguration(jobContext);
  List<Footer> footers = new ArrayList<Footer>(statuses.size());
  Set<FileStatus> missingStatuses = new HashSet<FileStatus>();
  Map<Path, FileStatusWrapper> missingStatusesMap =
          new HashMap<Path, FileStatusWrapper>(missingStatuses.size());

  if (footersCache == null) {
    footersCache =
            new LruCache<FileStatusWrapper, FootersCacheValue>(Math.max(statuses.size(), MIN_FOOTER_CACHE_SIZE));
  }
  for (FileStatus status : statuses) {
    FileStatusWrapper statusWrapper = new FileStatusWrapper(status);
    FootersCacheValue cacheEntry =
            footersCache.getCurrentValue(statusWrapper);
    if (Log.DEBUG) {
      LOG.debug("Cache entry " + (cacheEntry == null ? "not " : "")
              + " found for '" + status.getPath() + "'");
    }
    if (cacheEntry != null) {
      footers.add(cacheEntry.getFooter());
    } else {
      missingStatuses.add(status);
      missingStatusesMap.put(status.getPath(), statusWrapper);
    }
  }
  if (Log.DEBUG) {
    LOG.debug("found " + footers.size() + " footers in cache and adding up "
            + "to " + missingStatuses.size() + " missing footers to the cache");
  }


  if (missingStatuses.isEmpty()) {
    return footers;
  }

  List<Footer> newFooters = getFooters(config, missingStatuses);
  for (Footer newFooter : newFooters) {
    // Use the original file status objects to make sure we store a
    // conservative (older) modification time (i.e. in case the files and
    // footers were modified and it's not clear which version of the footers
    // we have)
    FileStatusWrapper fileStatus = missingStatusesMap.get(newFooter.getFile());
    footersCache.put(fileStatus, new FootersCacheValue(fileStatus, newFooter));
  }

  footers.addAll(newFooters);
  return footers;
}

As the name suggests, this method fetches the footers of the input Parquet files, caching them in an LruCache keyed by file status so that unchanged files are not re-read. From the file format overview above we know that the footer carries the file's metadata, most importantly the schema and the metadata of every block:

// the two fields of ParquetMetadata that matter here
private final FileMetaData fileMetaData;
private final List<BlockMetaData> blocks;

public final class FileMetaData implements Serializable {
  private static final long serialVersionUID = 1L;

  private final MessageType schema;

  private final Map<String, String> keyValueMetaData;

  private final String createdBy;
}
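As a quick aside, here is a sketch of how you could dump this footer metadata yourself (assuming the org.apache.parquet package layout and a made-up file path; older releases use the parquet.* packages instead):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

public class FooterDumpDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path file = new Path("/tmp/example.parquet"); // hypothetical path
    // readFooter returns the same ParquetMetadata that the split logic works with
    ParquetMetadata footer = ParquetFileReader.readFooter(conf, file);
    System.out.println("schema: " + footer.getFileMetaData().getSchema());
    for (BlockMetaData block : footer.getBlocks()) {
      System.out.println("row group at offset " + block.getStartingPos()
          + ", rows = " + block.getRowCount()
          + ", compressed bytes = " + block.getCompressedSize());
    }
  }
}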

Once the footers have been obtained, getSplits is called:

public List<ParquetInputSplit> getSplits(Configuration configuration, List<Footer> footers) throws IOException {
  boolean strictTypeChecking = configuration.getBoolean(STRICT_TYPE_CHECKING, true);
  final long maxSplitSize = configuration.getLong("mapred.max.split.size", Long.MAX_VALUE);
  final long minSplitSize = Math.max(getFormatMinSplitSize(), configuration.getLong("mapred.min.split.size", 0L));
  if (maxSplitSize < 0 || minSplitSize < 0) {
    throw new ParquetDecodingException("maxSplitSize or minSplitSize should not be negative: maxSplitSize = " + maxSplitSize + "; minSplitSize = " + minSplitSize);
  }
  GlobalMetaData globalMetaData = ParquetFileWriter.getGlobalMetaData(footers, strictTypeChecking);
  ReadContext readContext = getReadSupport(configuration).init(new InitContext(
      configuration,
      globalMetaData.getKeyValueMetaData(),
      globalMetaData.getSchema()));

  return new ClientSideMetadataSplitStrategy().getSplits(
      configuration, footers, maxSplitSize, minSplitSize, readContext);
}

Now let's look at the final getSplits, the one in ClientSideMetadataSplitStrategy:

List<ParquetInputSplit> getSplits(Configuration configuration, List<Footer> footers,
    long maxSplitSize, long minSplitSize, ReadContext readContext)
    throws IOException {
  List<ParquetInputSplit> splits = new ArrayList<ParquetInputSplit>();
  Filter filter = ParquetInputFormat.getFilter(configuration);

  long rowGroupsDropped = 0;
  long totalRowGroups = 0;

  for (Footer footer : footers) {
    final Path file = footer.getFile();
    LOG.debug(file);
    FileSystem fs = file.getFileSystem(configuration);
    FileStatus fileStatus = fs.getFileStatus(file);
    ParquetMetadata parquetMetaData = footer.getParquetMetadata();
    List<BlockMetaData> blocks = parquetMetaData.getBlocks();

    List<BlockMetaData> filteredBlocks;

    totalRowGroups += blocks.size();
    filteredBlocks = RowGroupFilter.filterRowGroups(filter, blocks, parquetMetaData.getFileMetaData().getSchema());
    rowGroupsDropped += blocks.size() - filteredBlocks.size();

    if (filteredBlocks.isEmpty()) {
      continue;
    }

    BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
    splits.addAll(
        generateSplits(
            filteredBlocks,
            fileBlockLocations,
            fileStatus,
            readContext.getRequestedSchema().toString(),
            readContext.getReadSupportMetadata(),
            minSplitSize,
            maxSplitSize)
        );
  }

  if (rowGroupsDropped > 0 && totalRowGroups > 0) {
    int percentDropped = (int) ((((double) rowGroupsDropped) / totalRowGroups) * 100);
    LOG.info("Dropping " + rowGroupsDropped + " row groups that do not pass filter predicate! (" + percentDropped + "%)");
  } else {
    LOG.info("There were no row groups that could be dropped due to filter predicates");
  }
  return splits;
}

For each footer, the method first runs the row groups through RowGroupFilter, dropping those that cannot match the configured filter, and then uses Hadoop's FileSystem to look up the file's block locations, exactly the same way FileInputFormat does.
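The filter in question is the one returned by ParquetInputFormat.getFilter at the top of the method. As a hedged sketch (the column name and value are invented, and this assumes the filter2 predicate API), it could be configured like this:

import static org.apache.parquet.filter2.predicate.FilterApi.eq;
import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.ParquetInputFormat;

public class RowGroupFilterDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Hypothetical column "status": row groups whose statistics prove that no
    // row can have status == 1 may be dropped by RowGroupFilter.filterRowGroups.
    FilterPredicate predicate = eq(intColumn("status"), 1);
    ParquetInputFormat.setFilterPredicate(conf, predicate);
  }
}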

Next, generateSplits is called:

static <T> List<ParquetInputSplit> generateSplits(
        List<BlockMetaData> rowGroupBlocks,
        BlockLocation[] hdfsBlocksArray,
        FileStatus fileStatus,
        String requestedSchema,
        Map<String, String> readSupportMetadata, long minSplitSize, long maxSplitSize) throws IOException {

  List<SplitInfo> splitRowGroups =
      generateSplitInfo(rowGroupBlocks, hdfsBlocksArray, minSplitSize, maxSplitSize);

  //generate splits from rowGroups of each split
  List<ParquetInputSplit> resultSplits = new ArrayList<ParquetInputSplit>();
  for (SplitInfo splitInfo : splitRowGroups) {
    ParquetInputSplit split = splitInfo.getParquetInputSplit(fileStatus, requestedSchema, readSupportMetadata);
    resultSplits.add(split);
  }
  return resultSplits;
}

That method in turn calls generateSplitInfo:

static List<SplitInfo> generateSplitInfo(
    List<BlockMetaData> rowGroupBlocks,
    BlockLocation[] hdfsBlocksArray,
    long minSplitSize, long maxSplitSize) {
  List<SplitInfo> splitRowGroups;

  if (maxSplitSize < minSplitSize || maxSplitSize < 0 || minSplitSize < 0) {
    throw new ParquetDecodingException("maxSplitSize and minSplitSize should be positive and max should be greater or equal to the minSplitSize: maxSplitSize = " + maxSplitSize + "; minSplitSize is " + minSplitSize);
  }
  HDFSBlocks hdfsBlocks = new HDFSBlocks(hdfsBlocksArray);
  hdfsBlocks.checkBelongingToANewHDFSBlock(rowGroupBlocks.get(0));
  SplitInfo currentSplit = new SplitInfo(hdfsBlocks.getCurrentBlock());

  //assign rowGroups to splits
  splitRowGroups = new ArrayList<SplitInfo>();
  checkSorted(rowGroupBlocks);//assert row groups are sorted
  for (BlockMetaData rowGroupMetadata : rowGroupBlocks) {
    if ((hdfsBlocks.checkBelongingToANewHDFSBlock(rowGroupMetadata)
           && currentSplit.getCompressedByteSize() >= minSplitSize
           && currentSplit.getCompressedByteSize() > 0)
         || currentSplit.getCompressedByteSize() >= maxSplitSize) {
      //create a new split
      splitRowGroups.add(currentSplit);//finish previous split
      currentSplit = new SplitInfo(hdfsBlocks.getCurrentBlock());
    }
    currentSplit.addRowGroup(rowGroupMetadata);
  }

  if (currentSplit.getRowGroupCount() > 0) {
    splitRowGroups.add(currentSplit);
  }

  return splitRowGroups;
}

This is the core of the whole process. Roughly speaking, it uses checkBelongingToANewHDFSBlock to decide which HDFS block a given Parquet block (row group) belongs to:

private boolean checkBelongingToANewHDFSBlock(BlockMetaData rowGroupMetadata) {
  boolean isNewHdfsBlock = false;
  long rowGroupMidPoint = rowGroupMetadata.getStartingPos() + (rowGroupMetadata.getCompressedSize() / 2);

  //if mid point is not in the current HDFS block any more, return true
  while (rowGroupMidPoint > getHDFSBlockEndingPosition(currentMidPointHDFSBlockIndex)) {
    isNewHdfsBlock = true;
    currentMidPointHDFSBlockIndex++;
    if (currentMidPointHDFSBlockIndex >= hdfsBlocks.length)
      throw new ParquetDecodingException("the row group is not in hdfs blocks in the file: midpoint of row groups is "
              + rowGroupMidPoint
              + ", the end of the hdfs block is "
              + getHDFSBlockEndingPosition(currentMidPointHDFSBlockIndex - 1));
  }

  while (rowGroupMetadata.getStartingPos() > getHDFSBlockEndingPosition(currentStartHdfsBlockIndex)) {
    currentStartHdfsBlockIndex++;
    if (currentStartHdfsBlockIndex >= hdfsBlocks.length)
      throw new ParquetDecodingException("The row group does not start in this file: row group offset is "
              + rowGroupMetadata.getStartingPos()
              + " but the end of hdfs blocks of file is "
              + getHDFSBlockEndingPosition(currentStartHdfsBlockIndex));
  }
  return isNewHdfsBlock;
}

As you can see, checkBelongingToANewHDFSBlock decides purely by file offsets: it advances the HDFS block index until the row group's midpoint falls inside the current HDFS block, and reports whether it had to move on to a new block.
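To make the midpoint rule concrete, here is a toy calculation with made-up numbers: a row group whose midpoint spills past the end of the current 128 MB HDFS block is treated as belonging to the next block, so it starts a new split (provided the current split has reached minSplitSize).

public class MidpointRuleDemo {
  public static void main(String[] args) {
    long mb = 1024L * 1024;
    long hdfsBlockSize = 128 * mb;

    // A row group starting at 120 MB with 20 MB of compressed data...
    long rowGroupStart = 120 * mb;
    long rowGroupCompressed = 20 * mb;
    long midPoint = rowGroupStart + rowGroupCompressed / 2; // 130 MB

    // ...has its midpoint beyond HDFS block 0 (bytes 0 to 128 MB), so
    // checkBelongingToANewHDFSBlock would return true for it.
    int owningBlock = (int) (midPoint / hdfsBlockSize); // 1, i.e. the second block
    System.out.println("row group assigned to HDFS block " + owningBlock);
  }
}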

So generateSplitInfo builds a mapping from Parquet blocks to HDFS blocks, where one HDFS block may cover several Parquet blocks. In other words, a single SplitInfo can hold multiple Parquet blocks:

static class SplitInfo {
  List<BlockMetaData> rowGroups = new ArrayList<BlockMetaData>();
  BlockLocation hdfsBlock;
  long compressedByteSize = 0L;
}

Finally, let's look at SplitInfo's getParquetInputSplit method:

public ParquetInputSplit getParquetInputSplit(FileStatus fileStatus, String requestedSchema, Map<String, String> readSupportMetadata) throws IOException {
    MessageType requested = MessageTypeParser.parseMessageType(requestedSchema);
    long length = 0;

    for (BlockMetaData block : this.getRowGroups()) {
      List<ColumnChunkMetaData> columns = block.getColumns();
      for (ColumnChunkMetaData column : columns) {
        if (requested.containsPath(column.getPath().toArray())) {
          length += column.getTotalSize();
        }
      }
    }

    BlockMetaData lastRowGroup = this.getRowGroups().get(this.getRowGroupCount() - 1);
    long end = lastRowGroup.getStartingPos() + lastRowGroup.getTotalByteSize();

    long[] rowGroupOffsets = new long[this.getRowGroupCount()];
    for (int i = 0; i < rowGroupOffsets.length; i++) {
      rowGroupOffsets[i] = this.getRowGroups().get(i).getStartingPos();
    }

    return new ParquetInputSplit(
            fileStatus.getPath(),
            hdfsBlock.getOffset(),
            end,
            length,
            hdfsBlock.getHosts(),
            rowGroupOffsets
    );
}

Simple enough: it builds a ParquetInputSplit (a subclass of FileSplit) whose length is the total size of the requested column chunks across all row groups in the split.
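For instance (with invented numbers), if the requested schema projects only two of a row group's four columns, only those two column chunks count toward the split's reported length:

public class SplitLengthDemo {
  public static void main(String[] args) {
    // Column chunk sizes of one row group, in bytes (made-up values).
    long colA = 40_000_000, colB = 30_000_000, colC = 20_000_000, colD = 10_000_000;
    // Suppose the requested schema contains only colA and colC.
    long splitLength = colA + colC; // 60,000,000 -- colB and colD are ignored
    System.out.println("reported split length = " + splitLength);
  }
}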

Summary

From the walk-through above, we can see that ParquetInputFormat has two split strategies:

  1. The strategy inherited from its parent class FileInputFormat, which divides files by split size.
  2. A strategy based on the structure of the Parquet format itself, which divides files by block, i.e. by Row Group.